mirror of
https://github.com/xcat2/confluent.git
synced 2026-06-18 17:40:45 +00:00
Begin work to rework firmwaremanager for async
Requires a pool concept to manage concurrent task execution to match previous expectations.
This commit is contained in:
@@ -21,7 +21,7 @@
|
||||
import confluent.exceptions as exc
|
||||
import confluent.log as log
|
||||
import confluent.messages as msg
|
||||
import eventlet
|
||||
import confluent.tasks as tasks
|
||||
import io
|
||||
import os
|
||||
import pwd
|
||||
@@ -31,12 +31,12 @@ import traceback
|
||||
updatesbytarget = {}
|
||||
uploadsbytarget = {}
|
||||
downloadsbytarget = {}
|
||||
updatepool = eventlet.greenpool.GreenPool(256)
|
||||
_tracelog = None
|
||||
sharedfiles = {}
|
||||
updatepool = tasks.TaskPool(max_concurrent=256)
|
||||
|
||||
|
||||
def execupdate(handler, filename, updateobj, type, owner, node, datfile):
|
||||
async def execupdate(handler, filename, updateobj, type, owner, node, datfile):
|
||||
global _tracelog
|
||||
try:
|
||||
if type != 'ffdc' and not datfile:
|
||||
@@ -66,10 +66,10 @@ def execupdate(handler, filename, updateobj, type, owner, node, datfile):
|
||||
return
|
||||
try:
|
||||
if type == 'firmware':
|
||||
completion = handler(filename, progress=updateobj.handle_progress,
|
||||
completion = await handler(filename, progress=updateobj.handle_progress,
|
||||
data=datfile, bank=updateobj.bank)
|
||||
else:
|
||||
completion = handler(filename, progress=updateobj.handle_progress,
|
||||
completion = await handler(filename, progress=updateobj.handle_progress,
|
||||
data=datfile)
|
||||
if type == 'ffdc' and completion:
|
||||
filename = completion
|
||||
@@ -122,7 +122,7 @@ class Updater(object):
|
||||
else:
|
||||
datfile = None
|
||||
self.datfile = datfile
|
||||
self.updateproc = updatepool.spawn(execupdate, handler, filename,
|
||||
self.updateproc = updatepool.schedule(execupdate, handler, filename,
|
||||
self, type, owner, node, datfile)
|
||||
if type == 'firmware':
|
||||
myparty = updatesbytarget
|
||||
@@ -145,7 +145,7 @@ class Updater(object):
|
||||
self.detail = progress.get('detail', '')
|
||||
|
||||
def cancel(self):
|
||||
self.updateproc.kill()
|
||||
self.updateproc.cancel()
|
||||
if self.datfile:
|
||||
self.datfile.close()
|
||||
|
||||
|
||||
@@ -450,7 +450,7 @@ class IpmiHandler:
|
||||
elif self.element == ['console', 'license']:
|
||||
self.handle_license()
|
||||
elif self.element == ['support', 'servicedata']:
|
||||
self.handle_servicedata_fetch()
|
||||
await self.handle_servicedata_fetch()
|
||||
elif self.element == ['description']:
|
||||
self.handle_description()
|
||||
elif self.element == ['console', 'ikvm_methods']:
|
||||
@@ -483,12 +483,12 @@ class IpmiHandler:
|
||||
return self.ipmicmd.get_diagnostic_data(
|
||||
savefile, progress=progress, autosuffix=True)
|
||||
|
||||
def handle_servicedata_fetch(self):
|
||||
async def handle_servicedata_fetch(self):
|
||||
u = firmwaremanager.Updater(
|
||||
self.node, self.get_diags,
|
||||
self.inputdata.nodefile(self.node), self.tenant, type='ffdc',
|
||||
owner=self.current_user)
|
||||
self.output.put(msg.CreatedResource(
|
||||
await self.output.put(msg.CreatedResource(
|
||||
'nodes/{0}/support/servicedata/{1}'.format(self.node, u.name)))
|
||||
|
||||
async def handle_attach_media(self):
|
||||
|
||||
@@ -22,6 +22,58 @@ import random
|
||||
|
||||
tsks = {}
|
||||
|
||||
class TaskHolder:
|
||||
def __init__(self, coro, args):
|
||||
self._coro = coro
|
||||
self._args = args
|
||||
self._cancelled = False
|
||||
self._task = None
|
||||
|
||||
def cancel(self):
|
||||
self._cancelled = True
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
|
||||
def cancelled(self):
|
||||
return self._cancelled
|
||||
|
||||
async def run(self):
|
||||
return await self._coro(*self._args)
|
||||
|
||||
def assign_task(self, task):
|
||||
self._task = task
|
||||
|
||||
|
||||
|
||||
class TaskPool:
|
||||
def __init__(self, max_concurrent):
|
||||
self.max_concurrent = max_concurrent
|
||||
self._tasks = set()
|
||||
self._pending = []
|
||||
|
||||
|
||||
def _done_callback(self, task):
|
||||
self._tasks.discard(task)
|
||||
while self._pending and len(self._tasks) < self.max_concurrent:
|
||||
tholder = self._pending.pop(0)
|
||||
if tholder.cancelled():
|
||||
continue
|
||||
currtask = spawn_task(tholder.run())
|
||||
tholder.assign_task(currtask)
|
||||
self._tasks.add(currtask)
|
||||
currtask.add_done_callback(self._done_callback)
|
||||
|
||||
|
||||
def schedule(self, coro_func, *args):
|
||||
tholder = TaskHolder(coro_func, args)
|
||||
if len(self._tasks) >= self.max_concurrent:
|
||||
self._pending.append(tholder)
|
||||
return tholder
|
||||
currtask = spawn_task(tholder.run())
|
||||
tholder.assign_task(currtask)
|
||||
self._tasks.add(currtask)
|
||||
currtask.add_done_callback(self._done_callback)
|
||||
return tholder
|
||||
|
||||
tasksitter = None
|
||||
logtrace = None
|
||||
|
||||
Reference in New Issue
Block a user