diff --git a/confluent_server/confluent/runansible.py b/confluent_server/confluent/runansible.py index b82d8684..730ff15a 100644 --- a/confluent_server/confluent/runansible.py +++ b/confluent_server/confluent/runansible.py @@ -21,12 +21,16 @@ try: import eventlet.green.subprocess as subprocess except ImportError: pass +import base64 +import eventlet.green.select as select import shutil import json +import socket import msgpack import os import struct import sys +import tempfile anspypath = None running_status = {} @@ -38,6 +42,7 @@ class PlayRunner(object): self.worker = None self.results = [] self.complete = False + self.stdout = '' def _start_playbooks(self): self.worker = eventlet.spawn(self._really_run_playbooks) @@ -49,6 +54,7 @@ class PlayRunner(object): def dump_text(self): stderr = self.stderr + stdout = self.stdout retinfo = self.dump_dict() textout = '' for result in retinfo['results']: @@ -65,6 +71,9 @@ class PlayRunner(object): else: textout += result['state'] + '\n' textout += '\n' + if stdout: + textout += "OUTPUT **********************************\n" + textout += stdout if stderr: textout += "ERRORS **********************************\n" textout += stderr @@ -92,21 +101,54 @@ class PlayRunner(object): mypath = anspypath if not mypath: mypath = sys.executable - with open(os.devnull, 'w+') as devnull: - targnodes = ','.join(self.nodes) - for playfilename in self.playfiles: - worker = subprocess.Popen( - [mypath, __file__, targnodes, playfilename], - stdin=devnull, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stder = worker.communicate() - self.stderr += stder.decode('utf8') - current = memoryview(stdout) - while len(current): - sz = struct.unpack('=q', current[:8])[0] - result = msgpack.unpackb(current[8:8+sz], raw=False) - self.results.append(result) - current = current[8+sz:] + with tempfile.TemporaryDirectory() as tmpdir: + feedback = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + suffix = base64.urlsafe_b64encode(os.urandom(6)).decode('ascii') + sockpath = os.path.join(tmpdir, 'feedback.sock.' + suffix) + localenv = os.environ.copy() + localenv['FEEDBACK_SOCK'] = sockpath + feedback.bind(sockpath) + feedback.listen(1) + with feedback: + with open(os.devnull, 'w+') as devnull: + targnodes = ','.join(self.nodes) + for playfilename in self.playfiles: + worker = subprocess.Popen( + [mypath, __file__, targnodes, playfilename], + stdin=devnull, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=localenv) + rlist, _, _ = select.select([feedback], [], [], 10) + if not rlist: + raise RuntimeError( + f"Timed out waiting for feedback socket connection for playbook '{playfilename}'" + ) + + def _recv_results(sock, timeout=0.1): + while select.select([sock], [], [], timeout)[0]: + hdr = b'' + while len(hdr) < 8: + chunk = sock.recv(8 - len(hdr)) + if not chunk: + if not hdr: + return + raise RuntimeError("Socket closed while receiving message header") + hdr += chunk + msglen = struct.unpack('=q', hdr)[0] + msg = b'' + while len(msg) < msglen: + chunk = sock.recv(msglen - len(msg)) + if not chunk: + raise RuntimeError("Socket closed while receiving message") + msg += chunk + self.results.append(msgpack.unpackb(msg, raw=False)) + conn, _ = feedback.accept() + with conn: + while worker.poll() is None: + _recv_results(conn) + _recv_results(conn, timeout=0) + stdout, stder = worker.communicate() + self.stderr += stder.decode('utf8') + self.stdout += stdout.decode('utf8') finally: self.complete = True @@ -119,7 +161,7 @@ def run_playbooks(playfiles, nodes): runner._start_playbooks() -def print_result(result, state, collector=None): +def print_result(result, state, collector=None, callbacksock=None): output = { 'task_name': result.task_name, 'changed': result._result.get('changed', ''), @@ -130,12 +172,12 @@ def print_result(result, state, collector=None): del result._result['warnings'] except KeyError: pass - if collector: + if state != 'ok' and collector and hasattr(collector, '_dump_results'): output['errorinfo'] = collector._dump_results(result._result) msg = msgpack.packb(output, use_bin_type=True) msglen = len(msg) - sys.stdout.buffer.write(struct.pack('=q', msglen)) - sys.stdout.buffer.write(msg) + callbacksock.sendall(struct.pack('=q', msglen)) + callbacksock.sendall(msg) if __name__ == '__main__': from ansible.inventory.manager import InventoryManager @@ -149,16 +191,25 @@ if __name__ == '__main__': import ansible.plugins.loader import yaml + sockpath = os.environ.get('FEEDBACK_SOCK') + if not sockpath: + sys.stderr.write('No feedback socket specified\n') + sys.exit(1) + + callbacksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + callbacksock.connect(sockpath) + + class ResultsCollector(CallbackBase): def v2_runner_on_unreachable(self, result): - print_result(result, 'UNREACHABLE', self) + print_result(result, 'UNREACHABLE', self, callbacksock) def v2_runner_on_ok(self, result, *args, **kwargs): - print_result(result, 'ok') + print_result(result, 'ok', self, callbacksock) def v2_runner_on_failed(self, result, *args, **kwargs): - print_result(result, 'FAILED', self) + print_result(result, 'FAILED', self, callbacksock) context.CLIARGS = ImmutableDict( connection='smart', module_path=['/usr/share/ansible'], forks=10,