diff --git a/confluent_server/confluent/auth.py b/confluent_server/confluent/auth.py index a1e296ba..2280785f 100644 --- a/confluent_server/confluent/auth.py +++ b/confluent_server/confluent/auth.py @@ -33,6 +33,7 @@ import msgpack import multiprocessing import os import pwd +import confluent.tasks as tasks import confluent.userutil as userutil import confluent.util as util pam = None @@ -316,7 +317,7 @@ async def check_user_passphrase(name, passphrase, operation=None, element=None, authworkers = ProcessPoolExecutor(max_workers=1) # multiprocessing.Pool(processes=1) else: authcleaner.cancel() - authcleaner = util.spawn_after(30, _clean_authworkers) + authcleaner = tasks.spawn_task_after(30, _clean_authworkers) crypted = await _do_pbkdf(passphrase, salt) del _passchecking[(user, tenant)] await asyncio.sleep( diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 3a327f0a..dd74d71c 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -23,6 +23,7 @@ import confluent.log as log import confluent.noderange as noderange import confluent.asynctlvdata as tlvdata import confluent.util as util +import confluent.tasks as tasks import socket import ssl import confluent.sortutil as sortutil @@ -194,7 +195,7 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre currentleader = leader #spawn this as a thread... #remote.settimeout(90) - follower = util.spawn(follow_leader(remote, leader)) + follower = tasks.spawn_task(follow_leader(remote, leader)) return True @@ -228,7 +229,7 @@ async def follow_leader(remote, leader): await cfm.stop_following() currentleader = None if retrythread is None: # start a recovery - retrythread = util.spawn_after( + retrythread = tasks.spawn_task_after( random.random(), start_collective) async def _create_tls_connection(host, port): @@ -476,7 +477,7 @@ async def handle_connection(connection, cert, request, local=False): f.close() log.log({'info': 'Connecting to collective due to join', 'subsystem': 'collective'}) - util.spawn(connect_to_leader(rsp['collective'][ + tasks.spawn(connect_to_leader(rsp['collective'][ 'fingerprint'], name)) if 'enroll' == operation: async with enrolling: @@ -591,7 +592,7 @@ async def handle_connection(connection, cert, request, local=False): await connection[1].wait_closed() if not await connect_to_leader(None, None, leader=newleader): if retrythread is None: - retrythread = util.spawn_after(random.random(), + retrythread = tasks.spawn_task_after(random.random(), start_collective) if 'getinfo' == operation: drone = request['name'] @@ -656,7 +657,7 @@ async def handle_connection(connection, cert, request, local=False): if not await connect_to_leader( None, None, peername): if retrythread is None: - retrythread = util.spawn_after(5 + random.random(), + retrythread = tasks.spawn_task_after(5 + random.random(), start_collective) return if retrythread is not None: @@ -691,7 +692,7 @@ async def handle_connection(connection, cert, request, local=False): 'subsystem': 'collective'}) if retrythread is None: # start a recovery if everyone else seems # to have disappeared - retrythread = util.spawn_after(5 + random.random(), + retrythread = tasks.spawn_task_after(5 + random.random(), start_collective) # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates @@ -741,7 +742,7 @@ async def try_assimilate(drone, followcount, remote): cnn = remote[1].transport.get_extra_info('socket') if not await connect_to_leader(None, None, leader=cnn.getpeername()[0]): if retrythread is None: - retrythread = util.spawn_after(random.random(), + retrythread = tasks.spawn_task_after(random.random(), start_collective) return False if 'leader' in answer: @@ -803,7 +804,7 @@ async def become_leader(connection): skipaddr = cnn.getpeername()[0] if reassimilate is not None: reassimilate.cancel() - reassimilate = util.spawn(reassimilate_missing()) + reassimilate = tasks.spawn_task(reassimilate_missing()) cfm._ready = True if await _assimilate_missing(skipaddr): schedule_rebalance() @@ -835,7 +836,7 @@ async def _assimilate_missing(skipaddr=None): return True connections = [] for ct in connecto: - connections.append(util.spawn(create_connection(ct))) + connections.append(tasks.spawn_task(create_connection(ct))) for ent in connections: ent = await ent member, remote = ent @@ -851,7 +852,7 @@ def startup(): if len(members) < 2: # Not in collective mode, return return - util.spawn(start_collective()) + tasks.spawn(start_collective()) async def check_managers(): global failovercheck @@ -903,7 +904,7 @@ def schedule_rebalance(): global failovercheck if not failovercheck: failovercheck = True - failovercheck = util.spawn_after(10, check_managers) + failovercheck = tasks.spawn_task_after(10, check_managers) async def start_collective(): global follower @@ -939,7 +940,7 @@ async def start_collective(): connecto.append(ldrcandidate) connections = [] for ct in connecto: - connections.append(util.spawn(create_connection(ct))) + connections.append(tasks.spawn_task(create_connection(ct))) pnding = connections while pnding: rdy, pnding = await asyncio.wait(pnding, return_when=asyncio.FIRST_COMPLETED) @@ -961,6 +962,6 @@ async def start_collective(): finally: if retrythread is None and follower is None: #retrythread = asyncio.create_task(start_collective()) - retrythread = util.spawn_after(5 + random.random(), + retrythread = tasks.spawn_task_after(5 + random.random(), start_collective) initting = False diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 992466d6..a657687b 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -72,6 +72,7 @@ import confluent.config.attributes as allattributes import confluent.config.conf as conf import confluent.noderange as noderange import confluent.util +import confluent.tasks as tasks import confluent.netutil as netutil import confluent.exceptions as exc import confluent.log @@ -2174,7 +2175,7 @@ class ConfigManager(object): for watcher in notifdata: watcher = notifdata[watcher] callback = watcher['callback'] - confluent.util.spawn(_do_notifier(self, watcher, callback)) + tasks.spawn(_do_notifier(self, watcher, callback)) async def del_nodes(self, nodes): if isinstance(nodes, set): @@ -2330,7 +2331,7 @@ class ConfigManager(object): nodecollwatchers = self._nodecollwatchers[self.tenant] for watcher in nodecollwatchers: watcher = nodecollwatchers[watcher] - confluent.util.spawn(_do_add_watcher(watcher, (), self, renamemap)) + tasks.spawn(_do_add_watcher(watcher, (), self, renamemap)) self._bg_sync_to_file() async def rename_nodegroups(self, renamemap): @@ -2525,7 +2526,7 @@ class ConfigManager(object): nodecollwatchers = self._nodecollwatchers[self.tenant] for watcher in nodecollwatchers: watcher = nodecollwatchers[watcher] - confluent.util.spawn(_do_add_watcher(watcher, newnodes, self)) + tasks.spawn(_do_add_watcher(watcher, newnodes, self)) self._bg_sync_to_file() #TODO: wait for synchronization to suceed/fail??) diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 501ad21d..7385e6c9 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -31,6 +31,7 @@ import confluent.interface.console as conapi import confluent.log as log import confluent.core as plugin import confluent.asynctlvdata as tlvdata +import confluent.tasks as tasks import confluent.util as util import socket import random @@ -160,7 +161,7 @@ class ConsoleHandler(object): if self._genwatchattribs: self._attribwatcher = self.cfgmgr.watch_attributes( (self.node,), self._genwatchattribs, self._attribschanged) - util.spawn(self.ondemand_init()) + tasks.spawn(self.ondemand_init()) async def ondemand_init(self): await self.check_isondemand() @@ -325,7 +326,7 @@ class ConsoleHandler(object): if self.connectionthread: self.connectionthread.cancel() self.connectionthread = None - self.connectionthread = util.spawn(self._connect_backend()) + self.connectionthread = tasks.spawn_task(self._connect_backend()) async def _connect_backend(self): if self._console: @@ -393,7 +394,7 @@ class ConsoleHandler(object): 'error': self.error}) retrytime = self._get_retry_time() if not self.reconnect: - self.reconnect = util.spawn_after(retrytime, self._connect) + self.reconnect = tasks.spawn_task_after(retrytime, self._connect) return except (exc.TargetEndpointUnreachable, socket.gaierror) as se: self.clearbuffer() @@ -403,7 +404,7 @@ class ConsoleHandler(object): 'error': self.error}) retrytime = self._get_retry_time() if not self.reconnect: - self.reconnect = util.spawn_after(retrytime, self._connect) + self.reconnect = tasks.spawn_task_after(retrytime, self._connect) return except Exception: self.clearbuffer() @@ -415,7 +416,7 @@ class ConsoleHandler(object): 'error': self.error}) retrytime = self._get_retry_time() if not self.reconnect: - self.reconnect = util.spawn_after(retrytime, self._connect) + self.reconnect = tasks.spawn_task_after(retrytime, self._connect) return await self._got_connected() @@ -713,7 +714,7 @@ class ProxyConsole(object): await tlvdata.recv(remote) await tlvdata.send(remote, termreq) self.remote = remote - util.spawn(self.relay_data()) + tasks.spawn(self.relay_data()) async def detachsession(self, session): # we will disappear, so just let that happen... @@ -774,11 +775,11 @@ class ConsoleSession(object): self._evt = None self.node = node self.write = self.conshdl.write - util.spawn(self.delayinit(datacallback, skipreplay)) + tasks.spawn(self.delayinit(datacallback, skipreplay)) async def delayinit(self, datacallback, skipreplay): if datacallback is None: - self.reaper = util.spawn_after(15, self.destroy) + self.reaper = tasks.spawn_task_after(15, self.destroy) self.databuffer = collections.deque([]) self.data_handler = self.got_data if not skipreplay: diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 36624224..3af338fc 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -55,6 +55,7 @@ try: import confluent.shellmodule as shellmodule except ImportError: pass +import confluent.tasks as tasks import confluent.util as util import inspect import itertools @@ -885,7 +886,7 @@ async def handle_dispatch(connection, cert, dispatch, peername): await connection[1].wait_closed() return xmitlock = asyncio.Lock() - keepalive = util.spawn(_keepalivefn(connection, xmitlock)) + keepalive = tasks.spawn_task(_keepalivefn(connection, xmitlock)) dispatch = msgpack.unpackb(dispatch[2:], raw=False) configmanager = cfm.ConfigManager(dispatch['tenant']) nodes = dispatch['nodes'] @@ -1120,7 +1121,7 @@ async def handle_node_request(configmanager, inputdata, operation, numworkers = 0 for hfunc in nodesbyhandler: numworkers += 1 - util.spawn(addtoqueue(passvalues, hfunc, {'nodes': nodesbyhandler[hfunc], + tasks.spawn(addtoqueue(passvalues, hfunc, {'nodes': nodesbyhandler[hfunc], 'element': pathcomponents, 'configmanager': configmanager, 'inputdata': _get_input_data(_plugin, pathcomponents, @@ -1128,7 +1129,7 @@ async def handle_node_request(configmanager, inputdata, operation, isnoderange, configmanager)})) for manager in nodesbymanager: numworkers += 1 - util.spawn(addtoqueue(passvalues, dispatch_request, { + tasks.spawn(addtoqueue(passvalues, dispatch_request, { 'nodes': nodesbymanager[manager], 'manager': manager, 'element': pathcomponents, 'configmanager': configmanager, 'inputdata': inputdata, 'operation': operation, 'isnoderange': isnoderange})) diff --git a/confluent_server/confluent/debugger.py b/confluent_server/confluent/debugger.py index 2a82dc61..85ec283b 100644 --- a/confluent_server/confluent/debugger.py +++ b/confluent_server/confluent/debugger.py @@ -3,7 +3,7 @@ import code import os import socket import sys -import confluent.util as util +import confluent.tasks as tasks #this will ultimately fill the role of the 'backdoor' of eventlet @@ -58,7 +58,7 @@ async def srv_debug(sock): cloop = asyncio.get_event_loop() while True: cnn, addr = await cloop.sock_accept(sock) - util.spawn(interact(cloop, cnn)) + tasks.spawn(interact(cloop, cnn)) def start_dbgif(): @@ -76,4 +76,4 @@ def start_dbgif(): os.chmod("/var/run/confluent/dbg.sock", 0o600) os.umask(oumask) - util.spawn(srv_debug(unixsocket)) + tasks.spawn(srv_debug(unixsocket)) diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index e723942b..cf4b516e 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -82,6 +82,7 @@ import confluent.log as log import confluent.messages as msg import confluent.networking.macmap as macmap import confluent.noderange as noderange +import confluent.tasks as tasks import confluent.util as util import inspect import json @@ -688,7 +689,7 @@ async def _recheck_nodes_backend(nodeattribs, configmanager): if info['handler'] is None: next handler = info['handler'].NodeHandler(info, configmanager) - util.spawn(eval_node(configmanager, handler, info, nodename)) + tasks.spawn(eval_node(configmanager, handler, info, nodename)) except Exception: traceback.print_exc() log.log({'error': 'Unexpected error during discovery of {0}, check debug ' @@ -733,7 +734,7 @@ async def _recheck_single_unknown_info(configmanager, info): # if cancel did not result in dead, then we are in progress if rechecker is None or rechecker.done(): rechecktime = util.monotonic_time() + 300 - rechecker = util.spawn_after(300, _periodic_recheck, + rechecker = tasks.spawn_task_after(300, _periodic_recheck, configmanager) return nodename, info['maccount'] = await get_nodename(configmanager, handler, info) @@ -748,7 +749,7 @@ async def _recheck_single_unknown_info(configmanager, info): known_nodes[nodename][info['hwaddr']] = info info['discostatus'] = 'discovered' return # already known, no need for more - util.spawn(eval_node(configmanager, handler, info, nodename)) + tasks.spawn(eval_node(configmanager, handler, info, nodename)) def safe_detected(info): @@ -757,7 +758,7 @@ def safe_detected(info): if info['hwaddr'] in runningevals: # Do not evaluate the same mac multiple times at once return - runningevals[info['hwaddr']] = util.spawn(eval_detected(info)) + runningevals[info['hwaddr']] = tasks.spawn_task(eval_detected(info)) async def eval_detected(info): @@ -786,7 +787,7 @@ async def detected(info): return if (handler and not handler.NodeHandler.adequate(info) and info.get('protocol', None)): - util.spawn_after(10, info['protocol'].fix_info, info, + tasks.spawn_after(10, info['protocol'].fix_info, info, safe_detected) return if info['hwaddr'] in known_info and 'addresses' in info: @@ -865,7 +866,7 @@ async def detected(info): rechecker.cancel() if rechecker is None or rechecker.done(): rechecktime = util.monotonic_time() + 300 - rechecker = util.spawn_after(300, _periodic_recheck, cfg) + rechecker = tasks.spawn_task_after(300, _periodic_recheck, cfg) unknown_info[info['hwaddr']] = info info['discostatus'] = 'unidentfied' #TODO, eventlet spawn after to recheck sooner, or somehow else @@ -1461,7 +1462,7 @@ async def discover_node(cfg, handler, info, nodename, manual): info['discostatus'] = 'discovered' for i in pending_by_uuid.get(curruuid, []): - util.spawn(_recheck_single_unknown_info(cfg, i)) + tasks.spawn(_recheck_single_unknown_info(cfg, i)) try: del pending_by_uuid[curruuid] except KeyError: @@ -1550,7 +1551,7 @@ async def _handle_nodelist_change(configmanager): await _recheck_nodes((), configmanager) if needaddhandled: needaddhandled = False - nodeaddhandler = util.spawn(_handle_nodelist_change(configmanager)) + nodeaddhandler = tasks.spawn_task(_handle_nodelist_change(configmanager)) else: nodeaddhandler = None @@ -1578,7 +1579,7 @@ async def newnodes(added, deleting, renamed, configmanager): if nodeaddhandler: needaddhandled = True else: - nodeaddhandler = util.spawn(_handle_nodelist_change(configmanager)) + nodeaddhandler = tasks.spawn_task(_handle_nodelist_change(configmanager)) @@ -1600,7 +1601,7 @@ async def _periodic_recheck(configmanager): # for rechecker was requested in the course of recheck_nodes if rechecker is None: rechecktime = util.monotonic_time() + 900 - rechecker = util.spawn_after(900, _periodic_recheck, + rechecker = tasks.spawn_task_after(900, _periodic_recheck, configmanager) @@ -1610,7 +1611,7 @@ async def rescan(): if scanner: return else: - scanner = util.spawn(blocking_scan()) + scanner = tasks.spawn_task(blocking_scan()) await remotescan() async def remotescan(): @@ -1625,8 +1626,8 @@ async def remotescan(): async def blocking_scan(): global scanner - slpscan = util.spawn(slp.active_scan(safe_detected, slp)) - ssdpscan = util.spawn(ssdp.active_scan(safe_detected, ssdp)) + slpscan = tasks.spawn_task(slp.active_scan(safe_detected, slp)) + ssdpscan = tasks.spawn_task(ssdp.active_scan(safe_detected, ssdp)) await slpscan await ssdpscan #ssdpscan.wait() @@ -1649,20 +1650,20 @@ def start_detection(): start_autosense() if rechecker is None: rechecktime = util.monotonic_time() + 900 - rechecker = util.spawn_after(900, _periodic_recheck, cfg) - util.spawn(ssdp.snoop(safe_detected, None, ssdp, get_node_by_uuid_or_mac)) + rechecker = tasks.spawn_task_after(900, _periodic_recheck, cfg) + tasks.spawn(ssdp.snoop(safe_detected, None, ssdp, get_node_by_uuid_or_mac)) def stop_autosense(): for watcher in list(autosensors): - watcher.kill() + watcher.cancel() autosensors.discard(watcher) def start_autosense(): - autosensors.add(util.spawn(slp.snoop(safe_detected, slp))) + autosensors.add(tasks.spawn_task(slp.snoop(safe_detected, slp))) #autosensors.add(eventlet.spawn(mdns.snoop, safe_detected, mdns)) - util.spawn(pxe.snoop(safe_detected, pxe, get_node_guess_by_uuid)) + tasks.spawn(pxe.snoop(safe_detected, pxe, get_node_guess_by_uuid)) #autosensors.add(eventlet.spawn(pxe.snoop, safe_detected, pxe, get_node_guess_by_uuid)) - util.spawn(remotescan()) + tasks.spawn(remotescan()) nodes_by_fprint = {} diff --git a/confluent_server/confluent/discovery/protocols/pxe.py b/confluent_server/confluent/discovery/protocols/pxe.py index ea6739e6..92a85a50 100644 --- a/confluent_server/confluent/discovery/protocols/pxe.py +++ b/confluent_server/confluent/discovery/protocols/pxe.py @@ -30,6 +30,7 @@ import confluent.neighutil as neighutil import confluent.log as log import confluent.netutil as netutil import confluent.util as util +import confluent.tasks as tasks import ctypes import ctypes.util import netifaces @@ -335,7 +336,7 @@ async def proxydhcp(handler, nodeguess): def start_proxydhcp(handler, nodeguess=None): - util.spawn(proxydhcp(handler, nodeguess)) + tasks.spawn(proxydhcp(handler, nodeguess)) def new_dhcp_packet(handler, nodeguess, cfg, net4): @@ -360,7 +361,7 @@ def new_dhcp6_packet(handler, net6, cfg, nodeguess): return rqv = memoryview(pkt) if rqv[0] in (1, 3): - util.spawn(process_dhcp6req(handler, rqv, addr, net6, cfg, nodeguess)) + tasks.spawn(process_dhcp6req(handler, rqv, addr, net6, cfg, nodeguess)) async def snoop(handler, protocol=None, nodeguess=None): diff --git a/confluent_server/confluent/discovery/protocols/slp.py b/confluent_server/confluent/discovery/protocols/slp.py index 76d534fd..826044c7 100644 --- a/confluent_server/confluent/discovery/protocols/slp.py +++ b/confluent_server/confluent/discovery/protocols/slp.py @@ -16,6 +16,7 @@ import asyncio import confluent.neighutil as neighutil +import confluent.tasks as tasks import confluent.util as util import confluent.log as log import os @@ -673,7 +674,7 @@ async def scan(srvtypes=_slp_services, addresses=None, localonly=False): break else: continue - tsks.append(util.spawn(_add_attributes(rsps[id]))) + tsks.append(tasks.spawn_task(_add_attributes(rsps[id]))) handleids.add(id) if tsks: await asyncio.wait(tsks) diff --git a/confluent_server/confluent/discovery/protocols/ssdp.py b/confluent_server/confluent/discovery/protocols/ssdp.py index 1205726d..144c1b7a 100644 --- a/confluent_server/confluent/discovery/protocols/ssdp.py +++ b/confluent_server/confluent/discovery/protocols/ssdp.py @@ -36,6 +36,7 @@ import confluent.noderange as noderange import confluent.util as util import confluent.log as log import confluent.netutil as netutil +import confluent.tasks as tasks import socket import os import time @@ -121,7 +122,7 @@ def _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byeha else: return if handler: - util.spawn(check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer, targurl, targtype)) + tasks.spawn(check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer, targurl, targtype)) async def check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer, targurl, targtype): retdata = await check_fish(('/DeviceDescription.json', peerdata, targtype)) @@ -468,7 +469,7 @@ async def _find_service(service, target): # pooltargs.append(('/redfish/v1/', peerdata[nid])) tsks = [] for targ in pooltargs: - tsks.append(util.spawn_task(check_fish(targ))) + tsks.append(tasks.spawn_task(check_fish(targ))) while tsks: done, tsks = await asyncio.wait(tsks, return_when=asyncio.FIRST_COMPLETED) for dt in done: diff --git a/confluent_server/confluent/log.py b/confluent_server/confluent/log.py index b159853d..fd60a803 100644 --- a/confluent_server/confluent/log.py +++ b/confluent_server/confluent/log.py @@ -68,6 +68,7 @@ import collections import confluent.config.configmanager import confluent.config.conf as conf import confluent.exceptions as exc +import confluent.tasks as tasks import inspect import glob import json @@ -116,40 +117,6 @@ MIDNIGHT = 24 * 60 * 60 _loggers = {} -async def _sleep_and_run(sleeptime, func, args): - await asyncio.sleep(sleeptime) - awt = func(*args) - if inspect.isawaitable(awt): - await awt - - -def spawn_after(sleeptime, func, *args): - if func is None: - raise Exception('tf') - return spawn(_sleep_and_run(sleeptime, func, args)) - - -tsks = {} - - -def spawn(coro): - tskid = random.random() - while tskid in tsks: - tskid = random.random() - tsks[tskid] = 1 - try: - tsks[tskid] = asyncio.create_task(_run(coro, tskid), name=repr(coro)) - except AttributeError: - tsks[tskid] = asyncio.get_event_loop().create_task(_run(coro, tskid), name=repr(coro)) - return tsks[tskid] - - -async def _run(coro, taskid): - ret = await coro - del tsks[taskid] - return ret - - class Events(object): ( undefined, clearscreen, clientconnect, clientdisconnect, @@ -665,7 +632,7 @@ class Logger(object): self.logentries.appendleft([DataTypes.event, tstamp, roll_data, Events.logrollover, None]) if self.closer is None: - self.closer = spawn_after(15, self.closelog) + self.closer = tasks.spawn_task_after(15, self.closelog) self.writer = None def read_recent_text(self, size): @@ -814,7 +781,7 @@ class Logger(object): [ltype, timestamp, logdata, event, eventdata]) if self.buffered: if self.writer is None: - self.writer = spawn_after(2, self.writedata) + self.writer = tasks.spawn_task_after(2, self.writedata) else: self.writedata() @@ -840,3 +807,5 @@ def logtrace(): tracelog = Logger('trace', buffered=False) tracelog.log(traceback.format_exc(), ltype=DataTypes.event, event=Events.stacktrace) + +tasks.logtrace = logtrace diff --git a/confluent_server/confluent/networking/macmap.py b/confluent_server/confluent/networking/macmap.py index 2f79abe7..b23c80b6 100644 --- a/confluent_server/confluent/networking/macmap.py +++ b/confluent_server/confluent/networking/macmap.py @@ -53,6 +53,7 @@ import confluent.exceptions as exc import confluent.log as log import confluent.messages as msg import confluent.noderange as noderange +import confluent.tasks as tasks import confluent.util as util import fcntl import msgpack @@ -471,7 +472,7 @@ async def update_macmap(configmanager, impatient=False): except GeneratorExit: # the calling function has stopped caring, but we want to finish # the sweep, background it - util.spawn(_finish_update(completions)) + tasks.spawn(_finish_update(completions)) raise @@ -586,7 +587,7 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents): pathcomponents == ['networking', 'macs', 'rescan']): if inputdata != {'rescan': 'start'}: raise exc.InvalidArgumentException('Input must be rescan=start') - util.spawn(rescan(configmanager)) + tasks.spawn(rescan(configmanager)) return [msg.KeyValueData({'rescan': 'started'})] raise exc.NotImplementedException( 'Operation {0} on {1} not implemented'.format( @@ -722,7 +723,7 @@ async def offloader_main(cloop): data = await sreader.read(512) upacker.feed(data) for cmd in upacker: - util.spawn(_snmp_map_switch_relay(*cmd)) + tasks.spawn(_snmp_map_switch_relay(*cmd)) sys.exit(0) async def test_main(cloop): diff --git a/confluent_server/confluent/osimage.py b/confluent_server/confluent/osimage.py index 726679f2..5e4cb41f 100644 --- a/confluent_server/confluent/osimage.py +++ b/confluent_server/confluent/osimage.py @@ -19,6 +19,7 @@ if __name__ == '__main__': import confluent.exceptions as exc import confluent.messages as msg +import confluent.tasks as tasks import confluent.util as util COPY = 1 @@ -847,7 +848,7 @@ async def generate_stock_profiles(defprofile, distpath, targpath, osname, await util.check_call( 'sh', '{0}/initprofile.sh'.format(dirname), targpath, dirname) - bootupdates.append(util.spawn(update_boot(dirname))) + bootupdates.append(tasks.spawn_task(update_boot(dirname))) profilelist.append(profname) for upd in bootupdates: await upd @@ -911,7 +912,7 @@ class MediaImporter(object): importing[importkey] = self self.filename = os.path.abspath(media) self.error = '' - self.importer = util.spawn(self.importmedia()) + self.importer = tasks.spawn_task(self.importmedia()) def stop(self): if self.worker and self.worker.returncode is None: diff --git a/confluent_server/confluent/plugins/console/openbmc.py b/confluent_server/confluent/plugins/console/openbmc.py index ad6c0fbd..baf86ef1 100644 --- a/confluent_server/confluent/plugins/console/openbmc.py +++ b/confluent_server/confluent/plugins/console/openbmc.py @@ -23,6 +23,7 @@ import asyncio import confluent.exceptions as cexc import confluent.interface.console as conapi import confluent.log as log +import confluent.tasks as tasks import confluent.util as util import aiohmi.exceptions as pygexc import aiohmi.redfish.command as rcmd @@ -129,7 +130,7 @@ class OpenBmcConsole(conapi.Console): self.ws = await self.clisess.ws_connect('wss://{0}/console0'.format(self.bmc), protocols=protos, ssl=self.ssl) #self.ws.connect('wss://{0}/console0'.format(self.bmc), host=bmc, cookie='XSRF-TOKEN={0}; SESSION={1}'.format(wc.cookies['XSRF-TOKEN'], wc.cookies['SESSION']), subprotocols=[wc.cookies['XSRF-TOKEN']]) self.connected = True - self.recvr = util.spawn_task(self.recvdata()) + self.recvr = tasks.spawn_task(self.recvdata()) return async def write(self, data): diff --git a/confluent_server/confluent/plugins/hardwaremanagement/affluent.py b/confluent_server/confluent/plugins/hardwaremanagement/affluent.py index f3e5e7a4..d286ea73 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/affluent.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/affluent.py @@ -19,6 +19,7 @@ import socket import confluent.exceptions as exc import aiohmi.util.webclient as webclient import confluent.messages as msg +import confluent.tasks as tasks import confluent.util as util class SwitchSensor(object): @@ -120,7 +121,7 @@ def _run_method(method, workers, results, configmanager, nodes, element): creds = configmanager.get_node_attributes( nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True) for node in nodes: - workers.add(util.spawn(method(configmanager, creds, + workers.add(tasks.spawn_task(method(configmanager, creds, node, results, element))) async def retrieve(nodes, element, configmanager, inputdata): diff --git a/confluent_server/confluent/plugins/shell/ssh.py b/confluent_server/confluent/plugins/shell/ssh.py index b07e12a9..64b71026 100644 --- a/confluent_server/confluent/plugins/shell/ssh.py +++ b/confluent_server/confluent/plugins/shell/ssh.py @@ -22,6 +22,7 @@ import confluent.exceptions as cexc import confluent.interface.console as conapi import confluent.log as log +import confluent.tasks as tasks import confluent.util as util import hashlib @@ -78,7 +79,7 @@ class SshShell(conapi.Console): def logon(self): self.inputmode = -3 - util.spawn(self.do_logon()) + tasks.spawn(self.do_logon()) async def do_logon(self): sco = asyncssh.SSHClientConnectionOptions() @@ -120,7 +121,7 @@ class SshShell(conapi.Console): self.connected = True await self.datacallback('Connected\r\n') self.shell = await self.ssh.open_session(term_type='vt100', term_size=(self.width, self.height)) - self.rxthread = util.spawn(self.recvdata()) + self.rxthread = tasks.spawn_task(self.recvdata()) async def write(self, data): if self.inputmode == -2: diff --git a/confluent_server/confluent/shellserver.py b/confluent_server/confluent/shellserver.py index 5f95bc2d..d0333fef 100644 --- a/confluent_server/confluent/shellserver.py +++ b/confluent_server/confluent/shellserver.py @@ -22,7 +22,7 @@ import confluent.consoleserver as consoleserver import confluent.exceptions as exc import confluent.messages as msg -import confluent.util as util +import confluent.tasks as tasks activesessions = {} @@ -48,7 +48,7 @@ class _ShellHandler(consoleserver.ConsoleHandler): def _got_disconnected(self): self.connectstate = 'closed' - util.spawn(self._bgdisconnect()) + tasks.spawn(self._bgdisconnect()) async def _bgdisconnect(self): await self._send_rcpts({'connectstate': self.connectstate}) diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 0c84461a..f942a5b6 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -46,6 +46,7 @@ import confluent.log as log import confluent.core as pluginapi import confluent.shellserver as shellserver import confluent.collective.manager as collective +import confluent.tasks as tasks import confluent.util as util tracelog = None @@ -386,9 +387,9 @@ async def _tlshandler(bind_host, bind_port): while (1): # TODO: exithook cnn, addr = await cloop.sock_accept(plainsocket) if addr[1] < 1000: - asyncio.create_task(cs.handle_client(cnn, addr)) + tasks.spawn(cs.handle_client(cnn, addr)) else: - asyncio.create_task(_tlsstartup(cnn)) + tasks.spawn(_tlsstartup(cnn)) @ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p) @@ -443,7 +444,7 @@ async def _tlsstartup(cnn): #cnn = ctx.wrap_socket(cnn, server_side=True) except AttributeError: raise Exception('Unable to find workable SSL support') - asyncio.create_task(sessionhdl(cnn, authname, cert=cert)) + tasks.spawn(sessionhdl(cnn, authname, cert=cert)) def removesocket(): try: @@ -489,7 +490,7 @@ async def _unixdomainhandler(): except KeyError: cnn.close() return - util.spawn(sessionhdl(cnn, authname, skipauth)) + tasks.spawn(sessionhdl(cnn, authname, skipauth)) #asyncio.create_task(sessionhdl(cnn, authname, skipauth)) @@ -510,8 +511,8 @@ class SockApi(object): self.start_remoteapi() else: cloop = asyncio.get_event_loop() - cloop.create_task(self.watch_for_cert()) - self.unixdomainserver = asyncio.create_task(_unixdomainhandler()) + tasks.spawn(self.watch_for_cert()) + self.unixdomainserver = tasks.spawn_task(_unixdomainhandler()) async def watch_for_cert(self): watcher = libc.inotify_init1(os.O_NONBLOCK) @@ -547,5 +548,5 @@ class SockApi(object): def start_remoteapi(self): if self.tlsserver is not None: return - self.tlsserver = asyncio.get_event_loop().create_task( + self.tlsserver = tasks.spawn_task( _tlshandler(self.bind_host, self.bind_port)) diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index 4c566ea1..5bde1f87 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -29,6 +29,7 @@ import ssl import struct import random import subprocess +import confluent.tasks as tasks def mkdirp(path, mode=0o777): @@ -45,55 +46,8 @@ async def check_call(*cmd, **kwargs): if rc != 0: raise subprocess.CalledProcessError(rc, cmd) -async def _sleep_and_run(sleeptime, func, args): - await asyncio.sleep(sleeptime) - await func(*args) -def spawn_after(sleeptime, func, *args): - if func is None: - raise Exception('tf') - return spawn(_sleep_and_run(sleeptime, func, args)) - -tsks = {} - - -tasksitter = None - - -async def _sit_tasks(): - while True: - while not tsks: - await asyncio.sleep(15) - tsk_list = [tsks[x] for x in tsks] - cmpl, pnding = await asyncio.wait(tsk_list, return_when=asyncio.FIRST_COMPLETED, timeout=15) - for tskid in list(tsks): - if tsks[tskid].done(): - try: - tsk = tsks[tskid] - del tsks[tskid] - await tsk - except Exception as e: - print(repr(e)) - - -def spawn_task(coro): - try: - return asyncio.create_task(coro) - except AttributeError: - return asyncio.get_event_loop().create_task(coro) - - -def spawn(coro): - global tasksitter - if not tasksitter: - tasksitter = spawn_task(_sit_tasks()) - tskid = random.random() - while tskid in tsks: - tskid = random.random() - tsks[tskid] = spawn_task(coro) - return tsks[tskid] - async def check_output(*cmd): @@ -264,7 +218,7 @@ class TLSCertVerifier(object): auditlog = log.Logger('audit') auditlog.log({'node': self.node, 'event': 'certautoadd', 'fingerprint': fingerprint}) - spawn(self.cfm.set_node_attributes( + tasks.spawn(self.cfm.set_node_attributes( {self.node: {self.fieldname: fingerprint}})) return True elif cert_matches(storedprint[self.node][self.fieldname]['value'],