From 9ea971d9df76634982a4be0cf48900c3dedaf4d1 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 3 Feb 2026 16:36:41 -0500 Subject: [PATCH] Begin work to rework firmwaremanager for async Requires a pool concept to manage concurrent task execution to match previous expectations. --- confluent_server/confluent/firmwaremanager.py | 14 ++--- .../plugins/hardwaremanagement/redfish.py | 6 +-- confluent_server/confluent/tasks.py | 52 +++++++++++++++++++ 3 files changed, 62 insertions(+), 10 deletions(-) diff --git a/confluent_server/confluent/firmwaremanager.py b/confluent_server/confluent/firmwaremanager.py index eb5d4c86..b35086d6 100644 --- a/confluent_server/confluent/firmwaremanager.py +++ b/confluent_server/confluent/firmwaremanager.py @@ -21,7 +21,7 @@ import confluent.exceptions as exc import confluent.log as log import confluent.messages as msg -import eventlet +import confluent.tasks as tasks import io import os import pwd @@ -31,12 +31,12 @@ import traceback updatesbytarget = {} uploadsbytarget = {} downloadsbytarget = {} -updatepool = eventlet.greenpool.GreenPool(256) _tracelog = None sharedfiles = {} +updatepool = tasks.TaskPool(max_concurrent=256) -def execupdate(handler, filename, updateobj, type, owner, node, datfile): +async def execupdate(handler, filename, updateobj, type, owner, node, datfile): global _tracelog try: if type != 'ffdc' and not datfile: @@ -66,10 +66,10 @@ def execupdate(handler, filename, updateobj, type, owner, node, datfile): return try: if type == 'firmware': - completion = handler(filename, progress=updateobj.handle_progress, + completion = await handler(filename, progress=updateobj.handle_progress, data=datfile, bank=updateobj.bank) else: - completion = handler(filename, progress=updateobj.handle_progress, + completion = await handler(filename, progress=updateobj.handle_progress, data=datfile) if type == 'ffdc' and completion: filename = completion @@ -122,7 +122,7 @@ class Updater(object): else: datfile = None self.datfile = datfile - self.updateproc = updatepool.spawn(execupdate, handler, filename, + self.updateproc = updatepool.schedule(execupdate, handler, filename, self, type, owner, node, datfile) if type == 'firmware': myparty = updatesbytarget @@ -145,7 +145,7 @@ class Updater(object): self.detail = progress.get('detail', '') def cancel(self): - self.updateproc.kill() + self.updateproc.cancel() if self.datfile: self.datfile.close() diff --git a/confluent_server/confluent/plugins/hardwaremanagement/redfish.py b/confluent_server/confluent/plugins/hardwaremanagement/redfish.py index e252f98b..f983c96e 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/redfish.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/redfish.py @@ -450,7 +450,7 @@ class IpmiHandler: elif self.element == ['console', 'license']: self.handle_license() elif self.element == ['support', 'servicedata']: - self.handle_servicedata_fetch() + await self.handle_servicedata_fetch() elif self.element == ['description']: self.handle_description() elif self.element == ['console', 'ikvm_methods']: @@ -483,12 +483,12 @@ class IpmiHandler: return self.ipmicmd.get_diagnostic_data( savefile, progress=progress, autosuffix=True) - def handle_servicedata_fetch(self): + async def handle_servicedata_fetch(self): u = firmwaremanager.Updater( self.node, self.get_diags, self.inputdata.nodefile(self.node), self.tenant, type='ffdc', owner=self.current_user) - self.output.put(msg.CreatedResource( + await self.output.put(msg.CreatedResource( 'nodes/{0}/support/servicedata/{1}'.format(self.node, u.name))) async def handle_attach_media(self): diff --git a/confluent_server/confluent/tasks.py b/confluent_server/confluent/tasks.py index 09c1b583..9d8548a0 100644 --- a/confluent_server/confluent/tasks.py +++ b/confluent_server/confluent/tasks.py @@ -22,6 +22,58 @@ import random tsks = {} +class TaskHolder: + def __init__(self, coro, args): + self._coro = coro + self._args = args + self._cancelled = False + self._task = None + + def cancel(self): + self._cancelled = True + if self._task: + self._task.cancel() + + def cancelled(self): + return self._cancelled + + async def run(self): + return await self._coro(*self._args) + + def assign_task(self, task): + self._task = task + + + +class TaskPool: + def __init__(self, max_concurrent): + self.max_concurrent = max_concurrent + self._tasks = set() + self._pending = [] + + + def _done_callback(self, task): + self._tasks.discard(task) + while self._pending and len(self._tasks) < self.max_concurrent: + tholder = self._pending.pop(0) + if tholder.cancelled(): + continue + currtask = spawn_task(tholder.run()) + tholder.assign_task(currtask) + self._tasks.add(currtask) + currtask.add_done_callback(self._done_callback) + + + def schedule(self, coro_func, *args): + tholder = TaskHolder(coro_func, args) + if len(self._tasks) >= self.max_concurrent: + self._pending.append(tholder) + return tholder + currtask = spawn_task(tholder.run()) + tholder.assign_task(currtask) + self._tasks.add(currtask) + currtask.add_done_callback(self._done_callback) + return tholder tasksitter = None logtrace = None