From a6e7d016ea751e778115ef8b4e0168cca23ca5a0 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 13 Mar 2026 14:57:59 -0400 Subject: [PATCH] Restore ansible running in async, complete with recent changes from master --- confluent_server/confluent/runansible.py | 49 +++++++++++++++++------- confluent_server/confluent/sshutil.py | 1 + 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/confluent_server/confluent/runansible.py b/confluent_server/confluent/runansible.py index 8285ccab..3435ddcf 100644 --- a/confluent_server/confluent/runansible.py +++ b/confluent_server/confluent/runansible.py @@ -101,20 +101,41 @@ class PlayRunner(object): if not mypath: mypath = sys.executable targnodes = ','.join(self.nodes) - for playfilename in self.playfiles: - worker = await asyncio.create_subprocess_exec( - mypath, __file__, targnodes, playfilename, - stdin=asyncio.subprocess.DEVNULL, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE) - stdout, stder = await worker.communicate() - self.stderr += stder.decode('utf8') - current = memoryview(stdout) - while len(current): - sz = struct.unpack('=q', current[:8])[0] - result = msgpack.unpackb(current[8:8+sz], raw=False) - self.results.append(result) - current = current[8+sz:] + with tempfile.TemporaryDirectory() as tmpdir: + feedback = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + suffix = base64.urlsafe_b64encode(os.urandom(6)).decode('utf8') + sockpath = os.path.join(tmpdir, 'feedback-' + suffix) + feedback.bind(sockpath) + feedback.listen(1) + feedback.setblocking(False) + localenv = os.environ.copy() + localenv['FEEDBACK_SOCK'] = sockpath + with feedback: + for playfilename in self.playfiles: + worker = await asyncio.create_subprocess_exec( + mypath, __file__, targnodes, playfilename, + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=localenv) + conn, _ = await asyncio.get_event_loop().sock_accept(feedback) + conn.setblocking(False) + try: + while True: + header = await asyncio.get_event_loop().sock_recv(conn, 8) + if not header: + break + sz = struct.unpack('=q', header)[0] + msg = await asyncio.get_event_loop().sock_recv(conn, sz) + result = msgpack.unpackb(msg, raw=False) + self.results.append(result) + if worker.returncode is not None: + break + finally: + conn.close() + stdout, stder = await worker.communicate() + self.stderr += stder.decode('utf8') + self.stdout += stdout.decode('utf8') finally: self.complete = True diff --git a/confluent_server/confluent/sshutil.py b/confluent_server/confluent/sshutil.py index 6f21dc8e..eaa4bb60 100644 --- a/confluent_server/confluent/sshutil.py +++ b/confluent_server/confluent/sshutil.py @@ -10,6 +10,7 @@ import os import shutil import subprocess import tempfile +import socket agent_pid = None ready_keys = {}