mirror of
https://github.com/xcat2/confluent.git
synced 2026-04-07 01:11:29 +00:00
Normalize non-http and http and http async and internal passthrough
Have the core provide normalization and use it across places that need it.
This commit is contained in:
@@ -33,6 +33,8 @@ import collections
|
||||
import confluent.exceptions as exc
|
||||
import confluent.messages as messages
|
||||
import confluent.util as util
|
||||
import confluent.core as core
|
||||
import confluent.log as log
|
||||
import time
|
||||
|
||||
_asyncsessions = {}
|
||||
@@ -90,13 +92,12 @@ class AsyncSession(object):
|
||||
|
||||
async def run_handler(self, handler, requestid):
|
||||
try:
|
||||
# iterate_responses from core maybe? handler might return other stuff
|
||||
handler = await handler
|
||||
async for rsp in handler:
|
||||
async for rsp in core.iterate_responses(handler):
|
||||
await self.add(requestid, rsp)
|
||||
await self.add(requestid, messages.AsyncCompletion())
|
||||
except Exception as e:
|
||||
print(repr(e))
|
||||
log.logtrace()
|
||||
await self.add(requestid, e)
|
||||
|
||||
async def run_handler(hdlr, req):
|
||||
|
||||
@@ -91,6 +91,10 @@ async def iterate_responses(responses):
|
||||
return
|
||||
elif inspect.isawaitable(responses):
|
||||
responses = await responses
|
||||
if inspect.isasyncgen(responses):
|
||||
async for rsp in responses:
|
||||
yield rsp
|
||||
return
|
||||
for rsp in responses:
|
||||
yield rsp
|
||||
|
||||
@@ -933,7 +937,7 @@ async def handle_dispatch(connection, cert, dispatch, peername):
|
||||
numworkers = 0
|
||||
for hfunc in nodesbyhandler:
|
||||
numworkers += 1
|
||||
asyncio.create_task(addtoqueue(passvalues, hfunc, {
|
||||
tasks.spawn(addtoqueue(passvalues, hfunc, {
|
||||
'nodes': nodesbyhandler[hfunc],
|
||||
'element': pathcomponents,
|
||||
'configmanager': configmanager,
|
||||
|
||||
@@ -1005,7 +1005,7 @@ async def _assemble_json(responses, resource=None, url=None, extension=None):
|
||||
else:
|
||||
links['collection'] = {"href": "./" + extension}
|
||||
rspdata = {}
|
||||
async for rsp in await responses:
|
||||
async for rsp in pluginapi.iterate_responses(responses):
|
||||
if isinstance(rsp, confluent.messages.LinkRelation):
|
||||
haldata = rsp.raw()
|
||||
for hk in haldata:
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import confluent.core as core
|
||||
import confluent.messages as msg
|
||||
|
||||
def retrieve(nodes, element, configmanager, inputdata):
|
||||
async def retrieve(nodes, element, configmanager, inputdata):
|
||||
locationinfo = configmanager.get_node_attributes(nodes,
|
||||
(u'enclosure.manager', u'enclosure.bay', u'location.rack',
|
||||
u'location.row', u'location.u', u'location.height'))
|
||||
@@ -89,10 +89,10 @@ def retrieve(nodes, element, configmanager, inputdata):
|
||||
needheight.add(node)
|
||||
needheight = ','.join(needheight)
|
||||
if needheight:
|
||||
for rsp in core.handle_path(
|
||||
async for rsp in core.iterate_responses(core.handle_path(
|
||||
'/noderange/{0}/description'.format(needheight),
|
||||
'retrieve', configmanager,
|
||||
inputdata=None):
|
||||
inputdata=None)):
|
||||
if not hasattr(rsp, 'kvpairs'):
|
||||
results['errors'].append((rsp.node, rsp.error))
|
||||
continue
|
||||
|
||||
@@ -205,9 +205,6 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
|
||||
|
||||
|
||||
async def send_response(responses, connection):
|
||||
if responses is None:
|
||||
return
|
||||
responses = await responses
|
||||
if responses is None:
|
||||
return
|
||||
async for rsp in pluginapi.iterate_responses(responses):
|
||||
|
||||
Reference in New Issue
Block a user