2
0
mirror of https://opendev.org/x/pyghmi synced 2026-01-11 10:42:32 +00:00

Implement firmware fetch categories

This is a cleaner way to model categories, and use
the categories to help optimize XCC fetch of
data by category.

Change-Id: I91131d946671b0ac97732ed34a419fe56e0dd0b8
This commit is contained in:
Jarrod Johnson
2025-08-29 17:14:19 -04:00
parent f56e000a66
commit a79ba0abc7
8 changed files with 153 additions and 123 deletions

View File

@@ -2166,14 +2166,14 @@ class Command(object):
return True
def get_firmware(self, components=()):
def get_firmware(self, components=(), category=None):
"""Retrieve OEM Firmware information"""
self.oem_init()
mcinfo = self.xraw_command(netfn=6, command=1)
major, minor = struct.unpack('BB', mcinfo['data'][2:4])
bmcver = '{0}.{1}'.format(major, hex(minor)[2:])
return self._oem.get_oem_firmware(bmcver, components)
return self._oem.get_oem_firmware(bmcver, components, category)
def get_capping_enabled(self):
"""Get PSU based power capping status

View File

@@ -279,7 +279,7 @@ class OEMHandler(object):
fru['oem_parser'] = None
return fru
def get_oem_firmware(self, bmcver, components):
def get_oem_firmware(self, bmcver, components, category):
"""Get Firmware information."""
# Here the bmc version is passed into the OEM handler, to allow

View File

@@ -842,7 +842,7 @@ class OEMHandler(generic.OEMHandler):
return False
return False
def get_oem_firmware(self, bmcver, components):
def get_oem_firmware(self, bmcver, components, category):
if self.has_tsm or self.has_ami or self.has_asrock:
command = firmware.get_categories()["firmware"]
fw_cmd = self.get_cmd_type("firmware", command)
@@ -865,7 +865,7 @@ class OEMHandler(generic.OEMHandler):
bios_versions,
self.has_asrock)
elif self.has_imm:
return self.immhandler.get_firmware_inventory(bmcver, components)
return self.immhandler.get_firmware_inventory(bmcver, components, category)
elif self.is_fpc:
return nextscale.get_fpc_firmware(bmcver, self.ipmicmd,
self._fpc_variant)

View File

@@ -556,84 +556,86 @@ class IMMClient(object):
def fetch_psu_firmware(self):
return []
def fetch_agentless_firmware(self):
def fetch_agentless_firmware(self, needdisk=True, needadp=True):
skipkeys = set([])
cd = self.get_cached_data('lenovo_cached_adapters_fu')
if cd:
adapterdata, fwu = cd
else:
adapterdata = None
if not adapterdata:
if self.updating:
raise pygexc.TemporaryError(
'Cannot read extended inventory during firmware update')
if self.wc:
adapterdata = self.wc.grab_json_response(
self.ADP_URL, referer=self.adp_referer)
if self.ADP_FU_URL:
fwu = self.wc.grab_json_response(
self.ADP_FU_URL, referer=self.adp_referer)
else:
fwu = {}
if adapterdata:
self.datacache['lenovo_cached_adapters_fu'] = (
(adapterdata, fwu), util._monotonic_time())
if adapterdata and 'items' in adapterdata:
anames = {}
for adata in adapterdata['items']:
aname = adata[self.ADP_NAME]
if aname in anames:
anames[aname] += 1
aname = '{0} {1}'.format(aname, anames[aname])
else:
anames[aname] = 1
donenames = set([])
for fundata in adata[self.ADP_FUN]:
fdata = fundata.get('firmwares', ())
for firm in fdata:
fname = firm['firmwareName'].rstrip()
if '.' in fname:
fname = firm['description'].rstrip()
if fname in donenames:
# ignore redundant entry
continue
if not fname:
continue
donenames.add(fname)
bdata = {}
if 'versionStr' in firm and firm['versionStr']:
bdata['version'] = firm['versionStr']
if ('releaseDate' in firm
and firm['releaseDate']
and firm['releaseDate'] != 'N/A'):
try:
bdata['date'] = self._parse_builddate(
firm['releaseDate'])
except ValueError:
pass
yield '{0} {1}'.format(aname, fname), bdata
for fwi in fwu.get('items', []):
if fwi.get('key', -1) == adata.get('key', -2):
skipkeys.add(fwi['key'])
if fwi.get('fw_status', 0) == 2:
if needadp:
cd = self.get_cached_data('lenovo_cached_adapters_fu')
if cd:
adapterdata, fwu = cd
else:
adapterdata = None
if not adapterdata:
if self.updating:
raise pygexc.TemporaryError(
'Cannot read extended inventory during firmware update')
if self.wc:
adapterdata = self.wc.grab_json_response(
self.ADP_URL, referer=self.adp_referer)
if self.ADP_FU_URL:
fwu = self.wc.grab_json_response(
self.ADP_FU_URL, referer=self.adp_referer)
else:
fwu = {}
if adapterdata:
self.datacache['lenovo_cached_adapters_fu'] = (
(adapterdata, fwu), util._monotonic_time())
if adapterdata and 'items' in adapterdata:
anames = {}
for adata in adapterdata['items']:
aname = adata[self.ADP_NAME]
if aname in anames:
anames[aname] += 1
aname = '{0} {1}'.format(aname, anames[aname])
else:
anames[aname] = 1
donenames = set([])
for fundata in adata[self.ADP_FUN]:
fdata = fundata.get('firmwares', ())
for firm in fdata:
fname = firm['firmwareName'].rstrip()
if '.' in fname:
fname = firm['description'].rstrip()
if fname in donenames:
# ignore redundant entry
continue
if not fname:
continue
donenames.add(fname)
bdata = {}
if 'fw_pkg_version' in fwi and fwi['fw_pkg_version']:
bdata['version'] = fwi['fw_pkg_version']
elif 'fw_version_pend' in fwi:
bdata['version'] = fwi['fw_version_pend']
yield '{0} Pending Update'.format(aname), bdata
for fwi in fwu.get('items', []):
if fwi.get('key', -1) > 0 and fwi['key'] not in skipkeys:
bdata = {}
bdata['version'] = fwi['fw_version']
yield fwi['adapterName'], bdata
if fwi.get('fw_status', 0) == 2:
if 'versionStr' in firm and firm['versionStr']:
bdata['version'] = firm['versionStr']
if ('releaseDate' in firm
and firm['releaseDate']
and firm['releaseDate'] != 'N/A'):
try:
bdata['date'] = self._parse_builddate(
firm['releaseDate'])
except ValueError:
pass
yield '{0} {1}'.format(aname, fname), bdata
for fwi in fwu.get('items', []):
if fwi.get('key', -1) == adata.get('key', -2):
skipkeys.add(fwi['key'])
if fwi.get('fw_status', 0) == 2:
bdata = {}
if 'fw_pkg_version' in fwi and fwi['fw_pkg_version']:
bdata['version'] = fwi['fw_pkg_version']
elif 'fw_version_pend' in fwi:
bdata['version'] = fwi['fw_version_pend']
yield '{0} Pending Update'.format(aname), bdata
for fwi in fwu.get('items', []):
if fwi.get('key', -1) > 0 and fwi['key'] not in skipkeys:
bdata = {}
if 'fw_version_pend' in fwi:
bdata['version'] = fwi['fw_version_pend']
yield '{0} Pending Update'.format(fwi['adapterName']), bdata
for disk in self.disk_inventory():
yield disk
bdata['version'] = fwi['fw_version']
yield fwi['adapterName'], bdata
if fwi.get('fw_status', 0) == 2:
bdata = {}
if 'fw_version_pend' in fwi:
bdata['version'] = fwi['fw_version_pend']
yield '{0} Pending Update'.format(fwi['adapterName']), bdata
if needdisk:
for disk in self.disk_inventory():
yield disk
self.weblogout()
def disk_inventory(self, mode=0):
@@ -829,11 +831,13 @@ class IMMClient(object):
self.weblogout()
return hwmap
def get_firmware_inventory(self, bmcver, components):
def get_firmware_inventory(self, bmcver, components, category):
# First we fetch the system firmware found in imm properties
# then check for agentless, if agentless, get adapter info using
# https, using the caller TLS verification scheme
components = set(components)
if 'core' in components:
category = 'core'
if not components or set(('imm', 'xcc', 'bmc', 'core')) & components:
rsp = self.ipmicmd.xraw_command(netfn=0x3a, command=0x50)
immverdata = self.parse_imm_buildinfo(rsp['data'])
@@ -1811,12 +1815,16 @@ class XCCClient(IMMClient):
{'model': psu['model'],
'version': psu['version']})
def get_firmware_inventory(self, bmcver, components):
def get_firmware_inventory(self, bmcver, components, category):
# First we fetch the system firmware found in imm properties
# then check for agentless, if agentless, get adapter info using
# https, using the caller TLS verification scheme
if 'core' in components:
category = 'core'
if not category:
category = 'all'
components = set(components)
if (not components
if category in ('all', 'core') and (not components
or set(('core', 'imm', 'bmc', 'xcc')) & components):
rsp = self.ipmicmd.xraw_command(netfn=0x3a, command=0x50)
immverdata = self.parse_imm_buildinfo(rsp['data'])
@@ -1854,7 +1862,7 @@ class XCCClient(IMMClient):
'date': '/v2/ibmc/dm/fw/imm3/primary_pending_build_date'})
if bdata:
yield '{0} Pending Update'.format(self.bmcname), bdata
if not components or set(('core', 'uefi', 'bios')) & components:
if category in ('all', 'core') and not components or set(('core', 'uefi', 'bios')) & components:
bdata = self.fetch_grouped_properties({
'build': '/v2/bios/build_id',
'version': '/v2/bios/build_version',
@@ -1867,7 +1875,7 @@ class XCCClient(IMMClient):
'build': '/v2/bios/pending_build_id'})
if bdata:
yield 'UEFI Pending Update', bdata
if not components or set(('lxpm', 'core')) & components:
if category in ('all', 'core') and not components or set(('lxpm', 'core')) & components:
bdata = self.fetch_grouped_properties({
'build': '/v2/tdm/build_id',
'version': '/v2/tdm/build_version',
@@ -1888,7 +1896,7 @@ class XCCClient(IMMClient):
})
if bdata:
yield 'LXPM Linux Driver Bundle', bdata
if not components or set(('lxum', 'core')):
if category in ('all', 'core') and (not components or set(('lxum', 'core'))):
sysinf = self.wc.grab_json_response('/api/dataset/sys_info')
for item in sysinf.get('items', {}):
for firm in item.get('firmware', []):
@@ -1899,7 +1907,7 @@ class XCCClient(IMMClient):
}
if firm['type'] == 10:
yield ('LXUM', firminfo)
if not components or set(('core', 'fpga')) in components:
if category in ('all', 'core') and (not components or set(('core', 'fpga')) in components):
try:
fpga = self.ipmicmd.xraw_command(netfn=0x3a, command=0x6b,
data=(0,))
@@ -1909,11 +1917,12 @@ class XCCClient(IMMClient):
except pygexc.IpmiException as ie:
if ie.ipmicode != 193:
raise
if (not components or components - set((
'core', 'uefi', 'bios', 'xcc', 'bmc', 'imm', 'fpga',
'lxpm'))):
for firm in self.fetch_agentless_firmware():
yield firm
needdiskfirmware = category in ('all', 'disks')
needadapterfirmware = category in ('all', 'adapters')
needpsufirmware = category in ('all', 'misc')
for firm in self.fetch_agentless_firmware(needdisk=needdiskfirmware, needadp=needadapterfirmware):
yield firm
if needpsufirmware:
for firm in self.fetch_psu_firmware():
yield firm

View File

@@ -1111,10 +1111,10 @@ class Command(object):
self._do_web_request(self._bmcnicurl,
{'HostName': hostname}, 'PATCH')
def get_firmware(self, components=()):
def get_firmware(self, components=(), category=None):
self._fwnamemap = {}
try:
for firminfo in self.oem.get_firmware_inventory(components, self):
for firminfo in self.oem.get_firmware_inventory(components, self, category):
yield firminfo
except exc.BypassGenericBehavior:
return

View File

@@ -667,7 +667,7 @@ class OEMHandler(object):
def _extract_fwinfo(self, inf):
return {}
def get_firmware_inventory(self, components, fishclient):
def get_firmware_inventory(self, components, fishclient, category=None):
return []
def set_credentials(self, username, password):

View File

@@ -699,33 +699,54 @@ class OEMHandler(generic.OEMHandler):
for diskent in adp.get('aimDisks', ()):
yield self._get_disk_firmware_single(diskent)
def get_firmware_inventory(self, components, fishclient):
sysinf = self.wc.grab_json_response('/api/dataset/sys_info')
for item in sysinf.get('items', {}):
for firm in item.get('firmware', []):
firminfo = {
'version': firm['version'],
'build': firm['build'],
'date': parse_time(firm['release_date']),
}
if firm['type'] == 5:
yield ('XCC', firminfo)
elif firm['type'] == 6:
yield ('XCC Backup', firminfo)
elif firm['type'] == 0:
yield ('UEFI', firminfo)
elif firm['type'] == 7:
yield ('LXPM', firminfo)
elif firm['type'] == 8:
yield ('LXPM Windows Driver Bundle', firminfo)
elif firm['type'] == 9:
yield ('LXPM Linux Driver Bundle', firminfo)
elif firm['type'] == 10:
yield ('LXUM', firminfo)
for adpinfo in self._get_agentless_firmware(components):
yield adpinfo
for adpinfo in self._get_disk_firmware(components):
yield adpinfo
def get_firmware_inventory(self, components, fishclient, category=None):
if components and 'core' in components:
category = 'core'
components = components - set(['core'])
if not category:
category = 'all'
if category in ('all', 'core'):
sysinf = self.wc.grab_json_response('/api/dataset/sys_info')
for item in sysinf.get('items', {}):
for firm in item.get('firmware', []):
firminfo = {
'version': firm['version'],
'build': firm['build'],
'date': parse_time(firm['release_date']),
}
if firm['type'] == 5:
yield ('XCC', firminfo)
elif firm['type'] == 6:
yield ('XCC Backup', firminfo)
elif firm['type'] == 0:
yield ('UEFI', firminfo)
elif firm['type'] == 7:
yield ('LXPM', firminfo)
elif firm['type'] == 8:
yield ('LXPM Windows Driver Bundle', firminfo)
elif firm['type'] == 9:
yield ('LXPM Linux Driver Bundle', firminfo)
elif firm['type'] == 10:
yield ('LXUM', firminfo)
if components:
components = components - set((
'core', 'uefi', 'bios', 'xcc', 'bmc', 'imm', 'fpga',
'lxpm'))
if not components:
return
if not components:
components = set(('all',))
if category in ('all', 'adapters'):
needadapterfirmware = True
if category in ('all', 'disks'):
needdiskfirmware = True
if needadapterfirmware:
for adpinfo in self._get_agentless_firmware(components):
yield adpinfo
if needdiskfirmware:
for adpinfo in self._get_disk_firmware(components):
yield adpinfo
raise pygexc.BypassGenericBehavior()
def get_storage_configuration(self, logout=True):

View File

@@ -1122,7 +1122,7 @@ class OEMHandler(generic.OEMHandler):
if progress:
progress({'phase': 'complete'})
def get_firmware_inventory(self, components, fishclient):
def get_firmware_inventory(self, components, fishclient, category):
sfs = fishclient._do_web_request('/api/providers/system_firmware_status')
pendingscm = sfs.get('fpga_scm_pending_build', None)
pendinghpm = sfs.get('fpga_hpm_pending_build', None)