From 8fc3b7c9c0bcadfb980cef1b0ae8d18ea863cad4 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 7 Oct 2019 15:41:38 -0400 Subject: [PATCH] Implement cross-python collective compat This enables cross-version compatibility for a collective. --- .../confluent/collective/manager.py | 23 ++++++++--- .../confluent/config/configmanager.py | 40 +++++++++++++------ confluent_server/confluent/core.py | 25 ++++++++---- confluent_server/confluent/sockapi.py | 3 +- 4 files changed, 65 insertions(+), 26 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 5e49a2cf..8f3b9f15 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -27,6 +27,7 @@ import eventlet.green.ssl as ssl import eventlet.green.threading as threading import greenlet import random +import sys try: import OpenSSL.crypto as crypto except ImportError: @@ -70,11 +71,22 @@ def connect_to_leader(cert=None, name=None, leader=None): return False with connecting: with cfm._initlock: - tlvdata.recv(remote) # the banner + banner = tlvdata.recv(remote) # the banner + vers = banner.split()[2] + pvers = 0 + reqver = 4 + if vers == 'v0': + pvers = 2 + elif vers == 'v1': + pvers = 4 + if sys.version_info[0] < 3: + pvers = 2 + reqver = 2 tlvdata.recv(remote) # authpassed... 0.. if name is None: name = get_myname() tlvdata.send(remote, {'collective': {'operation': 'connect', + 'protover': reqver, 'name': name, 'txcount': cfm._txcount}}) keydata = tlvdata.recv(remote) @@ -148,15 +160,15 @@ def connect_to_leader(cert=None, name=None, leader=None): raise currentleader = leader #spawn this as a thread... - follower = eventlet.spawn(follow_leader, remote) + follower = eventlet.spawn(follow_leader, remote, pvers) return True -def follow_leader(remote): +def follow_leader(remote, proto): global currentleader cleanexit = False try: - cfm.follow_channel(remote) + cfm.follow_channel(remote, proto) except greenlet.GreenletExit: cleanexit = True finally: @@ -402,6 +414,7 @@ def handle_connection(connection, cert, request, local=False): tlvdata.send(connection, collinfo) if 'connect' == operation: drone = request['name'] + folver = request.get('protover', 2) droneinfo = cfm.get_collective_member(drone) if not (droneinfo and util.cert_matches(droneinfo['fingerprint'], cert)): @@ -450,7 +463,7 @@ def handle_connection(connection, cert, request, local=False): connection.sendall(cfgdata) #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, # so far unused anyway - if not cfm.relay_slaved_requests(drone, connection): + if not cfm.relay_slaved_requests(drone, connection, folver): if not retrythread: # start a recovery if everyone else seems # to have disappeared retrythread = eventlet.spawn_after(30 + random.random(), diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 7d558404..f727a68f 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -101,6 +101,10 @@ _cfgstore = None _pendingchangesets = {} _txcount = 0 _hasquorum = True +if sys.version_info[0] >= 3: + lowestver = 4 +else: + lowestver = 2 _attraliases = { 'bmc': 'hardwaremanagement.manager', @@ -314,7 +318,7 @@ def exec_on_leader(function, *args): xid = os.urandom(8) _pendingchangesets[xid] = event.Event() rpcpayload = cPickle.dumps({'function': function, 'args': args, - 'xid': xid}) + 'xid': xid}, protocol=cfgproto) rpclen = len(rpcpayload) cfgleader.sendall(struct.pack('!Q', rpclen)) cfgleader.sendall(rpcpayload) @@ -331,7 +335,7 @@ def exec_on_followers(fnname, *args): pushes = eventlet.GreenPool() _txcount += 1 payload = cPickle.dumps({'function': fnname, 'args': args, - 'txcount': _txcount}) + 'txcount': _txcount}, protocol=lowestver) for res in pushes.starmap( _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): pass @@ -546,9 +550,14 @@ def set_global(globalname, value, sync=True): ConfigManager._bg_sync_to_file() cfgstreams = {} -def relay_slaved_requests(name, listener): +def relay_slaved_requests(name, listener, vers): global cfgleader global _hasquorum + global lowestver + if vers > 2 and sys.version_info[0] < 3: + vers = 2 + if vers < lowestver: + lowestver = vers pushes = eventlet.GreenPool() if name not in _followerlocks: _followerlocks[name] = gthread.RLock() @@ -565,7 +574,7 @@ def relay_slaved_requests(name, listener): lh = StreamHandler(listener) _hasquorum = len(cfgstreams) >= ( len(_cfgstore['collective']) // 2) - payload = cPickle.dumps({'quorum': _hasquorum}) + payload = cPickle.dumps({'quorum': _hasquorum}, protocol=vers) for _ in pushes.starmap( _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): @@ -592,7 +601,7 @@ def relay_slaved_requests(name, listener): exc = e if 'xid' in rpc: _push_rpc(listener, cPickle.dumps({'xid': rpc['xid'], - 'exc': exc})) + 'exc': exc}, protocol=vers)) try: msg = lh.get_next_msg() except Exception: @@ -609,7 +618,7 @@ def relay_slaved_requests(name, listener): if cfgstreams: _hasquorum = len(cfgstreams) >= ( len(_cfgstore['collective']) // 2) - payload = cPickle.dumps({'quorum': _hasquorum}) + payload = cPickle.dumps({'quorum': _hasquorum}, protocol=vers) for _ in pushes.starmap( _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): @@ -649,15 +658,19 @@ class StreamHandler(object): self.sock = None -def stop_following(replacement=None): +def stop_following(replacement=None, proto=2): with _leaderlock: global cfgleader + global cfgproto if cfgleader and not isinstance(cfgleader, bool): try: cfgleader.close() except Exception: pass cfgleader = replacement + if proto > 2 and sys.version_info[0] < 3: + proto = 2 + cfgproto = proto def stop_leading(): for stream in list(cfgstreams): @@ -715,14 +728,15 @@ def commit_clear(): ConfigManager._bg_sync_to_file() cfgleader = None +cfgproto = 2 -def follow_channel(channel): +def follow_channel(channel, proto=2): global _txcount global _hasquorum try: stop_leading() - stop_following(channel) + stop_following(channel, proto) lh = StreamHandler(channel) msg = lh.get_next_msg() while msg: @@ -2326,7 +2340,7 @@ class ConfigManager(object): for globalkey in dirtyglobals: if globalkey in _cfgstore['globals']: globalf[globalkey] = \ - cPickle.dumps(_cfgstore['globals'][globalkey]) + cPickle.dumps(_cfgstore['globals'][globalkey], protocol=cPickle.HIGHEST_PROTOCOL) else: if globalkey in globalf: del globalf[globalkey] @@ -2345,7 +2359,7 @@ class ConfigManager(object): for coll in colls: if coll in _cfgstore['collective']: collectivef[coll] = cPickle.dumps( - _cfgstore['collective'][coll]) + _cfgstore['collective'][coll], protocol=cPickle.HIGHEST_PROTOCOL) else: if coll in collectivef: del globalf[coll] @@ -2359,7 +2373,7 @@ class ConfigManager(object): dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600 try: for ck in currdict[category]: - dbf[ck] = cPickle.dumps(currdict[category][ck]) + dbf[ck] = cPickle.dumps(currdict[category][ck], protocol=cPickle.HIGHEST_PROTOCOL) finally: dbf.close() elif 'dirtykeys' in _cfgstore: @@ -2383,7 +2397,7 @@ class ConfigManager(object): if ck in dbf: del dbf[ck] else: - dbf[ck] = cPickle.dumps(currdict[category][ck]) + dbf[ck] = cPickle.dumps(currdict[category][ck], protocol=cPickle.HIGHEST_PROTOCOL) finally: dbf.close() willrun = False diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index a92e9439..d43897ef 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -687,6 +687,9 @@ def handle_dispatch(connection, cert, dispatch, peername): cfm.get_collective_member(peername)['fingerprint'], cert): connection.close() return + pversion = 0 + if bytearray(dispatch)[0] == 0x80: + pversion = bytearray(dispatch)[1] dispatch = pickle.loads(dispatch) configmanager = cfm.ConfigManager(dispatch['tenant']) nodes = dispatch['nodes'] @@ -723,18 +726,18 @@ def handle_dispatch(connection, cert, dispatch, peername): configmanager=configmanager, inputdata=inputdata)) for res in itertools.chain(*passvalues): - _forward_rsp(connection, res) + _forward_rsp(connection, res, pversion) except Exception as res: - _forward_rsp(connection, res) + _forward_rsp(connection, res, pversion) connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00') -def _forward_rsp(connection, res): +def _forward_rsp(connection, res, pversion): try: - r = pickle.dumps(res) + r = pickle.dumps(res, protocol=pversion) except TypeError: r = pickle.dumps(Exception( - 'Cannot serialize error, check collective.manager error logs for details' + str(res))) + 'Cannot serialize error, check collective.manager error logs for details' + str(res)), protocol=pversion) rlen = len(r) if not rlen: return @@ -963,12 +966,20 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, if not util.cert_matches(a['fingerprint'], remote.getpeercert( binary_form=True)): raise Exception("Invalid certificate on peer") - tlvdata.recv(remote) + banner = tlvdata.recv(remote) + vers = banner.split()[2] + if vers == 'v0': + pvers = 2 + elif vers == 'v1': + pvers = 4 + if sys.version_info[0] < 3: + pvers = 2 tlvdata.recv(remote) myname = collective.get_myname() dreq = pickle.dumps({'name': myname, 'nodes': list(nodes), 'path': element,'tenant': configmanager.tenant, - 'operation': operation, 'inputdata': inputdata}) + 'operation': operation, 'inputdata': inputdata}, + version=pvers) tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}}) remote.sendall(dreq) while True: diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 76ec6131..154adfc2 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -123,7 +123,8 @@ def sessionhdl(connection, authname, skipauth=False, cert=None): if authdata: cfm = authdata[1] authenticated = True - send_data(connection, "Confluent -- v0 --") + # version 0 == original, version 1 == pickle3 allowed + send_data(connection, "Confluent -- v{0} --".format(sys.version_info[0] - 2)) while not authenticated: # prompt for name and passphrase send_data(connection, {'authpassed': 0}) response = tlvdata.recv(connection)