mirror of
https://github.com/xcat2/confluent.git
synced 2026-03-22 01:59:17 +00:00
Fix ansible running
Have results available as they happen change away from stdout, to avoid being stepped on by ansible modules that print to that
This commit is contained in:
@@ -21,12 +21,16 @@ try:
|
||||
import eventlet.green.subprocess as subprocess
|
||||
except ImportError:
|
||||
pass
|
||||
import base64
|
||||
import eventlet.green.select as select
|
||||
import shutil
|
||||
import json
|
||||
import socket
|
||||
import msgpack
|
||||
import os
|
||||
import struct
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
anspypath = None
|
||||
running_status = {}
|
||||
@@ -38,6 +42,7 @@ class PlayRunner(object):
|
||||
self.worker = None
|
||||
self.results = []
|
||||
self.complete = False
|
||||
self.stdout = ''
|
||||
|
||||
def _start_playbooks(self):
|
||||
self.worker = eventlet.spawn(self._really_run_playbooks)
|
||||
@@ -49,6 +54,7 @@ class PlayRunner(object):
|
||||
|
||||
def dump_text(self):
|
||||
stderr = self.stderr
|
||||
stdout = self.stdout
|
||||
retinfo = self.dump_dict()
|
||||
textout = ''
|
||||
for result in retinfo['results']:
|
||||
@@ -65,6 +71,9 @@ class PlayRunner(object):
|
||||
else:
|
||||
textout += result['state'] + '\n'
|
||||
textout += '\n'
|
||||
if stdout:
|
||||
textout += "OUTPUT **********************************\n"
|
||||
textout += stdout
|
||||
if stderr:
|
||||
textout += "ERRORS **********************************\n"
|
||||
textout += stderr
|
||||
@@ -92,21 +101,54 @@ class PlayRunner(object):
|
||||
mypath = anspypath
|
||||
if not mypath:
|
||||
mypath = sys.executable
|
||||
with open(os.devnull, 'w+') as devnull:
|
||||
targnodes = ','.join(self.nodes)
|
||||
for playfilename in self.playfiles:
|
||||
worker = subprocess.Popen(
|
||||
[mypath, __file__, targnodes, playfilename],
|
||||
stdin=devnull, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
stdout, stder = worker.communicate()
|
||||
self.stderr += stder.decode('utf8')
|
||||
current = memoryview(stdout)
|
||||
while len(current):
|
||||
sz = struct.unpack('=q', current[:8])[0]
|
||||
result = msgpack.unpackb(current[8:8+sz], raw=False)
|
||||
self.results.append(result)
|
||||
current = current[8+sz:]
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
feedback = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
suffix = base64.urlsafe_b64encode(os.urandom(6)).decode('ascii')
|
||||
sockpath = os.path.join(tmpdir, 'feedback.sock.' + suffix)
|
||||
localenv = os.environ.copy()
|
||||
localenv['FEEDBACK_SOCK'] = sockpath
|
||||
feedback.bind(sockpath)
|
||||
feedback.listen(1)
|
||||
with feedback:
|
||||
with open(os.devnull, 'w+') as devnull:
|
||||
targnodes = ','.join(self.nodes)
|
||||
for playfilename in self.playfiles:
|
||||
worker = subprocess.Popen(
|
||||
[mypath, __file__, targnodes, playfilename],
|
||||
stdin=devnull, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE, env=localenv)
|
||||
rlist, _, _ = select.select([feedback], [], [], 10)
|
||||
if not rlist:
|
||||
raise RuntimeError(
|
||||
f"Timed out waiting for feedback socket connection for playbook '{playfilename}'"
|
||||
)
|
||||
|
||||
def _recv_results(sock, timeout=0.1):
|
||||
while select.select([sock], [], [], timeout)[0]:
|
||||
hdr = b''
|
||||
while len(hdr) < 8:
|
||||
chunk = sock.recv(8 - len(hdr))
|
||||
if not chunk:
|
||||
if not hdr:
|
||||
return
|
||||
raise RuntimeError("Socket closed while receiving message header")
|
||||
hdr += chunk
|
||||
msglen = struct.unpack('=q', hdr)[0]
|
||||
msg = b''
|
||||
while len(msg) < msglen:
|
||||
chunk = sock.recv(msglen - len(msg))
|
||||
if not chunk:
|
||||
raise RuntimeError("Socket closed while receiving message")
|
||||
msg += chunk
|
||||
self.results.append(msgpack.unpackb(msg, raw=False))
|
||||
conn, _ = feedback.accept()
|
||||
with conn:
|
||||
while worker.poll() is None:
|
||||
_recv_results(conn)
|
||||
_recv_results(conn, timeout=0)
|
||||
stdout, stder = worker.communicate()
|
||||
self.stderr += stder.decode('utf8')
|
||||
self.stdout += stdout.decode('utf8')
|
||||
finally:
|
||||
self.complete = True
|
||||
|
||||
@@ -119,7 +161,7 @@ def run_playbooks(playfiles, nodes):
|
||||
runner._start_playbooks()
|
||||
|
||||
|
||||
def print_result(result, state, collector=None):
|
||||
def print_result(result, state, collector=None, callbacksock=None):
|
||||
output = {
|
||||
'task_name': result.task_name,
|
||||
'changed': result._result.get('changed', ''),
|
||||
@@ -130,12 +172,12 @@ def print_result(result, state, collector=None):
|
||||
del result._result['warnings']
|
||||
except KeyError:
|
||||
pass
|
||||
if collector:
|
||||
if state != 'ok' and collector and hasattr(collector, '_dump_results'):
|
||||
output['errorinfo'] = collector._dump_results(result._result)
|
||||
msg = msgpack.packb(output, use_bin_type=True)
|
||||
msglen = len(msg)
|
||||
sys.stdout.buffer.write(struct.pack('=q', msglen))
|
||||
sys.stdout.buffer.write(msg)
|
||||
callbacksock.sendall(struct.pack('=q', msglen))
|
||||
callbacksock.sendall(msg)
|
||||
|
||||
if __name__ == '__main__':
|
||||
from ansible.inventory.manager import InventoryManager
|
||||
@@ -149,16 +191,25 @@ if __name__ == '__main__':
|
||||
import ansible.plugins.loader
|
||||
import yaml
|
||||
|
||||
sockpath = os.environ.get('FEEDBACK_SOCK')
|
||||
if not sockpath:
|
||||
sys.stderr.write('No feedback socket specified\n')
|
||||
sys.exit(1)
|
||||
|
||||
callbacksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
callbacksock.connect(sockpath)
|
||||
|
||||
|
||||
class ResultsCollector(CallbackBase):
|
||||
|
||||
def v2_runner_on_unreachable(self, result):
|
||||
print_result(result, 'UNREACHABLE', self)
|
||||
print_result(result, 'UNREACHABLE', self, callbacksock)
|
||||
|
||||
def v2_runner_on_ok(self, result, *args, **kwargs):
|
||||
print_result(result, 'ok')
|
||||
print_result(result, 'ok', self, callbacksock)
|
||||
|
||||
def v2_runner_on_failed(self, result, *args, **kwargs):
|
||||
print_result(result, 'FAILED', self)
|
||||
print_result(result, 'FAILED', self, callbacksock)
|
||||
|
||||
context.CLIARGS = ImmutableDict(
|
||||
connection='smart', module_path=['/usr/share/ansible'], forks=10,
|
||||
|
||||
Reference in New Issue
Block a user