From f842a0a6e3063d6deb02f27d44485d215c4615b8 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 27 Mar 2026 12:56:47 -0400 Subject: [PATCH] Fikup TaskPile and it's uses --- .../plugins/hardwaremanagement/enlogic.py | 24 ++++++++----------- .../plugins/hardwaremanagement/geist.py | 21 +++++++--------- confluent_server/confluent/tasks.py | 16 ++++++++----- 3 files changed, 29 insertions(+), 32 deletions(-) diff --git a/confluent_server/confluent/plugins/hardwaremanagement/enlogic.py b/confluent_server/confluent/plugins/hardwaremanagement/enlogic.py index fae717b7..42c8560e 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/enlogic.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/enlogic.py @@ -186,8 +186,7 @@ async def read_sensors(element, node, configmanager): 'type': 'Power', }, ] - yield msg.SensorReadings(readings, name=node) - return + return msg.SensorReadings(readings, name=node) def get_client(node, configmanager): if node not in _pduclients: @@ -204,7 +203,7 @@ async def read_firmware(node, configmanager): gc = get_client(node, configmanager) adev = await gc.grab_json_response('/xhrgetuserlist.jsp') myversion = adev[0]['fwver'] - yield msg.Firmware([{'PDU Firmware': {'version': myversion}}], node) + return msg.Firmware([{'PDU Firmware': {'version': myversion}}], node) async def read_inventory(element, node, configmanager): @@ -221,7 +220,7 @@ async def read_inventory(element, node, configmanager): info['Product Name'] = adev['pdu'][0]['model'] info['Model'] = adev['pdu'][0]['part_number'] inventory['information'] = info - yield msg.KeyValueData({'inventory': [inventory]}, node) + return msg.KeyValueData({'inventory': [inventory]}, node) async def retrieve(nodes, element, configmanager, inputdata): @@ -230,7 +229,7 @@ async def retrieve(nodes, element, configmanager, inputdata): for node in nodes: gp.spawn(get_outlet, element, node, configmanager) - for res in gp: + async for res in gp: yield res return @@ -238,25 +237,22 @@ async def retrieve(nodes, element, configmanager, inputdata): gp = tasks.TaskPile(pdupool) for node in nodes: gp.spawn(read_sensors, element, node, configmanager) - for rsp in gp: - for datum in rsp: - yield datum + async for rsp in gp: + yield rsp return elif '/'.join(element).startswith('inventory/firmware/all'): gp = tasks.TaskPile(pdupool) for node in nodes: gp.spawn(read_firmware, node, configmanager) - for rsp in gp: - for datum in rsp: - yield datum + async for rsp in gp: + yield rsp elif '/'.join(element).startswith('inventory/hardware/all'): gp = tasks.TaskPile(pdupool) for node in nodes: gp.spawn(read_inventory, element, node, configmanager) - for rsp in gp: - for datum in rsp: - yield datum + async for rsp in gp: + yield rsp else: for node in nodes: yield msg.ConfluentResourceUnavailable(node, 'Not implemented') diff --git a/confluent_server/confluent/plugins/hardwaremanagement/geist.py b/confluent_server/confluent/plugins/hardwaremanagement/geist.py index 3b83f4ec..7fc9d867 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/geist.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/geist.py @@ -234,9 +234,9 @@ async def read_sensors(element, node, configmanager): process_measurements(name, category, datum['outlet'], 'outlet', readings) if justnames: for reading in readings: - yield msg.ChildCollection(simplify_name(reading['name'])) + return msg.ChildCollection(simplify_name(reading['name'])) else: - yield msg.SensorReadings(readings, name=node) + return msg.SensorReadings(readings, name=node) async def get_outlet(element, node, configmanager): @@ -250,7 +250,7 @@ async def read_firmware(node, configmanager): gc = GeistClient(node, configmanager) adev = await gc.wc.grab_json_response('/api/sys') myversion = adev['data']['version'] - yield msg.Firmware([{'PDU Firmware': {'version': myversion}}], node) + return msg.Firmware([{'PDU Firmware': {'version': myversion}}], node) async def read_inventory(element, node, configmanager): @@ -306,7 +306,7 @@ async def retrieve(nodes, element, configmanager, inputdata): gp = tasks.TaskPile(pdupool) for node in nodes: gp.spawn(get_outlet, element, node, configmanager) - for res in gp: + async for res in gp: yield res return @@ -314,23 +314,20 @@ async def retrieve(nodes, element, configmanager, inputdata): gp = tasks.TaskPile(pdupool) for node in nodes: gp.spawn(read_sensors, element, node, configmanager) - for rsp in gp: - for datum in rsp: - yield datum + async for rsp in gp: + yield rsp return elif '/'.join(element).startswith('inventory/firmware/all'): gp = tasks.TaskPile(pdupool) for node in nodes: gp.spawn(read_firmware, node, configmanager) - for rsp in gp: - for datum in rsp: - yield datum - + async for rsp in gp: + yield rsp elif '/'.join(element).startswith('inventory/hardware/all'): gp = tasks.TaskPile(pdupool) for node in nodes: gp.spawn(read_inventory, element, node, configmanager) - for rsp in gp: + async for rsp in gp: for datum in rsp: yield datum else: diff --git a/confluent_server/confluent/tasks.py b/confluent_server/confluent/tasks.py index 14e775cc..06f1bf41 100644 --- a/confluent_server/confluent/tasks.py +++ b/confluent_server/confluent/tasks.py @@ -53,12 +53,16 @@ class TaskPile: self._tasks.add(task) return task - def __iter__(self): - while self._tasks: - done, _ = yield from asyncio.wait(self._tasks, return_when=asyncio.FIRST_COMPLETED) - for task in done: - self._tasks.discard(task) - yield task + def __aiter__(self): + return self + + async def __anext__(self): + if not self._tasks: + raise StopAsyncIteration + done, _ = await asyncio.wait(self._tasks, return_when=asyncio.FIRST_COMPLETED) + for task in done: + self._tasks.discard(task) + return task class TaskPool: def __init__(self, max_concurrent=128):