From 3105b9b1f923de688733826646e36fe62fb6c0ca Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 12 Oct 2018 11:45:23 -0400 Subject: [PATCH] Significantly rework the collective startup behavior One, make the tracking bools enforce a lock to reduce confusion Treat an initializing peer as failed, to avoid getting too fixated on an uncertain target. Make sure that no more than one follower is tried at a time by killing before starting a new one, and syncing up the configmanager state Decline to act on an assimilation request if we are trying to connect and also if the current leader asks us to connect and we already are. Avoid calling get_leader while connecting, as that can cause a member to decide to become a leader while trying to connect, by swapping the reactions to the connect request. Avoid trying to assimilate existing followers. Fix some logging. --- .../confluent/collective/manager.py | 52 ++++++++++++++----- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 63189aa5..5e49a2cf 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -41,12 +41,15 @@ retrythread = None class ContextBool(object): def __init__(self): self.active = False + self.mylock = threading.RLock() def __enter__(self): self.active = True + self.mylock.__enter__() def __exit__(self, exc_type, exc_val, exc_tb): self.active = False + self.mylock.__exit__(exc_type, exc_val, exc_tb) connecting = ContextBool() leader_init = ContextBool() @@ -81,11 +84,9 @@ def connect_to_leader(cert=None, name=None, leader=None): if 'backoff' in keydata: log.log({ 'info': 'Collective initialization in progress on ' - '{0}, will retry connection'.format(leader), + '{0}'.format(leader), 'subsystem': 'collective'}) - eventlet.spawn_after(random.random(), connect_to_leader, - cert, name, leader) - return True + return False if 'leader' in keydata: log.log( {'info': 'Prospective leader {0} has redirected this ' @@ -101,13 +102,17 @@ def connect_to_leader(cert=None, name=None, leader=None): log.log({'info': 'Prospective leader {0} has inferior ' 'transaction count, becoming leader' - ''.format(leader), 'subsystem': 'collective'}) + ''.format(leader), 'subsystem': 'collective', + 'subsystem': 'collective'}) return become_leader(remote) - print(keydata['error']) return False follower.kill() cfm.stop_following() follower = None + if follower: + follower.kill() + cfm.stop_following() + follower = None log.log({'info': 'Following leader {0}'.format(leader), 'subsystem': 'collective'}) colldata = tlvdata.recv(remote) @@ -364,6 +369,18 @@ def handle_connection(connection, cert, request, local=False): 'transaction count', 'txcount': cfm._txcount,}) return + if connecting.active: + # don't try to connect while actively already trying to connect + tlvdata.send(connection, {'status': 0}) + connection.close() + return + if (currentleader == connection.getpeername()[0] and + follower and follower.isAlive()): + # if we are happily following this leader already, don't stir + # the pot + tlvdata.send(connection, {'status': 0}) + connection.close() + return log.log({'info': 'Connecting in response to assimilation', 'subsystem': 'collective'}) eventlet.spawn_n(connect_to_leader, None, None, @@ -394,6 +411,11 @@ def handle_connection(connection, cert, request, local=False): connection.close() return myself = connection.getsockname()[0] + if connecting.active: + tlvdata.send(connection, {'error': 'Connecting right now', + 'backoff': True}) + connection.close() + return if myself != get_leader(connection): tlvdata.send( connection, @@ -401,11 +423,6 @@ def handle_connection(connection, cert, request, local=False): 'in another castle', 'leader': currentleader}) connection.close() return - if connecting.active: - tlvdata.send(connection, {'error': 'Connecting right now', - 'backoff': True}) - connection.close() - return if request['txcount'] > cfm._txcount: retire_as_leader() tlvdata.send(connection, @@ -491,6 +508,12 @@ def try_assimilate(drone): def get_leader(connection): if currentleader is None or connection.getpeername()[0] == currentleader: + if currentleader is None: + msg = 'Becoming leader as no leader known' + else: + msg = 'Becoming leader because {0} attempted to connect and it ' \ + 'is current leader'.format(currentleader) + log.log({'info': msg, 'subsystem': 'collective'}) become_leader(connection) return currentleader @@ -507,6 +530,7 @@ def become_leader(connection): 'subsystem': 'collective'}) if follower: follower.kill() + cfm.stop_following() follower = None if retrythread: retrythread.cancel() @@ -514,9 +538,12 @@ def become_leader(connection): currentleader = connection.getsockname()[0] skipaddr = connection.getpeername()[0] myname = get_myname() + skipem = set(cfm.cfgstreams) + skipem.add(currentleader) + skipem.add(skipaddr) for member in cfm.list_collective(): dronecandidate = cfm.get_collective_member(member)['address'] - if dronecandidate in (currentleader, skipaddr) or member == myname: + if dronecandidate in skipem or member == myname: continue eventlet.spawn_n(try_assimilate, dronecandidate) @@ -533,6 +560,7 @@ def start_collective(): global retrythread if follower: follower.kill() + cfm.stop_following() follower = None try: if cfm.cfgstreams: