2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-03 20:39:17 +00:00

Advance asyncio port

Purge sockapi of remaining eventlet call

Extend asyncio into the credserver to finish out sockapi.

Have client and sockapi complete TLS connection including password checking

Fix confetty ability to 'create'.
This commit is contained in:
Jarrod Johnson
2024-04-01 16:38:10 -04:00
parent c1d680d8d8
commit 198ffb8be6
8 changed files with 151 additions and 95 deletions

View File

@@ -486,7 +486,7 @@ async def do_command(command, server):
elif argv[0] == 'set':
setvalues(argv[1:])
elif argv[0] == 'create':
createresource(argv[1:])
await createresource(argv[1:])
elif argv[0] in ('rm', 'delete', 'remove'):
delresource(argv[1])
elif argv[0] in ('unset', 'clear'):
@@ -501,7 +501,7 @@ def shutdown():
tlvdata.send(session.connection, {'operation': 'shutdown', 'path': '/'})
def createresource(args):
async def createresource(args):
resname = args[0]
attribs = args[1:]
keydata = parameterize_attribs(attribs)
@@ -514,12 +514,12 @@ def createresource(args):
collection, _, resname = targpath.rpartition('/')
if 'name' not in keydata:
keydata['name'] = resname
makecall(session.create, (collection, keydata))
await makecall(session.create, (collection, keydata))
def makecall(callout, args):
async def makecall(callout, args):
global exitcode
for response in callout(*args):
async for response in callout(*args):
if 'deleted' in response:
print("Deleted: " + response['deleted'])
if 'created' in response:
@@ -550,9 +550,9 @@ def clearvalues(resource, attribs):
sys.stderr.write('Error: ' + res['error'] + '\n')
def delresource(resname):
async def delresource(resname):
resname = fullpath_target(resname)
makecall(session.delete, (resname,))
await makecall(session.delete, (resname,))
def setvalues(attribs):

View File

@@ -15,10 +15,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import anydbm as dbm
except ImportError:
import dbm
import asyncio
import ctypes
import ctypes.util
import dbm
import csv
import errno
import fnmatch
@@ -30,6 +30,9 @@ import ssl
import sys
import confluent.tlvdata as tlvdata
import confluent.sortutil as sortutil
libssl = ctypes.CDLL(ctypes.util.find_library('ssl'))
libssl.SSL_CTX_set_cert_verify_callback.argtypes = [
ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
SO_PASSCRED = 16
@@ -47,6 +50,26 @@ except NameError:
getinput = input
class PyObject_HEAD(ctypes.Structure):
_fields_ = [
("ob_refcnt", ctypes.c_ssize_t),
("ob_type", ctypes.c_void_p),
]
# see main/Modules/_ssl.c, only caring about the SSL_CTX pointer
class PySSLContext(ctypes.Structure):
_fields_ = [
("ob_base", PyObject_HEAD),
("ctx", ctypes.c_void_p),
]
@ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p)
def verify_stub(store, misc):
return 1
class NestedDict(dict):
def __missing__(self, key):
value = self[key] = type(self)()
@@ -182,7 +205,7 @@ class Command(object):
elif self.serverloc == '/var/run/confluent/api.sock':
raise Exception('Confluent service is not available')
else:
self._connect_tls()
await self._connect_tls()
self.protversion = int((await tlvdata.recv(self.connection)).split(
b'--')[1].strip()[1:])
authdata = await tlvdata.recv(self.connection)
@@ -205,8 +228,8 @@ class Command(object):
tlvdata.send(self.connection, {'filename': name, 'mode': mode}, handle)
async def authenticate(self, username, password):
tlvdata.send(self.connection,
{'username': username, 'password': password})
await tlvdata.send(self.connection,
{'username': username, 'password': password})
authdata = await tlvdata.recv(self.connection)
if authdata['authpassed'] == 1:
self.authenticated = True
@@ -374,7 +397,7 @@ class Command(object):
self.connection.setsockopt(socket.SOL_SOCKET, SO_PASSCRED, 1)
self.connection.connect(self.serverloc)
def _connect_tls(self):
async def _connect_tls(self):
server, port = _parseserver(self.serverloc)
for res in socket.getaddrinfo(server, port, socket.AF_UNSPEC,
socket.SOCK_STREAM):
@@ -389,7 +412,7 @@ class Command(object):
try:
self.connection.settimeout(5)
self.connection.connect(sa)
self.connection.settimeout(None)
self.connection.settimeout(0)
except:
raise
self.connection.close()
@@ -412,10 +435,21 @@ class Command(object):
cacert = None
certreqs = ssl.CERT_NONE
knownhosts = True
self.connection = ssl.wrap_socket(self.connection, ca_certs=cacert,
cert_reqs=certreqs)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_ctx = PySSLContext.from_address(id(ctx)).ctx
libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0)
sreader = asyncio.StreamReader()
sreaderprot = asyncio.StreamReaderProtocol(sreader)
cloop = asyncio.get_event_loop()
tport, _ = await cloop.create_connection(
lambda: sreaderprot, sock=self.connection, ssl=ctx, server_hostname='x')
swriter = asyncio.StreamWriter(tport, sreaderprot, sreader, cloop)
self.connection = (sreader, swriter)
#self.connection = ssl.wrap_socket(self.connection, ca_certs=cacert,
# cert_reqs=certreqs)
if knownhosts:
certdata = self.connection.getpeercert(binary_form=True)
certdata = tport.get_extra_info('ssl_object').getpeercert(binary_form=True)
# certdata = self.connection.getpeercert(binary_form=True)
fingerprint = 'sha512$' + hashlib.sha512(certdata).hexdigest()
fingerprint = fingerprint.encode('utf-8')
hostid = '@'.join((port, server))

View File

@@ -21,7 +21,6 @@ import ctypes
import ctypes.util
import confluent.tlv as tlv
import socket
import select
from datetime import datetime
import json
import os
@@ -97,6 +96,7 @@ class ClientFile(object):
self.fileobject = os.fdopen(fd, mode)
self.filename = name
libc = ctypes.CDLL(ctypes.util.find_library('c'))
recvmsg = libc.recvmsg
recvmsg.argtypes = [ctypes.c_int, ctypes.POINTER(msghdr), ctypes.c_int]
@@ -109,7 +109,9 @@ def _sendmsg(loop, fut, sock, msg, fds, rfd):
if fut.cancelled():
return
try:
retdata = sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", fds))])
retdata = sock.sendmsg(
[msg],
[(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", fds))])
except (BlockingIOError, InterruptedError):
fd = sock.fileno()
loop.add_reader(fd, _sendmsg, loop, fut, sock, fd)
@@ -148,6 +150,7 @@ def _recvmsg(loop, fut, sock, msglen, maxfds, rfd):
:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
fut.set_result(msglen, list(fds))
def recv_fds(sock, msglen, maxfds):
cloop = asyncio.get_event_loop()
fut = cloop.create_future()
@@ -168,6 +171,7 @@ def decodestr(value):
return value
return ret
def unicode_dictvalues(dictdata):
for key in dictdata:
if isinstance(dictdata[key], bytes):
@@ -192,11 +196,13 @@ def _unicode_list(currlist):
async def sendall(handle, data):
if isinstance(handle, tuple):
return await handle[1].write(data)
handle[1].write(data)
return await handle[1].drain()
else:
cloop = asyncio.get_event_loop()
return await cloop.sock_sendall(handle, data)
async def send(handle, data, filehandle=None):
cloop = asyncio.get_event_loop()
if isinstance(data, unicode):
@@ -213,10 +219,10 @@ async def send(handle, data, filehandle=None):
if tl < 16777216:
# type for string is '0', so we don't need
# to xor anything in
await cloop.sock_sendall(handle, struct.pack("!I", tl))
await sendall(handle, struct.pack("!I", tl))
else:
raise Exception("String data length exceeds protocol")
await cloop.sock_sendall(handle, data)
await sendall(handle, data)
elif isinstance(data, dict): # JSON currently only goes to 4 bytes
# Some structured message, like what would be seen in http responses
unicode_dictvalues(data) # make everything unicode, assuming UTF-8
@@ -228,9 +234,9 @@ async def send(handle, data, filehandle=None):
# xor in the type (0b1 << 24)
if filehandle is None:
tl |= 16777216
await cloop.sock_sendall(handle, struct.pack("!I", tl))
await cloop.sock_sendall(handle, sdata)
elif isinstance (handle, tuple):
await sendall(handle, struct.pack("!I", tl))
await sendall(handle, sdata)
elif isinstance(handle, tuple):
raise Exception("Cannot send filehandle over network socket")
else:
tl |= (2 << 24)

View File

@@ -52,7 +52,7 @@ def make_certificate():
os.umask(umask)
def show_invitation(name, nonvoting=False):
async def show_invitation(name, nonvoting=False):
if not os.path.exists('/etc/confluent/srvcert.pem'):
make_certificate()
s = client.Command().connection
@@ -121,7 +121,7 @@ def show_collective():
else:
print('Run collective show on leader for more data')
def main():
async def main():
a = argparse.ArgumentParser(description='Confluent server utility')
sp = a.add_subparsers(dest='command')
gc = sp.add_parser('gencert', help='Generate Confluent Certificates for '
@@ -152,4 +152,4 @@ def main():
delete_member(cmdset.name)
if __name__ == '__main__':
main()
asyncio.get_event_loop().run_until_complete(main())

View File

@@ -19,6 +19,7 @@
# the PBKDF2 transform is skipped unless a user has been idle for sufficient
# time
import asyncio
import confluent.config.configmanager as configmanager
import eventlet
import eventlet.tpool
@@ -238,7 +239,7 @@ def authorize(name, element, tenant=False, operation='create',
return False
def check_user_passphrase(name, passphrase, operation=None, element=None, tenant=False):
async def check_user_passphrase(name, passphrase, operation=None, element=None, tenant=False):
"""Check a a login name and passphrase for authenticity and authorization
The function combines authentication and authorization into one function.
@@ -268,7 +269,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant
# by a user, which might be malicious
# would normally make an event and wait
# but here there's no need for that
eventlet.sleep(0.5)
await asyncio.sleep(0.5)
cfm = configmanager.ConfigManager(tenant, username=user)
ucfg = cfm.get_user(user)
if ucfg is None:
@@ -280,7 +281,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant
except KeyError:
pass
if ucfg is None:
eventlet.sleep(0.05)
await asyncio.sleep(0.05)
return None
bpassphrase = None
if isinstance(passphrase, dict) and len(passphrase) == 1:
@@ -319,7 +320,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant
authcleaner = eventlet.spawn_after(30, _clean_authworkers)
crypted = eventlet.tpool.execute(_do_pbkdf, passphrase, salt)
del _passchecking[(user, tenant)]
eventlet.sleep(
await asyncio.sleep(
0.05) # either way, we want to stall so that client can't
# determine failure because there is a delay, valid response will
# delay as well
@@ -332,7 +333,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant
pwe = pwd.getpwnam(user)
except KeyError:
#pam won't work if the user doesn't exist, don't go further
eventlet.sleep(0.05) # stall even on test for existence of a username
await asyncio.sleep(0.05) # stall even on test for existence of a username
return None
if os.getuid() != 0:
# confluent is running with reduced privilege, however, pam_unix refuses
@@ -375,7 +376,7 @@ def check_user_passphrase(name, passphrase, operation=None, element=None, tenant
if bpassphrase:
_passcache[(user, tenant)] = hashlib.sha256(bpassphrase).digest()
return authorize(user, element, tenant, operation, skipuserobj=False)
eventlet.sleep(0.05) # stall even on test for existence of a username
await asyncio.sleep(0.05) # stall even on test for existence of a username
return None
def _apply_pbkdf(passphrase, salt):

View File

@@ -32,12 +32,9 @@ import greenlet
import random
import time
import sys
try:
import OpenSSL.crypto as crypto
except ImportError:
# while not always required, we use pyopenssl required for at least
# collective
crypto = None
import OpenSSL.crypto as crypto
currentleader = None
follower = None

View File

@@ -14,14 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import confluent.config.configmanager as cfm
import confluent.netutil as netutil
import confluent.util as util
import datetime
import eventlet
import eventlet.green.select as select
import eventlet.green.socket as socket
import eventlet.greenpool
import hashlib
import hmac
import os
@@ -60,25 +57,26 @@ class CredServer(object):
def __init__(self):
self.cfm = cfm.ConfigManager(None)
def handle_client(self, client, peer):
async def handle_client(self, client, peer):
try:
apiarmed = None
hmackey = None
hmacval = None
client.send(b'\xc2\xd1-\xa8\x80\xd8j\xba')
tlv = bytearray(client.recv(2))
cloop = asyncio.get_event_loop()
await cloop.sock_send(client, b'\xc2\xd1-\xa8\x80\xd8j\xba')
tlv = bytearray(cloop.sock_recv(client, 2))
if tlv[0] != 1:
client.close()
return
nodename = util.stringify(client.recv(tlv[1]))
tlv = bytearray(client.recv(2)) # should always be null
nodename = util.stringify(await cloop.sock_recv(client, tlv[1]))
tlv = bytearray(await cloop.sock_recv(client, 2)) # should always be null
onlylocal = True
if tlv[0] == 6:
hmacval = client.recv(tlv[1])
hmacval = await cloop.sock_recv(client, tlv[1])
hmackey = self.cfm.get_node_attributes(nodename, ['secret.selfapiarmtoken'], decrypt=True)
hmackey = hmackey.get(nodename, {}).get('secret.selfapiarmtoken', {}).get('value', None)
elif tlv[1]:
client.recv(tlv[1])
await cloop.sock_recv(client, tlv[1])
apimats = self.cfm.get_node_attributes(nodename,
['deployment.apiarmed', 'deployment.sealedapikey'])
apiarmed = apimats.get(nodename, {}).get('deployment.apiarmed', {}).get(
@@ -95,7 +93,7 @@ class CredServer(object):
if not isinstance(sealed, bytes):
sealed = sealed.encode('utf8')
reply = b'\x80' + struct.pack('>H', len(sealed) + 1) + sealed + b'\x00'
client.send(reply)
await cloop.sock_send(client, reply)
client.close()
return
if apiarmed not in ('once', 'continuous'):
@@ -105,23 +103,23 @@ class CredServer(object):
self.cfm.set_node_attributes({nodename: {'deployment.apiarmed': ''}})
client.close()
return
client.send(b'\x02\x20')
await cloop.sock_send(client, b'\x02\x20')
rttoken = os.urandom(32)
client.send(rttoken)
client.send(b'\x00\x00')
tlv = bytearray(client.recv(2))
await cloop.sock_send(client, rttoken)
await cloop.sock_send(client, b'\x00\x00')
tlv = bytearray(await cloop.sock_recv(client, 2))
if tlv[0] != 3:
client.close()
return
echotoken = client.recv(tlv[1])
echotoken = await cloop.sock_recv(client, tlv[1])
if echotoken != rttoken:
client.close()
return
tlv = bytearray(client.recv(2))
tlv = bytearray(await cloop.sock_recv(client, 2))
if tlv[0] != 4:
client.close()
return
echotoken = util.stringify(client.recv(tlv[1]))
echotoken = util.stringify(await cloop.sock_recv(client, tlv[1]))
if hmackey:
etok = echotoken.encode('utf8')
if hmacval != hmac.new(hmackey, etok, hashlib.sha256).digest():
@@ -133,12 +131,14 @@ class CredServer(object):
if apiarmed == 'continuous':
del cfgupdate[nodename]['deployment.apiarmed']
self.cfm.set_node_attributes(cfgupdate)
client.recv(2) # drain end of message
client.send(b'\x05\x00') # report success
await cloop.sock_recv(client, 2) # drain end of message
await cloop.sock_send(client, b'\x05\x00') # report success
finally:
client.close()
if __name__ == '__main__':
async def main():
a = CredServer()
while True:
eventlet.sleep(86400)
await asyncio.sleep(86400)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

View File

@@ -34,7 +34,6 @@ import traceback
import socket
import ssl
import eventlet
import confluent.auth as auth
import confluent.credserver as credserver
@@ -57,7 +56,8 @@ plainsocket = None
libc = ctypes.CDLL(ctypes.util.find_library('c'))
libssl = ctypes.CDLL(ctypes.util.find_library('ssl'))
libssl.SSL_CTX_set_cert_verify_callback.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
libssl.SSL_CTX_set_cert_verify_callback.argtypes = [
ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
def _should_authlog(path, operation):
@@ -68,6 +68,7 @@ def _should_authlog(path, operation):
return False
return True
class ClientConsole(object):
def __init__(self, client):
self.client = client
@@ -109,7 +110,8 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
if authdata:
cfm = authdata[1]
authenticated = True
# version 0 == original, version 1 == pickle3 allowed, 2 = pickle forbidden, msgpack allowed
# version 0 == original, version 1 == pickle3 allowed,
# v2 = pickle forbidden, msgpack allowed
# v3 - filehandle allowed
# v4 - schema change and keepalive changes
@@ -121,25 +123,28 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
return
if 'collective' in response:
return collective.handle_connection(connection, cert,
response['collective'])
response['collective'])
while not configmanager.config_is_ready():
eventlet.sleep(1)
await asyncio.sleep(1)
if 'dispatch' in response:
dreq = tlvdata.recvall(connection, response['dispatch']['length'])
dreq = tlvdata.recvall(
connection, response['dispatch']['length'])
return pluginapi.handle_dispatch(connection, cert, dreq,
response['dispatch']['name'])
response['dispatch']['name'])
if 'proxyconsole' in response:
return start_proxy_term(connection, cert, response['proxyconsole'])
return start_proxy_term(connection, cert,
response['proxyconsole'])
authname = response['username']
passphrase = response['password']
# note(jbjohnso): here, we need to authenticate, but not
# authorize a user. When authorization starts understanding
# element path, that authorization will need to be called
# per request the user makes
authdata = auth.check_user_passphrase(authname, passphrase)
authdata = await auth.check_user_passphrase(authname, passphrase)
if not authdata:
auditlog.log(
{'operation': 'connect', 'user': authname, 'allowed': False})
{'operation': 'connect',
'user': authname, 'allowed': False})
else:
authenticated = True
cfm = authdata[1]
@@ -147,12 +152,14 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
request = await tlvdata.recv(connection)
if request and isinstance(request, dict) and 'collective' in request:
if skipauth:
return collective.handle_connection(connection, None, request['collective'],
local=True)
return collective.handle_connection(
connection, None, request['collective'], local=True)
else:
tlvdata.send(
connection,
{'collective': {'error': 'collective management commands may only be used by root'}})
connection,
{'collective': {
'error': 'collective management commands '
'may only be used by root'}})
while request is not None:
try:
await process_request(
@@ -160,19 +167,22 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
except exc.ConfluentException as e:
if ((not isinstance(e, exc.LockedCredentials)) and
e.apierrorcode == 500):
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
tracelog.log(
traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
await send_data(connection, {'errorcode': e.apierrorcode,
'error': e.apierrorstr,
'detail': e.get_error_body()})
'error': e.apierrorstr,
'detail': e.get_error_body()})
await send_data(connection, {'_requestdone': 1})
except SystemExit:
sys.exit(0)
except Exception as e:
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
await send_data(connection, {'errorcode': 500,
'error': 'Unexpected error - ' + str(e)})
event=log.Events.stacktrace)
await send_data(
connection,
{'errorcode': 500,
'error': 'Unexpected error - ' + str(e)})
await send_data(connection, {'_requestdone': 1})
try:
request = await tlvdata.recv(connection)
@@ -186,6 +196,7 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
except Exception:
pass
async def send_response(responses, connection):
if responses is None:
return
@@ -195,7 +206,8 @@ async def send_response(responses, connection):
await send_data(connection, {'_requestdone': 1})
async def process_request(connection, request, cfm, authdata, authname, skipauth):
async def process_request(
connection, request, cfm, authdata, authname, skipauth):
if isinstance(request, tlvdata.ClientFile):
cfm.add_client_file(request)
return
@@ -232,15 +244,16 @@ async def process_request(connection, request, cfm, authdata, authname, skipauth
hdlr = pluginapi.handle_path(path, operation, cfm, params)
except exc.NotFoundException as e:
send_data(connection, {"errorcode": 404,
"error": "Target not found - " + str(e)})
"error": "Target not found - " + str(e)})
send_data(connection, {"_requestdone": 1})
except exc.InvalidArgumentException as e:
send_data(connection, {"errorcode": 400,
"error": "Bad Request - " + str(e)})
"error": "Bad Request - " + str(e)})
send_data(connection, {"_requestdone": 1})
await send_response(hdlr, connection)
return
def start_proxy_term(connection, cert, request):
droneinfo = configmanager.get_collective_member(request['name'])
if not util.cert_matches(droneinfo['fingerprint'], cert):
@@ -255,6 +268,7 @@ def start_proxy_term(connection, cert, request):
'height', 24))
term_interact(None, None, ccons, None, connection, consession, None)
def start_term(authname, cfm, connection, params, path, authdata, skipauth):
elems = path.split('/')
if len(elems) < 4 or elems[1] != 'nodes':
@@ -322,8 +336,10 @@ def term_interact(authdata, authname, ccons, cfm, connection, consession,
tracelog.log(traceback.format_exc(),
ltype=log.DataTypes.event,
event=log.Events.stacktrace)
send_data(connection, {'errorcode': 500,
'error': 'Unexpected error - ' + str(e)})
send_data(
connection,
{'errorcode': 500,
'error': 'Unexpected error - ' + str(e)})
send_data(connection, {'_requestdone': 1})
continue
if not data:
@@ -336,6 +352,7 @@ def term_interact(authdata, authname, ccons, cfm, connection, consession,
async def _tlshandler(bind_host, bind_port):
global plainsocket
plainsocket = socket.socket(socket.AF_INET6)
plainsocket.settimeout(0)
plainsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
plainsocket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
bound = False
@@ -347,7 +364,7 @@ async def _tlshandler(bind_host, bind_port):
if e.errno != 98:
raise
sys.stderr.write('TLS Socket in use, retrying in 1 second\n')
eventlet.sleep(1)
await asyncio.sleep(1)
# Enable TCP_FASTOPEN
plainsocket.setsockopt(socket.SOL_TCP, 23, 5)
plainsocket.listen(5)
@@ -356,7 +373,7 @@ async def _tlshandler(bind_host, bind_port):
while (1): # TODO: exithook
cnn, addr = await cloop.sock_accept(plainsocket)
if addr[1] < 1000:
eventlet.spawn_n(cs.handle_client, cnn, addr)
asyncio.create_task(cs.handle_client(cnn, addr))
else:
asyncio.create_task(_tlsstartup(cnn))
@@ -404,8 +421,9 @@ async def _tlsstartup(cnn):
libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0)
sreader = asyncio.StreamReader()
sreaderprot = asyncio.StreamReaderProtocol(sreader)
tport, _ = await cloop.connect_accepted_socket(lambda: sreaderprot, sock=cnn, ssl=ctx)
swriter = asyncio.StreamWriter(tport, sreaderprot, sreader)
tport, _ = await cloop.connect_accepted_socket(
lambda: sreaderprot, sock=cnn, ssl=ctx)
swriter = asyncio.StreamWriter(tport, sreaderprot, sreader, cloop)
cert = tport.get_extra_info('ssl_object').getpeercert(binary_form=True)
cnn = (sreader, swriter)
#cnn = ctx.wrap_socket(cnn, server_side=True)