2
0
mirror of https://opendev.org/x/pyghmi synced 2026-01-10 02:02:30 +00:00

Add Manager CA management

Some Managers provide
a mechanism to manipulate certificate authorities.

Wire this up with some function calls.

Change-Id: I035eeea92ccc80c0163677cf8b8e679c11141bab
This commit is contained in:
Jarrod Johnson
2025-10-10 07:42:37 -04:00
parent 33e6212152
commit bea7af51e7
2 changed files with 93 additions and 0 deletions

View File

@@ -239,6 +239,16 @@ class Command(object):
okroles.add('ReadOnly')
return okroles
def get_trusted_cas(self):
for ca in self.oem.get_trusted_cas():
yield ca
def add_trusted_ca(self, pemdata):
return self.oem.add_trusted_ca(pemdata)
def del_trusted_ca(self, certid):
return self.oem.del_trusted_ca(certid)
def get_users(self):
"""get list of users and channel access information (helper)

View File

@@ -23,6 +23,7 @@ from datetime import timedelta
from dateutil import tz
import socket
import time
import uuid
import pyghmi.constants as const
import pyghmi.exceptions as exc
@@ -31,6 +32,27 @@ import pyghmi.util.webclient as webclient
from pyghmi.util.parse import parse_time
def _pem_to_dict(pemdata, uefi=False):
"""Pull PEM into a dict
Accepts a file-like or a string or bytes.
A dict with the PEM as a value for CertificateString is created.
If uefi, then "UefiSignatureOwner" is also created with a random GUID.
This is how redfish expects certificate information for CAs to be provided for
UEFI and for itself.
"""
if hasattr(pemdata, 'read'):
pemdata = pemdata.read()
if isinstance(pemdata, bytes):
pemdata = pemdata.decode('utf-8')
cert_dict = {
'CertificateString': pemdata,
'CertificateType': 'PEM',
}
if uefi:
cert_dict['UefiSignatureOwner'] = str(uuid.uuid4())
return cert_dict
class SensorReading(object):
def __init__(self, healthinfo, sensor=None, value=None, units=None,
@@ -192,6 +214,7 @@ class OEMHandler(object):
self._gpool = gpool
self._varsysinfo = sysinfo
self._varsysurl = sysurl
self._varbmcurl = None
self._urlcache = cache
self.webclient = webclient
self._hwnamemap = {}
@@ -285,6 +308,66 @@ class OEMHandler(object):
cputemps.append(temp)
return cputemps
@property
def _bmcurl(self):
if not self._varbmcurl:
self._varbmcurl = self.sysinfo.get('Links', {}).get(
'ManagedBy', [{}])[0].get('@odata.id', None)
return self._varbmcurl
@property
def sysinfo(self):
return self._do_web_request(self._varsysurl)
def add_trusted_ca(self, pemdata):
mgrinfo = self._do_web_request(self._bmcurl)
secpolicy = mgrinfo.get('SecurityPolicy', {}).get('@odata.id', None)
if secpolicy:
secinfo = self._do_web_request(secpolicy)
certcoll = secinfo.get('TLS', {}).get('Client', {}).get('TrustedCertificates', {}).get('@odata.id', None)
if certcoll:
certpayload = _pem_to_dict(pemdata)
self._do_web_request(certcoll, certpayload)
return True
raise exc.PyghmiException('Platform does not support adding trusted CAs')
def del_trusted_ca(self, certid):
mgrinfo = self._do_web_request(self._bmcurl)
secpolicy = mgrinfo.get('SecurityPolicy', {}).get('@odata.id', None)
if secpolicy:
secinfo = self._do_web_request(secpolicy)
certcoll = secinfo.get('TLS', {}).get('Client', {}).get('TrustedCertificates', {}).get('@odata.id', None)
if certcoll:
certs = self._get_expanded_data(certcoll)
certs = certs.get('Members', [])
for cert in certs:
if cert.get('Id', '') == certid:
self._do_web_request(cert['@odata.id'], method='DELETE')
return True
raise exc.PyghmiException(f'No such certificate found: {certid}')
def get_trusted_cas(self):
mgrinfo = self._do_web_request(self._bmcurl)
secpolicy = mgrinfo.get('SecurityPolicy', {}).get('@odata.id', None)
if secpolicy:
secinfo = self._do_web_request(secpolicy)
certcoll = secinfo.get('TLS', {}).get('Client', {}).get('TrustedCertificates', {}).get('@odata.id', None)
if certcoll:
certs = self._get_expanded_data(certcoll)
certs = certs.get('Members', [])
for cert in certs:
certdesc = {
'id': cert.get('Id', ''),
'name': cert.get('Name', ''),
'pem': cert.get('CertificateString', None),
'subject': cert.get('Subject', {}).get('CommonName', ''),
'sans': cert.get('Subject', {}).get('AlternativeNames', []),
'issuer': cert.get('Issuer', {}).get('CommonName', ''),
'validfrom': cert.get('ValidNotBefore', ''),
'validto': cert.get('ValidNotAfter', ''),
}
yield certdesc
def get_event_log(self, clear=False, fishclient=None, extraurls=[]):
bmcinfo = self._do_web_request(fishclient._bmcurl)
lsurl = bmcinfo.get('LogServices', {}).get('@odata.id', None)