mirror of
https://github.com/xcat2/confluent.git
synced 2026-04-12 20:01:30 +00:00
Implement cross-python collective compat
This enables cross-version compatibility for a collective.
This commit is contained in:
@@ -27,6 +27,7 @@ import eventlet.green.ssl as ssl
|
||||
import eventlet.green.threading as threading
|
||||
import greenlet
|
||||
import random
|
||||
import sys
|
||||
try:
|
||||
import OpenSSL.crypto as crypto
|
||||
except ImportError:
|
||||
@@ -70,11 +71,22 @@ def connect_to_leader(cert=None, name=None, leader=None):
|
||||
return False
|
||||
with connecting:
|
||||
with cfm._initlock:
|
||||
tlvdata.recv(remote) # the banner
|
||||
banner = tlvdata.recv(remote) # the banner
|
||||
vers = banner.split()[2]
|
||||
pvers = 0
|
||||
reqver = 4
|
||||
if vers == 'v0':
|
||||
pvers = 2
|
||||
elif vers == 'v1':
|
||||
pvers = 4
|
||||
if sys.version_info[0] < 3:
|
||||
pvers = 2
|
||||
reqver = 2
|
||||
tlvdata.recv(remote) # authpassed... 0..
|
||||
if name is None:
|
||||
name = get_myname()
|
||||
tlvdata.send(remote, {'collective': {'operation': 'connect',
|
||||
'protover': reqver,
|
||||
'name': name,
|
||||
'txcount': cfm._txcount}})
|
||||
keydata = tlvdata.recv(remote)
|
||||
@@ -148,15 +160,15 @@ def connect_to_leader(cert=None, name=None, leader=None):
|
||||
raise
|
||||
currentleader = leader
|
||||
#spawn this as a thread...
|
||||
follower = eventlet.spawn(follow_leader, remote)
|
||||
follower = eventlet.spawn(follow_leader, remote, pvers)
|
||||
return True
|
||||
|
||||
|
||||
def follow_leader(remote):
|
||||
def follow_leader(remote, proto):
|
||||
global currentleader
|
||||
cleanexit = False
|
||||
try:
|
||||
cfm.follow_channel(remote)
|
||||
cfm.follow_channel(remote, proto)
|
||||
except greenlet.GreenletExit:
|
||||
cleanexit = True
|
||||
finally:
|
||||
@@ -402,6 +414,7 @@ def handle_connection(connection, cert, request, local=False):
|
||||
tlvdata.send(connection, collinfo)
|
||||
if 'connect' == operation:
|
||||
drone = request['name']
|
||||
folver = request.get('protover', 2)
|
||||
droneinfo = cfm.get_collective_member(drone)
|
||||
if not (droneinfo and util.cert_matches(droneinfo['fingerprint'],
|
||||
cert)):
|
||||
@@ -450,7 +463,7 @@ def handle_connection(connection, cert, request, local=False):
|
||||
connection.sendall(cfgdata)
|
||||
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now,
|
||||
# so far unused anyway
|
||||
if not cfm.relay_slaved_requests(drone, connection):
|
||||
if not cfm.relay_slaved_requests(drone, connection, folver):
|
||||
if not retrythread: # start a recovery if everyone else seems
|
||||
# to have disappeared
|
||||
retrythread = eventlet.spawn_after(30 + random.random(),
|
||||
|
||||
@@ -101,6 +101,10 @@ _cfgstore = None
|
||||
_pendingchangesets = {}
|
||||
_txcount = 0
|
||||
_hasquorum = True
|
||||
if sys.version_info[0] >= 3:
|
||||
lowestver = 4
|
||||
else:
|
||||
lowestver = 2
|
||||
|
||||
_attraliases = {
|
||||
'bmc': 'hardwaremanagement.manager',
|
||||
@@ -314,7 +318,7 @@ def exec_on_leader(function, *args):
|
||||
xid = os.urandom(8)
|
||||
_pendingchangesets[xid] = event.Event()
|
||||
rpcpayload = cPickle.dumps({'function': function, 'args': args,
|
||||
'xid': xid})
|
||||
'xid': xid}, protocol=cfgproto)
|
||||
rpclen = len(rpcpayload)
|
||||
cfgleader.sendall(struct.pack('!Q', rpclen))
|
||||
cfgleader.sendall(rpcpayload)
|
||||
@@ -331,7 +335,7 @@ def exec_on_followers(fnname, *args):
|
||||
pushes = eventlet.GreenPool()
|
||||
_txcount += 1
|
||||
payload = cPickle.dumps({'function': fnname, 'args': args,
|
||||
'txcount': _txcount})
|
||||
'txcount': _txcount}, protocol=lowestver)
|
||||
for res in pushes.starmap(
|
||||
_push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]):
|
||||
pass
|
||||
@@ -546,9 +550,14 @@ def set_global(globalname, value, sync=True):
|
||||
ConfigManager._bg_sync_to_file()
|
||||
|
||||
cfgstreams = {}
|
||||
def relay_slaved_requests(name, listener):
|
||||
def relay_slaved_requests(name, listener, vers):
|
||||
global cfgleader
|
||||
global _hasquorum
|
||||
global lowestver
|
||||
if vers > 2 and sys.version_info[0] < 3:
|
||||
vers = 2
|
||||
if vers < lowestver:
|
||||
lowestver = vers
|
||||
pushes = eventlet.GreenPool()
|
||||
if name not in _followerlocks:
|
||||
_followerlocks[name] = gthread.RLock()
|
||||
@@ -565,7 +574,7 @@ def relay_slaved_requests(name, listener):
|
||||
lh = StreamHandler(listener)
|
||||
_hasquorum = len(cfgstreams) >= (
|
||||
len(_cfgstore['collective']) // 2)
|
||||
payload = cPickle.dumps({'quorum': _hasquorum})
|
||||
payload = cPickle.dumps({'quorum': _hasquorum}, protocol=vers)
|
||||
for _ in pushes.starmap(
|
||||
_push_rpc,
|
||||
[(cfgstreams[s], payload) for s in cfgstreams]):
|
||||
@@ -592,7 +601,7 @@ def relay_slaved_requests(name, listener):
|
||||
exc = e
|
||||
if 'xid' in rpc:
|
||||
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid'],
|
||||
'exc': exc}))
|
||||
'exc': exc}, protocol=vers))
|
||||
try:
|
||||
msg = lh.get_next_msg()
|
||||
except Exception:
|
||||
@@ -609,7 +618,7 @@ def relay_slaved_requests(name, listener):
|
||||
if cfgstreams:
|
||||
_hasquorum = len(cfgstreams) >= (
|
||||
len(_cfgstore['collective']) // 2)
|
||||
payload = cPickle.dumps({'quorum': _hasquorum})
|
||||
payload = cPickle.dumps({'quorum': _hasquorum}, protocol=vers)
|
||||
for _ in pushes.starmap(
|
||||
_push_rpc,
|
||||
[(cfgstreams[s], payload) for s in cfgstreams]):
|
||||
@@ -649,15 +658,19 @@ class StreamHandler(object):
|
||||
self.sock = None
|
||||
|
||||
|
||||
def stop_following(replacement=None):
|
||||
def stop_following(replacement=None, proto=2):
|
||||
with _leaderlock:
|
||||
global cfgleader
|
||||
global cfgproto
|
||||
if cfgleader and not isinstance(cfgleader, bool):
|
||||
try:
|
||||
cfgleader.close()
|
||||
except Exception:
|
||||
pass
|
||||
cfgleader = replacement
|
||||
if proto > 2 and sys.version_info[0] < 3:
|
||||
proto = 2
|
||||
cfgproto = proto
|
||||
|
||||
def stop_leading():
|
||||
for stream in list(cfgstreams):
|
||||
@@ -715,14 +728,15 @@ def commit_clear():
|
||||
ConfigManager._bg_sync_to_file()
|
||||
|
||||
cfgleader = None
|
||||
cfgproto = 2
|
||||
|
||||
|
||||
def follow_channel(channel):
|
||||
def follow_channel(channel, proto=2):
|
||||
global _txcount
|
||||
global _hasquorum
|
||||
try:
|
||||
stop_leading()
|
||||
stop_following(channel)
|
||||
stop_following(channel, proto)
|
||||
lh = StreamHandler(channel)
|
||||
msg = lh.get_next_msg()
|
||||
while msg:
|
||||
@@ -2326,7 +2340,7 @@ class ConfigManager(object):
|
||||
for globalkey in dirtyglobals:
|
||||
if globalkey in _cfgstore['globals']:
|
||||
globalf[globalkey] = \
|
||||
cPickle.dumps(_cfgstore['globals'][globalkey])
|
||||
cPickle.dumps(_cfgstore['globals'][globalkey], protocol=cPickle.HIGHEST_PROTOCOL)
|
||||
else:
|
||||
if globalkey in globalf:
|
||||
del globalf[globalkey]
|
||||
@@ -2345,7 +2359,7 @@ class ConfigManager(object):
|
||||
for coll in colls:
|
||||
if coll in _cfgstore['collective']:
|
||||
collectivef[coll] = cPickle.dumps(
|
||||
_cfgstore['collective'][coll])
|
||||
_cfgstore['collective'][coll], protocol=cPickle.HIGHEST_PROTOCOL)
|
||||
else:
|
||||
if coll in collectivef:
|
||||
del globalf[coll]
|
||||
@@ -2359,7 +2373,7 @@ class ConfigManager(object):
|
||||
dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600
|
||||
try:
|
||||
for ck in currdict[category]:
|
||||
dbf[ck] = cPickle.dumps(currdict[category][ck])
|
||||
dbf[ck] = cPickle.dumps(currdict[category][ck], protocol=cPickle.HIGHEST_PROTOCOL)
|
||||
finally:
|
||||
dbf.close()
|
||||
elif 'dirtykeys' in _cfgstore:
|
||||
@@ -2383,7 +2397,7 @@ class ConfigManager(object):
|
||||
if ck in dbf:
|
||||
del dbf[ck]
|
||||
else:
|
||||
dbf[ck] = cPickle.dumps(currdict[category][ck])
|
||||
dbf[ck] = cPickle.dumps(currdict[category][ck], protocol=cPickle.HIGHEST_PROTOCOL)
|
||||
finally:
|
||||
dbf.close()
|
||||
willrun = False
|
||||
|
||||
@@ -687,6 +687,9 @@ def handle_dispatch(connection, cert, dispatch, peername):
|
||||
cfm.get_collective_member(peername)['fingerprint'], cert):
|
||||
connection.close()
|
||||
return
|
||||
pversion = 0
|
||||
if bytearray(dispatch)[0] == 0x80:
|
||||
pversion = bytearray(dispatch)[1]
|
||||
dispatch = pickle.loads(dispatch)
|
||||
configmanager = cfm.ConfigManager(dispatch['tenant'])
|
||||
nodes = dispatch['nodes']
|
||||
@@ -723,18 +726,18 @@ def handle_dispatch(connection, cert, dispatch, peername):
|
||||
configmanager=configmanager,
|
||||
inputdata=inputdata))
|
||||
for res in itertools.chain(*passvalues):
|
||||
_forward_rsp(connection, res)
|
||||
_forward_rsp(connection, res, pversion)
|
||||
except Exception as res:
|
||||
_forward_rsp(connection, res)
|
||||
_forward_rsp(connection, res, pversion)
|
||||
connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||
|
||||
|
||||
def _forward_rsp(connection, res):
|
||||
def _forward_rsp(connection, res, pversion):
|
||||
try:
|
||||
r = pickle.dumps(res)
|
||||
r = pickle.dumps(res, protocol=pversion)
|
||||
except TypeError:
|
||||
r = pickle.dumps(Exception(
|
||||
'Cannot serialize error, check collective.manager error logs for details' + str(res)))
|
||||
'Cannot serialize error, check collective.manager error logs for details' + str(res)), protocol=pversion)
|
||||
rlen = len(r)
|
||||
if not rlen:
|
||||
return
|
||||
@@ -963,12 +966,20 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
|
||||
if not util.cert_matches(a['fingerprint'], remote.getpeercert(
|
||||
binary_form=True)):
|
||||
raise Exception("Invalid certificate on peer")
|
||||
tlvdata.recv(remote)
|
||||
banner = tlvdata.recv(remote)
|
||||
vers = banner.split()[2]
|
||||
if vers == 'v0':
|
||||
pvers = 2
|
||||
elif vers == 'v1':
|
||||
pvers = 4
|
||||
if sys.version_info[0] < 3:
|
||||
pvers = 2
|
||||
tlvdata.recv(remote)
|
||||
myname = collective.get_myname()
|
||||
dreq = pickle.dumps({'name': myname, 'nodes': list(nodes),
|
||||
'path': element,'tenant': configmanager.tenant,
|
||||
'operation': operation, 'inputdata': inputdata})
|
||||
'operation': operation, 'inputdata': inputdata},
|
||||
version=pvers)
|
||||
tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}})
|
||||
remote.sendall(dreq)
|
||||
while True:
|
||||
|
||||
@@ -123,7 +123,8 @@ def sessionhdl(connection, authname, skipauth=False, cert=None):
|
||||
if authdata:
|
||||
cfm = authdata[1]
|
||||
authenticated = True
|
||||
send_data(connection, "Confluent -- v0 --")
|
||||
# version 0 == original, version 1 == pickle3 allowed
|
||||
send_data(connection, "Confluent -- v{0} --".format(sys.version_info[0] - 2))
|
||||
while not authenticated: # prompt for name and passphrase
|
||||
send_data(connection, {'authpassed': 0})
|
||||
response = tlvdata.recv(connection)
|
||||
|
||||
Reference in New Issue
Block a user