2
0
mirror of https://opendev.org/x/pyghmi synced 2026-01-11 18:52:33 +00:00

Manage BMC certificates

Provide means to get a CSR and install a certificate.

Further, update the webclient to use
python 3.3+ mechanisms, as newest
python removes the compatibility with the old way.

Change-Id: I79ca9408920687cbb56bde5b01a01d166c0a3c02
This commit is contained in:
Jarrod Johnson
2025-10-24 20:04:26 -04:00
parent 3a3734fea8
commit fab4d8e700
3 changed files with 154 additions and 1 deletions

View File

@@ -243,6 +243,14 @@ class Command(object):
for ca in self.oem.get_trusted_cas():
yield ca
def get_bmc_csr(self, keytype=None, keylength=None, cn=None, city=None,
state=None, country=None, org=None, orgunit=None):
return self.oem.get_bmc_csr(
keytype=keytype, keylength=keylength, cn=cn)
def install_bmc_certificate(self, certdata):
return self.oem.install_bmc_certificate(certdata)
def add_trusted_ca(self, pemdata):
return self.oem.add_trusted_ca(pemdata)

View File

@@ -330,6 +330,148 @@ class OEMHandler(object):
def sysinfo(self):
return self._do_web_request(self._varsysurl)
def get_bmc_csr(self, keytype=None, keylength=None, cn=None, city=None,
state=None, country=None, org=None, orgunit=None):
# A fun time here, the redfish specification is weird about this.
# We have a certificateservice, sounds good, and an action to generate a CSR,
# straightforward enough, but you have to indicate a certificate collection...
# We get a list of locations, so we have to infer the collection, which
# is perhaps odd, but a relatively safe bet.
# However, the purpose of the certificates is opaque, so we can only guess
# based on strings in the url if there is ambiguity.
rootinfo = self._do_web_request('/redfish/v1/')
certserviceurl = rootinfo.get('CertificateService', {}).get('@odata.id', None)
if not certserviceurl:
raise exc.PyghmiException('No CertificateService found on platform')
certservice = self._do_web_request(certserviceurl)
gencsractinfo = certservice.get('Actions', {}).get("#CertificateService.GenerateCSR", {})
curveids = gencsractinfo.get('KeyCurveId@Redfish.AllowableValues', [])
keylens = gencsractinfo.get('KeyBitLength@Redfish.AllowableValues', [])
keypairalgorithms = gencsractinfo.get('KeyPairAlgorithm@Redfish.AllowableValues', [])
selectedcurve = None
selectedkeylen = None
selectedkpa = None
if not keytype:
for kpa in keypairalgorithms:
if 'ECDH' in kpa:
keytype = 'ECC'
selectedkpa = kpa
break
if 'RSA' in kpa:
selectedkpa = kpa
keytype = 'RSA'
if not keytype:
raise exc.PyghmiException('No valid key type found for CSR generation')
if keytype.upper() in ('ECC', 'ECDSA'):
if not curveids:
raise exc.PyghmiException('No valid curves found for ECC/ECDSA key type')
if keylength:
for curve in curveids:
if fnmatch(curve, '*{0}'.format(keylength)):
selectedcurve = curve
break
else:
selectedcurve = curveids[-1]
elif keytype.upper() == 'RSA':
if not keylens:
raise exc.PyghmiException('No valid key lengths found for RSA key type')
if keylength:
allkeylens = []
for klp in keylens:
if isinstance(klp, int):
allkeylens.append(klp)
continue
for kl in klp.split(':'):
allkeylens.append(int(kl))
if keylength not in allkeylens:
raise exc.PyghmiException('Requested key length {0} not supported'.format(keylength))
selectedkeylen = keylength
gencsrtarg = gencsractinfo.get('target', None)
certcoll = self.get_certificate_collection(certservice)
payload = {
'CertificateCollection': {"@odata.id": certcoll},
'City': city or 'Unspecified',
'CommonName': cn or self.webclient.thehost,
'Country': country or 'AQ', # Need *a* valid two letter country code, Antarctica is more equally likely to be wrong than most.
'Organization': org or 'Unspecified',
'State': state or 'Unspecified',
}
if orgunit:
payload['OrganizationalUnit'] = orgunit
if selectedcurve:
payload['KeyCurveId'] = selectedcurve
elif selectedkeylen:
payload['KeyLength'] = selectedkeylen
if selectedkpa:
payload['KeyPairAlgorithm'] = selectedkpa
rsp = self._do_web_request(gencsrtarg, payload)
csr = rsp.get('CSRString', None)
return csr
def get_certificate_collection(self, certservice):
certcollections = set([])
certlocs = certservice.get('CertificateLocations', {}).get('@odata.id', None)
if certlocs:
certlocdata = self._do_web_request(certlocs)
for cert in certlocdata.get('Links', {}).get('Certificates', []):
certurl = cert.get('@odata.id', None)
if not certurl:
continue
# we need to remove the last part of url to get collection
collurl = '/'.join(certurl.split('/')[:-1])
certcollections.add(collurl)
if len(certcollections) == 0:
raise exc.PyghmiException('No certificate collections found for certificate operation')
if len(certcollections) > 1:
for candcoll in list(certcollections):
if 'TrustedCertificates' in candcoll: # likely a CA store
certcollections.discard(candcoll)
elif 'LDAP' in candcoll: # certificate for LDAP server
certcollections.discard(candcoll)
elif 'KMIP' in candcoll: # not for TLS
certcollections.discard(candcoll)
if len(certcollections) > 1:
raise exc.PyghmiException('Multiple certificate collections found, unable to infer intended target for certificate operation')
certcoll = list(certcollections)[0]
return certcoll
def install_bmc_certificate(self, certdata):
rootinfo = self._do_web_request('/redfish/v1/')
certserviceurl = rootinfo.get('CertificateService', {}).get('@odata.id', None)
if not certserviceurl:
raise exc.PyghmiException('No CertificateService found on platform')
certservice = self._do_web_request(certserviceurl)
certlocs = certservice.get('CertificateLocations', {}).get('@odata.id', None)
if not certlocs:
raise exc.PyghmiException('No CertificateLocations found on platform')
certlocdata = self._do_web_request(certlocs)
allcerts = set([])
for certloc in certlocdata.get('Links', {}).get('Certificates', []):
certurl = certloc.get('@odata.id', None)
if not certurl:
continue
allcerts.add(certurl)
if len(allcerts) == 0:
raise exc.PyghmiException('No Certificates found on platform')
elif len(allcerts) > 1:
# try to narrow down to server cert
for certurl in list(allcerts):
if 'TrustedCertificates' in certurl:
allcerts.discard(certurl)
elif 'LDAP' in certurl:
allcerts.discard(certurl)
elif 'KMIP' in certurl:
allcerts.discard(certurl)
if len(allcerts) > 1:
raise exc.PyghmiException('Multiple Certificates found, unable to infer intended target for certificate installation')
targcerturl = list(allcerts)[0]
replacecerturl = certservice.get('Actions', {}).get(
'#CertificateService.ReplaceCertificate', {}).get('target', None)
certpayload = _pem_to_dict(certdata)
certpayload['CertificateUri'] = {'@odata.id': targcerturl}
#/redfish/v1/CertificateService/Actions/CertificateService.ReplaceCertificate
self._do_web_request(replacecerturl, certpayload)
def add_trusted_ca(self, pemdata):
mgrinfo = self._do_web_request(self._bmcurl)
secpolicy = mgrinfo.get('SecurityPolicy', {}).get('@odata.id', None)

View File

@@ -205,7 +205,10 @@ class SecureHTTPConnection(httplib.HTTPConnection, object):
pass
plainsock.connect(addrinfo[4])
if self._certverify:
self.sock = ssl.wrap_socket(plainsock, cert_reqs=self.cert_reqs)
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
self.sock = ctx.wrap_socket(plainsock)
bincert = self.sock.getpeercert(binary_form=True)
if not self._certverify(bincert):
raise pygexc.UnrecognizedCertificate('Unknown certificate',