2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-07 01:11:29 +00:00

Get basic redfish running in asyncio

This commit is contained in:
Jarrod Johnson
2024-08-13 15:28:52 -04:00
parent db670b695f
commit 45b17ba855
2 changed files with 99 additions and 112 deletions

View File

@@ -13,24 +13,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import confluent.exceptions as exc
import confluent.firmwaremanager as firmwaremanager
import confluent.messages as msg
import confluent.util as util
import copy
import errno
import eventlet
import eventlet.event
import eventlet.greenpool as greenpool
import eventlet.queue as queue
import eventlet.support.greendns
from fnmatch import fnmatch
import os
import pwd
import pyghmi.constants as pygconstants
import pyghmi.exceptions as pygexc
import pyghmi.storage as storage
ipmicommand = eventlet.import_patched('pyghmi.redfish.command')
import aiohmi.constants as pygconstants
import aiohmi.exceptions as pygexc
import aiohmi.storage as storage
import aiohmi.redfish.command as ipmicommand
import socket
import ssl
import traceback
@@ -41,8 +37,9 @@ if not hasattr(ssl, 'SSLEOFError'):
pci_cache = {}
def get_dns_txt(qstring):
return eventlet.support.greendns.resolver.query(
qstring, 'TXT')[0].strings[0].replace('i=', '')
return None
# return eventlet.support.greendns.resolver.query(
# qstring, 'TXT')[0].strings[0].replace('i=', '')
def get_pci_text_from_ids(subdevice, subvendor, device, vendor):
fqpi = '{0}.{1}.{2}.{3}'.format(subdevice, subvendor, device, vendor)
@@ -87,10 +84,6 @@ class NullLock(object):
acquire = donothing
release = donothing
_ipmiworkers = greenpool.GreenPool()
_ipmithread = None
_ipmiwaiters = []
sensor_categories = {
'temperature': frozenset(['Temperature']),
@@ -114,7 +107,7 @@ def hex2bin(hexstring):
if len(hexvals) < 2:
hexvals = hexstring.split(' ')
if len(hexvals) < 2:
hexvals = [hexstring[i:i+2] for i in xrange(0, len(hexstring), 2)]
hexvals = [hexstring[i:i+2] for i in range(0, len(hexstring), 2)]
bytedata = [int(i, 16) for i in hexvals]
return bytearray(bytedata)
@@ -151,7 +144,12 @@ def sanitize_invdata(indata):
class IpmiCommandWrapper(ipmicommand.Command):
def __init__(self, node, cfm, **kwargs):
@classmethod
async def create(cls, node, cfm, **kwargs):
kv = util.TLSCertVerifier(
cfm, node, 'pubkeys.tls_hardwaremanager').verify_cert
kwargs['verifycallback'] = kv
self = await super().create(**kwargs)
#kwargs['pool'] = eventlet.greenpool.GreenPool(4)
#Some BMCs at the time of this writing crumble under the weight
#of 4 concurrent requests. For now give up on this optimization.
@@ -163,11 +161,9 @@ class IpmiCommandWrapper(ipmicommand.Command):
(node,), ('secret.hardwaremanagementuser', 'collective.manager',
'secret.hardwaremanagementpassword',
'hardwaremanagement.manager'), self._attribschanged)
kv = util.TLSCertVerifier(cfm, node,
'pubkeys.tls_hardwaremanager').verify_cert
kwargs['verifycallback'] = kv
try:
super(IpmiCommandWrapper, self).__init__(**kwargs)
pass
except socket.error as se:
if (hasattr(se, 'errno')
and se.errno in (errno.ENETUNREACH, errno.EHOSTUNREACH, errno.EADDRNOTAVAIL)):
@@ -184,6 +180,7 @@ class IpmiCommandWrapper(ipmicommand.Command):
if 'Redfish not ready' in str(pe):
raise exc.TargetEndpointUnreachable('Redfish is not supported by this system or is not yet ready')
raise
return self
def close_confluent(self):
if self._attribwatcher:
@@ -196,10 +193,10 @@ class IpmiCommandWrapper(ipmicommand.Command):
except KeyError:
pass
def get_health(self):
async def get_health(self):
if self._inhealth:
while self._inhealth:
eventlet.sleep(0.1)
await asyncio.sleep(0.1)
return self._lasthealth
self._inhealth = True
try:
@@ -211,17 +208,6 @@ class IpmiCommandWrapper(ipmicommand.Command):
return self._lasthealth
def _ipmi_evtloop():
while True:
try:
console.session.Session.wait_for_rsp(timeout=600)
while _ipmiwaiters:
waiter = _ipmiwaiters.pop()
waiter.send()
except: # TODO(jbjohnso): log the trace into the log
traceback.print_exc()
def get_conn_params(node, configdata):
if 'secret.hardwaremanagementuser' in configdata:
username = configdata['secret.hardwaremanagementuser']['value']
@@ -256,22 +242,22 @@ def _donothing(data):
pass
def perform_requests(operator, nodes, element, cfg, inputdata, realop):
async def perform_requests(operator, nodes, element, cfg, inputdata, realop):
cryptit = cfg.decrypt
cfg.decrypt = True
configdata = cfg.get_node_attributes(nodes, _configattributes)
cfg.decrypt = cryptit
resultdata = queue.LightQueue()
resultdata = asyncio.Queue()
livingthreads = set([])
numnodes = len(nodes)
for node in nodes:
livingthreads.add(_ipmiworkers.spawn(
perform_request, operator, node, element, configdata, inputdata,
cfg, resultdata, realop))
livingthreads.add(asyncio.create_task(
perform_request(operator, node, element, configdata, inputdata,
cfg, resultdata, realop)))
while livingthreads:
try:
bundle = []
datum = resultdata.get(timeout=10)
datum = await asyncio.wait_for(resultdata.get(), timeout=10.0)
while datum:
if datum != 'Done':
if isinstance(datum, Exception):
@@ -283,71 +269,80 @@ def perform_requests(operator, nodes, element, cfg, inputdata, realop):
else:
yield datum
timeout = 0.1 if numnodes else 0.001
datum = resultdata.get(timeout=timeout)
except queue.Empty:
datum = await asyncio.wait_for(resultdata.get(), timeout=timeout)
except asyncio.QueueEmpty:
pass
except asyncio.TimeoutError:
print("odd timeout?" + repr(element) + repr(nodes))
finally:
for datum in sorted(
bundle, key=lambda x: util.naturalize_string(x[0])):
yield datum[1]
for t in list(livingthreads):
if t.dead:
if t.done():
livingthreads.discard(t)
try:
# drain queue if a thread put something on the queue and died
while True:
datum = resultdata.get_nowait()
datum = await resultdata.get_nowait()
if datum != 'Done':
yield datum
except queue.Empty:
except asyncio.QueueEmpty:
pass
def perform_request(operator, node, element,
configdata, inputdata, cfg, results, realop):
try:
return IpmiHandler(operator, node, element, configdata, inputdata,
cfg, results, realop).handle_request()
except socket.error as se:
if hasattr(se, 'strerror'):
results.put(msg.ConfluentTargetTimeout(node, se.strerror))
else:
results.put(msg.ConfluentTargetTimeout(node, str(se)))
except pygexc.IpmiException as ipmiexc:
excmsg = str(ipmiexc)
if excmsg in ('Session no longer connected', 'timeout'):
results.put(msg.ConfluentTargetTimeout(node))
else:
results.put(msg.ConfluentNodeError(node, excmsg))
raise
except exc.TargetEndpointUnreachable as tu:
results.put(msg.ConfluentTargetTimeout(node, str(tu)))
except exc.TargetEndpointBadCredentials:
results.put(msg.ConfluentTargetInvalidCredentials(node))
except ssl.SSLEOFError:
results.put(msg.ConfluentNodeError(
node, 'Unable to communicate with the https server on '
'the target BMC'))
except exc.PubkeyInvalid:
results.put(msg.ConfluentNodeError(
node,
'Mismatch detected between target certificate fingerprint '
'and pubkeys.tls_hardwaremanager attribute'))
except (pygexc.InvalidParameterValue, pygexc.RedfishError) as e:
results.put(msg.ConfluentNodeError(node, str(e)))
except Exception as e:
results.put(msg.ConfluentNodeError(node, 'Unexpected Error: {0}'.format(str(e))))
traceback.print_exc()
finally:
results.put('Done')
if (node, cfg.tenant) in persistent_ipmicmds:
del persistent_ipmicmds[(node, cfg.tenant)]
async def perform_request(operator, node, element,
configdata, inputdata, cfg, results, realop):
try:
ih = await IpmiHandler.create(
operator, node, element, configdata, inputdata, cfg, results,
realop)
return await ih.handle_request()
except socket.error as se:
if hasattr(se, 'strerror'):
results.put(msg.ConfluentTargetTimeout(node, se.strerror))
else:
results.put(msg.ConfluentTargetTimeout(node, str(se)))
except pygexc.IpmiException as ipmiexc:
excmsg = str(ipmiexc)
if excmsg in ('Session no longer connected', 'timeout'):
results.put(msg.ConfluentTargetTimeout(node))
else:
results.put(msg.ConfluentNodeError(node, excmsg))
raise
except exc.TargetEndpointUnreachable as tu:
results.put(msg.ConfluentTargetTimeout(node, str(tu)))
except exc.TargetEndpointBadCredentials:
results.put(msg.ConfluentTargetInvalidCredentials(node))
except ssl.SSLEOFError:
results.put(msg.ConfluentNodeError(
node, 'Unable to communicate with the https server on '
'the target BMC'))
except exc.PubkeyInvalid:
await results.put(msg.ConfluentNodeError(
node,
'Mismatch detected between target certificate fingerprint '
'and pubkeys.tls_hardwaremanager attribute'))
except (pygexc.InvalidParameterValue, pygexc.RedfishError) as e:
results.put(msg.ConfluentNodeError(node, str(e)))
except Exception as e:
await results.put(msg.ConfluentNodeError(node, 'Unexpected Error: {0}'.format(str(e))))
traceback.print_exc()
finally:
await results.put('Done')
if (node, cfg.tenant) in persistent_ipmicmds:
del persistent_ipmicmds[(node, cfg.tenant)]
persistent_ipmicmds = {}
class IpmiHandler(object):
def __init__(self, operation, node, element, cfd, inputdata, cfg, output,
realop):
class IpmiHandler:
@classmethod
async def create(
cls, operation, node, element, cfd, inputdata, cfg, output,
realop):
self = cls()
self.cfm = cfg
self.sensormap = {}
self.invmap = {}
@@ -355,7 +350,7 @@ class IpmiHandler(object):
self.sensorcategory = None
self.broken = False
self.error = None
eventlet.sleep(0)
await asyncio.sleep(0)
self.cfg = cfd[node]
self.current_user = cfg.current_user
self.loggedin = False
@@ -370,11 +365,11 @@ class IpmiHandler(object):
tenant = cfg.tenant
if (node, tenant) not in persistent_ipmicmds:
try:
persistent_ipmicmds[(node, tenant)].close_confluent()
await persistent_ipmicmds[(node, tenant)].close_confluent()
except KeyError: # was no previous session
pass
try:
persistent_ipmicmds[(node, tenant)] = IpmiCommandWrapper(
persistent_ipmicmds[(node, tenant)] = await IpmiCommandWrapper.create(
node, cfg, bmc=connparams['bmc'],
userid=connparams['username'],
password=connparams['passphrase'])
@@ -385,13 +380,14 @@ class IpmiHandler(object):
raise exc.TargetEndpointUnreachable(ge.strerror)
raise
self.ipmicmd = persistent_ipmicmds[(node, tenant)]
return self
bootdevices = {
'optical': 'cd'
}
def handle_request(self):
async def handle_request(self):
if self.broken:
if (self.error == 'timeout' or
'Insufficient resources' in self.error):
@@ -411,7 +407,7 @@ class IpmiHandler(object):
else:
raise Exception(self.error)
if self.element == ['power', 'state']:
self.power()
await self.power()
elif self.element == ['_enclosure', 'reseat_bay']:
self.reseat_bay()
elif self.element == ['boot', 'nextdevice']:
@@ -530,23 +526,14 @@ class IpmiHandler(object):
raise Exception('Not implemented')
def decode_alert(self):
inputdata = self.inputdata.get_alert(self.node)
specifictrap = int(inputdata['.1.3.6.1.6.3.1.1.4.1.0'].rpartition(
'.')[-1])
for tmpvarbind in inputdata:
if tmpvarbind.endswith('3183.1.1'):
varbinddata = inputdata[tmpvarbind]
varbinddata = hex2bin(varbinddata)
event = self.ipmicmd.decode_pet(specifictrap, varbinddata)
self.pyghmi_event_to_confluent(event)
self.output.put(msg.EventCollection((event,), name=self.node))
raise Exception("Decode Alert not implemented for redfish")
def handle_alerts(self):
if self.element[3] == 'destinations':
if len(self.element) == 4:
# A list of destinations
maxdest = self.ipmicmd.get_alert_destination_count()
for alertidx in xrange(0, maxdest + 1):
for alertidx in range(0, maxdest + 1):
self.output.put(msg.ChildCollection(alertidx))
return
elif len(self.element) == 5:
@@ -1242,11 +1229,11 @@ class IpmiHandler(object):
self.output.put(msg.IdentifyState(node=self.node, state=identify))
return
def power(self):
async def power(self):
if 'read' == self.op:
power = self.ipmicmd.get_power()
self.output.put(msg.PowerState(node=self.node,
state=power['powerstate']))
power = await self.ipmicmd.get_power()
await self.output.put(
msg.PowerState(node=self.node, state=power['powerstate']))
return
elif 'update' == self.op:
powerstate = self.inputdata.powerstate(self.node)
@@ -1255,15 +1242,15 @@ class IpmiHandler(object):
oldpower = self.ipmicmd.get_power()
if 'powerstate' in oldpower:
oldpower = oldpower['powerstate']
self.ipmicmd.set_power(powerstate, wait=30)
await self.ipmicmd.set_power(powerstate, wait=30)
if powerstate == 'boot' and oldpower == 'on':
power = {'powerstate': 'reset'}
else:
power = self.ipmicmd.get_power()
power = await self.ipmicmd.get_power()
if powerstate == 'reset' and power['powerstate'] == 'on':
power['powerstate'] = 'reset'
self.output.put(msg.PowerState(node=self.node,
await self.output.put(msg.PowerState(node=self.node,
state=power['powerstate'],
oldstate=oldpower))
return

View File

@@ -243,8 +243,8 @@ class TLSCertVerifier(object):
auditlog = log.Logger('audit')
auditlog.log({'node': self.node, 'event': 'certautoadd',
'fingerprint': fingerprint})
self.cfm.set_node_attributes(
{self.node: {self.fieldname: fingerprint}})
spawn(self.cfm.set_node_attributes(
{self.node: {self.fieldname: fingerprint}}))
return True
elif cert_matches(storedprint[self.node][self.fieldname]['value'],
certificate):