diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 63189aa5..5e49a2cf 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -41,12 +41,15 @@ retrythread = None class ContextBool(object): def __init__(self): self.active = False + self.mylock = threading.RLock() def __enter__(self): self.active = True + self.mylock.__enter__() def __exit__(self, exc_type, exc_val, exc_tb): self.active = False + self.mylock.__exit__(exc_type, exc_val, exc_tb) connecting = ContextBool() leader_init = ContextBool() @@ -81,11 +84,9 @@ def connect_to_leader(cert=None, name=None, leader=None): if 'backoff' in keydata: log.log({ 'info': 'Collective initialization in progress on ' - '{0}, will retry connection'.format(leader), + '{0}'.format(leader), 'subsystem': 'collective'}) - eventlet.spawn_after(random.random(), connect_to_leader, - cert, name, leader) - return True + return False if 'leader' in keydata: log.log( {'info': 'Prospective leader {0} has redirected this ' @@ -101,13 +102,17 @@ def connect_to_leader(cert=None, name=None, leader=None): log.log({'info': 'Prospective leader {0} has inferior ' 'transaction count, becoming leader' - ''.format(leader), 'subsystem': 'collective'}) + ''.format(leader), 'subsystem': 'collective', + 'subsystem': 'collective'}) return become_leader(remote) - print(keydata['error']) return False follower.kill() cfm.stop_following() follower = None + if follower: + follower.kill() + cfm.stop_following() + follower = None log.log({'info': 'Following leader {0}'.format(leader), 'subsystem': 'collective'}) colldata = tlvdata.recv(remote) @@ -364,6 +369,18 @@ def handle_connection(connection, cert, request, local=False): 'transaction count', 'txcount': cfm._txcount,}) return + if connecting.active: + # don't try to connect while actively already trying to connect + tlvdata.send(connection, {'status': 0}) + connection.close() + return + if (currentleader == connection.getpeername()[0] and + follower and follower.isAlive()): + # if we are happily following this leader already, don't stir + # the pot + tlvdata.send(connection, {'status': 0}) + connection.close() + return log.log({'info': 'Connecting in response to assimilation', 'subsystem': 'collective'}) eventlet.spawn_n(connect_to_leader, None, None, @@ -394,6 +411,11 @@ def handle_connection(connection, cert, request, local=False): connection.close() return myself = connection.getsockname()[0] + if connecting.active: + tlvdata.send(connection, {'error': 'Connecting right now', + 'backoff': True}) + connection.close() + return if myself != get_leader(connection): tlvdata.send( connection, @@ -401,11 +423,6 @@ def handle_connection(connection, cert, request, local=False): 'in another castle', 'leader': currentleader}) connection.close() return - if connecting.active: - tlvdata.send(connection, {'error': 'Connecting right now', - 'backoff': True}) - connection.close() - return if request['txcount'] > cfm._txcount: retire_as_leader() tlvdata.send(connection, @@ -491,6 +508,12 @@ def try_assimilate(drone): def get_leader(connection): if currentleader is None or connection.getpeername()[0] == currentleader: + if currentleader is None: + msg = 'Becoming leader as no leader known' + else: + msg = 'Becoming leader because {0} attempted to connect and it ' \ + 'is current leader'.format(currentleader) + log.log({'info': msg, 'subsystem': 'collective'}) become_leader(connection) return currentleader @@ -507,6 +530,7 @@ def become_leader(connection): 'subsystem': 'collective'}) if follower: follower.kill() + cfm.stop_following() follower = None if retrythread: retrythread.cancel() @@ -514,9 +538,12 @@ def become_leader(connection): currentleader = connection.getsockname()[0] skipaddr = connection.getpeername()[0] myname = get_myname() + skipem = set(cfm.cfgstreams) + skipem.add(currentleader) + skipem.add(skipaddr) for member in cfm.list_collective(): dronecandidate = cfm.get_collective_member(member)['address'] - if dronecandidate in (currentleader, skipaddr) or member == myname: + if dronecandidate in skipem or member == myname: continue eventlet.spawn_n(try_assimilate, dronecandidate) @@ -533,6 +560,7 @@ def start_collective(): global retrythread if follower: follower.kill() + cfm.stop_following() follower = None try: if cfm.cfgstreams: