diff --git a/confluent_client/confluent/client.py b/confluent_client/confluent/client.py index f1776ef8..210d9025 100644 --- a/confluent_client/confluent/client.py +++ b/confluent_client/confluent/client.py @@ -742,7 +742,7 @@ async def updateattrib(session, updateargs, nodetype, noderange, options, dictas value = os.environ.get( key, os.environ[key.upper()]) if (nodetype == "nodegroups"): - exitcode = await ession.simple_nodegroups_command(noderange, + exitcode = await session.simple_nodegroups_command(noderange, 'attributes/all', value, key) else: diff --git a/confluent_server/confluent/collective/invites.py b/confluent_server/confluent/collective/invites.py index 71693b1d..ae0e20a6 100644 --- a/confluent_server/confluent/collective/invites.py +++ b/confluent_server/confluent/collective/invites.py @@ -41,7 +41,7 @@ def check_server_proof(invitation, mycert, peercert, proof): def check_client_proof(servername, mycert, peercert, proof): servername = servername.encode('utf-8') if servername not in pending_invites: - return False + return False, None invitation = pending_invites[servername] role = invitation['role'] invitation = invitation['invitation'] diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 77239d66..393b1bc7 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import base64 import confluent.collective.invites as invites import confluent.config.configmanager as cfm @@ -22,12 +23,11 @@ import confluent.log as log import confluent.noderange as noderange import confluent.tlvdata as tlvdata import confluent.util as util -import eventlet -import eventlet.greenpool as greenpool -import eventlet.green.socket as socket -import eventlet.green.ssl as ssl -import eventlet.green.threading as threading +import socket +import ssl import confluent.sortutil as sortutil +import ctypes +import ctypes.util import greenlet import random import time @@ -36,6 +36,18 @@ import sys import OpenSSL.crypto as crypto +class PyObject_HEAD(ctypes.Structure): + _fields_ = [ + ("ob_refcnt", ctypes.c_ssize_t), + ("ob_type", ctypes.c_void_p), + ] + +class PySSLContext(ctypes.Structure): + _fields_ = [ + ("ob_base", PyObject_HEAD), + ("ctx", ctypes.c_void_p), + ] + currentleader = None follower = None retrythread = None @@ -43,24 +55,34 @@ failovercheck = None initting = True reassimilate = None +libssl = ctypes.CDLL(ctypes.util.find_library('ssl')) +libssl.SSL_CTX_set_cert_verify_callback.argtypes = [ + ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] + + +@ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p) +def verify_stub(store, misc): + return 1 + + class ContextBool(object): def __init__(self): self.active = False - self.mylock = threading.RLock() + self.mylock = asyncio.Lock() - def __enter__(self): + async def __aenter__(self): self.active = True - self.mylock.__enter__() + return await self.mylock.__aenter__() - def __exit__(self, exc_type, exc_val, exc_tb): + async def __aexit__(self, exc_type, exc_val, exc_tb): self.active = False - self.mylock.__exit__(exc_type, exc_val, exc_tb) + return await self.mylock.__aexit__(exc_type, exc_val, exc_tb) connecting = ContextBool() leader_init = ContextBool() enrolling = ContextBool() -def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=False): +async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=False): global currentleader global follower ocert = cert @@ -73,27 +95,28 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=Fa log.log({'info': 'Attempting connection to leader {0}'.format(leader), 'subsystem': 'collective'}) try: - remote = connect_to_collective(cert, leader, remote) + remote = await connect_to_collective(cert, leader, remote) except Exception as e: log.log({'error': 'Collective connection attempt to {0} failed: {1}' ''.format(leader, str(e)), 'subsystem': 'collective'}) return False - with connecting: + async with connecting: with cfm._initlock: - banner = tlvdata.recv(remote) # the banner + # remote is a socket... + banner = await tlvdata.recv(remote) # the banner if not banner: return vers = banner.split()[2] if vers != b'v4': raise Exception('This instance only supports protocol 4, synchronize versions between collective members') - tlvdata.recv(remote) # authpassed... 0.. + await tlvdata.recv(remote) # authpassed... 0.. if name is None: name = get_myname() - tlvdata.send(remote, {'collective': {'operation': 'connect', + await tlvdata.send(remote, {'collective': {'operation': 'connect', 'name': name, 'txcount': cfm._txcount}}) - keydata = tlvdata.recv(remote) + keydata = await tlvdata.recv(remote) if not keydata: return False if 'error' in keydata: @@ -104,7 +127,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=Fa 'subsystem': 'collective'}) return False if 'waitinline' in keydata: - eventlet.sleep(0.3) + await asyncio.sleep(0.3) return connect_to_leader(cert, name, leader, None, isretry=True) if 'leader' in keydata: if keydata['leader'] == None: @@ -125,7 +148,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=Fa 'transaction count, becoming leader' ''.format(leader), 'subsystem': 'collective', 'subsystem': 'collective'}) - return become_leader(remote) + return await become_leader(remote) return False follower.kill() cfm.stop_following() @@ -136,9 +159,9 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=Fa follower = None log.log({'info': 'Following leader {0}'.format(leader), 'subsystem': 'collective'}) - colldata = tlvdata.recv(remote) - globaldata = tlvdata.recv(remote) - dbi = tlvdata.recv(remote) + colldata = await tlvdata.recv(remote) + globaldata = await tlvdata.recv(remote) + dbi = await tlvdata.recv(remote) dbsize = dbi['dbsize'] dbjson = b'' while (len(dbjson) < dbsize): @@ -172,7 +195,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=Fa currentleader = leader #spawn this as a thread... remote.settimeout(90) - follower = eventlet.spawn(follow_leader, remote, leader) + follower = util.spawn(follow_leader(remote, leader)) return True @@ -208,25 +231,48 @@ def follow_leader(remote, leader): cfm.stop_following() currentleader = None if retrythread is None: # start a recovery - retrythread = eventlet.spawn_after( + retrythread = util.spawn_after( random.random(), start_collective) -def create_connection(member): +async def _create_tls_connection(host, port): + cloop = asyncio.get_event_loop() + ainfo = await cloop.getaddrinfo( + host, port, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM) + for res in ainfo: + af, socktype, proto, canonname, sa = res + remote = socket.socket(af, socktype, proto) + remote.setsockopt( + socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + remote.settimeout(0) + await cloop.sock_connect(remote, sa) + break + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_ctx = PySSLContext.from_address(id(ctx)).ctx + libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0) + ctx.load_cert_chain('/etc/confluent/srvcert.pem', '/etc/confluent/privkey.pem') + sreader = asyncio.StreamReader() + sreaderprot = asyncio.StreamReaderProtocol(sreader) + tport, _ = await cloop.create_connection( + lambda: sreaderprot, sock=remote, ssl=ctx, server_hostname='x') + swriter = asyncio.StreamWriter(tport, sreaderprot, sreader, cloop) + return (sreader, swriter) + + +async def create_connection(member): remote = None try: - remote = socket.create_connection((member, 13001), 2) - remote.settimeout(15) + remote = await _create_tls_connection(member, 13001) + #remote = socket.create_connection((member, 13001), 2) + #remote.settimeout(15) # TLS cert validation is custom and will not pass normal CA vetting # to override completely in the right place requires enormous effort, so just defer until after connect - remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', - certfile='/etc/confluent/srvcert.pem') except Exception as e: return member, e return member, remote -def connect_to_collective(cert, member, remote=None): +async def connect_to_collective(cert, member, remote=None): if remote is None: - _, remote = create_connection(member) + _, remote = await create_connection(member) if isinstance(remote, Exception): raise remote if cert: @@ -234,7 +280,8 @@ def connect_to_collective(cert, member, remote=None): else: collent = cfm.get_collective_member_by_address(member) fprint = collent['fingerprint'] - if not util.cert_matches(fprint, remote.getpeercert(binary_form=True)): + cnn = remote[1].transport.get_extra_info('ssl_object') + if not util.cert_matches(fprint, cnn.getpeercert(binary_form=True)): # probably Janeway up to something raise Exception("Certificate mismatch in the collective") return remote @@ -263,11 +310,8 @@ async def handle_connection(connection, cert, request, local=False): global currentleader global retrythread global initting - connection.settimeout(5) operation = request['operation'] - if cert: - cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) - else: + if not cert: if not local: return if operation in ('show', 'delete'): @@ -295,7 +339,7 @@ async def handle_connection(connection, cert, request, local=False): await tlvdata.send(connection, {'error': 'Invalid certificate, ' 'redo invitation process'}) - connection.close() + # connection.close() return await tlvdata.recv(remote) # ignore banner await tlvdata.recv(remote) # ignore authpassed: 0 @@ -333,7 +377,7 @@ async def handle_connection(connection, cert, request, local=False): try: cfm.check_quorum() except exc.DegradedCollective: - tlvdata.send(connection, + await tlvdata.send(connection, {'collective': {'error': 'Collective does not have quorum'}}) return @@ -360,24 +404,46 @@ async def handle_connection(connection, cert, request, local=False): return host = request['server'] try: - remote = socket.create_connection((host, 13001), 15) + cloop = asyncio.get_event_loop() + ainfo = await cloop.getaddrinfo( + host, 13001, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM) + for res in ainfo: + af, socktype, proto, canonname, sa = res + remote = socket.socket(af, socktype, proto) + remote.setsockopt( + socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + remote.settimeout(0) + await cloop.sock_connect(remote, sa) + break + #remote = socket.create_connection((host, 13001), 15) # This isn't what it looks like. We do CERT_NONE to disable # openssl verification, but then use the invitation as a # shared secret to validate the certs as part of the join # operation - remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, - keyfile='/etc/confluent/privkey.pem', - certfile='/etc/confluent/srvcert.pem') + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_ctx = PySSLContext.from_address(id(ctx)).ctx + libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0) + ctx.load_cert_chain('/etc/confluent/srvcert.pem', '/etc/confluent/privkey.pem') + sreader = asyncio.StreamReader() + sreaderprot = asyncio.StreamReaderProtocol(sreader) + tport, _ = await cloop.create_connection( + lambda: sreaderprot, sock=remote, ssl=ctx, server_hostname='x') + swriter = asyncio.StreamWriter(tport, sreaderprot, sreader, cloop) + remote = (sreader, swriter) + #remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, + # keyfile='/etc/confluent/privkey.pem', + # certfile='/etc/confluent/srvcert.pem') except Exception: await tlvdata.send( connection, {'collective': {'status': 'Failed to connect to {0}'.format(host)}}) connection.close() + raise return mycert = util.get_certificate_from_file( '/etc/confluent/srvcert.pem') - cert = remote.getpeercert(binary_form=True) + cert = tport.get_extra_info('ssl_object').getpeercert(binary_form=True) proof = base64.b64encode(invites.create_client_proof( invitation, mycert, cert)) await tlvdata.recv(remote) # ignore banner @@ -386,39 +452,37 @@ async def handle_connection(connection, cert, request, local=False): 'name': name, 'hmac': proof}}) rsp = await tlvdata.recv(remote) if 'error' in rsp: - tlvdata.send(connection, {'collective': + await tlvdata.send(connection, {'collective': {'status': rsp['error']}}) - connection.close() return proof = rsp['collective']['approval'] proof = base64.b64decode(proof) j = invites.check_server_proof(invitation, mycert, cert, proof) + print(repr(j)) if not j: remote.close() await tlvdata.send(connection, {'collective': {'status': 'Bad server token'}}) - connection.close() return - tlvdata.send(connection, {'collective': {'status': 'Success'}}) - connection.close() + await tlvdata.send(connection, {'collective': {'status': 'Success'}}) + # connection.close() currentleader = rsp['collective']['leader'] f = open('/etc/confluent/cfg/myname', 'w') f.write(name) f.close() log.log({'info': 'Connecting to collective due to join', 'subsystem': 'collective'}) - eventlet.spawn_n(connect_to_leader, rsp['collective'][ - 'fingerprint'], name) + util.spawn(connect_to_leader(rsp['collective'][ + 'fingerprint'], name)) if 'enroll' == operation: - with enrolling: + async with enrolling: cfm.check_quorum() mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) myrsp, role = invites.check_client_proof(request['name'], mycert, cert, proof) if not myrsp: - tlvdata.send(connection, {'error': 'Invalid token'}) - connection.close() + await tlvdata.send(connection, {'error': 'Invalid token'}) return if not list(cfm.list_collective()): # First enrollment of a collective, since the collective doesn't @@ -431,23 +495,23 @@ async def handle_connection(connection, cert, request, local=False): iam = cfm.get_collective_member(get_myname()) if not iam: cfm.add_collective_member(get_myname(), - connection.getsockname()[0], myfprint) + connection[1].transport.get_extra_info('socket').getsockname()[0], myfprint) cfm.add_collective_member(request['name'], - connection.getpeername()[0], fprint, role) - myleader = get_leader(connection) + connection[1].transport.get_extra_info('socket').getpeername()[0], fprint, role) + myleader = await get_leader(connection) ldrfprint = cfm.get_collective_member_by_address( myleader)['fingerprint'] - tlvdata.send(connection, + await tlvdata.send(connection, {'collective': {'approval': myrsp, 'fingerprint': ldrfprint, - 'leader': get_leader(connection)}}) + 'leader': await get_leader(connection)}}) havequorum = False while not havequorum: try: cfm.check_quorum() havequorum = True except exc.DegradedCollective: - eventlet.sleep(0.1) + await asyncio.sleep(0.1) if 'assimilate' == operation: drone = request['name'] droneinfo = cfm.get_collective_member(drone) @@ -518,44 +582,45 @@ async def handle_connection(connection, cert, request, local=False): connection.close() if not connect_to_leader(None, None, leader=newleader): if retrythread is None: - retrythread = eventlet.spawn_after(random.random(), + retrythread = util.spawn_after(random.random(), start_collective) if 'getinfo' == operation: drone = request['name'] droneinfo = cfm.get_collective_member(drone) if not (droneinfo and util.cert_matches(droneinfo['fingerprint'], cert)): - tlvdata.send(connection, + await tlvdata.send(connection, {'error': 'Invalid certificate, ' 'redo invitation process'}) connection.close() return collinfo = {} populate_collinfo(collinfo) - tlvdata.send(connection, collinfo) + await tlvdata.send(connection, collinfo) if 'connect' == operation: drone = request['name'] droneinfo = cfm.get_collective_member(drone) if not (droneinfo and util.cert_matches(droneinfo['fingerprint'], cert)): - tlvdata.send(connection, + await tlvdata.send(connection, {'error': 'Invalid certificate, ' 'redo invitation process'}) connection.close() return - myself = connection.getsockname()[0] + cnn = connection[1].transport.get_extra_info('socket') + myself = cnn.getsockname()[0] if connecting.active or initting: - tlvdata.send(connection, {'error': 'Connecting right now', + await tlvdata.send(connection, {'error': 'Connecting right now', 'backoff': True}) connection.close() return if leader_init.active: - tlvdata.send(connection, {'error': 'Servicing a connection', + await tlvdata.send(connection, {'error': 'Servicing a connection', 'waitinline': True}) connection.close() return - if myself != get_leader(connection): - tlvdata.send( + if myself != await get_leader(connection): + await tlvdata.send( connection, {'error': 'Cannot assimilate, our leader is ' 'in another castle', 'leader': currentleader}) @@ -573,22 +638,23 @@ async def handle_connection(connection, cert, request, local=False): if not connect_to_leader( None, None, connection.getpeername()[0]): if retrythread is None: - retrythread = eventlet.spawn_after(5 + random.random(), + retrythread = util.spawn_after(5 + random.random(), start_collective) return if retrythread is not None: retrythread.cancel() retrythread = None - with leader_init: + async with leader_init: + cnn = connection[1].get_extra_info('socket') cfm.update_collective_address(request['name'], - connection.getpeername()[0]) - tlvdata.send(connection, cfm._dump_keys(None, False)) - tlvdata.send(connection, cfm._cfgstore['collective']) - tlvdata.send(connection, {'confluent_uuid': cfm.get_global('confluent_uuid')}) # cfm.get_globals()) + cnn.getpeername()[0]) + await tlvdata.send(connection, cfm._dump_keys(None, False)) + await tlvdata.send(connection, cfm._cfgstore['collective']) + await tlvdata.send(connection, {'confluent_uuid': cfm.get_global('confluent_uuid')}) # cfm.get_globals()) cfgdata = cfm.ConfigManager(None)._dump_to_json() try: - tlvdata.send(connection, {'txcount': cfm._txcount, - 'dbsize': len(cfgdata)}) + await tlvdata.send(connection, {'txcount': cfm._txcount, + 'dbsize': len(cfgdata)}) connection.sendall(cfgdata) except Exception: try: @@ -603,7 +669,7 @@ async def handle_connection(connection, cert, request, local=False): 'subsystem': 'collective'}) if retrythread is None: # start a recovery if everyone else seems # to have disappeared - retrythread = eventlet.spawn_after(5 + random.random(), + retrythread = util.spawn_after(5 + random.random(), start_collective) # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates @@ -624,10 +690,10 @@ def populate_collinfo(collinfo): collinfo['nonvoting'].append(member) -def try_assimilate(drone, followcount, remote): +async def try_assimilate(drone, followcount, remote): global retrythread try: - remote = connect_to_collective(None, drone, remote) + remote = await connect_to_collective(None, drone, remote) except socket.error: # Oh well, unable to connect, hopefully the rest will be # in order @@ -652,7 +718,7 @@ def try_assimilate(drone, followcount, remote): retire_as_leader(drone) if not connect_to_leader(None, None, leader=remote.getpeername()[0]): if retrythread is None: - retrythread = eventlet.spawn_after(random.random(), + retrythread = util.spawn_after(random.random(), start_collective) return False if 'leader' in answer: @@ -669,8 +735,9 @@ def try_assimilate(drone, followcount, remote): return True -def get_leader(connection): - if currentleader is None or connection.getpeername()[0] == currentleader: +async def get_leader(connection): + cnn = connection[1].transport.get_extra_info('socket') + if currentleader is None or cnn.getpeername()[0] == currentleader: # cancel retry if a retry is pending if currentleader is None: msg = 'Becoming leader as no leader known' @@ -678,7 +745,7 @@ def get_leader(connection): msg = 'Becoming leader because {0} attempted to connect and it ' \ 'is current leader'.format(currentleader) log.log({'info': msg, 'subsystem': 'collective'}) - become_leader(connection) + await become_leader(connection) return currentleader def retire_as_leader(newleader=None): @@ -690,7 +757,7 @@ def retire_as_leader(newleader=None): reassimilate = None currentleader = None -def become_leader(connection): +async def become_leader(connection): global currentleader global follower global retrythread @@ -708,26 +775,27 @@ def become_leader(connection): if retrythread is not None: retrythread.cancel() retrythread = None - currentleader = connection.getsockname()[0] - skipaddr = connection.getpeername()[0] + cnn = connection[1].transport.get_extra_info('socket') + currentleader = cnn.getsockname()[0] + skipaddr = cnn.getpeername()[0] if reassimilate is not None: reassimilate.kill() - reassimilate = eventlet.spawn(reassimilate_missing) + reassimilate = util.spawn(reassimilate_missing()) cfm._ready = True - if _assimilate_missing(skipaddr): + if await _assimilate_missing(skipaddr): schedule_rebalance() -def reassimilate_missing(): - eventlet.sleep(30) +async def reassimilate_missing(): + await asyncio.sleep(30) while True: try: - _assimilate_missing() + await _assimilate_missing() except Exception as e: cfm.logException() - eventlet.sleep(30) + await asyncio.sleep(30) -def _assimilate_missing(skipaddr=None): +async def _assimilate_missing(skipaddr=None): connecto = [] myname = get_myname() skipem = set(cfm.cfgstreams) @@ -742,13 +810,13 @@ def _assimilate_missing(skipaddr=None): connecto.append(dronecandidate) if not connecto: return True - conpool = greenpool.GreenPool(64) - connections = conpool.imap(create_connection, connecto) + for ct in connecto: + util.spawn(create_connection(ct)) for ent in connections: member, remote = ent if isinstance(remote, Exception): continue - if not try_assimilate(member, numfollowers, remote): + if not await try_assimilate(member, numfollowers, remote): return False return True @@ -758,9 +826,9 @@ def startup(): if len(members) < 2: # Not in collective mode, return return - eventlet.spawn_n(start_collective) + util.spawn(start_collective()) -def check_managers(): +async def check_managers(): global failovercheck if not follower: try: @@ -803,16 +871,16 @@ def check_managers(): continue c.set_node_attributes({node: {'collective.manager': {'value': targets[0]}}}) availmanagers[targets[0]] += 1 - _assimilate_missing() + await _assimilate_missing() failovercheck = None def schedule_rebalance(): global failovercheck if not failovercheck: failovercheck = True - failovercheck = eventlet.spawn_after(10, check_managers) + failovercheck = util.spawn_after(10, check_managers) -def start_collective(): +async def start_collective(): global follower global retrythread global initting @@ -844,8 +912,8 @@ def start_collective(): cfm.stop_following(True) ldrcandidate = cfm.get_collective_member(member)['address'] connecto.append(ldrcandidate) - conpool = greenpool.GreenPool(64) - connections = conpool.imap(create_connection, connecto) + for ct in connecto: + util.spawn(create_connection(ct)) for ent in connections: member, remote = ent if isinstance(remote, Exception): @@ -853,7 +921,7 @@ def start_collective(): if follower is None: log.log({'info': 'Performing startup attempt to {0}'.format( member), 'subsystem': 'collective'}) - if not connect_to_leader(name=myname, leader=member, remote=remote): + if not await connect_to_leader(name=myname, leader=member, remote=remote): remote.close() else: remote.close() @@ -861,6 +929,7 @@ def start_collective(): pass finally: if retrythread is None and follower is None: - retrythread = eventlet.spawn_after(5 + random.random(), + #retrythread = asyncio.create_task(start_collective()) + retrythread = util.spawn_after(5 + random.random(), start_collective) initting = False diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 9bdd94b2..accd5373 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -416,6 +416,7 @@ async def _tlsstartup(cnn): ctx.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 ctx.options |= ssl.OP_CIPHER_SERVER_PREFERENCE + ctx.verify_mode = ssl.CERT_OPTIONAL ctx.set_ciphers(ciphers) ctx.load_cert_chain('/etc/confluent/srvcert.pem', '/etc/confluent/privkey.pem') diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index af27f473..e132ffff 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -40,10 +40,13 @@ def mkdirp(path, mode=0o777): async def _sleep_and_run(sleeptime, func, args): await asyncio.sleep(sleeptime) - func(*args) + print(repr(func)) + await func(*args) def spawn_after(sleeptime, func, *args): + if func is None: + raise Exception('tf') return spawn(_sleep_and_run(sleeptime, func, args))