2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-02 07:23:35 +00:00

Wire up a number of async style calls

This commit is contained in:
Jarrod Johnson
2026-01-29 15:18:46 -05:00
parent e75a1dc7ad
commit b496f2c324

View File

@@ -424,17 +424,17 @@ class IpmiHandler:
elif self.element == ['identify']:
self.identify()
elif self.element[0] == 'sensors':
self.handle_sensors()
await self.handle_sensors()
elif self.element[:2] == ['configuration', 'storage']:
self.handle_storage()
await self.handle_storage()
elif self.element[0] == 'configuration':
self.handle_configuration()
await self.handle_configuration()
elif self.element[:3] == ['inventory', 'firmware', 'updates']:
self.handle_update()
await self.handle_update()
elif self.element[:3] == ['inventory', 'firmware', 'updatestatus']:
self.handle_update_status()
await self.handle_update_status()
elif self.element[0] == 'inventory':
self.handle_inventory()
await self.handle_inventory()
elif self.element == ['media', 'attach']:
self.handle_attach_media()
elif self.element == ['media', 'detach']:
@@ -491,23 +491,23 @@ class IpmiHandler:
self.output.put(msg.CreatedResource(
'nodes/{0}/support/servicedata/{1}'.format(self.node, u.name)))
def handle_attach_media(self):
async def handle_attach_media(self):
try:
self.ipmicmd.attach_remote_media(self.inputdata.nodefile(
await self.ipmicmd.attach_remote_media(self.inputdata.nodefile(
self.node))
except pygexc.UnsupportedFunctionality as uf:
self.output.put(msg.ConfluentNodeError(self.node, str(uf)))
def handle_detach_media(self):
self.ipmicmd.detach_remote_media()
async def handle_detach_media(self):
await self.ipmicmd.detach_remote_media()
def handle_list_media(self):
async def handle_list_media(self):
for media in self.ipmicmd.list_media():
self.output.put(msg.Media(self.node, media))
def handle_configuration(self):
async def handle_configuration(self):
if self.element[1:3] == ['management_controller', 'alerts']:
return self.handle_alerts()
return await self.handle_alerts()
elif self.element[1:3] == ['management_controller', 'certificate_authorities']:
return self.handle_cert_authorities()
elif self.element[1:3] == ['management_controller', 'certificate']:
@@ -553,12 +553,12 @@ class IpmiHandler:
def decode_alert(self):
raise Exception("Decode Alert not implemented for redfish")
def handle_certificate(self):
async def handle_certificate(self):
self.element = self.element[3:]
if len(self.element) != 1:
raise Exception('Not implemented')
if self.element[0] == 'sign' and self.op == 'update':
csr = self.ipmicmd.get_bmc_csr()
csr = await self.ipmicmd.get_bmc_csr()
subj, san = util.get_bmc_subject_san(self.cfm, self.node, self.inputdata.get_added_names(self.node))
with tempfile.NamedTemporaryFile() as tmpfile:
tmpfile.write(csr.encode())
@@ -571,16 +571,16 @@ class IpmiHandler:
with open(certname, 'rb') as certf:
cert = certf.read()
os.unlink(certname)
self.ipmicmd.install_bmc_certificate(cert)
await self.ipmicmd.install_bmc_certificate(cert)
def handle_cert_authorities(self):
async def handle_cert_authorities(self):
if len(self.element) == 3:
if self.op == 'read':
for cert in self.ipmicmd.get_trusted_cas():
self.output.put(msg.ChildCollection(cert['id']))
elif self.op == 'update':
cert = self.inputdata.get_pem(self.node)
self.ipmicmd.add_trusted_ca(cert)
await self.ipmicmd.add_trusted_ca(cert)
elif len(self.element) == 4:
certid = self.element[-1]
if self.op == 'read':
@@ -592,15 +592,15 @@ class IpmiHandler:
subject=certdata['subject'],
san=certdata.get('san', None)))
elif self.op == 'delete':
self.ipmicmd.del_trusted_ca(certid)
await self.ipmicmd.del_trusted_ca(certid)
return
def handle_certificate(self):
async def handle_certificate(self):
self.element = self.element[3:]
if len(self.element) != 1:
raise Exception('Not implemented')
if self.element[0] == 'sign' and self.op == 'update':
csr = self.ipmicmd.get_bmc_csr()
csr = await self.ipmicmd.get_bmc_csr()
subj, san = util.get_bmc_subject_san(self.cfm, self.node, self.inputdata.get_added_names(self.node))
with tempfile.NamedTemporaryFile() as tmpfile:
tmpfile.write(csr.encode())
@@ -613,16 +613,16 @@ class IpmiHandler:
with open(certname, 'rb') as certf:
cert = certf.read()
os.unlink(certname)
self.ipmicmd.install_bmc_certificate(cert)
await self.ipmicmd.install_bmc_certificate(cert)
def handle_cert_authorities(self):
async def handle_cert_authorities(self):
if len(self.element) == 3:
if self.op == 'read':
for cert in self.ipmicmd.get_trusted_cas():
self.output.put(msg.ChildCollection(cert['id']))
elif self.op == 'update':
cert = self.inputdata.get_pem(self.node)
self.ipmicmd.add_trusted_ca(cert)
await self.ipmicmd.add_trusted_ca(cert)
elif len(self.element) == 4:
certid = self.element[-1]
if self.op == 'read':
@@ -634,10 +634,10 @@ class IpmiHandler:
subject=certdata['subject'],
san=certdata.get('san', None)))
elif self.op == 'delete':
self.ipmicmd.del_trusted_ca(certid)
await self.ipmicmd.del_trusted_ca(certid)
return
def handle_alerts(self):
async def handle_alerts(self):
if self.element[3] == 'destinations':
if len(self.element) == 4:
# A list of destinations
@@ -648,7 +648,7 @@ class IpmiHandler:
elif len(self.element) == 5:
alertidx = int(self.element[-1])
if self.op == 'read':
destdata = self.ipmicmd.get_alert_destination(alertidx)
destdata = await self.ipmicmd.get_alert_destination(alertidx)
self.output.put(msg.AlertDestination(
ip=destdata['address'],
acknowledge=destdata['acknowledge_required'],
@@ -668,15 +668,15 @@ class IpmiHandler:
alertargs['ip'] = alertparms['ip']
if 'retries' in alertparms:
alertargs['retries'] = alertparms['retries']
self.ipmicmd.set_alert_destination(destination=alertidx,
await self.ipmicmd.set_alert_destination(destination=alertidx,
**alertargs)
return
elif self.op == 'delete':
self.ipmicmd.clear_alert_destination(alertidx)
await self.ipmicmd.clear_alert_destination(alertidx)
return
raise Exception('Not implemented')
def handle_nets(self):
async def handle_nets(self):
if len(self.element) == 3:
if self.op != 'read':
self.output.put(
@@ -685,8 +685,8 @@ class IpmiHandler:
self.output.put(msg.ChildCollection('management'))
elif len(self.element) == 4 and self.element[-1] == 'management':
if self.op == 'read':
lancfg = self.ipmicmd.get_net_configuration()
v6cfg = self.ipmicmd.get_net6_configuration()
lancfg = await self.ipmicmd.get_net_configuration()
v6cfg = await self.ipmicmd.get_net6_configuration()
self.output.put(msg.NetworkConfiguration(
self.node, ipv4addr=lancfg['ipv4_address'],
ipv4gateway=lancfg['ipv4_gateway'],
@@ -699,7 +699,7 @@ class IpmiHandler:
elif self.op == 'update':
config = self.inputdata.netconfig(self.node)
try:
self.ipmicmd.set_net_configuration(
await self.ipmicmd.set_net_configuration(
ipv4_address=config['ipv4_address'],
ipv4_configuration=config['ipv4_configuration'],
ipv4_gateway=config['ipv4_gateway'],
@@ -708,7 +708,7 @@ class IpmiHandler:
if v6addrs is not None:
v6addrs = v6addrs.split(',')
v6gw = config.get('static_v6_gateway', None)
self.ipmicmd.set_net6_configuration(static_addresses=v6addrs, static_gateway=v6gw)
await self.ipmicmd.set_net6_configuration(static_addresses=v6addrs, static_gateway=v6gw)
except socket.error as se:
self.output.put(msg.ConfluentNodeError(self.node,
se.message))
@@ -723,32 +723,32 @@ class IpmiHandler:
msg.ConfluentTargetNotFound(self.node,
'Interface not found'))
def handle_users(self):
async def handle_users(self):
# Create user
if len(self.element) == 3:
if self.op == 'update':
user = self.inputdata.credentials[self.node]
self.ipmicmd.create_user(uid=user['uid'], name=user['username'],
await self.ipmicmd.create_user(uid=user['uid'], name=user['username'],
password=user['password'],
privilege_level=user['privilege_level'])
# A list of users
self.output.put(msg.ChildCollection('all'))
for user in self.ipmicmd.get_users():
self.output.put(msg.ChildCollection(user, candelete=True))
await self.output.put(msg.ChildCollection('all'))
async for user in self.ipmicmd.get_users():
await self.output.put(msg.ChildCollection(user, candelete=True))
return
# List all users
elif len(self.element) == 4 and self.element[-1] == 'all':
users = []
for user in self.ipmicmd.get_users():
users.append(self.ipmicmd.get_user(uid=user))
self.output.put(msg.UserCollection(users=users, name=self.node))
async for user in self.ipmicmd.get_users():
users.append(await self.ipmicmd.get_user(uid=user))
await self.output.put(msg.UserCollection(users=users, name=self.node))
return
# Update user
elif len(self.element) == 4:
user = int(self.element[-1])
if self.op == 'read':
data = self.ipmicmd.get_user(uid=user)
self.output.put(msg.User(
data = await self.ipmicmd.get_user(uid=user)
await self.output.put(msg.User(
uid=data['uid'],
username=data['name'],
privilege_level=data['access']['privilege_level'],
@@ -774,21 +774,21 @@ class IpmiHandler:
mode = 'enable'
else:
mode = 'disable'
self.ipmicmd.disable_user(user['uid'], mode)
await self.ipmicmd.disable_user(user['uid'], mode)
return
elif self.op == 'delete':
self.ipmicmd.user_delete(uid=user)
await self.ipmicmd.user_delete(uid=user)
return
def do_eventlog(self):
async def do_eventlog(self):
eventout = []
clear = False
if self.op == 'delete':
clear = True
for event in self.ipmicmd.get_event_log(clear):
async for event in self.ipmicmd.get_event_log(clear):
self.pyghmi_event_to_confluent(event)
eventout.append(event)
self.output.put(msg.EventCollection(eventout, name=self.node))
await self.output.put(msg.EventCollection(eventout, name=self.node))
def pyghmi_event_to_confluent(self, event):
event['severity'] = _str_health(event.get('severity', 'unknown'))
@@ -804,56 +804,56 @@ class IpmiHandler:
for name in invnames:
self.invmap[simplify_name(name)] = name
def make_sensor_map(self, sensors=None):
async def make_sensor_map(self, sensors=None):
if sensors is None:
sensors = self.ipmicmd.get_sensor_descriptions()
sensors = await self.ipmicmd.get_sensor_descriptions()
for sensor in sensors:
resourcename = sensor['name']
self.sensormap[simplify_name(resourcename)] = resourcename
def read_normalized(self, sensorname):
async def read_normalized(self, sensorname):
readings = None
if sensorname == 'average_cpu_temp':
cputemp = self.ipmicmd.get_average_processor_temperature()
cputemp = await self.ipmicmd.get_average_processor_temperature()
readings = [cputemp]
elif sensorname == 'inlet_temp':
inltemp = self.ipmicmd.get_inlet_temperature()
inltemp = await self.ipmicmd.get_inlet_temperature()
readings = [inltemp]
elif sensorname == 'total_power':
sensor = EmptySensor('Total Power')
sensor.states = []
sensor.units = 'W'
sensor.value = self.ipmicmd.get_system_power_watts()
sensor.value = await self.ipmicmd.get_system_power_watts()
readings = [sensor]
if readings:
self.output.put(msg.SensorReadings(readings, name=self.node))
await self.output.put(msg.SensorReadings(readings, name=self.node))
def read_sensors(self, sensorname):
async def read_sensors(self, sensorname):
if sensorname == 'all':
sensors = self.ipmicmd.get_sensor_descriptions()
sensors = await self.ipmicmd.get_sensor_descriptions()
readings = []
for sensor in filter(self.match_sensor, sensors):
try:
reading = self.ipmicmd.get_sensor_reading(
sensor['name'])
if reading.unavailable:
self.output.put(msg.SensorReadings([EmptySensor(
await self.output.put(msg.SensorReadings([EmptySensor(
sensor['name'])], name=self.node))
continue
except pygexc.IpmiException as ie:
if ie.ipmicode == 203:
self.output.put(msg.SensorReadings([EmptySensor(
await self.output.put(msg.SensorReadings([EmptySensor(
sensor['name'])], name=self.node))
continue
raise
if hasattr(reading, 'health'):
reading.health = _str_health(reading.health)
readings.append(reading)
self.output.put(msg.SensorReadings(readings, name=self.node))
await self.output.put(msg.SensorReadings(readings, name=self.node))
else:
self.make_sensor_map()
await self.make_sensor_map()
if sensorname not in self.sensormap:
self.output.put(
await self.output.put(
msg.ConfluentTargetNotFound(self.node,
'Sensor not found'))
return
@@ -861,44 +861,44 @@ class IpmiHandler:
reading = self.ipmicmd.get_sensor_reading(
self.sensormap[sensorname])
if reading.unavailable:
self.output.put(msg.ConfluentResourceUnavailable(
await self.output.put(msg.ConfluentResourceUnavailable(
self.node, 'Unavailable'
))
return
if hasattr(reading, 'health'):
reading.health = _str_health(reading.health)
self.output.put(
await self.output.put(
msg.SensorReadings([reading],
name=self.node))
except pygexc.IpmiException as ie:
if ie.ipmicode == 203:
self.output.put(msg.ConfluentResourceUnavailable(
await self.output.put(msg.ConfluentResourceUnavailable(
self.node, 'Unavailable'
))
else:
self.output.put(msg.ConfluentTargetTimeout(self.node))
await self.output.put(msg.ConfluentTargetTimeout(self.node))
def list_inventory(self):
async def list_inventory(self):
try:
components = self.ipmicmd.get_inventory_descriptions()
components = await self.ipmicmd.get_inventory_descriptions()
except pygexc.IpmiException:
self.output.put(msg.ConfluentTargetTimeout(self.node))
await self.output.put(msg.ConfluentTargetTimeout(self.node))
return
self.output.put(msg.ChildCollection('all'))
await self.output.put(msg.ChildCollection('all'))
for component in components:
self.output.put(msg.ChildCollection(simplify_name(component)))
await self.output.put(msg.ChildCollection(simplify_name(component)))
def list_firmware(self):
self.output.put(msg.ChildCollection('all'))
for id, data in self.ipmicmd.get_firmware():
self.output.put(msg.ChildCollection(simplify_name(id)))
async def list_firmware(self):
await self.output.put(msg.ChildCollection('all'))
async for id, data in self.ipmicmd.get_firmware():
await self.output.put(msg.ChildCollection(simplify_name(id)))
def read_firmware(self, component, category):
async def read_firmware(self, component, category):
items = []
errorneeded = False
try:
complist = () if component == 'all' else (component,)
for id, data in self.ipmicmd.get_firmware(complist, category):
async for id, data in self.ipmicmd.get_firmware(complist, category):
if (component in ('core', 'all') or
component == simplify_name(id) or
match_aliases(component, simplify_name(id))):
@@ -917,49 +917,49 @@ class IpmiHandler:
except pygexc.TemporaryError as e:
errorneeded = msg.ConfluentNodeError(
self.node, str(e))
self.output.put(msg.Firmware(items, self.node))
await self.output.put(msg.Firmware(items, self.node))
if errorneeded:
self.output.put(errorneeded)
await self.output.put(errorneeded)
def handle_update_status(self):
activeupdates = list(firmwaremanager.list_updates([self.node], None, []))
async def handle_update_status(self):
activeupdates = list(await firmwaremanager.list_updates([self.node], None, []))
if activeupdates:
self.output.put(msg.KeyValueData({'status': 'active'}, self.node))
await self.output.put(msg.KeyValueData({'status': 'active'}, self.node))
else:
status = self.ipmicmd.get_update_status()
self.output.put(msg.KeyValueData({'status': status}, self.node))
status = await self.ipmicmd.get_update_status()
await self.output.put(msg.KeyValueData({'status': status}, self.node))
def handle_inventory(self):
async def handle_inventory(self):
if self.element[1] == 'firmware':
if len(self.element) == 3:
return self.list_firmware()
return await self.list_firmware()
elif len(self.element) == 4:
return self.read_firmware(self.element[-1], self.element[-2])
return await self.read_firmware(self.element[-1], self.element[-2])
elif self.element[1] == 'hardware':
if len(self.element) == 3: # list things in inventory
return self.list_inventory()
return await self.list_inventory()
elif len(self.element) == 4: # actually read inventory data
return self.read_inventory(self.element[-1])
return await self.read_inventory(self.element[-1])
raise Exception('Unsupported scenario...')
def list_leds(self):
self.output.put(msg.ChildCollection('all'))
for category, info in self.ipmicmd.get_leds():
self.output.put(msg.ChildCollection(simplify_name(category)))
async def list_leds(self):
await self.output.put(msg.ChildCollection('all'))
async for category, info in self.ipmicmd.get_leds():
await self.output.put(msg.ChildCollection(simplify_name(category)))
def read_leds(self, component):
async def read_leds(self, component):
led_categories = []
for category, info in self.ipmicmd.get_leds():
async for category, info in self.ipmicmd.get_leds():
if component == 'all' or component == simplify_name(category):
led_categories.append({category: info})
self.output.put(msg.LEDStatus(led_categories, self.node))
await self.output.put(msg.LEDStatus(led_categories, self.node))
def read_inventory(self, component):
async def read_inventory(self, component):
errorneeded = False
try:
invitems = []
if component == 'all':
for invdata in self.ipmicmd.get_inventory():
async for invdata in self.ipmicmd.get_inventory():
if invdata[1] is None:
newinf = {'present': False, 'information': None,
'name': invdata[0]}
@@ -970,12 +970,12 @@ class IpmiHandler:
newinf['name'] = invdata[1].get('name', invdata[0])
self.add_invitem(invitems, newinf)
else:
self.make_inventory_map()
await self.make_inventory_map()
compname = self.invmap.get(component, None)
if compname is None:
self.output.put(msg.ConfluentTargetNotFound())
await self.output.put(msg.ConfluentTargetNotFound())
return
invdata = self.ipmicmd.get_inventory_of_component(compname)
invdata = await self.ipmicmd.get_inventory_of_component(compname)
if invdata is None:
newinf = {'present': False, 'information': None,
'name': compname}
@@ -996,9 +996,9 @@ class IpmiHandler:
'target certificate fingerprint and '
'pubkeys.tls_hardwaremanager attribute')
newinvdata = {'inventory': invitems}
self.output.put(msg.KeyValueData(newinvdata, self.node))
await self.output.put(msg.KeyValueData(newinvdata, self.node))
if errorneeded:
self.output.put(errorneeded)
await self.output.put(errorneeded)
def add_invitem(self, invitems, newinf):
if newinf.get('information', None) and 'name' in newinf['information']:
@@ -1035,7 +1035,7 @@ class IpmiHandler:
elif 'create' == self.realop:
return self._create_storage(storelem)
def _delete_storage(self, storelem):
async def _delete_storage(self, storelem):
if len(storelem) < 2:
storelem.append('')
if len(storelem) < 2 or storelem[0] != 'volumes':
@@ -1054,15 +1054,15 @@ class IpmiHandler:
self.output.put(msg.ConfluentTargetNotFound(
self.node, "No volume named '{0}' found".format(volname)))
return
self.ipmicmd.remove_storage_configuration(toremove)
self.output.put(msg.DeletedResource(volname))
await self.ipmicmd.remove_storage_configuration(toremove)
await self.output.put(msg.DeletedResource(volname))
def _create_storage(self, storelem):
async def _create_storage(self, storelem):
if 'volumes' not in storelem:
raise exc.InvalidArgumentException('Can only create volumes')
vols = []
thedisks = None
currcfg = self.ipmicmd.get_storage_configuration()
currcfg = await self.ipmicmd.get_storage_configuration()
currnames = []
for arr in currcfg.arrays:
arrname = '{0}-{1}'.format(*arr.id)
@@ -1091,22 +1091,22 @@ class IpmiHandler:
vols.append(storage.Volume(name=vol['name'], size=vol['size'], stripsize=vol['stripsize']))
newcfg = storage.ConfigSpec(
arrays=(storage.Array(raid=raidlvl, disks=disks, volumes=vols),))
self.ipmicmd.apply_storage_configuration(newcfg)
await self.ipmicmd.apply_storage_configuration(newcfg)
for vol in self.inputdata.inputbynode[self.node]:
if vol['name'] is None:
newcfg = self.ipmicmd.get_storage_configuration()
newcfg = await self.ipmicmd.get_storage_configuration()
for arr in newcfg.arrays:
arrname = '{0}-{1}'.format(*arr.id)
for vol in arr.volumes:
if vol.name not in currnames:
self.output.put(
await self.output.put(
msg.Volume(self.node, vol.name, vol.size,
vol.status, arrname))
return
else:
self._show_storage(storelem[:1] + [vol['name']])
def _update_storage(self, storelem):
async def _update_storage(self, storelem):
if storelem[0] == 'disks':
if len(storelem) == 1:
raise exc.InvalidArgumentException('Must target a disk')