
Begin implementation of asyncio collective

The config synchronization is in progress.
This commit is contained in:
Jarrod Johnson
2024-04-26 15:48:14 -04:00
parent afa0c0df5a
commit d2edcb62c6
5 changed files with 265 additions and 242 deletions
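
Across all five files the change follows one pattern: eventlet primitives are swapped for asyncio equivalents. Green RLocks become asyncio.Lock, pending-changeset Events become futures completed with set_result()/set_exception(), GreenPool.starmap fan-outs become asyncio.gather, blocking sendall() becomes write() plus await drain(), and greenthread kill() becomes Task.cancel(). A minimal standalone sketch of that pattern follows; the helper names and the (StreamReader, StreamWriter) stream shape are illustrative assumptions, not the repository's actual API.

import asyncio
import struct

_rpclock = asyncio.Lock()   # was eventlet.green.threading.RLock()
_pending = {}               # xid -> asyncio.Future (was an eventlet event.Event)

async def push_rpc(stream, payload):
    # stream is assumed to be a (StreamReader, StreamWriter) pair, matching
    # the remote[0]/remote[1] indexing used throughout the diff.
    async with _rpclock:
        reader, writer = stream
        writer.write(struct.pack('!Q', len(payload)))
        if payload:
            writer.write(payload)
        await writer.drain()    # replaces the blocking sendall()
        return True

async def rpc_to_leader(stream, xid, payload):
    # A future stands in for the old Event: this side awaits it and the
    # reply handler completes it with set_result() or set_exception().
    _pending[xid] = asyncio.get_event_loop().create_future()
    await push_rpc(stream, payload)
    try:
        return await _pending[xid]
    finally:
        del _pending[xid]

async def push_to_followers(streams, payload):
    # The GreenPool.starmap fan-out becomes a single asyncio.gather.
    await asyncio.gather(*[push_rpc(s, payload) for s in streams])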

View File

@@ -101,8 +101,9 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
''.format(leader, str(e)),
'subsystem': 'collective'})
return False
print("connecting to leader")
async with connecting:
with cfm._initlock:
async with cfm._initlock:
# remote is a socket...
banner = await tlvdata.recv(remote) # the banner
if not banner:
@@ -128,7 +129,7 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
return False
if 'waitinline' in keydata:
await asyncio.sleep(0.3)
return connect_to_leader(cert, name, leader, None, isretry=True)
return await connect_to_leader(cert, name, leader, None, isretry=True)
if 'leader' in keydata:
if keydata['leader'] == None:
return None
@@ -140,7 +141,7 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
keydata['leader'])
if ldrc and ldrc['name'] == name:
raise Exception("Redirected to self")
return connect_to_leader(name=name,
return await connect_to_leader(name=name,
leader=keydata['leader'])
if 'txcount' in keydata:
log.log({'info':
@@ -150,12 +151,12 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
'subsystem': 'collective'})
return await become_leader(remote)
return False
follower.kill()
cfm.stop_following()
follower.cancel()
await cfm.stop_following()
follower = None
if follower is not None:
follower.kill()
cfm.stop_following()
follower.cancel()
await cfm.stop_following()
follower = None
log.log({'info': 'Following leader {0}'.format(leader),
'subsystem': 'collective'})
@@ -165,17 +166,17 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
dbsize = dbi['dbsize']
dbjson = b''
while (len(dbjson) < dbsize):
ndata = remote.recv(dbsize - len(dbjson))
ndata = await remote[0].read(dbsize - len(dbjson))
if not ndata:
try:
remote.close()
remote[0].close()
except Exception:
pass
log.log({'error': 'Retrying connection, error during initial sync', 'subsystem': 'collective'})
return connect_to_leader(ocert, oname, oleader, None)
return await connect_to_leader(ocert, oname, oleader, None)
raise Exception("Error doing initial DB transfer") # bad ssl write retry
dbjson += ndata
cfm.clear_configuration()
await cfm.clear_configuration()
try:
cfm._restore_keys(keydata, None, sync=False)
for c in colldata:
@@ -185,28 +186,30 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
for globvar in globaldata:
cfm.set_global(globvar, globaldata[globvar], False)
cfm._txcount = dbi.get('txcount', 0)
cfm.ConfigManager(tenant=None)._load_from_json(dbjson,
await cfm.ConfigManager(tenant=None)._load_from_json(dbjson,
sync=False)
cfm.commit_clear()
except Exception:
cfm.stop_following()
print("huh????")
await cfm.stop_following()
cfm.rollback_clear()
raise
currentleader = leader
#spawn this as a thread...
remote.settimeout(90)
#remote.settimeout(90)
follower = util.spawn(follow_leader(remote, leader))
return True
def follow_leader(remote, leader):
async def follow_leader(remote, leader):
global currentleader
global retrythread
global follower
cleanexit = False
newleader = None
try:
exitcause = cfm.follow_channel(remote)
exitcause = await cfm.follow_channel(remote)
print(repr(exitcause))
newleader = exitcause.get('newleader', None)
except greenlet.GreenletExit:
cleanexit = True
@@ -219,7 +222,7 @@ def follow_leader(remote, leader):
log.log(
{'info': 'Previous leader directed us to join new leader {}'.format(newleader)})
try:
if connect_to_leader(None, get_myname(), newleader):
if await connect_to_leader(None, get_myname(), newleader):
return
except Exception:
log.log({'error': 'Unknown error attempting to connect to {}, check trace log'.format(newleader), 'subsystem': 'collective'})
@@ -228,7 +231,7 @@ def follow_leader(remote, leader):
'collective membership'.format(leader), 'subsystem': 'collective'})
# The leader has folded, time to startup again...
follower = None
cfm.stop_following()
await cfm.stop_following()
currentleader = None
if retrythread is None: # start a recovery
retrythread = util.spawn_after(
@@ -324,18 +327,18 @@ async def handle_connection(connection, cert, request, local=False):
if follower is not None:
linfo = cfm.get_collective_member_by_address(currentleader)
try:
remote = socket.create_connection((currentleader, 13001), 15)
_, remote = await create_connection(currentleader)
except Exception:
cfm.stop_following()
return
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
keyfile='/etc/confluent/privkey.pem',
certfile='/etc/confluent/srvcert.pem')
cert = remote.getpeercert(binary_form=True)
#remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
# keyfile='/etc/confluent/privkey.pem',
# certfile='/etc/confluent/srvcert.pem')
cert = remote[1].get_extra_info('ssl_object').getpeercert(binary_form=True)
if not (linfo and util.cert_matches(
linfo['fingerprint'],
cert)):
remote.close()
#remote.close()
await tlvdata.send(connection,
{'error': 'Invalid certificate, '
'redo invitation process'})
@@ -368,7 +371,7 @@ async def handle_connection(connection, cert, request, local=False):
await tlvdata.send(connection, {'collective':
{'error': '{0} is not a recognized collective member'.format(todelete)}})
return
cfm.del_collective_member(todelete)
await cfm.del_collective_member(todelete)
await tlvdata.send(connection,
{'collective': {'status': 'Successfully deleted {0}'.format(todelete)}})
connection.close()
@@ -458,7 +461,6 @@ async def handle_connection(connection, cert, request, local=False):
proof = rsp['collective']['approval']
proof = base64.b64decode(proof)
j = invites.check_server_proof(invitation, mycert, cert, proof)
print(repr(j))
if not j:
remote.close()
await tlvdata.send(connection, {'collective':
@@ -494,9 +496,9 @@ async def handle_connection(connection, cert, request, local=False):
myfprint = util.get_fingerprint(mycert)
iam = cfm.get_collective_member(get_myname())
if not iam:
cfm.add_collective_member(get_myname(),
await cfm.add_collective_member(get_myname(),
connection[1].transport.get_extra_info('socket').getsockname()[0], myfprint)
cfm.add_collective_member(request['name'],
await cfm.add_collective_member(request['name'],
connection[1].transport.get_extra_info('socket').getpeername()[0], fprint, role)
myleader = await get_leader(connection)
ldrfprint = cfm.get_collective_member_by_address(
@@ -516,17 +518,17 @@ async def handle_connection(connection, cert, request, local=False):
drone = request['name']
droneinfo = cfm.get_collective_member(drone)
if not droneinfo:
tlvdata.send(connection,
await tlvdata.send(connection,
{'error': 'Unrecognized leader, '
'redo invitation process'})
return
if not util.cert_matches(droneinfo['fingerprint'], cert):
tlvdata.send(connection,
await tlvdata.send(connection,
{'error': 'Invalid certificate, '
'redo invitation process'})
return
if request['txcount'] < cfm._txcount:
tlvdata.send(connection,
await tlvdata.send(connection,
{'error': 'Refusing to be assimilated by inferior'
'transaction count',
'txcount': cfm._txcount,})
@@ -534,7 +536,7 @@ async def handle_connection(connection, cert, request, local=False):
if cfm.cfgstreams and request['txcount'] == cfm._txcount:
try:
cfm.check_quorum()
tlvdata.send(connection,
await tlvdata.send(connection,
{'error': 'Refusing to be assimilated as I am a leader with quorum',
'txcount': cfm._txcount,})
return
@@ -543,19 +545,19 @@ async def handle_connection(connection, cert, request, local=False):
myfollowcount = len(list(cfm.cfgstreams))
if followcount is not None:
if followcount < myfollowcount:
tlvdata.send(connection,
await tlvdata.send(connection,
{'error': 'Refusing to be assimilated by leader with fewer followers',
'txcount': cfm._txcount,})
return
elif followcount == myfollowcount:
myname = sortutil.naturalize_string(get_myname())
if myname < sortutil.naturalize_string(request['name']):
tlvdata.send(connection,
await tlvdata.send(connection,
{'error': 'Refusing, my name is better',
'txcount': cfm._txcount,})
return
if follower is not None and not follower.dead:
tlvdata.send(
await tlvdata.send(
connection,
{'error': 'Already following, assimilate leader first',
'leader': currentleader})
@@ -563,24 +565,25 @@ async def handle_connection(connection, cert, request, local=False):
return
if connecting.active:
# don't try to connect while actively already trying to connect
tlvdata.send(connection, {'status': 0})
connection.close()
await tlvdata.send(connection, {'status': 0})
#connection.close()
return
if (currentleader == connection.getpeername()[0] and
cnn = connection[1].get_extra_info('socket')
if (currentleader == cnn.getpeername()[0] and
follower and not follower.dead):
# if we are happily following this leader already, don't stir
# the pot
tlvdata.send(connection, {'status': 0})
await tlvdata.send(connection, {'status': 0})
connection.close()
return
log.log({'info': 'Connecting in response to assimilation',
'subsystem': 'collective'})
newleader = connection.getpeername()[0]
newleader = cnn.getpeername()[0]
if cfm.cfgstreams:
retire_as_leader(newleader)
tlvdata.send(connection, {'status': 0})
connection.close()
if not connect_to_leader(None, None, leader=newleader):
await retire_as_leader(newleader)
await tlvdata.send(connection, {'status': 0})
cnn.close()
if not await connect_to_leader(None, None, leader=newleader):
if retrythread is None:
retrythread = util.spawn_after(random.random(),
start_collective)
@@ -605,38 +608,40 @@ async def handle_connection(connection, cert, request, local=False):
await tlvdata.send(connection,
{'error': 'Invalid certificate, '
'redo invitation process'})
connection.close()
#connection.close()
return
cnn = connection[1].transport.get_extra_info('socket')
myself = cnn.getsockname()[0]
if connecting.active or initting:
await tlvdata.send(connection, {'error': 'Connecting right now',
'backoff': True})
connection.close()
#connection.close()
return
if leader_init.active:
await tlvdata.send(connection, {'error': 'Servicing a connection',
'waitinline': True})
connection.close()
#connection.close()
return
if myself != await get_leader(connection):
await tlvdata.send(
connection,
{'error': 'Cannot assimilate, our leader is '
'in another castle', 'leader': currentleader})
connection.close()
#connection.close()
return
if request['txcount'] > cfm._txcount:
retire_as_leader()
tlvdata.send(connection,
await retire_as_leader()
await tlvdata.send(connection,
{'error': 'Client has higher tranasaction count, '
'should assimilate me, connecting..',
'txcount': cfm._txcount})
log.log({'info': 'Connecting to leader due to superior '
'transaction count', 'subsystem': 'collective'})
connection.close() # well this won't work
if not connect_to_leader(
None, None, connection.getpeername()[0]):
cnn = connection[1].transport.get_extra_info('socket')
peername = cnn.getpeername()[0]
cnn.close()
if not await connect_to_leader(
None, None, peername):
if retrythread is None:
retrythread = util.spawn_after(5 + random.random(),
start_collective)
@@ -651,20 +656,22 @@ async def handle_connection(connection, cert, request, local=False):
await tlvdata.send(connection, cfm._dump_keys(None, False))
await tlvdata.send(connection, cfm._cfgstore['collective'])
await tlvdata.send(connection, {'confluent_uuid': cfm.get_global('confluent_uuid')}) # cfm.get_globals())
cfgdata = cfm.ConfigManager(None)._dump_to_json()
cfgdata = await cfm.ConfigManager(None)._dump_to_json()
try:
await tlvdata.send(connection, {'txcount': cfm._txcount,
'dbsize': len(cfgdata)})
connection.sendall(cfgdata)
connection[1].write(cfgdata)
await connection[1].drain()
except Exception:
try:
connection.close()
finally:
raise
return None
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now,
# so far unused anyway
connection.settimeout(90)
if not cfm.relay_slaved_requests(drone, connection):
#connection.settimeout(90)
if not await cfm.relay_slaved_requests(drone, connection):
log.log({'info': 'All clients have disconnected, starting recovery process',
'subsystem': 'collective'})
if retrythread is None: # start a recovery if everyone else seems
@@ -698,13 +705,13 @@ async def try_assimilate(drone, followcount, remote):
# Oh well, unable to connect, hopefully the rest will be
# in order
return
tlvdata.send(remote, {'collective': {'operation': 'assimilate',
await tlvdata.send(remote, {'collective': {'operation': 'assimilate',
'name': get_myname(),
'followcount': followcount,
'txcount': cfm._txcount}})
tlvdata.recv(remote) # the banner
tlvdata.recv(remote) # authpassed... 0..
answer = tlvdata.recv(remote)
await tlvdata.recv(remote) # the banner
await tlvdata.recv(remote) # authpassed... 0..
answer = await tlvdata.recv(remote)
if not answer:
log.log(
{'error':
@@ -715,8 +722,8 @@ async def try_assimilate(drone, followcount, remote):
if 'txcount' in answer:
log.log({'info': 'Deferring to {0} due to target being a better leader'.format(
drone), 'subsystem': 'collective'})
retire_as_leader(drone)
if not connect_to_leader(None, None, leader=remote.getpeername()[0]):
await retire_as_leader(drone)
if not await connect_to_leader(None, None, leader=remote.getpeername()[0]):
if retrythread is None:
retrythread = util.spawn_after(random.random(),
start_collective)
@@ -748,12 +755,12 @@ async def get_leader(connection):
await become_leader(connection)
return currentleader
def retire_as_leader(newleader=None):
async def retire_as_leader(newleader=None):
global currentleader
global reassimilate
cfm.stop_leading(newleader)
await cfm.stop_leading(newleader)
if reassimilate is not None:
reassimilate.kill()
reassimilate.cancel()
reassimilate = None
currentleader = None
@@ -769,8 +776,8 @@ async def become_leader(connection):
log.log({'info': 'Becoming leader of collective',
'subsystem': 'collective'})
if follower is not None:
follower.kill()
cfm.stop_following()
follower.cancel()
await cfm.stop_following()
follower = None
if retrythread is not None:
retrythread.cancel()
@@ -779,7 +786,7 @@ async def become_leader(connection):
currentleader = cnn.getsockname()[0]
skipaddr = cnn.getpeername()[0]
if reassimilate is not None:
reassimilate.kill()
reassimilate.cancel()
reassimilate = util.spawn(reassimilate_missing())
cfm._ready = True
if await _assimilate_missing(skipaddr):
@@ -810,9 +817,11 @@ async def _assimilate_missing(skipaddr=None):
connecto.append(dronecandidate)
if not connecto:
return True
connections = []
for ct in connecto:
util.spawn(create_connection(ct))
connections.append(util.spawn(create_connection(ct)))
for ent in connections:
ent = await ent
member, remote = ent
if isinstance(remote, Exception):
continue
@@ -909,22 +918,26 @@ async def start_collective():
if cfm.get_collective_member(member).get('role', None) == 'nonvoting':
continue
if cfm.cfgleader is None:
cfm.stop_following(True)
await cfm.stop_following(True)
ldrcandidate = cfm.get_collective_member(member)['address']
connecto.append(ldrcandidate)
connections = []
for ct in connecto:
util.spawn(create_connection(ct))
for ent in connections:
member, remote = ent
if isinstance(remote, Exception):
continue
if follower is None:
log.log({'info': 'Performing startup attempt to {0}'.format(
member), 'subsystem': 'collective'})
if not await connect_to_leader(name=myname, leader=member, remote=remote):
connections.append(util.spawn(create_connection(ct)))
pnding = connections
while pnding:
rdy, pnding = await asyncio.wait(pnding, return_when=asyncio.FIRST_COMPLETED)
for ent in rdy:
member, remote = await ent
if isinstance(remote, Exception):
continue
if follower is None:
log.log({'info': 'Performing startup attempt to {0}'.format(
member), 'subsystem': 'collective'})
if not await connect_to_leader(name=myname, leader=member, remote=remote):
remote.close()
else:
remote.close()
else:
remote.close()
except Exception as e:
pass
finally:
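
In the hunks above, remote and connection stop being wrapped sockets and become (reader, writer) stream pairs: reads go through remote[0].read(), writes and socket metadata through remote[1], and the peer certificate comes from get_extra_info('ssl_object'). The create_connection helper itself is not part of this diff; a rough sketch of what that side could look like under the same assumptions (illustrative only, not the repository's implementation):

import asyncio
import ssl

async def open_member(addr, port=13001):
    # Certificate verification is disabled because fingerprints are checked
    # manually after the handshake, as the old wrap_socket call also did.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    # The old code also presented the local key pair; the SSLContext
    # equivalent would be:
    #   ctx.load_cert_chain('/etc/confluent/srvcert.pem',
    #                       '/etc/confluent/privkey.pem')
    reader, writer = await asyncio.open_connection(addr, port, ssl=ctx)
    peercert = writer.get_extra_info('ssl_object').getpeercert(binary_form=True)
    return (reader, writer), peercert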

View File

@@ -57,6 +57,7 @@ try:
except ModuleNotFoundError:
import dbm
import ast
import asyncio
import base64
from binascii import hexlify
import os
@@ -81,11 +82,9 @@ try:
except ModuleNotFoundError:
import pickle as cPickle
import errno
import eventlet
import eventlet.event as event
import eventlet.green.select as select
import eventlet.green.threading as gthread
import eventlet.green.subprocess as subprocess
import socket
import fnmatch
import hashlib
import json
@@ -107,10 +106,10 @@ except NameError:
_masterkey = None
_masterintegritykey = None
_dirtylock = threading.RLock()
_leaderlock = gthread.RLock()
_leaderlock = asyncio.Lock()
_synclock = threading.RLock()
_rpclock = gthread.RLock()
_initlock = gthread.RLock()
_rpclock = asyncio.Lock()
_initlock = asyncio.Lock()
_followerlocks = {}
_config_areas = ('nodegroups', 'nodes', 'usergroups', 'users')
tracelog = None
@@ -132,6 +131,7 @@ _validroles = ['Administrator', 'Operator', 'Monitor', 'Stub']
membership_callback = None
def attrib_supports_expression(attrib):
if not isinstance(attrib, str):
attrib = attrib.decode('utf8')
@@ -206,9 +206,11 @@ def _format_key(key, password=None):
return {"unencryptedvalue": key}
def _do_notifier(cfg, watcher, callback):
async def _do_notifier(cfg, watcher, callback):
try:
callback(nodeattribs=watcher['nodeattrs'], configmanager=cfg)
cbres = callback(nodeattribs=watcher['nodeattrs'], configmanager=cfg)
if cbres is not None:
await cbres
except Exception:
logException()
@@ -338,44 +340,41 @@ def check_quorum():
raise exc.DegradedCollective()
def exec_on_leader(function, *args):
async def exec_on_leader(function, *args):
if isinstance(cfgleader, bool):
raise exc.DegradedCollective()
xid = confluent.util.stringify(base64.b64encode(os.urandom(8)))
while xid in _pendingchangesets:
xid = confluent.util.stringify(base64.b64encode(os.urandom(8)))
_pendingchangesets[xid] = event.Event()
cloop = asyncio.get_event_loop()
_pendingchangesets[xid] = cloop.create_future() # future instead of event
rpcpayload = msgpack.packb({'function': function, 'args': args,
'xid': xid}, use_bin_type=False)
rpclen = len(rpcpayload)
cfgleader.sendall(struct.pack('!Q', rpclen))
cfgleader.sendall(rpcpayload)
retv = _pendingchangesets[xid].wait()
cfgleader[1].write(struct.pack('!Q', rpclen))
cfgleader[1].write(rpcpayload)
await cfgleader[1].drain()
retv = await _pendingchangesets[xid]
del _pendingchangesets[xid]
return retv
def exec_on_followers(fnname, *args):
pushes = eventlet.GreenPool()
async def exec_on_followers(fnname, *args):
# Check health of collective prior to attempting
for _ in pushes.starmap(
_push_rpc, [(cfgstreams[s]['stream'], b'') for s in cfgstreams]):
pass
await asyncio.gather(*[_push_rpc(cfgstreams[s]['stream'], b'') for s in cfgstreams])
if not has_quorum():
# the leader counts in addition to registered streams
raise exc.DegradedCollective()
exec_on_followers_unconditional(fnname, *args)
await exec_on_followers_unconditional(fnname, *args)
def exec_on_followers_unconditional(fnname, *args):
async def exec_on_followers_unconditional(fnname, *args):
global _txcount
pushes = eventlet.GreenPool()
_txcount += 1
payload = msgpack.packb({'function': fnname, 'args': args,
'txcount': _txcount}, use_bin_type=False)
for _ in pushes.starmap(
_push_rpc, [(cfgstreams[s]['stream'], payload) for s in cfgstreams]):
pass
await asyncio.gather(
*[_push_rpc(cfgstreams[s]['stream'], payload) for s in cfgstreams])
def logException():
@@ -387,9 +386,12 @@ def logException():
event=confluent.log.Events.stacktrace)
def _do_add_watcher(watcher, added, configmanager, renamed=()):
async def _do_add_watcher(watcher, added, configmanager, renamed=()):
try:
watcher(added=added, deleting=(), renamed=renamed, configmanager=configmanager)
watched = watcher(added=added, deleting=(), renamed=renamed, configmanager=configmanager)
if watched is None:
return
await watched
except Exception:
logException()
@@ -417,12 +419,14 @@ def init_masterkey(password=None, autogen=True):
# password=password))
def _push_rpc(stream, payload):
with _rpclock:
async def _push_rpc(stream, payload):
async with _rpclock:
try:
stream.sendall(struct.pack('!Q', len(payload)))
stream[1].write(struct.pack('!Q', len(payload)))
if len(payload):
stream.sendall(payload)
stream[1].write(payload)
print("sent payload: " + repr(payload))
await stream[1].drain()
return True
except Exception:
logException()
@@ -664,16 +668,15 @@ def has_quorum():
return voters > allvoters // 2
cfgstreams = {}
def relay_slaved_requests(name, listener):
async def relay_slaved_requests(name, listener):
global cfgleader
global _hasquorum
pushes = eventlet.GreenPool()
if name not in _followerlocks:
_followerlocks[name] = gthread.RLock()
_followerlocks[name] = asyncio.Lock()
meminfo = get_collective_member(name)
with _followerlocks[name]:
async with _followerlocks[name]:
try:
stop_following()
await stop_following()
if name in cfgstreams:
try:
cfgstreams[name]['stream'].close()
@@ -690,15 +693,12 @@ def relay_slaved_requests(name, listener):
if _newquorum is not None:
_hasquorum = _newquorum
payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False)
for _ in pushes.starmap(
_push_rpc,
[(cfgstreams[s]['stream'], payload) for s in cfgstreams]):
pass
await asyncio.gather(*[_push_rpc(cfgstreams[s]['stream'], payload) for s in cfgstreams])
_newquorum = has_quorum()
_hasquorum = _newquorum
if _hasquorum and _pending_collective_updates:
apply_pending_collective_updates()
msg = lh.get_next_msg()
msg = await lh.get_next_msg()
while msg:
if name not in cfgstreams:
raise Exception("Unexpected loss of node in followers: " + name)
@@ -730,7 +730,7 @@ def relay_slaved_requests(name, listener):
if not res:
break
try:
msg = lh.get_next_msg()
msg = await lh.get_next_msg()
except Exception:
msg = None
finally:
@@ -745,23 +745,21 @@ def relay_slaved_requests(name, listener):
if cfgstreams:
_hasquorum = has_quorum()
payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False)
for _ in pushes.starmap(
_push_rpc,
[(cfgstreams[s]['stream'], payload) for s in cfgstreams]):
pass
await asyncio.gather(
*[_push_rpc(cfgstreams[s]['stream'], payload) for s in cfgstreams])
if membership_callback:
membership_callback()
if not cfgstreams and not cfgleader: # last one out, set cfgleader to boolean to mark dead collective
stop_following(True)
await stop_following(True)
return False
return True
lastheartbeat = None
def check_leader():
_push_rpc(cfgleader, b'')
async def check_leader():
await _push_rpc(cfgleader, b'')
tries = 0
while tries < 30:
eventlet.sleep(0.1)
await asyncio.sleep(0.1)
tries += 1
if lastheartbeat and lastheartbeat > (confluent.util.monotonic_time() - 3):
return True
@@ -774,25 +772,38 @@ class StreamHandler(object):
self.expiry = self.keepalive + 40
def get_next_msg(self):
async def get_next_msg(self):
r = (False,)
msg = None
try:
while not r[0]:
r = select.select(
(self.sock,), (), (),
self.keepalive - confluent.util.monotonic_time())
if confluent.util.monotonic_time() > self.expiry:
return None
if confluent.util.monotonic_time() > self.keepalive:
res = _push_rpc(self.sock, b'') # nulls are a keepalive
if not res:
while not msg:
try:
msg = await asyncio.wait_for(self.sock[0].read(), timeout=self.keepalive - confluent.util.monotonic_time())
except TimeoutError:
msg = None
if confluent.util.monotonic_time() > self.expiry:
return None
#TODO: this test can work fine even if the other end is
# gone, go to a more affirmative test to more quickly
# detect outage to peer
self.keepalive = confluent.util.monotonic_time() + 20
self.expiry = confluent.util.monotonic_time() + 60
msg = self.sock.recv(8)
if confluent.util.monotonic_time() > self.keepalive:
res = await _push_rpc(self.sock, b'') # nulls are a keepalive
if not res:
return None
self.keepalive = confluent.util.monotonic_time() + 20
# while not r[0]:
# r = select.select(
# (self.sock,), (), (),
# self.keepalive - confluent.util.monotonic_time())
# if confluent.util.monotonic_time() > self.expiry:
# return None
# if confluent.util.monotonic_time() > self.keepalive:
# res = await _push_rpc(self.sock, b'') # nulls are a keepalive
# if not res:
# return None
# #TODO: this test can work fine even if the other end is
# # gone, go to a more affirmative test to more quickly
# # detect outage to peer
# self.keepalive = confluent.util.monotonic_time() + 20
#self.expiry = confluent.util.monotonic_time() + 60
#msg = self.sock.recv(8)
except Exception as e:
msg = None
return msg
@@ -801,8 +812,8 @@ class StreamHandler(object):
self.sock = None
def stop_following(replacement=None):
with _leaderlock:
async def stop_following(replacement=None):
async with _leaderlock:
global cfgleader
if cfgleader and not isinstance(cfgleader, bool):
try:
@@ -811,14 +822,14 @@ def stop_following(replacement=None):
pass
cfgleader = replacement
def stop_leading(newleader=None):
async def stop_leading(newleader=None):
rpcpayload = None
if newleader is not None:
rpcpayload = msgpack.packb({'newleader': newleader}, use_bin_type=False)
for stream in list(cfgstreams):
try:
if rpcpayload is not None:
_push_rpc(cfgstreams[stream]['stream'], rpcpayload)
await _push_rpc(cfgstreams[stream]['stream'], rpcpayload)
cfgstreams[stream]['stream'].close()
except Exception:
pass
@@ -849,15 +860,15 @@ def rollback_clear():
ConfigManager.wait_for_sync(True)
def clear_configuration():
async def clear_configuration():
global _cfgstore
global _txcount
global _oldcfgstore
global _oldtxcount
global _ready
_ready = False
stop_leading()
stop_following()
await stop_leading()
await stop_following()
_oldcfgstore = _cfgstore
_oldtxcount = _txcount
_cfgstore = {}
@@ -891,23 +902,23 @@ def commit_clear():
cfgleader = None
def follow_channel(channel):
async def follow_channel(channel):
global _txcount
global _hasquorum
global lastheartbeat
try:
stop_leading()
stop_following(channel)
await stop_leading()
await stop_following(channel)
lh = StreamHandler(channel)
msg = lh.get_next_msg()
msg = await lh.get_next_msg()
while msg:
msg, rpc = msg[:8], msg[8:]
sz = struct.unpack('!Q', msg)[0]
if sz == 0:
lastheartbeat = confluent.util.monotonic_time()
else:
rpc = b''
while len(rpc) < sz:
nrpc = channel.recv(sz - len(rpc))
nrpc = await channel[0].read(sz - len(rpc))
if not nrpc:
raise Exception('Truncated message error')
rpc += nrpc
@@ -930,36 +941,36 @@ def follow_channel(channel):
exc = ValueError(excstr)
else:
exc = Exception(excstr)
_pendingchangesets[rpc['xid']].send_exception(exc)
_pendingchangesets[rpc['xid']].set_exception(exc)
else:
_pendingchangesets[rpc['xid']].send(rpc.get('ret', None))
_pendingchangesets[rpc['xid']].set_result(rpc.get('ret', None))
if 'quorum' in rpc:
_hasquorum = rpc['quorum']
res = _push_rpc(channel, b'') # use null as ACK
res = await _push_rpc(channel, b'') # use null as ACK
if not res:
break
msg = lh.get_next_msg()
msg = await lh.get_next_msg()
finally:
# mark the connection as broken
if cfgstreams:
stop_following(None)
await stop_following(None)
else:
stop_following(True)
await stop_following(True)
return {}
def add_collective_member(name, address, fingerprint, role=None):
async def add_collective_member(name, address, fingerprint, role=None):
if cfgleader:
return exec_on_leader('add_collective_member', name, address, fingerprint, role)
return await exec_on_leader('add_collective_member', name, address, fingerprint, role)
if cfgstreams:
exec_on_followers('_true_add_collective_member', name, address, fingerprint, True, role)
await exec_on_followers('_true_add_collective_member', name, address, fingerprint, True, role)
_true_add_collective_member(name, address, fingerprint, role=role)
def del_collective_member(name):
async def del_collective_member(name):
if cfgleader and not isinstance(cfgleader, bool):
return exec_on_leader('del_collective_member', name)
return await exec_on_leader('del_collective_member', name)
if cfgstreams:
exec_on_followers_unconditional('_true_del_collective_member', name)
await exec_on_followers_unconditional('_true_del_collective_member', name)
_true_del_collective_member(name)
def _true_del_collective_member(name, sync=True):
@@ -1290,7 +1301,7 @@ class ConfigManager(object):
self.clientfiles = {}
global _cfgstore
self.inrestore = False
with _initlock:
if True: # with _initlock:
if _cfgstore is None:
init()
self.decrypt = decrypt
@@ -1544,7 +1555,7 @@ class ConfigManager(object):
except KeyError:
return None
def set_usergroup(self, groupname, attributemap):
async def set_usergroup(self, groupname, attributemap):
"""Set usergroup attribute(s)
:param groupname: the name of teh group to modify
@@ -1554,7 +1565,7 @@ class ConfigManager(object):
return exec_on_leader('_rpc_master_set_usergroup', self.tenant,
groupname, attributemap)
if cfgstreams:
exec_on_followers('_rpc_set_usergroup', self.tenant, groupname,
await exec_on_followers('_rpc_set_usergroup', self.tenant, groupname,
attributemap)
self._true_set_usergroup(groupname, attributemap)
@@ -1573,7 +1584,7 @@ class ConfigManager(object):
_mark_dirtykey('usergroups', groupname, self.tenant)
self._bg_sync_to_file()
def create_usergroup(self, groupname, role="Administrator"):
async def create_usergroup(self, groupname, role="Administrator"):
"""Create a new user
:param groupname: The name of the user group
@@ -1585,7 +1596,7 @@ class ConfigManager(object):
return exec_on_leader('_rpc_master_create_usergroup', self.tenant,
groupname, role)
if cfgstreams:
exec_on_followers('_rpc_create_usergroup', self.tenant, groupname,
await exec_on_followers('_rpc_create_usergroup', self.tenant, groupname,
role)
self._true_create_usergroup(groupname, role)
@@ -1609,11 +1620,11 @@ class ConfigManager(object):
_mark_dirtykey('usergroups', groupname, self.tenant)
self._bg_sync_to_file()
def del_usergroup(self, name):
async def del_usergroup(self, name):
if cfgleader:
return exec_on_leader('_rpc_master_del_usergroup', self.tenant, name)
if cfgstreams:
exec_on_followers('_rpc_del_usergroup', self.tenant, name)
await exec_on_followers('_rpc_del_usergroup', self.tenant, name)
self._true_del_usergroup(name)
def _true_del_usergroup(self, name):
@@ -1622,7 +1633,7 @@ class ConfigManager(object):
_mark_dirtykey('usergroups', name, self.tenant)
self._bg_sync_to_file()
def set_user(self, name, attributemap):
async def set_user(self, name, attributemap):
"""Set user attribute(s)
:param name: The login name of the user
@@ -1632,7 +1643,7 @@ class ConfigManager(object):
return exec_on_leader('_rpc_master_set_user', self.tenant, name,
attributemap)
if cfgstreams:
exec_on_followers('_rpc_set_user', self.tenant, name, attributemap)
await exec_on_followers('_rpc_set_user', self.tenant, name, attributemap)
self._true_set_user(name, attributemap)
def _true_set_user(self, name, attributemap):
@@ -1660,11 +1671,11 @@ class ConfigManager(object):
_mark_dirtykey('users', name, self.tenant)
self._bg_sync_to_file()
def del_user(self, name):
async def del_user(self, name):
if cfgleader:
return exec_on_leader('_rpc_master_del_user', self.tenant, name)
if cfgstreams:
exec_on_followers('_rpc_del_user', self.tenant, name)
await exec_on_followers('_rpc_del_user', self.tenant, name)
self._true_del_user(name)
def _true_del_user(self, name):
@@ -1673,7 +1684,7 @@ class ConfigManager(object):
_mark_dirtykey('users', name, self.tenant)
self._bg_sync_to_file()
def create_user(self, name,
async def create_user(self, name,
role="Administrator", uid=None, displayname=None,
attributemap=None):
"""Create a new user
@@ -1689,7 +1700,7 @@ class ConfigManager(object):
return exec_on_leader('_rpc_master_create_user', self.tenant,
name, role, uid, displayname, attributemap)
if cfgstreams:
exec_on_followers('_rpc_create_user', self.tenant, name,
await exec_on_followers('_rpc_create_user', self.tenant, name,
role, uid, displayname, attributemap)
self._true_create_user(name, role, uid, displayname, attributemap)
@@ -1903,7 +1914,7 @@ class ConfigManager(object):
def add_group_attributes(self, attribmap):
self.set_group_attributes(attribmap, autocreate=True)
def set_group_attributes(self, attribmap, autocreate=False):
async def set_group_attributes(self, attribmap, autocreate=False):
for group in attribmap:
curr = attribmap[group]
for attrib in curr:
@@ -1922,7 +1933,7 @@ class ConfigManager(object):
return exec_on_leader('_rpc_master_set_group_attributes',
self.tenant, attribmap, autocreate)
if cfgstreams:
exec_on_followers('_rpc_set_group_attributes', self.tenant,
await exec_on_followers('_rpc_set_group_attributes', self.tenant,
attribmap, autocreate)
self._true_set_group_attributes(attribmap, autocreate)
@@ -2036,12 +2047,12 @@ class ConfigManager(object):
self._notif_attribwatchers(changeset)
self._bg_sync_to_file()
def clear_group_attributes(self, groups, attributes):
async def clear_group_attributes(self, groups, attributes):
if cfgleader:
return exec_on_leader('_rpc_master_clear_group_attributes',
self.tenant, groups, attributes)
if cfgstreams:
exec_on_followers('_rpc_clear_group_attributes', self.tenant,
await exec_on_followers('_rpc_clear_group_attributes', self.tenant,
groups, attributes)
self._true_clear_group_attributes(groups, attributes)
@@ -2155,23 +2166,25 @@ class ConfigManager(object):
for watcher in notifdata:
watcher = notifdata[watcher]
callback = watcher['callback']
eventlet.spawn_n(_do_notifier, self, watcher, callback)
confluent.util.spawn(_do_notifier(self, watcher, callback))
def del_nodes(self, nodes):
async def del_nodes(self, nodes):
if isinstance(nodes, set):
nodes = list(nodes) # msgpack can't handle set
if cfgleader: # slaved to a collective
return exec_on_leader('_rpc_master_del_nodes', self.tenant,
nodes)
return await exec_on_leader('_rpc_master_del_nodes', self.tenant,
nodes)
if cfgstreams:
exec_on_followers('_rpc_del_nodes', self.tenant, nodes)
self._true_del_nodes(nodes)
await exec_on_followers('_rpc_del_nodes', self.tenant, nodes)
await self._true_del_nodes(nodes)
def _true_del_nodes(self, nodes):
async def _true_del_nodes(self, nodes):
if self.tenant in self._nodecollwatchers:
for watcher in self._nodecollwatchers[self.tenant]:
watcher = self._nodecollwatchers[self.tenant][watcher]
watcher(added=(), deleting=nodes, renamed=(), configmanager=self)
watched = watcher(added=(), deleting=nodes, renamed=(), configmanager=self)
if watched is not None:
await watched
changeset = {}
for node in nodes:
# set a reserved attribute for the sake of the change notification
@@ -2186,12 +2199,12 @@ class ConfigManager(object):
self._notif_attribwatchers(changeset)
self._bg_sync_to_file()
def del_groups(self, groups):
async def del_groups(self, groups):
if cfgleader:
return exec_on_leader('_rpc_master_del_groups', self.tenant,
groups)
if cfgstreams:
exec_on_followers('_rpc_del_groups', self.tenant, groups)
await exec_on_followers('_rpc_del_groups', self.tenant, groups)
self._true_del_groups(groups)
def _true_del_groups(self, groups):
@@ -2205,7 +2218,7 @@ class ConfigManager(object):
self._notif_attribwatchers(changeset)
self._bg_sync_to_file()
def clear_node_attributes(self, nodes, attributes, warnings=None):
async def clear_node_attributes(self, nodes, attributes, warnings=None):
if cfgleader:
mywarnings = exec_on_leader('_rpc_master_clear_node_attributes',
self.tenant, nodes, attributes)
@@ -2213,7 +2226,7 @@ class ConfigManager(object):
warnings.extend(mywarnings)
return
if cfgstreams:
exec_on_followers('_rpc_clear_node_attributes', self.tenant,
await exec_on_followers('_rpc_clear_node_attributes', self.tenant,
nodes, attributes)
self._true_clear_node_attributes(nodes, attributes, warnings)
@@ -2261,15 +2274,15 @@ class ConfigManager(object):
self._notif_attribwatchers(changeset)
self._bg_sync_to_file()
def add_node_attributes(self, attribmap):
self.set_node_attributes(attribmap, autocreate=True)
async def add_node_attributes(self, attribmap):
await self.set_node_attributes(attribmap, autocreate=True)
def rename_nodes(self, renamemap):
async def rename_nodes(self, renamemap):
if cfgleader:
return exec_on_leader('_rpc_master_rename_nodes', self.tenant,
renamemap)
if cfgstreams:
exec_on_followers('_rpc_rename_nodes', self.tenant, renamemap)
await exec_on_followers('_rpc_rename_nodes', self.tenant, renamemap)
self._true_rename_nodes(renamemap)
def _true_rename_nodes(self, renamemap):
@@ -2309,14 +2322,14 @@ class ConfigManager(object):
nodecollwatchers = self._nodecollwatchers[self.tenant]
for watcher in nodecollwatchers:
watcher = nodecollwatchers[watcher]
eventlet.spawn_n(_do_add_watcher, watcher, (), self, renamemap)
confluent.util.spawn(_do_add_watcher(watcher, (), self, renamemap))
self._bg_sync_to_file()
def rename_nodegroups(self, renamemap):
async def rename_nodegroups(self, renamemap):
if cfgleader:
return exec_on_leader('_rpc_master_rename_nodegroups', self.tenant, renamemap)
if cfgstreams:
exec_on_followers('_rpc_rename_nodegroups', self.tenant, renamemap)
await exec_on_followers('_rpc_rename_nodegroups', self.tenant, renamemap)
self._true_rename_groups(renamemap)
def _true_rename_groups(self, renamemap):
@@ -2349,7 +2362,7 @@ class ConfigManager(object):
def set_node_attributes(self, attribmap, autocreate=False):
async def set_node_attributes(self, attribmap, autocreate=False):
for node in attribmap:
curr = attribmap[node]
for attrib in curr:
@@ -2368,14 +2381,11 @@ class ConfigManager(object):
return exec_on_leader('_rpc_master_set_node_attributes',
self.tenant, attribmap, autocreate)
if cfgstreams:
exec_on_followers('_rpc_set_node_attributes',
await exec_on_followers('_rpc_set_node_attributes',
self.tenant, attribmap, autocreate)
self._true_set_node_attributes(attribmap, autocreate)
def _true_set_node_attributes(self, attribmap, autocreate):
# TODO(jbjohnso): multi mgr support, here if we have peers,
# pickle the arguments and fire them off in eventlet
# flows to peers, all should have the same result
newnodes = []
changeset = {}
# first do a sanity check of the input upfront
@@ -2499,18 +2509,18 @@ class ConfigManager(object):
nodecollwatchers = self._nodecollwatchers[self.tenant]
for watcher in nodecollwatchers:
watcher = nodecollwatchers[watcher]
eventlet.spawn_n(_do_add_watcher, watcher, newnodes, self)
confluent.util.spawn(_do_add_watcher(watcher, newnodes, self))
self._bg_sync_to_file()
#TODO: wait for synchronization to suceed/fail??)
def _load_from_json(self, jsondata, sync=True):
async def _load_from_json(self, jsondata, sync=True):
self.inrestore = True
try:
self._load_from_json_backend(jsondata, sync=True)
await self._load_from_json_backend(jsondata, sync=True)
finally:
self.inrestore = False
def _load_from_json_backend(self, jsondata, sync=True):
async def _load_from_json_backend(self, jsondata, sync=True):
"""Load fresh configuration data from jsondata
:param jsondata: String of jsondata
@@ -2569,13 +2579,13 @@ class ConfigManager(object):
if confarea not in tmpconfig:
continue
if confarea == 'nodes':
self.set_node_attributes(tmpconfig[confarea], True)
await self.set_node_attributes(tmpconfig[confarea], True)
elif confarea == 'nodegroups':
self.set_group_attributes(tmpconfig[confarea], True)
await self.set_group_attributes(tmpconfig[confarea], True)
elif confarea == 'usergroups':
for usergroup in tmpconfig[confarea]:
role = tmpconfig[confarea][usergroup].get('role', 'Administrator')
self.create_usergroup(usergroup, role=role)
await self.create_usergroup(usergroup, role=role)
elif confarea == 'users':
for user in tmpconfig[confarea]:
ucfg = tmpconfig[confarea][user]
@@ -2587,7 +2597,7 @@ class ConfigManager(object):
uid = uid.encode('utf8')
displayname = ucfg.get('displayname', None)
role = ucfg.get('role', None)
self.create_user(user, uid=uid, displayname=displayname, role=role)
await self.create_user(user, uid=uid, displayname=displayname, role=role)
for attrname in ('authid', 'authenticators', 'cryptpass'):
if attrname in tmpconfig[confarea][user]:
self._cfgstore['users'][user][attrname] = tmpconfig[confarea][user][attrname]
@@ -2595,7 +2605,7 @@ class ConfigManager(object):
if sync:
self._bg_sync_to_file()
def _dump_to_json(self, redact=None):
async def _dump_to_json(self, redact=None):
"""Dump the configuration in json form to output
password is used to protect the 'secret' attributes in liue of the
@@ -2609,11 +2619,11 @@ class ConfigManager(object):
"""
with open(os.devnull, 'w+') as devnull:
worker = subprocess.Popen(
[sys.executable, __file__, '-r' if redact else ''],
stdin=devnull, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = worker.communicate()
worker = await asyncio.create_subprocess_exec(
sys.executable, __file__, '-r' if redact else '',
stdin=devnull, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await worker.communicate()
return stdout
def _real_dump_to_json(self, redact=None):
@@ -2911,7 +2921,7 @@ def _dump_keys(password, dojson=True):
return keydata
def restore_db_from_directory(location, password):
async def restore_db_from_directory(location, password):
try:
with open(os.path.join(location, 'keys.json'), 'r') as cfgfile:
keydata = cfgfile.read()
@@ -2939,7 +2949,7 @@ def restore_db_from_directory(location, password):
raise
with open(os.path.join(location, 'main.json'), 'r') as cfgfile:
cfgdata = cfgfile.read()
ConfigManager(tenant=None)._load_from_json(cfgdata)
await ConfigManager(tenant=None)._load_from_json(cfgdata)
ConfigManager.wait_for_sync(True)
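
The StreamHandler.get_next_msg rewrite in this file replaces the select-based poll with asyncio.wait_for around a stream read. One caveat: before Python 3.11, asyncio.wait_for raises asyncio.TimeoutError, which is a different class from the builtin TimeoutError caught in the new code; the two were only unified in 3.11, so catching asyncio.TimeoutError covers both. A self-contained sketch of the keepalive-aware header read, with illustrative deadline arguments and a send_keepalive callable standing in for the null _push_rpc:

import asyncio
import time

async def read_next_header(reader, writer, keepalive, expiry, send_keepalive):
    # Wait for the 8-byte length header; send a null keepalive whenever the
    # keepalive deadline passes, and give up entirely once expiry is reached.
    msg = None
    while not msg:
        try:
            msg = await asyncio.wait_for(reader.read(8),
                                         timeout=keepalive - time.monotonic())
        except asyncio.TimeoutError:   # alias of builtin TimeoutError on 3.11+
            msg = None
        if time.monotonic() > expiry:
            return None
        if not msg and time.monotonic() > keepalive:
            if not await send_keepalive(writer):   # nulls are a keepalive
                return None
            keepalive = time.monotonic() + 20
    return msg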

View File

@@ -306,7 +306,7 @@ class ConsoleHandler(object):
def _disconnect(self):
if self.connectionthread:
self.connectionthread.kill()
self.connectionthread.cancel()
self.connectionthread = None
# clear the terminal buffer when disconnected
self.clearbuffer()
@@ -328,7 +328,7 @@ class ConsoleHandler(object):
if not self._is_local:
return
if self.connectionthread:
self.connectionthread.kill()
self.connectionthread.cancel()
self.connectionthread = None
self.connectionthread = util.spawn(self._connect_backend())
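
This file's change is the purest example of the task-cancellation swap: connectionthread.kill() on a greenthread becomes connectionthread.cancel() on whatever util.spawn now returns. The util.spawn and util.spawn_after helpers are not shown in this commit; a plausible asyncio shape for them, stated purely as an assumption to help read the diff, is:

import asyncio

def spawn(coro):
    # Schedule a coroutine on the running loop and return the Task, which
    # supports cancel() where an eventlet greenthread offered kill().
    return asyncio.get_event_loop().create_task(coro)

async def _run_later(delay, func, *args):
    await asyncio.sleep(delay)
    result = func(*args)
    if asyncio.iscoroutine(result):
        result = await result
    return result

def spawn_after(delay, func, *args):
    # Delayed variant; the returned Task supports cancel(), matching how
    # retrythread.cancel() is used elsewhere in the diff.
    return asyncio.get_event_loop().create_task(_run_later(delay, func, *args))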

View File

@@ -678,12 +678,12 @@ def delete_nodegroup_collection(collectionpath, configmanager):
raise Exception("Not implemented")
def delete_node_collection(collectionpath, configmanager, isnoderange):
async def delete_node_collection(collectionpath, configmanager, isnoderange):
if len(collectionpath) == 2: # just node
nodes = [collectionpath[-1]]
if isnoderange:
nodes = noderange.NodeRange(nodes[0], configmanager).nodes
configmanager.del_nodes(nodes)
await configmanager.del_nodes(nodes)
for node in nodes:
yield msg.DeletedResource(node)
else:
@@ -734,7 +734,7 @@ def create_group(inputdata, configmanager):
yield msg.CreatedResource(groupname)
def create_node(inputdata, configmanager):
async def create_node(inputdata, configmanager):
try:
nodename = inputdata['name']
if ' ' in nodename:
@@ -744,7 +744,7 @@ def create_node(inputdata, configmanager):
except KeyError:
raise exc.InvalidArgumentException('name not specified')
try:
configmanager.add_node_attributes(attribmap)
await configmanager.add_node_attributes(attribmap)
except ValueError as e:
raise exc.InvalidArgumentException(str(e))
yield msg.CreatedResource(nodename)
@@ -760,7 +760,7 @@ async def create_noderange(inputdata, configmanager):
except KeyError:
raise exc.InvalidArgumentException('name not specified')
try:
configmanager.add_node_attributes(attribmap)
await configmanager.add_node_attributes(attribmap)
except ValueError as e:
raise exc.InvalidArgumentException(str(e))
for node in attribmap:
@@ -1021,7 +1021,7 @@ async def handle_node_request(configmanager, inputdata, operation,
if iscollection:
if operation == "delete":
return delete_node_collection(pathcomponents, configmanager,
isnoderange)
isnoderange)
elif operation == "retrieve":
return enumerate_node_collection(pathcomponents, configmanager)
else:

View File

@@ -647,7 +647,7 @@ def detected_models():
yield info['modelnumber']
def _recheck_nodes(nodeattribs, configmanager):
async def _recheck_nodes(nodeattribs, configmanager):
if not cfm.config_is_ready():
return
if rechecklock.locked():
@@ -1538,11 +1538,11 @@ nodeaddhandler = None
needaddhandled = False
def _handle_nodelist_change(configmanager):
async def _handle_nodelist_change(configmanager):
global needaddhandled
global nodeaddhandler
macmap.vintage = 0 # the current mac map is probably inaccurate
_recheck_nodes((), configmanager)
await _recheck_nodes((), configmanager)
if needaddhandled:
needaddhandled = False
nodeaddhandler = eventlet.spawn(_handle_nodelist_change, configmanager)
@@ -1550,7 +1550,7 @@ def _handle_nodelist_change(configmanager):
nodeaddhandler = None
def newnodes(added, deleting, renamed, configmanager):
async def newnodes(added, deleting, renamed, configmanager):
global attribwatcher
global needaddhandled
global nodeaddhandler
@@ -1573,7 +1573,7 @@ def newnodes(added, deleting, renamed, configmanager):
if nodeaddhandler:
needaddhandled = True
else:
nodeaddhandler = eventlet.spawn(_handle_nodelist_change, configmanager)
nodeaddhandler = util.spawn(_handle_nodelist_change(configmanager))
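
Together with the _do_notifier and _do_add_watcher changes in the configuration manager, this last file moves watcher callbacks to a call-then-maybe-await convention: the callback is invoked as before, and if it returns anything (an async callback returns a coroutine), that result is awaited, so synchronous and asynchronous watchers can coexist during the port. A small sketch of that dispatch, with illustrative names:

import asyncio

async def run_watcher(callback, **kwargs):
    # Sync callbacks return None and are done; async callbacks return a
    # coroutine, which is awaited so both styles keep working.
    result = callback(**kwargs)
    if result is not None:
        await result

For example, newnodes() above becomes a coroutine function, and the watcher dispatch in the configuration manager awaits whatever it returns.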