diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index 35ff83b9..d74695ff 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -293,35 +293,31 @@ class TLSCertVerifier(object): def verify_cert(self, certificate): storedprint = self.cfm.get_node_attributes(self.node, (self.fieldname,) ) - - if (self.fieldname not in storedprint[self.node] or - storedprint[self.node][self.fieldname]['value'] == ''): + storedprint = storedprint.get(self.node, {}).get(self.fieldname, {}).get('value', '') + if (not storedprint): # no stored value, check policy for next action newpolicy = self.cfm.get_node_attributes(self.node, ('pubkeys.addpolicy',)) - if ('pubkeys.addpolicy' in newpolicy[self.node] and - 'value' in newpolicy[self.node]['pubkeys.addpolicy'] and - newpolicy[self.node]['pubkeys.addpolicy']['value'] == 'manual'): + newpolicy = newpolicy.get(self.node, {}).get('pubkeys.addpolicy', {}).get('value', '') + if newpolicy == 'manual': # manual policy means always raise unless a match is set # manually fingerprint = get_fingerprint(certificate, 'sha256') raise cexc.PubkeyInvalid('New certificate detected', certificate, fingerprint, self.fieldname, 'newkey') - # since the policy is not manual, go ahead and add new key - # after logging to audit log - fingerprint = get_fingerprint(certificate, 'sha256') - auditlog = log.Logger('audit') - auditlog.log({'node': self.node, 'event': 'certautoadd', - 'fingerprint': fingerprint}) - tasks.spawn(self.cfm.set_node_attributes( - {self.node: {self.fieldname: fingerprint}})) - return True - elif cert_matches(storedprint[self.node][self.fieldname]['value'], - certificate): + if newpolicy in ('', 'tofu'): + fingerprint = get_fingerprint(certificate, 'sha256') + auditlog = log.Logger('audit') + auditlog.log({'node': self.node, 'event': 'certautoadd', + 'fingerprint': fingerprint}) + tasks.spawn(self.cfm.set_node_attributes( + {self.node: {self.fieldname: fingerprint}})) + return True + elif cert_matches(storedprint, certificate): return True fingerprint = get_fingerprint(certificate, 'sha256') - # Mismatches, but try more traditional validation using the site CAs + # No pinned certificate match if self.subject: try: if verification and self.verify_by_ca(certificate):