From f9e898a46a09e96c770fda8da777d9425d747954 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 4 Feb 2026 15:34:38 -0500 Subject: [PATCH] Asyncio fixes Fix ability to receive file descriptions from a unix domain client Correct invocations to clearbuffer in consoleserver --- confluent_client/confluent/asynctlvdata.py | 4 ++-- confluent_server/confluent/consoleserver.py | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/confluent_client/confluent/asynctlvdata.py b/confluent_client/confluent/asynctlvdata.py index 7d05ecd8..9f76c397 100644 --- a/confluent_client/confluent/asynctlvdata.py +++ b/confluent_client/confluent/asynctlvdata.py @@ -133,7 +133,7 @@ def _recvmsg(loop, fut, sock, msglen, maxfds, rfd): msglen, socket.CMSG_LEN(maxfds * fds.itemsize)) except (BlockingIOError, InterruptedError): fd = sock.fileno() - loop.add_reader(fd, _recvmsg, loop, fut, sock, fd) + loop.add_reader(fd, _recvmsg, loop, fut, sock, msglen, maxfds, fd) except Exception as exc: fut.set_exception(exc) else: @@ -144,7 +144,7 @@ def _recvmsg(loop, fut, sock, msglen, maxfds, rfd): fds.frombytes( cmsg_data[ :len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) - fut.set_result(msglen, list(fds)) + fut.set_result((msg, list(fds))) def recv_fds(sock, msglen, maxfds): diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 6fe9936b..a689fb62 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -156,7 +156,6 @@ class ConsoleHandler(object): # wall clock has gone backwards, use current time as best # guess self.lasttime = util.monotonic_time() - self.clearbuffer() self.reconnect = None self.users = {} self._attribwatcher = None @@ -169,6 +168,7 @@ class ConsoleHandler(object): tasks.spawn(self.ondemand_init()) async def ondemand_init(self): + await self.clearbuffer() await self.check_isondemand() if not self._isondemand: self.connectstate = 'connecting' @@ -361,7 +361,7 @@ class ConsoleHandler(object): else: print(traceback.format_exc()) if not isinstance(self._console, conapi.Console): - self.clearbuffer() + await self.clearbuffer() self.connectstate = 'unconnected' self.error = 'misconfigured' await self._send_rcpts({'connectstate': self.connectstate, @@ -374,7 +374,7 @@ class ConsoleHandler(object): return if self.clearerror: self.clearerror = False - self.clearbuffer() + await self.clearbuffer() await self._send_rcpts(b'\x1bc\x1b[2J\x1b[1;1H') self.send_break = self._console.send_break self.resize = self._console.resize @@ -402,7 +402,7 @@ class ConsoleHandler(object): self.reconnect = tasks.spawn_task_after(retrytime, self._connect) return except (exc.TargetEndpointUnreachable, socket.gaierror) as se: - self.clearbuffer() + await self.clearbuffer() self.error = 'unreachable' self.connectstate = 'unconnected' await self._send_rcpts({'connectstate': self.connectstate, @@ -412,7 +412,7 @@ class ConsoleHandler(object): self.reconnect = tasks.spawn_task_after(retrytime, self._connect) return except Exception: - self.clearbuffer() + await self.clearbuffer() _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) self.error = 'unknown' @@ -444,7 +444,7 @@ class ConsoleHandler(object): if self._isalive: self._connect() else: - self.clearbuffer() + await self.clearbuffer() async def close(self): self._isalive = False