diff --git a/confluent_client/confluent/tlvdata.py b/confluent_client/confluent/tlvdata.py index 158028b5..408e19fd 100644 --- a/confluent_client/confluent/tlvdata.py +++ b/confluent_client/confluent/tlvdata.py @@ -190,6 +190,13 @@ def _unicode_list(currlist): _unicode_list(currlist[i]) +async def sendall(handle, data): + if isinstance(handle, tuple): + return await handle[1].write(data) + else: + cloop = asyncio.get_event_loop() + return await cloop.sock_sendall(handle, data) + async def send(handle, data, filehandle=None): cloop = asyncio.get_event_loop() if isinstance(data, unicode): @@ -223,17 +230,26 @@ async def send(handle, data, filehandle=None): tl |= 16777216 await cloop.sock_sendall(handle, struct.pack("!I", tl)) await cloop.sock_sendall(handle, sdata) + elif isinstance (handle, tuple): + raise Exception("Cannot send filehandle over network socket") else: tl |= (2 << 24) await cloop.sock_sendall(handle, struct.pack("!I", tl)) await send_fds(handle, b'', [filehandle]) +async def _grabhdl(handle, size): + if isinstance(handle, tuple): + return await handle[0].read(size) + else: + cloop = asyncio.get_event_loop() + return await cloop.sock_recv(handle, size) + + async def recvall(handle, size): - cloop = asyncio.get_event_loop() - rd = await cloop.sock_recv(handle, size) + rd = await _grabhdl(handle, size) while len(rd) < size: - nd = await cloop.sock_recv(handle, size - len(rd)) + nd = await _grabhdl(handle, size - len(rd)) if not nd: raise Exception("Error reading data") rd += nd @@ -241,12 +257,11 @@ async def recvall(handle, size): async def recv(handle): - cloop = asyncio.get_event_loop() - tl = await cloop.sock_recv(handle, 4) + tl = await _grabhdl(handle, 4) if not tl: return None while len(tl) < 4: - ndata = await cloop.sock_recv(handle, 4 - len(tl)) + ndata = await _grabhdl(handle, 4 - len(tl)) if not ndata: raise Exception("Error reading data") tl += ndata @@ -261,6 +276,8 @@ async def recv(handle): if dlen == 0: return None if datatype == tlv.Types.filehandle: + if isinstance(handle, tuple): + raise Exception('Filehandle not supported over TLS socket') filehandles = array.array('i') rawbuffer = bytearray(2048) pkttype = ctypes.c_ubyte * 2048 @@ -287,9 +304,9 @@ async def recv(handle): data = json.loads(bytes(data)) return ClientFile(data['filename'], data['mode'], filehandles[0]) else: - data = await cloop.sock_recv(handle, dlen) + data = await _grabhdl(handle, dlen) while len(data) < dlen: - ndata = await cloop.sock_recv(handle, dlen - len(data)) + ndata = await _grabhdl(handle, dlen - len(data)) if not ndata: raise Exception("Error reading data") data += ndata diff --git a/confluent_server/bin/confluent b/confluent_server/bin/confluent index 708f5d73..751208a2 100755 --- a/confluent_server/bin/confluent +++ b/confluent_server/bin/confluent @@ -1,4 +1,4 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2014 IBM Corporation @@ -15,10 +15,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import sys import os +import eventlet import eventlet.hubs eventlet.hubs.use_hub("eventlet.hubs.asyncio") +from eventlet.asyncio import spawn_for_awaitable + path = os.path.dirname(os.path.realpath(__file__)) path = os.path.realpath(os.path.join(path, '..', 'lib', 'python')) @@ -35,7 +39,8 @@ import confluent.main import multiprocessing if __name__ == '__main__': multiprocessing.freeze_support() - confluent.main.run(sys.argv) + gt = spawn_for_awaitable(confluent.main.run(sys.argv)) + gt.wait() #except: # pass #p.disable() diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 4bf55038..196a0ee6 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -62,9 +62,7 @@ except ImportError: crypto = None import confluent.util as util import eventlet -import eventlet.greenpool as greenpool import eventlet.green.ssl as ssl -import eventlet.queue as queue import eventlet.semaphore as semaphore import itertools import msgpack @@ -1093,7 +1091,6 @@ async def handle_node_request(configmanager, inputdata, operation, nodesbyhandler[hfunc] = [node] for bn in badcollnodes: nodesbyhandler[BadCollective(bn).error] = [bn] - workers = greenpool.GreenPool() numworkers = 0 for hfunc in nodesbyhandler: numworkers += 1 diff --git a/confluent_server/confluent/main.py b/confluent_server/confluent/main.py index d545d2cd..7375b584 100644 --- a/confluent_server/confluent/main.py +++ b/confluent_server/confluent/main.py @@ -40,7 +40,6 @@ import confluent.log as log import confluent.collective.manager as collective import confluent.discovery.protocols.pxe as pxe import linecache -from eventlet.asyncio import spawn_for_awaitable try: import confluent.sockapi as sockapi except ImportError: @@ -283,7 +282,7 @@ def migrate_db(): configmanager.init() -def run(args): +async def run(args): setlimits() try: configmanager.ConfigManager(None) @@ -349,7 +348,7 @@ def run(args): sock_bind_host, sock_bind_port = _get_connector_config('socket') try: sockservice = sockapi.SockApi(sock_bind_host, sock_bind_port) - spawn_for_awaitable(sockservice.start()) + asyncio.get_event_loop().create_task(sockservice.start()) except NameError: pass webservice = httpapi.HttpApi(http_bind_host, http_bind_port) @@ -361,12 +360,12 @@ def run(args): configmanager.check_quorum() break except Exception: - eventlet.sleep(0.5) + await asyncio.sleep(0.5) eventlet.spawn_n(disco.start_detection) - eventlet.sleep(1) + await asyncio.sleep(1) consoleserver.start_console_sessions() while 1: - eventlet.sleep(100) + await asyncio.sleep(100) def _get_connector_config(session): host = conf.get_option(session, 'bindhost') diff --git a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py index 420f49c1..ba4ecb42 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py @@ -106,12 +106,6 @@ class NullLock(object): #console.session.socket.getaddrinfo = eventlet.support.greendns.getaddrinfo -def exithandler(): - if console.session.iothread is not None: - console.session.iothread.join() - -atexit.register(exithandler) - #_ipmiworkers = greenpool.GreenPool(512) _ipmithread = None diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 0a4ff1b7..82df5bb0 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -32,10 +32,8 @@ import struct import sys import traceback -import eventlet.green.select as select import socket -import eventlet.green.ssl as ssl -import eventlet.support.greendns as greendns +import ssl import eventlet import confluent.auth as auth @@ -53,33 +51,14 @@ import confluent.util as util tracelog = None auditlog = None -try: - SO_PEERCRED = socket.SO_PEERCRED -except AttributeError: - import platform - if "ppc64" in platform.machine(): - SO_PEERCRED = 21 - else: - SO_PEERCRED = 17 -try: - # Python core TLS despite improvements still has no support for custom - # verify functions.... try to use PyOpenSSL where available to support - # client certificates with custom verify - import eventlet.green.OpenSSL.SSL as libssl - # further, not even pyopenssl exposes SSL_CTX_set_cert_verify_callback - # so we need to ffi that in using a strategy compatible with PyOpenSSL - import OpenSSL.SSL as libssln - import OpenSSL.crypto as crypto - from OpenSSL._util import ffi -except ImportError: - libssl = None - ffi = None - crypto = None plainsocket = None libc = ctypes.CDLL(ctypes.util.find_library('c')) +libssl = ctypes.CDLL(ctypes.util.find_library('ssl')) +libssl.SSL_CTX_set_cert_verify_callback.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] + def _should_authlog(path, operation): if (operation == 'retrieve' and @@ -168,17 +147,6 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): request = await tlvdata.recv(connection) if request and isinstance(request, dict) and 'collective' in request: if skipauth: - if not libssl: - tlvdata.send( - connection, - {'collective': {'error': 'Server either does not have ' - 'python-pyopenssl installed or has an ' - 'incorrect version installed ' - '(e.g. pyOpenSSL would need to be ' - 'replaced with python-pyopenssl). ' - 'Restart confluent after updating ' - 'the dependency.'}}) - return return collective.handle_connection(connection, None, request['collective'], local=True) else: @@ -274,7 +242,6 @@ async def process_request(connection, request, cfm, authdata, authname, skipauth return def start_proxy_term(connection, cert, request): - cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) droneinfo = configmanager.get_collective_member(request['name']) if not util.cert_matches(droneinfo['fingerprint'], cert): connection.close() @@ -366,7 +333,7 @@ def term_interact(authdata, authname, ccons, cfm, connection, consession, connection.close() -def _tlshandler(bind_host, bind_port): +async def _tlshandler(bind_host, bind_port): global plainsocket plainsocket = socket.socket(socket.AF_INET6) plainsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -385,21 +352,38 @@ def _tlshandler(bind_host, bind_port): plainsocket.setsockopt(socket.SOL_TCP, 23, 5) plainsocket.listen(5) cs = credserver.CredServer() + cloop = asyncio.get_event_loop() while (1): # TODO: exithook - cnn, addr = plainsocket.accept() + cnn, addr = await cloop.sock_accept(plainsocket) if addr[1] < 1000: eventlet.spawn_n(cs.handle_client, cnn, addr) else: asyncio.create_task(_tlsstartup(cnn)) -if ffi: - @ffi.callback("int(*)( X509_STORE_CTX *, void*)") - def verify_stub(store, misc): - return 1 +@ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p) +def verify_stub(store, misc): + return 1 + + +class PyObject_HEAD(ctypes.Structure): + _fields_ = [ + ("ob_refcnt", ctypes.c_ssize_t), + ("ob_type", ctypes.c_void_p), + ] + + +# see main/Modules/_ssl.c, only caring about the SSL_CTX pointer +class PySSLContext(ctypes.Structure): + _fields_ = [ + ("ob_base", PyObject_HEAD), + ("ctx", ctypes.c_void_p), + ] + async def _tlsstartup(cnn): authname = None + cloop = asyncio.get_event_loop() cert = None conf.init_config() configfile = conf.get_config() @@ -407,37 +391,27 @@ async def _tlsstartup(cnn): ciphers = configfile.get('security', 'cipher_list') else: ciphers = 'ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384' - if libssl: - # most fully featured SSL function - ctx = libssl.Context(libssl.SSLv23_METHOD) - ctx.set_options(libssl.OP_NO_SSLv2 | libssl.OP_NO_SSLv3 | - libssl.OP_NO_TLSv1 | libssl.OP_NO_TLSv1_1 | - libssl.OP_CIPHER_SERVER_PREFERENCE) - ctx.set_cipher_list(ciphers) - ctx.set_tmp_ecdh(crypto.get_elliptic_curve('secp384r1')) - ctx.use_certificate_file('/etc/confluent/srvcert.pem') - ctx.use_privatekey_file('/etc/confluent/privkey.pem') - ctx.set_verify(libssln.VERIFY_PEER, lambda *args: True) - libssln._lib.SSL_CTX_set_cert_verify_callback(ctx._context, - verify_stub, ffi.NULL) - cnn = libssl.Connection(ctx, cnn) - cnn.set_accept_state() - cnn.do_handshake() - cert = cnn.get_peer_certificate() - else: - try: - # Try relatively newer python TLS function - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 - ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 - ctx.options |= ssl.OP_CIPHER_SERVER_PREFERENCE - ctx.set_ciphers(ciphers) - ctx.load_cert_chain('/etc/confluent/srvcert.pem', - '/etc/confluent/privkey.pem') - cnn = ctx.wrap_socket(cnn, server_side=True) - except AttributeError: - raise Exception('Unable to find workable SSL support') - await sessionhdl(cnn, authname, cert=cert) + try: + # Try relatively newer python TLS function + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ctx.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 + ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 + ctx.options |= ssl.OP_CIPHER_SERVER_PREFERENCE + ctx.set_ciphers(ciphers) + ctx.load_cert_chain('/etc/confluent/srvcert.pem', + '/etc/confluent/privkey.pem') + ssl_ctx = PySSLContext.from_address(id(ctx)).ctx + libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0) + sreader = asyncio.StreamReader() + sreaderprot = asyncio.StreamReaderProtocol(sreader) + tport, _ = await cloop.connect_accepted_socket(lambda: sreaderprot, sock=cnn, ssl=ctx) + swriter = asyncio.StreamWriter(tport, sreaderprot, sreader) + cert = tport.get_extra_info('ssl_object').getpeercert(binary_form=True) + cnn = (sreader, swriter) + #cnn = ctx.wrap_socket(cnn, server_side=True) + except AttributeError: + raise Exception('Unable to find workable SSL support') + asyncio.create_task(sessionhdl(cnn, authname, cert=cert)) def removesocket(): try: @@ -463,7 +437,7 @@ async def _unixdomainhandler(): unixsocket.listen(5) while True: cnn, addr = await aloop.sock_accept(unixsocket) - creds = cnn.getsockopt(socket.SOL_SOCKET, SO_PEERCRED, + creds = cnn.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize('iII')) pid, uid, gid = struct.unpack('iII', creds) skipauth = False @@ -502,37 +476,22 @@ class SockApi(object): if self.should_run_remoteapi(): self.start_remoteapi() else: - eventlet.spawn_n(self.watch_for_cert) - eventlet.spawn_n(self.watch_resolv) + cloop = asyncio.get_event_loop() + cloop.create_task(self.watch_for_cert()) self.unixdomainserver = asyncio.create_task(_unixdomainhandler()) - def watch_resolv(self): - while True: - watcher = libc.inotify_init1(os.O_NONBLOCK) - resolvpath = '/etc/resolv.conf' - while True: - try: - resolvpath = os.readlink(resolvpath) - except Exception: - break - if not isinstance(resolvpath, bytes): - resolvpath = resolvpath.encode('utf8') - if libc.inotify_add_watch(watcher, resolvpath, 0xcc2) <= -1: - eventlet.sleep(15) - continue - select.select((watcher,), (), (), 86400) - try: - os.read(watcher, 1024) - except Exception: - pass - greendns.resolver = greendns.ResolverProxy(hosts_resolver=greendns.HostsResolver()) - os.close(watcher) - - def watch_for_cert(self): + async def watch_for_cert(self): watcher = libc.inotify_init1(os.O_NONBLOCK) if libc.inotify_add_watch(watcher, b'/etc/confluent/', 0x100) > -1: while True: - select.select((watcher,), (), (), 86400) + currfut = asyncio.Future() + asyncio.get_event_loop().add_reader( + watcher, currfut.set_result, None) + currfut.add_done_callback( + lambda x: asyncio.get_event_loop().remove_reader(watcher)) + done, _ = await asyncio.wait([currfut], return_when=asyncio.FIRST_COMPLETED) + for currfut in done: + await currfut try: os.read(watcher, 1024) except Exception: @@ -548,12 +507,12 @@ class SockApi(object): def stop_remoteapi(self): if self.tlsserver is None: return - self.tlsserver.kill() + self.tlsserver.cancel() plainsocket.close() self.tlsserver = None def start_remoteapi(self): if self.tlsserver is not None: return - self.tlsserver = eventlet.spawn( - _tlshandler, self.bind_host, self.bind_port) + self.tlsserver = asyncio.get_event_loop().create_task( + _tlshandler(self.bind_host, self.bind_port))