2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-05-07 17:27:16 +00:00

Allow a policy that only uses certificate authority

Technically this was possible before by setting
a bad fingerprint, but formalize an addpolicy
This commit is contained in:
Jarrod Johnson
2026-05-05 11:12:16 -04:00
parent 0abe252c24
commit 454e1b8267
+14 -18
View File
@@ -293,35 +293,31 @@ class TLSCertVerifier(object):
def verify_cert(self, certificate):
storedprint = self.cfm.get_node_attributes(self.node, (self.fieldname,)
)
if (self.fieldname not in storedprint[self.node] or
storedprint[self.node][self.fieldname]['value'] == ''):
storedprint = storedprint.get(self.node, {}).get(self.fieldname, {}).get('value', '')
if (not storedprint):
# no stored value, check policy for next action
newpolicy = self.cfm.get_node_attributes(self.node,
('pubkeys.addpolicy',))
if ('pubkeys.addpolicy' in newpolicy[self.node] and
'value' in newpolicy[self.node]['pubkeys.addpolicy'] and
newpolicy[self.node]['pubkeys.addpolicy']['value'] == 'manual'):
newpolicy = newpolicy.get(self.node, {}).get('pubkeys.addpolicy', {}).get('value', '')
if newpolicy == 'manual':
# manual policy means always raise unless a match is set
# manually
fingerprint = get_fingerprint(certificate, 'sha256')
raise cexc.PubkeyInvalid('New certificate detected',
certificate, fingerprint,
self.fieldname, 'newkey')
# since the policy is not manual, go ahead and add new key
# after logging to audit log
fingerprint = get_fingerprint(certificate, 'sha256')
auditlog = log.Logger('audit')
auditlog.log({'node': self.node, 'event': 'certautoadd',
'fingerprint': fingerprint})
tasks.spawn(self.cfm.set_node_attributes(
{self.node: {self.fieldname: fingerprint}}))
return True
elif cert_matches(storedprint[self.node][self.fieldname]['value'],
certificate):
if newpolicy in ('', 'tofu'):
fingerprint = get_fingerprint(certificate, 'sha256')
auditlog = log.Logger('audit')
auditlog.log({'node': self.node, 'event': 'certautoadd',
'fingerprint': fingerprint})
tasks.spawn(self.cfm.set_node_attributes(
{self.node: {self.fieldname: fingerprint}}))
return True
elif cert_matches(storedprint, certificate):
return True
fingerprint = get_fingerprint(certificate, 'sha256')
# Mismatches, but try more traditional validation using the site CAs
# No pinned certificate match
if self.subject:
try:
if verification and self.verify_by_ca(certificate):