From a003ad6e2c2e6d669373f95ff797210fb9f6a4bd Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 4 Feb 2026 10:02:17 -0500 Subject: [PATCH] Address asyncio changes for consoleserver --- confluent_server/confluent/consoleserver.py | 39 ++++++++++++--------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index fcb4b64e..6fe9936b 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -75,17 +75,22 @@ async def get_buffer_output(nodename): return bytes(outdata[:-1]) -def send_output(nodename, output): +async def send_output(nodename, output): if not isinstance(nodename, bytes): nodename = nodename.encode('utf8') out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1) out.connect("\x00confluent-vtbuffer") - out.send(struct.pack('I', len(nodename) | (1 << 29))) - out.send(nodename) + rdr, writer = await asyncio.open_unix_connection(sock=out) + hdr = struct.pack('I', len(nodename) | (1 << 29)) + writer.write(hdr) + writer.write(nodename) for chunk in chunk_output(output, 8192): - out.send(struct.pack('I', len(chunk) | (2 << 29))) - out.send(chunk) + writer.write(struct.pack('I', len(chunk) | (2 << 29))) + writer.write(chunk) + await writer.drain() + writer.close() + await writer.wait_closed() def _utf8_normalize(data, decoder): # first we give the stateful decoder a crack at the byte stream, @@ -182,7 +187,7 @@ class ConsoleHandler(object): retrytime = 120 return retrytime + (retrytime * random.random()) - def feedbuffer(self, data): + async def feedbuffer(self, data): if not isinstance(data, bytes): data = data.encode('utf-8') if self.pendingbytes is not None: @@ -190,11 +195,11 @@ class ConsoleHandler(object): self.pendingbytes = b'' nodeid = self.termprefix + self.node try: - send_output(nodeid, data) + await send_output(nodeid, data) data = self.pendingbytes self.pendingbytes = None if data: - send_output(nodeid, data) + await send_output(nodeid, data) except Exception: _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) @@ -267,7 +272,7 @@ class ConsoleHandler(object): if onlylogging: return else: - self._ondemand() + await self._ondemand() if logvalue in ('none', 'memory'): self._dologging = False if not self._isondemand or self.livesessions: @@ -287,8 +292,8 @@ class ConsoleHandler(object): else: self._console.ping() - def clearbuffer(self): - self.feedbuffer( + async def clearbuffer(self): + await self.feedbuffer( '\x1bc[No data has been received from the remote console since ' \ 'connecting. This could\r\nbe due to having the console.logging ' \ 'attribute set to none or interactive,\r\nserial console not ' \ @@ -305,7 +310,7 @@ class ConsoleHandler(object): self.connectionthread.cancel() self.connectionthread = None # clear the terminal buffer when disconnected - self.clearbuffer() + await self.clearbuffer() if self._console: self.log( logdata='console disconnected', ltype=log.DataTypes.event, @@ -315,10 +320,10 @@ class ConsoleHandler(object): self.connectstate = 'unconnected' await self._send_rcpts({'connectstate': self.connectstate}) - def _ondemand(self): + async def _ondemand(self): self._isondemand = True if not self.livesessions and self._console: - self._disconnect() + await self._disconnect() def _connect(self): if not self._is_local: @@ -497,7 +502,7 @@ class ConsoleHandler(object): event=log.Events.clientdisconnect, eventdata=edata) await self._send_rcpts({'clientcount': len(self.livesessions)}) if self._isondemand and not self.livesessions: - self._disconnect() + await self._disconnect() async def reopen(self): @@ -520,12 +525,12 @@ class ConsoleHandler(object): if self.clearpending or self.clearerror: self.clearpending = False self.clearerror = False - self.feedbuffer(b'\x1bc\x1b[2J\x1b[1;1H') + await self.feedbuffer(b'\x1bc\x1b[2J\x1b[1;1H') await self._send_rcpts(b'\x1bc\x1b[2J\x1b[1;1H') await self._send_rcpts(_utf8_normalize(data, self.utf8decoder)) self.log(data, eventdata=eventdata) self.lasttime = util.monotonic_time() - self.feedbuffer(data) + await self.feedbuffer(data) async def _send_rcpts(self, data):