2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-01-11 02:22:31 +00:00

Add facility to manage BMC CA certs

For redfish at least, we can manage
some BMC CA certificate trust stores.
This commit is contained in:
Jarrod Johnson
2025-10-14 14:30:27 -04:00
parent a73dced80b
commit c5896c056e
4 changed files with 134 additions and 0 deletions

View File

@@ -0,0 +1,89 @@
#!/usr/bin/python3
import os
import sys
from cryptography import x509
from cryptography.hazmat.primitives import hashes
path = os.path.dirname(os.path.realpath(__file__))
path = os.path.realpath(os.path.join(path, '..', 'lib', 'python'))
if path.startswith('/opt'):
sys.path.append(path)
import confluent.client as client
def removebmccacert(noderange, certid, cmd):
for res in cmd.delete(f'/noderange/{noderange}/configuration/management_controller/certificate_authorities/{certid}'):
print(repr(res))
def listbmccacerts(noderange, cmd):
certids = []
for res in cmd.read(f'/noderange/{noderange}/configuration/management_controller/certificate_authorities'):
certids.append(res.get('item', {}).get('href', ''))
for certid in certids:
for res in cmd.read(f'/noderange/{noderange}/configuration/management_controller/certificate_authorities/{certid}'):
for node in res.get('databynode', {}):
certdata = res['databynode'][node].get('pem', {}).get('value', '')
summary = ''
if not certdata:
continue
san = res['databynode'][node].get('san', {}).get('value', '')
if san:
summary += f" SANs: {san}"
subject = res['databynode'][node].get('subject', {}).get('value', '')
if subject:
summary = subject
try:
cert = x509.load_pem_x509_certificate(certdata.encode())
sha256 = cert.fingerprint(hashes.SHA256()).hex().upper()
except Exception as e:
print(f"Error processing certificate for {node}: {e}", file=sys.stderr)
continue
summary += f" (SHA256={sha256})"
print(f"{node}: {certid}: {summary}")
def installbmccacert(noderange, certfile, cmd):
if certfile:
try:
with open(certfile, 'r') as f:
certdata = f.read()
except Exception as e:
print(f"Error reading certificate file: {e}", file=sys.stderr)
sys.exit(1)
# Simple validation: check if it starts and ends with the correct PEM markers
if not (certdata.startswith("-----BEGIN CERTIFICATE-----") and certdata.strip().endswith("-----END CERTIFICATE-----")):
print("Invalid certificate format. Must be a PEM encoded certificate.", file=sys.stderr)
sys.exit(1)
payload = {'pem': certdata}
for res in cmd.update(f'/noderange/{noderange}/configuration/management_controller/certificate_authorities', payload):
print(repr(res))
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Node certificate utility')
parser.add_argument('noderange', help='Node range to operate on')
subparsers = parser.add_subparsers(dest='command', help='Available commands')
# installbmccacert subcommand
install_parser = subparsers.add_parser('installbmccacert', help='Install BMC CA certificate')
install_parser.add_argument('filename', help='Certificate file to install')
remove_parser = subparsers.add_parser('removebmccacert', help='Remove BMC CA certificate')
remove_parser.add_argument('id', help='Certificate id to remove')
list_parser = subparsers.add_parser('listbmccacerts', help='List BMC CA certificates')
args = parser.parse_args()
c = client.Command()
if args.command == 'installbmccacert':
installbmccacert(args.noderange, args.filename, c)
elif args.command == 'removebmccacert':
removebmccacert(args.noderange, args.id, c)
elif args.command == 'listbmccacerts':
listbmccacerts(args.noderange, c)
else:
parser.print_help()
sys.exit(1)

View File

@@ -300,6 +300,10 @@ def _init_core():
'default': 'ipmi',
}),
},
'certificate_authorities': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'clear': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',

View File

@@ -517,6 +517,8 @@ def get_input_message(path, operation, inputdata, nodes=None, multinode=False,
path[:4] == ['configuration', 'management_controller', 'alerts',
'destinations'] and operation != 'retrieve'):
return InputAlertDestination(path, nodes, inputdata, multinode)
elif len(path) == 3 and path[:3] == ['configuration', 'management_controller', 'certificate_authorities'] and operation not in ('retrieve', 'delete'):
return InputCertificateAuthority(path, nodes, inputdata)
elif path == ['identify'] and operation != 'retrieve':
return InputIdentifyMessage(path, nodes, inputdata)
elif path == ['events', 'hardware', 'decode']:
@@ -955,6 +957,16 @@ class ConfluentInputMessage(ConfluentMessage):
return key in self.valid_values
class InputCertificateAuthority(ConfluentInputMessage):
keyname = 'pem'
# anything is valid, since it is a blob of text
def get_pem(self, node):
return self.inputbynode[node]
def is_valid_key(self, key):
return key.strip().startswith('-----BEGIN') and '-----END' in key
class InputIdentImage(ConfluentInputMessage):
keyname = 'ident_image'
valid_values = ['create']
@@ -1345,6 +1357,11 @@ class ReseatResult(ConfluentChoiceMessage):
keyname = 'reseat'
class CertificateAuthority(ConfluentMessage):
def __init__(self, node, pem, subject, san):
self.myargs = (node, pem, subject, san)
self.kvpairs = {node: {'pem': {'value': pem}, 'subject': {'value': subject}, 'san': {'value': san}}}
class PowerState(ConfluentChoiceMessage):
valid_values = set([
'on',

View File

@@ -526,6 +526,8 @@ class IpmiHandler(object):
def handle_configuration(self):
if self.element[1:3] == ['management_controller', 'alerts']:
return self.handle_alerts()
elif self.element[1:3] == ['management_controller', 'certificate_authorities']:
return self.handle_cert_authorities()
elif self.element[1:3] == ['management_controller', 'users']:
return self.handle_users()
elif self.element[1:3] == ['management_controller', 'net_interfaces']:
@@ -576,6 +578,28 @@ class IpmiHandler(object):
self.pyghmi_event_to_confluent(event)
self.output.put(msg.EventCollection((event,), name=self.node))
def handle_cert_authorities(self):
if len(self.element) == 3:
if self.op == 'read':
for cert in self.ipmicmd.get_trusted_cas():
self.output.put(msg.ChildCollection(cert['id']))
elif self.op == 'update':
cert = self.inputdata.get_pem(self.node)
self.ipmicmd.add_trusted_ca(cert)
elif len(self.element) == 4:
certid = self.element[-1]
if self.op == 'read':
for certdata in self.ipmicmd.get_trusted_cas():
if certdata['id'] == certid:
self.output.put(msg.CertificateAuthority(
pem=certdata['pem'],
node=self.node,
subject=certdata['subject'],
san=certdata.get('san', None)))
elif self.op == 'delete':
self.ipmicmd.del_trusted_ca(certid)
return
def handle_alerts(self):
if self.element[3] == 'destinations':
if len(self.element) == 4: