diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py
index 393b1bc7..62f3ce90 100644
--- a/confluent_server/confluent/collective/manager.py
+++ b/confluent_server/confluent/collective/manager.py
@@ -101,8 +101,8 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
                           ''.format(leader, str(e)),
                       'subsystem': 'collective'})
             return False
     async with connecting:
-        with cfm._initlock:
+        async with cfm._initlock:
             # remote is a socket...
             banner = await tlvdata.recv(remote)  # the banner
             if not banner:
@@ -128,7 +129,7 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
                 return False
             if 'waitinline' in keydata:
                 await asyncio.sleep(0.3)
-                return connect_to_leader(cert, name, leader, None, isretry=True)
+                return await connect_to_leader(cert, name, leader, None, isretry=True)
             if 'leader' in keydata:
                 if keydata['leader'] == None:
                     return None
@@ -140,7 +141,7 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
                     keydata['leader'])
                 if ldrc and ldrc['name'] == name:
                     raise Exception("Redirected to self")
-                return connect_to_leader(name=name,
+                return await connect_to_leader(name=name,
                                          leader=keydata['leader'])
             if 'txcount' in keydata:
                 log.log({'info':
@@ -150,12 +151,12 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
                          'subsystem': 'collective'})
                     return await become_leader(remote)
                 return False
-            follower.kill()
-            cfm.stop_following()
+            follower.cancel()
+            await cfm.stop_following()
             follower = None
         if follower is not None:
-            follower.kill()
-            cfm.stop_following()
+            follower.cancel()
+            await cfm.stop_following()
             follower = None
         log.log({'info': 'Following leader {0}'.format(leader),
                  'subsystem': 'collective'})
@@ -165,17 +166,17 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
            dbsize = dbi['dbsize']
            dbjson = b''
            while (len(dbjson) < dbsize):
-               ndata = remote.recv(dbsize - len(dbjson))
+               ndata = await remote[0].read(dbsize - len(dbjson))
                if not ndata:
                    try:
-                       remote.close()
+                       remote[1].close()
                    except Exception:
                        pass
                    log.log({'error': 'Retrying connection, error during initial sync', 'subsystem': 'collective'})
-                   return connect_to_leader(ocert, oname, oleader, None)
+                   return await connect_to_leader(ocert, oname, oleader, None)
                    raise Exception("Error doing initial DB transfer")  # bad ssl write retry
                dbjson += ndata
-           cfm.clear_configuration()
+           await cfm.clear_configuration()
            try:
                cfm._restore_keys(keydata, None, sync=False)
                for c in colldata:
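A note on the receive loop in the hunk above: asyncio's StreamReader.read(n) may return fewer than n bytes, so the caller loops until the full payload has arrived, exactly as the rewritten sync loop does. A minimal sketch of the same pattern (the helper name read_exactly is illustrative; the stdlib readexactly() is the shortcut, but it raises IncompleteReadError rather than surfacing a partial buffer):

```python
import asyncio

async def read_exactly(reader: asyncio.StreamReader, size: int) -> bytes:
    """Read exactly size bytes, looping over short reads as the diff's loop does."""
    buf = b''
    while len(buf) < size:
        chunk = await reader.read(size - len(buf))
        if not chunk:  # peer closed mid-transfer; the caller decides whether to retry
            raise ConnectionError('connection lost during transfer')
        buf += chunk
    return buf
```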
@@ -185,28 +186,28 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
                for globvar in globaldata:
                    cfm.set_global(globvar, globaldata[globvar], False)
                cfm._txcount = dbi.get('txcount', 0)
-               cfm.ConfigManager(tenant=None)._load_from_json(dbjson,
+               await cfm.ConfigManager(tenant=None)._load_from_json(dbjson,
                                                               sync=False)
                cfm.commit_clear()
            except Exception:
-               cfm.stop_following()
+               await cfm.stop_following()
                cfm.rollback_clear()
                raise
            currentleader = leader
        #spawn this as a thread...
-       remote.settimeout(90)
+       #remote.settimeout(90)
        follower = util.spawn(follow_leader(remote, leader))
    return True

-def follow_leader(remote, leader):
+async def follow_leader(remote, leader):
    global currentleader
    global retrythread
    global follower
    cleanexit = False
    newleader = None
    try:
-       exitcause = cfm.follow_channel(remote)
+       exitcause = await cfm.follow_channel(remote)
        newleader = exitcause.get('newleader', None)
-   except greenlet.GreenletExit:
+   except asyncio.CancelledError:
        cleanexit = True
@@ -219,7 +222,7 @@
        log.log(
            {'info': 'Previous leader directed us to join new leader {}'.format(newleader)})
        try:
-           if connect_to_leader(None, get_myname(), newleader):
+           if await connect_to_leader(None, get_myname(), newleader):
                return
        except Exception:
            log.log({'error': 'Unknown error attempting to connect to {}, check trace log'.format(newleader),
                     'subsystem': 'collective'})
@@ -228,7 +231,7 @@
             'collective membership'.format(leader),
             'subsystem': 'collective'})
    # The leader has folded, time to startup again...
    follower = None
-   cfm.stop_following()
+   await cfm.stop_following()
    currentleader = None
    if retrythread is None:  # start a recovery
        retrythread = util.spawn_after(
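Throughout this patch, GreenThread.kill() becomes Task.cancel(), and the clean-exit signal changes from greenlet.GreenletExit to asyncio.CancelledError (the hunk above adjusts follow_leader accordingly). A hedged sketch of what util.spawn is assumed to provide under asyncio; the real confluent.util helper may differ:

```python
import asyncio

def spawn(coro):
    """Assumed shape of util.spawn: schedule a coroutine as a Task."""
    return asyncio.get_event_loop().create_task(coro)

async def demo():
    async def follow():
        try:
            await asyncio.sleep(3600)  # stand-in for cfm.follow_channel()
        except asyncio.CancelledError:
            raise  # cancellation replaces greenlet.GreenletExit as the clean-exit path

    follower = spawn(follow())
    await asyncio.sleep(0)   # let the task start
    follower.cancel()        # replaces follower.kill()

# asyncio.run(demo())
```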
@@ -324,18 +327,15 @@ async def handle_connection(connection, cert, request, local=False):
        if follower is not None:
            linfo = cfm.get_collective_member_by_address(currentleader)
            try:
-               remote = socket.create_connection((currentleader, 13001), 15)
+               _, remote = await create_connection(currentleader)
            except Exception:
-               cfm.stop_following()
+               await cfm.stop_following()
                return
-           remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
-                                    keyfile='/etc/confluent/privkey.pem',
-                                    certfile='/etc/confluent/srvcert.pem')
-           cert = remote.getpeercert(binary_form=True)
+           cert = remote[1].get_extra_info('ssl_object').getpeercert(binary_form=True)
            if not (linfo and util.cert_matches(
                    linfo['fingerprint'], cert)):
-               remote.close()
+               remote[1].close()
                await tlvdata.send(connection,
                                   {'error': 'Invalid certificate, '
                                             'redo invitation process'})
@@ -368,7 +371,7 @@ async def handle_connection(connection, cert, request, local=False):
            await tlvdata.send(connection,
                               {'collective': {'error': '{0} is not a recognized collective member'.format(todelete)}})
            return
-       cfm.del_collective_member(todelete)
+       await cfm.del_collective_member(todelete)
        await tlvdata.send(connection,
                           {'collective': {'status': 'Successfully deleted {0}'.format(todelete)}})
        connection.close()
@@ -458,7 +461,6 @@ async def handle_connection(connection, cert, request, local=False):
        proof = rsp['collective']['approval']
        proof = base64.b64decode(proof)
        j = invites.check_server_proof(invitation, mycert, cert, proof)
-       print(repr(j))
        if not j:
            remote.close()
            await tlvdata.send(connection, {'collective':
@@ -494,9 +496,9 @@ async def handle_connection(connection, cert, request, local=False):
        myfprint = util.get_fingerprint(mycert)
        iam = cfm.get_collective_member(get_myname())
        if not iam:
-           cfm.add_collective_member(get_myname(),
+           await cfm.add_collective_member(get_myname(),
                connection[1].transport.get_extra_info('socket').getsockname()[0], myfprint)
-       cfm.add_collective_member(request['name'],
+       await cfm.add_collective_member(request['name'],
            connection[1].transport.get_extra_info('socket').getpeername()[0], fprint, role)
        myleader = await get_leader(connection)
        ldrfprint = cfm.get_collective_member_by_address(
@@ -516,17 +518,17 @@ async def handle_connection(connection, cert, request, local=False):
        drone = request['name']
        droneinfo = cfm.get_collective_member(drone)
        if not droneinfo:
-           tlvdata.send(connection,
+           await tlvdata.send(connection,
                         {'error': 'Unrecognized leader, '
                                   'redo invitation process'})
            return
        if not util.cert_matches(droneinfo['fingerprint'], cert):
-           tlvdata.send(connection,
+           await tlvdata.send(connection,
                         {'error': 'Invalid certificate, '
                                   'redo invitation process'})
            return
        if request['txcount'] < cfm._txcount:
-           tlvdata.send(connection,
+           await tlvdata.send(connection,
                         {'error': 'Refusing to be assimilated by inferior '
                                   'transaction count',
                          'txcount': cfm._txcount,})
@@ -534,7 +536,7 @@ async def handle_connection(connection, cert, request, local=False):
        if cfm.cfgstreams and request['txcount'] == cfm._txcount:
            try:
                cfm.check_quorum()
-               tlvdata.send(connection,
+               await tlvdata.send(connection,
                             {'error': 'Refusing to be assimilated as I am a leader with quorum',
                              'txcount': cfm._txcount,})
                return
@@ -543,19 +545,19 @@ async def handle_connection(connection, cert, request, local=False):
        myfollowcount = len(list(cfm.cfgstreams))
        if followcount is not None:
            if followcount < myfollowcount:
-               tlvdata.send(connection,
+               await tlvdata.send(connection,
                             {'error': 'Refusing to be assimilated by leader with fewer followers',
                              'txcount': cfm._txcount,})
                return
            elif followcount == myfollowcount:
                myname = sortutil.naturalize_string(get_myname())
                if myname < sortutil.naturalize_string(request['name']):
-                   tlvdata.send(connection,
+                   await tlvdata.send(connection,
                                 {'error': 'Refusing, my name is better',
                                  'txcount': cfm._txcount,})
                    return
-       if follower is not None and not follower.dead:
-           tlvdata.send(
+       if follower is not None and not follower.done():
+           await tlvdata.send(
                connection,
                {'error': 'Already following, assimilate leader first',
                 'leader': currentleader})
@@ -563,24 +565,25 @@ async def handle_connection(connection, cert, request, local=False):
            return
        if connecting.active:
            # don't try to connect while actively already trying to connect
-           tlvdata.send(connection, {'status': 0})
-           connection.close()
+           await tlvdata.send(connection, {'status': 0})
+           #connection.close()
            return
-       if (currentleader == connection.getpeername()[0] and
-               follower and not follower.dead):
+       cnn = connection[1].get_extra_info('socket')
+       if (currentleader == cnn.getpeername()[0] and
+               follower and not follower.done()):
            # if we are happily following this leader already, don't stir
            # the pot
-           tlvdata.send(connection, {'status': 0})
+           await tlvdata.send(connection, {'status': 0})
            connection.close()
            return
        log.log({'info': 'Connecting in response to assimilation',
                 'subsystem': 'collective'})
-       newleader = connection.getpeername()[0]
+       newleader = cnn.getpeername()[0]
        if cfm.cfgstreams:
-           retire_as_leader(newleader)
-       tlvdata.send(connection, {'status': 0})
-       connection.close()
-       if not connect_to_leader(None, None, leader=newleader):
+           await retire_as_leader(newleader)
+       await tlvdata.send(connection, {'status': 0})
+       cnn.close()
+       if not await connect_to_leader(None, None, leader=newleader):
            if retrythread is None:
                retrythread = util.spawn_after(random.random(),
                                               start_collective)
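With asyncio streams there is no raw socket to call getpeername() on; the hunks above route those lookups through the transport's get_extra_info(). A small sketch of the lookups this code relies on, assuming connection is a (reader, writer) pair as elsewhere in the patch (peer_details is an illustrative helper, not part of confluent):

```python
import asyncio

def peer_details(connection):
    """Pull addresses and the peer TLS certificate from an asyncio stream pair."""
    reader, writer = connection
    sock = writer.get_extra_info('socket')        # the underlying socket.socket
    peeraddr = sock.getpeername()[0]              # replaces connection.getpeername()
    myaddr = sock.getsockname()[0]
    sslobj = writer.get_extra_info('ssl_object')  # None on plaintext connections
    cert = sslobj.getpeercert(binary_form=True) if sslobj else None
    return peeraddr, myaddr, cert
```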
@@ -605,38 +608,40 @@ async def handle_connection(connection, cert, request, local=False):
            await tlvdata.send(connection,
                               {'error': 'Invalid certificate, '
                                         'redo invitation process'})
-           connection.close()
+           #connection.close()
            return
        cnn = connection[1].transport.get_extra_info('socket')
        myself = cnn.getsockname()[0]
        if connecting.active or initting:
            await tlvdata.send(connection, {'error': 'Connecting right now',
                                            'backoff': True})
-           connection.close()
+           #connection.close()
            return
        if leader_init.active:
            await tlvdata.send(connection, {'error': 'Servicing a connection',
                                            'waitinline': True})
-           connection.close()
+           #connection.close()
            return
        if myself != await get_leader(connection):
            await tlvdata.send(
                connection,
                {'error': 'Cannot assimilate, our leader is '
                          'in another castle', 'leader': currentleader})
-           connection.close()
+           #connection.close()
            return
        if request['txcount'] > cfm._txcount:
-           retire_as_leader()
-           tlvdata.send(connection,
+           await retire_as_leader()
+           await tlvdata.send(connection,
                         {'error': 'Client has higher transaction count, '
                                   'should assimilate me, connecting..',
                          'txcount': cfm._txcount})
            log.log({'info': 'Connecting to leader due to superior '
                             'transaction count', 'subsystem': 'collective'})
-           connection.close()  # well this won't work
-           if not connect_to_leader(
-                   None, None, connection.getpeername()[0]):
+           cnn = connection[1].transport.get_extra_info('socket')
+           peername = cnn.getpeername()[0]
+           cnn.close()
+           if not await connect_to_leader(
+                   None, None, peername):
                if retrythread is None:
                    retrythread = util.spawn_after(5 + random.random(),
                                                   start_collective)
@@ -651,20 +656,22 @@ async def handle_connection(connection, cert, request, local=False):
        await tlvdata.send(connection, cfm._dump_keys(None, False))
        await tlvdata.send(connection, cfm._cfgstore['collective'])
        await tlvdata.send(connection, {'confluent_uuid': cfm.get_global('confluent_uuid')})  # cfm.get_globals())
-       cfgdata = cfm.ConfigManager(None)._dump_to_json()
+       cfgdata = await cfm.ConfigManager(None)._dump_to_json()
        try:
            await tlvdata.send(connection, {'txcount': cfm._txcount,
                                            'dbsize': len(cfgdata)})
-           connection.sendall(cfgdata)
+           connection[1].write(cfgdata)
+           await connection[1].drain()
        except Exception:
            try:
                connection.close()
            finally:
+               raise
            return None
        #tlvdata.send(connection, {'tenants': 0})  # skip the tenants for now,
        #  so far unused anyway
-       connection.settimeout(90)
-       if not cfm.relay_slaved_requests(drone, connection):
+       #connection.settimeout(90)
+       if not await cfm.relay_slaved_requests(drone, connection):
            log.log({'info': 'All clients have disconnected, starting recovery process',
                     'subsystem': 'collective'})
            if retrythread is None:  # start a recovery if everyone else seems
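connection.sendall(cfgdata) becomes write() plus drain() above: write() only queues bytes on the transport, and drain() applies backpressure when the buffer is above its high-water mark. A minimal sketch of the idiom, assuming a plain StreamWriter (send_blob is an illustrative name):

```python
import asyncio

async def send_blob(writer: asyncio.StreamWriter, blob: bytes):
    """Queue the payload and wait for the transport buffer to flush."""
    writer.write(blob)    # synchronous: just buffers
    await writer.drain()  # yields until the buffer drains below the high-water mark
```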
@@ -698,13 +705,13 @@ async def try_assimilate(drone, followcount, remote):
            # Oh well, unable to connect, hopefully the rest will be
            # in order
            return
-   tlvdata.send(remote, {'collective': {'operation': 'assimilate',
+   await tlvdata.send(remote, {'collective': {'operation': 'assimilate',
                                         'name': get_myname(),
                                         'followcount': followcount,
                                         'txcount': cfm._txcount}})
-   tlvdata.recv(remote)  # the banner
-   tlvdata.recv(remote)  # authpassed... 0..
-   answer = tlvdata.recv(remote)
+   await tlvdata.recv(remote)  # the banner
+   await tlvdata.recv(remote)  # authpassed... 0..
+   answer = await tlvdata.recv(remote)
    if not answer:
        log.log(
            {'error':
@@ -715,8 +722,8 @@ async def try_assimilate(drone, followcount, remote):
    if 'txcount' in answer:
        log.log({'info': 'Deferring to {0} due to target being a better leader'.format(
            drone), 'subsystem': 'collective'})
-       retire_as_leader(drone)
-       if not connect_to_leader(None, None, leader=remote.getpeername()[0]):
+       await retire_as_leader(drone)
+       if not await connect_to_leader(None, None, leader=remote[1].get_extra_info('peername')[0]):
            if retrythread is None:
                retrythread = util.spawn_after(random.random(),
                                               start_collective)
@@ -748,12 +755,12 @@ async def get_leader(connection):
        await become_leader(connection)
    return currentleader

-def retire_as_leader(newleader=None):
+async def retire_as_leader(newleader=None):
    global currentleader
    global reassimilate
-   cfm.stop_leading(newleader)
+   await cfm.stop_leading(newleader)
    if reassimilate is not None:
-       reassimilate.kill()
+       reassimilate.cancel()
        reassimilate = None
    currentleader = None

@@ -769,8 +776,8 @@ async def become_leader(connection):
    log.log({'info': 'Becoming leader of collective',
             'subsystem': 'collective'})
    if follower is not None:
-       follower.kill()
-       cfm.stop_following()
+       follower.cancel()
+       await cfm.stop_following()
        follower = None
    if retrythread is not None:
        retrythread.cancel()
@@ -779,7 +786,7 @@ async def become_leader(connection):
    currentleader = cnn.getsockname()[0]
    skipaddr = cnn.getpeername()[0]
    if reassimilate is not None:
-       reassimilate.kill()
+       reassimilate.cancel()
    reassimilate = util.spawn(reassimilate_missing())
    cfm._ready = True
    if await _assimilate_missing(skipaddr):
@@ -810,9 +817,11 @@ async def _assimilate_missing(skipaddr=None):
            connecto.append(dronecandidate)
    if not connecto:
        return True
+   connections = []
    for ct in connecto:
-       util.spawn(create_connection(ct))
+       connections.append(util.spawn(create_connection(ct)))
    for ent in connections:
+       ent = await ent
        member, remote = ent
        if isinstance(remote, Exception):
            continue
@@ -909,22 +918,26 @@ async def start_collective():
            if cfm.get_collective_member(member).get('role', None) == 'nonvoting':
                continue
            if cfm.cfgleader is None:
-               cfm.stop_following(True)
+               await cfm.stop_following(True)
            ldrcandidate = cfm.get_collective_member(member)['address']
            connecto.append(ldrcandidate)
+       connections = []
        for ct in connecto:
-           util.spawn(create_connection(ct))
-       for ent in connections:
-           member, remote = ent
-           if isinstance(remote, Exception):
-               continue
-           if follower is None:
-               log.log({'info': 'Performing startup attempt to {0}'.format(
-                   member), 'subsystem': 'collective'})
-               if not await connect_to_leader(name=myname, leader=member, remote=remote):
-                   remote.close()
-           else:
-               remote.close()
+           connections.append(util.spawn(create_connection(ct)))
+       pnding = connections
+       while pnding:
+           rdy, pnding = await asyncio.wait(pnding, return_when=asyncio.FIRST_COMPLETED)
+           for ent in rdy:
+               member, remote = await ent
+               if isinstance(remote, Exception):
+                   continue
+               if follower is None:
+                   log.log({'info': 'Performing startup attempt to {0}'.format(
+                       member), 'subsystem': 'collective'})
+                   if not await connect_to_leader(name=myname, leader=member, remote=remote):
+                       remote[1].close()
+               else:
+                   remote[1].close()
    except Exception as e:
        pass
    finally:
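start_collective now races its candidate connections and handles each as it completes, rather than iterating a list the old code never populated. A reduced sketch of that asyncio.wait pattern, assuming each task resolves to a (member, remote) pair as create_connection does in this patch:

```python
import asyncio

async def first_come_first_served(tasks):
    """Process tasks in completion order, as the start_collective loop does."""
    pending = set(tasks)
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            member, remote = await task  # re-raises if the task failed
            print('connected candidate:', member)
```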
diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py
index 528924e8..ef537889 100644
--- a/confluent_server/confluent/config/configmanager.py
+++ b/confluent_server/confluent/config/configmanager.py
@@ -57,6 +57,7 @@ try:
 except ModuleNotFoundError:
     import dbm
 import ast
+import asyncio
 import base64
 from binascii import hexlify
 import os
@@ -81,11 +82,9 @@ try:
 except ModuleNotFoundError:
     import pickle as cPickle
 import errno
-import eventlet
 import eventlet.event as event
 import eventlet.green.select as select
-import eventlet.green.threading as gthread
-import eventlet.green.subprocess as subprocess
+import socket
 import fnmatch
 import hashlib
 import json
@@ -107,10 +106,10 @@ except NameError:
 _masterkey = None
 _masterintegritykey = None
 _dirtylock = threading.RLock()
-_leaderlock = gthread.RLock()
+_leaderlock = asyncio.Lock()
 _synclock = threading.RLock()
-_rpclock = gthread.RLock()
-_initlock = gthread.RLock()
+_rpclock = asyncio.Lock()
+_initlock = asyncio.Lock()
 _followerlocks = {}
 _config_areas = ('nodegroups', 'nodes', 'usergroups', 'users')
 tracelog = None
@@ -132,6 +131,7 @@ _validroles = ['Administrator', 'Operator', 'Monitor', 'Stub']
 
 membership_callback = None
 
+
 def attrib_supports_expression(attrib):
     if not isinstance(attrib, str):
         attrib = attrib.decode('utf8')
@@ -206,9 +206,11 @@ def _format_key(key, password=None):
     return {"unencryptedvalue": key}
 
 
-def _do_notifier(cfg, watcher, callback):
+async def _do_notifier(cfg, watcher, callback):
     try:
-        callback(nodeattribs=watcher['nodeattrs'], configmanager=cfg)
+        cbres = callback(nodeattribs=watcher['nodeattrs'], configmanager=cfg)
+        if cbres is not None:
+            await cbres
     except Exception:
         logException()
 
@@ -338,44 +340,41 @@ def check_quorum():
     raise exc.DegradedCollective()
 
 
-def exec_on_leader(function, *args):
+async def exec_on_leader(function, *args):
     if isinstance(cfgleader, bool):
         raise exc.DegradedCollective()
     xid = confluent.util.stringify(base64.b64encode(os.urandom(8)))
     while xid in _pendingchangesets:
         xid = confluent.util.stringify(base64.b64encode(os.urandom(8)))
-    _pendingchangesets[xid] = event.Event()
+    cloop = asyncio.get_event_loop()
+    _pendingchangesets[xid] = cloop.create_future()  # future instead of event
     rpcpayload = msgpack.packb({'function': function, 'args': args,
                                 'xid': xid}, use_bin_type=False)
     rpclen = len(rpcpayload)
-    cfgleader.sendall(struct.pack('!Q', rpclen))
-    cfgleader.sendall(rpcpayload)
-    retv = _pendingchangesets[xid].wait()
+    cfgleader[1].write(struct.pack('!Q', rpclen))
+    cfgleader[1].write(rpcpayload)
+    await cfgleader[1].drain()
+    retv = await _pendingchangesets[xid]
     del _pendingchangesets[xid]
     return retv
 
 
-def exec_on_followers(fnname, *args):
-    pushes = eventlet.GreenPool()
+async def exec_on_followers(fnname, *args):
     # Check health of collective prior to attempting
-    for _ in pushes.starmap(
-            _push_rpc, [(cfgstreams[s]['stream'], b'') for s in cfgstreams]):
-        pass
+    await asyncio.gather(*[_push_rpc(cfgstreams[s]['stream'], b'') for s in cfgstreams])
     if not has_quorum():
         # the leader counts in addition to registered streams
         raise exc.DegradedCollective()
-    exec_on_followers_unconditional(fnname, *args)
+    await exec_on_followers_unconditional(fnname, *args)
 
 
-def exec_on_followers_unconditional(fnname, *args):
+async def exec_on_followers_unconditional(fnname, *args):
     global _txcount
-    pushes = eventlet.GreenPool()
     _txcount += 1
     payload = msgpack.packb({'function': fnname, 'args': args,
                              'txcount': _txcount}, use_bin_type=False)
-    for _ in pushes.starmap(
-            _push_rpc, [(cfgstreams[s]['stream'], payload) for s in cfgstreams]):
-        pass
+    await asyncio.gather(
+        *[_push_rpc(cfgstreams[s]['stream'], payload) for s in cfgstreams])
 
 
 def logException():
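exec_on_leader swaps eventlet's Event for an asyncio Future keyed by a transaction id: the sender parks on the future, and the stream-reading loop in follow_channel later resolves it via set_result() or set_exception(). A condensed sketch of that rendezvous, with the _pendingchangesets-style registry and wire format assumed to match the patch (names here are illustrative):

```python
import asyncio, base64, os

_pending = {}

async def call_remote(writer, payload: bytes):
    """Register a future under a fresh xid, send the request, await the reply."""
    xid = base64.b64encode(os.urandom(8)).decode()
    _pending[xid] = asyncio.get_event_loop().create_future()
    writer.write(payload)  # framed request carrying the xid
    await writer.drain()
    try:
        return await _pending[xid]  # resolved by the response reader below
    finally:
        del _pending[xid]

def on_response(xid, ret=None, error=None):
    """Called from the stream-reading loop when a reply arrives."""
    fut = _pending.get(xid)
    if fut is None or fut.done():
        return
    if error:
        fut.set_exception(Exception(error))
    else:
        fut.set_result(ret)
```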
@@ -387,9 +386,12 @@ def logException():
         event=confluent.log.Events.stacktrace)
 
 
-def _do_add_watcher(watcher, added, configmanager, renamed=()):
+async def _do_add_watcher(watcher, added, configmanager, renamed=()):
     try:
-        watcher(added=added, deleting=(), renamed=renamed, configmanager=configmanager)
+        watched = watcher(added=added, deleting=(), renamed=renamed, configmanager=configmanager)
+        if watched is None:
+            return
+        await watched
     except Exception:
         logException()
 
@@ -417,12 +419,13 @@ def init_masterkey(password=None, autogen=True):
     #                   password=password))
 
 
-def _push_rpc(stream, payload):
-    with _rpclock:
+async def _push_rpc(stream, payload):
+    async with _rpclock:
         try:
-            stream.sendall(struct.pack('!Q', len(payload)))
+            stream[1].write(struct.pack('!Q', len(payload)))
             if len(payload):
-                stream.sendall(payload)
+                stream[1].write(payload)
+            await stream[1].drain()
             return True
         except Exception:
             logException()
@@ -664,16 +668,15 @@ def has_quorum():
     return voters > allvoters // 2
 
 cfgstreams = {}
-def relay_slaved_requests(name, listener):
+async def relay_slaved_requests(name, listener):
     global cfgleader
     global _hasquorum
-    pushes = eventlet.GreenPool()
     if name not in _followerlocks:
-        _followerlocks[name] = gthread.RLock()
+        _followerlocks[name] = asyncio.Lock()
     meminfo = get_collective_member(name)
-    with _followerlocks[name]:
+    async with _followerlocks[name]:
         try:
-            stop_following()
+            await stop_following()
             if name in cfgstreams:
                 try:
                     cfgstreams[name]['stream'].close()
@@ -690,15 +693,12 @@ async def relay_slaved_requests(name, listener):
             if _newquorum is not None:
                 _hasquorum = _newquorum
                 payload = msgpack.packb({'quorum': _hasquorum},
                                         use_bin_type=False)
-                for _ in pushes.starmap(
-                        _push_rpc,
-                        [(cfgstreams[s]['stream'], payload) for s in cfgstreams]):
-                    pass
+                await asyncio.gather(*[_push_rpc(cfgstreams[s]['stream'], payload) for s in cfgstreams])
             _newquorum = has_quorum()
             _hasquorum = _newquorum
             if _hasquorum and _pending_collective_updates:
                 apply_pending_collective_updates()
-            msg = lh.get_next_msg()
+            msg = await lh.get_next_msg()
             while msg:
                 if name not in cfgstreams:
                     raise Exception("Unexpected loss of node in followers: " + name)
@@ -730,7 +730,7 @@ async def relay_slaved_requests(name, listener):
                 if not res:
                     break
                 try:
-                    msg = lh.get_next_msg()
+                    msg = await lh.get_next_msg()
                 except Exception:
                     msg = None
         finally:
@@ -745,23 +745,21 @@ async def relay_slaved_requests(name, listener):
             if cfgstreams:
                 _hasquorum = has_quorum()
                 payload = msgpack.packb({'quorum': _hasquorum},
                                         use_bin_type=False)
-                for _ in pushes.starmap(
-                        _push_rpc,
-                        [(cfgstreams[s]['stream'], payload) for s in cfgstreams]):
-                    pass
+                await asyncio.gather(
+                    *[_push_rpc(cfgstreams[s]['stream'], payload) for s in cfgstreams])
             if membership_callback:
                 membership_callback()
             if not cfgstreams and not cfgleader:  # last one out, set cfgleader to boolean to mark dead collective
-                stop_following(True)
+                await stop_following(True)
             return False
     return True
 
 lastheartbeat = None
-def check_leader():
-    _push_rpc(cfgleader, b'')
+async def check_leader():
+    await _push_rpc(cfgleader, b'')
     tries = 0
     while tries < 30:
-        eventlet.sleep(0.1)
+        await asyncio.sleep(0.1)
         tries += 1
         if lastheartbeat and lastheartbeat > (confluent.util.monotonic_time() - 3):
             return True
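_push_rpc keeps the wire protocol intact across the migration: an 8-byte big-endian length prefix followed by a msgpack body, with a zero length serving as a keepalive/ACK. A sketch of both directions of that framing, assuming an asyncio stream pair (helper names are illustrative):

```python
import asyncio, struct

async def send_frame(writer: asyncio.StreamWriter, payload: bytes):
    writer.write(struct.pack('!Q', len(payload)))  # 8-byte length prefix
    if payload:
        writer.write(payload)
    await writer.drain()

async def recv_frame(reader: asyncio.StreamReader) -> bytes:
    size = struct.unpack('!Q', await reader.readexactly(8))[0]
    if size == 0:
        return b''  # keepalive / ACK
    return await reader.readexactly(size)
```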
@@ -774,25 +772,25 @@ class StreamHandler(object):
         self.expiry = self.keepalive + 40
 
-    def get_next_msg(self):
-        r = (False,)
+    async def get_next_msg(self):
+        msg = None
         try:
-            while not r[0]:
-                r = select.select(
-                    (self.sock,), (), (),
-                    self.keepalive - confluent.util.monotonic_time())
-                if confluent.util.monotonic_time() > self.expiry:
-                    return None
-                if confluent.util.monotonic_time() > self.keepalive:
-                    res = _push_rpc(self.sock, b'')  # nulls are a keepalive
-                    if not res:
+            while not msg:
+                try:
+                    msg = await asyncio.wait_for(
+                        self.sock[0].read(8),
+                        timeout=self.keepalive - confluent.util.monotonic_time())
+                except asyncio.TimeoutError:
+                    msg = None
+                if confluent.util.monotonic_time() > self.expiry:
                     return None
-                #TODO: this test can work fine even if the other end is
-                # gone, go to a more affirmative test to more quickly
-                # detect outage to peer
-                self.keepalive = confluent.util.monotonic_time() + 20
-            self.expiry = confluent.util.monotonic_time() + 60
-            msg = self.sock.recv(8)
+                if confluent.util.monotonic_time() > self.keepalive:
+                    res = await _push_rpc(self.sock, b'')  # nulls are a keepalive
+                    if not res:
+                        return None
+                    self.keepalive = confluent.util.monotonic_time() + 20
         except Exception as e:
             msg = None
         return msg
@@ -801,8 +812,8 @@ class StreamHandler(object):
         self.sock = None
 
 
-def stop_following(replacement=None):
-    with _leaderlock:
+async def stop_following(replacement=None):
+    async with _leaderlock:
         global cfgleader
         if cfgleader and not isinstance(cfgleader, bool):
             try:
@@ -811,14 +822,14 @@ async def stop_following(replacement=None):
             pass
         cfgleader = replacement
 
-def stop_leading(newleader=None):
+async def stop_leading(newleader=None):
     rpcpayload = None
     if newleader is not None:
         rpcpayload = msgpack.packb({'newleader': newleader}, use_bin_type=False)
     for stream in list(cfgstreams):
         try:
             if rpcpayload is not None:
-                _push_rpc(cfgstreams[stream]['stream'], rpcpayload)
+                await _push_rpc(cfgstreams[stream]['stream'], rpcpayload)
             cfgstreams[stream]['stream'].close()
         except Exception:
             pass
@@ -849,15 +860,15 @@ def rollback_clear():
     ConfigManager.wait_for_sync(True)
 
 
-def clear_configuration():
+async def clear_configuration():
     global _cfgstore
     global _txcount
     global _oldcfgstore
     global _oldtxcount
     global _ready
     _ready = False
-    stop_leading()
-    stop_following()
+    await stop_leading()
+    await stop_following()
     _oldcfgstore = _cfgstore
     _oldtxcount = _txcount
     _cfgstore = {}
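The get_next_msg rewrite above replaces select() polling with asyncio.wait_for(): a timed read either yields the 8-byte length header, times out (at which point the expiry and keepalive deadlines are checked), or fails. Note that before Python 3.11 wait_for raises asyncio.TimeoutError rather than the builtin TimeoutError, which is why the rewrite catches the asyncio name. A reduced sketch of the idiom (helper and parameters are illustrative):

```python
import asyncio

async def read_with_keepalive(reader, send_keepalive, timeout=20.0, attempts=3):
    """Timed header reads; on each timeout, nudge the peer before giving up."""
    for _ in range(attempts):
        try:
            return await asyncio.wait_for(reader.read(8), timeout=timeout)
        except asyncio.TimeoutError:
            if not await send_keepalive():  # peer unreachable: report the outage
                return None
    return None
```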
@@ -891,23 +902,23 @@ def commit_clear():
     cfgleader = None
 
 
-def follow_channel(channel):
+async def follow_channel(channel):
     global _txcount
     global _hasquorum
     global lastheartbeat
     try:
-        stop_leading()
-        stop_following(channel)
+        await stop_leading()
+        await stop_following(channel)
         lh = StreamHandler(channel)
-        msg = lh.get_next_msg()
+        msg = await lh.get_next_msg()
         while msg:
+            msg, rpc = msg[:8], msg[8:]
             sz = struct.unpack('!Q', msg)[0]
             if sz == 0:
                 lastheartbeat = confluent.util.monotonic_time()
             else:
-                rpc = b''
                 while len(rpc) < sz:
-                    nrpc = channel.recv(sz - len(rpc))
+                    nrpc = await channel[0].read(sz - len(rpc))
                     if not nrpc:
                         raise Exception('Truncated message error')
                     rpc += nrpc
@@ -930,36 +941,36 @@ async def follow_channel(channel):
                         exc = ValueError(excstr)
                     else:
                         exc = Exception(excstr)
-                    _pendingchangesets[rpc['xid']].send_exception(exc)
+                    _pendingchangesets[rpc['xid']].set_exception(exc)
                 else:
-                    _pendingchangesets[rpc['xid']].send(rpc.get('ret', None))
+                    _pendingchangesets[rpc['xid']].set_result(rpc.get('ret', None))
             if 'quorum' in rpc:
                 _hasquorum = rpc['quorum']
-            res = _push_rpc(channel, b'')  # use null as ACK
+            res = await _push_rpc(channel, b'')  # use null as ACK
             if not res:
                 break
-            msg = lh.get_next_msg()
+            msg = await lh.get_next_msg()
     finally:
         # mark the connection as broken
         if cfgstreams:
-            stop_following(None)
+            await stop_following(None)
         else:
-            stop_following(True)
+            await stop_following(True)
     return {}
 
 
-def add_collective_member(name, address, fingerprint, role=None):
+async def add_collective_member(name, address, fingerprint, role=None):
     if cfgleader:
-        return exec_on_leader('add_collective_member', name, address, fingerprint, role)
+        return await exec_on_leader('add_collective_member', name, address, fingerprint, role)
     if cfgstreams:
-        exec_on_followers('_true_add_collective_member', name, address, fingerprint, True, role)
+        await exec_on_followers('_true_add_collective_member', name, address, fingerprint, True, role)
     _true_add_collective_member(name, address, fingerprint, role=role)
 
 
-def del_collective_member(name):
+async def del_collective_member(name):
     if cfgleader and not isinstance(cfgleader, bool):
-        return exec_on_leader('del_collective_member', name)
+        return await exec_on_leader('del_collective_member', name)
     if cfgstreams:
-        exec_on_followers_unconditional('_true_del_collective_member', name)
+        await exec_on_followers_unconditional('_true_del_collective_member', name)
     _true_del_collective_member(name)
 
 def _true_del_collective_member(name, sync=True):
@@ -1290,7 +1301,7 @@ class ConfigManager(object):
         self.clientfiles = {}
         global _cfgstore
         self.inrestore = False
-        with _initlock:
+        if True:  # _initlock is now an asyncio.Lock and __init__ is synchronous; locking is skipped here
             if _cfgstore is None:
                 init()
             self.decrypt = decrypt
@@ -1544,7 +1555,7 @@ class ConfigManager(object):
         except KeyError:
             return None
 
-    def set_usergroup(self, groupname, attributemap):
+    async def set_usergroup(self, groupname, attributemap):
         """Set usergroup attribute(s)
 
         :param groupname: the name of the group to modify
@@ -1554,7 +1565,7 @@ class ConfigManager(object):
         if cfgleader:
-            return exec_on_leader('_rpc_master_set_usergroup', self.tenant,
+            return await exec_on_leader('_rpc_master_set_usergroup', self.tenant,
                                   groupname, attributemap)
         if cfgstreams:
-            exec_on_followers('_rpc_set_usergroup', self.tenant, groupname,
+            await exec_on_followers('_rpc_set_usergroup', self.tenant, groupname,
                               attributemap)
         self._true_set_usergroup(groupname, attributemap)
@@ -1573,7 +1584,7 @@ class ConfigManager(object):
             _mark_dirtykey('usergroups', groupname, self.tenant)
         self._bg_sync_to_file()
 
-    def create_usergroup(self, groupname, role="Administrator"):
+    async def create_usergroup(self, groupname, role="Administrator"):
         """Create a new user
 
         :param groupname: The name of the user group
@@ -1585,7 +1596,7 @@ class ConfigManager(object):
         if cfgleader:
-            return exec_on_leader('_rpc_master_create_usergroup', self.tenant,
+            return await exec_on_leader('_rpc_master_create_usergroup', self.tenant,
                                   groupname, role)
         if cfgstreams:
-            exec_on_followers('_rpc_create_usergroup', self.tenant, groupname,
+            await exec_on_followers('_rpc_create_usergroup', self.tenant, groupname,
                               role)
         self._true_create_usergroup(groupname, role)
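add_collective_member above shows the mutation template every write path in this file follows: forward to the leader when slaved, fan out to followers when leading, then apply locally. A generic sketch of that routing, with the leader/follower executors passed in as parameters since the real module-level state (cfgleader, cfgstreams) is assumed rather than reproduced here:

```python
async def replicated_mutation(is_follower, has_followers,
                              exec_on_leader, exec_on_followers,
                              local_apply, fnname, *args):
    """Route a config mutation the way add_collective_member does."""
    if is_follower:    # slaved to a collective: the leader applies and echoes back
        return await exec_on_leader(fnname, *args)
    if has_followers:  # leading: replicate to every follower before local commit
        await exec_on_followers(fnname, *args)
    return local_apply(*args)
```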
@@ -1609,11 +1620,11 @@ class ConfigManager(object):
             _mark_dirtykey('usergroups', groupname, self.tenant)
         self._bg_sync_to_file()
 
-    def del_usergroup(self, name):
+    async def del_usergroup(self, name):
         if cfgleader:
-            return exec_on_leader('_rpc_master_del_usergroup', self.tenant,
-                                  name)
+            return await exec_on_leader('_rpc_master_del_usergroup', self.tenant,
+                                        name)
         if cfgstreams:
-            exec_on_followers('_rpc_del_usergroup', self.tenant, name)
+            await exec_on_followers('_rpc_del_usergroup', self.tenant, name)
         self._true_del_usergroup(name)
 
     def _true_del_usergroup(self, name):
@@ -1622,7 +1633,7 @@ class ConfigManager(object):
             _mark_dirtykey('usergroups', name, self.tenant)
         self._bg_sync_to_file()
 
-    def set_user(self, name, attributemap):
+    async def set_user(self, name, attributemap):
         """Set user attribute(s)
 
         :param name: The login name of the user
@@ -1632,7 +1643,7 @@ class ConfigManager(object):
         if cfgleader:
-            return exec_on_leader('_rpc_master_set_user', self.tenant, name,
+            return await exec_on_leader('_rpc_master_set_user', self.tenant, name,
                                   attributemap)
         if cfgstreams:
-            exec_on_followers('_rpc_set_user', self.tenant, name, attributemap)
+            await exec_on_followers('_rpc_set_user', self.tenant, name, attributemap)
         self._true_set_user(name, attributemap)
 
     def _true_set_user(self, name, attributemap):
@@ -1660,11 +1671,11 @@ class ConfigManager(object):
             _mark_dirtykey('users', name, self.tenant)
         self._bg_sync_to_file()
 
-    def del_user(self, name):
+    async def del_user(self, name):
         if cfgleader:
-            return exec_on_leader('_rpc_master_del_user', self.tenant, name)
+            return await exec_on_leader('_rpc_master_del_user', self.tenant, name)
         if cfgstreams:
-            exec_on_followers('_rpc_del_user', self.tenant, name)
+            await exec_on_followers('_rpc_del_user', self.tenant, name)
         self._true_del_user(name)
 
     def _true_del_user(self, name):
@@ -1673,7 +1684,7 @@ class ConfigManager(object):
             _mark_dirtykey('users', name, self.tenant)
         self._bg_sync_to_file()
 
-    def create_user(self, name,
+    async def create_user(self, name,
                     role="Administrator", uid=None, displayname=None,
                     attributemap=None):
         """Create a new user
@@ -1689,7 +1700,7 @@ class ConfigManager(object):
         if cfgleader:
-            return exec_on_leader('_rpc_master_create_user', self.tenant,
+            return await exec_on_leader('_rpc_master_create_user', self.tenant,
                                   name, role, uid, displayname, attributemap)
         if cfgstreams:
-            exec_on_followers('_rpc_create_user', self.tenant, name,
+            await exec_on_followers('_rpc_create_user', self.tenant, name,
                               role, uid, displayname, attributemap)
         self._true_create_user(name, role, uid, displayname, attributemap)
@@ -1903,7 +1914,7 @@ class ConfigManager(object):
-    def add_group_attributes(self, attribmap):
-        self.set_group_attributes(attribmap, autocreate=True)
+    async def add_group_attributes(self, attribmap):
+        await self.set_group_attributes(attribmap, autocreate=True)
 
-    def set_group_attributes(self, attribmap, autocreate=False):
+    async def set_group_attributes(self, attribmap, autocreate=False):
         for group in attribmap:
             curr = attribmap[group]
             for attrib in curr:
@@ -1922,7 +1933,7 @@ class ConfigManager(object):
         if cfgleader:
-            return exec_on_leader('_rpc_master_set_group_attributes',
+            return await exec_on_leader('_rpc_master_set_group_attributes',
                                   self.tenant, attribmap, autocreate)
         if cfgstreams:
-            exec_on_followers('_rpc_set_group_attributes', self.tenant,
+            await exec_on_followers('_rpc_set_group_attributes', self.tenant,
                               attribmap, autocreate)
         self._true_set_group_attributes(attribmap, autocreate)
@@ -2036,12 +2047,12 @@ class ConfigManager(object):
         self._notif_attribwatchers(changeset)
         self._bg_sync_to_file()
 
-    def clear_group_attributes(self, groups, attributes):
+    async def clear_group_attributes(self, groups, attributes):
         if cfgleader:
-            return exec_on_leader('_rpc_master_clear_group_attributes',
+            return await exec_on_leader('_rpc_master_clear_group_attributes',
                                   self.tenant, groups, attributes)
         if cfgstreams:
-            exec_on_followers('_rpc_clear_group_attributes', self.tenant,
+            await exec_on_followers('_rpc_clear_group_attributes', self.tenant,
                               groups, attributes)
         self._true_clear_group_attributes(groups, attributes)
@@ -2155,23 +2166,25 @@ class ConfigManager(object):
         for watcher in notifdata:
             watcher = notifdata[watcher]
             callback = watcher['callback']
-            eventlet.spawn_n(_do_notifier, self, watcher, callback)
+            confluent.util.spawn(_do_notifier(self, watcher, callback))
 
-    def del_nodes(self, nodes):
+    async def del_nodes(self, nodes):
         if isinstance(nodes, set):
             nodes = list(nodes)  # msgpack can't handle set
         if cfgleader:  # slaved to a collective
-            return exec_on_leader('_rpc_master_del_nodes', self.tenant,
-                                  nodes)
+            return await exec_on_leader('_rpc_master_del_nodes', self.tenant,
+                                        nodes)
         if cfgstreams:
-            exec_on_followers('_rpc_del_nodes', self.tenant, nodes)
-        self._true_del_nodes(nodes)
+            await exec_on_followers('_rpc_del_nodes', self.tenant, nodes)
+        await self._true_del_nodes(nodes)
 
-    def _true_del_nodes(self, nodes):
+    async def _true_del_nodes(self, nodes):
         if self.tenant in self._nodecollwatchers:
             for watcher in self._nodecollwatchers[self.tenant]:
                 watcher = self._nodecollwatchers[self.tenant][watcher]
-                watcher(added=(), deleting=nodes, renamed=(), configmanager=self)
+                watched = watcher(added=(), deleting=nodes, renamed=(), configmanager=self)
+                if watched is not None:
+                    await watched
         changeset = {}
         for node in nodes:
             # set a reserved attribute for the sake of the change notification
@@ -2186,12 +2199,12 @@ class ConfigManager(object):
         self._notif_attribwatchers(changeset)
         self._bg_sync_to_file()
 
-    def del_groups(self, groups):
+    async def del_groups(self, groups):
         if cfgleader:
-            return exec_on_leader('_rpc_master_del_groups', self.tenant,
+            return await exec_on_leader('_rpc_master_del_groups', self.tenant,
                                   groups)
         if cfgstreams:
-            exec_on_followers('_rpc_del_groups', self.tenant, groups)
+            await exec_on_followers('_rpc_del_groups', self.tenant, groups)
         self._true_del_groups(groups)
 
     def _true_del_groups(self, groups):
@@ -2205,7 +2218,7 @@ class ConfigManager(object):
         self._notif_attribwatchers(changeset)
         self._bg_sync_to_file()
 
-    def clear_node_attributes(self, nodes, attributes, warnings=None):
+    async def clear_node_attributes(self, nodes, attributes, warnings=None):
         if cfgleader:
-            mywarnings = exec_on_leader('_rpc_master_clear_node_attributes',
+            mywarnings = await exec_on_leader('_rpc_master_clear_node_attributes',
                                         self.tenant, nodes, attributes)
@@ -2213,7 +2226,7 @@ class ConfigManager(object):
             if warnings is not None:
                 warnings.extend(mywarnings)
             return
         if cfgstreams:
-            exec_on_followers('_rpc_clear_node_attributes', self.tenant,
+            await exec_on_followers('_rpc_clear_node_attributes', self.tenant,
                               nodes, attributes)
         self._true_clear_node_attributes(nodes, attributes, warnings)
@@ -2261,15 +2274,15 @@ class ConfigManager(object):
         self._notif_attribwatchers(changeset)
         self._bg_sync_to_file()
 
-    def add_node_attributes(self, attribmap):
-        self.set_node_attributes(attribmap, autocreate=True)
+    async def add_node_attributes(self, attribmap):
+        await self.set_node_attributes(attribmap, autocreate=True)
 
-    def rename_nodes(self, renamemap):
+    async def rename_nodes(self, renamemap):
         if cfgleader:
-            return exec_on_leader('_rpc_master_rename_nodes', self.tenant,
+            return await exec_on_leader('_rpc_master_rename_nodes', self.tenant,
                                   renamemap)
         if cfgstreams:
-            exec_on_followers('_rpc_rename_nodes', self.tenant, renamemap)
+            await exec_on_followers('_rpc_rename_nodes', self.tenant, renamemap)
         self._true_rename_nodes(renamemap)
 
     def _true_rename_nodes(self, renamemap):
@@ -2309,14 +2322,14 @@ class ConfigManager(object):
             nodecollwatchers = self._nodecollwatchers[self.tenant]
             for watcher in nodecollwatchers:
                 watcher = nodecollwatchers[watcher]
-                eventlet.spawn_n(_do_add_watcher, watcher, (), self, renamemap)
+                confluent.util.spawn(_do_add_watcher(watcher, (), self, renamemap))
         self._bg_sync_to_file()
 
-    def rename_nodegroups(self, renamemap):
+    async def rename_nodegroups(self, renamemap):
         if cfgleader:
-            return exec_on_leader('_rpc_master_rename_nodegroups', self.tenant,
+            return await exec_on_leader('_rpc_master_rename_nodegroups', self.tenant,
                                   renamemap)
         if cfgstreams:
-            exec_on_followers('_rpc_rename_nodegroups', self.tenant, renamemap)
+            await exec_on_followers('_rpc_rename_nodegroups', self.tenant, renamemap)
         self._true_rename_groups(renamemap)
 
     def _true_rename_groups(self, renamemap):
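Watcher callbacks may now be either plain functions or coroutine functions; _do_notifier, _do_add_watcher and _true_del_nodes all use the same call-then-maybe-await dance, and eventlet.spawn_n(f, args) becomes util.spawn(f(args)) since a coroutine object, not a function plus arguments, is what gets scheduled. A sketch of that compatibility shim (maybe_await is an illustrative name):

```python
import inspect

async def maybe_await(callback, **kwargs):
    """Invoke a watcher that may or may not be a coroutine function."""
    result = callback(**kwargs)
    if inspect.isawaitable(result):  # the diff tests 'is not None'; same intent
        await result
    return result
```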
@@ -2349,7 +2362,7 @@ class ConfigManager(object):
 
-    def set_node_attributes(self, attribmap, autocreate=False):
+    async def set_node_attributes(self, attribmap, autocreate=False):
         for node in attribmap:
             curr = attribmap[node]
             for attrib in curr:
@@ -2368,14 +2381,11 @@ class ConfigManager(object):
         if cfgleader:
-            return exec_on_leader('_rpc_master_set_node_attributes',
+            return await exec_on_leader('_rpc_master_set_node_attributes',
                                   self.tenant, attribmap, autocreate)
         if cfgstreams:
-            exec_on_followers('_rpc_set_node_attributes',
+            await exec_on_followers('_rpc_set_node_attributes',
                               self.tenant, attribmap, autocreate)
         self._true_set_node_attributes(attribmap, autocreate)
 
     def _true_set_node_attributes(self, attribmap, autocreate):
-        # TODO(jbjohnso): multi mgr support, here if we have peers,
-        # pickle the arguments and fire them off in eventlet
-        # flows to peers, all should have the same result
         newnodes = []
         changeset = {}
         # first do a sanity check of the input upfront
@@ -2499,18 +2509,18 @@ class ConfigManager(object):
             nodecollwatchers = self._nodecollwatchers[self.tenant]
             for watcher in nodecollwatchers:
                 watcher = nodecollwatchers[watcher]
-                eventlet.spawn_n(_do_add_watcher, watcher, newnodes, self)
+                confluent.util.spawn(_do_add_watcher(watcher, newnodes, self))
         self._bg_sync_to_file()
         #TODO: wait for synchronization to succeed/fail??)
 
-    def _load_from_json(self, jsondata, sync=True):
+    async def _load_from_json(self, jsondata, sync=True):
         self.inrestore = True
         try:
-            self._load_from_json_backend(jsondata, sync=True)
+            await self._load_from_json_backend(jsondata, sync=True)
         finally:
             self.inrestore = False
 
-    def _load_from_json_backend(self, jsondata, sync=True):
+    async def _load_from_json_backend(self, jsondata, sync=True):
         """Load fresh configuration data from jsondata
 
         :param jsondata: String of jsondata
@@ -2569,13 +2579,13 @@ class ConfigManager(object):
             if confarea not in tmpconfig:
                 continue
             if confarea == 'nodes':
-                self.set_node_attributes(tmpconfig[confarea], True)
+                await self.set_node_attributes(tmpconfig[confarea], True)
             elif confarea == 'nodegroups':
-                self.set_group_attributes(tmpconfig[confarea], True)
+                await self.set_group_attributes(tmpconfig[confarea], True)
             elif confarea == 'usergroups':
                 for usergroup in tmpconfig[confarea]:
                     role = tmpconfig[confarea][usergroup].get('role', 'Administrator')
-                    self.create_usergroup(usergroup, role=role)
+                    await self.create_usergroup(usergroup, role=role)
             elif confarea == 'users':
                 for user in tmpconfig[confarea]:
                     ucfg = tmpconfig[confarea][user]
@@ -2587,7 +2597,7 @@ class ConfigManager(object):
                         uid = uid.encode('utf8')
                     displayname = ucfg.get('displayname', None)
                     role = ucfg.get('role', None)
-                    self.create_user(user, uid=uid, displayname=displayname, role=role)
+                    await self.create_user(user, uid=uid, displayname=displayname, role=role)
                     for attrname in ('authid', 'authenticators', 'cryptpass'):
                         if attrname in tmpconfig[confarea][user]:
                             self._cfgstore['users'][user][attrname] = tmpconfig[confarea][user][attrname]
@@ -2595,7 +2605,7 @@ class ConfigManager(object):
         if sync:
             self._bg_sync_to_file()
 
-    def _dump_to_json(self, redact=None):
+    async def _dump_to_json(self, redact=None):
         """Dump the configuration in json form to output
 
         password is used to protect the 'secret' attributes in lieu of the
@@ -2609,11 +2619,11 @@ class ConfigManager(object):
 
         """
         with open(os.devnull, 'w+') as devnull:
-            worker = subprocess.Popen(
-                [sys.executable, __file__, '-r' if redact else ''],
-                stdin=devnull, stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE)
-            stdout, stderr = worker.communicate()
+            worker = await asyncio.create_subprocess_exec(
+                sys.executable, __file__, '-r' if redact else '',
+                stdin=devnull, stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE)
+            stdout, stderr = await worker.communicate()
         return stdout
 
     def _real_dump_to_json(self, redact=None):
@@ -2911,7 +2921,7 @@ def _dump_keys(password, dojson=True):
     return keydata
 
 
-def restore_db_from_directory(location, password):
+async def restore_db_from_directory(location, password):
     try:
         with open(os.path.join(location, 'keys.json'), 'r') as cfgfile:
             keydata = cfgfile.read()
@@ -2939,7 +2949,7 @@ async def restore_db_from_directory(location, password):
         raise
     with open(os.path.join(location, 'main.json'), 'r') as cfgfile:
         cfgdata = cfgfile.read()
-    ConfigManager(tenant=None)._load_from_json(cfgdata)
+    await ConfigManager(tenant=None)._load_from_json(cfgdata)
     ConfigManager.wait_for_sync(True)
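_dump_to_json keeps its fork-a-worker design but moves from eventlet's green subprocess to asyncio.create_subprocess_exec, whose communicate() is awaitable and so no longer blocks the event loop. A standalone sketch of that call shape (the -r flag and the re-exec of the module script mirror the patch; the helper name is illustrative):

```python
import asyncio, sys

async def dump_via_worker(script: str, redact: bool) -> bytes:
    """Run a helper process and collect its stdout without blocking the loop."""
    worker = await asyncio.create_subprocess_exec(
        sys.executable, script, '-r' if redact else '',
        stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, _stderr = await worker.communicate()
    return stdout
```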
return delete_node_collection(pathcomponents, configmanager, - isnoderange) + isnoderange) elif operation == "retrieve": return enumerate_node_collection(pathcomponents, configmanager) else: diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index dfb50b9f..ef00737c 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -647,7 +647,7 @@ def detected_models(): yield info['modelnumber'] -def _recheck_nodes(nodeattribs, configmanager): +async def _recheck_nodes(nodeattribs, configmanager): if not cfm.config_is_ready(): return if rechecklock.locked(): @@ -1538,11 +1538,11 @@ nodeaddhandler = None needaddhandled = False -def _handle_nodelist_change(configmanager): +async def _handle_nodelist_change(configmanager): global needaddhandled global nodeaddhandler macmap.vintage = 0 # the current mac map is probably inaccurate - _recheck_nodes((), configmanager) + await _recheck_nodes((), configmanager) if needaddhandled: needaddhandled = False nodeaddhandler = eventlet.spawn(_handle_nodelist_change, configmanager) @@ -1550,7 +1550,7 @@ def _handle_nodelist_change(configmanager): nodeaddhandler = None -def newnodes(added, deleting, renamed, configmanager): +async def newnodes(added, deleting, renamed, configmanager): global attribwatcher global needaddhandled global nodeaddhandler @@ -1573,7 +1573,7 @@ def newnodes(added, deleting, renamed, configmanager): if nodeaddhandler: needaddhandled = True else: - nodeaddhandler = eventlet.spawn(_handle_nodelist_change, configmanager) + nodeaddhandler = util.spawn(_handle_nodelist_change(configmanager))