From 2ab85bb687b30edc70ff830b655dcdca8294e5c9 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 17 Mar 2026 15:40:01 -0400 Subject: [PATCH] Refactor functions to be a bit more readable --- confluent_server/confluent/runansible.py | 41 ++++++++++++++++++------ 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/confluent_server/confluent/runansible.py b/confluent_server/confluent/runansible.py index 3435ddcf..a1f4e75e 100644 --- a/confluent_server/confluent/runansible.py +++ b/confluent_server/confluent/runansible.py @@ -33,6 +33,27 @@ import tempfile anspypath = None running_status = {} + +async def recv_exact(conn, size): + data = b'' + while len(data) < size: + chunk = await asyncio.get_event_loop().sock_recv(conn, size - len(data)) + if not chunk: + return None + data += chunk + return data + +async def recv_message(conn): + header = await recv_exact(conn, 8) + if not header: + return None + sz = struct.unpack('=q', header)[0] + msg = await recv_exact(conn, sz) + if not msg: + return None + return msgpack.unpackb(msg, raw=False) + + class PlayRunner(object): def __init__(self, playfiles, nodes): self.stderr = '' @@ -86,7 +107,6 @@ class PlayRunner(object): 'complete': self.complete, 'results': self.get_available_results() } - async def _really_run_playbooks(self): global anspypath try: @@ -121,18 +141,19 @@ class PlayRunner(object): conn, _ = await asyncio.get_event_loop().sock_accept(feedback) conn.setblocking(False) try: - while True: - header = await asyncio.get_event_loop().sock_recv(conn, 8) - if not header: - break - sz = struct.unpack('=q', header)[0] - msg = await asyncio.get_event_loop().sock_recv(conn, sz) - result = msgpack.unpackb(msg, raw=False) - self.results.append(result) + result = True + while result is not None: + result = await recv_message(conn) + if result: + self.results.append(result) if worker.returncode is not None: break + while result: + result = await recv_message(conn) + if result is not None: + self.results.append(result) finally: - conn.close() + await asyncio.get_event_loopy().sock_close(conn) stdout, stder = await worker.communicate() self.stderr += stder.decode('utf8') self.stdout += stdout.decode('utf8')