From 57fe186a10016eef450b70b4a1ccf86ce8af7c4e Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 17 Mar 2026 16:43:41 -0400 Subject: [PATCH] Rework redfish to be more async --- .../plugins/hardwaremanagement/redfish.py | 265 +++++++++--------- 1 file changed, 133 insertions(+), 132 deletions(-) diff --git a/confluent_server/confluent/plugins/hardwaremanagement/redfish.py b/confluent_server/confluent/plugins/hardwaremanagement/redfish.py index d52db613..91f266f6 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/redfish.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/redfish.py @@ -210,7 +210,7 @@ class IpmiCommandWrapper(ipmicommand.Command): return self._lasthealth self._inhealth = True try: - self._lasthealth = super(IpmiCommandWrapper, self).get_health() + self._lasthealth = await super(IpmiCommandWrapper, self).get_health() except Exception as e: self._inhealth = False raise @@ -419,13 +419,13 @@ class IpmiHandler: if self.element == ['power', 'state']: await self.power() elif self.element == ['_enclosure', 'reseat_bay']: - self.reseat_bay() + await self.reseat_bay() elif self.element == ['boot', 'nextdevice']: - self.bootdevice() + await self.bootdevice() elif self.element == ['health', 'hardware']: - self.health() + await self.health() elif self.element == ['identify']: - self.identify() + await self.identify() elif self.element[0] == 'sensors': await self.handle_sensors() elif self.element[:2] == ['configuration', 'storage']: @@ -449,9 +449,9 @@ class IpmiHandler: elif self.element == ['events', 'hardware', 'log']: await self.do_eventlog() elif self.element == ['events', 'hardware', 'decode']: - self.decode_alert() + await self.decode_alert() elif self.element == ['console', 'license']: - self.handle_license() + await self.handle_license() elif self.element == ['support', 'servicedata']: await self.handle_servicedata_fetch() elif self.element == ['description']: @@ -1025,18 +1025,18 @@ class IpmiHandler: newinf['name'] = dstr invitems.append(newinf) - def handle_storage(self): + async def handle_storage(self): if self.element[-1] == '': self.element = self.element[:-1] storelem = self.element[2:] if 'read' == self.op: - return self._show_storage(storelem) + return await self._show_storage(storelem) elif 'update' == self.realop: - return self._update_storage(storelem) + return await self._update_storage(storelem) elif 'delete' == self.op: - return self._delete_storage(storelem) + return await self._delete_storage(storelem) elif 'create' == self.realop: - return self._create_storage(storelem) + return await self._create_storage(storelem) async def _delete_storage(self, storelem): if len(storelem) < 2: @@ -1044,7 +1044,7 @@ class IpmiHandler: if len(storelem) < 2 or storelem[0] != 'volumes': raise exc.InvalidArgumentException('Must target a specific volume') volname = storelem[-1] - curr = self.ipmicmd.get_storage_configuration() + curr = await self.ipmicmd.get_storage_configuration() volumes = [] volsfound = False toremove = storage.ConfigSpec(arrays=[storage.Array(volumes=volumes)]) @@ -1115,23 +1115,23 @@ class IpmiHandler: raise exc.InvalidArgumentException('Must target a disk') self.set_disk(storelem[-1], self.inputdata.inputbynode[self.node]) - self._show_storage(storelem) + await self._show_storage(storelem) - def _show_storage(self, storelem): + async def _show_storage(self, storelem): if storelem[0] == 'disks': if len(storelem) == 1: - return self.list_disks() - return self.show_disk(storelem[1]) + return await self.list_disks() + return await self.show_disk(storelem[1]) elif storelem[0] == 'arrays': if len(storelem) == 1: - return self.list_arrays() - return self.show_array(storelem[1]) + return await self.list_arrays() + return await self.show_array(storelem[1]) elif storelem[0] == 'volumes': if len(storelem) == 1: - return self.list_volumes() - return self.show_volume(storelem[1]) + return await self.list_volumes() + return await self.show_volume(storelem[1]) elif storelem[0] == 'all': - return self._show_all_storage() + return await self._show_all_storage() def handle_sensors(self): @@ -1159,42 +1159,42 @@ class IpmiHandler: return True return False - def set_disk(self, name, state): - scfg = self.ipmicmd.get_storage_configuration() + async def set_disk(self, name, state): + scfg = await self.ipmicmd.get_storage_configuration() for disk in scfg.disks: if (name == 'all' or simplify_name(disk.name) == name or disk == name): disk.status = state - self.ipmicmd.apply_storage_configuration( + await self.ipmicmd.apply_storage_configuration( storage.ConfigSpec(disks=scfg.disks)) - def _show_all_storage(self): - scfg = self.ipmicmd.get_storage_configuration() + async def _show_all_storage(self): + scfg = await self.ipmicmd.get_storage_configuration() for disk in scfg.disks: - self.output.put( + await self.output.put( msg.Disk(self.node, disk.name, disk.description, disk.id, disk.status, disk.serial, disk.fru)) for arr in scfg.arrays: for disk in arr.disks: - self.output.put( + await self.output.put( msg.Disk(self.node, disk.name, disk.description, disk.id, disk.status, disk.serial, disk.fru, array='{0}-{1}'.format(*arr.id))) for disk in arr.hotspares: - self.output.put( + await self.output.put( msg.Disk(self.node, disk.name, disk.description, disk.id, disk.status, disk.serial, disk.fru, array='{0}-{1}'.format(*arr.id))) for arr in scfg.arrays: arrname = '{0}-{1}'.format(*arr.id) - self._detail_array(arr, arrname, True) + await self._detail_array(arr, arrname, True) - def show_disk(self, name): - scfg = self.ipmicmd.get_storage_configuration() + async def show_disk(self, name): + scfg = await self.ipmicmd.get_storage_configuration() for disk in scfg.disks: if simplify_name(disk.name) == name or disk == name: - self.output.put( + await self.output.put( msg.Disk(self.node, disk.name, disk.description, disk.id, disk.status, disk.serial, disk.fru)) @@ -1203,41 +1203,41 @@ class IpmiHandler: for disk in arr.disks: if (name == 'all' or simplify_name(disk.name) == name or disk == name): - self.output.put( + await self.output.put( msg.Disk(self.node, disk.name, disk.description, disk.id, disk.status, disk.serial, disk.fru, arrname)) for disk in arr.hotspares: if (name == 'all' or simplify_name(disk.name) == name or disk == name): - self.output.put( + await self.output.put( msg.Disk(self.node, disk.name, disk.description, disk.id, disk.status, disk.serial, disk.fru, arrname)) - def list_disks(self): - scfg = self.ipmicmd.get_storage_configuration() + async def list_disks(self): + scfg = await self.ipmicmd.get_storage_configuration() for disk in scfg.disks: - self.output.put(msg.ChildCollection(simplify_name(disk.name))) + await self.output.put(msg.ChildCollection(simplify_name(disk.name))) for arr in scfg.arrays: for disk in arr.disks: - self.output.put(msg.ChildCollection(simplify_name(disk.name))) + await self.output.put(msg.ChildCollection(simplify_name(disk.name))) for disk in arr.hotspares: - self.output.put(msg.ChildCollection(simplify_name(disk.name))) + await self.output.put(msg.ChildCollection(simplify_name(disk.name))) - def list_arrays(self): - scfg = self.ipmicmd.get_storage_configuration() + async def list_arrays(self): + scfg = await self.ipmicmd.get_storage_configuration() for arr in scfg.arrays: - self.output.put(msg.ChildCollection('{0}-{1}'.format(*arr.id))) + await self.output.put(msg.ChildCollection('{0}-{1}'.format(*arr.id))) - def show_array(self, name): - scfg = self.ipmicmd.get_storage_configuration() + async def show_array(self, name): + scfg = await self.ipmicmd.get_storage_configuration() for arr in scfg.arrays: arrname = '{0}-{1}'.format(*arr.id) if arrname == name: - self._detail_array(arr, arrname) + await self._detail_array(arr, arrname) - def _detail_array(self, arr, arrname, detailvol=False): + async def _detail_array(self, arr, arrname, detailvol=False): vols = [] for vol in arr.volumes: vols.append(simplify_name(vol.name)) @@ -1246,69 +1246,69 @@ class IpmiHandler: disks.append(simplify_name(disk.name)) for disk in arr.hotspares: disks.append(simplify_name(disk.name)) - self.output.put(msg.Array(self.node, disks, arr.raid, + await self.output.put(msg.Array(self.node, disks, arr.raid, vols, arrname, arr.capacity, arr.available_capacity)) if detailvol: for vol in arr.volumes: - self.output.put(msg.Volume(self.node, vol.name, vol.size, + await self.output.put(msg.Volume(self.node, vol.name, vol.size, vol.status, arrname)) - def show_volume(self, name): - scfg = self.ipmicmd.get_storage_configuration() + async def show_volume(self, name): + scfg = await self.ipmicmd.get_storage_configuration() for arr in scfg.arrays: arrname = '{0}-{1}'.format(*arr.id) for vol in arr.volumes: if name == simplify_name(vol.name): - self.output.put(msg.Volume(self.node, vol.name, vol.size, + await self.output.put(msg.Volume(self.node, vol.name, vol.size, vol.status, arrname)) - def list_volumes(self): - scfg = self.ipmicmd.get_storage_configuration() + async def list_volumes(self): + scfg = await self.ipmicmd.get_storage_configuration() for arr in scfg.arrays: for vol in arr.volumes: - self.output.put(msg.ChildCollection(simplify_name(vol.name))) + await self.output.put(msg.ChildCollection(simplify_name(vol.name))) - def list_sensors(self): + async def list_sensors(self): try: - sensors = self.ipmicmd.get_sensor_descriptions() + sensors = await self.ipmicmd.get_sensor_descriptions() except pygexc.IpmiException: - self.output.put(msg.ConfluentTargetTimeout(self.node)) + await self.output.put(msg.ConfluentTargetTimeout(self.node)) return - self.output.put(msg.ChildCollection('all')) + await self.output.put(msg.ChildCollection('all')) for sensor in filter(self.match_sensor, sensors): - self.output.put(msg.ChildCollection(simplify_name(sensor['name']))) + await self.output.put(msg.ChildCollection(simplify_name(sensor['name']))) - def health(self): + async def health(self): if 'read' == self.op: try: - response = self.ipmicmd.get_health() + response = await self.ipmicmd.get_health() except pygexc.IpmiException: - self.output.put(msg.ConfluentTargetTimeout(self.node)) + await self.output.put(msg.ConfluentTargetTimeout(self.node)) return health = response['health'] health = _str_health(health) - self.output.put(msg.HealthSummary(health, self.node)) + await self.output.put(msg.HealthSummary(health, self.node)) badsensors = [] for reading in response.get('badreadings', []): if hasattr(reading, 'health'): reading.health = _str_health(reading.health) badsensors.append(reading) - self.output.put(msg.SensorReadings(badsensors, name=self.node)) + await self.output.put(msg.SensorReadings(badsensors, name=self.node)) else: raise exc.InvalidArgumentException('health is read-only') - def reseat_bay(self): + async def reseat_bay(self): bay = self.inputdata.inputbynode[self.node] try: - self.ipmicmd.reseat_bay(bay) - self.output.put(msg.ReseatResult(self.node, 'success')) + await self.ipmicmd.reseat_bay(bay) + await self.output.put(msg.ReseatResult(self.node, 'success')) except pygexc.UnsupportedFunctionality as uf: - self.output.put(uf) + await self.output.put(uf) - def bootdevice(self): + async def bootdevice(self): if 'read' == self.op: - bootdev = self.ipmicmd.get_bootdev() + bootdev = await self.ipmicmd.get_bootdev() if bootdev['bootdev'] in self.bootdevices: bootdev['bootdev'] = self.bootdevices[bootdev['bootdev']] bootmode = 'unspecified' @@ -1320,7 +1320,7 @@ class IpmiHandler: persistent = False if 'persistent' in bootdev: persistent = bootdev['persistent'] - self.output.put(msg.BootDevice(node=self.node, + await self.output.put(msg.BootDevice(node=self.node, device=bootdev['bootdev'], bootmode=bootmode, persistent=persistent)) @@ -1331,24 +1331,25 @@ class IpmiHandler: if self.inputdata.bootmode(self.node) == 'uefi': douefi = True persistent = self.inputdata.persistent(self.node) - bootdev = self.ipmicmd.set_bootdev(bootdev, uefiboot=douefi, + bootdev = await self.ipmicmd.set_bootdev(bootdev, uefiboot=douefi, persist=persistent) if bootdev['bootdev'] in self.bootdevices: bootdev['bootdev'] = self.bootdevices[bootdev['bootdev']] - self.output.put(msg.BootDevice(node=self.node, + await self.output.put(msg.BootDevice(node=self.node, device=bootdev['bootdev'])) - def identify(self): + async def identify(self): if 'update' == self.op: identifystate = self.inputdata.inputbynode[self.node] == 'on' blinkstate = self.inputdata.inputbynode[self.node] == 'blink' - self.ipmicmd.set_identify(on=identifystate, blink=blinkstate) - self.output.put(msg.IdentifyState( + await self.ipmicmd.set_identify(on=identifystate, blink=blinkstate) + await self.output.put(msg.IdentifyState( node=self.node, state=self.inputdata.inputbynode[self.node])) return elif 'read' == self.op: - identify = self.ipmicmd.get_identify().get('identifystate', '') - self.output.put(msg.IdentifyState(node=self.node, state=identify)) + identify = await self.ipmicmd.get_identify() + identify = identify.get('identifystate', '') + await self.output.put(msg.IdentifyState(node=self.node, state=identify)) return async def power(self): @@ -1361,7 +1362,7 @@ class IpmiHandler: powerstate = self.inputdata.powerstate(self.node) oldpower = None if powerstate == 'boot': - oldpower = self.ipmicmd.get_power() + oldpower = await self.ipmicmd.get_power() if 'powerstate' in oldpower: oldpower = oldpower['powerstate'] await self.ipmicmd.set_power(powerstate, wait=30) @@ -1377,115 +1378,115 @@ class IpmiHandler: oldstate=oldpower)) return - def handle_reset(self): + async def handle_reset(self): if 'read' == self.op: - self.output.put(msg.BMCReset(node=self.node, + await self.output.put(msg.BMCReset(node=self.node, state='reset')) return elif 'update' == self.op: - self.ipmicmd.reset_bmc() + await self.ipmicmd.reset_bmc() return - def handle_identifier(self): + async def handle_identifier(self): if 'read' == self.op: - mci = self.ipmicmd.get_mci() - self.output.put(msg.MCI(self.node, mci)) + mci = await self.ipmicmd.get_mci() + await self.output.put(msg.MCI(self.node, mci)) return elif 'update' == self.op: mci = self.inputdata.mci(self.node) - self.ipmicmd.set_mci(mci) + await self.ipmicmd.set_mci(mci) return - def handle_hostname(self): + async def handle_hostname(self): if 'read' == self.op: - hostname = self.ipmicmd.get_hostname() - self.output.put(msg.Hostname(self.node, hostname)) + hostname = await self.ipmicmd.get_hostname() + await self.output.put(msg.Hostname(self.node, hostname)) return elif 'update' == self.op: hostname = self.inputdata.hostname(self.node) - self.ipmicmd.set_hostname(hostname) + await self.ipmicmd.set_hostname(hostname) return - def handle_domain_name(self): + async def handle_domain_name(self): if 'read' == self.op: - dn = self.ipmicmd.get_domain_name() - self.output.put(msg.DomainName(self.node, dn)) + dn = await self.ipmicmd.get_domain_name() + await self.output.put(msg.DomainName(self.node, dn)) return elif 'update' == self.op: dn = self.inputdata.domain_name(self.node) - self.ipmicmd.set_domain_name(dn) + await self.ipmicmd.set_domain_name(dn) return - def handle_location_config(self): + async def handle_location_config(self): if 'read' == self.op: - lc = self.ipmicmd.get_location_information() + lc = await self.ipmicmd.get_location_information() - def handle_bmcconfig(self, advanced=False, extended=False): + async def handle_bmcconfig(self, advanced=False, extended=False): if 'read' == self.op: try: if extended: - bmccfg = self.ipmicmd.get_extended_bmc_configuration( + bmccfg = await self.ipmicmd.get_extended_bmc_configuration( hideadvanced=(not advanced)) else: - bmccfg = self.ipmicmd.get_bmc_configuration() - self.output.put(msg.ConfigSet(self.node, bmccfg)) + bmccfg = await self.ipmicmd.get_bmc_configuration() + await self.output.put(msg.ConfigSet(self.node, bmccfg)) except Exception as e: - self.output.put( + await self.output.put( msg.ConfluentNodeError(self.node, str(e))) elif 'update' == self.op: - self.ipmicmd.set_bmc_configuration( + await self.ipmicmd.set_bmc_configuration( self.inputdata.get_attributes(self.node)) - def handle_bmcconfigclear(self): + async def handle_bmcconfigclear(self): if 'read' == self.op: raise exc.InvalidArgumentException( 'Cannot read the "clear" resource') - self.ipmicmd.clear_bmc_configuration() + await self.ipmicmd.clear_bmc_configuration() - def handle_sysconfigclear(self): + async def handle_sysconfigclear(self): if 'read' == self.op: raise exc.InvalidArgumentException( 'Cannot read the "clear" resource') - self.ipmicmd.clear_system_configuration() + await self.ipmicmd.clear_system_configuration() - def handle_sysconfig(self, advanced=False): + async def handle_sysconfig(self, advanced=False): if 'read' == self.op: - self.output.put(msg.ConfigSet( - self.node, self.ipmicmd.get_system_configuration( + await self.output.put(msg.ConfigSet( + self.node, await self.ipmicmd.get_system_configuration( hideadvanced=not advanced))) elif 'update' == self.op: - self.ipmicmd.set_system_configuration( + await self.ipmicmd.set_system_configuration( self.inputdata.get_attributes(self.node)) - def handle_ntp(self): + async def handle_ntp(self): if self.element[3] == 'enabled': if 'read' == self.op: - enabled = self.ipmicmd.get_ntp_enabled() - self.output.put(msg.NTPEnabled(self.node, enabled)) + enabled = await self.ipmicmd.get_ntp_enabled() + await self.output.put(msg.NTPEnabled(self.node, enabled)) return elif 'update' == self.op: enabled = self.inputdata.ntp_enabled(self.node) - self.ipmicmd.set_ntp_enabled(enabled == 'True') + await self.ipmicmd.set_ntp_enabled(enabled == 'True') return elif self.element[3] == 'servers': if len(self.element) == 4: - self.output.put(msg.ChildCollection('all')) - size = len(self.ipmicmd.get_ntp_servers()) + await self.output.put(msg.ChildCollection('all')) + size = len(await self.ipmicmd.get_ntp_servers()) for idx in range(1, size + 1): - self.output.put(msg.ChildCollection(idx)) + await self.output.put(msg.ChildCollection(idx)) else: if 'read' == self.op: if self.element[-1] == 'all': - servers = self.ipmicmd.get_ntp_servers() - self.output.put(msg.NTPServers(self.node, servers)) + servers = await self.ipmicmd.get_ntp_servers() + await self.output.put(msg.NTPServers(self.node, servers)) return else: idx = int(self.element[-1]) - 1 - servers = self.ipmicmd.get_ntp_servers() + servers = await self.ipmicmd.get_ntp_servers() if len(servers) > idx: - self.output.put(msg.NTPServer(self.node, servers[idx])) + await self.output.put(msg.NTPServer(self.node, servers[idx])) else: - self.output.put( + await self.output.put( msg.ConfluentTargetNotFound( self.node, 'Requested NTP configuration not found')) return @@ -1493,18 +1494,18 @@ class IpmiHandler: if self.element[-1] == 'all': servers = self.inputdata.ntp_servers(self.node) for idx in servers: - self.ipmicmd.set_ntp_server(servers[idx], + await self.ipmicmd.set_ntp_server(servers[idx], int(idx[-1])-1) return else: idx = int(self.element[-1]) - 1 server = self.inputdata.ntp_server(self.node) - self.ipmicmd.set_ntp_server(server, idx) + await self.ipmicmd.set_ntp_server(server, idx) return - def handle_license(self): - available = self.ipmicmd.get_remote_kvm_available() - self.output.put(msg.License(self.node, available)) + async def handle_license(self): + available = await self.ipmicmd.get_remote_kvm_available() + await self.output.put(msg.License(self.node, available)) return async def save_licenses(self): @@ -1537,15 +1538,15 @@ class IpmiHandler: '(ensure confluent user or group can access file ' 'and parent directories)').format( filename, socket.gethostname()) - self.output.put(msg.ConfluentNodeError(self.node, errstr)) + await self.output.put(msg.ConfluentNodeError(self.node, errstr)) return try: - self.ipmicmd.apply_license(filename, data=datfile) + await self.ipmicmd.apply_license(filename, data=datfile) finally: if datfile is not None: datfile.close() if len(self.element) == 3: - self.output.put(msg.ChildCollection('all')) + await self.output.put(msg.ChildCollection('all')) i = 1 async for lic in self.ipmicmd.get_licenses(): await self.output.put(msg.ChildCollection(str(i)))