mirror of
https://github.com/xcat2/confluent.git
synced 2026-03-22 10:09:17 +00:00
Restore ansible running in async, complete with recent changes from master
This commit is contained in:
@@ -101,20 +101,41 @@ class PlayRunner(object):
|
||||
if not mypath:
|
||||
mypath = sys.executable
|
||||
targnodes = ','.join(self.nodes)
|
||||
for playfilename in self.playfiles:
|
||||
worker = await asyncio.create_subprocess_exec(
|
||||
mypath, __file__, targnodes, playfilename,
|
||||
stdin=asyncio.subprocess.DEVNULL,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE)
|
||||
stdout, stder = await worker.communicate()
|
||||
self.stderr += stder.decode('utf8')
|
||||
current = memoryview(stdout)
|
||||
while len(current):
|
||||
sz = struct.unpack('=q', current[:8])[0]
|
||||
result = msgpack.unpackb(current[8:8+sz], raw=False)
|
||||
self.results.append(result)
|
||||
current = current[8+sz:]
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
feedback = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
suffix = base64.urlsafe_b64encode(os.urandom(6)).decode('utf8')
|
||||
sockpath = os.path.join(tmpdir, 'feedback-' + suffix)
|
||||
feedback.bind(sockpath)
|
||||
feedback.listen(1)
|
||||
feedback.setblocking(False)
|
||||
localenv = os.environ.copy()
|
||||
localenv['FEEDBACK_SOCK'] = sockpath
|
||||
with feedback:
|
||||
for playfilename in self.playfiles:
|
||||
worker = await asyncio.create_subprocess_exec(
|
||||
mypath, __file__, targnodes, playfilename,
|
||||
stdin=asyncio.subprocess.DEVNULL,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=localenv)
|
||||
conn, _ = await asyncio.get_event_loop().sock_accept(feedback)
|
||||
conn.setblocking(False)
|
||||
try:
|
||||
while True:
|
||||
header = await asyncio.get_event_loop().sock_recv(conn, 8)
|
||||
if not header:
|
||||
break
|
||||
sz = struct.unpack('=q', header)[0]
|
||||
msg = await asyncio.get_event_loop().sock_recv(conn, sz)
|
||||
result = msgpack.unpackb(msg, raw=False)
|
||||
self.results.append(result)
|
||||
if worker.returncode is not None:
|
||||
break
|
||||
finally:
|
||||
conn.close()
|
||||
stdout, stder = await worker.communicate()
|
||||
self.stderr += stder.decode('utf8')
|
||||
self.stdout += stdout.decode('utf8')
|
||||
finally:
|
||||
self.complete = True
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
import socket
|
||||
|
||||
agent_pid = None
|
||||
ready_keys = {}
|
||||
|
||||
Reference in New Issue
Block a user