mirror of
https://github.com/xcat2/confluent.git
synced 2026-06-18 09:30:50 +00:00
Fikup TaskPile and it's uses
This commit is contained in:
@@ -186,8 +186,7 @@ async def read_sensors(element, node, configmanager):
|
||||
'type': 'Power',
|
||||
},
|
||||
]
|
||||
yield msg.SensorReadings(readings, name=node)
|
||||
return
|
||||
return msg.SensorReadings(readings, name=node)
|
||||
|
||||
def get_client(node, configmanager):
|
||||
if node not in _pduclients:
|
||||
@@ -204,7 +203,7 @@ async def read_firmware(node, configmanager):
|
||||
gc = get_client(node, configmanager)
|
||||
adev = await gc.grab_json_response('/xhrgetuserlist.jsp')
|
||||
myversion = adev[0]['fwver']
|
||||
yield msg.Firmware([{'PDU Firmware': {'version': myversion}}], node)
|
||||
return msg.Firmware([{'PDU Firmware': {'version': myversion}}], node)
|
||||
|
||||
|
||||
async def read_inventory(element, node, configmanager):
|
||||
@@ -221,7 +220,7 @@ async def read_inventory(element, node, configmanager):
|
||||
info['Product Name'] = adev['pdu'][0]['model']
|
||||
info['Model'] = adev['pdu'][0]['part_number']
|
||||
inventory['information'] = info
|
||||
yield msg.KeyValueData({'inventory': [inventory]}, node)
|
||||
return msg.KeyValueData({'inventory': [inventory]}, node)
|
||||
|
||||
async def retrieve(nodes, element, configmanager, inputdata):
|
||||
|
||||
@@ -230,7 +229,7 @@ async def retrieve(nodes, element, configmanager, inputdata):
|
||||
for node in nodes:
|
||||
|
||||
gp.spawn(get_outlet, element, node, configmanager)
|
||||
for res in gp:
|
||||
async for res in gp:
|
||||
yield res
|
||||
|
||||
return
|
||||
@@ -238,25 +237,22 @@ async def retrieve(nodes, element, configmanager, inputdata):
|
||||
gp = tasks.TaskPile(pdupool)
|
||||
for node in nodes:
|
||||
gp.spawn(read_sensors, element, node, configmanager)
|
||||
for rsp in gp:
|
||||
for datum in rsp:
|
||||
yield datum
|
||||
async for rsp in gp:
|
||||
yield rsp
|
||||
return
|
||||
elif '/'.join(element).startswith('inventory/firmware/all'):
|
||||
gp = tasks.TaskPile(pdupool)
|
||||
for node in nodes:
|
||||
gp.spawn(read_firmware, node, configmanager)
|
||||
for rsp in gp:
|
||||
for datum in rsp:
|
||||
yield datum
|
||||
async for rsp in gp:
|
||||
yield rsp
|
||||
|
||||
elif '/'.join(element).startswith('inventory/hardware/all'):
|
||||
gp = tasks.TaskPile(pdupool)
|
||||
for node in nodes:
|
||||
gp.spawn(read_inventory, element, node, configmanager)
|
||||
for rsp in gp:
|
||||
for datum in rsp:
|
||||
yield datum
|
||||
async for rsp in gp:
|
||||
yield rsp
|
||||
else:
|
||||
for node in nodes:
|
||||
yield msg.ConfluentResourceUnavailable(node, 'Not implemented')
|
||||
|
||||
@@ -234,9 +234,9 @@ async def read_sensors(element, node, configmanager):
|
||||
process_measurements(name, category, datum['outlet'], 'outlet', readings)
|
||||
if justnames:
|
||||
for reading in readings:
|
||||
yield msg.ChildCollection(simplify_name(reading['name']))
|
||||
return msg.ChildCollection(simplify_name(reading['name']))
|
||||
else:
|
||||
yield msg.SensorReadings(readings, name=node)
|
||||
return msg.SensorReadings(readings, name=node)
|
||||
|
||||
|
||||
async def get_outlet(element, node, configmanager):
|
||||
@@ -250,7 +250,7 @@ async def read_firmware(node, configmanager):
|
||||
gc = GeistClient(node, configmanager)
|
||||
adev = await gc.wc.grab_json_response('/api/sys')
|
||||
myversion = adev['data']['version']
|
||||
yield msg.Firmware([{'PDU Firmware': {'version': myversion}}], node)
|
||||
return msg.Firmware([{'PDU Firmware': {'version': myversion}}], node)
|
||||
|
||||
|
||||
async def read_inventory(element, node, configmanager):
|
||||
@@ -306,7 +306,7 @@ async def retrieve(nodes, element, configmanager, inputdata):
|
||||
gp = tasks.TaskPile(pdupool)
|
||||
for node in nodes:
|
||||
gp.spawn(get_outlet, element, node, configmanager)
|
||||
for res in gp:
|
||||
async for res in gp:
|
||||
yield res
|
||||
|
||||
return
|
||||
@@ -314,23 +314,20 @@ async def retrieve(nodes, element, configmanager, inputdata):
|
||||
gp = tasks.TaskPile(pdupool)
|
||||
for node in nodes:
|
||||
gp.spawn(read_sensors, element, node, configmanager)
|
||||
for rsp in gp:
|
||||
for datum in rsp:
|
||||
yield datum
|
||||
async for rsp in gp:
|
||||
yield rsp
|
||||
return
|
||||
elif '/'.join(element).startswith('inventory/firmware/all'):
|
||||
gp = tasks.TaskPile(pdupool)
|
||||
for node in nodes:
|
||||
gp.spawn(read_firmware, node, configmanager)
|
||||
for rsp in gp:
|
||||
for datum in rsp:
|
||||
yield datum
|
||||
|
||||
async for rsp in gp:
|
||||
yield rsp
|
||||
elif '/'.join(element).startswith('inventory/hardware/all'):
|
||||
gp = tasks.TaskPile(pdupool)
|
||||
for node in nodes:
|
||||
gp.spawn(read_inventory, element, node, configmanager)
|
||||
for rsp in gp:
|
||||
async for rsp in gp:
|
||||
for datum in rsp:
|
||||
yield datum
|
||||
else:
|
||||
|
||||
@@ -53,12 +53,16 @@ class TaskPile:
|
||||
self._tasks.add(task)
|
||||
return task
|
||||
|
||||
def __iter__(self):
|
||||
while self._tasks:
|
||||
done, _ = yield from asyncio.wait(self._tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
for task in done:
|
||||
self._tasks.discard(task)
|
||||
yield task
|
||||
def __aiter__(self):
|
||||
return self
|
||||
|
||||
async def __anext__(self):
|
||||
if not self._tasks:
|
||||
raise StopAsyncIteration
|
||||
done, _ = await asyncio.wait(self._tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
for task in done:
|
||||
self._tasks.discard(task)
|
||||
return task
|
||||
|
||||
class TaskPool:
|
||||
def __init__(self, max_concurrent=128):
|
||||
|
||||
Reference in New Issue
Block a user