diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index ff14d783..9051aebc 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -39,6 +39,7 @@ # Much like console sessions, these will be reaped if a client spends too # far away. +import asyncio import collections import confluent.exceptions as exc import confluent.messages as messages @@ -91,6 +92,8 @@ class AsyncSession(object): return self.responses.append((requestid, rsp)) if self._evt: + + self._evt.send() self._evt = None @@ -118,11 +121,13 @@ class AsyncSession(object): async def run_handler(self, handler, requestid): try: - for rsp in handler: + handler = await handler + async for rsp in handler: await self.add(requestid, rsp) - self.add(requestid, messages.AsyncCompletion()) + await self.add(requestid, messages.AsyncCompletion()) except Exception as e: - self.add(requestid, e) + print(repr(e)) + await self.add(requestid, e) def get_responses(self, timeout=25): self.reaper.cancel() @@ -146,15 +151,17 @@ class AsyncSession(object): yield self.responses.popleft() -async def run_handler(hdlr, env): - asyncsessid = env['HTTP_CONFLUENTASYNCID'] +async def run_handler(hdlr, req): + asyncsessid = req.headers['ConfluentAsyncId'] try: asyncsession = _asyncsessions[asyncsessid]['asyncsession'] - requestid = env['HTTP_CONFLUENTREQUESTID'] + requestid = req.headers['ConfluentRequestId'] except KeyError: raise exc.InvalidArgumentException( 'Invalid Session ID or missing request id') - eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid) + cloop = asyncio.get_event_loop() + cloop.create_task(asyncsession.run_handler(hdlr, requestid)) + #eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid) return requestid diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 196a0ee6..fd7040bd 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -642,14 +642,14 @@ def stripnode(iterablersp, node): yield i -def iterate_collections(iterable, forcecollection=True): +async def iterate_collections(iterable, forcecollection=True): for coll in iterable: if forcecollection and coll[-1] != '/': coll += '/' yield msg.ChildCollection(coll, candelete=True) -def iterate_resources(fancydict): +async def iterate_resources(fancydict): for resource in fancydict: if resource.startswith("_"): continue @@ -768,7 +768,7 @@ def create_noderange(inputdata, configmanager): -def enumerate_collections(collections): +async def enumerate_collections(collections): for collection in collections: yield msg.ChildCollection(collection) @@ -1107,10 +1107,10 @@ async def handle_node_request(configmanager, inputdata, operation, 'element': pathcomponents, 'configmanager': configmanager, 'inputdata': inputdata, 'operation': operation, 'isnoderange': isnoderange})) if isnoderange or not autostrip: - return [x async for x in iterate_queue(numworkers, passvalues)] + return iterate_queue(numworkers, passvalues) # [x async for x in iterate_queue(numworkers, passvalues)] else: if numworkers > 0: - return [x async for x in iterate_queue(numworkers, passvalues, nodes[0])] + return iterate_queue(numworkers, passvalues, nodes[0]) # [x async for x in iterate_queue(numworkers, passvalues, nodes[0])] else: raise exc.NotImplementedException() diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index f10b4207..cd114c40 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -26,7 +26,7 @@ try: except ImportError: webauthn = None import asyncio -from aiohttp import web, web_urldispatcher, connector, ClientSession +from aiohttp import web, web_urldispatcher, connector, ClientSession, WSMsgType import confluent.auth as auth import confluent.config.attributes as attribs import confluent.config.configmanager as configmanager @@ -243,7 +243,7 @@ def _csrf_valid(req, session): if 'csrftoken' not in session: # The client has not (yet) requested CSRF protection # so we return true - if 'ConfluentAuthToken' in req.headers: + if 'Confluentauthtoken' in req.headers: # The client has requested CSRF countermeasures, # oblige the request and apply a new token to the # session @@ -264,7 +264,7 @@ def _csrf_valid(req, session): # The session has CSRF protection enabled, only mark valid if # the client has provided an auth token and that token matches the # value protecting the session - return ('ConfluentAuthToken' in req.headers and + return ('Confluentauthtoken' in req.headers and req.headers['ConfluentAuthToken'] == session['csrftoken']) @@ -459,7 +459,6 @@ async def wsock_handler(req): return httpsessions[sessid]['inflight'].add(rsp) name = httpsessions[sessid]['name'] - print(req.rel_url.path) authdata = auth.authorize(name, req.rel_url.path, operation='start') if not authdata: return @@ -467,9 +466,9 @@ async def wsock_handler(req): username = httpsessions[sessid]['name'] if req.rel_url.path == '/sessions/current/async': myconsoles = {} - async def asyncwscallback(rsp): - rsp = json.dumps(rsp.raw()) - await rsp.send_str(u'!' + rsp) + async def asyncwscallback(rspm): + rspm = json.dumps(rspm.raw()) + await rsp.send_str(u'!' + rspm) currsess['inflight'].add(rsp) asess = None try: @@ -479,10 +478,14 @@ async def wsock_handler(req): clientmsg = True while clientmsg: clientmsg = await rsp.receive() + if clientmsg.type == WSMsgType.CLOSE: + break + elif clientmsg.type != WSMsgType.TEXT: + continue clientmsg = clientmsg.data if clientmsg: if clientmsg[0] == '?': - ws.send('?') + await rsp.send_str('?') elif clientmsg[0] == '$': targid, data = clientmsg[1:].split('$', 1) if data[0] == ' ': @@ -631,7 +634,6 @@ async def resourcehandler(request): try: if 'Sec-WebSocket-Version' in request.headers: - print('WebSocket....') return await wsock_handler(request) else: return await resourcehandler_backend(request, make_response) @@ -953,9 +955,9 @@ async def resourcehandler_backend(req, make_response): hdlr = pluginapi.handle_path(url, operation, cfgmgr, querydict) if 'ConfluentAsyncId' in req.headers: - confluent.asynchttp.run_handler(hdlr, env) - await make_response('text/plain', 202, cookies=cookies) - rsp.write(b'Request queued') + await confluent.asynchttp.run_handler(hdlr, req) + rsp = await make_response('text/plain', 202, cookies=cookies) + await rsp.write(b'Request queued') return rsp pagecontent = "" if mimetype == 'text/html': @@ -1051,7 +1053,7 @@ async def _assemble_json(responses, resource=None, url=None, extension=None): else: links['collection'] = {"href": "./" + extension} rspdata = {} - for rsp in await responses: + async for rsp in await responses: if isinstance(rsp, confluent.messages.LinkRelation): haldata = rsp.raw() for hk in haldata: diff --git a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py index ba4ecb42..602ce45b 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py @@ -411,6 +411,7 @@ async def perform_requests(operator, nodes, element, cfg, inputdata, realop): except asyncio.QueueEmpty: pass except asyncio.TimeoutError: + print("whoopsie?") pass finally: for datum in sorted( @@ -525,7 +526,7 @@ class IpmiHandler: self.ipmicmd = persistent_ipmicmds[(node, tenant)] giveup = util.monotonic_time() + 60 while not self.ipmicmd.ipmi_session.broken and not self.ipmicmd.ipmi_session.logged and self.ipmicmd.ipmi_session.logging: - self.ipmicmd.ipmi_session.wait_for_rsp(3) + await self.ipmicmd.ipmi_session.wait_for_rsp(3) if util.monotonic_time() > giveup: self.ipmicmd.ipmi_session.broken = True return self diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 78bd077d..d75a5d4f 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -201,7 +201,9 @@ async def send_response(responses, connection): if responses is None: return responses = await responses - for rsp in responses: + if responses is None: + return + async for rsp in responses: await send_data(connection, rsp.raw()) await send_data(connection, {'_requestdone': 1})