2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-05-01 04:47:45 +00:00
Files
confluent/confluent_server/aiohmi/redfish/command.py
Jarrod Johnson bfc27595dc Fold aiohmi into confluent
If someone asks for it independently, we can break it out again.  But for now,
assume it's only for confluent.
2026-04-30 08:48:24 -04:00

1589 lines
63 KiB
Python

# coding: utf8
# Copyright 2021 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The command module for redfish systems. Provides https-only support
for redfish compliant endpoints
"""
import asyncio
import base64
from datetime import datetime
from datetime import timedelta
import json
import os
import re
import socket
import struct
import sys
import time
from dateutil import tz
import aiohmi.constants as const
import aiohmi.exceptions as exc
import aiohmi.redfish.oem.lookup as oem
from aiohmi.util.parse import parse_time
import aiohmi.util.webclient as webclient
numregex = re.compile('([0-9]+)')
powerstates = {
'on': 'On',
'off': 'ForceOff',
'softoff': 'GracefulShutdown',
'shutdown': 'GracefulShutdown',
'reset': 'ForceRestart',
'boot': None,
}
boot_devices_read = {
'BiosSetup': 'setup',
'Cd': 'optical',
'Floppy': 'floppy',
'Hdd': 'hd',
'None': 'default',
'Pxe': 'network',
'Usb': 'usb',
'SDCard': 'sdcard',
}
_healthmap = {
'Critical': const.Health.Critical,
'Unknown': const.Health.Warning,
'Warning': const.Health.Warning,
'OK': const.Health.Ok,
None: const.Health.Ok,
}
def _mask_to_cidr(mask):
maskn = socket.inet_pton(socket.AF_INET, mask)
maskn = struct.unpack('!I', maskn)[0]
cidr = 32
while maskn & 0b1 == 0 and cidr > 0:
cidr -= 1
maskn >>= 1
return cidr
def _cidr_to_mask(cidr):
return socket.inet_ntop(
socket.AF_INET, struct.pack(
'!I', (2**32 - 1) ^ (2**(32 - cidr) - 1)))
def naturalize_string(key):
"""Analyzes string in a human way to enable natural sort
:param nodename: The node name to analyze
:returns: A structure that can be consumed by 'sorted'
"""
return [int(text) if text.isdigit() else text.lower()
for text in re.split(numregex, key)]
def natural_sort(iterable):
"""Return a sort using natural sort if possible
:param iterable:
:return:
"""
try:
return sorted(iterable, key=naturalize_string)
except TypeError:
# The natural sort attempt failed, fallback to ascii sort
return sorted(iterable)
class SensorReading(object):
def __init__(self, healthinfo, sensor=None, value=None, units=None,
unavailable=False):
self.states = []
if sensor:
self.name = sensor['name']
else:
self.name = healthinfo['Name']
self.health = _healthmap.get(healthinfo.get(
'Status', {}).get('Health', None), const.Health.Warning)
self.states = [healthinfo.get('Status', {}).get('Health',
'Unknown')]
if self.health == const.Health.Ok:
self.states = []
self.value = value
self.state_ids = None
self.imprecision = None
self.units = units
self.unavailable = unavailable
def __repr__(self):
return repr({
'value': self.value,
'state_ids': self.state_ids,
'units': self.units,
'imprecision': self.imprecision,
'name': self.name,
'unavailable': self.unavailable,
})
class Command(object):
@classmethod
async def create(cls, bmc, userid, password, verifycallback, sysurl=None,
bmcurl=None, chassisurl=None, pool=None, port=443):
self = cls()
self.wc = webclient.WebConnection(
bmc, port, verifycallback=verifycallback)
self._hwnamemap = {}
self._fwnamemap = {}
self._urlcache = {}
self._varbmcurl = bmcurl
self._varbiosurl = None
self._varbmcnicurl = None
self._varsetbiosurl = None
self._varchassisurl = chassisurl
self._varresetbmcurl = None
self._varupdateservice = None
self._varfwinventory = None
self._oem = None
self._gpool = pool
self._bmcv4ip = None
self._bmcv6ip = None
self.xauthtoken = None
for addrinf in socket.getaddrinfo(bmc, 0, 0, socket.SOCK_STREAM):
if addrinf[0] == socket.AF_INET:
self._bmcv4ip = socket.inet_pton(addrinf[0], addrinf[-1][0])
elif addrinf[0] == socket.AF_INET6:
theip = addrinf[-1][0]
theip = theip.split('%')[0]
self._bmcv6ip = socket.inet_pton(addrinf[0], theip)
self.wc.set_header('Accept', 'application/json')
self.wc.set_header('User-Agent', 'aiohmi')
self.wc.set_header('Accept-Encoding', 'gzip')
self.wc.set_header('OData-Version', '4.0')
overview = await self.wc.grab_json_response('/redfish/v1/')
self.username = userid
self.password = password
self.wc.set_basic_credentials(self.username, self.password)
self.wc.set_header('Content-Type', 'application/json')
if 'Systems' not in overview and 'Managers' not in overview:
raise exc.PyghmiException('Redfish not ready')
if 'SessionService' in overview:
await self._get_session_token(self.wc)
self._varsensormap = {}
self.powerurl = None
self.sysurl = None
self._initsysurl = sysurl
tmpoem = await oem.get_oem_handler({}, sysurl, self.wc, self._urlcache, self,
rootinfo=overview)
self._varbmcurl = await tmpoem.get_default_mgrurl()
if 'Systems' in overview:
self.sysurl = await tmpoem.get_default_sysurl()
sysinfo = await self.sysinfo()
self.powerurl = sysinfo.get('Actions', {}).get(
'#ComputerSystem.Reset', {}).get('target', None)
return self
async def _get_session_token(self, wc):
# specification actually indicates we can skip straight to this url
username = self.username
password = self.password
if not isinstance(username, str):
username = username.decode()
if not isinstance(password, str):
password = password.decode()
_, status, headers = await wc.grab_response_with_status('/redfish/v1/SessionService/Sessions',
{'UserName': username, 'Password': password})
if status > 299 or status < 200:
return
self.xauthtoken = headers.get('X-Auth-Token')
if self.xauthtoken:
if 'Authorization' in wc.stdheaders:
del wc.stdheaders['Authorization']
if 'Authorization' in self.wc.stdheaders:
del self.wc.stdheaders['Authorization']
wc.stdheaders['X-Auth-Token'] = self.xauthtoken
self.wc.stdheaders['X-Auth-Token'] = self.xauthtoken
async def _accountserviceurl(self):
sroot = await self._do_web_request('/redfish/v1/')
return sroot.get('AccountService', {}).get('@odata.id', None)
async def _validroles(self):
okroles = set()
roleurl = (await self._do_web_request(await self._accountserviceurl())).get(
'Roles', {}).get('@odata.id', None)
if roleurl:
roles = (await self._do_web_request(roleurl)).get('Members', [])
for role in roles:
role = role.get('@odata.id', '')
if not role:
continue
okroles.add(role.split('/')[-1])
if not okroles:
okroles.add('Administrator')
okroles.add('Operator')
okroles.add('ReadOnly')
return okroles
async def get_trusted_cas(self):
oem = await self.oem()
async for ca in oem.get_trusted_cas():
yield ca
async def get_bmc_csr(self, keytype=None, keylength=None, cn=None, city=None,
state=None, country=None, org=None, orgunit=None):
oem = await self.oem()
return await oem.get_bmc_csr(
keytype=keytype, keylength=keylength, cn=cn)
async def install_bmc_certificate(self, certdata):
oem = await self.oem()
return await oem.install_bmc_certificate(certdata)
async def add_trusted_ca(self, pemdata):
oem = await self.oem()
return await oem.add_trusted_ca(pemdata)
async def del_trusted_ca(self, certid):
oem = await self.oem()
return await oem.del_trusted_ca(certid)
async def get_users(self):
"""get list of users and channel access information (helper)
:param channel: number [1:7]
:return:
name: (str)
uid: (int)
channel: (int)
access:
callback (bool)
link_auth (bool)
ipmi_msg (bool)
privilege_level: (str)[callback, user, operatorm administrator,
proprietary, no_access]
"""
srvurl = await self._accountserviceurl()
names = {}
if srvurl:
srvinfo = await self._do_web_request(srvurl)
srvurl = srvinfo.get('Accounts', {}).get('@odata.id', None)
if srvurl:
srvinfo = await self._do_web_request(srvurl)
accounts = srvinfo.get('Members', [])
oem = await self.oem()
for account in accounts:
accinfo = await self._do_web_request(account['@odata.id'])
currname = accinfo.get('UserName', '')
currid = accinfo.get('Id', None)
if currname:
names[currid] = {
'name': currname,
'uid': currid,
'expiration': await oem.get_user_expiration(currid),
'access': {
'privilege_level': accinfo.get('RoleId',
'Unknown')
}
}
return names
async def _account_url_info_by_id(self, uid):
srvurl = await self._accountserviceurl()
oem = await self.oem()
if srvurl:
srvinfo = await self._do_web_request(srvurl)
srvurl = srvinfo.get('Accounts', {}).get('@odata.id', None)
if srvurl:
srvinfo = await self._do_web_request(srvurl)
accounts = srvinfo.get('Members', [])
for account in accounts:
accinfo = await self._do_web_request(account['@odata.id'])
currid = accinfo.get('Id', None)
if str(currid) == str(uid):
accinfo['expiration'] = await oem.get_user_expiration(
uid)
return account['@odata.id'], accinfo
async def get_user(self, uid):
srvurl = await self._accountserviceurl()
oem = await self.oem()
if srvurl:
srvinfo = await self._do_web_request(srvurl)
srvurl = srvinfo.get('Accounts', {}).get('@odata.id', None)
if srvurl:
srvinfo = await self._do_web_request(srvurl)
accounts = srvinfo.get('Members', [])
for account in accounts:
accinfo = await self._do_web_request(account['@odata.id'])
currname = accinfo.get('UserName', '')
currid = accinfo.get('Id', None)
if str(currid) == str(uid):
oem = await self.oem()
return {'name': currname, 'uid': uid,
'expiration': await oem.get_user_expiration(
uid),
'access': {
'privilege_level': accinfo.get(
'RoleId', 'Unknown')}}
async def set_user_password(self, uid, mode='set_password', password=None):
"""Set user password and (modes)
:param uid: id number of user. see: get_names_uid()['name']
:param mode:
disable = disable user connections
enable = enable user connections
set_password = set or ensure password
:param password: Password
(optional when mode is [disable or enable])
:return:
True on success
"""
accinfo = await self._account_url_info_by_id(uid)
if not accinfo:
raise Exception("No such account found")
etag = accinfo[1].get('@odata.etag', None)
if mode == 'set_password':
await self._do_web_request(accinfo[0], {'Password': password},
method='PATCH', etag=etag)
elif mode == 'disable':
await self._do_web_request(accinfo[0], {'Enabled': False},
method='PATCH', etag=etag)
elif mode == 'enable':
await self._do_web_request(accinfo[0], {'Enabled': True},
method='PATCH', etag=etag)
return True
async def disable_user(self, uid, mode):
"""Disable User
Just disable the User.
This will not disable the password or revoke privileges.
:param uid: user id
:param mode:
disable = disable user connections
enable = enable user connections
"""
await self.set_user_password(uid, mode)
return True
async def set_user_access(self, uid, privilege_level='ReadOnly'):
if privilege_level.startswith('custom.'):
privilege_level = privilege_level.replace('custom.', '')
accinfo = await self._account_url_info_by_id(uid)
if accinfo:
method = 'PATCH'
else:
accinfo = ((await self._accountserviceurl()) + '/Accounts', {})
method = 'POST'
etag = accinfo[1].get('@odata.etag', None)
for role in await self._validroles():
if role.lower() == privilege_level.lower():
privilege_level = role
break
await self._do_web_request(accinfo[0], {'RoleId': privilege_level},
method=method, etag=etag)
async def create_user(self, uid, name, password, privilege_level='ReadOnly'):
"""create/ensure a user is created with provided settings
:param privilege_level:
User Privilege level. Redfish role, commonly Administrator,
Operator, and ReadOnly
"""
accinfo = await self._account_url_info_by_id(uid)
if not accinfo:
raise Exception("Unable to find indicated uid")
if privilege_level.startswith('custom.'):
privilege_level = privilege_level.replace('custom.', '')
for role in await self._validroles():
if role.lower() == privilege_level.lower():
privilege_level = role
break
etag = accinfo[1].get('@odata.etag', None)
userinfo = {
"UserName": name,
"Password": password,
"RoleId": privilege_level,
}
await self._do_web_request(accinfo[0], userinfo, method='PATCH', etag=etag)
return True
async def get_screenshot(self, outfile):
oem = await self.oem()
return await oem.get_screenshot(outfile)
async def get_ikvm_methods(self):
oem = await self.oem()
return await oem.get_ikvm_methods()
async def get_ikvm_launchdata(self):
oem = await self.oem()
return await oem.get_ikvm_launchdata()
async def user_delete(self, uid):
oem = await self.oem()
return await oem.user_delete(uid, self)
async def set_user_name(self, uid, name):
"""Set user name
:param uid: user id
:param name: username
"""
accinfo = await self._account_url_info_by_id(uid)
if not accinfo:
raise Exception("No such account found")
etag = accinfo[1].get('@odata.etag', None)
await self._do_web_request(accinfo[0], {'UserName': name}, method='PATCH',
etag=etag)
return True
async def _updateservice(self):
if not self._varupdateservice:
overview = await self._do_web_request('/redfish/v1/')
us = overview.get('UpdateService', {}).get('@odata.id', None)
if not us:
raise exc.UnsupportedFunctionality(
'BMC does not implement extended firmware information')
self._varupdateservice = us
return self._varupdateservice
async def get_fwinventory(self):
if not self._varfwinventory:
usi = await self._do_web_request(await self._updateservice())
self._varfwinventory = usi.get('FirmwareInventory', {}).get(
'@odata.id', None)
if not self._varfwinventory:
raise exc.UnsupportedFunctionality(
'BMC does not implement extended firmware information')
return self._varfwinventory
async def sysinfo(self):
if not self.sysurl:
return {}
try:
return await self._do_web_request(self.sysurl)
except exc.RedfishError:
self.sysurl = None
return {}
async def bmcinfo(self):
bmcurl = await self.get_bmcurl()
return await self._do_web_request(bmcurl)
async def get_power(self):
currinfo = await self._do_web_request(self.sysurl, cache=False)
return {'powerstate': str(currinfo['PowerState'].lower())}
async def reseat_bay(self, bay):
"""Request the reseat of a bay
Request the enclosure manager to reseat the system in a particular
bay.
:param bay: The bay identifier to reseat
:return:
"""
oem = await self.oem()
await oem.reseat_bay(bay)
async def set_power(self, powerstate, wait=False):
if powerstate == 'boot':
oldpowerstate = (await self.get_power())['powerstate']
powerstate = 'on' if oldpowerstate == 'off' else 'reset'
elif powerstate in ('on', 'off'):
oldpowerstate = (await self.get_power())['powerstate']
if oldpowerstate == powerstate:
return {'powerstate': powerstate}
reqpowerstate = powerstate
if powerstate not in powerstates:
raise exc.InvalidParameterValue(
"Unknown power state %s requested" % powerstate)
powerstate = powerstates[powerstate]
await self._do_web_request(
self.powerurl, {'ResetType': powerstate})
if wait and reqpowerstate in ('on', 'off', 'softoff', 'shutdown'):
if reqpowerstate in ('softoff', 'shutdown'):
reqpowerstate = 'off'
timeout = os.times()[4] + 300
while (await self.get_power())['powerstate'] != reqpowerstate and os.times()[4] < timeout:
await asyncio.sleep(1)
if (await self.get_power())['powerstate'] != reqpowerstate:
raise exc.PyghmiException(
"System did not accomplish power state change")
return {'powerstate': reqpowerstate}
return {'pendingpowerstate': reqpowerstate}
def _get_cache(self, url, cache=30):
now = os.times()[4]
cachent = self._urlcache.get(url, None)
if cachent and cachent['vintage'] > now - cache:
return cachent['contents']
return None
async def _do_bulk_requests(self, urls, cache=True):
if self._gpool:
urls = [(x, None, None, cache) for x in urls]
for res in self._gpool.starmap(self._do_web_request_withurl, urls):
yield res
else:
for url in urls:
yield await self._do_web_request_withurl(url, cache=cache)
async def _do_web_request_withurl(self, url, payload=None, method=None,
cache=True):
return await self._do_web_request(url, payload, method, cache), url
async def _do_web_request(self, url, payload=None, method=None, cache=True,
etag=None):
res = None
if cache is True:
cache = 30
if cache and payload is None and method is None:
res = self._get_cache(url, cache)
if res:
return res
wc = self.wc
if etag:
wc.stdheaders['If-Match'] = etag
try:
res = await wc.grab_json_response_with_status(url, payload,
method=method)
finally:
if 'If-Match' in wc.stdheaders:
del wc.stdheaders['If-Match']
if res[1] == 401 and self.xauthtoken:
wc.set_basic_credentials(self.username, self.password)
await self._get_session_token(wc)
if etag:
wc.stdheaders['If-Match'] = etag
try:
res = await wc.grab_json_response_with_status(url, payload,
method=method)
finally:
if 'If-Match' in wc.stdheaders:
del wc.stdheaders['If-Match']
if res[1] < 200 or res[1] >= 300:
try:
info = json.loads(res[0])
errmsg = [
x.get('Message', x['MessageId']) for x in info.get(
'error', {}).get('@Message.ExtendedInfo', {})]
msgid = [
x['MessageId'] for x in info.get(
'error', {}).get('@Message.ExtendedInfo', {})]
errmsg = ','.join(errmsg)
msgid = ','.join(msgid)
raise exc.RedfishError(errmsg, msgid=msgid)
except (ValueError, KeyError):
raise exc.PyghmiException(str(url) + ":" + str(res[0]))
if payload is None and method is None:
self._urlcache[url] = {'contents': res[0],
'vintage': os.times()[4]}
return res[0]
async def get_bootdev(self):
"""Get current boot device override information.
:raises: PyghmiException on error
:returns: dict
"""
result = await self._do_web_request(self.sysurl)
overridestate = result.get('Boot', {}).get(
'BootSourceOverrideEnabled', None)
if overridestate == 'Disabled':
return {'bootdev': 'default', 'persistent': True}
persistent = None
if overridestate == 'Once':
persistent = False
elif overridestate == 'Continuous':
persistent = True
else:
raise exc.PyghmiException('Unrecognized Boot state: %s'
% repr(overridestate))
uefimode = result.get('Boot', {}).get('BootSourceOverrideMode', None)
if uefimode == 'UEFI':
uefimode = True
elif uefimode == 'Legacy':
uefimode = False
else:
raise exc.PyghmiException('Unrecognized mode: %s' % uefimode)
bootdev = result.get('Boot', {}).get('BootSourceOverrideTarget', None)
if bootdev not in boot_devices_read:
raise exc.PyghmiException('Unrecognized boot target: %s'
% repr(bootdev))
bootdev = boot_devices_read[bootdev]
return {'bootdev': bootdev, 'persistent': persistent,
'uefimode': uefimode}
async def set_bootdev(self, bootdev, persist=False, uefiboot=None):
"""Set boot device to use on next reboot
:param bootdev:
*network -- Request network boot
*hd -- Boot from hard drive
*safe -- Boot from hard drive, requesting 'safe mode'
*optical -- boot from CD/DVD/BD drive
*setup -- Boot into setup utility
*default -- remove any directed boot device request
:param persist: If true, ask that system firmware use this device
beyond next boot. Be aware many systems do not honor
this
:param uefiboot: If true, request UEFI boot explicitly. If False,
request BIOS style boot.
None (default) does not modify the boot mode.
:raises: PyghmiException on an error.
:returns: dict or True -- If callback is not provided, the response
"""
oem = await self.oem()
return await oem.set_bootdev(bootdev, persist, uefiboot, self)
async def get_biosurl(self):
if not self._varbiosurl:
sysinfo = await self.sysinfo()
self._varbiosurl = sysinfo.get('Bios', {}).get('@odata.id',
None)
if self._varbiosurl is None:
raise exc.UnsupportedFunctionality(
'Bios management not detected on this platform')
return self._varbiosurl
async def get_setbiosurl(self):
if self._varsetbiosurl is None:
biosinfo = await self._do_web_request(await self.get_biosurl())
self._varsetbiosurl = biosinfo.get(
'@Redfish.Settings', {}).get('SettingsObject', {}).get(
'@odata.id', None)
if self._varsetbiosurl is None:
raise exc.UnsupportedFunctionality('Ability to set BIOS settings '
'not detected on this platform')
return self._varsetbiosurl
async def _sensormap(self):
if not self._varsensormap:
sysinfo = await self.sysinfo()
if sysinfo:
for chassis in sysinfo.get('Links', {}).get('Chassis', []):
await self._mapchassissensors(chassis)
else: # no system, but check if this is a singular chassis
rootinfo = await self._do_web_request('/redfish/v1/')
chassiscol = rootinfo.get('Chassis', {}).get('@odata.id', '')
if chassiscol:
chassislist = await self._do_web_request(chassiscol)
if len(chassislist.get('Members', [])) == 1:
await self._mapchassissensors(chassislist['Members'][0])
return self._varsensormap
async def _mapchassissensors(self, chassis):
chassisurl = chassis['@odata.id']
chassisinfo = await self._do_web_request(chassisurl)
sensors = None
oem = await self.oem()
if oem.usegenericsensors:
sensors = chassisinfo.get('Sensors', {}).get('@odata.id', '')
if sensors:
sensorinf = await self._do_web_request(sensors)
for sensor in sensorinf.get('Members', []):
sensedata = await self._do_web_request(sensor['@odata.id'])
if 'Name' in sensedata:
sensetype = sensedata.get('ReadingType', 'Unknown')
self._varsensormap[sensedata['Name']] = {
'name': sensedata['Name'], 'type': sensetype,
'url': sensor['@odata.id'], 'generic': True}
else:
powurl = chassisinfo.get('Power', {}).get('@odata.id', '')
if powurl:
powinf = await self._do_web_request(powurl)
for voltage in powinf.get('Voltages', []):
if 'Name' in voltage:
self._varsensormap[voltage['Name']] = {
'name': voltage['Name'], 'url': powurl,
'type': 'Voltage'}
thermurl = chassisinfo.get('Thermal', {}).get('@odata.id', '')
if thermurl:
therminf = await self._do_web_request(thermurl)
for fan in therminf.get('Fans', []):
if 'Name' in fan:
self._varsensormap[fan['Name']] = {
'name': fan['Name'], 'type': 'Fan',
'url': thermurl}
for temp in therminf.get('Temperatures', []):
if 'Name' in temp:
self._varsensormap[temp['Name']] = {
'name': temp['Name'], 'type': 'Temperature',
'url': thermurl}
for subchassis in chassisinfo.get('Links', {}).get('Contains', []):
await self._mapchassissensors(subchassis)
async def _get_thermals(self, chassis):
chassisurl = chassis['@odata.id']
chassisinfo = await self._do_web_request(chassisurl)
thermurl = chassisinfo.get('Thermal', {}).get('@odata.id', '')
if thermurl:
therminf = await self._do_web_request(thermurl, cache=1)
return therminf.get('Temperatures', [])
async def get_bmcurl(self):
if not self._varbmcurl:
sysinfo = await self.sysinfo()
self._varbmcurl = sysinfo.get('Links', {}).get(
'ManagedBy', [{}])[0].get('@odata.id', None)
return self._varbmcurl
async def get_bmcnicurl(self):
if not self._varbmcnicurl:
self._varbmcnicurl = await self._get_bmc_nic_url()
return self._varbmcnicurl
async def list_network_interface_names(self):
bmcurl = await self.get_bmcurl()
bmcinfo = await self._do_web_request(bmcurl)
nicurl = bmcinfo.get('EthernetInterfaces', {}).get('@odata.id', None)
if not nicurl:
return
niclist = await self._do_web_request(nicurl)
for nic in niclist.get('Members', []):
curl = nic.get('@odata.id', None)
if not curl:
continue
yield curl.rsplit('/', 1)[1]
async def _get_bmc_nic_url(self, name=None):
bmcinfo = await self._do_web_request(await self.get_bmcurl())
nicurl = bmcinfo.get('EthernetInterfaces', {}).get('@odata.id', None)
niclist = await self._do_web_request(nicurl)
foundnics = 0
lastnicurl = None
oem = await self.oem()
for nic in niclist.get('Members', []):
curl = nic.get('@odata.id', None)
if not curl:
continue
if name is not None:
if curl.endswith('/{0}'.format(name)):
return curl
continue
if oem.hostnic and curl.endswith('/{0}'.format(
oem.hostnic)):
continue
nicinfo = await self._do_web_request(curl)
if nicinfo.get('Links', {}).get('HostInterface', None):
# skip host interface
continue
if not nicinfo.get('InterfaceEnabled', True):
# skip disabled interfaces
continue
for addrs in nicinfo.get('IPv4Addresses', []):
v4addr = socket.inet_pton(
socket.AF_INET, addrs.get('Address', '0.0.0.0'))
if self._bmcv4ip == v4addr:
return curl
for addrs in nicinfo.get('IPv6Addresses', []):
v6addr = socket.inet_pton(
socket.AF_INET6, addrs.get('Address', '::'))
if self._bmcv6ip == v6addr:
return curl
foundnics += 1
lastnicurl = curl
if name is None and foundnics != 1:
raise exc.PyghmiException(
'BMC does not have exactly one interface')
if name is None:
return lastnicurl
async def _bmcresetinfo(self):
if not self._varresetbmcurl:
bmcinfo = await self._do_web_request(await self.get_bmcurl())
resetinf = bmcinfo.get('Actions', {}).get('#Manager.Reset', {})
url = resetinf.get('target', '')
valid = resetinf.get('ResetType@Redfish.AllowableValues', [])
if not valid:
tmpurl = resetinf.get('@Redfish.ActionInfo', None)
if tmpurl:
resetinf = await self._do_web_request(tmpurl)
valid = resetinf.get('Parameters', [{}])[0].get(
'AllowableValues')
resettype = None
if 'GracefulRestart' in valid:
resettype = 'GracefulRestart'
elif 'ForceRestart' in valid:
resettype = 'ForceRestart'
elif 'ColdReset' in valid:
resettype = 'ColdReset'
self._varresetbmcurl = url, resettype
return self._varresetbmcurl
async def reset_bmc(self):
url, action = await self._bmcresetinfo()
if not url:
raise Exception('BMC does not provide reset action')
if not action:
raise Exception('BMC does not accept a recognized reset type')
await self._do_web_request(url, {'ResetType': action})
async def set_identify(self, on=True, blink=None):
oem = await self.oem()
if hasattr(oem, 'set_identify'):
return await oem.set_identify(on, blink)
targurl = self.sysurl
if not targurl:
root = await self._do_web_request('/redfish/v1')
systemsurl = root.get('Systems', {}).get('@odata.id', None)
if systemsurl:
targurl = await self._do_web_request(systemsurl)
if len(targurl.get('Members', [])) == 1:
targurl = targurl['Members'][0]['@odata.id']
if not targurl:
raise Exception("Unable to identify system url")
await self._do_web_request(
targurl,
{'IndicatorLED': 'Blinking' if blink else 'Lit' if on else 'Off'},
method='PATCH', etag='*')
_idstatemap = {
'Blinking': 'blink',
'Lit': 'on',
'Off': 'off',
}
async def get_identify(self):
await self.sysinfo()
ledstate = self.sysinfo
return {'identifystate': self._idstatemap[ledstate]}
async def get_health(self, verbose=True):
oem = await self.oem()
return await oem.get_health(self, verbose)
async def get_extended_bmc_configuration(self, hideadvanced=True):
oem = await self.oem()
return await oem.get_extended_bmc_configuration(self, hideadvanced=hideadvanced)
async def get_bmc_configuration(self):
"""Get miscellaneous BMC configuration
In much the same way a bmc can present arbitrary key-value
structure for BIOS/UEFI configuration, provide a mechanism
for a BMC to provide arbitrary key-value for BMC specific
settings.
"""
# For now, this is a stub, no implementation for redfish currently
oem = await self.oem()
return await oem.get_bmc_configuration()
async def set_bmc_configuration(self, changeset):
"""Get miscellaneous BMC configuration
In much the same way a bmc can present arbitrary key-value
structure for BIOS/UEFI configuration, provide a mechanism
for a BMC to provide arbitrary key-value for BMC specific
settings.
"""
# For now, this is a stub, no implementation for redfish currently
oem = await self.oem()
return await oem.set_bmc_configuration(changeset)
async def set_system_configuration(self, changeset):
oem = await self.oem()
return await oem.set_system_configuration(changeset, self)
async def get_ntp_enabled(self):
bmcinfo = await self._do_web_request(await self.get_bmcurl())
netprotocols = bmcinfo.get('NetworkProtocol', {}).get('@odata.id', None)
if netprotocols:
netprotoinfo = await self._do_web_request(netprotocols)
enabled = netprotoinfo.get('NTP', {}).get('ProtocolEnabled', False)
return enabled
return False
async def set_ntp_enabled(self, enable):
bmcinfo = await self._do_web_request(await self.get_bmcurl())
netprotocols = bmcinfo.get('NetworkProtocol', {}).get('@odata.id', None)
if netprotocols:
request = {'NTP':{'ProtocolEnabled': enable}}
await self._do_web_request(netprotocols, request, method='PATCH')
await self._do_web_request(netprotocols, cache=0)
async def get_ntp_servers(self):
bmcinfo = await self._do_web_request(await self.get_bmcurl())
netprotocols = bmcinfo.get('NetworkProtocol', {}).get('@odata.id', None)
if not netprotocols:
return []
netprotoinfo = await self._do_web_request(netprotocols)
return netprotoinfo.get('NTP', {}).get('NTPServers', [])
async def set_ntp_server(self, server, index=None):
bmcinfo = await self._do_web_request(await self.get_bmcurl())
netprotocols = bmcinfo.get('NetworkProtocol', {}).get('@odata.id', None)
currntpservers = await self.get_ntp_servers()
if index is None:
if server in currntpservers:
return
currntpservers = [server] + currntpservers
else:
if (index + 1) > len(currntpservers):
if not server:
return
currntpservers.append(server)
else:
if not server:
del currntpservers[index]
else:
currntpservers[index] = server
request = {'NTP':{'NTPServers': currntpservers}}
await self._do_web_request(netprotocols, request, method='PATCH')
await self._do_web_request(netprotocols, cache=0)
async def clear_bmc_configuration(self):
"""Reset BMC to factory default
Call appropriate function to clear BMC to factory default settings.
In many cases, this may render remote network access impracticle or
impossible."
"""
bmcinfo = await self._do_web_request(await self.get_bmcurl())
rc = bmcinfo.get('Actions', {}).get('#Manager.ResetToDefaults', {})
actinf = rc.get('ResetType@Redfish.AllowableValues', [])
if 'ResetAll' in actinf:
acturl = rc.get('target', None)
if acturl:
await self._do_web_request(acturl, {'ResetType': 'ResetAll'})
return
raise exc.UnsupportedFunctionality(
'Clear BMC configuration not supported on this platform')
async def get_system_configuration(self, hideadvanced=True):
oem = await self.oem()
return await oem.get_system_configuration(hideadvanced, self)
async def clear_system_configuration(self):
"""Clear the BIOS/UEFI configuration
"""
biosinfo = await self._do_web_request(await self.get_biosurl())
rb = biosinfo.get('Actions', {}).get('#Bios.ResetBios', {})
actinf = rb.get('@Redfish.ActionInfo', None)
rb = rb.get('target', '')
parms = {}
if actinf:
actinf = await self._do_web_request(
'/redfish/v1/Systems/Self/Bios/ResetBiosActionInfo')
for parm in actinf.get('Parameters', ()):
if parm.get('Required', False):
if parm.get('Name', None) == 'ResetType' and parm.get(
'AllowableValues', [None])[0] == 'Reset':
parms['ResetType'] = 'Reset'
else:
raise Exception(
'Unrecognized required parameter {0}'.format(
parm.get('Name', 'Unknown')))
if not rb:
raise Exception('BIOS reset not detected on this system')
if not parms:
parms = {'Action': 'Bios.ResetBios'}
await self._do_web_request(rb, parms)
async def set_net6_configuration(self, static_addresses=None, static_gateway=None, name=None):
patch = {}
if static_addresses is not None:
sa = []
patch['IPv6StaticAddresses'] = sa
for addr in static_addresses:
if '/' in addr:
addr, plen = addr.split('/', 1)
else:
plen = '64'
sa.append({
'PrefixLength': int(plen),
'Address': addr
})
if static_gateway:
patch['IPv6StaticDefaultGateways'] = [{
'Address': static_gateway,
}]
if patch:
nicurl = self._get_bmc_nic_url(name)
await self._do_web_request(nicurl, patch, 'PATCH')
async def set_net_configuration(self, ipv4_address=None, ipv4_configuration=None,
ipv4_gateway=None, vlan_id=None, name=None):
patch = {}
ipinfo = {}
dodhcp = None
netmask = None
if (ipv4_address is None and ipv4_configuration is None
and ipv4_gateway is None and vlan_id is None):
return
if ipv4_address:
if '/' in ipv4_address:
ipv4_address, cidr = ipv4_address.split('/')
netmask = _cidr_to_mask(int(cidr))
patch['IPv4StaticAddresses'] = [ipinfo]
ipinfo['Address'] = ipv4_address
ipv4_configuration = 'static'
if netmask:
ipinfo['SubnetMask'] = netmask
if ipv4_gateway:
patch['IPv4StaticAddresses'] = [ipinfo]
ipinfo['Gateway'] = ipv4_gateway
ipv4_configuration = 'static'
if ipv4_configuration and ipv4_configuration.lower() == 'dhcp':
dodhcp = True
patch['DHCPv4'] = {'DHCPEnabled': True}
elif (ipv4_configuration == 'static'
or 'IPv4StaticAddresses' in patch):
dodhcp = False
patch['DHCPv4'] = {'DHCPEnabled': False}
if vlan_id in ('off', 0, '0'):
patch['VLAN'] = {'VLANEnable': False}
elif vlan_id:
patch['VLAN'] = {'VLANEnable': True, 'VLANId': int(vlan_id)}
if patch:
nicurl = await self._get_bmc_nic_url(name)
try:
await self._do_web_request(nicurl, patch, 'PATCH')
except exc.RedfishError:
patch = {'IPv4Addresses': [ipinfo]}
if dodhcp:
ipinfo['AddressOrigin'] = 'DHCP'
elif dodhcp is not None:
ipinfo['AddressOrigin'] = 'Static'
await self._do_web_request(nicurl, patch, 'PATCH')
async def get_net6_configuration(self, name=None):
nicurl = await self._get_bmc_nic_url(name)
netcfg = await self._do_web_request(nicurl, cache=False)
retdata = {}
saddrs = netcfg.get('IPv6StaticAddresses', ())
retdata['static_addrs'] = []
for sa in saddrs:
ca = '{}/{}'.format(sa['Address'], sa['PrefixLength'])
retdata['static_addrs'].append(ca)
gws = netcfg.get('IPv6StaticDefaultGateways', None)
if gws:
for gw in gws:
retdata['static_gateway'] = gw['Address']
tagged = netcfg.get('VLAN', {}).get('VLANEnabled', False)
if tagged:
retdata['vlan_id'] = netcfg.get('VLAN', {}).get('VLANId', None)
else:
retdata['vlan_id'] = 'off'
return retdata
async def get_net_configuration(self, name=None):
nicurl = await self._get_bmc_nic_url(name)
netcfg = await self._do_web_request(nicurl, cache=False)
ipv4 = netcfg.get('IPv4Addresses', {})
if not ipv4:
raise exc.PyghmiException('Unable to locate network information')
retval = {}
if len(netcfg['IPv4Addresses']) != 1:
netcfg['IPv4Addresses'] = [
x for x in netcfg['IPv4Addresses']
if x['Address'] != '0.0.0.0']
if len(netcfg['IPv4Addresses']) != 1:
raise exc.PyghmiException('Multiple IP addresses not supported')
currip = netcfg['IPv4Addresses'][0]
cidr = _mask_to_cidr(currip['SubnetMask'])
retval['ipv4_address'] = '{0}/{1}'.format(currip['Address'], cidr)
retval['mac_address'] = netcfg['MACAddress']
hasgateway = _mask_to_cidr(currip['Gateway'])
retval['ipv4_gateway'] = currip['Gateway'] if hasgateway else None
retval['ipv4_configuration'] = currip['AddressOrigin']
tagged = netcfg.get('VLAN', {}).get('VLANEnable', False)
if tagged:
retval['vlan_id'] = netcfg.get('VLAN', {}).get('VLANId', None)
else:
retval['vlan_id'] = 'off'
return retval
async def get_hostname(self):
netcfg = await self._do_web_request(await self.get_bmcnicurl())
return netcfg['HostName']
async def set_hostname(self, hostname):
await self._do_web_request(await self.get_bmcnicurl(),
{'HostName': hostname}, 'PATCH')
async def get_firmware(self, components=(), category=None):
self._fwnamemap = {}
oem = await self.oem()
try:
async for firminfo in oem.get_firmware_inventory(components, self, category):
yield firminfo
except exc.BypassGenericBehavior:
return
fwlist = await self._do_web_request(await self.get_fwinventory())
fwurls = [x['@odata.id'] for x in fwlist.get('Members', [])]
async for res in self._do_bulk_requests(fwurls):
res = self._extract_fwinfo(res)
if res[0] is None:
continue
yield res
def _extract_fwinfo(self, inf):
currinf = self._oem._extract_fwinfo(inf)
fwi, url = inf
fwname = fwi.get('Name', 'Unknown')
if fwname in self._fwnamemap:
fwname = fwi.get('Id', fwname)
if fwname in self._fwnamemap:
# Block duplicates for by name retrieval
self._fwnamemap[fwname] = None
else:
self._fwnamemap[fwname] = url
currinf['name'] = fwname
currinf['id'] = fwi.get('Id', None)
currinf['version'] = fwi.get('Version', 'Unknown')
currinf['date'] = parse_time(fwi.get('ReleaseDate', ''))
currinf['software_id'] = fwi.get('SoftwareId', '')
if not (currinf['version'] or currinf['date']):
return None, None
# TODO(Jarrod Johnson): OEM extended data with buildid
currstate = fwi.get('Status', {}).get('State', 'Unknown')
if currstate == 'StandbyOffline':
currinf['state'] = 'pending'
elif currstate == 'Enabled':
currinf['state'] = 'active'
elif currstate == 'StandbySpare':
currinf['state'] = 'backup'
return fwname, currinf
async def get_inventory_descriptions(self, withids=False):
oem = await self.oem()
async for desc in oem.get_inventory_descriptions(withids):
yield desc
async def get_inventory_of_component(self, component):
oem = await self.oem()
return await oem.get_inventory_of_component(component)
async def get_inventory(self, withids=False):
oem = await self.oem()
async for x in oem.get_inventory(withids):
yield x
async def get_location_information(self):
locationinfo = {}
sysinfo = await self.sysinfo()
for chassis in sysinfo.get('Links', {}).get('Chassis', []):
chassisurl = chassis['@odata.id']
data = await self._do_web_request(chassisurl)
locdata = data.get('Location', {})
postaladdress = locdata.get('PostalAddress', {})
placement = locdata.get('Placement', {})
contactinfo = locdata.get('Contacts', [])
currval = postaladdress.get('Room', '')
if currval:
locationinfo['room'] = currval
currval = postaladdress.get('Location', '')
if currval:
locationinfo['location'] = currval
currval = postaladdress.get('Building', '')
if currval:
locationinfo['building'] = currval
currval = placement.get('Rack', '')
if currval:
locationinfo['rack'] = currval
for contact in contactinfo:
contact = contact.get('ContactName', '')
if not contact:
continue
if 'contactnames' not in locationinfo:
locationinfo['contactnames'] = [contact]
else:
locationinfo['contactnames'].append(contact)
return locationinfo
async def set_location_information(self, room=None, contactnames=None,
location=None, building=None, rack=None):
locationinfo = {}
postaladdress = {}
placement = {}
if contactnames is not None:
locationinfo['Contacts'] = [
{'ContactName': x} for x in contactnames]
if room is not None:
postaladdress['Room'] = room
if location is not None:
postaladdress['Location'] = location
if building is not None:
postaladdress['Building'] = building
if rack is not None:
placement['Rack'] = rack
if postaladdress:
locationinfo['PostalAddress'] = postaladdress
if placement:
locationinfo['Placement'] = placement
if locationinfo:
sysinfo = await self.sysinfo()
for chassis in sysinfo.get('Links', {}).get('Chassis', []):
chassisurl = chassis['@odata.id']
await self._do_web_request(chassisurl, {'Location': locationinfo},
method='PATCH')
async def oem(self):
if not self._oem:
if self.sysurl:
await self._do_web_request(self.sysurl, cache=False) # This is to trigger token validation and renewel
elif self._varbmcurl:
await self._do_web_request(self._varbmcurl, cache=False) # This is to trigger token validation and renewel
sysinfo = await self.sysinfo()
self._oem = await oem.get_oem_handler(
sysinfo, self._initsysurl, self.wc, self._urlcache, self)
self._oem.set_credentials(self.username, self.password)
return self._oem
async def get_description(self):
oem = await self.oem()
return await oem.get_description(self)
async def get_event_log(self, clear=False):
oem = await self.oem()
async for logent in oem.get_event_log(clear, self):
yield logent
async def _get_chassis_env(self, chassis):
chassisurl = chassis['@odata.id']
chassisinfo = await self._do_web_request(chassisurl)
envurl = chassisinfo.get('EnvironmentMetrics', {}).get('@odata.id', '')
if not envurl:
return {}
envmetric = await self._do_web_request(envurl, cache=1)
retval = {
'watts': envmetric.get('PowerWatts', {}).get('Reading', None),
'inlet': envmetric.get('TemperatureCelsius', {}).get('Reading', None)
}
return retval
async def get_average_processor_temperature(self):
oem = await self.oem()
return await oem.get_average_processor_temperature(self)
async def get_system_power_watts(self):
oem = await self.oem()
return await oem.get_system_power_watts(self)
async def get_inlet_temperature(self):
inlets = []
sysinfo = await self.sysinfo()
for chassis in sysinfo.get('Links', {}).get('Chassis', []):
envinfo = await self._get_chassis_env(chassis)
currinlet = envinfo.get('inlet', None)
if currinlet:
inlets.append(currinlet)
if inlets:
val = sum(inlets) / len(inlets)
unavail = False
else:
val = None
unavail = True
return SensorReading(
None, {'name': 'Inlet Temperature'}, value=val, units='°C',
unavailable=unavail)
async def get_sensor_descriptions(self):
for sensor in natural_sort(await self._sensormap()):
yield (await self._sensormap())[sensor]
async def get_sensor_reading(self, sensorname):
sensormap = await self._sensormap()
if sensorname not in sensormap:
raise Exception('Sensor not found')
sensor = sensormap[sensorname]
reading = await self._do_web_request(sensor['url'], cache=1)
return self._extract_reading(sensor, reading)
async def get_sensor_data(self):
for sensor in natural_sort(await self._sensormap()):
yield await self.get_sensor_reading(sensor)
def _extract_reading(self, sensor, reading):
if sensor.get('generic', False): # generic sensor
val = reading.get('Reading', None)
unavail = val is None
units = reading.get('ReadingUnits', None)
if units == 'Cel':
units = '°C'
if units == 'cft_i/min':
units = 'CFM'
return SensorReading(reading, None, value=val, units=units,
unavailable=unavail)
if sensor['type'] == 'Fan':
for fan in reading['Fans']:
if fan['Name'] == sensor['name']:
val = fan.get('Reading', None)
unavail = val is None
units = fan.get('ReadingUnits', None)
return SensorReading(
None, sensor, value=val, units=units,
unavailable=unavail)
elif sensor['type'] == 'Temperature':
for temp in reading['Temperatures']:
if temp['Name'] == sensor['name']:
val = temp.get('ReadingCelsius', None)
unavail = val is None
return SensorReading(
None, sensor, value=val, units='°C',
unavailable=unavail)
elif sensor['type'] == 'Voltage':
for volt in reading['Voltages']:
if volt['Name'] == sensor['name']:
val = volt.get('ReadingVolts', None)
unavail = val is None
return SensorReading(
None, sensor, value=val, units='V',
unavailable=unavail)
async def list_media(self):
oem = await self.oem()
async for res in oem.list_media(self):
yield res
async def get_storage_configuration(self):
""""Get storage configuration data
Retrieves the storage configuration from the target. Data is given
about disks, pools, and volumes. When referencing something, use the
relevant 'cfgpath' attribute to describe it. It is not guaranteed that
cfgpath will be consistent version to version, so a lookup is suggested
in end user applications.
:return: A aiohmi.storage.ConfigSpec object describing current config
"""
oem = await self.oem()
return await oem.get_storage_configuration()
async def remove_storage_configuration(self, cfgspec):
"""Remove specified storage configuration from controller.
:param cfgspec: A aiohmi.storage.ConfigSpec describing what to remove
:return:
"""
oem = await self.oem()
return await oem.remove_storage_configuration(cfgspec)
async def apply_storage_configuration(self, cfgspec=None):
"""Evaluate a configuration for validity
This will check if configuration is currently available and, if given,
whether the specified cfgspec can be applied.
:param cfgspec: A aiohmi.storage.ConfigSpec describing desired oonfig
:return:
"""
oem = await self.oem()
return await oem.apply_storage_configuration(cfgspec)
def check_storage_configuration(self, cfgspec=None):
"""Evaluate a configuration for validity
This will check if configuration is currently available and, if given,
whether the specified cfgspec can be applied.
:param cfgspec: A aiohmi.storage.ConfigSpec describing desired oonfig
:return:
"""
return self._oem.check_storage_configuration(cfgspec)
async def attach_remote_media(self, url, username=None, password=None):
"""Attach remote media by url
Given a url, attach remote media (cd/usb image) to the target system.
:param url: URL to indicate where to find image (protocol support
varies by BMC)
:param username: Username for endpoint to use when accessing the URL.
If applicable, 'domain' would be indicated by '@' or
'\' syntax.
:param password: Password for endpoint to use when accessing the URL.
"""
# At the moment, there isn't a viable way to
# identify the correct resource ahead of time.
# As such it's OEM specific until the standard
# provides a better way.
vmurls = []
sysinfo = await self.sysinfo()
vmcoll = sysinfo.get(
'VirtualMedia', {}).get('@odata.id', None)
if not vmcoll:
bmcinfo = await self.bmcinfo()
vmcoll = bmcinfo.get(
'VirtualMedia', {}).get('@odata.id', None)
if vmcoll:
vmlist = await self._do_web_request(vmcoll)
vmurls = [x['@odata.id'] for x in vmlist.get('Members', [])]
suspendedxauth = False # Don't trigger token expiry linked unmount
if 'X-Auth-Token' in self.wc.stdheaders:
suspendedxauth = True
del self.wc.stdheaders['X-Auth-Token']
self.wc.set_basic_credentials(self.username, self.password)
oem = await self.oem()
try:
await oem.attach_remote_media(url, username, password, vmurls)
except exc.BypassGenericBehavior:
if suspendedxauth:
self.wc.stdheaders['X-Auth-Token'] = self.xauthtoken
if 'Authorization' in self.wc.stdheaders:
del self.wc.stdheaders['Authorization']
return
for vmurl in vmurls:
vminfo = self._do_web_request(vmurl, cache=False)
if vminfo.get('ConnectedVia', None) != 'NotConnected':
continue
inserturl = vminfo.get(
'Actions', {}).get(
'#VirtualMedia.InsertMedia', {}).get('target', None)
if inserturl:
self._do_web_request(inserturl, {'Image': url})
else:
try:
self._do_web_request(vmurl,
{'Image': url, 'Inserted': True},
'PATCH')
except exc.RedfishError as re:
if re.msgid.endswith(u'PropertyUnknown'):
self._do_web_request(vmurl, {'Image': url}, 'PATCH')
else:
raise
break
if suspendedxauth:
self.wc.stdheaders['X-Auth-Token'] = self.xauthtoken
if 'Authorization' in self.wc.stdheaders:
del self.wc.stdheaders['Authorization']
async for res in oem.list_media(self, cache=False):
pass
async def detach_remote_media(self):
oem = await self.oem()
try:
await oem.detach_remote_media()
except exc.BypassGenericBehavior:
return
sysinfo = await self.sysinfo()
vmcoll = sysinfo.get('VirtualMedia', {}).get('@odata.id', None)
if not vmcoll:
bmcinfo = await self._do_web_request(await self.get_bmcurl())
vmcoll = bmcinfo.get('VirtualMedia', {}).get('@odata.id', None)
if vmcoll:
vmlist = await self._do_web_request(vmcoll)
vmurls = [x['@odata.id'] for x in vmlist.get('Members', [])]
async for vminfo in self._do_bulk_requests(vmurls):
vminfo, currl = vminfo
if vminfo['Image']:
ejurl = vminfo.get(
'Actions', {}).get(
'#VirtualMedia.EjectMedia', {}).get('target', None)
if ejurl:
await self._do_web_request(ejurl, {})
else:
try:
await self._do_web_request(currl,
{'Image': None,
'Inserted': False},
method='PATCH')
except exc.RedfishError as re:
if re.msgid.endswith(u'PropertyUnknown'):
await self._do_web_request(currl, {'Image': None},
method='PATCH')
else:
raise
oem = await self.oem()
async for res in oem.list_media(self, cache=False):
pass
async def upload_media(self, filename, progress=None, data=None):
"""Upload a file to be hosted on the target BMC
This will upload the specified data to
the BMC so that it will make it available to the system as an emulated
USB device.
:param filename: The filename to use, the basename of the parameter
will be given to the bmc.
:param progress: Optional callback for progress updates
"""
oem = await self.oem()
return await oem.upload_media(filename, progress, data)
async def get_update_status(self):
oem = await self.oem()
return await oem.get_update_status()
async def update_firmware(self, file, data=None, progress=None, bank=None):
"""Send file to BMC to perform firmware update
:param filename: The filename to upload to the target BMC
:param data: The payload of the firmware. Default is to read from
specified filename.
:param progress: A callback that will be given a dict describing
update process. Provide if
:param bank: Indicate a target 'bank' of firmware if supported
"""
if progress is None:
progress = lambda x: True
oem = await self.oem()
return await oem.update_firmware(file, data, progress, bank)
async def get_diagnostic_data(self, savefile, progress=None, autosuffix=False):
if os.path.exists(savefile) and not os.path.isdir(savefile):
raise exc.InvalidParameterValue(
'Not allowed to overwrite existing file: {0}'.format(
savefile))
oem = await self.oem()
return await oem.get_diagnostic_data(savefile, progress, autosuffix)
async def get_licenses(self):
oem = await self.oem()
async for lic in oem.get_licenses(self):
yield lic
async def delete_license(self, name):
oem = await self.oem()
return await oem.delete_license(name, self)
async def save_licenses(self, directory):
if os.path.exists(directory) and not os.path.isdir(directory):
raise exc.InvalidParameterValue(
'Not allowed to overwrite existing file: {0}'.format(
directory))
oem = await self.oem()
async for lic in oem.save_licenses(directory, self):
yield lic
async def apply_license(self, filename, progress=None, data=None):
oem = await self.oem()
return await oem.apply_license(filename, self, progress, data)
if __name__ == '__main__':
print(repr(
Command(sys.argv[1], os.environ['BMCUSER'], os.environ['BMCPASS'],
verifycallback=lambda x: True).get_power()))