diff --git a/confluent_server/confluent/plugins/hardwaremanagement/redfish.py b/confluent_server/confluent/plugins/hardwaremanagement/redfish.py index 7efda5c8..867496a6 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/redfish.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/redfish.py @@ -13,24 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import confluent.exceptions as exc import confluent.firmwaremanager as firmwaremanager import confluent.messages as msg import confluent.util as util import copy import errno -import eventlet -import eventlet.event -import eventlet.greenpool as greenpool -import eventlet.queue as queue -import eventlet.support.greendns from fnmatch import fnmatch import os import pwd -import pyghmi.constants as pygconstants -import pyghmi.exceptions as pygexc -import pyghmi.storage as storage -ipmicommand = eventlet.import_patched('pyghmi.redfish.command') +import aiohmi.constants as pygconstants +import aiohmi.exceptions as pygexc +import aiohmi.storage as storage +import aiohmi.redfish.command as ipmicommand import socket import ssl import traceback @@ -41,8 +37,9 @@ if not hasattr(ssl, 'SSLEOFError'): pci_cache = {} def get_dns_txt(qstring): - return eventlet.support.greendns.resolver.query( - qstring, 'TXT')[0].strings[0].replace('i=', '') + return None + # return eventlet.support.greendns.resolver.query( + # qstring, 'TXT')[0].strings[0].replace('i=', '') def get_pci_text_from_ids(subdevice, subvendor, device, vendor): fqpi = '{0}.{1}.{2}.{3}'.format(subdevice, subvendor, device, vendor) @@ -87,10 +84,6 @@ class NullLock(object): acquire = donothing release = donothing -_ipmiworkers = greenpool.GreenPool() - -_ipmithread = None -_ipmiwaiters = [] sensor_categories = { 'temperature': frozenset(['Temperature']), @@ -114,7 +107,7 @@ def hex2bin(hexstring): if len(hexvals) < 2: hexvals = hexstring.split(' ') if len(hexvals) < 2: - hexvals = [hexstring[i:i+2] for i in xrange(0, len(hexstring), 2)] + hexvals = [hexstring[i:i+2] for i in range(0, len(hexstring), 2)] bytedata = [int(i, 16) for i in hexvals] return bytearray(bytedata) @@ -151,7 +144,12 @@ def sanitize_invdata(indata): class IpmiCommandWrapper(ipmicommand.Command): - def __init__(self, node, cfm, **kwargs): + @classmethod + async def create(cls, node, cfm, **kwargs): + kv = util.TLSCertVerifier( + cfm, node, 'pubkeys.tls_hardwaremanager').verify_cert + kwargs['verifycallback'] = kv + self = await super().create(**kwargs) #kwargs['pool'] = eventlet.greenpool.GreenPool(4) #Some BMCs at the time of this writing crumble under the weight #of 4 concurrent requests. For now give up on this optimization. @@ -163,11 +161,9 @@ class IpmiCommandWrapper(ipmicommand.Command): (node,), ('secret.hardwaremanagementuser', 'collective.manager', 'secret.hardwaremanagementpassword', 'hardwaremanagement.manager'), self._attribschanged) - kv = util.TLSCertVerifier(cfm, node, - 'pubkeys.tls_hardwaremanager').verify_cert - kwargs['verifycallback'] = kv + try: - super(IpmiCommandWrapper, self).__init__(**kwargs) + pass except socket.error as se: if (hasattr(se, 'errno') and se.errno in (errno.ENETUNREACH, errno.EHOSTUNREACH, errno.EADDRNOTAVAIL)): @@ -184,6 +180,7 @@ class IpmiCommandWrapper(ipmicommand.Command): if 'Redfish not ready' in str(pe): raise exc.TargetEndpointUnreachable('Redfish is not supported by this system or is not yet ready') raise + return self def close_confluent(self): if self._attribwatcher: @@ -196,10 +193,10 @@ class IpmiCommandWrapper(ipmicommand.Command): except KeyError: pass - def get_health(self): + async def get_health(self): if self._inhealth: while self._inhealth: - eventlet.sleep(0.1) + await asyncio.sleep(0.1) return self._lasthealth self._inhealth = True try: @@ -211,17 +208,6 @@ class IpmiCommandWrapper(ipmicommand.Command): return self._lasthealth -def _ipmi_evtloop(): - while True: - try: - console.session.Session.wait_for_rsp(timeout=600) - while _ipmiwaiters: - waiter = _ipmiwaiters.pop() - waiter.send() - except: # TODO(jbjohnso): log the trace into the log - traceback.print_exc() - - def get_conn_params(node, configdata): if 'secret.hardwaremanagementuser' in configdata: username = configdata['secret.hardwaremanagementuser']['value'] @@ -256,22 +242,22 @@ def _donothing(data): pass -def perform_requests(operator, nodes, element, cfg, inputdata, realop): +async def perform_requests(operator, nodes, element, cfg, inputdata, realop): cryptit = cfg.decrypt cfg.decrypt = True configdata = cfg.get_node_attributes(nodes, _configattributes) cfg.decrypt = cryptit - resultdata = queue.LightQueue() + resultdata = asyncio.Queue() livingthreads = set([]) numnodes = len(nodes) for node in nodes: - livingthreads.add(_ipmiworkers.spawn( - perform_request, operator, node, element, configdata, inputdata, - cfg, resultdata, realop)) + livingthreads.add(asyncio.create_task( + perform_request(operator, node, element, configdata, inputdata, + cfg, resultdata, realop))) while livingthreads: try: bundle = [] - datum = resultdata.get(timeout=10) + datum = await asyncio.wait_for(resultdata.get(), timeout=10.0) while datum: if datum != 'Done': if isinstance(datum, Exception): @@ -283,71 +269,80 @@ def perform_requests(operator, nodes, element, cfg, inputdata, realop): else: yield datum timeout = 0.1 if numnodes else 0.001 - datum = resultdata.get(timeout=timeout) - except queue.Empty: + datum = await asyncio.wait_for(resultdata.get(), timeout=timeout) + except asyncio.QueueEmpty: pass + except asyncio.TimeoutError: + print("odd timeout?" + repr(element) + repr(nodes)) finally: for datum in sorted( bundle, key=lambda x: util.naturalize_string(x[0])): yield datum[1] for t in list(livingthreads): - if t.dead: + if t.done(): livingthreads.discard(t) try: # drain queue if a thread put something on the queue and died while True: - datum = resultdata.get_nowait() + datum = await resultdata.get_nowait() if datum != 'Done': yield datum - except queue.Empty: + except asyncio.QueueEmpty: pass -def perform_request(operator, node, element, - configdata, inputdata, cfg, results, realop): - try: - return IpmiHandler(operator, node, element, configdata, inputdata, - cfg, results, realop).handle_request() - except socket.error as se: - if hasattr(se, 'strerror'): - results.put(msg.ConfluentTargetTimeout(node, se.strerror)) - else: - results.put(msg.ConfluentTargetTimeout(node, str(se))) - except pygexc.IpmiException as ipmiexc: - excmsg = str(ipmiexc) - if excmsg in ('Session no longer connected', 'timeout'): - results.put(msg.ConfluentTargetTimeout(node)) - else: - results.put(msg.ConfluentNodeError(node, excmsg)) - raise - except exc.TargetEndpointUnreachable as tu: - results.put(msg.ConfluentTargetTimeout(node, str(tu))) - except exc.TargetEndpointBadCredentials: - results.put(msg.ConfluentTargetInvalidCredentials(node)) - except ssl.SSLEOFError: - results.put(msg.ConfluentNodeError( - node, 'Unable to communicate with the https server on ' - 'the target BMC')) - except exc.PubkeyInvalid: - results.put(msg.ConfluentNodeError( - node, - 'Mismatch detected between target certificate fingerprint ' - 'and pubkeys.tls_hardwaremanager attribute')) - except (pygexc.InvalidParameterValue, pygexc.RedfishError) as e: - results.put(msg.ConfluentNodeError(node, str(e))) - except Exception as e: - results.put(msg.ConfluentNodeError(node, 'Unexpected Error: {0}'.format(str(e)))) - traceback.print_exc() - finally: - results.put('Done') - if (node, cfg.tenant) in persistent_ipmicmds: - del persistent_ipmicmds[(node, cfg.tenant)] +async def perform_request(operator, node, element, + configdata, inputdata, cfg, results, realop): + try: + ih = await IpmiHandler.create( + operator, node, element, configdata, inputdata, cfg, results, + realop) + return await ih.handle_request() + except socket.error as se: + if hasattr(se, 'strerror'): + results.put(msg.ConfluentTargetTimeout(node, se.strerror)) + else: + results.put(msg.ConfluentTargetTimeout(node, str(se))) + except pygexc.IpmiException as ipmiexc: + excmsg = str(ipmiexc) + if excmsg in ('Session no longer connected', 'timeout'): + results.put(msg.ConfluentTargetTimeout(node)) + else: + results.put(msg.ConfluentNodeError(node, excmsg)) + raise + except exc.TargetEndpointUnreachable as tu: + results.put(msg.ConfluentTargetTimeout(node, str(tu))) + except exc.TargetEndpointBadCredentials: + results.put(msg.ConfluentTargetInvalidCredentials(node)) + except ssl.SSLEOFError: + results.put(msg.ConfluentNodeError( + node, 'Unable to communicate with the https server on ' + 'the target BMC')) + except exc.PubkeyInvalid: + await results.put(msg.ConfluentNodeError( + node, + 'Mismatch detected between target certificate fingerprint ' + 'and pubkeys.tls_hardwaremanager attribute')) + except (pygexc.InvalidParameterValue, pygexc.RedfishError) as e: + results.put(msg.ConfluentNodeError(node, str(e))) + except Exception as e: + await results.put(msg.ConfluentNodeError(node, 'Unexpected Error: {0}'.format(str(e)))) + traceback.print_exc() + finally: + await results.put('Done') + if (node, cfg.tenant) in persistent_ipmicmds: + del persistent_ipmicmds[(node, cfg.tenant)] persistent_ipmicmds = {} -class IpmiHandler(object): - def __init__(self, operation, node, element, cfd, inputdata, cfg, output, - realop): + +class IpmiHandler: + + @classmethod + async def create( + cls, operation, node, element, cfd, inputdata, cfg, output, + realop): + self = cls() self.cfm = cfg self.sensormap = {} self.invmap = {} @@ -355,7 +350,7 @@ class IpmiHandler(object): self.sensorcategory = None self.broken = False self.error = None - eventlet.sleep(0) + await asyncio.sleep(0) self.cfg = cfd[node] self.current_user = cfg.current_user self.loggedin = False @@ -370,11 +365,11 @@ class IpmiHandler(object): tenant = cfg.tenant if (node, tenant) not in persistent_ipmicmds: try: - persistent_ipmicmds[(node, tenant)].close_confluent() + await persistent_ipmicmds[(node, tenant)].close_confluent() except KeyError: # was no previous session pass try: - persistent_ipmicmds[(node, tenant)] = IpmiCommandWrapper( + persistent_ipmicmds[(node, tenant)] = await IpmiCommandWrapper.create( node, cfg, bmc=connparams['bmc'], userid=connparams['username'], password=connparams['passphrase']) @@ -385,13 +380,14 @@ class IpmiHandler(object): raise exc.TargetEndpointUnreachable(ge.strerror) raise self.ipmicmd = persistent_ipmicmds[(node, tenant)] + return self bootdevices = { 'optical': 'cd' } - def handle_request(self): + async def handle_request(self): if self.broken: if (self.error == 'timeout' or 'Insufficient resources' in self.error): @@ -411,7 +407,7 @@ class IpmiHandler(object): else: raise Exception(self.error) if self.element == ['power', 'state']: - self.power() + await self.power() elif self.element == ['_enclosure', 'reseat_bay']: self.reseat_bay() elif self.element == ['boot', 'nextdevice']: @@ -530,23 +526,14 @@ class IpmiHandler(object): raise Exception('Not implemented') def decode_alert(self): - inputdata = self.inputdata.get_alert(self.node) - specifictrap = int(inputdata['.1.3.6.1.6.3.1.1.4.1.0'].rpartition( - '.')[-1]) - for tmpvarbind in inputdata: - if tmpvarbind.endswith('3183.1.1'): - varbinddata = inputdata[tmpvarbind] - varbinddata = hex2bin(varbinddata) - event = self.ipmicmd.decode_pet(specifictrap, varbinddata) - self.pyghmi_event_to_confluent(event) - self.output.put(msg.EventCollection((event,), name=self.node)) + raise Exception("Decode Alert not implemented for redfish") def handle_alerts(self): if self.element[3] == 'destinations': if len(self.element) == 4: # A list of destinations maxdest = self.ipmicmd.get_alert_destination_count() - for alertidx in xrange(0, maxdest + 1): + for alertidx in range(0, maxdest + 1): self.output.put(msg.ChildCollection(alertidx)) return elif len(self.element) == 5: @@ -1242,11 +1229,11 @@ class IpmiHandler(object): self.output.put(msg.IdentifyState(node=self.node, state=identify)) return - def power(self): + async def power(self): if 'read' == self.op: - power = self.ipmicmd.get_power() - self.output.put(msg.PowerState(node=self.node, - state=power['powerstate'])) + power = await self.ipmicmd.get_power() + await self.output.put( + msg.PowerState(node=self.node, state=power['powerstate'])) return elif 'update' == self.op: powerstate = self.inputdata.powerstate(self.node) @@ -1255,15 +1242,15 @@ class IpmiHandler(object): oldpower = self.ipmicmd.get_power() if 'powerstate' in oldpower: oldpower = oldpower['powerstate'] - self.ipmicmd.set_power(powerstate, wait=30) + await self.ipmicmd.set_power(powerstate, wait=30) if powerstate == 'boot' and oldpower == 'on': power = {'powerstate': 'reset'} else: - power = self.ipmicmd.get_power() + power = await self.ipmicmd.get_power() if powerstate == 'reset' and power['powerstate'] == 'on': power['powerstate'] = 'reset' - self.output.put(msg.PowerState(node=self.node, + await self.output.put(msg.PowerState(node=self.node, state=power['powerstate'], oldstate=oldpower)) return diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index b78fe13d..5db7c424 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -243,8 +243,8 @@ class TLSCertVerifier(object): auditlog = log.Logger('audit') auditlog.log({'node': self.node, 'event': 'certautoadd', 'fingerprint': fingerprint}) - self.cfm.set_node_attributes( - {self.node: {self.fieldname: fingerprint}}) + spawn(self.cfm.set_node_attributes( + {self.node: {self.fieldname: fingerprint}})) return True elif cert_matches(storedprint[self.node][self.fieldname]['value'], certificate):