From 7433dd3e387b862724f1edb3f6a136545f1e86c5 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 26 Jun 2018 10:13:27 -0400 Subject: [PATCH] Wrap cfg init on follow in lock Use a lock to make the configuration initialization performed while connecting more atomic, guarding against connect_to_leader being called incorrectly. --- .../confluent/collective/manager.py | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 7f95549b..d7f5266d 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -22,6 +22,7 @@ import confluent.util as util import eventlet import eventlet.green.socket as socket import eventlet.green.ssl as ssl +import eventlet.green.threading as threading try: import OpenSSL.crypto as crypto except ImportError: @@ -30,16 +31,20 @@ except ImportError: crypto = None currentleader = None +cfginitlock = None def connect_to_leader(cert=None, name=None, leader=None): global currentleader + global cfginitlock + if cfginitlock is None: + cfginitlock = threading.RLock() if leader is None: leader = currentleader try: remote = connect_to_collective(cert, leader) except socket.error: - return + return False tlvdata.recv(remote) # the banner tlvdata.recv(remote) # authpassed... 0.. 
if name is None: @@ -64,18 +69,19 @@ def connect_to_leader(cert=None, name=None, leader=None): if not ndata: raise Exception("Error doing initial DB transfer") dbjson += ndata - cfm.cfgleader = None - cfm.clear_configuration() - cfm._restore_keys(keydata, None) - for c in colldata: - cfm.add_collective_member(c, colldata[c]['address'], - colldata[c]['fingerprint']) - cfm._cfgstore['collective'] = colldata - for globvar in globaldata: - cfm.set_global(globvar, globaldata[globvar]) - cfm.ConfigManager(tenant=None)._load_from_json(dbjson) - cfm.ConfigManager._bg_sync_to_file() - currentleader = leader + with cfginitlock: + cfm.cfgleader = None + cfm.clear_configuration() + cfm._restore_keys(keydata, None) + for c in colldata: + cfm.add_collective_member(c, colldata[c]['address'], + colldata[c]['fingerprint']) + cfm._cfgstore['collective'] = colldata + for globvar in globaldata: + cfm.set_global(globvar, globaldata[globvar]) + cfm.ConfigManager(tenant=None)._load_from_json(dbjson) + cfm.ConfigManager._bg_sync_to_file() + currentleader = leader cfm.follow_channel(remote) # The leader has folded, time to startup again... eventlet.spawn_n(start_collective) @@ -260,10 +266,13 @@ def become_leader(connection): def startup(): + global cfginitlock members = list(cfm.list_collective()) if len(members) < 2: # Not in collective mode, return return + if cfginitlock is None: + cfginitlock = threading.RLock() eventlet.spawn_n(start_collective) def start_collective():