diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index 279a7fcb..f0222da9 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -33,6 +33,8 @@ import collections import confluent.exceptions as exc import confluent.messages as messages import confluent.util as util +import confluent.core as core +import confluent.log as log import time _asyncsessions = {} @@ -90,13 +92,12 @@ class AsyncSession(object): async def run_handler(self, handler, requestid): try: - # iterate_responses from core maybe? handler might return other stuff - handler = await handler - async for rsp in handler: + async for rsp in core.iterate_responses(handler): await self.add(requestid, rsp) await self.add(requestid, messages.AsyncCompletion()) except Exception as e: print(repr(e)) + log.logtrace() await self.add(requestid, e) async def run_handler(hdlr, req): diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 3af338fc..884c8485 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -91,6 +91,10 @@ async def iterate_responses(responses): return elif inspect.isawaitable(responses): responses = await responses + if inspect.isasyncgen(responses): + async for rsp in responses: + yield rsp + return for rsp in responses: yield rsp @@ -933,7 +937,7 @@ async def handle_dispatch(connection, cert, dispatch, peername): numworkers = 0 for hfunc in nodesbyhandler: numworkers += 1 - asyncio.create_task(addtoqueue(passvalues, hfunc, { + tasks.spawn(addtoqueue(passvalues, hfunc, { 'nodes': nodesbyhandler[hfunc], 'element': pathcomponents, 'configmanager': configmanager, diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index 56bed091..273f1901 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -1005,7 +1005,7 @@ async def _assemble_json(responses, resource=None, url=None, extension=None): else: links['collection'] = {"href": "./" + extension} rspdata = {} - async for rsp in await responses: + async for rsp in pluginapi.iterate_responses(responses): if isinstance(rsp, confluent.messages.LinkRelation): haldata = rsp.raw() for hk in haldata: diff --git a/confluent_server/confluent/plugins/info/layout.py b/confluent_server/confluent/plugins/info/layout.py index ca7f120c..d9d90ea5 100644 --- a/confluent_server/confluent/plugins/info/layout.py +++ b/confluent_server/confluent/plugins/info/layout.py @@ -15,7 +15,7 @@ import confluent.core as core import confluent.messages as msg -def retrieve(nodes, element, configmanager, inputdata): +async def retrieve(nodes, element, configmanager, inputdata): locationinfo = configmanager.get_node_attributes(nodes, (u'enclosure.manager', u'enclosure.bay', u'location.rack', u'location.row', u'location.u', u'location.height')) @@ -89,10 +89,10 @@ def retrieve(nodes, element, configmanager, inputdata): needheight.add(node) needheight = ','.join(needheight) if needheight: - for rsp in core.handle_path( + async for rsp in core.iterate_responses(core.handle_path( '/noderange/{0}/description'.format(needheight), 'retrieve', configmanager, - inputdata=None): + inputdata=None)): if not hasattr(rsp, 'kvpairs'): results['errors'].append((rsp.node, rsp.error)) continue diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index f942a5b6..63324224 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -205,9 +205,6 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): async def send_response(responses, connection): - if responses is None: - return - responses = await responses if responses is None: return async for rsp in pluginapi.iterate_responses(responses):