2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-29 05:33:33 +00:00

Refactor functions to be a bit more readable

This commit is contained in:
Jarrod Johnson
2026-03-17 15:40:01 -04:00
parent e1a6a1c9bf
commit 2ab85bb687

View File

@@ -33,6 +33,27 @@ import tempfile
anspypath = None
running_status = {}
async def recv_exact(conn, size):
data = b''
while len(data) < size:
chunk = await asyncio.get_event_loop().sock_recv(conn, size - len(data))
if not chunk:
return None
data += chunk
return data
async def recv_message(conn):
header = await recv_exact(conn, 8)
if not header:
return None
sz = struct.unpack('=q', header)[0]
msg = await recv_exact(conn, sz)
if not msg:
return None
return msgpack.unpackb(msg, raw=False)
class PlayRunner(object):
def __init__(self, playfiles, nodes):
self.stderr = ''
@@ -86,7 +107,6 @@ class PlayRunner(object):
'complete': self.complete,
'results': self.get_available_results()
}
async def _really_run_playbooks(self):
global anspypath
try:
@@ -121,18 +141,19 @@ class PlayRunner(object):
conn, _ = await asyncio.get_event_loop().sock_accept(feedback)
conn.setblocking(False)
try:
while True:
header = await asyncio.get_event_loop().sock_recv(conn, 8)
if not header:
break
sz = struct.unpack('=q', header)[0]
msg = await asyncio.get_event_loop().sock_recv(conn, sz)
result = msgpack.unpackb(msg, raw=False)
self.results.append(result)
result = True
while result is not None:
result = await recv_message(conn)
if result:
self.results.append(result)
if worker.returncode is not None:
break
while result:
result = await recv_message(conn)
if result is not None:
self.results.append(result)
finally:
conn.close()
await asyncio.get_event_loopy().sock_close(conn)
stdout, stder = await worker.communicate()
self.stderr += stder.decode('utf8')
self.stdout += stdout.decode('utf8')