From cbb52739d32eec46968c3de8106bddd60e6aa452 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 30 May 2024 13:59:14 -0400 Subject: [PATCH] Fix a number of issues with async rework Have util retain tasks that are 'fire and forget', to avoid garbage collection trying to delete the background tasks. Move some utilities explicitly over to asynclient/asynctlvdata that had previously been reworked. Implement terminal resize in new asyncssh backend. --- confluent_client/bin/confetty | 50 +++++++++++++------ confluent_client/bin/nodeattrib | 2 +- confluent_client/bin/nodelist | 2 +- confluent_server/confluent/consoleserver.py | 50 +++++++++---------- confluent_server/confluent/core.py | 6 +-- .../confluent/plugins/shell/ssh.py | 8 +-- confluent_server/confluent/shellserver.py | 10 ++-- confluent_server/confluent/sockapi.py | 18 ++++--- confluent_server/confluent/util.py | 17 ++++++- 9 files changed, 101 insertions(+), 62 deletions(-) diff --git a/confluent_client/bin/confetty b/confluent_client/bin/confetty index 44010f8d..011fdf9c 100755 --- a/confluent_client/bin/confetty +++ b/confluent_client/bin/confetty @@ -130,6 +130,15 @@ def print_help(): #common with the api document +def writeout(data): + while True: + try: + select.select((), (sys.stdout,), ()) + sys.stdout.write(data) + break + except BlockingIOError: + continue + def updatestatus(stateinfo={}): global powerstate, powertime, clearpowermessage status = consolename @@ -149,10 +158,10 @@ def updatestatus(stateinfo={}): if 'state' in stateinfo: # currently only read power means anything newpowerstate = stateinfo['state']['value'] if newpowerstate != powerstate and newpowerstate == 'off': - sys.stdout.write("\x1b[2J\x1b[;H[powered off]\r\n") + writeout("\x1b[2J\x1b[;H[powered off]\r\n") clearpowermessage = True if newpowerstate == 'on' and clearpowermessage: - sys.stdout.write("\x1b[2J\x1b[;H") + writeout("\x1b[2J\x1b[;H") clearpowermessage = False powerstate = newpowerstate if 'clientcount' in laststate and laststate['clientcount'] != 1: @@ -170,7 +179,7 @@ def updatestatus(stateinfo={}): if info: status += ' [' + ','.join(info) + ']' if os.environ.get('TERM', '') not in ('linux'): - sys.stdout.write('\x1b]0;console: %s\x07' % status) + writeout('\x1b]0;console: %s\x07' % status) sys.stdout.flush() @@ -450,7 +459,7 @@ def do_command(command, server): print_result(res) elif argv[0] == 'start': targpath = fullpath_target(argv[1]) - nodename = targpath.split('/')[-3] + nodename = targpath.split('/')[2] currconsole = targpath startrequest = {'operation': 'start', 'path': targpath, 'parameters': {}} @@ -657,9 +666,13 @@ def get_session_node(shellargs): return shellargs[0] if len(shellargs) == 2 and shellargs[0] == 'start': args = [s for s in shellargs[1].split('/') if s] - if len(args) == 4 and args[0] == 'nodes' and args[2] == 'console' and \ + if len(args) == 4 and args[0] == 'nodes': + if args[2] == 'console' and \ args[3] == 'session': - return args[1] + return args[1] + if args[2] == 'shell' and \ + args[3] == 'sessions': + return args[1] return None @@ -920,7 +933,7 @@ def main(): session_node = get_session_node(shellargs) if session_node is not None: consoleonly = True - do_command("start /nodes/%s/console/session" % session_node, netserver) + do_command(shellargs, netserver) doexit = True elif shellargs: do_command(shellargs, netserver) @@ -980,14 +993,21 @@ def consume_termdata(fh, bufferonly=False): clearpowermessage = False if bufferonly: return data - try: - sys.stdout.write(data) - except UnicodeEncodeError: - sys.stdout.buffer.write(data.encode('utf8')) - except IOError: # Some times circumstances are bad - # resort to byte at a time... - for d in data: - sys.stdout.write(d) + data = data.encode('utf8') + written = False + while not written: + select.select((), (sys.stdout,), ()) + try: + sys.stdout.buffer.write(data) + written = True + except BlockingIOError: + continue + except IOError: # Some times circumstances are bad + # resort to byte at a time... + raise + for d in data: + sys.stdout.write(d) + written = True now = time.time() if ('showtime' not in laststate or (now // 60) != laststate['showtime'] // 60): diff --git a/confluent_client/bin/nodeattrib b/confluent_client/bin/nodeattrib index 4a8aa2a0..f1751189 100755 --- a/confluent_client/bin/nodeattrib +++ b/confluent_client/bin/nodeattrib @@ -35,7 +35,7 @@ path = os.path.realpath(os.path.join(path, '..', 'lib', 'python')) if path.startswith('/opt'): sys.path.append(path) -import confluent.client as client +import confluent.asynclient as client async def main(): argparser = optparse.OptionParser( diff --git a/confluent_client/bin/nodelist b/confluent_client/bin/nodelist index e4f846ea..8938924f 100755 --- a/confluent_client/bin/nodelist +++ b/confluent_client/bin/nodelist @@ -34,7 +34,7 @@ path = os.path.realpath(os.path.join(path, '..', 'lib', 'python')) if path.startswith('/opt'): sys.path.append(path) -import confluent.client as client +import confluent.asynclient as client async def main(): argparser = optparse.OptionParser( diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index a2d5d8a5..6e27771e 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -30,7 +30,7 @@ import confluent.exceptions as exc import confluent.interface.console as conapi import confluent.log as log import confluent.core as plugin -import confluent.tlvdata as tlvdata +import confluent.asynctlvdata as tlvdata import confluent.util as util import eventlet import eventlet.event @@ -554,12 +554,12 @@ class ConsoleHandler(object): retdata = await get_buffer_output(nodeid) return retdata, connstate - def write(self, data): + async def write(self, data): if self.connectstate == 'connected': try: if isinstance(data, str) and not isinstance(data, bytes): data = data.encode('utf-8') - self._console.write(data) + await self._console.write(data) except Exception: _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) @@ -661,7 +661,7 @@ class ProxyConsole(object): while data: self.data_handler(data) data = await tlvdata.recv(self.remote) - self.remote.close() + self.remote[1].close() def get_buffer_age(self): # the server sends a buffer age if appropriate, no need to handle @@ -673,17 +673,18 @@ class ProxyConsole(object): self.skipreplay = False return b'' - def write(self, data): + async def write(self, data): # Relay data to the collective manager try: - tlvdata.send(self.remote, data) - except Exception: + await tlvdata.send(self.remote, data) + except Exception as e: + print(repr(e)) if self.clisession: - self.clisession.detach() + await self.clisession.detach() self.clisession = None - def attachsession(self, session): + async def attachsession(self, session): self.clisession = session self.data_handler = session.data_handler termreq = { @@ -700,30 +701,25 @@ class ProxyConsole(object): }, } try: - remote = socket.create_connection((self.managerinfo['address'], 13001)) - remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, - keyfile='/etc/confluent/privkey.pem', - certfile='/etc/confluent/srvcert.pem') - if not util.cert_matches(self.managerinfo['fingerprint'], - remote.getpeercert(binary_form=True)): - raise Exception('Invalid peer certificate') - except Exception: - #await asyncio.sleep(3) + remote = await collective.connect_to_collective(None, self.managerinfo['address']) + except Exception as e: + print(repr(e)) + await asyncio.sleep(3) if self.clisession: self.clisession.detach() - self.detachsession(None) + await self.detachsession(None) return - tlvdata.recv(remote) - tlvdata.recv(remote) - tlvdata.send(remote, termreq) + await tlvdata.recv(remote) + await tlvdata.recv(remote) + await tlvdata.send(remote, termreq) self.remote = remote util.spawn(self.relay_data()) - def detachsession(self, session): + async def detachsession(self, session): # we will disappear, so just let that happen... if self.remote: try: - tlvdata.send(self.remote, {'operation': 'stop'}) + await tlvdata.send(self.remote, {'operation': 'stop'}) except Exception: pass self.clisession = None @@ -840,16 +836,16 @@ class ConsoleSession(object): self._evt = None self.reghdl = None - def detach(self): + async def detach(self): """Handler for the console handler to detach so it can reattach, currently to facilitate changing from one collective.manager to another :return: """ - self.conshdl.detachsession(self) + await self.conshdl.detachsession(self) self.connect_session() - self.conshdl.attachsession(self) + await self.conshdl.attachsession(self) self.write = self.conshdl.write def got_data(self, data): diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index be67b438..f34c2554 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -991,7 +991,7 @@ async def handle_node_request(configmanager, inputdata, operation, try: nodeorrange = pathcomponents[1] if not isnoderange and not configmanager.is_node(nodeorrange): - raise exc.NotFoundException("Invalid Node") + raise exc.NotFoundException(f'Invalid Node: {repr(pathcomponents)}') if isnoderange and not (len(pathcomponents) == 3 and pathcomponents[2] == 'abbreviate'): try: @@ -1116,7 +1116,7 @@ async def handle_node_request(configmanager, inputdata, operation, numworkers = 0 for hfunc in nodesbyhandler: numworkers += 1 - asyncio.create_task(addtoqueue(passvalues, hfunc, {'nodes': nodesbyhandler[hfunc], + util.spawn(addtoqueue(passvalues, hfunc, {'nodes': nodesbyhandler[hfunc], 'element': pathcomponents, 'configmanager': configmanager, 'inputdata': _get_input_data(_plugin, pathcomponents, @@ -1124,7 +1124,7 @@ async def handle_node_request(configmanager, inputdata, operation, isnoderange, configmanager)})) for manager in nodesbymanager: numworkers += 1 - asyncio.create_task(addtoqueue(passvalues, dispatch_request, { + util.spawn(addtoqueue(passvalues, dispatch_request, { 'nodes': nodesbymanager[manager], 'manager': manager, 'element': pathcomponents, 'configmanager': configmanager, 'inputdata': inputdata, 'operation': operation, 'isnoderange': isnoderange})) diff --git a/confluent_server/confluent/plugins/shell/ssh.py b/confluent_server/confluent/plugins/shell/ssh.py index d7106e50..eca5e36b 100644 --- a/confluent_server/confluent/plugins/shell/ssh.py +++ b/confluent_server/confluent/plugins/shell/ssh.py @@ -90,7 +90,7 @@ class SshShell(conapi.Console): if not self.connected: return # asyncssh channel has change_terminal_size, hurray - self.shell.resize_pty(width=width, height=height) + self.shell[0].channel.change_terminal_size(width=width, height=height) async def recvdata(self): while self.connected: @@ -142,8 +142,8 @@ class SshShell(conapi.Console): self.datacallback('\r\nEnter "disconnect" or "accept": ') return except Exception as e: - self.ssh.close() - self.ssh.close() + if self.ssh: + self.ssh.close() self.inputmode = 0 self.username = b'' self.password = b'' @@ -158,7 +158,7 @@ class SshShell(conapi.Console): # height=self.height) self.rxthread = util.spawn(self.recvdata()) - def write(self, data): + async def write(self, data): if self.inputmode == -2: self.datacallback(conapi.ConsoleEvent.Disconnect) return diff --git a/confluent_server/confluent/shellserver.py b/confluent_server/confluent/shellserver.py index 4293309c..6e66c7b2 100644 --- a/confluent_server/confluent/shellserver.py +++ b/confluent_server/confluent/shellserver.py @@ -22,6 +22,7 @@ import confluent.consoleserver as consoleserver import confluent.exceptions as exc import confluent.messages as msg +import confluent.util as util activesessions = {} @@ -47,9 +48,12 @@ class _ShellHandler(consoleserver.ConsoleHandler): def _got_disconnected(self): self.connectstate = 'closed' - self._send_rcpts({'connectstate': self.connectstate}) + util.spawn(self._bgdisconnect()) + + async def _bgdisconnect(self): + await self._send_rcpts({'connectstate': self.connectstate}) for session in list(self.livesessions): - session.destroy() + await session.destroy() @@ -119,7 +123,7 @@ class ShellSession(consoleserver.ConsoleSession): async def destroy(self): try: - activesessions[(self.configmanager.tenant, self.node, + await activesessions[(self.configmanager.tenant, self.node, self.username)][self.sessionid].close() del activesessions[(self.configmanager.tenant, self.node, self.username)][self.sessionid] diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 6f278e6c..9bce9d42 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -316,10 +316,13 @@ async def term_interact(authdata, authname, ccons, cfm, connection, consession, if bufferage is not False: send_data(connection, {'bufferage': bufferage}) while consession is not None: - data = await tlvdata.recv(connection) + try: + data = await tlvdata.recv(connection) + except Exception: + data = None if type(data) == dict: if data['operation'] == 'stop': - consession.destroy() + await consession.destroy() break elif data['operation'] == 'break': consession.send_break() @@ -350,10 +353,12 @@ async def term_interact(authdata, authname, ccons, cfm, connection, consession, 'error': 'Unexpected error - ' + str(e)}) await send_data(connection, {'_requestdone': 1}) continue - if not data: - consession.destroy() + if not consession: break - consession.write(data) + if not data: + await consession.destroy() + break + await consession.write(data) connection.close() @@ -484,7 +489,8 @@ async def _unixdomainhandler(): except KeyError: cnn.close() return - asyncio.create_task(sessionhdl(cnn, authname, skipauth)) + util.spawn(sessionhdl(cnn, authname, skipauth)) + #asyncio.create_task(sessionhdl(cnn, authname, skipauth)) class SockApi(object): diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index a8a9f59f..4ba4c7eb 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -28,6 +28,7 @@ import ssl import struct import eventlet.green.subprocess as subprocess import asyncio +import random def mkdirp(path, mode=0o777): @@ -48,12 +49,24 @@ def spawn_after(sleeptime, func, *args): raise Exception('tf') return spawn(_sleep_and_run(sleeptime, func, args)) +tsks = {} def spawn(coro): + tskid = random.random() + while tskid in tsks: + tskid = random.random() + tsks[tskid] = 1 try: - return asyncio.create_task(coro) + tsks[tskid] = asyncio.create_task(_run(coro, tskid)) except AttributeError: - return asyncio.get_event_loop().create_task(coro) + tsks[tskid] = asyncio.get_event_loop().create_task(_run(coro, tskid)) + return tsks[tskid] + +async def _run(coro, taskid): + ret = await coro + del tsks[taskid] + return ret + def run(cmd): process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)