2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-01-11 18:42:29 +00:00
Files
confluent/confluent_client/bin/nodecertutil
2025-10-31 12:04:27 -04:00

104 lines
4.5 KiB
Python

#!/usr/bin/python3
import os
import sys
from cryptography import x509
from cryptography.hazmat.primitives import hashes
# Locate the sibling ../lib/python directory relative to this script's real
# (symlink-resolved) location, and make it importable only when that
# directory lives under a path starting with /opt.
path = os.path.realpath(
    os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'lib', 'python')
)
if path.startswith('/opt'):
    sys.path.append(path)
import confluent.client as client
def removebmccacert(noderange, certid, cmd):
    """Delete one BMC CA certificate entry and print each response.

    noderange and certid are interpolated into the resource URL; cmd must
    provide a delete(url) method returning an iterable of responses. The
    repr() of every response is written to stdout.
    """
    url = f'/noderange/{noderange}/configuration/management_controller/certificate_authorities/{certid}'
    for response in cmd.delete(url):
        print(f'{response!r}')
def listbmccacerts(noderange, cmd):
    """Print a one-line summary of each BMC CA certificate in a noderange.

    The certificate_authorities collection is read first to gather the
    certificate ids (the 'href' of each 'item' response). Each id is then
    read individually, and for every node under 'databynode' that reports a
    non-empty PEM value a line of the form
    "<node>: <certid>: <subject> SANs: <san> (SHA256=<fingerprint>)"
    is written to stdout; the subject and SANs parts are omitted when the
    response does not carry them.

    cmd must provide a read(url) method returning an iterable of dict
    responses. Certificates that cannot be parsed are reported on stderr
    and skipped, so one bad entry does not abort the listing.
    """
    base = f'/noderange/{noderange}/configuration/management_controller/certificate_authorities'
    certids = []
    for res in cmd.read(base):
        certid = res.get('item', {}).get('href', '')
        # A response without an item href would otherwise turn into a
        # request for "<collection>/", which is not a certificate.
        if certid:
            certids.append(certid)
    for certid in certids:
        for res in cmd.read(f'{base}/{certid}'):
            for node, attribs in res.get('databynode', {}).items():
                certdata = attribs.get('pem', {}).get('value', '')
                if not certdata:
                    continue
                subject = attribs.get('subject', {}).get('value', '')
                san = attribs.get('san', {}).get('value', '')
                # Subject goes first and the SANs are appended to it, so
                # neither piece of information overwrites the other.
                summary = f"{subject}" if subject else ''
                if san:
                    summary += f" SANs: {san}"
                try:
                    cert = x509.load_pem_x509_certificate(certdata.encode())
                    sha256 = cert.fingerprint(hashes.SHA256()).hex().upper()
                except Exception as e:
                    # Deliberately broad: any parse failure is reported for
                    # this node and the remaining certificates still list.
                    print(f"Error processing certificate for {node}: {e}", file=sys.stderr)
                    continue
                summary += f" (SHA256={sha256})"
                print(f"{node}: {certid}: {summary}")
def installbmccacert(noderange, certfile, cmd):
    """Install a PEM-encoded CA certificate on the BMCs of a noderange.

    The file named by certfile is read as text, checked for the PEM
    certificate BEGIN/END markers, and submitted as {'pem': <text>} through
    cmd.update() to the certificate_authorities collection. The repr() of
    every response is printed to stdout.

    Nothing happens when certfile is empty. The process exits with status 1
    (after a message on stderr) when the file cannot be read or does not
    look like a PEM certificate.
    """
    if not certfile:
        return
    try:
        with open(certfile, 'r') as f:
            certdata = f.read()
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file. ValueError: undecodable bytes
        # (e.g. a DER file) or an invalid path string.
        print(f"Error reading certificate file: {e}", file=sys.stderr)
        sys.exit(1)
    # Simple validation: check the PEM markers on whitespace-trimmed text so
    # leading blank lines are tolerated the same way trailing ones are.
    trimmed = certdata.strip()
    if not (trimmed.startswith("-----BEGIN CERTIFICATE-----") and trimmed.endswith("-----END CERTIFICATE-----")):
        print("Invalid certificate format. Must be a PEM encoded certificate.", file=sys.stderr)
        sys.exit(1)
    # Drop only leading whitespace: files that already began with the marker
    # are sent exactly as read.
    payload = {'pem': certdata.lstrip()}
    for res in cmd.update(f'/noderange/{noderange}/configuration/management_controller/certificate_authorities', payload):
        print(repr(res))
if __name__ == '__main__':
    # Command-line entry point: parse "<noderange> <subcommand> [options]"
    # and dispatch to the matching certificate operation.
    import argparse

    parser = argparse.ArgumentParser(description='Node certificate utility')
    parser.add_argument('noderange', help='Node range to operate on')
    subparsers = parser.add_subparsers(dest='command', help='Available commands')
    # installbmccacert subcommand
    install_parser = subparsers.add_parser('installbmccacert', help='Install BMC CA certificate')
    install_parser.add_argument('filename', help='Certificate file to install')
    # removebmccacert subcommand
    remove_parser = subparsers.add_parser('removebmccacert', help='Remove BMC CA certificate')
    remove_parser.add_argument('id', help='Certificate id to remove')
    # listbmccacerts subcommand (no extra arguments)
    list_parser = subparsers.add_parser('listbmccacerts', help='List BMC CA certificates')
    # signbmccert subcommand
    sign_bmc_parser = subparsers.add_parser('signbmccert', help='Sign BMC certificate')
    sign_bmc_parser.add_argument('--days', type=int, help='Number of days the certificate is valid for')
    sign_bmc_parser.add_argument('--added-names', type=str, help='Additional names to include in the certificate')
    args = parser.parse_args()

    # Report usage problems before constructing the client, so a bad
    # invocation fails fast with the intended message.
    # NOTE(review): client.Command() presumably contacts the confluent
    # service when constructed - confirm.
    if args.command is None:
        # argparse rejects unknown subcommands itself; None means that no
        # subcommand was given at all.
        parser.print_help()
        sys.exit(1)
    if args.command == 'signbmccert' and args.days is None:
        print("Error: --days is required for signbmccert", file=sys.stderr)
        sys.exit(1)

    c = client.Command()
    if args.command == 'installbmccacert':
        installbmccacert(args.noderange, args.filename, c)
    elif args.command == 'removebmccacert':
        removebmccacert(args.noderange, args.id, c)
    elif args.command == 'listbmccacerts':
        listbmccacerts(args.noderange, c)
    elif args.command == 'signbmccert':
        payload = {'days': args.days}
        if args.added_names:
            payload['added_names'] = args.added_names
        for res in c.update(f'/noderange/{args.noderange}/configuration/management_controller/certificate/sign', payload):
            print(repr(res))