From 2f566fb81ddfdd14b3b623ee6d1ff48d67e636b4 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 10 Oct 2018 09:41:25 -0400 Subject: [PATCH 01/15] Provide fallback for unexpected reply in collective show --- confluent_server/bin/collective | 2 +- .../confluent/config/configmanager.py | 21 ++++++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index 5f0c78f6..c8d31e62 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -68,7 +68,7 @@ def join_collective(server, invitation): res = tlvdata.recv(s) res = res.get('collective', {'status': 'Unknown response: ' + repr(res)}) - print(res.get('status', res['error'])) + print(res.get('status', res.get('error', repr(res)))) def show_collective(): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index bc114426..cf01c73e 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -63,6 +63,7 @@ import eventlet import eventlet.event as event import eventlet.green.select as select import eventlet.green.threading as gthread +import eventlet.greenpool as gpool import fnmatch import json import operator @@ -947,7 +948,7 @@ class ConfigManager(object): os.getenv('SystemDrive'), '\\ProgramData', 'confluent', 'cfg') else: _cfgdir = "/etc/confluent/cfg" - _cfgwriter = None + _cfgwriter = gpool.GreenPool(1) _writepending = False _syncrunning = False _syncstate = threading.RLock() @@ -2045,11 +2046,9 @@ class ConfigManager(object): @classmethod def wait_for_sync(cls, fullsync=False): - if cls._cfgwriter is not None: - cls._cfgwriter.join() + cls._cfgwriter.waitall() cls._bg_sync_to_file(fullsync) - if cls._cfgwriter is not None: - cls._cfgwriter.join() + cls._cfgwriter.waitall() @classmethod def shutdown(cls): @@ -2065,11 +2064,13 @@ class ConfigManager(object): cls._writepending = True return cls._syncrunning = True - # if the thread is exiting, join it to let it close, just in case - if cls._cfgwriter is not None: - cls._cfgwriter.join() - cls._cfgwriter = threading.Thread(target=cls._sync_to_file, args=(fullsync,)) - cls._cfgwriter.start() + cls._cfgwriter.spawn_n(cls._g_sync_to_file, fullsync) + + @classmethod + def _g_sync_to_file(cls, fullsync): + cls._sync_to_file(fullsync) + + @classmethod def _sync_to_file(cls, fullsync=False): From cf9d2a43e8b56c4fc77dc87db695505b0c3f459e Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 10 Oct 2018 09:44:06 -0400 Subject: [PATCH 02/15] Revert "Provide fallback for unexpected reply in collective show" This reverts commit 2f566fb81ddfdd14b3b623ee6d1ff48d67e636b4. --- confluent_server/bin/collective | 2 +- .../confluent/config/configmanager.py | 21 +++++++++---------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index c8d31e62..5f0c78f6 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -68,7 +68,7 @@ def join_collective(server, invitation): res = tlvdata.recv(s) res = res.get('collective', {'status': 'Unknown response: ' + repr(res)}) - print(res.get('status', res.get('error', repr(res)))) + print(res.get('status', res['error'])) def show_collective(): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index cf01c73e..bc114426 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -63,7 +63,6 @@ import eventlet import eventlet.event as event import eventlet.green.select as select import eventlet.green.threading as gthread -import eventlet.greenpool as gpool import fnmatch import json import operator @@ -948,7 +947,7 @@ class ConfigManager(object): os.getenv('SystemDrive'), '\\ProgramData', 'confluent', 'cfg') else: _cfgdir = "/etc/confluent/cfg" - _cfgwriter = gpool.GreenPool(1) + _cfgwriter = None _writepending = False _syncrunning = False _syncstate = threading.RLock() @@ -2046,9 +2045,11 @@ class ConfigManager(object): @classmethod def wait_for_sync(cls, fullsync=False): - cls._cfgwriter.waitall() + if cls._cfgwriter is not None: + cls._cfgwriter.join() cls._bg_sync_to_file(fullsync) - cls._cfgwriter.waitall() + if cls._cfgwriter is not None: + cls._cfgwriter.join() @classmethod def shutdown(cls): @@ -2064,13 +2065,11 @@ class ConfigManager(object): cls._writepending = True return cls._syncrunning = True - cls._cfgwriter.spawn_n(cls._g_sync_to_file, fullsync) - - @classmethod - def _g_sync_to_file(cls, fullsync): - cls._sync_to_file(fullsync) - - + # if the thread is exiting, join it to let it close, just in case + if cls._cfgwriter is not None: + cls._cfgwriter.join() + cls._cfgwriter = threading.Thread(target=cls._sync_to_file, args=(fullsync,)) + cls._cfgwriter.start() @classmethod def _sync_to_file(cls, fullsync=False): From d5c093a30d7d41de195fc577c4529787a4345dfd Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 10 Oct 2018 09:46:01 -0400 Subject: [PATCH 03/15] Provide fallback for unexpected reply in collective show --- confluent_server/bin/collective | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index 5f0c78f6..c8d31e62 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -68,7 +68,7 @@ def join_collective(server, invitation): res = tlvdata.recv(s) res = res.get('collective', {'status': 'Unknown response: ' + repr(res)}) - print(res.get('status', res['error'])) + print(res.get('status', res.get('error', repr(res)))) def show_collective(): From b77ed8dbffe7d879d4d7dc32e127b41d6a695842 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 10 Oct 2018 13:07:27 -0400 Subject: [PATCH 04/15] Fix config sync on dead writer The sync thread can die without clearing syncrunning. Make sure that the thread is alive *and* that the thread has not indicated intent to give up. --- confluent_server/confluent/collective/manager.py | 2 +- confluent_server/confluent/config/configmanager.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 09961698..e8092529 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -82,7 +82,7 @@ def connect_to_leader(cert=None, name=None, leader=None): log.log({ 'info': 'Collective initialization in progress on ' '{0}, will retry connection'.format(leader), - 'subsystem': 'collective'}) + 'subsystem': 'collective'}) ####### bad idea? eventlet.spawn_after(random.random(), connect_to_leader, cert, name, leader) return True diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index bc114426..477c9104 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -2061,7 +2061,8 @@ class ConfigManager(object): if statelessmode: return with cls._syncstate: - if cls._syncrunning: + if (cls._syncrunning and cls._cfgwriter is not None and + cls._cfgwriter.isAlive()): cls._writepending = True return cls._syncrunning = True From 32ddb33de3c5a9ecc8741ad69c7fcabeaf18510b Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 10 Oct 2018 13:10:41 -0400 Subject: [PATCH 05/15] Fix error when trying to do fullsync without globals yet If globals is missing, then do not break the sync trying to handle it --- confluent_server/confluent/collective/manager.py | 2 +- confluent_server/confluent/config/configmanager.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index e8092529..09961698 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -82,7 +82,7 @@ def connect_to_leader(cert=None, name=None, leader=None): log.log({ 'info': 'Collective initialization in progress on ' '{0}, will retry connection'.format(leader), - 'subsystem': 'collective'}) ####### bad idea? + 'subsystem': 'collective'}) eventlet.spawn_after(random.random(), connect_to_leader, cert, name, leader) return True diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 477c9104..0ca84ab4 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -968,7 +968,7 @@ class ConfigManager(object): _cfgstore['main'] = {} self._bg_sync_to_file() self._cfgstore = _cfgstore['main'] - if 'nodegroups' not in self._cfgstore: + if 'nodegroups' not in self._cfgstore: # This can happen during a clear... it seams... and if so it messes up... self._cfgstore['nodegroups'] = {'everything': {'nodes': set()}} _mark_dirtykey('nodegroups', 'everything', self.tenant) self._bg_sync_to_file() @@ -2080,8 +2080,9 @@ class ConfigManager(object): _mkpath(cls._cfgdir) with open(os.path.join(cls._cfgdir, 'transactioncount'), 'w') as f: f.write(struct.pack('!Q', _txcount)) - if fullsync or 'dirtyglobals' in _cfgstore: - if fullsync: + if (fullsync or 'dirtyglobals' in _cfgstore and + 'globals' in _cfgstore): + if fullsync: # globals is not a given to be set.. dirtyglobals = _cfgstore['globals'] else: with _dirtylock: From 3b2b96a4cf298eb331cb44a1452bbb20f7eae495 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 10 Oct 2018 14:32:13 -0400 Subject: [PATCH 06/15] Force fullsync if dead sync thread likely If the sync thread died previously, force the next sync to be full. --- confluent_server/confluent/config/configmanager.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 0ca84ab4..afd45983 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -2065,6 +2065,9 @@ class ConfigManager(object): cls._cfgwriter.isAlive()): cls._writepending = True return + if cls._syncrunning: # This suggests an unclean write attempt, + # do a fullsync as a recovery + fullsync = True cls._syncrunning = True # if the thread is exiting, join it to let it close, just in case if cls._cfgwriter is not None: From 6a784e3a1c9ac7b15f3f9bc0ccaffe7177a06e8b Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 10 Oct 2018 14:49:33 -0400 Subject: [PATCH 07/15] Ensure sync is complete prior to leaving configmanager sync The initialization lock is meant to avoid collective and generic initialization stepping on each other. This is somewhat reduced in efficacy if one has a sync running while the other is changing relevant data. --- confluent_server/confluent/config/configmanager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index afd45983..cfd8f29d 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -626,7 +626,7 @@ def rollback_clear(): _cfgstore = _oldcfgstore _oldtxcount = 0 _oldcfgstore = None - ConfigManager._bg_sync_to_file() + ConfigManager.wait_for_sync(True) def clear_configuration(): @@ -990,6 +990,7 @@ class ConfigManager(object): if 'nodes' not in self._cfgstore: self._cfgstore['nodes'] = {} self._bg_sync_to_file() + self.wait_for_sync() def get_collective_member(self, name): return get_collective_member(name) From 6b70a4322a9d8d2f5c145e5b91508a46bc99090e Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 10 Oct 2018 15:22:20 -0400 Subject: [PATCH 08/15] Fix rollback The fix for the stale data introduced breaking clear rollback Restore the behavior and make self._cfgstore a somewhat slower property for now. --- confluent_server/confluent/config/configmanager.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index cfd8f29d..9275693f 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -638,10 +638,7 @@ def clear_configuration(): stop_following() _oldcfgstore = _cfgstore _oldtxcount = _txcount - if _cfgstore is None or 'main' not in _cfgstore: - _cfgstore = {} - else: - _cfgstore['main'].clear() + _cfgstore = {} _txcount = 0 def commit_clear(): @@ -955,6 +952,12 @@ class ConfigManager(object): _nodecollwatchers = {} _notifierids = {} + @property + def _cfgstore(self): + if self.tenant is None: + return _cfgstore['main'] + return _cfgstore['tenant'][self.tenant] + def __init__(self, tenant, decrypt=False, username=None): global _cfgstore with _initlock: @@ -967,7 +970,6 @@ class ConfigManager(object): if 'main' not in _cfgstore: _cfgstore['main'] = {} self._bg_sync_to_file() - self._cfgstore = _cfgstore['main'] if 'nodegroups' not in self._cfgstore: # This can happen during a clear... it seams... and if so it messes up... self._cfgstore['nodegroups'] = {'everything': {'nodes': set()}} _mark_dirtykey('nodegroups', 'everything', self.tenant) @@ -983,7 +985,6 @@ class ConfigManager(object): _cfgstore['tenant'][tenant] = {} self._bg_sync_to_file() self.tenant = tenant - self._cfgstore = _cfgstore['tenant'][tenant] if 'nodegroups' not in self._cfgstore: self._cfgstore['nodegroups'] = {'everything': {}} _mark_dirtykey('nodegroups', 'everything', self.tenant) From 2d0199a4e92d319c27ac32e96fd42e92cf317566 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 10 Oct 2018 15:24:55 -0400 Subject: [PATCH 09/15] Wrap bdb deletion in same lock that sync itself uses If os.remove happens at a bad time, it causes an unfortunate behavior in dbm. Serialize this sort of operation to avoid the bad behavior. --- confluent_server/confluent/config/configmanager.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 9275693f..b5fa19bb 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -646,12 +646,13 @@ def commit_clear(): global _oldcfgstore _oldcfgstore = None _oldtxcount = 0 - todelete = _config_areas + ('globals', 'collective', 'transactioncount') - for cfg in todelete: - try: - os.remove(os.path.join(ConfigManager._cfgdir, cfg)) - except OSError as oe: - pass + with _synclock: + todelete = _config_areas + ('globals', 'collective', 'transactioncount') + for cfg in todelete: + try: + os.remove(os.path.join(ConfigManager._cfgdir, cfg)) + except OSError as oe: + pass ConfigManager.wait_for_sync(True) ConfigManager._bg_sync_to_file() From be930fc0763633adac7898299ac731c8f25e380d Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 10 Oct 2018 16:30:28 -0400 Subject: [PATCH 10/15] Add missing subsystem marker from a collective log --- confluent_server/confluent/collective/manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 09961698..9e82b509 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -101,7 +101,7 @@ def connect_to_leader(cert=None, name=None, leader=None): log.log({'info': 'Prospective leader {0} has inferior ' 'transaction count, becoming leader' - ''.format(leader)}) + ''.format(leader), 'subsystem': 'collective'}) return become_leader(remote) print(keydata['error']) return False @@ -360,7 +360,7 @@ def handle_connection(connection, cert, request, local=False): tlvdata.send(connection, {'error': 'Refusing to be assimilated by inferior' 'transaction count', - 'txcount': cfm._txcount}) + 'txcount': cfm._txcount,}) return eventlet.spawn_n(connect_to_leader, None, None, leader=connection.getpeername()[0]) From 3012de1fe4c3d281ea6efdca1164a7f5f60a363b Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 11 Oct 2018 09:16:57 -0400 Subject: [PATCH 11/15] Prioritize deletion of transactioncount If the invalidation is incomplete, make sure that transactioncount is invalidated first to avoid it being able to propogate through a collective. --- confluent_server/confluent/config/configmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index b5fa19bb..4fcc0b71 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -647,7 +647,7 @@ def commit_clear(): _oldcfgstore = None _oldtxcount = 0 with _synclock: - todelete = _config_areas + ('globals', 'collective', 'transactioncount') + todelete = ('transactioncount', 'globals', 'collective') + _config_areas for cfg in todelete: try: os.remove(os.path.join(ConfigManager._cfgdir, cfg)) From f525c25ba637accfcbd582480fae132ffe72a0f2 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 11 Oct 2018 15:15:11 -0400 Subject: [PATCH 12/15] Provide more verbose collective logging This helps understand the flow in practice of collective behavior. --- confluent_server/confluent/collective/manager.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 9e82b509..63189aa5 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -317,6 +317,8 @@ def handle_connection(connection, cert, request, local=False): f = open('/etc/confluent/cfg/myname', 'w') f.write(name) f.close() + log.log({'info': 'Connecting to collective due to join', + 'subsystem': 'collective'}) eventlet.spawn_n(connect_to_leader, rsp['collective'][ 'fingerprint'], name) if 'enroll' == operation: @@ -362,6 +364,8 @@ def handle_connection(connection, cert, request, local=False): 'transaction count', 'txcount': cfm._txcount,}) return + log.log({'info': 'Connecting in response to assimilation', + 'subsystem': 'collective'}) eventlet.spawn_n(connect_to_leader, None, None, leader=connection.getpeername()[0]) tlvdata.send(connection, {'status': 0}) @@ -408,6 +412,8 @@ def handle_connection(connection, cert, request, local=False): {'error': 'Client has higher tranasaction count, ' 'should assimilate me, connecting..', 'txcount': cfm._txcount}) + log.log({'info': 'Connecting to leader due to superior ' + 'transaction count', 'subsystem': collective}) eventlet.spawn_n(connect_to_leader, None, None, connection.getpeername()[0]) connection.close() @@ -545,6 +551,8 @@ def start_collective(): if cfm.cfgleader is None: cfm.stop_following(True) ldrcandidate = cfm.get_collective_member(member)['address'] + log.log({'info': 'Performing startup attempt to {0}'.format( + ldrcandidate), 'subsystem': 'collective'}) if connect_to_leader(name=myname, leader=ldrcandidate): break else: From 3105b9b1f923de688733826646e36fe62fb6c0ca Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 12 Oct 2018 11:45:23 -0400 Subject: [PATCH 13/15] Significantly rework the collective startup behavior One, make the tracking bools enforce a lock to reduce confusion Treat an initializing peer as failed, to avoid getting too fixated on an uncertain target. Make sure that no more than one follower is tried at a time by killing before starting a new one, and syncing up the configmanager state Decline to act on an assimilation request if we are trying to connect and also if the current leader asks us to connect and we already are. Avoid calling get_leader while connecting, as that can cause a member to decide to become a leader while trying to connect, by swapping the reactions to the connect request. Avoid trying to assimilate existing followers. Fix some logging. --- .../confluent/collective/manager.py | 52 ++++++++++++++----- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 63189aa5..5e49a2cf 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -41,12 +41,15 @@ retrythread = None class ContextBool(object): def __init__(self): self.active = False + self.mylock = threading.RLock() def __enter__(self): self.active = True + self.mylock.__enter__() def __exit__(self, exc_type, exc_val, exc_tb): self.active = False + self.mylock.__exit__(exc_type, exc_val, exc_tb) connecting = ContextBool() leader_init = ContextBool() @@ -81,11 +84,9 @@ def connect_to_leader(cert=None, name=None, leader=None): if 'backoff' in keydata: log.log({ 'info': 'Collective initialization in progress on ' - '{0}, will retry connection'.format(leader), + '{0}'.format(leader), 'subsystem': 'collective'}) - eventlet.spawn_after(random.random(), connect_to_leader, - cert, name, leader) - return True + return False if 'leader' in keydata: log.log( {'info': 'Prospective leader {0} has redirected this ' @@ -101,13 +102,17 @@ def connect_to_leader(cert=None, name=None, leader=None): log.log({'info': 'Prospective leader {0} has inferior ' 'transaction count, becoming leader' - ''.format(leader), 'subsystem': 'collective'}) + ''.format(leader), 'subsystem': 'collective', + 'subsystem': 'collective'}) return become_leader(remote) - print(keydata['error']) return False follower.kill() cfm.stop_following() follower = None + if follower: + follower.kill() + cfm.stop_following() + follower = None log.log({'info': 'Following leader {0}'.format(leader), 'subsystem': 'collective'}) colldata = tlvdata.recv(remote) @@ -364,6 +369,18 @@ def handle_connection(connection, cert, request, local=False): 'transaction count', 'txcount': cfm._txcount,}) return + if connecting.active: + # don't try to connect while actively already trying to connect + tlvdata.send(connection, {'status': 0}) + connection.close() + return + if (currentleader == connection.getpeername()[0] and + follower and follower.isAlive()): + # if we are happily following this leader already, don't stir + # the pot + tlvdata.send(connection, {'status': 0}) + connection.close() + return log.log({'info': 'Connecting in response to assimilation', 'subsystem': 'collective'}) eventlet.spawn_n(connect_to_leader, None, None, @@ -394,6 +411,11 @@ def handle_connection(connection, cert, request, local=False): connection.close() return myself = connection.getsockname()[0] + if connecting.active: + tlvdata.send(connection, {'error': 'Connecting right now', + 'backoff': True}) + connection.close() + return if myself != get_leader(connection): tlvdata.send( connection, @@ -401,11 +423,6 @@ def handle_connection(connection, cert, request, local=False): 'in another castle', 'leader': currentleader}) connection.close() return - if connecting.active: - tlvdata.send(connection, {'error': 'Connecting right now', - 'backoff': True}) - connection.close() - return if request['txcount'] > cfm._txcount: retire_as_leader() tlvdata.send(connection, @@ -491,6 +508,12 @@ def try_assimilate(drone): def get_leader(connection): if currentleader is None or connection.getpeername()[0] == currentleader: + if currentleader is None: + msg = 'Becoming leader as no leader known' + else: + msg = 'Becoming leader because {0} attempted to connect and it ' \ + 'is current leader'.format(currentleader) + log.log({'info': msg, 'subsystem': 'collective'}) become_leader(connection) return currentleader @@ -507,6 +530,7 @@ def become_leader(connection): 'subsystem': 'collective'}) if follower: follower.kill() + cfm.stop_following() follower = None if retrythread: retrythread.cancel() @@ -514,9 +538,12 @@ def become_leader(connection): currentleader = connection.getsockname()[0] skipaddr = connection.getpeername()[0] myname = get_myname() + skipem = set(cfm.cfgstreams) + skipem.add(currentleader) + skipem.add(skipaddr) for member in cfm.list_collective(): dronecandidate = cfm.get_collective_member(member)['address'] - if dronecandidate in (currentleader, skipaddr) or member == myname: + if dronecandidate in skipem or member == myname: continue eventlet.spawn_n(try_assimilate, dronecandidate) @@ -533,6 +560,7 @@ def start_collective(): global retrythread if follower: follower.kill() + cfm.stop_following() follower = None try: if cfm.cfgstreams: From deb90fbca9b641aa4318c93d49595b76134f2eb2 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 12 Oct 2018 14:28:21 -0400 Subject: [PATCH 14/15] Fix trace on early console connect If the trace happens before tracelog is ready, just print the output to the stdout log for now. --- confluent_server/confluent/consoleserver.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 8ab71308..76fc3d6f 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -359,8 +359,11 @@ class ConsoleHandler(object): except (exc.NotImplementedException, exc.NotFoundException): self._console = None except: - _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, - event=log.Events.stacktrace) + if _tracelog: + _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, + event=log.Events.stacktrace) + else: + print(traceback.format_exc()) if not isinstance(self._console, conapi.Console): self.clearbuffer() self.connectstate = 'unconnected' From e9ba49a4aa3f7a2c9f130b32a66735c112e7eaea Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 12 Oct 2018 15:46:54 -0400 Subject: [PATCH 15/15] Intercept another 'unexpected error' During a particularly hectic init, Invalid Session ID may occur if a command is ran particularly early. Intercept and replace a more clean message. --- confluent_server/confluent/plugins/hardwaremanagement/ipmi.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py index f6b2b32a..6b2da8ab 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py @@ -477,6 +477,10 @@ class IpmiHandler(object): self.output.put(msg.ConfluentTargetTimeout( self.node, self.error)) return + elif 'Invalid Session ID' in self.error: + self.output.put(msg.ConfluentTargetTimeout( + self.node, 'Temporary Login Error')) + return elif ('Unauthorized' in self.error or 'Incorrect password' in self.error): self.output.put(