diff --git a/confluent_client/bin/confetty b/confluent_client/bin/confetty index b62e59f1..012fbdde 100755 --- a/confluent_client/bin/confetty +++ b/confluent_client/bin/confetty @@ -486,7 +486,7 @@ async def do_command(command, server): elif argv[0] == 'set': setvalues(argv[1:]) elif argv[0] == 'create': - createresource(argv[1:]) + await createresource(argv[1:]) elif argv[0] in ('rm', 'delete', 'remove'): delresource(argv[1]) elif argv[0] in ('unset', 'clear'): @@ -501,7 +501,7 @@ def shutdown(): tlvdata.send(session.connection, {'operation': 'shutdown', 'path': '/'}) -def createresource(args): +async def createresource(args): resname = args[0] attribs = args[1:] keydata = parameterize_attribs(attribs) @@ -514,12 +514,12 @@ def createresource(args): collection, _, resname = targpath.rpartition('/') if 'name' not in keydata: keydata['name'] = resname - makecall(session.create, (collection, keydata)) + await makecall(session.create, (collection, keydata)) -def makecall(callout, args): +async def makecall(callout, args): global exitcode - for response in callout(*args): + async for response in callout(*args): if 'deleted' in response: print("Deleted: " + response['deleted']) if 'created' in response: @@ -550,9 +550,9 @@ def clearvalues(resource, attribs): sys.stderr.write('Error: ' + res['error'] + '\n') -def delresource(resname): +async def delresource(resname): resname = fullpath_target(resname) - makecall(session.delete, (resname,)) + await makecall(session.delete, (resname,)) def setvalues(attribs): diff --git a/confluent_client/confluent/client.py b/confluent_client/confluent/client.py index 62846b67..f1776ef8 100644 --- a/confluent_client/confluent/client.py +++ b/confluent_client/confluent/client.py @@ -15,10 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -try: - import anydbm as dbm -except ImportError: - import dbm +import asyncio +import ctypes +import ctypes.util +import dbm import csv import errno import fnmatch @@ -30,6 +30,9 @@ import ssl import sys import confluent.tlvdata as tlvdata import confluent.sortutil as sortutil +libssl = ctypes.CDLL(ctypes.util.find_library('ssl')) +libssl.SSL_CTX_set_cert_verify_callback.argtypes = [ + ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] SO_PASSCRED = 16 @@ -47,6 +50,26 @@ except NameError: getinput = input +class PyObject_HEAD(ctypes.Structure): + _fields_ = [ + ("ob_refcnt", ctypes.c_ssize_t), + ("ob_type", ctypes.c_void_p), + ] + + +# see main/Modules/_ssl.c, only caring about the SSL_CTX pointer +class PySSLContext(ctypes.Structure): + _fields_ = [ + ("ob_base", PyObject_HEAD), + ("ctx", ctypes.c_void_p), + ] + + +@ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p) +def verify_stub(store, misc): + return 1 + + class NestedDict(dict): def __missing__(self, key): value = self[key] = type(self)() @@ -182,7 +205,7 @@ class Command(object): elif self.serverloc == '/var/run/confluent/api.sock': raise Exception('Confluent service is not available') else: - self._connect_tls() + await self._connect_tls() self.protversion = int((await tlvdata.recv(self.connection)).split( b'--')[1].strip()[1:]) authdata = await tlvdata.recv(self.connection) @@ -205,8 +228,8 @@ class Command(object): tlvdata.send(self.connection, {'filename': name, 'mode': mode}, handle) async def authenticate(self, username, password): - tlvdata.send(self.connection, - {'username': username, 'password': password}) + await tlvdata.send(self.connection, + {'username': username, 'password': password}) authdata = await tlvdata.recv(self.connection) if authdata['authpassed'] == 1: self.authenticated = True @@ -374,7 +397,7 @@ class Command(object): self.connection.setsockopt(socket.SOL_SOCKET, SO_PASSCRED, 1) self.connection.connect(self.serverloc) - def _connect_tls(self): + async def _connect_tls(self): server, port = _parseserver(self.serverloc) for res in socket.getaddrinfo(server, port, socket.AF_UNSPEC, socket.SOCK_STREAM): @@ -389,7 +412,7 @@ class Command(object): try: self.connection.settimeout(5) self.connection.connect(sa) - self.connection.settimeout(None) + self.connection.settimeout(0) except: raise self.connection.close() @@ -412,10 +435,21 @@ class Command(object): cacert = None certreqs = ssl.CERT_NONE knownhosts = True - self.connection = ssl.wrap_socket(self.connection, ca_certs=cacert, - cert_reqs=certreqs) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_ctx = PySSLContext.from_address(id(ctx)).ctx + libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0) + sreader = asyncio.StreamReader() + sreaderprot = asyncio.StreamReaderProtocol(sreader) + cloop = asyncio.get_event_loop() + tport, _ = await cloop.create_connection( + lambda: sreaderprot, sock=self.connection, ssl=ctx, server_hostname='x') + swriter = asyncio.StreamWriter(tport, sreaderprot, sreader, cloop) + self.connection = (sreader, swriter) + #self.connection = ssl.wrap_socket(self.connection, ca_certs=cacert, + # cert_reqs=certreqs) if knownhosts: - certdata = self.connection.getpeercert(binary_form=True) + certdata = tport.get_extra_info('ssl_object').getpeercert(binary_form=True) + # certdata = self.connection.getpeercert(binary_form=True) fingerprint = 'sha512$' + hashlib.sha512(certdata).hexdigest() fingerprint = fingerprint.encode('utf-8') hostid = '@'.join((port, server)) diff --git a/confluent_client/confluent/tlvdata.py b/confluent_client/confluent/tlvdata.py index 408e19fd..20b449c0 100644 --- a/confluent_client/confluent/tlvdata.py +++ b/confluent_client/confluent/tlvdata.py @@ -21,7 +21,6 @@ import ctypes import ctypes.util import confluent.tlv as tlv import socket -import select from datetime import datetime import json import os @@ -97,6 +96,7 @@ class ClientFile(object): self.fileobject = os.fdopen(fd, mode) self.filename = name + libc = ctypes.CDLL(ctypes.util.find_library('c')) recvmsg = libc.recvmsg recvmsg.argtypes = [ctypes.c_int, ctypes.POINTER(msghdr), ctypes.c_int] @@ -109,7 +109,9 @@ def _sendmsg(loop, fut, sock, msg, fds, rfd): if fut.cancelled(): return try: - retdata = sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", fds))]) + retdata = sock.sendmsg( + [msg], + [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", fds))]) except (BlockingIOError, InterruptedError): fd = sock.fileno() loop.add_reader(fd, _sendmsg, loop, fut, sock, fd) @@ -148,6 +150,7 @@ def _recvmsg(loop, fut, sock, msglen, maxfds, rfd): :len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) fut.set_result(msglen, list(fds)) + def recv_fds(sock, msglen, maxfds): cloop = asyncio.get_event_loop() fut = cloop.create_future() @@ -168,6 +171,7 @@ def decodestr(value): return value return ret + def unicode_dictvalues(dictdata): for key in dictdata: if isinstance(dictdata[key], bytes): @@ -192,11 +196,13 @@ def _unicode_list(currlist): async def sendall(handle, data): if isinstance(handle, tuple): - return await handle[1].write(data) + handle[1].write(data) + return await handle[1].drain() else: cloop = asyncio.get_event_loop() return await cloop.sock_sendall(handle, data) + async def send(handle, data, filehandle=None): cloop = asyncio.get_event_loop() if isinstance(data, unicode): @@ -213,10 +219,10 @@ async def send(handle, data, filehandle=None): if tl < 16777216: # type for string is '0', so we don't need # to xor anything in - await cloop.sock_sendall(handle, struct.pack("!I", tl)) + await sendall(handle, struct.pack("!I", tl)) else: raise Exception("String data length exceeds protocol") - await cloop.sock_sendall(handle, data) + await sendall(handle, data) elif isinstance(data, dict): # JSON currently only goes to 4 bytes # Some structured message, like what would be seen in http responses unicode_dictvalues(data) # make everything unicode, assuming UTF-8 @@ -228,9 +234,9 @@ async def send(handle, data, filehandle=None): # xor in the type (0b1 << 24) if filehandle is None: tl |= 16777216 - await cloop.sock_sendall(handle, struct.pack("!I", tl)) - await cloop.sock_sendall(handle, sdata) - elif isinstance (handle, tuple): + await sendall(handle, struct.pack("!I", tl)) + await sendall(handle, sdata) + elif isinstance(handle, tuple): raise Exception("Cannot send filehandle over network socket") else: tl |= (2 << 24) diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index b5ebab5a..0168abfd 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -52,7 +52,7 @@ def make_certificate(): os.umask(umask) -def show_invitation(name, nonvoting=False): +async def show_invitation(name, nonvoting=False): if not os.path.exists('/etc/confluent/srvcert.pem'): make_certificate() s = client.Command().connection @@ -121,7 +121,7 @@ def show_collective(): else: print('Run collective show on leader for more data') -def main(): +async def main(): a = argparse.ArgumentParser(description='Confluent server utility') sp = a.add_subparsers(dest='command') gc = sp.add_parser('gencert', help='Generate Confluent Certificates for ' @@ -152,4 +152,4 @@ def main(): delete_member(cmdset.name) if __name__ == '__main__': - main() + asyncio.get_event_loop().run_until_complete(main()) diff --git a/confluent_server/confluent/auth.py b/confluent_server/confluent/auth.py index fd07a133..9b6c6eda 100644 --- a/confluent_server/confluent/auth.py +++ b/confluent_server/confluent/auth.py @@ -19,6 +19,7 @@ # the PBKDF2 transform is skipped unless a user has been idle for sufficient # time +import asyncio import confluent.config.configmanager as configmanager import eventlet import eventlet.tpool @@ -238,7 +239,7 @@ def authorize(name, element, tenant=False, operation='create', return False -def check_user_passphrase(name, passphrase, operation=None, element=None, tenant=False): +async def check_user_passphrase(name, passphrase, operation=None, element=None, tenant=False): """Check a a login name and passphrase for authenticity and authorization The function combines authentication and authorization into one function. @@ -268,7 +269,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant # by a user, which might be malicious # would normally make an event and wait # but here there's no need for that - eventlet.sleep(0.5) + await asyncio.sleep(0.5) cfm = configmanager.ConfigManager(tenant, username=user) ucfg = cfm.get_user(user) if ucfg is None: @@ -280,7 +281,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant except KeyError: pass if ucfg is None: - eventlet.sleep(0.05) + await asyncio.sleep(0.05) return None bpassphrase = None if isinstance(passphrase, dict) and len(passphrase) == 1: @@ -319,7 +320,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant authcleaner = eventlet.spawn_after(30, _clean_authworkers) crypted = eventlet.tpool.execute(_do_pbkdf, passphrase, salt) del _passchecking[(user, tenant)] - eventlet.sleep( + await asyncio.sleep( 0.05) # either way, we want to stall so that client can't # determine failure because there is a delay, valid response will # delay as well @@ -332,7 +333,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant pwe = pwd.getpwnam(user) except KeyError: #pam won't work if the user doesn't exist, don't go further - eventlet.sleep(0.05) # stall even on test for existence of a username + await asyncio.sleep(0.05) # stall even on test for existence of a username return None if os.getuid() != 0: # confluent is running with reduced privilege, however, pam_unix refuses @@ -375,7 +376,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant if bpassphrase: _passcache[(user, tenant)] = hashlib.sha256(bpassphrase).digest() return authorize(user, element, tenant, operation, skipuserobj=False) - eventlet.sleep(0.05) # stall even on test for existence of a username + await asyncio.sleep(0.05) # stall even on test for existence of a username return None def _apply_pbkdf(passphrase, salt): diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 2519cc39..5c1264b1 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -32,12 +32,9 @@ import greenlet import random import time import sys -try: - import OpenSSL.crypto as crypto -except ImportError: - # while not always required, we use pyopenssl required for at least - # collective - crypto = None + +import OpenSSL.crypto as crypto + currentleader = None follower = None diff --git a/confluent_server/confluent/credserver.py b/confluent_server/confluent/credserver.py index c569bc4d..2bb1ae35 100644 --- a/confluent_server/confluent/credserver.py +++ b/confluent_server/confluent/credserver.py @@ -14,14 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import confluent.config.configmanager as cfm import confluent.netutil as netutil import confluent.util as util import datetime -import eventlet -import eventlet.green.select as select -import eventlet.green.socket as socket -import eventlet.greenpool import hashlib import hmac import os @@ -60,25 +57,26 @@ class CredServer(object): def __init__(self): self.cfm = cfm.ConfigManager(None) - def handle_client(self, client, peer): + async def handle_client(self, client, peer): try: apiarmed = None hmackey = None hmacval = None - client.send(b'\xc2\xd1-\xa8\x80\xd8j\xba') - tlv = bytearray(client.recv(2)) + cloop = asyncio.get_event_loop() + await cloop.sock_send(client, b'\xc2\xd1-\xa8\x80\xd8j\xba') + tlv = bytearray(cloop.sock_recv(client, 2)) if tlv[0] != 1: client.close() return - nodename = util.stringify(client.recv(tlv[1])) - tlv = bytearray(client.recv(2)) # should always be null + nodename = util.stringify(await cloop.sock_recv(client, tlv[1])) + tlv = bytearray(await cloop.sock_recv(client, 2)) # should always be null onlylocal = True if tlv[0] == 6: - hmacval = client.recv(tlv[1]) + hmacval = await cloop.sock_recv(client, tlv[1]) hmackey = self.cfm.get_node_attributes(nodename, ['secret.selfapiarmtoken'], decrypt=True) hmackey = hmackey.get(nodename, {}).get('secret.selfapiarmtoken', {}).get('value', None) elif tlv[1]: - client.recv(tlv[1]) + await cloop.sock_recv(client, tlv[1]) apimats = self.cfm.get_node_attributes(nodename, ['deployment.apiarmed', 'deployment.sealedapikey']) apiarmed = apimats.get(nodename, {}).get('deployment.apiarmed', {}).get( @@ -95,7 +93,7 @@ class CredServer(object): if not isinstance(sealed, bytes): sealed = sealed.encode('utf8') reply = b'\x80' + struct.pack('>H', len(sealed) + 1) + sealed + b'\x00' - client.send(reply) + await cloop.sock_send(client, reply) client.close() return if apiarmed not in ('once', 'continuous'): @@ -105,23 +103,23 @@ class CredServer(object): self.cfm.set_node_attributes({nodename: {'deployment.apiarmed': ''}}) client.close() return - client.send(b'\x02\x20') + await cloop.sock_send(client, b'\x02\x20') rttoken = os.urandom(32) - client.send(rttoken) - client.send(b'\x00\x00') - tlv = bytearray(client.recv(2)) + await cloop.sock_send(client, rttoken) + await cloop.sock_send(client, b'\x00\x00') + tlv = bytearray(await cloop.sock_recv(client, 2)) if tlv[0] != 3: client.close() return - echotoken = client.recv(tlv[1]) + echotoken = await cloop.sock_recv(client, tlv[1]) if echotoken != rttoken: client.close() return - tlv = bytearray(client.recv(2)) + tlv = bytearray(await cloop.sock_recv(client, 2)) if tlv[0] != 4: client.close() return - echotoken = util.stringify(client.recv(tlv[1])) + echotoken = util.stringify(await cloop.sock_recv(client, tlv[1])) if hmackey: etok = echotoken.encode('utf8') if hmacval != hmac.new(hmackey, etok, hashlib.sha256).digest(): @@ -133,12 +131,14 @@ class CredServer(object): if apiarmed == 'continuous': del cfgupdate[nodename]['deployment.apiarmed'] self.cfm.set_node_attributes(cfgupdate) - client.recv(2) # drain end of message - client.send(b'\x05\x00') # report success + await cloop.sock_recv(client, 2) # drain end of message + await cloop.sock_send(client, b'\x05\x00') # report success finally: client.close() -if __name__ == '__main__': +async def main(): a = CredServer() while True: - eventlet.sleep(86400) + await asyncio.sleep(86400) +if __name__ == '__main__': + asyncio.get_event_loop().run_until_complete(main()) diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 82df5bb0..78bd077d 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -34,7 +34,6 @@ import traceback import socket import ssl -import eventlet import confluent.auth as auth import confluent.credserver as credserver @@ -57,7 +56,8 @@ plainsocket = None libc = ctypes.CDLL(ctypes.util.find_library('c')) libssl = ctypes.CDLL(ctypes.util.find_library('ssl')) -libssl.SSL_CTX_set_cert_verify_callback.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] +libssl.SSL_CTX_set_cert_verify_callback.argtypes = [ + ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] def _should_authlog(path, operation): @@ -68,6 +68,7 @@ def _should_authlog(path, operation): return False return True + class ClientConsole(object): def __init__(self, client): self.client = client @@ -109,7 +110,8 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): if authdata: cfm = authdata[1] authenticated = True - # version 0 == original, version 1 == pickle3 allowed, 2 = pickle forbidden, msgpack allowed + # version 0 == original, version 1 == pickle3 allowed, + # v2 = pickle forbidden, msgpack allowed # v3 - filehandle allowed # v4 - schema change and keepalive changes @@ -121,25 +123,28 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): return if 'collective' in response: return collective.handle_connection(connection, cert, - response['collective']) + response['collective']) while not configmanager.config_is_ready(): - eventlet.sleep(1) + await asyncio.sleep(1) if 'dispatch' in response: - dreq = tlvdata.recvall(connection, response['dispatch']['length']) + dreq = tlvdata.recvall( + connection, response['dispatch']['length']) return pluginapi.handle_dispatch(connection, cert, dreq, - response['dispatch']['name']) + response['dispatch']['name']) if 'proxyconsole' in response: - return start_proxy_term(connection, cert, response['proxyconsole']) + return start_proxy_term(connection, cert, + response['proxyconsole']) authname = response['username'] passphrase = response['password'] # note(jbjohnso): here, we need to authenticate, but not # authorize a user. When authorization starts understanding # element path, that authorization will need to be called # per request the user makes - authdata = auth.check_user_passphrase(authname, passphrase) + authdata = await auth.check_user_passphrase(authname, passphrase) if not authdata: auditlog.log( - {'operation': 'connect', 'user': authname, 'allowed': False}) + {'operation': 'connect', + 'user': authname, 'allowed': False}) else: authenticated = True cfm = authdata[1] @@ -147,12 +152,14 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): request = await tlvdata.recv(connection) if request and isinstance(request, dict) and 'collective' in request: if skipauth: - return collective.handle_connection(connection, None, request['collective'], - local=True) + return collective.handle_connection( + connection, None, request['collective'], local=True) else: tlvdata.send( - connection, - {'collective': {'error': 'collective management commands may only be used by root'}}) + connection, + {'collective': { + 'error': 'collective management commands ' + 'may only be used by root'}}) while request is not None: try: await process_request( @@ -160,19 +167,22 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): except exc.ConfluentException as e: if ((not isinstance(e, exc.LockedCredentials)) and e.apierrorcode == 500): - tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, - event=log.Events.stacktrace) + tracelog.log( + traceback.format_exc(), ltype=log.DataTypes.event, + event=log.Events.stacktrace) await send_data(connection, {'errorcode': e.apierrorcode, - 'error': e.apierrorstr, - 'detail': e.get_error_body()}) + 'error': e.apierrorstr, + 'detail': e.get_error_body()}) await send_data(connection, {'_requestdone': 1}) except SystemExit: sys.exit(0) except Exception as e: tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, - event=log.Events.stacktrace) - await send_data(connection, {'errorcode': 500, - 'error': 'Unexpected error - ' + str(e)}) + event=log.Events.stacktrace) + await send_data( + connection, + {'errorcode': 500, + 'error': 'Unexpected error - ' + str(e)}) await send_data(connection, {'_requestdone': 1}) try: request = await tlvdata.recv(connection) @@ -186,6 +196,7 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): except Exception: pass + async def send_response(responses, connection): if responses is None: return @@ -195,7 +206,8 @@ async def send_response(responses, connection): await send_data(connection, {'_requestdone': 1}) -async def process_request(connection, request, cfm, authdata, authname, skipauth): +async def process_request( + connection, request, cfm, authdata, authname, skipauth): if isinstance(request, tlvdata.ClientFile): cfm.add_client_file(request) return @@ -232,15 +244,16 @@ async def process_request(connection, request, cfm, authdata, authname, skipauth hdlr = pluginapi.handle_path(path, operation, cfm, params) except exc.NotFoundException as e: send_data(connection, {"errorcode": 404, - "error": "Target not found - " + str(e)}) + "error": "Target not found - " + str(e)}) send_data(connection, {"_requestdone": 1}) except exc.InvalidArgumentException as e: send_data(connection, {"errorcode": 400, - "error": "Bad Request - " + str(e)}) + "error": "Bad Request - " + str(e)}) send_data(connection, {"_requestdone": 1}) await send_response(hdlr, connection) return + def start_proxy_term(connection, cert, request): droneinfo = configmanager.get_collective_member(request['name']) if not util.cert_matches(droneinfo['fingerprint'], cert): @@ -255,6 +268,7 @@ def start_proxy_term(connection, cert, request): 'height', 24)) term_interact(None, None, ccons, None, connection, consession, None) + def start_term(authname, cfm, connection, params, path, authdata, skipauth): elems = path.split('/') if len(elems) < 4 or elems[1] != 'nodes': @@ -322,8 +336,10 @@ def term_interact(authdata, authname, ccons, cfm, connection, consession, tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) - send_data(connection, {'errorcode': 500, - 'error': 'Unexpected error - ' + str(e)}) + send_data( + connection, + {'errorcode': 500, + 'error': 'Unexpected error - ' + str(e)}) send_data(connection, {'_requestdone': 1}) continue if not data: @@ -336,6 +352,7 @@ def term_interact(authdata, authname, ccons, cfm, connection, consession, async def _tlshandler(bind_host, bind_port): global plainsocket plainsocket = socket.socket(socket.AF_INET6) + plainsocket.settimeout(0) plainsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) plainsocket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) bound = False @@ -347,7 +364,7 @@ async def _tlshandler(bind_host, bind_port): if e.errno != 98: raise sys.stderr.write('TLS Socket in use, retrying in 1 second\n') - eventlet.sleep(1) + await asyncio.sleep(1) # Enable TCP_FASTOPEN plainsocket.setsockopt(socket.SOL_TCP, 23, 5) plainsocket.listen(5) @@ -356,7 +373,7 @@ async def _tlshandler(bind_host, bind_port): while (1): # TODO: exithook cnn, addr = await cloop.sock_accept(plainsocket) if addr[1] < 1000: - eventlet.spawn_n(cs.handle_client, cnn, addr) + asyncio.create_task(cs.handle_client(cnn, addr)) else: asyncio.create_task(_tlsstartup(cnn)) @@ -404,8 +421,9 @@ async def _tlsstartup(cnn): libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0) sreader = asyncio.StreamReader() sreaderprot = asyncio.StreamReaderProtocol(sreader) - tport, _ = await cloop.connect_accepted_socket(lambda: sreaderprot, sock=cnn, ssl=ctx) - swriter = asyncio.StreamWriter(tport, sreaderprot, sreader) + tport, _ = await cloop.connect_accepted_socket( + lambda: sreaderprot, sock=cnn, ssl=ctx) + swriter = asyncio.StreamWriter(tport, sreaderprot, sreader, cloop) cert = tport.get_extra_info('ssl_object').getpeercert(binary_form=True) cnn = (sreader, swriter) #cnn = ctx.wrap_socket(cnn, server_side=True)