mirror of
https://github.com/xcat2/confluent.git
synced 2026-05-01 04:47:45 +00:00
Simplify webauthn implementation
Stop tracking sign counters (which weren't used). Remove various management of transient challenges. Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
@@ -351,10 +351,11 @@ async def _authorize_request(req, operation, reqbody):
|
||||
sessid = _establish_http_session(req, authdata, name, cookie)
|
||||
if authdata and element and element.startswith('/sessions/current/webauthn/validate/'):
|
||||
if webauthn:
|
||||
for rsp in webauthn.handle_api_request(element, env, None, authdata[2], authdata[1], None, reqbody, None):
|
||||
if rsp['verified']:
|
||||
sessid = _establish_http_session(env, authdata, name, cookie)
|
||||
break
|
||||
rsp = await webauthn.handle_api_request(element, req, authdata[2], authdata[1], reqbody, None)
|
||||
if rsp['verified']:
|
||||
sessid = _establish_http_session(req, authdata, name, cookie)
|
||||
else:
|
||||
return {'code': 403}
|
||||
skiplog = _should_skip_authlog(req)
|
||||
if authdata:
|
||||
auditmsg = {
|
||||
@@ -1012,7 +1013,7 @@ async def resourcehandler_backend(req, make_response):
|
||||
if not webauthn:
|
||||
rsp = await make_response('text/plain', 501, 'Not Implemented')
|
||||
return rsp
|
||||
wauthbody = webauthn.handle_api_request(url, req, authorized['username'], cfgmgr, reqbody, authorized)
|
||||
wauthbody = await webauthn.handle_api_request(url, req, authorized['username'], cfgmgr, reqbody, authorized)
|
||||
return await make_response(body=wauthbody)
|
||||
resource = '.' + url[url.rindex('/'):]
|
||||
lquerydict = copy.deepcopy(querydict)
|
||||
|
||||
@@ -19,14 +19,13 @@ from webauthn import verify_registration_response
|
||||
from webauthn import verify_authentication_response
|
||||
|
||||
|
||||
challenges = set([])
|
||||
challenges = {}
|
||||
|
||||
CONFIG_MANAGER = None
|
||||
|
||||
class Credential():
|
||||
def __init__(self, id, signature_count, public_key):
|
||||
def __init__(self, id, public_key):
|
||||
self.id = id
|
||||
self.signature_count = signature_count
|
||||
self.credential_public_key = public_key
|
||||
|
||||
class Challenge():
|
||||
@@ -55,18 +54,17 @@ def _load_authenticators(authenticators):
|
||||
return ret
|
||||
|
||||
class User():
|
||||
def __init__(self, id, username, user_handle, challenge: Challenge = None, credential: Credential = None):
|
||||
def __init__(self, id, username, user_handle, credential: Credential = None):
|
||||
self.id = id
|
||||
self.username = username
|
||||
self.user_handle = user_handle
|
||||
self.challenges = challenge
|
||||
self.credentials = credential
|
||||
|
||||
def __parse_credentials(self):
|
||||
if self.credentials:
|
||||
credid = base64.b64encode(self.credentials.id).decode()
|
||||
pubkey = base64.b64encode(self.credentials.credential_public_key).decode()
|
||||
return {"id": credid, "signature_count": self.credentials.signature_count, "credential_public_key": pubkey}
|
||||
return {"id": credid, "credential_public_key": pubkey}
|
||||
|
||||
@staticmethod
|
||||
def seek_credential_by_id(credential_id):
|
||||
@@ -81,8 +79,7 @@ class User():
|
||||
except KeyError:
|
||||
continue
|
||||
if "id" in credential.keys() and credential["id"] == credential_id:
|
||||
#for now leaving signature count as None
|
||||
return (Credential(id=credential["id"], signature_count=None, public_key=credential["credential_public_key"]), username)
|
||||
return (Credential(id=credential["id"], public_key=credential["credential_public_key"]), username)
|
||||
return None
|
||||
|
||||
|
||||
@@ -95,28 +92,13 @@ class User():
|
||||
credential = authenticators.get('credentials', None)
|
||||
if credential is None:
|
||||
return None
|
||||
|
||||
if credential_id is None:
|
||||
return Credential(id=credential["id"], signature_count=credential["signature_count"], public_key=credential["credential_public_key"])
|
||||
|
||||
return Credential(id=credential["id"], public_key=credential["credential_public_key"])
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def get_challenge(username):
|
||||
if not isinstance(username, str):
|
||||
username = username.decode('utf8')
|
||||
authuser = CONFIG_MANAGER.get_user(username)
|
||||
if not authuser:
|
||||
return None
|
||||
authenticators = authuser.get('authenticators', {})
|
||||
authenticators = _load_authenticators(authenticators)
|
||||
challenge = authenticators['challenges']
|
||||
return Challenge(request=challenge["request"], id=challenge["id"])
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get(username):
|
||||
challenges_return = None
|
||||
credentials_return = None
|
||||
if not CONFIG_MANAGER:
|
||||
raise Exception('config manager is not set up')
|
||||
@@ -135,39 +117,30 @@ class User():
|
||||
authid = None
|
||||
else:
|
||||
authid = base64.b64decode(b64authid)
|
||||
challenge = authenticators.get("challenges", None)
|
||||
if challenge:
|
||||
challenges_return = Challenge(challenge['request'], id=challenge["id"])
|
||||
|
||||
credential = authenticators.get("credentials", None)
|
||||
if credential:
|
||||
credentials_return = (Credential(credential['id'], credential['signature_count'], credential["credential_public_key"]))
|
||||
credentials_return = (Credential(credential['id'], credential["credential_public_key"]))
|
||||
|
||||
return User(id=None, username=username, user_handle=authid, challenge=challenges_return, credential=credentials_return)
|
||||
return User(id=None, username=username, user_handle=authid, credential=credentials_return)
|
||||
|
||||
def save(self):
|
||||
async def save(self):
|
||||
authenticators = CONFIG_MANAGER.get_user(self.username).get('authenticators', {})
|
||||
authenticators = _load_authenticators(authenticators)
|
||||
# Let's not retain the transient challenges
|
||||
#authenticators['challenges'] = self.__parse_challenges() # Looks like the bigger the array we encounter problems changing to just save one challenge
|
||||
authenticators['credentials'] = self.__parse_credentials()
|
||||
|
||||
CONFIG_MANAGER.set_user(self.username, {'authenticators': authenticators})
|
||||
await CONFIG_MANAGER.set_user(self.username, {'authenticators': authenticators})
|
||||
|
||||
|
||||
def add(self, item):
|
||||
if isinstance(item, Challenge):
|
||||
self.challenges = item
|
||||
elif isinstance(item, Credential):
|
||||
if isinstance(item, Credential):
|
||||
self.credentials = item
|
||||
else:
|
||||
raise Exception("Unsupported item type")
|
||||
|
||||
def update(self, item):
|
||||
if isinstance(item, Challenge):
|
||||
self.challenges = item
|
||||
elif isinstance(item, Credential):
|
||||
if isinstance(item, Credential):
|
||||
self.credentials = item
|
||||
return
|
||||
#raise Exception("Credential item not found")
|
||||
else:
|
||||
raise Exception("Unsupported item type")
|
||||
|
||||
|
||||
def registration_request(username, cfg, APP_RELYING_PARTY):
|
||||
@@ -185,26 +158,36 @@ def registration_request(username, cfg, APP_RELYING_PARTY):
|
||||
),
|
||||
)
|
||||
|
||||
challenge = Challenge(options.challenge)
|
||||
user_model.add(challenge)
|
||||
user_model.save()
|
||||
challenges[options.challenge] = username
|
||||
options_json = options_to_json(options)
|
||||
return options_json
|
||||
|
||||
|
||||
def registration_response(request, username, APP_RELYING_PARTY, APP_ORIGIN):
|
||||
challenge_model = User.get_challenge(username)
|
||||
if not challenge_model:
|
||||
raise Exception("Could not find challenge matching given id")
|
||||
def b64decode(data: str) -> bytes:
|
||||
"""Decode base64, padding being optional.
|
||||
|
||||
:param data: Base64 data as an ASCII byte string
|
||||
:returns: The decoded byte string.
|
||||
"""
|
||||
data += '=' * (-len(data) % 4) # Pad with '='s
|
||||
return base64.urlsafe_b64decode(data)
|
||||
|
||||
async def registration_response(request, username, APP_RELYING_PARTY, APP_ORIGIN):
|
||||
cdj = request['response']['clientDataJSON']
|
||||
cdata = json.loads(b64decode(cdj))
|
||||
challenge = b64decode(cdata['challenge'])
|
||||
if challenge not in challenges:
|
||||
raise Exception("Could not find challenge")
|
||||
chausername = challenges.pop(challenge, None)
|
||||
if chausername != username:
|
||||
raise Exception("Challenge does not match username")
|
||||
user_model = User.get(username)
|
||||
if not user_model:
|
||||
raise Exception("Invalid Username")
|
||||
|
||||
try:
|
||||
registration_verification = verify_registration_response(
|
||||
credential=request,
|
||||
expected_challenge=challenge_model.request,
|
||||
expected_challenge=challenge,
|
||||
expected_rp_id=APP_RELYING_PARTY.id,
|
||||
expected_origin=APP_ORIGIN,
|
||||
require_user_verification=True,
|
||||
@@ -212,10 +195,9 @@ def registration_response(request, username, APP_RELYING_PARTY, APP_ORIGIN):
|
||||
except Exception as err:
|
||||
raise Exception("Could not handle credential attestation")
|
||||
|
||||
credential = Credential(id=registration_verification.credential_id, signature_count=registration_verification.sign_count, public_key=registration_verification.credential_public_key)
|
||||
credential = Credential(id=registration_verification.credential_id, public_key=registration_verification.credential_public_key)
|
||||
user_model.add(credential)
|
||||
user_model.save()
|
||||
|
||||
await user_model.save()
|
||||
return {"verified": True}
|
||||
|
||||
|
||||
@@ -237,10 +219,7 @@ def authentication_request(username, APP_RELYING_PARTY):
|
||||
rp_id=APP_RELYING_PARTY.id,
|
||||
user_verification=UserVerificationRequirement.REQUIRED,
|
||||
)
|
||||
|
||||
challenge = Challenge(options.challenge)
|
||||
user_model.add(challenge)
|
||||
user_model.save()
|
||||
challenges[options.challenge] = username
|
||||
opts = options_to_json(options)
|
||||
return opts
|
||||
|
||||
@@ -248,11 +227,8 @@ def authentication_response(request, username, APP_RELYING_PARTY, APP_ORIGIN):
|
||||
user_model = User.get(username)
|
||||
if not user_model:
|
||||
raise Exception("Invalid Username")
|
||||
|
||||
challenge_model = User.get_challenge(username)
|
||||
if not challenge_model:
|
||||
raise Exception("Could not find challenge matching given id")
|
||||
|
||||
print(repr(request))
|
||||
raise Exception("Nope")
|
||||
credential_model = User.get_credential(credential_id=None, username=username)
|
||||
if not credential_model:
|
||||
raise Exception("No credential for user")
|
||||
@@ -275,7 +251,7 @@ class RpEntity(object):
|
||||
self.name = name
|
||||
self.id = id
|
||||
|
||||
def handle_api_request(url, req, username, cfm, reqbody, authorized):
|
||||
async def handle_api_request(url, req, username, cfm, reqbody, authorized):
|
||||
"""
|
||||
For now webauth is going to be limited to just one passkey per user
|
||||
If you try to register a new passkey this will just clear the old one and register the new passkey
|
||||
@@ -299,7 +275,7 @@ def handle_api_request(url, req, username, cfm, reqbody, authorized):
|
||||
# this would entail checking authid for uniqueness as a key once that key structure starts being built
|
||||
authid = secrets.token_bytes(64)
|
||||
b64authid = base64.b64encode(authid).decode()
|
||||
cfm.set_user(username, {'webauthid': b64authid})
|
||||
await cfm.set_user(username, {'webauthid': b64authid})
|
||||
opts = registration_request(username, cfm, APP_RELYING_PARTY)
|
||||
return opts
|
||||
elif url.startswith('/registered_credentials/'):
|
||||
@@ -331,9 +307,7 @@ def handle_api_request(url, req, username, cfm, reqbody, authorized):
|
||||
elif url == '/register_credential':
|
||||
req = json.loads(reqbody)
|
||||
userinfo = cfm.get_user(username)
|
||||
if not isinstance(username, bytes):
|
||||
username = username.encode('utf8')
|
||||
rsp = registration_response(req, username, APP_RELYING_PARTY, APP_ORIGIN)
|
||||
rsp = await registration_response(req, username, APP_RELYING_PARTY, APP_ORIGIN)
|
||||
if rsp.get('verified', False):
|
||||
return json.dumps({'status': 'Success'})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user