From 7b2e32009f3878467aa853190ceebfe18f8aed14 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 4 Apr 2024 17:13:37 -0400 Subject: [PATCH] Numerous async improvements Restore 'as available' behavior to noderange over socket Bring the httpapi to the point where the webui is able to start working, notably bringing the asynchttp online with the websocket. Fix a flaw in the async ipmi that would cause hangups. --- confluent_server/confluent/asynchttp.py | 21 +++++++++----- confluent_server/confluent/core.py | 10 +++---- confluent_server/confluent/httpapi.py | 28 ++++++++++--------- .../plugins/hardwaremanagement/ipmi.py | 3 +- confluent_server/confluent/sockapi.py | 4 ++- 5 files changed, 39 insertions(+), 27 deletions(-) diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index ff14d783..9051aebc 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -39,6 +39,7 @@ # Much like console sessions, these will be reaped if a client spends too # far away. +import asyncio import collections import confluent.exceptions as exc import confluent.messages as messages @@ -91,6 +92,8 @@ class AsyncSession(object): return self.responses.append((requestid, rsp)) if self._evt: + + self._evt.send() self._evt = None @@ -118,11 +121,13 @@ class AsyncSession(object): async def run_handler(self, handler, requestid): try: - for rsp in handler: + handler = await handler + async for rsp in handler: await self.add(requestid, rsp) - self.add(requestid, messages.AsyncCompletion()) + await self.add(requestid, messages.AsyncCompletion()) except Exception as e: - self.add(requestid, e) + print(repr(e)) + await self.add(requestid, e) def get_responses(self, timeout=25): self.reaper.cancel() @@ -146,15 +151,17 @@ class AsyncSession(object): yield self.responses.popleft() -async def run_handler(hdlr, env): - asyncsessid = env['HTTP_CONFLUENTASYNCID'] +async def run_handler(hdlr, req): + asyncsessid = req.headers['ConfluentAsyncId'] try: asyncsession = _asyncsessions[asyncsessid]['asyncsession'] - requestid = env['HTTP_CONFLUENTREQUESTID'] + requestid = req.headers['ConfluentRequestId'] except KeyError: raise exc.InvalidArgumentException( 'Invalid Session ID or missing request id') - eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid) + cloop = asyncio.get_event_loop() + cloop.create_task(asyncsession.run_handler(hdlr, requestid)) + #eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid) return requestid diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 196a0ee6..fd7040bd 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -642,14 +642,14 @@ def stripnode(iterablersp, node): yield i -def iterate_collections(iterable, forcecollection=True): +async def iterate_collections(iterable, forcecollection=True): for coll in iterable: if forcecollection and coll[-1] != '/': coll += '/' yield msg.ChildCollection(coll, candelete=True) -def iterate_resources(fancydict): +async def iterate_resources(fancydict): for resource in fancydict: if resource.startswith("_"): continue @@ -768,7 +768,7 @@ def create_noderange(inputdata, configmanager): -def enumerate_collections(collections): +async def enumerate_collections(collections): for collection in collections: yield msg.ChildCollection(collection) @@ -1107,10 +1107,10 @@ async def handle_node_request(configmanager, inputdata, operation, 'element': pathcomponents, 'configmanager': configmanager, 'inputdata': inputdata, 'operation': operation, 'isnoderange': isnoderange})) if isnoderange or not autostrip: - return [x async for x in iterate_queue(numworkers, passvalues)] + return iterate_queue(numworkers, passvalues) # [x async for x in iterate_queue(numworkers, passvalues)] else: if numworkers > 0: - return [x async for x in iterate_queue(numworkers, passvalues, nodes[0])] + return iterate_queue(numworkers, passvalues, nodes[0]) # [x async for x in iterate_queue(numworkers, passvalues, nodes[0])] else: raise exc.NotImplementedException() diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index f10b4207..cd114c40 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -26,7 +26,7 @@ try: except ImportError: webauthn = None import asyncio -from aiohttp import web, web_urldispatcher, connector, ClientSession +from aiohttp import web, web_urldispatcher, connector, ClientSession, WSMsgType import confluent.auth as auth import confluent.config.attributes as attribs import confluent.config.configmanager as configmanager @@ -243,7 +243,7 @@ def _csrf_valid(req, session): if 'csrftoken' not in session: # The client has not (yet) requested CSRF protection # so we return true - if 'ConfluentAuthToken' in req.headers: + if 'Confluentauthtoken' in req.headers: # The client has requested CSRF countermeasures, # oblige the request and apply a new token to the # session @@ -264,7 +264,7 @@ def _csrf_valid(req, session): # The session has CSRF protection enabled, only mark valid if # the client has provided an auth token and that token matches the # value protecting the session - return ('ConfluentAuthToken' in req.headers and + return ('Confluentauthtoken' in req.headers and req.headers['ConfluentAuthToken'] == session['csrftoken']) @@ -459,7 +459,6 @@ async def wsock_handler(req): return httpsessions[sessid]['inflight'].add(rsp) name = httpsessions[sessid]['name'] - print(req.rel_url.path) authdata = auth.authorize(name, req.rel_url.path, operation='start') if not authdata: return @@ -467,9 +466,9 @@ async def wsock_handler(req): username = httpsessions[sessid]['name'] if req.rel_url.path == '/sessions/current/async': myconsoles = {} - async def asyncwscallback(rsp): - rsp = json.dumps(rsp.raw()) - await rsp.send_str(u'!' + rsp) + async def asyncwscallback(rspm): + rspm = json.dumps(rspm.raw()) + await rsp.send_str(u'!' + rspm) currsess['inflight'].add(rsp) asess = None try: @@ -479,10 +478,14 @@ async def wsock_handler(req): clientmsg = True while clientmsg: clientmsg = await rsp.receive() + if clientmsg.type == WSMsgType.CLOSE: + break + elif clientmsg.type != WSMsgType.TEXT: + continue clientmsg = clientmsg.data if clientmsg: if clientmsg[0] == '?': - ws.send('?') + await rsp.send_str('?') elif clientmsg[0] == '$': targid, data = clientmsg[1:].split('$', 1) if data[0] == ' ': @@ -631,7 +634,6 @@ async def resourcehandler(request): try: if 'Sec-WebSocket-Version' in request.headers: - print('WebSocket....') return await wsock_handler(request) else: return await resourcehandler_backend(request, make_response) @@ -953,9 +955,9 @@ async def resourcehandler_backend(req, make_response): hdlr = pluginapi.handle_path(url, operation, cfgmgr, querydict) if 'ConfluentAsyncId' in req.headers: - confluent.asynchttp.run_handler(hdlr, env) - await make_response('text/plain', 202, cookies=cookies) - rsp.write(b'Request queued') + await confluent.asynchttp.run_handler(hdlr, req) + rsp = await make_response('text/plain', 202, cookies=cookies) + await rsp.write(b'Request queued') return rsp pagecontent = "" if mimetype == 'text/html': @@ -1051,7 +1053,7 @@ async def _assemble_json(responses, resource=None, url=None, extension=None): else: links['collection'] = {"href": "./" + extension} rspdata = {} - for rsp in await responses: + async for rsp in await responses: if isinstance(rsp, confluent.messages.LinkRelation): haldata = rsp.raw() for hk in haldata: diff --git a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py index ba4ecb42..602ce45b 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py @@ -411,6 +411,7 @@ async def perform_requests(operator, nodes, element, cfg, inputdata, realop): except asyncio.QueueEmpty: pass except asyncio.TimeoutError: + print("whoopsie?") pass finally: for datum in sorted( @@ -525,7 +526,7 @@ class IpmiHandler: self.ipmicmd = persistent_ipmicmds[(node, tenant)] giveup = util.monotonic_time() + 60 while not self.ipmicmd.ipmi_session.broken and not self.ipmicmd.ipmi_session.logged and self.ipmicmd.ipmi_session.logging: - self.ipmicmd.ipmi_session.wait_for_rsp(3) + await self.ipmicmd.ipmi_session.wait_for_rsp(3) if util.monotonic_time() > giveup: self.ipmicmd.ipmi_session.broken = True return self diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 78bd077d..d75a5d4f 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -201,7 +201,9 @@ async def send_response(responses, connection): if responses is None: return responses = await responses - for rsp in responses: + if responses is None: + return + async for rsp in responses: await send_data(connection, rsp.raw()) await send_data(connection, {'_requestdone': 1})