2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-17 23:59:23 +00:00

Merge branch 'master' into async

This commit is contained in:
Jarrod Johnson
2026-03-09 17:30:04 -04:00
6 changed files with 140 additions and 16 deletions

View File

@@ -36,6 +36,36 @@ import confluent.client as client
import confluent.sortutil as sortutil
devnull = None
def run_automation(noderange, category, c):
automationbynode = {}
for res in c.update('/noderange/{0}/deployment/remote_config/run'.format(noderange), {
'category': category,
}):
if 'error' in res:
sys.stderr.write(res['error'] + '\n')
exitcode |= res.get('errorcode', 1)
if 'created' in res:
nodename = res['created'].split('/')[2]
automationbynode[nodename] = res['created']
while automationbynode:
for node in list(automationbynode):
for res in c.read(automationbynode[node]):
if 'error' in res:
sys.stderr.write(res['error'] + '\n')
exitcode |= res.get('errorcode', 1)
for result in res.get('results', []):
sys.stdout.write('{0}: Task [{1}] {2}\n'.format(
node, result['task_name'], result['state']))
for warning in result.get('warnings', []):
sys.stderr.write('{0}: [WARNING] {1}\n'.format(node, warning))
if 'errorinfo' in result:
for errorline in result['errorinfo'].splitlines():
sys.stderr.write('{0}: [ERROR] {1}\n'.format(node, errorline))
if res.get('complete', False):
del automationbynode[node]
sys.stdout.write('{0}: Automation complete\n'.format(node))
def run():
global devnull
devnull = open(os.devnull, 'rb')
@@ -51,6 +81,8 @@ def run():
help='Run the syncfiles associated with the currently completed OS profile on the noderange')
argparser.add_option('-P', '--scripts',
help='Re-run specified scripts, with full path under scripts, e.g. post.d/first,firstboot.d/second')
argparser.add_option('-A', '--automation',
help='Run the automation scripts associated with the current OS profile on the noderange, specifying category (onboot.d/firstboot.d/post.d)')
argparser.add_option('-m', '--maxnodes', type='int',
help='Specify a maximum number of '
'nodes to run remote ssh command to, '
@@ -74,16 +106,13 @@ def run():
exitcode = 0
c.stop_if_noderange_over(args[0], options.maxnodes)
if options.automation:
run_automation(args[0], options.automation, c)
nodemap = {}
cmdparms = []
nodes = []
for res in c.read('/noderange/{0}/nodes/'.format(args[0])):
if 'error' in res:
sys.stderr.write(res['error'] + '\n')
exitcode |= res.get('errorcode', 1)
break
node = res['item']['href'][:-1]
nodes.append(node)
cmdstorun = []
if options.security:
@@ -94,8 +123,17 @@ def run():
for script in options.scripts.split(','):
cmdstorun.append(['run_remote', script])
if not cmdstorun:
if options.automation:
sys.exit(0)
argparser.print_help()
sys.exit(1)
for res in c.read('/noderange/{0}/nodes/'.format(args[0])):
if 'error' in res:
sys.stderr.write(res['error'] + '\n')
exitcode |= res.get('errorcode', 1)
break
node = res['item']['href'][:-1]
nodes.append(node)
idxbynode = {}
cmdvbase = ['bash', '/etc/confluent/functions']
for sshnode in nodes:

View File

@@ -69,7 +69,7 @@ import shutil
vinz = None
pluginmap = {}
dispatch_plugins = (b'ipmi', u'ipmi', b'redfish', u'redfish', b'tsmsol', u'tsmsol', b'geist', u'geist', b'deltapdu', u'deltapdu', b'eatonpdu', u'eatonpdu', b'affluent', u'affluent', b'cnos', u'cnos', b'enos', u'enos')
dispatch_plugins = (b'remoteconfig', b'ipmi', u'ipmi', b'redfish', u'redfish', b'tsmsol', u'tsmsol', b'geist', u'geist', b'deltapdu', u'deltapdu', b'eatonpdu', u'eatonpdu', b'affluent', u'affluent', b'cnos', u'cnos', b'enos', u'enos')
PluginCollection = plugin.PluginCollection
@@ -498,7 +498,15 @@ def _init_core():
}),
'ident_image': PluginRoute({
'handler': 'identimage'
})
}),
'remote_config': {
'run': PluginRoute({
'handler': 'remoteconfig'
}),
'active': PluginCollection({
'handler': 'remoteconfig'
}),
},
},
'events': {
'hardware': {

View File

@@ -580,6 +580,8 @@ def get_input_message(path, operation, inputdata, nodes=None, multinode=False,
return InputLicense(path, nodes, inputdata, configmanager)
elif path == ['deployment', 'lock'] and inputdata:
return InputDeploymentLock(path, nodes, inputdata)
elif path == ['deployment', 'remote_config', 'run'] and inputdata:
return InputRemoteConfig(path, nodes, inputdata)
elif path == ['deployment', 'ident_image']:
return InputIdentImage(path, nodes, inputdata)
elif path == ['console', 'ikvm']:
@@ -999,6 +1001,10 @@ class InputDeploymentLock(ConfluentInputMessage):
keyname = 'lock'
valid_values = ['autolock', 'unlocked', 'locked']
class InputRemoteConfig(ConfluentInputMessage):
keyname = 'category'
valid_values = ['post.d', 'firstboot.d', 'onboot.d']
class DeploymentLock(ConfluentChoiceMessage):
valid_values = set([
'autolock',

View File

@@ -0,0 +1,38 @@
import confluent.selfservice as selfservice
import confluent.messages as msg
import confluent.runansible as runansible
_user_initiated_runs = {}
def update(nodes, element, configmanager, inputdata):
if element[-1] != 'run':
raise ValueError('Invalid element for remoteconfig plugin')
for node in nodes:
category = inputdata.inputbynode[node]['category']
playlist = selfservice.list_ansible_scripts(configmanager, node, category)
if playlist:
_user_initiated_runs[node] = True
runansible.run_playbooks(playlist, [node])
yield msg.CreatedResource(
'/nodes/{0}/deployment/remote_config/active/{0}'.format(node))
else:
yield msg.ConfluentNodeError('No remote configuration for category "{0}"', node)
def retrieve(nodes, element, configmanager, inputdata):
for node in nodes:
if element[-1] == 'active':
rst = runansible.running_status.get(node, None)
if not rst:
return
yield msg.ChildCollection(node)
elif element[-2] == 'active' and element[-1] == node:
rst = runansible.running_status.get(node, None)
if not rst:
return
playstatus = rst.dump_dict()
if playstatus['complete'] and _user_initiated_runs.get(node, False):
del runansible.running_status[node]
del _user_initiated_runs[node]
yield msg.KeyValueData(playstatus, node)

View File

@@ -21,12 +21,15 @@ try:
except ImportError:
pass
import asyncio
import base64
import shutil
import json
import socket
import msgpack
import os
import struct
import sys
import tempfile
anspypath = None
running_status = {}
@@ -38,6 +41,7 @@ class PlayRunner(object):
self.worker = None
self.results = []
self.complete = False
self.stdout = ''
def _start_playbooks(self):
self.worker = tasks.spawn(self._really_run_playbooks())
@@ -49,6 +53,7 @@ class PlayRunner(object):
def dump_text(self):
stderr = self.stderr
stdout = self.stdout
retinfo = self.dump_dict()
textout = ''
for result in retinfo['results']:
@@ -65,6 +70,9 @@ class PlayRunner(object):
else:
textout += result['state'] + '\n'
textout += '\n'
if stdout:
textout += "OUTPUT **********************************\n"
textout += stdout
if stderr:
textout += "ERRORS **********************************\n"
textout += stderr
@@ -119,7 +127,7 @@ def run_playbooks(playfiles, nodes):
runner._start_playbooks()
def print_result(result, state, collector=None):
def print_result(result, state, collector=None, callbacksock=None):
output = {
'task_name': result.task_name,
'changed': result._result.get('changed', ''),
@@ -130,12 +138,12 @@ def print_result(result, state, collector=None):
del result._result['warnings']
except KeyError:
pass
if collector:
if state != 'ok' and collector and hasattr(collector, '_dump_results'):
output['errorinfo'] = collector._dump_results(result._result)
msg = msgpack.packb(output, use_bin_type=True)
msglen = len(msg)
sys.stdout.buffer.write(struct.pack('=q', msglen))
sys.stdout.buffer.write(msg)
callbacksock.sendall(struct.pack('=q', msglen))
callbacksock.sendall(msg)
if __name__ == '__main__':
from ansible.inventory.manager import InventoryManager
@@ -149,16 +157,25 @@ if __name__ == '__main__':
import ansible.plugins.loader
import yaml
sockpath = os.environ.get('FEEDBACK_SOCK')
if not sockpath:
sys.stderr.write('No feedback socket specified\n')
sys.exit(1)
callbacksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
callbacksock.connect(sockpath)
class ResultsCollector(CallbackBase):
def v2_runner_on_unreachable(self, result):
print_result(result, 'UNREACHABLE', self)
print_result(result, 'UNREACHABLE', self, callbacksock)
def v2_runner_on_ok(self, result, *args, **kwargs):
print_result(result, 'ok')
print_result(result, 'ok', self, callbacksock)
def v2_runner_on_failed(self, result, *args, **kwargs):
print_result(result, 'FAILED', self)
print_result(result, 'FAILED', self, callbacksock)
context.CLIARGS = ImmutableDict(
connection='smart', module_path=['/usr/share/ansible'], forks=10,

View File

@@ -547,6 +547,23 @@ async def handle_request(req, make_response, mimetype):
else:
return await make_response(mimetype, 404, 'Not Found', body='Not found')
def list_ansible_scripts(cfg, nodename, scriptcat):
playlist = []
for privacy in ('public', 'private'):
slist, profile = get_scriptlist(
scriptcat, cfg, nodename,
'/var/lib/confluent/{0}/os/{{0}}/ansible/{{1}}'.format(privacy))
dirname = '/var/lib/confluent/{2}/os/{0}/ansible/{1}/'.format(
profile, scriptcat, privacy)
if not os.path.isdir(dirname):
dirname = '/var/lib/confluent/{2}/os/{0}/ansible/{1}.d/'.format(
profile, scriptcat, privacy)
for filename in slist:
if filename.endswith('.yaml') or filename.endswith('.yml'):
playlist.append(os.path.join(dirname, filename))
return playlist
def get_scriptlist(scriptcat, cfg, nodename, pathtemplate):
if '..' in scriptcat:
return None, None