From 42bfde3f864b415210f2d6205c56c2b099d28c5d Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 4 Mar 2026 15:13:30 -0500 Subject: [PATCH] Restore VNC console handling to async branch --- .../plugins/hardwaremanagement/proxmox.py | 8 +- confluent_server/confluent/vinzmanager.py | 92 ++++++++++--------- 2 files changed, 52 insertions(+), 48 deletions(-) diff --git a/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py b/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py index 9a983faf..22199280 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py @@ -63,8 +63,8 @@ class KvmConnHandler: self.pmxclient = pmxclient self.node = node - def connect(self): - consdata = self.pmxclient.get_vm_ikvm(self.node) + async def connect(self): + consdata = await self.pmxclient.get_vm_ikvm(self.node) consdata['fprint'] = self.pmxclient.fprint return KvmConnection(consdata) @@ -404,7 +404,7 @@ async def update(nodes, element, configmanager, inputdata): elif element == ['console', 'ikvm']: try: currclient = clientsbynode[node] - url = vinzmanager.get_url(node, inputdata, nodeparmcallback=KvmConnHandler(currclient, node).connect) + url = await vinzmanager.get_url(node, inputdata, nodeparmcallback=KvmConnHandler(currclient, node).connect) except Exception as e: print(repr(e)) return @@ -418,7 +418,7 @@ async def create(nodes, element, configmanager, inputdata): if element == ['console', 'ikvm']: try: currclient = clientsbynode[node] - url = vinzmanager.get_url(node, inputdata, nodeparmcallback=KvmConnHandler(currclient, node).connect) + url = await vinzmanager.get_url(node, inputdata, nodeparmcallback=KvmConnHandler(currclient, node).connect) except Exception as e: print(repr(e)) return diff --git a/confluent_server/confluent/vinzmanager.py b/confluent_server/confluent/vinzmanager.py index 101b29e4..77c2123c 100644 --- a/confluent_server/confluent/vinzmanager.py +++ b/confluent_server/confluent/vinzmanager.py @@ -3,6 +3,7 @@ import asyncio import confluent.auth as auth import confluent.messages as msg import confluent.exceptions as exc +import confluent.tasks as tasks import confluent.util as util import confluent.config.configmanager as configmanager import struct @@ -43,7 +44,7 @@ async def assure_vinz(): '-d', '/var/run/confluent/vinz/sessions') while not os.path.exists('/var/run/confluent/vinz/control'): await asyncio.sleep(0.5) - util.spawn(monitor_requests) + tasks.spawn_task(monitor_requests()) finally: startingup = False @@ -52,7 +53,7 @@ _nodeparms = {} async def get_url(nodename, inputdata, nodeparmcallback=None): _nodeparms[nodename] = nodeparmcallback method = inputdata.inputbynode[nodename] - assure_vinz() + await assure_vinz() if method == 'wss': return f'/vinz/kvmsession/{nodename}' elif method == 'unix': @@ -62,7 +63,7 @@ async def get_url(nodename, inputdata, nodeparmcallback=None): _usersessions = {} -def close_session(sessionid): +async def close_session(sessionid): sessioninfo = _usersessions.get(sessionid, None) if not sessioninfo: return @@ -84,7 +85,7 @@ def close_session(sessionid): if not isinstance(bmcpass, str): bmcpass = bmcpass.decode() if bmcuser and bmcpass: - wc.grab_json_response_with_status( + await wc.grab_json_response_with_status( '/logout', {'data': [bmcuser, bmcpass]}, headers={ 'Content-Type': 'application/json', @@ -104,7 +105,7 @@ async def send_grant(conn, nodename, rqtype): if parmcallback: # plugin that handles the specifics of the vnc wrapping if rqtype == 1: raise Exception("Plugin managed login data not supported with legacy grant request") - cxnmgr = parmcallback() + cxnmgr = await parmcallback() _usersessions[sessionid] = { 'cxnmgr': cxnmgr, 'nodename': nodename, @@ -168,43 +169,44 @@ async def send_grant(conn, nodename, rqtype): if '$' in fprint: fprint = fprint.split('$', 1)[1] fprint = bytes.fromhex(fprint) - await cloop.sock_send(conn, struct.pack('!BI', rqtype, len(host))) - await cloop.sock_send(conn, host.encode()) - await cloop.sock_send(conn, struct.pack('!I', len(sessionid))) - await cloop.sock_send(conn, sessionid.encode()) + await cloop.sock_sendall(conn, struct.pack('!BI', rqtype, len(host))) + await cloop.sock_sendall(conn, host.encode()) + await cloop.sock_sendall(conn, struct.pack('!I', len(sessionid))) + await cloop.sock_sendall(conn, sessionid.encode()) if rqtype == 1: - await cloop.sock_send(conn, struct.pack('!I', len(sessiontok))) - await cloop.sock_send(conn, sessiontok.encode()) - await cloop.sock_send(conn, struct.pack('!I', len(fprint))) - await cloop.sock_send(conn, fprint) - await cloop.sock_send(conn, struct.pack('!I', len(url))) - await cloop.sock_send(conn, url.encode()) + await cloop.sock_sendall(conn, struct.pack('!I', len(sessiontok))) + await cloop.sock_sendall(conn, sessiontok.encode()) + await cloop.sock_sendall(conn, struct.pack('!I', len(fprint))) + await cloop.sock_sendall(conn, fprint) + await cloop.sock_sendall(conn, struct.pack('!I', len(url))) + await cloop.sock_sendall(conn, url.encode()) else: # newer TLV style protocol - await cloop.sock_send(conn, struct.pack('!H', portnum)) - await cloop.sock_send(conn, struct.pack('!BI', 4, len(url))) - await cloop.sock_send(conn, url.encode()) + await cloop.sock_sendall(conn, struct.pack('!H', portnum)) + await cloop.sock_sendall(conn, struct.pack('!BI', 4, len(url))) + await cloop.sock_sendall(conn, url.encode()) for cook in cookies: v = cookies[cook] totlen = len(cook) + len(v) + 4 - await cloop.sock_send(conn, struct.pack('!BIH', 1, totlen, len(cook.encode()))) - await cloop.sock_send(conn, cook.encode()) - await cloop.sock_send(conn, struct.pack('!H', len(v.encode()))) - await cloop.sock_send(conn, v.encode()) + await cloop.sock_sendall(conn, struct.pack('!BIH', 1, totlen, len(cook.encode()))) + await cloop.sock_sendall(conn, cook.encode()) + await cloop.sock_sendall(conn, struct.pack('!H', len(v.encode()))) + await cloop.sock_sendall(conn, v.encode()) for proto in protos: - await cloop.sock_send(conn, struct.pack('!BI', 2, len(proto.encode()))) - await cloop.sock_send(conn, proto.encode()) - await cloop.sock_send(conn, struct.pack('!BI', 3, len(fprint))) - await cloop.sock_send(conn, fprint) + await cloop.sock_sendall(conn, struct.pack('!BI', 2, len(proto.encode()))) + await cloop.sock_sendall(conn, proto.encode()) + await cloop.sock_sendall(conn, struct.pack('!BI', 3, len(fprint))) + await cloop.sock_sendall(conn, fprint) if passwd: - await cloop.sock_send(conn, struct.pack('!BI', 5, len(passwd.encode()[:8]))) - await cloop.sock_send(conn, passwd.encode()[:8]) - await cloop.sock_send(conn, b'\xff') + await cloop.sock_sendall(conn, struct.pack('!BI', 5, len(passwd.encode()[:8]))) + await cloop.sock_sendall(conn, passwd.encode()[:8]) + await cloop.sock_sendall(conn, b'\xff') -def recv_exact(conn, n): +async def recv_exact(conn, n): #TODO:asyncmerge: review recv_exact usage + cloop = asyncio.get_event_loop() retdata = b'' while len(retdata) < n: - currdata = conn.recv(n - len(retdata)) + currdata = await cloop.sock_recv(conn, n - len(retdata)) if not currdata: raise Exception("Error receiving") retdata += currdata @@ -220,15 +222,15 @@ async def evaluate_request(conn): pid, uid, gid = struct.unpack('iII', creds) if uid != os.getuid(): return - rqcode, fieldlen = struct.unpack('!BI', recv_exact(conn, 5)) - authtoken = recv_exact(conn, fieldlen).decode() + rqcode, fieldlen = struct.unpack('!BI', await recv_exact(conn, 5)) + authtoken = (await recv_exact(conn, fieldlen)).decode() if authtoken != _vinztoken: return if rqcode == 2: # disconnect notification - fieldlen = struct.unpack('!I', recv_exact(conn, 4))[0] - sessionid = recv_exact(conn, fieldlen).decode() - close_session(sessionid) - conn.recv(1) # digest 0xff + fieldlen = struct.unpack('!I', await recv_exact(conn, 4))[0] + sessionid = (await recv_exact(conn, fieldlen)).decode() + await close_session(sessionid) + await cloop.sock_recv(conn, 1) # digest 0xff # if rqcode == 3: # new form connection request # this will generalize things, to allow describing # arbitrary cookies and subprotocols @@ -281,26 +283,28 @@ async def monitor_requests(): a.bind('/var/run/confluent/vinz/approval') os.chmod('/var/run/confluent/vinz/approval', 0o600) a.listen(8) + a.setblocking(0) while True: conn, addr = await cloop.sock_accept(a) - util.spawn(evaluate_request(conn)) + tasks.spawn_task(evaluate_request(conn)) async def request_session(nodename): - assure_vinz() + await assure_vinz() cloop = asyncio.get_event_loop() a = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - a.connect('/var/run/confluent/vinz/control') + a.setblocking(0) + await cloop.sock_connect(a, '/var/run/confluent/vinz/control') nodename = nodename.encode() - await cloop.sock_send(a, struct.pack('!BI', 1, len(nodename))) - await cloop.sock_send(a, nodename) - await cloop.sock_send(a, b'\xff') + await cloop.sock_sendall(a, struct.pack('!BI', 1, len(nodename))) + await cloop.sock_sendall(a, nodename) + await cloop.sock_sendall(a, b'\xff') rsp = await cloop.sock_recv(a, 1) retcode = struct.unpack('!B', rsp)[0] if retcode != 1: raise Exception("Bad return code") rsp = await cloop.sock_recv(a, 4) nlen = struct.unpack('!I', rsp)[0] - sockname = await cloop.sock_recv(a, nlen).decode('utf8') + sockname = (await cloop.sock_recv(a, nlen)).decode('utf8') retcode = await cloop.sock_recv(a, 1) if retcode != b'\xff': raise Exception("Unrecognized response")