2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-05-17 19:57:19 +00:00

Refactor task management to its own module

This commit is contained in:
Jarrod Johnson
2024-08-21 11:38:46 -04:00
parent 52f172ef57
commit 1d861e60bb
19 changed files with 95 additions and 157 deletions
+2 -1
View File
@@ -33,6 +33,7 @@ import msgpack
import multiprocessing
import os
import pwd
import confluent.tasks as tasks
import confluent.userutil as userutil
import confluent.util as util
pam = None
@@ -316,7 +317,7 @@ async def check_user_passphrase(name, passphrase, operation=None, element=None,
authworkers = ProcessPoolExecutor(max_workers=1) # multiprocessing.Pool(processes=1)
else:
authcleaner.cancel()
authcleaner = util.spawn_after(30, _clean_authworkers)
authcleaner = tasks.spawn_task_after(30, _clean_authworkers)
crypted = await _do_pbkdf(passphrase, salt)
del _passchecking[(user, tenant)]
await asyncio.sleep(
@@ -23,6 +23,7 @@ import confluent.log as log
import confluent.noderange as noderange
import confluent.asynctlvdata as tlvdata
import confluent.util as util
import confluent.tasks as tasks
import socket
import ssl
import confluent.sortutil as sortutil
@@ -194,7 +195,7 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
currentleader = leader
#spawn this as a thread...
#remote.settimeout(90)
follower = util.spawn(follow_leader(remote, leader))
follower = tasks.spawn_task(follow_leader(remote, leader))
return True
@@ -228,7 +229,7 @@ async def follow_leader(remote, leader):
await cfm.stop_following()
currentleader = None
if retrythread is None: # start a recovery
retrythread = util.spawn_after(
retrythread = tasks.spawn_task_after(
random.random(), start_collective)
async def _create_tls_connection(host, port):
@@ -476,7 +477,7 @@ async def handle_connection(connection, cert, request, local=False):
f.close()
log.log({'info': 'Connecting to collective due to join',
'subsystem': 'collective'})
util.spawn(connect_to_leader(rsp['collective'][
tasks.spawn(connect_to_leader(rsp['collective'][
'fingerprint'], name))
if 'enroll' == operation:
async with enrolling:
@@ -591,7 +592,7 @@ async def handle_connection(connection, cert, request, local=False):
await connection[1].wait_closed()
if not await connect_to_leader(None, None, leader=newleader):
if retrythread is None:
retrythread = util.spawn_after(random.random(),
retrythread = tasks.spawn_task_after(random.random(),
start_collective)
if 'getinfo' == operation:
drone = request['name']
@@ -656,7 +657,7 @@ async def handle_connection(connection, cert, request, local=False):
if not await connect_to_leader(
None, None, peername):
if retrythread is None:
retrythread = util.spawn_after(5 + random.random(),
retrythread = tasks.spawn_task_after(5 + random.random(),
start_collective)
return
if retrythread is not None:
@@ -691,7 +692,7 @@ async def handle_connection(connection, cert, request, local=False):
'subsystem': 'collective'})
if retrythread is None: # start a recovery if everyone else seems
# to have disappeared
retrythread = util.spawn_after(5 + random.random(),
retrythread = tasks.spawn_task_after(5 + random.random(),
start_collective)
# ok, we have a connecting member whose certificate checks out
# He needs to bootstrap his configuration and subscribe it to updates
@@ -741,7 +742,7 @@ async def try_assimilate(drone, followcount, remote):
cnn = remote[1].transport.get_extra_info('socket')
if not await connect_to_leader(None, None, leader=cnn.getpeername()[0]):
if retrythread is None:
retrythread = util.spawn_after(random.random(),
retrythread = tasks.spawn_task_after(random.random(),
start_collective)
return False
if 'leader' in answer:
@@ -803,7 +804,7 @@ async def become_leader(connection):
skipaddr = cnn.getpeername()[0]
if reassimilate is not None:
reassimilate.cancel()
reassimilate = util.spawn(reassimilate_missing())
reassimilate = tasks.spawn_task(reassimilate_missing())
cfm._ready = True
if await _assimilate_missing(skipaddr):
schedule_rebalance()
@@ -835,7 +836,7 @@ async def _assimilate_missing(skipaddr=None):
return True
connections = []
for ct in connecto:
connections.append(util.spawn(create_connection(ct)))
connections.append(tasks.spawn_task(create_connection(ct)))
for ent in connections:
ent = await ent
member, remote = ent
@@ -851,7 +852,7 @@ def startup():
if len(members) < 2:
# Not in collective mode, return
return
util.spawn(start_collective())
tasks.spawn(start_collective())
async def check_managers():
global failovercheck
@@ -903,7 +904,7 @@ def schedule_rebalance():
global failovercheck
if not failovercheck:
failovercheck = True
failovercheck = util.spawn_after(10, check_managers)
failovercheck = tasks.spawn_task_after(10, check_managers)
async def start_collective():
global follower
@@ -939,7 +940,7 @@ async def start_collective():
connecto.append(ldrcandidate)
connections = []
for ct in connecto:
connections.append(util.spawn(create_connection(ct)))
connections.append(tasks.spawn_task(create_connection(ct)))
pnding = connections
while pnding:
rdy, pnding = await asyncio.wait(pnding, return_when=asyncio.FIRST_COMPLETED)
@@ -961,6 +962,6 @@ async def start_collective():
finally:
if retrythread is None and follower is None:
#retrythread = asyncio.create_task(start_collective())
retrythread = util.spawn_after(5 + random.random(),
retrythread = tasks.spawn_task_after(5 + random.random(),
start_collective)
initting = False
@@ -72,6 +72,7 @@ import confluent.config.attributes as allattributes
import confluent.config.conf as conf
import confluent.noderange as noderange
import confluent.util
import confluent.tasks as tasks
import confluent.netutil as netutil
import confluent.exceptions as exc
import confluent.log
@@ -2174,7 +2175,7 @@ class ConfigManager(object):
for watcher in notifdata:
watcher = notifdata[watcher]
callback = watcher['callback']
confluent.util.spawn(_do_notifier(self, watcher, callback))
tasks.spawn(_do_notifier(self, watcher, callback))
async def del_nodes(self, nodes):
if isinstance(nodes, set):
@@ -2330,7 +2331,7 @@ class ConfigManager(object):
nodecollwatchers = self._nodecollwatchers[self.tenant]
for watcher in nodecollwatchers:
watcher = nodecollwatchers[watcher]
confluent.util.spawn(_do_add_watcher(watcher, (), self, renamemap))
tasks.spawn(_do_add_watcher(watcher, (), self, renamemap))
self._bg_sync_to_file()
async def rename_nodegroups(self, renamemap):
@@ -2525,7 +2526,7 @@ class ConfigManager(object):
nodecollwatchers = self._nodecollwatchers[self.tenant]
for watcher in nodecollwatchers:
watcher = nodecollwatchers[watcher]
confluent.util.spawn(_do_add_watcher(watcher, newnodes, self))
tasks.spawn(_do_add_watcher(watcher, newnodes, self))
self._bg_sync_to_file()
#TODO: wait for synchronization to suceed/fail??)
+9 -8
View File
@@ -31,6 +31,7 @@ import confluent.interface.console as conapi
import confluent.log as log
import confluent.core as plugin
import confluent.asynctlvdata as tlvdata
import confluent.tasks as tasks
import confluent.util as util
import socket
import random
@@ -160,7 +161,7 @@ class ConsoleHandler(object):
if self._genwatchattribs:
self._attribwatcher = self.cfgmgr.watch_attributes(
(self.node,), self._genwatchattribs, self._attribschanged)
util.spawn(self.ondemand_init())
tasks.spawn(self.ondemand_init())
async def ondemand_init(self):
await self.check_isondemand()
@@ -325,7 +326,7 @@ class ConsoleHandler(object):
if self.connectionthread:
self.connectionthread.cancel()
self.connectionthread = None
self.connectionthread = util.spawn(self._connect_backend())
self.connectionthread = tasks.spawn_task(self._connect_backend())
async def _connect_backend(self):
if self._console:
@@ -393,7 +394,7 @@ class ConsoleHandler(object):
'error': self.error})
retrytime = self._get_retry_time()
if not self.reconnect:
self.reconnect = util.spawn_after(retrytime, self._connect)
self.reconnect = tasks.spawn_task_after(retrytime, self._connect)
return
except (exc.TargetEndpointUnreachable, socket.gaierror) as se:
self.clearbuffer()
@@ -403,7 +404,7 @@ class ConsoleHandler(object):
'error': self.error})
retrytime = self._get_retry_time()
if not self.reconnect:
self.reconnect = util.spawn_after(retrytime, self._connect)
self.reconnect = tasks.spawn_task_after(retrytime, self._connect)
return
except Exception:
self.clearbuffer()
@@ -415,7 +416,7 @@ class ConsoleHandler(object):
'error': self.error})
retrytime = self._get_retry_time()
if not self.reconnect:
self.reconnect = util.spawn_after(retrytime, self._connect)
self.reconnect = tasks.spawn_task_after(retrytime, self._connect)
return
await self._got_connected()
@@ -713,7 +714,7 @@ class ProxyConsole(object):
await tlvdata.recv(remote)
await tlvdata.send(remote, termreq)
self.remote = remote
util.spawn(self.relay_data())
tasks.spawn(self.relay_data())
async def detachsession(self, session):
# we will disappear, so just let that happen...
@@ -774,11 +775,11 @@ class ConsoleSession(object):
self._evt = None
self.node = node
self.write = self.conshdl.write
util.spawn(self.delayinit(datacallback, skipreplay))
tasks.spawn(self.delayinit(datacallback, skipreplay))
async def delayinit(self, datacallback, skipreplay):
if datacallback is None:
self.reaper = util.spawn_after(15, self.destroy)
self.reaper = tasks.spawn_task_after(15, self.destroy)
self.databuffer = collections.deque([])
self.data_handler = self.got_data
if not skipreplay:
+4 -3
View File
@@ -55,6 +55,7 @@ try:
import confluent.shellmodule as shellmodule
except ImportError:
pass
import confluent.tasks as tasks
import confluent.util as util
import inspect
import itertools
@@ -885,7 +886,7 @@ async def handle_dispatch(connection, cert, dispatch, peername):
await connection[1].wait_closed()
return
xmitlock = asyncio.Lock()
keepalive = util.spawn(_keepalivefn(connection, xmitlock))
keepalive = tasks.spawn_task(_keepalivefn(connection, xmitlock))
dispatch = msgpack.unpackb(dispatch[2:], raw=False)
configmanager = cfm.ConfigManager(dispatch['tenant'])
nodes = dispatch['nodes']
@@ -1120,7 +1121,7 @@ async def handle_node_request(configmanager, inputdata, operation,
numworkers = 0
for hfunc in nodesbyhandler:
numworkers += 1
util.spawn(addtoqueue(passvalues, hfunc, {'nodes': nodesbyhandler[hfunc],
tasks.spawn(addtoqueue(passvalues, hfunc, {'nodes': nodesbyhandler[hfunc],
'element': pathcomponents,
'configmanager': configmanager,
'inputdata': _get_input_data(_plugin, pathcomponents,
@@ -1128,7 +1129,7 @@ async def handle_node_request(configmanager, inputdata, operation,
isnoderange, configmanager)}))
for manager in nodesbymanager:
numworkers += 1
util.spawn(addtoqueue(passvalues, dispatch_request, {
tasks.spawn(addtoqueue(passvalues, dispatch_request, {
'nodes': nodesbymanager[manager], 'manager': manager,
'element': pathcomponents, 'configmanager': configmanager,
'inputdata': inputdata, 'operation': operation, 'isnoderange': isnoderange}))
+3 -3
View File
@@ -3,7 +3,7 @@ import code
import os
import socket
import sys
import confluent.util as util
import confluent.tasks as tasks
#this will ultimately fill the role of the 'backdoor' of eventlet
@@ -58,7 +58,7 @@ async def srv_debug(sock):
cloop = asyncio.get_event_loop()
while True:
cnn, addr = await cloop.sock_accept(sock)
util.spawn(interact(cloop, cnn))
tasks.spawn(interact(cloop, cnn))
def start_dbgif():
@@ -76,4 +76,4 @@ def start_dbgif():
os.chmod("/var/run/confluent/dbg.sock",
0o600)
os.umask(oumask)
util.spawn(srv_debug(unixsocket))
tasks.spawn(srv_debug(unixsocket))
+20 -19
View File
@@ -82,6 +82,7 @@ import confluent.log as log
import confluent.messages as msg
import confluent.networking.macmap as macmap
import confluent.noderange as noderange
import confluent.tasks as tasks
import confluent.util as util
import inspect
import json
@@ -688,7 +689,7 @@ async def _recheck_nodes_backend(nodeattribs, configmanager):
if info['handler'] is None:
next
handler = info['handler'].NodeHandler(info, configmanager)
util.spawn(eval_node(configmanager, handler, info, nodename))
tasks.spawn(eval_node(configmanager, handler, info, nodename))
except Exception:
traceback.print_exc()
log.log({'error': 'Unexpected error during discovery of {0}, check debug '
@@ -733,7 +734,7 @@ async def _recheck_single_unknown_info(configmanager, info):
# if cancel did not result in dead, then we are in progress
if rechecker is None or rechecker.done():
rechecktime = util.monotonic_time() + 300
rechecker = util.spawn_after(300, _periodic_recheck,
rechecker = tasks.spawn_task_after(300, _periodic_recheck,
configmanager)
return
nodename, info['maccount'] = await get_nodename(configmanager, handler, info)
@@ -748,7 +749,7 @@ async def _recheck_single_unknown_info(configmanager, info):
known_nodes[nodename][info['hwaddr']] = info
info['discostatus'] = 'discovered'
return # already known, no need for more
util.spawn(eval_node(configmanager, handler, info, nodename))
tasks.spawn(eval_node(configmanager, handler, info, nodename))
def safe_detected(info):
@@ -757,7 +758,7 @@ def safe_detected(info):
if info['hwaddr'] in runningevals:
# Do not evaluate the same mac multiple times at once
return
runningevals[info['hwaddr']] = util.spawn(eval_detected(info))
runningevals[info['hwaddr']] = tasks.spawn_task(eval_detected(info))
async def eval_detected(info):
@@ -786,7 +787,7 @@ async def detected(info):
return
if (handler and not handler.NodeHandler.adequate(info) and
info.get('protocol', None)):
util.spawn_after(10, info['protocol'].fix_info, info,
tasks.spawn_after(10, info['protocol'].fix_info, info,
safe_detected)
return
if info['hwaddr'] in known_info and 'addresses' in info:
@@ -865,7 +866,7 @@ async def detected(info):
rechecker.cancel()
if rechecker is None or rechecker.done():
rechecktime = util.monotonic_time() + 300
rechecker = util.spawn_after(300, _periodic_recheck, cfg)
rechecker = tasks.spawn_task_after(300, _periodic_recheck, cfg)
unknown_info[info['hwaddr']] = info
info['discostatus'] = 'unidentfied'
#TODO, eventlet spawn after to recheck sooner, or somehow else
@@ -1461,7 +1462,7 @@ async def discover_node(cfg, handler, info, nodename, manual):
info['discostatus'] = 'discovered'
for i in pending_by_uuid.get(curruuid, []):
util.spawn(_recheck_single_unknown_info(cfg, i))
tasks.spawn(_recheck_single_unknown_info(cfg, i))
try:
del pending_by_uuid[curruuid]
except KeyError:
@@ -1550,7 +1551,7 @@ async def _handle_nodelist_change(configmanager):
await _recheck_nodes((), configmanager)
if needaddhandled:
needaddhandled = False
nodeaddhandler = util.spawn(_handle_nodelist_change(configmanager))
nodeaddhandler = tasks.spawn_task(_handle_nodelist_change(configmanager))
else:
nodeaddhandler = None
@@ -1578,7 +1579,7 @@ async def newnodes(added, deleting, renamed, configmanager):
if nodeaddhandler:
needaddhandled = True
else:
nodeaddhandler = util.spawn(_handle_nodelist_change(configmanager))
nodeaddhandler = tasks.spawn_task(_handle_nodelist_change(configmanager))
@@ -1600,7 +1601,7 @@ async def _periodic_recheck(configmanager):
# for rechecker was requested in the course of recheck_nodes
if rechecker is None:
rechecktime = util.monotonic_time() + 900
rechecker = util.spawn_after(900, _periodic_recheck,
rechecker = tasks.spawn_task_after(900, _periodic_recheck,
configmanager)
@@ -1610,7 +1611,7 @@ async def rescan():
if scanner:
return
else:
scanner = util.spawn(blocking_scan())
scanner = tasks.spawn_task(blocking_scan())
await remotescan()
async def remotescan():
@@ -1625,8 +1626,8 @@ async def remotescan():
async def blocking_scan():
global scanner
slpscan = util.spawn(slp.active_scan(safe_detected, slp))
ssdpscan = util.spawn(ssdp.active_scan(safe_detected, ssdp))
slpscan = tasks.spawn_task(slp.active_scan(safe_detected, slp))
ssdpscan = tasks.spawn_task(ssdp.active_scan(safe_detected, ssdp))
await slpscan
await ssdpscan
#ssdpscan.wait()
@@ -1649,20 +1650,20 @@ def start_detection():
start_autosense()
if rechecker is None:
rechecktime = util.monotonic_time() + 900
rechecker = util.spawn_after(900, _periodic_recheck, cfg)
util.spawn(ssdp.snoop(safe_detected, None, ssdp, get_node_by_uuid_or_mac))
rechecker = tasks.spawn_task_after(900, _periodic_recheck, cfg)
tasks.spawn(ssdp.snoop(safe_detected, None, ssdp, get_node_by_uuid_or_mac))
def stop_autosense():
for watcher in list(autosensors):
watcher.kill()
watcher.cancel()
autosensors.discard(watcher)
def start_autosense():
autosensors.add(util.spawn(slp.snoop(safe_detected, slp)))
autosensors.add(tasks.spawn_task(slp.snoop(safe_detected, slp)))
#autosensors.add(eventlet.spawn(mdns.snoop, safe_detected, mdns))
util.spawn(pxe.snoop(safe_detected, pxe, get_node_guess_by_uuid))
tasks.spawn(pxe.snoop(safe_detected, pxe, get_node_guess_by_uuid))
#autosensors.add(eventlet.spawn(pxe.snoop, safe_detected, pxe, get_node_guess_by_uuid))
util.spawn(remotescan())
tasks.spawn(remotescan())
nodes_by_fprint = {}
@@ -30,6 +30,7 @@ import confluent.neighutil as neighutil
import confluent.log as log
import confluent.netutil as netutil
import confluent.util as util
import confluent.tasks as tasks
import ctypes
import ctypes.util
import netifaces
@@ -335,7 +336,7 @@ async def proxydhcp(handler, nodeguess):
def start_proxydhcp(handler, nodeguess=None):
util.spawn(proxydhcp(handler, nodeguess))
tasks.spawn(proxydhcp(handler, nodeguess))
def new_dhcp_packet(handler, nodeguess, cfg, net4):
@@ -360,7 +361,7 @@ def new_dhcp6_packet(handler, net6, cfg, nodeguess):
return
rqv = memoryview(pkt)
if rqv[0] in (1, 3):
util.spawn(process_dhcp6req(handler, rqv, addr, net6, cfg, nodeguess))
tasks.spawn(process_dhcp6req(handler, rqv, addr, net6, cfg, nodeguess))
async def snoop(handler, protocol=None, nodeguess=None):
@@ -16,6 +16,7 @@
import asyncio
import confluent.neighutil as neighutil
import confluent.tasks as tasks
import confluent.util as util
import confluent.log as log
import os
@@ -673,7 +674,7 @@ async def scan(srvtypes=_slp_services, addresses=None, localonly=False):
break
else:
continue
tsks.append(util.spawn(_add_attributes(rsps[id])))
tsks.append(tasks.spawn_task(_add_attributes(rsps[id])))
handleids.add(id)
if tsks:
await asyncio.wait(tsks)
@@ -36,6 +36,7 @@ import confluent.noderange as noderange
import confluent.util as util
import confluent.log as log
import confluent.netutil as netutil
import confluent.tasks as tasks
import socket
import os
import time
@@ -121,7 +122,7 @@ def _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byeha
else:
return
if handler:
util.spawn(check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer, targurl, targtype))
tasks.spawn(check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer, targurl, targtype))
async def check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer, targurl, targtype):
retdata = await check_fish(('/DeviceDescription.json', peerdata, targtype))
@@ -468,7 +469,7 @@ async def _find_service(service, target):
# pooltargs.append(('/redfish/v1/', peerdata[nid]))
tsks = []
for targ in pooltargs:
tsks.append(util.spawn_task(check_fish(targ)))
tsks.append(tasks.spawn_task(check_fish(targ)))
while tsks:
done, tsks = await asyncio.wait(tsks, return_when=asyncio.FIRST_COMPLETED)
for dt in done:
+5 -36
View File
@@ -68,6 +68,7 @@ import collections
import confluent.config.configmanager
import confluent.config.conf as conf
import confluent.exceptions as exc
import confluent.tasks as tasks
import inspect
import glob
import json
@@ -116,40 +117,6 @@ MIDNIGHT = 24 * 60 * 60
_loggers = {}
async def _sleep_and_run(sleeptime, func, args):
await asyncio.sleep(sleeptime)
awt = func(*args)
if inspect.isawaitable(awt):
await awt
def spawn_after(sleeptime, func, *args):
if func is None:
raise Exception('tf')
return spawn(_sleep_and_run(sleeptime, func, args))
tsks = {}
def spawn(coro):
tskid = random.random()
while tskid in tsks:
tskid = random.random()
tsks[tskid] = 1
try:
tsks[tskid] = asyncio.create_task(_run(coro, tskid), name=repr(coro))
except AttributeError:
tsks[tskid] = asyncio.get_event_loop().create_task(_run(coro, tskid), name=repr(coro))
return tsks[tskid]
async def _run(coro, taskid):
ret = await coro
del tsks[taskid]
return ret
class Events(object):
(
undefined, clearscreen, clientconnect, clientdisconnect,
@@ -665,7 +632,7 @@ class Logger(object):
self.logentries.appendleft([DataTypes.event, tstamp, roll_data,
Events.logrollover, None])
if self.closer is None:
self.closer = spawn_after(15, self.closelog)
self.closer = tasks.spawn_task_after(15, self.closelog)
self.writer = None
def read_recent_text(self, size):
@@ -814,7 +781,7 @@ class Logger(object):
[ltype, timestamp, logdata, event, eventdata])
if self.buffered:
if self.writer is None:
self.writer = spawn_after(2, self.writedata)
self.writer = tasks.spawn_task_after(2, self.writedata)
else:
self.writedata()
@@ -840,3 +807,5 @@ def logtrace():
tracelog = Logger('trace', buffered=False)
tracelog.log(traceback.format_exc(), ltype=DataTypes.event,
event=Events.stacktrace)
tasks.logtrace = logtrace
@@ -53,6 +53,7 @@ import confluent.exceptions as exc
import confluent.log as log
import confluent.messages as msg
import confluent.noderange as noderange
import confluent.tasks as tasks
import confluent.util as util
import fcntl
import msgpack
@@ -471,7 +472,7 @@ async def update_macmap(configmanager, impatient=False):
except GeneratorExit:
# the calling function has stopped caring, but we want to finish
# the sweep, background it
util.spawn(_finish_update(completions))
tasks.spawn(_finish_update(completions))
raise
@@ -586,7 +587,7 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents):
pathcomponents == ['networking', 'macs', 'rescan']):
if inputdata != {'rescan': 'start'}:
raise exc.InvalidArgumentException('Input must be rescan=start')
util.spawn(rescan(configmanager))
tasks.spawn(rescan(configmanager))
return [msg.KeyValueData({'rescan': 'started'})]
raise exc.NotImplementedException(
'Operation {0} on {1} not implemented'.format(
@@ -722,7 +723,7 @@ async def offloader_main(cloop):
data = await sreader.read(512)
upacker.feed(data)
for cmd in upacker:
util.spawn(_snmp_map_switch_relay(*cmd))
tasks.spawn(_snmp_map_switch_relay(*cmd))
sys.exit(0)
async def test_main(cloop):
+3 -2
View File
@@ -19,6 +19,7 @@ if __name__ == '__main__':
import confluent.exceptions as exc
import confluent.messages as msg
import confluent.tasks as tasks
import confluent.util as util
COPY = 1
@@ -847,7 +848,7 @@ async def generate_stock_profiles(defprofile, distpath, targpath, osname,
await util.check_call(
'sh', '{0}/initprofile.sh'.format(dirname),
targpath, dirname)
bootupdates.append(util.spawn(update_boot(dirname)))
bootupdates.append(tasks.spawn_task(update_boot(dirname)))
profilelist.append(profname)
for upd in bootupdates:
await upd
@@ -911,7 +912,7 @@ class MediaImporter(object):
importing[importkey] = self
self.filename = os.path.abspath(media)
self.error = ''
self.importer = util.spawn(self.importmedia())
self.importer = tasks.spawn_task(self.importmedia())
def stop(self):
if self.worker and self.worker.returncode is None:
@@ -23,6 +23,7 @@ import asyncio
import confluent.exceptions as cexc
import confluent.interface.console as conapi
import confluent.log as log
import confluent.tasks as tasks
import confluent.util as util
import aiohmi.exceptions as pygexc
import aiohmi.redfish.command as rcmd
@@ -129,7 +130,7 @@ class OpenBmcConsole(conapi.Console):
self.ws = await self.clisess.ws_connect('wss://{0}/console0'.format(self.bmc), protocols=protos, ssl=self.ssl)
#self.ws.connect('wss://{0}/console0'.format(self.bmc), host=bmc, cookie='XSRF-TOKEN={0}; SESSION={1}'.format(wc.cookies['XSRF-TOKEN'], wc.cookies['SESSION']), subprotocols=[wc.cookies['XSRF-TOKEN']])
self.connected = True
self.recvr = util.spawn_task(self.recvdata())
self.recvr = tasks.spawn_task(self.recvdata())
return
async def write(self, data):
@@ -19,6 +19,7 @@ import socket
import confluent.exceptions as exc
import aiohmi.util.webclient as webclient
import confluent.messages as msg
import confluent.tasks as tasks
import confluent.util as util
class SwitchSensor(object):
@@ -120,7 +121,7 @@ def _run_method(method, workers, results, configmanager, nodes, element):
creds = configmanager.get_node_attributes(
nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
for node in nodes:
workers.add(util.spawn(method(configmanager, creds,
workers.add(tasks.spawn_task(method(configmanager, creds,
node, results, element)))
async def retrieve(nodes, element, configmanager, inputdata):
@@ -22,6 +22,7 @@
import confluent.exceptions as cexc
import confluent.interface.console as conapi
import confluent.log as log
import confluent.tasks as tasks
import confluent.util as util
import hashlib
@@ -78,7 +79,7 @@ class SshShell(conapi.Console):
def logon(self):
self.inputmode = -3
util.spawn(self.do_logon())
tasks.spawn(self.do_logon())
async def do_logon(self):
sco = asyncssh.SSHClientConnectionOptions()
@@ -120,7 +121,7 @@ class SshShell(conapi.Console):
self.connected = True
await self.datacallback('Connected\r\n')
self.shell = await self.ssh.open_session(term_type='vt100', term_size=(self.width, self.height))
self.rxthread = util.spawn(self.recvdata())
self.rxthread = tasks.spawn_task(self.recvdata())
async def write(self, data):
if self.inputmode == -2:
+2 -2
View File
@@ -22,7 +22,7 @@
import confluent.consoleserver as consoleserver
import confluent.exceptions as exc
import confluent.messages as msg
import confluent.util as util
import confluent.tasks as tasks
activesessions = {}
@@ -48,7 +48,7 @@ class _ShellHandler(consoleserver.ConsoleHandler):
def _got_disconnected(self):
self.connectstate = 'closed'
util.spawn(self._bgdisconnect())
tasks.spawn(self._bgdisconnect())
async def _bgdisconnect(self):
await self._send_rcpts({'connectstate': self.connectstate})
+8 -7
View File
@@ -46,6 +46,7 @@ import confluent.log as log
import confluent.core as pluginapi
import confluent.shellserver as shellserver
import confluent.collective.manager as collective
import confluent.tasks as tasks
import confluent.util as util
tracelog = None
@@ -386,9 +387,9 @@ async def _tlshandler(bind_host, bind_port):
while (1): # TODO: exithook
cnn, addr = await cloop.sock_accept(plainsocket)
if addr[1] < 1000:
asyncio.create_task(cs.handle_client(cnn, addr))
tasks.spawn(cs.handle_client(cnn, addr))
else:
asyncio.create_task(_tlsstartup(cnn))
tasks.spawn(_tlsstartup(cnn))
@ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p)
@@ -443,7 +444,7 @@ async def _tlsstartup(cnn):
#cnn = ctx.wrap_socket(cnn, server_side=True)
except AttributeError:
raise Exception('Unable to find workable SSL support')
asyncio.create_task(sessionhdl(cnn, authname, cert=cert))
tasks.spawn(sessionhdl(cnn, authname, cert=cert))
def removesocket():
try:
@@ -489,7 +490,7 @@ async def _unixdomainhandler():
except KeyError:
cnn.close()
return
util.spawn(sessionhdl(cnn, authname, skipauth))
tasks.spawn(sessionhdl(cnn, authname, skipauth))
#asyncio.create_task(sessionhdl(cnn, authname, skipauth))
@@ -510,8 +511,8 @@ class SockApi(object):
self.start_remoteapi()
else:
cloop = asyncio.get_event_loop()
cloop.create_task(self.watch_for_cert())
self.unixdomainserver = asyncio.create_task(_unixdomainhandler())
tasks.spawn(self.watch_for_cert())
self.unixdomainserver = tasks.spawn_task(_unixdomainhandler())
async def watch_for_cert(self):
watcher = libc.inotify_init1(os.O_NONBLOCK)
@@ -547,5 +548,5 @@ class SockApi(object):
def start_remoteapi(self):
if self.tlsserver is not None:
return
self.tlsserver = asyncio.get_event_loop().create_task(
self.tlsserver = tasks.spawn_task(
_tlshandler(self.bind_host, self.bind_port))
+2 -48
View File
@@ -29,6 +29,7 @@ import ssl
import struct
import random
import subprocess
import confluent.tasks as tasks
def mkdirp(path, mode=0o777):
@@ -45,55 +46,8 @@ async def check_call(*cmd, **kwargs):
if rc != 0:
raise subprocess.CalledProcessError(rc, cmd)
async def _sleep_and_run(sleeptime, func, args):
await asyncio.sleep(sleeptime)
await func(*args)
def spawn_after(sleeptime, func, *args):
if func is None:
raise Exception('tf')
return spawn(_sleep_and_run(sleeptime, func, args))
tsks = {}
tasksitter = None
async def _sit_tasks():
while True:
while not tsks:
await asyncio.sleep(15)
tsk_list = [tsks[x] for x in tsks]
cmpl, pnding = await asyncio.wait(tsk_list, return_when=asyncio.FIRST_COMPLETED, timeout=15)
for tskid in list(tsks):
if tsks[tskid].done():
try:
tsk = tsks[tskid]
del tsks[tskid]
await tsk
except Exception as e:
print(repr(e))
def spawn_task(coro):
try:
return asyncio.create_task(coro)
except AttributeError:
return asyncio.get_event_loop().create_task(coro)
def spawn(coro):
global tasksitter
if not tasksitter:
tasksitter = spawn_task(_sit_tasks())
tskid = random.random()
while tskid in tsks:
tskid = random.random()
tsks[tskid] = spawn_task(coro)
return tsks[tskid]
async def check_output(*cmd):
@@ -264,7 +218,7 @@ class TLSCertVerifier(object):
auditlog = log.Logger('audit')
auditlog.log({'node': self.node, 'event': 'certautoadd',
'fingerprint': fingerprint})
spawn(self.cfm.set_node_attributes(
tasks.spawn(self.cfm.set_node_attributes(
{self.node: {self.fieldname: fingerprint}}))
return True
elif cert_matches(storedprint[self.node][self.fieldname]['value'],