2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-04 04:49:22 +00:00

Advance the state of asyncio port

This commit is contained in:
Jarrod Johnson
2024-03-15 12:50:04 -04:00
parent 887207b9fc
commit da63543a70
6 changed files with 149 additions and 89 deletions

View File

@@ -95,8 +95,8 @@ async def main():
if oneval != twoval:
print('Values did not match.')
argassign[arg] = twoval
session.stop_if_noderange_over(noderange, options.maxnodes)
exitcode=client.updateattrib(session,args,nodetype, noderange, options, argassign)
await session.stop_if_noderange_over(noderange, options.maxnodes)
exitcode = await client.updateattrib(session,args,nodetype, noderange, options, argassign)
try:
# setting user output to what the user inputs
if args[1] == 'all':

View File

@@ -1,4 +1,4 @@
#!/usr/bin/python2
#!/usr/bin/python3
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2015-2017 Lenovo
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import optparse
import os
import signal
@@ -85,4 +86,8 @@ if options.previous:
def outhandler(node, res):
for k in res[node]:
client.cprint('{0}: {1}: {2}'.format(node, k.replace('inlet_', ''), res[node][k]))
sys.exit(session.simple_noderange_command(noderange, '/power/{0}'.format(powurl), setstate, promptover=options.maxnodes, key='state', outhandler=outhandler))
async def main():
sys.exit(await session.simple_noderange_command(noderange, '/power/{0}'.format(powurl), setstate, promptover=options.maxnodes, key='state', outhandler=outhandler))
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

View File

@@ -258,7 +258,7 @@ class Command(object):
outhandler(node, res)
return rc
def simple_noderange_command(self, noderange, resource, input=None,
async def simple_noderange_command(self, noderange, resource, input=None,
key=None, errnodes=None, promptover=None, outhandler=None, **kwargs):
try:
self._currnoderange = noderange
@@ -271,13 +271,13 @@ class Command(object):
else:
ikey = key
if input is None:
for res in self.read('/noderange/{0}/{1}'.format(
async for res in self.read('/noderange/{0}/{1}'.format(
noderange, resource)):
rc = self.handle_results(ikey, rc, res, errnodes, outhandler)
else:
self.stop_if_noderange_over(noderange, promptover)
await self.stop_if_noderange_over(noderange, promptover)
kwargs[ikey] = input
for res in self.update('/noderange/{0}/{1}'.format(
async for res in self.update('/noderange/{0}/{1}'.format(
noderange, resource), kwargs):
rc = self.handle_results(ikey, rc, res, errnodes, outhandler)
self._currnoderange = None
@@ -292,8 +292,8 @@ class Command(object):
nsize = await self.get_noderange_size(noderange)
if nsize > maxnodes:
if nsize == 1:
nodename = list(self.read(
'/noderange/{0}/nodes/'.format(noderange)))[0].get('item', {}).get('href', None)
nodename = [x async for x in self.read(
'/noderange/{0}/nodes/'.format(noderange))][0].get('item', {}).get('href', None)
nodename = nodename[:-1]
p = getinput('Command is about to affect node {0}, continue (y/n)? '.format(nodename))
else:
@@ -680,7 +680,7 @@ def printgroupattributes(session, requestargs, showtype, nodetype, noderange, op
exitcode = 1
return exitcode
def updateattrib(session, updateargs, nodetype, noderange, options, dictassign=None):
async def updateattrib(session, updateargs, nodetype, noderange, options, dictassign=None):
# update attribute
exitcode = 0
if options.clear:
@@ -688,7 +688,7 @@ def updateattrib(session, updateargs, nodetype, noderange, options, dictassign=N
keydata = {}
for attrib in updateargs[1:]:
keydata[attrib] = None
for res in session.update(targpath, keydata):
async for res in session.update(targpath, keydata):
if 'error' in res:
if 'errorcode' in res:
exitcode = res['errorcode']
@@ -705,21 +705,21 @@ def updateattrib(session, updateargs, nodetype, noderange, options, dictassign=N
value = os.environ.get(
key, os.environ[key.upper()])
if (nodetype == "nodegroups"):
exitcode = session.simple_nodegroups_command(noderange,
exitcode = await ession.simple_nodegroups_command(noderange,
'attributes/all',
value, key)
else:
exitcode = session.simple_noderange_command(noderange,
exitcode = await session.simple_noderange_command(noderange,
'attributes/all',
value, key)
sys.exit(exitcode)
elif dictassign:
for key in dictassign:
if nodetype == 'nodegroups':
exitcode = session.simple_nodegroups_command(
exitcode = await session.simple_nodegroups_command(
noderange, 'attributes/all', dictassign[key], key)
else:
exitcode = session.simple_noderange_command(
exitcode = await session.simple_noderange_command(
noderange, 'attributes/all', dictassign[key], key)
else:
if "=" in updateargs[1]:
@@ -736,10 +736,10 @@ def updateattrib(session, updateargs, nodetype, noderange, options, dictassign=N
key = val[0]
value = val[1]
if (nodetype == "nodegroups"):
exitcode = session.simple_nodegroups_command(noderange, 'attributes/all',
exitcode = await session.simple_nodegroups_command(noderange, 'attributes/all',
value, key)
else:
exitcode = session.simple_noderange_command(noderange, 'attributes/all',
exitcode = await session.simple_noderange_command(noderange, 'attributes/all',
value, key)
except Exception:
sys.stderr.write('Error: {0} not a valid expression\n'.format(str(updateargs[1:])))

View File

@@ -50,6 +50,7 @@ import confluent.networking.macmap as macmap
import confluent.noderange as noderange
import confluent.osimage as osimage
import confluent.plugin as plugin
import types
try:
import confluent.shellmodule as shellmodule
except ImportError:
@@ -1109,10 +1110,10 @@ async def handle_node_request(configmanager, inputdata, operation,
'element': pathcomponents, 'configmanager': configmanager,
'inputdata': inputdata, 'operation': operation, 'isnoderange': isnoderange}))
if isnoderange or not autostrip:
return await iterate_queue(numworkers, passvalues)
return [x async for x in iterate_queue(numworkers, passvalues)]
else:
if numworkers > 0:
return await iterate_queue(numworkers, passvalues, nodes[0])
return [x async for x in iterate_queue(numworkers, passvalues, nodes[0])]
else:
raise exc.NotImplementedException()
@@ -1158,6 +1159,7 @@ async def addtoqueue(theq, fun, kwargs):
async for pv in result:
await theq.put(pv)
else:
print(repr(result))
for pv in result:
await theq.put(pv)
except Exception as e:

View File

@@ -39,6 +39,7 @@ import confluent.httpapi as httpapi
import confluent.log as log
import confluent.collective.manager as collective
import confluent.discovery.protocols.pxe as pxe
import linecache
from eventlet.asyncio import spawn_for_awaitable
try:
import confluent.sockapi as sockapi
@@ -60,6 +61,7 @@ try:
except ImportError:
havefcntl = False
#import multiprocessing
import asyncio
import gc
from greenlet import greenlet
import sys
@@ -74,6 +76,36 @@ import tempfile
import uuid
def format_stack(task):
task.print_stack()
extracted_list = []
checked = set()
for f in task.get_stack():
lineno = f.f_lineno
co = f.f_code
filename = co.co_filename
name = co.co_name
if filename not in checked:
checked.add(filename)
linecache.checkcache(filename)
line = linecache.getline(filename, lineno, f.f_globals)
extracted_list.append((filename, lineno, name, line))
exc = task._exception
if not extracted_list:
yield f'No stack for {task!r}'
elif exc is not None:
yield f'Traceback for {task!r} (most recent call last):'
else:
yield f'Stack for {task!r} (most recent call last):'
for x in traceback.format_list(extracted_list):
yield x
if exc is not None:
for line in traceback.format_exception_only(exc.__class__, exc):
yield line
def _daemonize():
if not 'fork' in os.__dict__:
return
@@ -182,6 +214,9 @@ def dumptrace(signalname, frame):
continue
ht.write('Thread trace: ({0})\n'.format(id(o)))
ht.write(''.join(traceback.format_stack(o.gr_frame)))
for atask in asyncio.all_tasks():
ht.write('Async trace: ({0})\n'.format(id(atask)))
ht.write(''.join([x for x in format_stack(atask)]))
ht.close()
def doexit():

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import atexit
import confluent.exceptions as exc
import confluent.firmwaremanager as firmwaremanager
@@ -33,7 +34,8 @@ import pwd
import aiohmi.constants as pygconstants
import aiohmi.exceptions as pygexc
import aiohmi.storage as storage
console = eventlet.import_patched('pyghmi.ipmi.console')
#console = eventlet.import_patched('pyghmi.ipmi.console')
import aiohmi.ipmi.console as console
import aiohmi.ipmi.command as ipmicommand
import socket
import ssl
@@ -97,11 +99,11 @@ class NullLock(object):
acquire = donothing
release = donothing
console.session.select = eventlet.green.select
console.session.threading = eventlet.green.threading
console.session.WAITING_SESSIONS = NullLock()
console.session.KEEPALIVE_SESSIONS = NullLock()
console.session.socket.getaddrinfo = eventlet.support.greendns.getaddrinfo
#console.session.select = eventlet.green.select
#console.session.threading = eventlet.green.threading
#console.session.WAITING_SESSIONS = NullLock()
#console.session.KEEPALIVE_SESSIONS = NullLock()
#console.session.socket.getaddrinfo = eventlet.support.greendns.getaddrinfo
def exithandler():
@@ -110,7 +112,7 @@ def exithandler():
atexit.register(exithandler)
_ipmiworkers = greenpool.GreenPool(512)
#_ipmiworkers = greenpool.GreenPool(512)
_ipmithread = None
_ipmiwaiters = []
@@ -176,18 +178,17 @@ def sanitize_invdata(indata):
class IpmiCommandWrapper(ipmicommand.Command):
@classmethod
async def create(cls, node, cfm, **kwargs):
self = cls()
kwargs['keepalive'] = False
self = await super().create(**kwargs)
self.cfm = cfm
self.node = node
self.sensormap = {}
self._inhealth = False
self._lasthealth = None
kwargs['keepalive'] = False
self._attribwatcher = cfm.watch_attributes(
(node,), ('secret.hardwaremanagementuser', 'collective.manager',
'secret.hardwaremanagementpassword', 'secret.ipmikg',
'hardwaremanagement.manager'), self._attribschanged)
await super().create(**kwargs)
self.setup_confluent_keyhandler()
try:
os.makedirs('/var/cache/confluent/ipmi/')
@@ -199,6 +200,7 @@ class IpmiCommandWrapper(ipmicommand.Command):
self.set_sdr_cachedir('/var/cache/confluent/ipmi/')
except Exception:
pass
return self
def setup_confluent_keyhandler(self):
@@ -233,10 +235,10 @@ class IpmiCommandWrapper(ipmicommand.Command):
return self._lasthealth
def _ipmi_evtloop():
async def _ipmi_evtloop():
while True:
try:
console.session.Session.wait_for_rsp(timeout=600)
await console.session.Session.wait_for_rsp(timeout=600)
while _ipmiwaiters:
waiter = _ipmiwaiters.pop()
waiter.send()
@@ -387,17 +389,19 @@ async def perform_requests(operator, nodes, element, cfg, inputdata, realop):
cfg.decrypt = True
configdata = cfg.get_node_attributes(nodes, _configattributes)
cfg.decrypt = cryptit
resultdata = queue.LightQueue()
resultdata = asyncio.Queue()
livingthreads = set([])
numnodes = len(nodes)
for node in nodes:
livingthreads.add(_ipmiworkers.spawn(
perform_request, operator, node, element, configdata, inputdata,
cfg, resultdata, realop))
livingthreads.add(asyncio.create_task(perform_request(operator, node, element, configdata, inputdata,
cfg, resultdata, realop)))
#livingthreads.add(_ipmiworkers.spawn(
# perform_request, operator, node, element, configdata, inputdata,
# cfg, resultdata, realop))
while livingthreads:
try:
bundle = []
datum = resultdata.get(timeout=10)
datum = await asyncio.wait_for(resultdata.get(), timeout=10.0)
while datum:
if datum != 'Done':
if isinstance(datum, Exception):
@@ -409,56 +413,60 @@ async def perform_requests(operator, nodes, element, cfg, inputdata, realop):
else:
yield datum
timeout = 0.1 if numnodes else 0.001
datum = resultdata.get(timeout=timeout)
except queue.Empty:
datum = await asyncio.wait_for(resultdata.get(), timeout=timeout)
except asyncio.QueueEmpty:
pass
except asyncio.TimeoutError:
pass
finally:
for datum in sorted(
bundle, key=lambda x: util.naturalize_string(x[0])):
yield datum[1]
for t in list(livingthreads):
if t.dead:
if t.done():
livingthreads.discard(t)
try:
# drain queue if a thread put something on the queue and died
while True:
datum = resultdata.get_nowait()
datum = await resultdata.get_nowait()
if datum != 'Done':
yield datum
except queue.Empty:
except asyncio.QueueEmpty:
pass
def perform_request(operator, node, element,
async def perform_request(operator, node, element,
configdata, inputdata, cfg, results, realop):
try:
return IpmiHandler.create(operator, node, element, configdata, inputdata,
cfg, results, realop).handle_request()
ih = await IpmiHandler.create(operator, node, element, configdata, inputdata,
cfg, results, realop)
return await ih.handle_request()
except pygexc.IpmiException as ipmiexc:
excmsg = str(ipmiexc)
if excmsg in ('Session no longer connected', 'timeout'):
results.put(msg.ConfluentTargetTimeout(node))
await results.put(msg.ConfluentTargetTimeout(node))
else:
results.put(msg.ConfluentNodeError(node, excmsg))
raise
await results.put(msg.ConfluentNodeError(node, excmsg))
#raise
except exc.TargetEndpointUnreachable as tu:
results.put(msg.ConfluentTargetTimeout(node, str(tu)))
await results.put(msg.ConfluentTargetTimeout(node, str(tu)))
except ssl.SSLEOFError:
results.put(msg.ConfluentNodeError(
await results.put(msg.ConfluentNodeError(
node, 'Unable to communicate with the https server on '
'the target BMC'))
except exc.PubkeyInvalid:
results.put(msg.ConfluentNodeError(
await results.put(msg.ConfluentNodeError(
node,
'Mismatch detected between target certificate fingerprint '
'and pubkeys.tls_hardwaremanager attribute'))
except pygexc.InvalidParameterValue as e:
results.put(msg.ConfluentNodeError(node, str(e)))
await results.put(msg.ConfluentNodeError(node, str(e)))
except Exception as e:
results.put(msg.ConfluentNodeError(node, 'Unexpected Error: {0}'.format(str(e))))
await results.put(msg.ConfluentNodeError(node, 'Unexpected Error: {0}'.format(str(e))))
traceback.print_exc()
finally:
results.put('Done')
await results.put('Done')
persistent_ipmicmds = {}
@@ -473,7 +481,7 @@ class IpmiHandler:
self.sensorcategory = None
self.broken = False
self.error = None
eventlet.sleep(0)
await asyncio.sleep(0)
self.cfg = cfd[node]
self.current_user = cfg.current_user
self.loggedin = False
@@ -491,7 +499,7 @@ class IpmiHandler:
persistent_ipmicmds[(node, tenant)].ipmi_session.broken):
try:
persistent_ipmicmds[(node, tenant)].close_confluent()
persistent_ipmicmds[(node, tenant)].ipmi_session._mark_broken()
await persistent_ipmicmds[(node, tenant)].ipmi_session._mark_broken()
persistent_ipmicmds[(node, tenant)].ipmi_session.logonwaiters = []
except KeyError: # was no previous session
pass
@@ -500,18 +508,20 @@ class IpmiHandler:
node, cfg, bmc=connparams['bmc'],
userid=connparams['username'],
password=connparams['passphrase'], kg=connparams['kg'],
port=connparams['port'], onlogon=self.logged)
port=connparams['port'])
ipmisess = persistent_ipmicmds[(node, tenant)].ipmi_session
self.ipmicmd = persistent_ipmicmds[(node, tenant)]
self.loggedin = True
begin = util.monotonic_time()
while ((not (ipmisess.broken or self.loggedin)) and
(util.monotonic_time() - begin) < 30):
ipmisess.wait_for_rsp(31 - (util.monotonic_time() - begin))
await ipmisess.wait_for_rsp(31 - (util.monotonic_time() - begin))
if self.broken or self.loggedin:
break
cfd = cfg.get_node_attributes(node, _configattributes, decrypt=True)
self.cfg = cfd[node]
connparams = get_conn_params(node, self.cfg)
ipmisess._mark_broken()
await ipmisess._mark_broken()
# raise exc.TargetEndpointUnreachable(
# "Login process to " + connparams['bmc'] + " died")
except socket.gaierror as ge:
@@ -524,6 +534,7 @@ class IpmiHandler:
self.ipmicmd.ipmi_session.wait_for_rsp(3)
if util.monotonic_time() > giveup:
self.ipmicmd.ipmi_session.broken = True
return self
bootdevices = {
'optical': 'cd'
@@ -537,7 +548,7 @@ class IpmiHandler:
self.ipmicmd = ipmicmd
self.loggedin = True
def handle_request(self):
async def handle_request(self):
if self.broken:
if (self.error == 'timeout' or
'Insufficient resources' in self.error):
@@ -557,7 +568,7 @@ class IpmiHandler:
else:
raise Exception(self.error)
if self.element == ['power', 'state']:
self.power()
await self.power()
elif self.element == ['_enclosure', 'reseat_bay']:
self.reseat_bay()
elif self.element == ['boot', 'nextdevice']:
@@ -571,7 +582,7 @@ class IpmiHandler:
elif self.element[:2] == ['configuration', 'storage']:
self.handle_storage()
elif self.element[0] == 'configuration':
self.handle_configuration()
await self.handle_configuration()
elif self.element[:3] == ['inventory', 'firmware', 'updates']:
self.handle_update()
elif self.element[0] == 'inventory':
@@ -641,7 +652,7 @@ class IpmiHandler:
for media in self.ipmicmd.list_media():
self.output.put(msg.Media(self.node, media))
def handle_configuration(self):
async def handle_configuration(self):
if self.element[1:3] == ['management_controller', 'alerts']:
return self.handle_alerts()
elif self.element[1:3] == ['management_controller', 'users']:
@@ -653,7 +664,7 @@ class IpmiHandler:
elif self.element[1:3] == ['management_controller', 'identifier']:
return self.handle_identifier()
elif self.element[1:3] == ['management_controller', 'hostname']:
return self.handle_hostname()
return await self.handle_hostname()
elif self.element[1:3] == ['management_controller', 'domain_name']:
return self.handle_domain_name()
elif self.element[1:3] == ['management_controller', 'ntp']:
@@ -1392,10 +1403,10 @@ class IpmiHandler:
self.output.put(msg.IdentifyState(node=self.node, state=''))
return
def power(self):
async def power(self):
if 'read' == self.op:
power = self.ipmicmd.get_power()
self.output.put(msg.PowerState(node=self.node,
power = await self.ipmicmd.get_power()
await self.output.put(msg.PowerState(node=self.node,
state=power['powerstate']))
return
elif 'update' == self.op:
@@ -1403,22 +1414,23 @@ class IpmiHandler:
oldpower = None
waitamount = 30
if powerstate == 'boot':
oldpower = self.ipmicmd.get_power()
oldpower = await self.ipmicmd.get_power()
if 'powerstate' in oldpower:
oldpower = oldpower['powerstate']
elif powerstate == 'shutdown':
waitamount = True
self.ipmicmd.set_power(powerstate, wait=waitamount)
await self.ipmicmd.set_power(powerstate, wait=waitamount)
if powerstate == 'boot' and oldpower == 'on':
power = {'powerstate': 'reset'}
else:
power = self.ipmicmd.get_power()
power = await self.ipmicmd.get_power()
if powerstate == 'reset' and power['powerstate'] == 'on':
power['powerstate'] = 'reset'
self.output.put(msg.PowerState(node=self.node,
state=power['powerstate'],
oldstate=oldpower))
await self.output.put(
msg.PowerState(node=self.node,
state=power['powerstate'],
oldstate=oldpower))
return
def handle_reset(self):
@@ -1440,14 +1452,14 @@ class IpmiHandler:
self.ipmicmd.set_mci(mci)
return
def handle_hostname(self):
async def handle_hostname(self):
if 'read' == self.op:
hostname = self.ipmicmd.get_hostname()
self.output.put(msg.Hostname(self.node, hostname))
hostname = await self.ipmicmd.get_hostname()
await self.output.put(msg.Hostname(self.node, hostname))
return
elif 'update' == self.op:
hostname = self.inputdata.hostname(self.node)
self.ipmicmd.set_hostname(hostname)
await self.ipmicmd.set_hostname(hostname)
return
def handle_domain_name(self):
@@ -1637,7 +1649,7 @@ def _str_health(health):
def initthread():
global _ipmithread
if _ipmithread is None:
_ipmithread = eventlet.spawn(_ipmi_evtloop)
_ipmithread = asyncio.create_task(_ipmi_evtloop())
async def create(nodes, element, configmanager, inputdata, realop='create'):
@@ -1645,10 +1657,12 @@ async def create(nodes, element, configmanager, inputdata, realop='create'):
if element == ['_console', 'session']:
if len(nodes) > 1:
raise Exception("_console/session does not support multiple nodes")
return IpmiConsole(nodes[0], configmanager)
yield IpmiConsole(nodes[0], configmanager)
return
else:
return await perform_requests(
'update', nodes, element, configmanager, inputdata, realop)
async for ret in perform_requests(
'update', nodes, element, configmanager, inputdata, realop):
yield ret
def update(nodes, element, configmanager, inputdata):
@@ -1659,17 +1673,21 @@ def update(nodes, element, configmanager, inputdata):
async def retrieve(nodes, element, configmanager, inputdata):
initthread()
if '/'.join(element).startswith('inventory/firmware/updates/active'):
return firmwaremanager.list_updates(nodes, configmanager.tenant,
element)
async for ret in firmwaremanager.list_updates(nodes, configmanager.tenant,
element):
yield ret
elif '/'.join(element).startswith('media/uploads'):
return firmwaremanager.list_updates(nodes, configmanager.tenant,
element, 'mediaupload')
async for ret in firmwaremanager.list_updates(nodes, configmanager.tenant,
element, 'mediaupload'):
yield ret
elif '/'.join(element).startswith('support/servicedata'):
return firmwaremanager.list_updates(nodes, configmanager.tenant,
element, 'ffdc')
async for ret in firmwaremanager.list_updates(nodes, configmanager.tenant,
element, 'ffdc'):
yield ret
else:
return await perform_requests('read', nodes, element, configmanager,
inputdata, 'read')
async for ret in perform_requests('read', nodes, element, configmanager,
inputdata, 'read'):
yield ret
def delete(nodes, element, configmanager, inputdata):
initthread()