2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-10 10:09:17 +00:00

Further move toward asyncio and reduce PyOpenSSL dep

Since we are rebasing to at least Python 3.6, and with
some extra ctypes wranging of the ssl context, we can likely
remove PyOpenSSL. Take first steps by removing it from 'sockapi'.

Have confluent executable become the 'top level' for eventlet, to allow
work on 'de-eventleting' on 'main.py'.

Rework tlvdata to deal with either a socket or a reader, writer tuple.
Using TLS with asyncio is easiest with the 'open_connection'
semantics, which force either a Protocol handler (callback based) or
dual streams.  While protocol approach ends with a more socket-like
'transport', the 'protocol' half is a bit unwieldy. So reader and writer
streams instead.
This commit is contained in:
Jarrod Johnson
2024-03-29 16:23:45 -04:00
parent 81428727d3
commit 1fbaee6149
6 changed files with 100 additions and 129 deletions

View File

@@ -190,6 +190,13 @@ def _unicode_list(currlist):
_unicode_list(currlist[i])
async def sendall(handle, data):
if isinstance(handle, tuple):
return await handle[1].write(data)
else:
cloop = asyncio.get_event_loop()
return await cloop.sock_sendall(handle, data)
async def send(handle, data, filehandle=None):
cloop = asyncio.get_event_loop()
if isinstance(data, unicode):
@@ -223,17 +230,26 @@ async def send(handle, data, filehandle=None):
tl |= 16777216
await cloop.sock_sendall(handle, struct.pack("!I", tl))
await cloop.sock_sendall(handle, sdata)
elif isinstance (handle, tuple):
raise Exception("Cannot send filehandle over network socket")
else:
tl |= (2 << 24)
await cloop.sock_sendall(handle, struct.pack("!I", tl))
await send_fds(handle, b'', [filehandle])
async def _grabhdl(handle, size):
if isinstance(handle, tuple):
return await handle[0].read(size)
else:
cloop = asyncio.get_event_loop()
return await cloop.sock_recv(handle, size)
async def recvall(handle, size):
cloop = asyncio.get_event_loop()
rd = await cloop.sock_recv(handle, size)
rd = await _grabhdl(handle, size)
while len(rd) < size:
nd = await cloop.sock_recv(handle, size - len(rd))
nd = await _grabhdl(handle, size - len(rd))
if not nd:
raise Exception("Error reading data")
rd += nd
@@ -241,12 +257,11 @@ async def recvall(handle, size):
async def recv(handle):
cloop = asyncio.get_event_loop()
tl = await cloop.sock_recv(handle, 4)
tl = await _grabhdl(handle, 4)
if not tl:
return None
while len(tl) < 4:
ndata = await cloop.sock_recv(handle, 4 - len(tl))
ndata = await _grabhdl(handle, 4 - len(tl))
if not ndata:
raise Exception("Error reading data")
tl += ndata
@@ -261,6 +276,8 @@ async def recv(handle):
if dlen == 0:
return None
if datatype == tlv.Types.filehandle:
if isinstance(handle, tuple):
raise Exception('Filehandle not supported over TLS socket')
filehandles = array.array('i')
rawbuffer = bytearray(2048)
pkttype = ctypes.c_ubyte * 2048
@@ -287,9 +304,9 @@ async def recv(handle):
data = json.loads(bytes(data))
return ClientFile(data['filename'], data['mode'], filehandles[0])
else:
data = await cloop.sock_recv(handle, dlen)
data = await _grabhdl(handle, dlen)
while len(data) < dlen:
ndata = await cloop.sock_recv(handle, dlen - len(data))
ndata = await _grabhdl(handle, dlen - len(data))
if not ndata:
raise Exception("Error reading data")
data += ndata

View File

@@ -1,4 +1,4 @@
#!/usr/bin/python2
#!/usr/bin/python3
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 IBM Corporation
@@ -15,10 +15,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import sys
import os
import eventlet
import eventlet.hubs
eventlet.hubs.use_hub("eventlet.hubs.asyncio")
from eventlet.asyncio import spawn_for_awaitable
path = os.path.dirname(os.path.realpath(__file__))
path = os.path.realpath(os.path.join(path, '..', 'lib', 'python'))
@@ -35,7 +39,8 @@ import confluent.main
import multiprocessing
if __name__ == '__main__':
multiprocessing.freeze_support()
confluent.main.run(sys.argv)
gt = spawn_for_awaitable(confluent.main.run(sys.argv))
gt.wait()
#except:
# pass
#p.disable()

View File

@@ -62,9 +62,7 @@ except ImportError:
crypto = None
import confluent.util as util
import eventlet
import eventlet.greenpool as greenpool
import eventlet.green.ssl as ssl
import eventlet.queue as queue
import eventlet.semaphore as semaphore
import itertools
import msgpack
@@ -1093,7 +1091,6 @@ async def handle_node_request(configmanager, inputdata, operation,
nodesbyhandler[hfunc] = [node]
for bn in badcollnodes:
nodesbyhandler[BadCollective(bn).error] = [bn]
workers = greenpool.GreenPool()
numworkers = 0
for hfunc in nodesbyhandler:
numworkers += 1

View File

@@ -40,7 +40,6 @@ import confluent.log as log
import confluent.collective.manager as collective
import confluent.discovery.protocols.pxe as pxe
import linecache
from eventlet.asyncio import spawn_for_awaitable
try:
import confluent.sockapi as sockapi
except ImportError:
@@ -283,7 +282,7 @@ def migrate_db():
configmanager.init()
def run(args):
async def run(args):
setlimits()
try:
configmanager.ConfigManager(None)
@@ -349,7 +348,7 @@ def run(args):
sock_bind_host, sock_bind_port = _get_connector_config('socket')
try:
sockservice = sockapi.SockApi(sock_bind_host, sock_bind_port)
spawn_for_awaitable(sockservice.start())
asyncio.get_event_loop().create_task(sockservice.start())
except NameError:
pass
webservice = httpapi.HttpApi(http_bind_host, http_bind_port)
@@ -361,12 +360,12 @@ def run(args):
configmanager.check_quorum()
break
except Exception:
eventlet.sleep(0.5)
await asyncio.sleep(0.5)
eventlet.spawn_n(disco.start_detection)
eventlet.sleep(1)
await asyncio.sleep(1)
consoleserver.start_console_sessions()
while 1:
eventlet.sleep(100)
await asyncio.sleep(100)
def _get_connector_config(session):
host = conf.get_option(session, 'bindhost')

View File

@@ -106,12 +106,6 @@ class NullLock(object):
#console.session.socket.getaddrinfo = eventlet.support.greendns.getaddrinfo
def exithandler():
if console.session.iothread is not None:
console.session.iothread.join()
atexit.register(exithandler)
#_ipmiworkers = greenpool.GreenPool(512)
_ipmithread = None

View File

@@ -32,10 +32,8 @@ import struct
import sys
import traceback
import eventlet.green.select as select
import socket
import eventlet.green.ssl as ssl
import eventlet.support.greendns as greendns
import ssl
import eventlet
import confluent.auth as auth
@@ -53,33 +51,14 @@ import confluent.util as util
tracelog = None
auditlog = None
try:
SO_PEERCRED = socket.SO_PEERCRED
except AttributeError:
import platform
if "ppc64" in platform.machine():
SO_PEERCRED = 21
else:
SO_PEERCRED = 17
try:
# Python core TLS despite improvements still has no support for custom
# verify functions.... try to use PyOpenSSL where available to support
# client certificates with custom verify
import eventlet.green.OpenSSL.SSL as libssl
# further, not even pyopenssl exposes SSL_CTX_set_cert_verify_callback
# so we need to ffi that in using a strategy compatible with PyOpenSSL
import OpenSSL.SSL as libssln
import OpenSSL.crypto as crypto
from OpenSSL._util import ffi
except ImportError:
libssl = None
ffi = None
crypto = None
plainsocket = None
libc = ctypes.CDLL(ctypes.util.find_library('c'))
libssl = ctypes.CDLL(ctypes.util.find_library('ssl'))
libssl.SSL_CTX_set_cert_verify_callback.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
def _should_authlog(path, operation):
if (operation == 'retrieve' and
@@ -168,17 +147,6 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
request = await tlvdata.recv(connection)
if request and isinstance(request, dict) and 'collective' in request:
if skipauth:
if not libssl:
tlvdata.send(
connection,
{'collective': {'error': 'Server either does not have '
'python-pyopenssl installed or has an '
'incorrect version installed '
'(e.g. pyOpenSSL would need to be '
'replaced with python-pyopenssl). '
'Restart confluent after updating '
'the dependency.'}})
return
return collective.handle_connection(connection, None, request['collective'],
local=True)
else:
@@ -274,7 +242,6 @@ async def process_request(connection, request, cfm, authdata, authname, skipauth
return
def start_proxy_term(connection, cert, request):
cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
droneinfo = configmanager.get_collective_member(request['name'])
if not util.cert_matches(droneinfo['fingerprint'], cert):
connection.close()
@@ -366,7 +333,7 @@ def term_interact(authdata, authname, ccons, cfm, connection, consession,
connection.close()
def _tlshandler(bind_host, bind_port):
async def _tlshandler(bind_host, bind_port):
global plainsocket
plainsocket = socket.socket(socket.AF_INET6)
plainsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -385,21 +352,38 @@ def _tlshandler(bind_host, bind_port):
plainsocket.setsockopt(socket.SOL_TCP, 23, 5)
plainsocket.listen(5)
cs = credserver.CredServer()
cloop = asyncio.get_event_loop()
while (1): # TODO: exithook
cnn, addr = plainsocket.accept()
cnn, addr = await cloop.sock_accept(plainsocket)
if addr[1] < 1000:
eventlet.spawn_n(cs.handle_client, cnn, addr)
else:
asyncio.create_task(_tlsstartup(cnn))
if ffi:
@ffi.callback("int(*)( X509_STORE_CTX *, void*)")
def verify_stub(store, misc):
return 1
@ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p)
def verify_stub(store, misc):
return 1
class PyObject_HEAD(ctypes.Structure):
_fields_ = [
("ob_refcnt", ctypes.c_ssize_t),
("ob_type", ctypes.c_void_p),
]
# see main/Modules/_ssl.c, only caring about the SSL_CTX pointer
class PySSLContext(ctypes.Structure):
_fields_ = [
("ob_base", PyObject_HEAD),
("ctx", ctypes.c_void_p),
]
async def _tlsstartup(cnn):
authname = None
cloop = asyncio.get_event_loop()
cert = None
conf.init_config()
configfile = conf.get_config()
@@ -407,37 +391,27 @@ async def _tlsstartup(cnn):
ciphers = configfile.get('security', 'cipher_list')
else:
ciphers = 'ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384'
if libssl:
# most fully featured SSL function
ctx = libssl.Context(libssl.SSLv23_METHOD)
ctx.set_options(libssl.OP_NO_SSLv2 | libssl.OP_NO_SSLv3 |
libssl.OP_NO_TLSv1 | libssl.OP_NO_TLSv1_1 |
libssl.OP_CIPHER_SERVER_PREFERENCE)
ctx.set_cipher_list(ciphers)
ctx.set_tmp_ecdh(crypto.get_elliptic_curve('secp384r1'))
ctx.use_certificate_file('/etc/confluent/srvcert.pem')
ctx.use_privatekey_file('/etc/confluent/privkey.pem')
ctx.set_verify(libssln.VERIFY_PEER, lambda *args: True)
libssln._lib.SSL_CTX_set_cert_verify_callback(ctx._context,
verify_stub, ffi.NULL)
cnn = libssl.Connection(ctx, cnn)
cnn.set_accept_state()
cnn.do_handshake()
cert = cnn.get_peer_certificate()
else:
try:
# Try relatively newer python TLS function
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
ctx.options |= ssl.OP_CIPHER_SERVER_PREFERENCE
ctx.set_ciphers(ciphers)
ctx.load_cert_chain('/etc/confluent/srvcert.pem',
'/etc/confluent/privkey.pem')
cnn = ctx.wrap_socket(cnn, server_side=True)
except AttributeError:
raise Exception('Unable to find workable SSL support')
await sessionhdl(cnn, authname, cert=cert)
try:
# Try relatively newer python TLS function
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ctx.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
ctx.options |= ssl.OP_CIPHER_SERVER_PREFERENCE
ctx.set_ciphers(ciphers)
ctx.load_cert_chain('/etc/confluent/srvcert.pem',
'/etc/confluent/privkey.pem')
ssl_ctx = PySSLContext.from_address(id(ctx)).ctx
libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0)
sreader = asyncio.StreamReader()
sreaderprot = asyncio.StreamReaderProtocol(sreader)
tport, _ = await cloop.connect_accepted_socket(lambda: sreaderprot, sock=cnn, ssl=ctx)
swriter = asyncio.StreamWriter(tport, sreaderprot, sreader)
cert = tport.get_extra_info('ssl_object').getpeercert(binary_form=True)
cnn = (sreader, swriter)
#cnn = ctx.wrap_socket(cnn, server_side=True)
except AttributeError:
raise Exception('Unable to find workable SSL support')
asyncio.create_task(sessionhdl(cnn, authname, cert=cert))
def removesocket():
try:
@@ -463,7 +437,7 @@ async def _unixdomainhandler():
unixsocket.listen(5)
while True:
cnn, addr = await aloop.sock_accept(unixsocket)
creds = cnn.getsockopt(socket.SOL_SOCKET, SO_PEERCRED,
creds = cnn.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED,
struct.calcsize('iII'))
pid, uid, gid = struct.unpack('iII', creds)
skipauth = False
@@ -502,37 +476,22 @@ class SockApi(object):
if self.should_run_remoteapi():
self.start_remoteapi()
else:
eventlet.spawn_n(self.watch_for_cert)
eventlet.spawn_n(self.watch_resolv)
cloop = asyncio.get_event_loop()
cloop.create_task(self.watch_for_cert())
self.unixdomainserver = asyncio.create_task(_unixdomainhandler())
def watch_resolv(self):
while True:
watcher = libc.inotify_init1(os.O_NONBLOCK)
resolvpath = '/etc/resolv.conf'
while True:
try:
resolvpath = os.readlink(resolvpath)
except Exception:
break
if not isinstance(resolvpath, bytes):
resolvpath = resolvpath.encode('utf8')
if libc.inotify_add_watch(watcher, resolvpath, 0xcc2) <= -1:
eventlet.sleep(15)
continue
select.select((watcher,), (), (), 86400)
try:
os.read(watcher, 1024)
except Exception:
pass
greendns.resolver = greendns.ResolverProxy(hosts_resolver=greendns.HostsResolver())
os.close(watcher)
def watch_for_cert(self):
async def watch_for_cert(self):
watcher = libc.inotify_init1(os.O_NONBLOCK)
if libc.inotify_add_watch(watcher, b'/etc/confluent/', 0x100) > -1:
while True:
select.select((watcher,), (), (), 86400)
currfut = asyncio.Future()
asyncio.get_event_loop().add_reader(
watcher, currfut.set_result, None)
currfut.add_done_callback(
lambda x: asyncio.get_event_loop().remove_reader(watcher))
done, _ = await asyncio.wait([currfut], return_when=asyncio.FIRST_COMPLETED)
for currfut in done:
await currfut
try:
os.read(watcher, 1024)
except Exception:
@@ -548,12 +507,12 @@ class SockApi(object):
def stop_remoteapi(self):
if self.tlsserver is None:
return
self.tlsserver.kill()
self.tlsserver.cancel()
plainsocket.close()
self.tlsserver = None
def start_remoteapi(self):
if self.tlsserver is not None:
return
self.tlsserver = eventlet.spawn(
_tlshandler, self.bind_host, self.bind_port)
self.tlsserver = asyncio.get_event_loop().create_task(
_tlshandler(self.bind_host, self.bind_port))