From 198ffb8be63a9234b476a18c04e797bd08f2dc47 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 1 Apr 2024 16:38:10 -0400 Subject: [PATCH] Advance asyncio port Purge sockapi of remaining eventlet call Extend asyncio into the credserver to finish out sockapi. Have client and sockapi complete TLS connection including password checking Fix confetty ability to 'create'. --- confluent_client/bin/confetty | 14 ++-- confluent_client/confluent/client.py | 58 +++++++++++--- confluent_client/confluent/tlvdata.py | 22 ++++-- confluent_server/bin/collective | 6 +- confluent_server/confluent/auth.py | 13 ++-- .../confluent/collective/manager.py | 9 +-- confluent_server/confluent/credserver.py | 46 +++++------ confluent_server/confluent/sockapi.py | 78 ++++++++++++------- 8 files changed, 151 insertions(+), 95 deletions(-) diff --git a/confluent_client/bin/confetty b/confluent_client/bin/confetty index b62e59f1..012fbdde 100755 --- a/confluent_client/bin/confetty +++ b/confluent_client/bin/confetty @@ -486,7 +486,7 @@ async def do_command(command, server): elif argv[0] == 'set': setvalues(argv[1:]) elif argv[0] == 'create': - createresource(argv[1:]) + await createresource(argv[1:]) elif argv[0] in ('rm', 'delete', 'remove'): delresource(argv[1]) elif argv[0] in ('unset', 'clear'): @@ -501,7 +501,7 @@ def shutdown(): tlvdata.send(session.connection, {'operation': 'shutdown', 'path': '/'}) -def createresource(args): +async def createresource(args): resname = args[0] attribs = args[1:] keydata = parameterize_attribs(attribs) @@ -514,12 +514,12 @@ def createresource(args): collection, _, resname = targpath.rpartition('/') if 'name' not in keydata: keydata['name'] = resname - makecall(session.create, (collection, keydata)) + await makecall(session.create, (collection, keydata)) -def makecall(callout, args): +async def makecall(callout, args): global exitcode - for response in callout(*args): + async for response in callout(*args): if 'deleted' in response: print("Deleted: " + response['deleted']) if 'created' in response: @@ -550,9 +550,9 @@ def clearvalues(resource, attribs): sys.stderr.write('Error: ' + res['error'] + '\n') -def delresource(resname): +async def delresource(resname): resname = fullpath_target(resname) - makecall(session.delete, (resname,)) + await makecall(session.delete, (resname,)) def setvalues(attribs): diff --git a/confluent_client/confluent/client.py b/confluent_client/confluent/client.py index 62846b67..f1776ef8 100644 --- a/confluent_client/confluent/client.py +++ b/confluent_client/confluent/client.py @@ -15,10 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -try: - import anydbm as dbm -except ImportError: - import dbm +import asyncio +import ctypes +import ctypes.util +import dbm import csv import errno import fnmatch @@ -30,6 +30,9 @@ import ssl import sys import confluent.tlvdata as tlvdata import confluent.sortutil as sortutil +libssl = ctypes.CDLL(ctypes.util.find_library('ssl')) +libssl.SSL_CTX_set_cert_verify_callback.argtypes = [ + ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] SO_PASSCRED = 16 @@ -47,6 +50,26 @@ except NameError: getinput = input +class PyObject_HEAD(ctypes.Structure): + _fields_ = [ + ("ob_refcnt", ctypes.c_ssize_t), + ("ob_type", ctypes.c_void_p), + ] + + +# see main/Modules/_ssl.c, only caring about the SSL_CTX pointer +class PySSLContext(ctypes.Structure): + _fields_ = [ + ("ob_base", PyObject_HEAD), + ("ctx", ctypes.c_void_p), + ] + + +@ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p) +def verify_stub(store, misc): + return 1 + + class NestedDict(dict): def __missing__(self, key): value = self[key] = type(self)() @@ -182,7 +205,7 @@ class Command(object): elif self.serverloc == '/var/run/confluent/api.sock': raise Exception('Confluent service is not available') else: - self._connect_tls() + await self._connect_tls() self.protversion = int((await tlvdata.recv(self.connection)).split( b'--')[1].strip()[1:]) authdata = await tlvdata.recv(self.connection) @@ -205,8 +228,8 @@ class Command(object): tlvdata.send(self.connection, {'filename': name, 'mode': mode}, handle) async def authenticate(self, username, password): - tlvdata.send(self.connection, - {'username': username, 'password': password}) + await tlvdata.send(self.connection, + {'username': username, 'password': password}) authdata = await tlvdata.recv(self.connection) if authdata['authpassed'] == 1: self.authenticated = True @@ -374,7 +397,7 @@ class Command(object): self.connection.setsockopt(socket.SOL_SOCKET, SO_PASSCRED, 1) self.connection.connect(self.serverloc) - def _connect_tls(self): + async def _connect_tls(self): server, port = _parseserver(self.serverloc) for res in socket.getaddrinfo(server, port, socket.AF_UNSPEC, socket.SOCK_STREAM): @@ -389,7 +412,7 @@ class Command(object): try: self.connection.settimeout(5) self.connection.connect(sa) - self.connection.settimeout(None) + self.connection.settimeout(0) except: raise self.connection.close() @@ -412,10 +435,21 @@ class Command(object): cacert = None certreqs = ssl.CERT_NONE knownhosts = True - self.connection = ssl.wrap_socket(self.connection, ca_certs=cacert, - cert_reqs=certreqs) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_ctx = PySSLContext.from_address(id(ctx)).ctx + libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0) + sreader = asyncio.StreamReader() + sreaderprot = asyncio.StreamReaderProtocol(sreader) + cloop = asyncio.get_event_loop() + tport, _ = await cloop.create_connection( + lambda: sreaderprot, sock=self.connection, ssl=ctx, server_hostname='x') + swriter = asyncio.StreamWriter(tport, sreaderprot, sreader, cloop) + self.connection = (sreader, swriter) + #self.connection = ssl.wrap_socket(self.connection, ca_certs=cacert, + # cert_reqs=certreqs) if knownhosts: - certdata = self.connection.getpeercert(binary_form=True) + certdata = tport.get_extra_info('ssl_object').getpeercert(binary_form=True) + # certdata = self.connection.getpeercert(binary_form=True) fingerprint = 'sha512$' + hashlib.sha512(certdata).hexdigest() fingerprint = fingerprint.encode('utf-8') hostid = '@'.join((port, server)) diff --git a/confluent_client/confluent/tlvdata.py b/confluent_client/confluent/tlvdata.py index 408e19fd..20b449c0 100644 --- a/confluent_client/confluent/tlvdata.py +++ b/confluent_client/confluent/tlvdata.py @@ -21,7 +21,6 @@ import ctypes import ctypes.util import confluent.tlv as tlv import socket -import select from datetime import datetime import json import os @@ -97,6 +96,7 @@ class ClientFile(object): self.fileobject = os.fdopen(fd, mode) self.filename = name + libc = ctypes.CDLL(ctypes.util.find_library('c')) recvmsg = libc.recvmsg recvmsg.argtypes = [ctypes.c_int, ctypes.POINTER(msghdr), ctypes.c_int] @@ -109,7 +109,9 @@ def _sendmsg(loop, fut, sock, msg, fds, rfd): if fut.cancelled(): return try: - retdata = sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", fds))]) + retdata = sock.sendmsg( + [msg], + [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", fds))]) except (BlockingIOError, InterruptedError): fd = sock.fileno() loop.add_reader(fd, _sendmsg, loop, fut, sock, fd) @@ -148,6 +150,7 @@ def _recvmsg(loop, fut, sock, msglen, maxfds, rfd): :len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) fut.set_result(msglen, list(fds)) + def recv_fds(sock, msglen, maxfds): cloop = asyncio.get_event_loop() fut = cloop.create_future() @@ -168,6 +171,7 @@ def decodestr(value): return value return ret + def unicode_dictvalues(dictdata): for key in dictdata: if isinstance(dictdata[key], bytes): @@ -192,11 +196,13 @@ def _unicode_list(currlist): async def sendall(handle, data): if isinstance(handle, tuple): - return await handle[1].write(data) + handle[1].write(data) + return await handle[1].drain() else: cloop = asyncio.get_event_loop() return await cloop.sock_sendall(handle, data) + async def send(handle, data, filehandle=None): cloop = asyncio.get_event_loop() if isinstance(data, unicode): @@ -213,10 +219,10 @@ async def send(handle, data, filehandle=None): if tl < 16777216: # type for string is '0', so we don't need # to xor anything in - await cloop.sock_sendall(handle, struct.pack("!I", tl)) + await sendall(handle, struct.pack("!I", tl)) else: raise Exception("String data length exceeds protocol") - await cloop.sock_sendall(handle, data) + await sendall(handle, data) elif isinstance(data, dict): # JSON currently only goes to 4 bytes # Some structured message, like what would be seen in http responses unicode_dictvalues(data) # make everything unicode, assuming UTF-8 @@ -228,9 +234,9 @@ async def send(handle, data, filehandle=None): # xor in the type (0b1 << 24) if filehandle is None: tl |= 16777216 - await cloop.sock_sendall(handle, struct.pack("!I", tl)) - await cloop.sock_sendall(handle, sdata) - elif isinstance (handle, tuple): + await sendall(handle, struct.pack("!I", tl)) + await sendall(handle, sdata) + elif isinstance(handle, tuple): raise Exception("Cannot send filehandle over network socket") else: tl |= (2 << 24) diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index b5ebab5a..0168abfd 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -52,7 +52,7 @@ def make_certificate(): os.umask(umask) -def show_invitation(name, nonvoting=False): +async def show_invitation(name, nonvoting=False): if not os.path.exists('/etc/confluent/srvcert.pem'): make_certificate() s = client.Command().connection @@ -121,7 +121,7 @@ def show_collective(): else: print('Run collective show on leader for more data') -def main(): +async def main(): a = argparse.ArgumentParser(description='Confluent server utility') sp = a.add_subparsers(dest='command') gc = sp.add_parser('gencert', help='Generate Confluent Certificates for ' @@ -152,4 +152,4 @@ def main(): delete_member(cmdset.name) if __name__ == '__main__': - main() + asyncio.get_event_loop().run_until_complete(main()) diff --git a/confluent_server/confluent/auth.py b/confluent_server/confluent/auth.py index fd07a133..9b6c6eda 100644 --- a/confluent_server/confluent/auth.py +++ b/confluent_server/confluent/auth.py @@ -19,6 +19,7 @@ # the PBKDF2 transform is skipped unless a user has been idle for sufficient # time +import asyncio import confluent.config.configmanager as configmanager import eventlet import eventlet.tpool @@ -238,7 +239,7 @@ def authorize(name, element, tenant=False, operation='create', return False -def check_user_passphrase(name, passphrase, operation=None, element=None, tenant=False): +async def check_user_passphrase(name, passphrase, operation=None, element=None, tenant=False): """Check a a login name and passphrase for authenticity and authorization The function combines authentication and authorization into one function. @@ -268,7 +269,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant # by a user, which might be malicious # would normally make an event and wait # but here there's no need for that - eventlet.sleep(0.5) + await asyncio.sleep(0.5) cfm = configmanager.ConfigManager(tenant, username=user) ucfg = cfm.get_user(user) if ucfg is None: @@ -280,7 +281,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant except KeyError: pass if ucfg is None: - eventlet.sleep(0.05) + await asyncio.sleep(0.05) return None bpassphrase = None if isinstance(passphrase, dict) and len(passphrase) == 1: @@ -319,7 +320,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant authcleaner = eventlet.spawn_after(30, _clean_authworkers) crypted = eventlet.tpool.execute(_do_pbkdf, passphrase, salt) del _passchecking[(user, tenant)] - eventlet.sleep( + await asyncio.sleep( 0.05) # either way, we want to stall so that client can't # determine failure because there is a delay, valid response will # delay as well @@ -332,7 +333,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant pwe = pwd.getpwnam(user) except KeyError: #pam won't work if the user doesn't exist, don't go further - eventlet.sleep(0.05) # stall even on test for existence of a username + await asyncio.sleep(0.05) # stall even on test for existence of a username return None if os.getuid() != 0: # confluent is running with reduced privilege, however, pam_unix refuses @@ -375,7 +376,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant if bpassphrase: _passcache[(user, tenant)] = hashlib.sha256(bpassphrase).digest() return authorize(user, element, tenant, operation, skipuserobj=False) - eventlet.sleep(0.05) # stall even on test for existence of a username + await asyncio.sleep(0.05) # stall even on test for existence of a username return None def _apply_pbkdf(passphrase, salt): diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 2519cc39..5c1264b1 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -32,12 +32,9 @@ import greenlet import random import time import sys -try: - import OpenSSL.crypto as crypto -except ImportError: - # while not always required, we use pyopenssl required for at least - # collective - crypto = None + +import OpenSSL.crypto as crypto + currentleader = None follower = None diff --git a/confluent_server/confluent/credserver.py b/confluent_server/confluent/credserver.py index c569bc4d..2bb1ae35 100644 --- a/confluent_server/confluent/credserver.py +++ b/confluent_server/confluent/credserver.py @@ -14,14 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import confluent.config.configmanager as cfm import confluent.netutil as netutil import confluent.util as util import datetime -import eventlet -import eventlet.green.select as select -import eventlet.green.socket as socket -import eventlet.greenpool import hashlib import hmac import os @@ -60,25 +57,26 @@ class CredServer(object): def __init__(self): self.cfm = cfm.ConfigManager(None) - def handle_client(self, client, peer): + async def handle_client(self, client, peer): try: apiarmed = None hmackey = None hmacval = None - client.send(b'\xc2\xd1-\xa8\x80\xd8j\xba') - tlv = bytearray(client.recv(2)) + cloop = asyncio.get_event_loop() + await cloop.sock_send(client, b'\xc2\xd1-\xa8\x80\xd8j\xba') + tlv = bytearray(cloop.sock_recv(client, 2)) if tlv[0] != 1: client.close() return - nodename = util.stringify(client.recv(tlv[1])) - tlv = bytearray(client.recv(2)) # should always be null + nodename = util.stringify(await cloop.sock_recv(client, tlv[1])) + tlv = bytearray(await cloop.sock_recv(client, 2)) # should always be null onlylocal = True if tlv[0] == 6: - hmacval = client.recv(tlv[1]) + hmacval = await cloop.sock_recv(client, tlv[1]) hmackey = self.cfm.get_node_attributes(nodename, ['secret.selfapiarmtoken'], decrypt=True) hmackey = hmackey.get(nodename, {}).get('secret.selfapiarmtoken', {}).get('value', None) elif tlv[1]: - client.recv(tlv[1]) + await cloop.sock_recv(client, tlv[1]) apimats = self.cfm.get_node_attributes(nodename, ['deployment.apiarmed', 'deployment.sealedapikey']) apiarmed = apimats.get(nodename, {}).get('deployment.apiarmed', {}).get( @@ -95,7 +93,7 @@ class CredServer(object): if not isinstance(sealed, bytes): sealed = sealed.encode('utf8') reply = b'\x80' + struct.pack('>H', len(sealed) + 1) + sealed + b'\x00' - client.send(reply) + await cloop.sock_send(client, reply) client.close() return if apiarmed not in ('once', 'continuous'): @@ -105,23 +103,23 @@ class CredServer(object): self.cfm.set_node_attributes({nodename: {'deployment.apiarmed': ''}}) client.close() return - client.send(b'\x02\x20') + await cloop.sock_send(client, b'\x02\x20') rttoken = os.urandom(32) - client.send(rttoken) - client.send(b'\x00\x00') - tlv = bytearray(client.recv(2)) + await cloop.sock_send(client, rttoken) + await cloop.sock_send(client, b'\x00\x00') + tlv = bytearray(await cloop.sock_recv(client, 2)) if tlv[0] != 3: client.close() return - echotoken = client.recv(tlv[1]) + echotoken = await cloop.sock_recv(client, tlv[1]) if echotoken != rttoken: client.close() return - tlv = bytearray(client.recv(2)) + tlv = bytearray(await cloop.sock_recv(client, 2)) if tlv[0] != 4: client.close() return - echotoken = util.stringify(client.recv(tlv[1])) + echotoken = util.stringify(await cloop.sock_recv(client, tlv[1])) if hmackey: etok = echotoken.encode('utf8') if hmacval != hmac.new(hmackey, etok, hashlib.sha256).digest(): @@ -133,12 +131,14 @@ class CredServer(object): if apiarmed == 'continuous': del cfgupdate[nodename]['deployment.apiarmed'] self.cfm.set_node_attributes(cfgupdate) - client.recv(2) # drain end of message - client.send(b'\x05\x00') # report success + await cloop.sock_recv(client, 2) # drain end of message + await cloop.sock_send(client, b'\x05\x00') # report success finally: client.close() -if __name__ == '__main__': +async def main(): a = CredServer() while True: - eventlet.sleep(86400) + await asyncio.sleep(86400) +if __name__ == '__main__': + asyncio.get_event_loop().run_until_complete(main()) diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 82df5bb0..78bd077d 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -34,7 +34,6 @@ import traceback import socket import ssl -import eventlet import confluent.auth as auth import confluent.credserver as credserver @@ -57,7 +56,8 @@ plainsocket = None libc = ctypes.CDLL(ctypes.util.find_library('c')) libssl = ctypes.CDLL(ctypes.util.find_library('ssl')) -libssl.SSL_CTX_set_cert_verify_callback.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] +libssl.SSL_CTX_set_cert_verify_callback.argtypes = [ + ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] def _should_authlog(path, operation): @@ -68,6 +68,7 @@ def _should_authlog(path, operation): return False return True + class ClientConsole(object): def __init__(self, client): self.client = client @@ -109,7 +110,8 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): if authdata: cfm = authdata[1] authenticated = True - # version 0 == original, version 1 == pickle3 allowed, 2 = pickle forbidden, msgpack allowed + # version 0 == original, version 1 == pickle3 allowed, + # v2 = pickle forbidden, msgpack allowed # v3 - filehandle allowed # v4 - schema change and keepalive changes @@ -121,25 +123,28 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): return if 'collective' in response: return collective.handle_connection(connection, cert, - response['collective']) + response['collective']) while not configmanager.config_is_ready(): - eventlet.sleep(1) + await asyncio.sleep(1) if 'dispatch' in response: - dreq = tlvdata.recvall(connection, response['dispatch']['length']) + dreq = tlvdata.recvall( + connection, response['dispatch']['length']) return pluginapi.handle_dispatch(connection, cert, dreq, - response['dispatch']['name']) + response['dispatch']['name']) if 'proxyconsole' in response: - return start_proxy_term(connection, cert, response['proxyconsole']) + return start_proxy_term(connection, cert, + response['proxyconsole']) authname = response['username'] passphrase = response['password'] # note(jbjohnso): here, we need to authenticate, but not # authorize a user. When authorization starts understanding # element path, that authorization will need to be called # per request the user makes - authdata = auth.check_user_passphrase(authname, passphrase) + authdata = await auth.check_user_passphrase(authname, passphrase) if not authdata: auditlog.log( - {'operation': 'connect', 'user': authname, 'allowed': False}) + {'operation': 'connect', + 'user': authname, 'allowed': False}) else: authenticated = True cfm = authdata[1] @@ -147,12 +152,14 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): request = await tlvdata.recv(connection) if request and isinstance(request, dict) and 'collective' in request: if skipauth: - return collective.handle_connection(connection, None, request['collective'], - local=True) + return collective.handle_connection( + connection, None, request['collective'], local=True) else: tlvdata.send( - connection, - {'collective': {'error': 'collective management commands may only be used by root'}}) + connection, + {'collective': { + 'error': 'collective management commands ' + 'may only be used by root'}}) while request is not None: try: await process_request( @@ -160,19 +167,22 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): except exc.ConfluentException as e: if ((not isinstance(e, exc.LockedCredentials)) and e.apierrorcode == 500): - tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, - event=log.Events.stacktrace) + tracelog.log( + traceback.format_exc(), ltype=log.DataTypes.event, + event=log.Events.stacktrace) await send_data(connection, {'errorcode': e.apierrorcode, - 'error': e.apierrorstr, - 'detail': e.get_error_body()}) + 'error': e.apierrorstr, + 'detail': e.get_error_body()}) await send_data(connection, {'_requestdone': 1}) except SystemExit: sys.exit(0) except Exception as e: tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, - event=log.Events.stacktrace) - await send_data(connection, {'errorcode': 500, - 'error': 'Unexpected error - ' + str(e)}) + event=log.Events.stacktrace) + await send_data( + connection, + {'errorcode': 500, + 'error': 'Unexpected error - ' + str(e)}) await send_data(connection, {'_requestdone': 1}) try: request = await tlvdata.recv(connection) @@ -186,6 +196,7 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): except Exception: pass + async def send_response(responses, connection): if responses is None: return @@ -195,7 +206,8 @@ async def send_response(responses, connection): await send_data(connection, {'_requestdone': 1}) -async def process_request(connection, request, cfm, authdata, authname, skipauth): +async def process_request( + connection, request, cfm, authdata, authname, skipauth): if isinstance(request, tlvdata.ClientFile): cfm.add_client_file(request) return @@ -232,15 +244,16 @@ async def process_request(connection, request, cfm, authdata, authname, skipauth hdlr = pluginapi.handle_path(path, operation, cfm, params) except exc.NotFoundException as e: send_data(connection, {"errorcode": 404, - "error": "Target not found - " + str(e)}) + "error": "Target not found - " + str(e)}) send_data(connection, {"_requestdone": 1}) except exc.InvalidArgumentException as e: send_data(connection, {"errorcode": 400, - "error": "Bad Request - " + str(e)}) + "error": "Bad Request - " + str(e)}) send_data(connection, {"_requestdone": 1}) await send_response(hdlr, connection) return + def start_proxy_term(connection, cert, request): droneinfo = configmanager.get_collective_member(request['name']) if not util.cert_matches(droneinfo['fingerprint'], cert): @@ -255,6 +268,7 @@ def start_proxy_term(connection, cert, request): 'height', 24)) term_interact(None, None, ccons, None, connection, consession, None) + def start_term(authname, cfm, connection, params, path, authdata, skipauth): elems = path.split('/') if len(elems) < 4 or elems[1] != 'nodes': @@ -322,8 +336,10 @@ def term_interact(authdata, authname, ccons, cfm, connection, consession, tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) - send_data(connection, {'errorcode': 500, - 'error': 'Unexpected error - ' + str(e)}) + send_data( + connection, + {'errorcode': 500, + 'error': 'Unexpected error - ' + str(e)}) send_data(connection, {'_requestdone': 1}) continue if not data: @@ -336,6 +352,7 @@ def term_interact(authdata, authname, ccons, cfm, connection, consession, async def _tlshandler(bind_host, bind_port): global plainsocket plainsocket = socket.socket(socket.AF_INET6) + plainsocket.settimeout(0) plainsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) plainsocket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) bound = False @@ -347,7 +364,7 @@ async def _tlshandler(bind_host, bind_port): if e.errno != 98: raise sys.stderr.write('TLS Socket in use, retrying in 1 second\n') - eventlet.sleep(1) + await asyncio.sleep(1) # Enable TCP_FASTOPEN plainsocket.setsockopt(socket.SOL_TCP, 23, 5) plainsocket.listen(5) @@ -356,7 +373,7 @@ async def _tlshandler(bind_host, bind_port): while (1): # TODO: exithook cnn, addr = await cloop.sock_accept(plainsocket) if addr[1] < 1000: - eventlet.spawn_n(cs.handle_client, cnn, addr) + asyncio.create_task(cs.handle_client(cnn, addr)) else: asyncio.create_task(_tlsstartup(cnn)) @@ -404,8 +421,9 @@ async def _tlsstartup(cnn): libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0) sreader = asyncio.StreamReader() sreaderprot = asyncio.StreamReaderProtocol(sreader) - tport, _ = await cloop.connect_accepted_socket(lambda: sreaderprot, sock=cnn, ssl=ctx) - swriter = asyncio.StreamWriter(tport, sreaderprot, sreader) + tport, _ = await cloop.connect_accepted_socket( + lambda: sreaderprot, sock=cnn, ssl=ctx) + swriter = asyncio.StreamWriter(tport, sreaderprot, sreader, cloop) cert = tport.get_extra_info('ssl_object').getpeercert(binary_form=True) cnn = (sreader, swriter) #cnn = ctx.wrap_socket(cnn, server_side=True)