From efaf1dae70e9cecd075a192612eea512dc2ea6f6 Mon Sep 17 00:00:00 2001
From: Jarrod Johnson
Date: Fri, 13 Jul 2018 09:05:28 -0400
Subject: [PATCH] Make cfgleader modifications more robust

If cfgleader is about to forget a socket, explicitly try to close it
first.
---
 .../confluent/collective/manager.py   |  7 +++----
 .../confluent/config/configmanager.py | 22 ++++++++++++++++++----
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py
index b05c4df3..c681e3bd 100644
--- a/confluent_server/confluent/collective/manager.py
+++ b/confluent_server/confluent/collective/manager.py
@@ -101,7 +101,7 @@ def connect_to_leader(cert=None, name=None, leader=None):
             if not ndata:
                 raise Exception("Error doing initial DB transfer")
             dbjson += ndata
-        cfm.cfgleader = None
+        cfm.stop_following()
         cfm.clear_configuration()
         try:
             cfm._restore_keys(keydata, None, sync=False)
@@ -128,7 +128,7 @@ def follow_leader(remote):
     global currentleader
     cfm.follow_channel(remote)
     # The leader has folded, time to startup again...
-    remote.close()
+    cfm.stop_following()
     currentleader = None
     eventlet.spawn_n(start_collective)
 
@@ -319,7 +319,6 @@ def handle_connection(connection, cert, request, local=False):
         connection.sendall(cfgdata)
         #tlvdata.send(connection, {'tenants': 0})  # skip the tenants for now,
         # so far unused anyway
-        cfm.cfgleader = None
         cfm.relay_slaved_requests(drone, connection)
         # ok, we have a connecting member whose certificate checks out
         # He needs to bootstrap his configuration and subscribe it to updates
@@ -386,7 +385,7 @@ def start_collective():
         if member == myname:
             continue
         if cfm.cfgleader is None:
-            cfm.cfgleader = True
+            cfm.stop_following(True)
         ldrcandidate = cfm.get_collective_member(member)['address']
         if connect_to_leader(name=myname, leader=ldrcandidate):
             break
diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py
index dcf1742f..a6c7a865 100644
--- a/confluent_server/confluent/config/configmanager.py
+++ b/confluent_server/confluent/config/configmanager.py
@@ -61,6 +61,7 @@ import cPickle
 import errno
 import eventlet
 import eventlet.event as event
+import eventlet.green.threading as gthread
 import fnmatch
 import json
 import operator
@@ -77,6 +78,7 @@ import traceback
 _masterkey = None
 _masterintegritykey = None
 _dirtylock = threading.RLock()
+_leaderlock = gthread.RLock()
 _config_areas = ('nodegroups', 'nodes', 'usergroups', 'users')
 tracelog = None
 statelessmode = False
@@ -460,6 +462,7 @@ def set_global(globalname, value, sync=True):
 cfgstreams = {}
 def relay_slaved_requests(name, listener):
     global cfgleader
+    stop_following()
     cfgstreams[name] = listener
     msg = listener.recv(8)
     while msg:
@@ -484,9 +487,19 @@ def relay_slaved_requests(name, listener):
         except KeyError:
             pass  # May have already been closed/deleted...
     if not cfgstreams and not cfgleader:
-        cfgleader = True
+        stop_following(True)
 
 
+def stop_following(replacement=None):
+    with _leaderlock:
+        global cfgleader
+        if cfgleader and not isinstance(cfgleader, bool):
+            try:
+                cfgleader.close()
+            except Exception:
+                pass
+        cfgleader = replacement
+
 def stop_leading():
     for stream in list(cfgstreams):
         cfgstreams[stream].close()
@@ -537,10 +550,11 @@ def commit_clear():
     ConfigManager._bg_sync_to_file()
 
 cfgleader = None
+
+
 def follow_channel(channel):
-    global cfgleader
     global _txcount
-    cfgleader = channel
+    stop_following(channel)
     msg = channel.recv(8)
     while msg:
         sz = struct.unpack('!Q', msg)[0]
@@ -560,7 +574,7 @@ def follow_channel(channel):
                 _pendingchangesets[rpc['xid']].send()
         msg = channel.recv(8)
     # mark the connection as broken
-    cfgleader = True
+    stop_following(True)
 
 
 def add_collective_member(name, address, fingerprint):
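
A minimal usage sketch of the close-before-replace idiom this patch centralizes in
stop_following(): under _leaderlock, any live leader socket is closed best-effort before
cfgleader is forgotten or replaced. The DummySocket class and the demo lines at the bottom
are invented for illustration and are not part of the patch; plain threading stands in for
eventlet.green.threading here so the snippet runs without eventlet installed.

    import threading

    _leaderlock = threading.RLock()  # the patch uses eventlet.green.threading.RLock
    cfgleader = None                 # None, a boolean marker, or a live leader socket


    def stop_following(replacement=None):
        # Close any real socket before forgetting or replacing it.
        global cfgleader
        with _leaderlock:
            if cfgleader and not isinstance(cfgleader, bool):
                try:
                    cfgleader.close()
                except Exception:
                    pass  # best effort; the peer may already be gone
            cfgleader = replacement


    class DummySocket(object):
        # Stand-in for the collective connection, invented for this sketch.
        def close(self):
            print('leader socket closed')


    cfgleader = DummySocket()
    stop_following(True)  # prints 'leader socket closed', then marks the collective dead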