From e185f2224f017b521b78ccd52160a19951bda84b Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 6 Mar 2026 16:24:26 -0500 Subject: [PATCH 1/2] Implement ability for user to kick off confluent ansible runs Add nodeapply -A and associated API. This permits orchestrating plays without touching the nodes directly by the user. --- confluent_client/bin/nodeapply | 63 ++++++++++++++++--- confluent_server/confluent/core.py | 12 ++++- confluent_server/confluent/messages.py | 6 +++ .../plugins/deployment/remotecofig.py | 38 ++++++++++++++ confluent_server/confluent/selfservice.py | 30 ++++++----- 5 files changed, 127 insertions(+), 22 deletions(-) create mode 100644 confluent_server/confluent/plugins/deployment/remotecofig.py diff --git a/confluent_client/bin/nodeapply b/confluent_client/bin/nodeapply index bf4b9a53..0e887db2 100755 --- a/confluent_client/bin/nodeapply +++ b/confluent_client/bin/nodeapply @@ -36,6 +36,47 @@ import confluent.client as client import confluent.sortutil as sortutil devnull = None
+def run_automation(noderange, category, c):
+    """Start remote_config runs on noderange and stream their task results.
+
+    Requests the given script category for every node in noderange, then
+    polls each created status resource, printing task states, warnings and
+    error details until that node reports completion.  Returns the OR of
+    all error codes reported along the way (0 if there were none).
+    """
+    # Initialize before use: the augmented assignments below would raise
+    # UnboundLocalError on the first reported error otherwise.
+    exitcode = 0
+    automationbynode = {}
+    for res in c.update('/noderange/{0}/deployment/remote_config/run'.format(noderange), {
+        'category': category,
+    }):
+        if 'error' in res:
+            sys.stderr.write(res['error'] + '\n')
+            exitcode |= res.get('errorcode', 1)
+        if 'created' in res:
+            nodename = res['created'].split('/')[2]
+            automationbynode[nodename] = res['created']
+    while automationbynode:
+        for node in list(automationbynode):
+            for res in c.read(automationbynode[node]):
+                if 'error' in res:
+                    sys.stderr.write(res['error'] + '\n')
+                    exitcode |= res.get('errorcode', 1)
+                for result in res.get('results', []):
+                    sys.stdout.write('{0}: Task [{1}] {2}\n'.format(
+                        node, result['task_name'], result['state']))
+                    for warning in result.get('warnings', []):
+                        sys.stderr.write('{0}: [WARNING] {1}\n'.format(node, warning))
+                    if 'errorinfo' in result:
+                        for errorline in result['errorinfo'].splitlines():
+                            sys.stderr.write('{0}: [ERROR] {1}\n'.format(node, errorline))
+                if res.get('complete', False):
+                    del automationbynode[node]
+                    sys.stdout.write('{0}: Automation complete\n'.format(node))
+    return exitcode
+
+
def run(): global devnull devnull = open(os.devnull, 'rb') @@ -51,6 +92,8 @@ def run(): help='Run the syncfiles associated with the currently completed OS profile on the noderange') argparser.add_option('-P', '--scripts', help='Re-run specified scripts, with full path under scripts, e.g. post.d/first,firstboot.d/second') + argparser.add_option('-A', '--automation', + help='Run the automation scripts associated with the current OS profile on the noderange, specifying category (onboot.d/firstboot.d/post.d)') argparser.add_option('-m', '--maxnodes', type='int', help='Specify a maximum number of ' 'nodes to run remote ssh command to, ' @@ -74,16 +117,13 @@ def run(): exitcode = 0 c.stop_if_noderange_over(args[0], options.maxnodes) + if options.automation: + exitcode |= run_automation(args[0], options.automation, c) + nodemap = {} cmdparms = [] nodes = [] - for res in c.read('/noderange/{0}/nodes/'.format(args[0])): - if 'error' in res: - sys.stderr.write(res['error'] + '\n') - exitcode |= res.get('errorcode', 1) - break - node = res['item']['href'][:-1] - nodes.append(node) + cmdstorun = [] if options.security: @@ -94,8 +134,17 @@ def run(): for script in options.scripts.split(','): cmdstorun.append(['run_remote', script]) if not cmdstorun: + if options.automation: + sys.exit(exitcode) argparser.print_help() sys.exit(1) + for res in c.read('/noderange/{0}/nodes/'.format(args[0])): + if 'error' in res: + sys.stderr.write(res['error'] + '\n') + exitcode |= res.get('errorcode', 1) + break + node = res['item']['href'][:-1] + nodes.append(node) idxbynode = {} cmdvbase = ['bash', '/etc/confluent/functions'] for sshnode in nodes: diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index b368efbe..ee2e4a95 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -76,7 
+76,7 @@ import shutil vinz = None pluginmap = {} -dispatch_plugins = (b'ipmi', u'ipmi', b'redfish', u'redfish', b'tsmsol', u'tsmsol', b'geist', u'geist', b'deltapdu', u'deltapdu', b'eatonpdu', u'eatonpdu', b'affluent', u'affluent', b'cnos', u'cnos', b'enos', u'enos') +dispatch_plugins = (b'remoteconfig', b'ipmi', u'ipmi', b'redfish', u'redfish', b'tsmsol', u'tsmsol', b'geist', u'geist', b'deltapdu', u'deltapdu', b'eatonpdu', u'eatonpdu', b'affluent', u'affluent', b'cnos', u'cnos', b'enos', u'enos') PluginCollection = plugin.PluginCollection @@ -476,7 +476,15 @@ def _init_core(): }), 'ident_image': PluginRoute({ 'handler': 'identimage' - }) + }), + 'remote_config': { + 'run': PluginRoute({ + 'handler': 'remoteconfig' + }), + 'active': PluginCollection({ + 'handler': 'remoteconfig' + }), + }, }, 'events': { 'hardware': { diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py index aac7aa89..8844c05d 100644 --- a/confluent_server/confluent/messages.py +++ b/confluent_server/confluent/messages.py @@ -580,6 +580,8 @@ def get_input_message(path, operation, inputdata, nodes=None, multinode=False, return InputLicense(path, nodes, inputdata, configmanager) elif path == ['deployment', 'lock'] and inputdata: return InputDeploymentLock(path, nodes, inputdata) + elif path == ['deployment', 'remote_config', 'run'] and inputdata: + return InputRemoteConfig(path, nodes, inputdata) elif path == ['deployment', 'ident_image']: return InputIdentImage(path, nodes, inputdata) elif path == ['console', 'ikvm']: @@ -994,6 +996,10 @@ class InputDeploymentLock(ConfluentInputMessage): keyname = 'lock' valid_values = ['autolock', 'unlocked', 'locked'] +class InputRemoteConfig(ConfluentInputMessage): + keyname = 'category' + valid_values = ['post.d', 'firstboot.d', 'onboot.d'] + class DeploymentLock(ConfluentChoiceMessage): valid_values = set([ 'autolock', diff --git a/confluent_server/confluent/plugins/deployment/remotecofig.py 
b/confluent_server/confluent/plugins/deployment/remotecofig.py new file mode 100644 index 00000000..fd08c382 --- /dev/null +++ b/confluent_server/confluent/plugins/deployment/remotecofig.py @@ -0,0 +1,63 @@
+import confluent.selfservice as selfservice
+import confluent.messages as msg
+import confluent.runansible as runansible
+
+# Nodes whose current run was started through update(); retrieve() only
+# discards a finished status when the node is listed here.
+_user_initiated_runs = {}
+def update(nodes, element, configmanager, inputdata):
+    """Start the ansible plays of the requested category on each node.
+
+    Yields a CreatedResource naming the status resource for every node
+    that has plays in the category, and a ConfluentNodeError for every
+    node that has none.  Raises ValueError unless the last path element
+    is 'run'.
+    """
+    if element[-1] != 'run':
+        raise ValueError('Invalid element for remoteconfig plugin')
+    for node in nodes:
+        category = inputdata.inputbynode[node]['category']
+        playlist = selfservice.list_ansible_scripts(configmanager, node, category)
+        if playlist:
+            _user_initiated_runs[node] = True
+            runansible.run_playbooks(playlist, [node])
+            yield msg.CreatedResource(
+                '/nodes/{0}/deployment/remote_config/active/{0}'.format(node))
+        else:
+            # The category has to be formatted into the message; the raw
+            # '{0}' template used to be sent verbatim.
+            # NOTE(review): argument order (node, errorstr) is assumed for
+            # ConfluentNodeError - confirm against confluent.messages.
+            yield msg.ConfluentNodeError(
+                node,
+                'No remote configuration for category "{0}"'.format(category))
+
+def retrieve(nodes, element, configmanager, inputdata):
+    """Report ansible run status for the requested nodes.
+
+    For the 'active' collection, yields a ChildCollection for each node
+    that has an entry in runansible.running_status.  For 'active/NODE',
+    yields that entry's dump_dict() as KeyValueData; when it is complete
+    and the run was started through update(), both tracking entries are
+    deleted as the final status is returned.
+    """
+    for node in nodes:
+        if element[-1] == 'active':
+            rst = runansible.running_status.get(node, None)
+            if not rst:
+                # Skip only this node; returning would end the generator
+                # and drop every remaining node of the request.
+                continue
+            yield msg.ChildCollection(node)
+        elif element[-2] == 'active' and element[-1] == node:
+            rst = runansible.running_status.get(node, None)
+            if not rst:
+                continue
+            playstatus = rst.dump_dict()
+            if playstatus['complete'] and _user_initiated_runs.get(node, False):
+                del runansible.running_status[node]
+                del _user_initiated_runs[node]
+            yield msg.KeyValueData(playstatus, node)
+
+
+
diff --git a/confluent_server/confluent/selfservice.py b/confluent_server/confluent/selfservice.py index 72ffb765..44aa5619 100644 --- a/confluent_server/confluent/selfservice.py +++ b/confluent_server/confluent/selfservice.py @@ -519,19 +519,7 @@ def handle_request(env, start_response): yield '' elif env['PATH_INFO'].startswith('/self/remoteconfig/') and 'POST' == operation: scriptcat = env['PATH_INFO'].replace('/self/remoteconfig/', '') - playlist = [] - for privacy in ('public', 
'private'): - slist, profile = get_scriptlist( - scriptcat, cfg, nodename, - '/var/lib/confluent/{0}/os/{{0}}/ansible/{{1}}'.format(privacy)) - dirname = '/var/lib/confluent/{2}/os/{0}/ansible/{1}/'.format( - profile, scriptcat, privacy) - if not os.path.isdir(dirname): - dirname = '/var/lib/confluent/{2}/os/{0}/ansible/{1}.d/'.format( - profile, scriptcat, privacy) - for filename in slist: - if filename.endswith('.yaml') or filename.endswith('.yml'): - playlist.append(os.path.join(dirname, filename)) + playlist = list_ansible_scripts(cfg, nodename, scriptcat) if playlist: runansible.run_playbooks(playlist, [nodename]) start_response('202 Queued', ()) @@ -603,6 +591,33 @@ def handle_request(env, start_response): start_response('404 Not Found', ()) yield 'Not found'
+def list_ansible_scripts(cfg, nodename, scriptcat):
+    """Return the ansible playbook paths of a script category for a node.
+
+    Checks the public and then the private /var/lib/confluent tree, using
+    the category directory itself or, when that is not a directory, its
+    '.d' variant, and keeps only the '.yaml'/'.yml' names reported by
+    get_scriptlist().  Returns an empty list when nothing is found.
+    """
+    playlist = []
+    for privacy in ('public', 'private'):
+        slist, profile = get_scriptlist(
+            scriptcat, cfg, nodename,
+            '/var/lib/confluent/{0}/os/{{0}}/ansible/{{1}}'.format(privacy))
+        if not slist:
+            # get_scriptlist() returns (None, None) for a category
+            # containing '..'; iterating None below would raise TypeError.
+            continue
+        dirname = '/var/lib/confluent/{2}/os/{0}/ansible/{1}/'.format(
+            profile, scriptcat, privacy)
+        if not os.path.isdir(dirname):
+            dirname = '/var/lib/confluent/{2}/os/{0}/ansible/{1}.d/'.format(
+                profile, scriptcat, privacy)
+        for filename in slist:
+            if filename.endswith(('.yaml', '.yml')):
+                playlist.append(os.path.join(dirname, filename))
+    return playlist
+
def get_scriptlist(scriptcat, cfg, nodename, pathtemplate): if '..' 
in scriptcat: return None, None From 1a87701fee4377720a61cd112e6bd070f9205bf7 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 9 Mar 2026 16:48:42 -0400 Subject: [PATCH 2/2] Fix ansible running Have results available as they happen change away from stdout, to avoid being stepped on by ansible modules that print to that --- confluent_server/confluent/runansible.py | 95 ++++++++++++++++++------ 1 file changed, 73 insertions(+), 22 deletions(-) diff --git a/confluent_server/confluent/runansible.py b/confluent_server/confluent/runansible.py index b82d8684..730ff15a 100644 --- a/confluent_server/confluent/runansible.py +++ b/confluent_server/confluent/runansible.py @@ -21,12 +21,16 @@ try: import eventlet.green.subprocess as subprocess except ImportError: pass +import base64 +import eventlet.green.select as select import shutil import json +import socket import msgpack import os import struct import sys +import tempfile anspypath = None running_status = {} @@ -38,6 +42,7 @@ class PlayRunner(object): self.worker = None self.results = [] self.complete = False + self.stdout = '' def _start_playbooks(self): self.worker = eventlet.spawn(self._really_run_playbooks) @@ -49,6 +54,7 @@ class PlayRunner(object): def dump_text(self): stderr = self.stderr + stdout = self.stdout retinfo = self.dump_dict() textout = '' for result in retinfo['results']: @@ -65,6 +71,9 @@ class PlayRunner(object): else: textout += result['state'] + '\n' textout += '\n' + if stdout: + textout += "OUTPUT **********************************\n" + textout += stdout if stderr: textout += "ERRORS **********************************\n" textout += stderr @@ -92,21 +101,54 @@ class PlayRunner(object): mypath = anspypath if not mypath: mypath = sys.executable - with open(os.devnull, 'w+') as devnull: - targnodes = ','.join(self.nodes) - for playfilename in self.playfiles: - worker = subprocess.Popen( - [mypath, __file__, targnodes, playfilename], - stdin=devnull, stdout=subprocess.PIPE, - 
stderr=subprocess.PIPE) - stdout, stder = worker.communicate() - self.stderr += stder.decode('utf8') - current = memoryview(stdout) - while len(current): - sz = struct.unpack('=q', current[:8])[0] - result = msgpack.unpackb(current[8:8+sz], raw=False) - self.results.append(result) - current = current[8+sz:] + with tempfile.TemporaryDirectory() as tmpdir: + feedback = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + suffix = base64.urlsafe_b64encode(os.urandom(6)).decode('ascii') + sockpath = os.path.join(tmpdir, 'feedback.sock.' + suffix) + localenv = os.environ.copy() + localenv['FEEDBACK_SOCK'] = sockpath + feedback.bind(sockpath) + feedback.listen(1) + with feedback: + with open(os.devnull, 'w+') as devnull: + targnodes = ','.join(self.nodes) + for playfilename in self.playfiles: + worker = subprocess.Popen( + [mypath, __file__, targnodes, playfilename], + stdin=devnull, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=localenv) + rlist, _, _ = select.select([feedback], [], [], 10) + if not rlist: + raise RuntimeError( + f"Timed out waiting for feedback socket connection for playbook '{playfilename}'" + ) + + def _recv_results(sock, timeout=0.1): + while select.select([sock], [], [], timeout)[0]: + hdr = b'' + while len(hdr) < 8: + chunk = sock.recv(8 - len(hdr)) + if not chunk: + if not hdr: + return + raise RuntimeError("Socket closed while receiving message header") + hdr += chunk + msglen = struct.unpack('=q', hdr)[0] + msg = b'' + while len(msg) < msglen: + chunk = sock.recv(msglen - len(msg)) + if not chunk: + raise RuntimeError("Socket closed while receiving message") + msg += chunk + self.results.append(msgpack.unpackb(msg, raw=False)) + conn, _ = feedback.accept() + with conn: + while worker.poll() is None: + _recv_results(conn) + _recv_results(conn, timeout=0) + stdout, stder = worker.communicate() + self.stderr += stder.decode('utf8') + self.stdout += stdout.decode('utf8') finally: self.complete = True @@ -119,7 +161,7 @@ def 
run_playbooks(playfiles, nodes): runner._start_playbooks() -def print_result(result, state, collector=None): +def print_result(result, state, collector=None, callbacksock=None): output = { 'task_name': result.task_name, 'changed': result._result.get('changed', ''), @@ -130,12 +172,12 @@ def print_result(result, state, collector=None): del result._result['warnings'] except KeyError: pass - if collector: + if state != 'ok' and collector and hasattr(collector, '_dump_results'): output['errorinfo'] = collector._dump_results(result._result) msg = msgpack.packb(output, use_bin_type=True) msglen = len(msg) - sys.stdout.buffer.write(struct.pack('=q', msglen)) - sys.stdout.buffer.write(msg) + callbacksock.sendall(struct.pack('=q', msglen)) + callbacksock.sendall(msg) if __name__ == '__main__': from ansible.inventory.manager import InventoryManager @@ -149,16 +191,25 @@ if __name__ == '__main__': import ansible.plugins.loader import yaml + sockpath = os.environ.get('FEEDBACK_SOCK') + if not sockpath: + sys.stderr.write('No feedback socket specified\n') + sys.exit(1) + + callbacksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + callbacksock.connect(sockpath) + + class ResultsCollector(CallbackBase): def v2_runner_on_unreachable(self, result): - print_result(result, 'UNREACHABLE', self) + print_result(result, 'UNREACHABLE', self, callbacksock) def v2_runner_on_ok(self, result, *args, **kwargs): - print_result(result, 'ok') + print_result(result, 'ok', self, callbacksock) def v2_runner_on_failed(self, result, *args, **kwargs): - print_result(result, 'FAILED', self) + print_result(result, 'FAILED', self, callbacksock) context.CLIARGS = ImmutableDict( connection='smart', module_path=['/usr/share/ansible'], forks=10,