diff --git a/pyghmi/redfish/command.py b/pyghmi/redfish/command.py index 91399a45..21c6a556 100644 --- a/pyghmi/redfish/command.py +++ b/pyghmi/redfish/command.py @@ -239,6 +239,16 @@ class Command(object): okroles.add('ReadOnly') return okroles + def get_trusted_cas(self): + for ca in self.oem.get_trusted_cas(): + yield ca + + def add_trusted_ca(self, pemdata): + return self.oem.add_trusted_ca(pemdata) + + def del_trusted_ca(self, certid): + return self.oem.del_trusted_ca(certid) + def get_users(self): """get list of users and channel access information (helper) diff --git a/pyghmi/redfish/oem/generic.py b/pyghmi/redfish/oem/generic.py index 169e7db6..6aac621a 100644 --- a/pyghmi/redfish/oem/generic.py +++ b/pyghmi/redfish/oem/generic.py @@ -23,6 +23,7 @@ from datetime import timedelta from dateutil import tz import socket import time +import uuid import pyghmi.constants as const import pyghmi.exceptions as exc @@ -31,6 +32,27 @@ import pyghmi.util.webclient as webclient from pyghmi.util.parse import parse_time +def _pem_to_dict(pemdata, uefi=False): + """Pull PEM into a dict + + Accepts a file-like or a string or bytes. + + A dict with the PEM as a value for CertificateString is created. + If uefi, then "UefiSignatureOwner" is also created with a random GUID. + This is how redfish expects certificate information for CAs to be provided for + UEFI and for itself. + """ + if hasattr(pemdata, 'read'): + pemdata = pemdata.read() + if isinstance(pemdata, bytes): + pemdata = pemdata.decode('utf-8') + cert_dict = { + 'CertificateString': pemdata, + 'CertificateType': 'PEM', + } + if uefi: + cert_dict['UefiSignatureOwner'] = str(uuid.uuid4()) + return cert_dict class SensorReading(object): def __init__(self, healthinfo, sensor=None, value=None, units=None, @@ -192,6 +214,7 @@ class OEMHandler(object): self._gpool = gpool self._varsysinfo = sysinfo self._varsysurl = sysurl + self._varbmcurl = None self._urlcache = cache self.webclient = webclient self._hwnamemap = {} @@ -285,6 +308,66 @@ class OEMHandler(object): cputemps.append(temp) return cputemps + @property + def _bmcurl(self): + if not self._varbmcurl: + self._varbmcurl = self.sysinfo.get('Links', {}).get( + 'ManagedBy', [{}])[0].get('@odata.id', None) + return self._varbmcurl + + @property + def sysinfo(self): + return self._do_web_request(self._varsysurl) + + def add_trusted_ca(self, pemdata): + mgrinfo = self._do_web_request(self._bmcurl) + secpolicy = mgrinfo.get('SecurityPolicy', {}).get('@odata.id', None) + if secpolicy: + secinfo = self._do_web_request(secpolicy) + certcoll = secinfo.get('TLS', {}).get('Client', {}).get('TrustedCertificates', {}).get('@odata.id', None) + if certcoll: + certpayload = _pem_to_dict(pemdata) + self._do_web_request(certcoll, certpayload) + return True + raise exc.PyghmiException('Platform does not support adding trusted CAs') + + def del_trusted_ca(self, certid): + mgrinfo = self._do_web_request(self._bmcurl) + secpolicy = mgrinfo.get('SecurityPolicy', {}).get('@odata.id', None) + if secpolicy: + secinfo = self._do_web_request(secpolicy) + certcoll = secinfo.get('TLS', {}).get('Client', {}).get('TrustedCertificates', {}).get('@odata.id', None) + if certcoll: + certs = self._get_expanded_data(certcoll) + certs = certs.get('Members', []) + for cert in certs: + if cert.get('Id', '') == certid: + self._do_web_request(cert['@odata.id'], method='DELETE') + return True + raise exc.PyghmiException(f'No such certificate found: {certid}') + + def get_trusted_cas(self): + mgrinfo = self._do_web_request(self._bmcurl) + secpolicy = mgrinfo.get('SecurityPolicy', {}).get('@odata.id', None) + if secpolicy: + secinfo = self._do_web_request(secpolicy) + certcoll = secinfo.get('TLS', {}).get('Client', {}).get('TrustedCertificates', {}).get('@odata.id', None) + if certcoll: + certs = self._get_expanded_data(certcoll) + certs = certs.get('Members', []) + for cert in certs: + certdesc = { + 'id': cert.get('Id', ''), + 'name': cert.get('Name', ''), + 'pem': cert.get('CertificateString', None), + 'subject': cert.get('Subject', {}).get('CommonName', ''), + 'sans': cert.get('Subject', {}).get('AlternativeNames', []), + 'issuer': cert.get('Issuer', {}).get('CommonName', ''), + 'validfrom': cert.get('ValidNotBefore', ''), + 'validto': cert.get('ValidNotAfter', ''), + } + yield certdesc + def get_event_log(self, clear=False, fishclient=None, extraurls=[]): bmcinfo = self._do_web_request(fishclient._bmcurl) lsurl = bmcinfo.get('LogServices', {}).get('@odata.id', None)