diff --git a/confluent_server/bin/confluent b/confluent_server/bin/confluent index 9891e4aa..b5f56fe2 100755 --- a/confluent_server/bin/confluent +++ b/confluent_server/bin/confluent @@ -18,10 +18,7 @@ import asyncio import sys import os -#import eventlet -#import eventlet.hubs -#eventlet.hubs.use_hub("eventlet.hubs.asyncio") -#from eventlet.asyncio import spawn_for_awaitable + path = os.path.dirname(os.path.realpath(__file__)) diff --git a/confluent_server/builddeb b/confluent_server/builddeb index 3a983694..842108c6 100755 --- a/confluent_server/builddeb +++ b/confluent_server/builddeb @@ -38,9 +38,9 @@ if [ "$OPKGNAME" = "confluent-server" ]; then if grep wheezy /etc/os-release; then sed -i 's/^\(Depends:.*\)/\1, python-confluent-client, python-lxml, python-eficompressor, python-pycryptodomex, python-dateutil, python-pyopenssl, python-msgpack/' debian/control elif grep jammy /etc/os-release; then - sed -i 's/^\(Depends:.*\)/\1, confluent-client, python3-lxml, python3-eficompressor, python3-pycryptodome, python3-websocket, python3-msgpack, python3-eventlet, python3-pyparsing, python3-pyghmi(>=1.5.71), python3-paramiko, python3-pysnmp4, python3-libarchive-c, confluent-vtbufferd, python3-netifaces, python3-yaml, python3-dateutil/' debian/control + sed -i 's/^\(Depends:.*\)/\1, confluent-client, python3-lxml, python3-eficompressor, python3-pycryptodome, python3-websocket, python3-msgpack, python3-aiohttp, python3-pyparsing, python3-pyghmi(>=1.5.71), python3-paramiko, python3-pysnmp4, python3-libarchive-c, confluent-vtbufferd, python3-netifaces, python3-yaml, python3-dateutil/' debian/control else - sed -i 's/^\(Depends:.*\)/\1, confluent-client, python3-lxml, python3-eficompressor, python3-pycryptodome, python3-websocket, python3-msgpack, python3-eventlet, python3-pyparsing, python3-pyghmi(>=1.5.71), python3-paramiko, python3-pysnmp4, python3-libarchive-c, confluent-vtbufferd, python3-netifaces, python3-yaml, python3-dateutil, python3-pyasyncore/' debian/control + sed -i 's/^\(Depends:.*\)/\1, confluent-client, python3-lxml, python3-eficompressor, python3-pycryptodome, python3-websocket, python3-msgpack, python3-aiohttp, python3-pyparsing, python3-pyghmi(>=1.5.71), python3-paramiko, python3-pysnmp4, python3-libarchive-c, confluent-vtbufferd, python3-netifaces, python3-yaml, python3-dateutil, python3-pyasyncore/' debian/control fi if grep wheezy /etc/os-release; then echo 'confluent_client python-confluent-client' >> debian/pydist-overrides diff --git a/confluent_server/confluent/alerts.py b/confluent_server/confluent/alerts.py index 81328470..8a4eb0f9 100644 --- a/confluent_server/confluent/alerts.py +++ b/confluent_server/confluent/alerts.py @@ -34,7 +34,7 @@ import confluent.exceptions as exc import confluent.lookuptools as lookuptools import confluent.core -def decode_alert(varbinds, configmanager): +async def decode_alert(varbinds, configmanager): """Decode an SNMP alert for a server Given the agentaddr, OID for the trap, and a dict of varbinds, @@ -49,11 +49,11 @@ def decode_alert(varbinds, configmanager): agentaddr = varbinds['.1.3.6.1.6.3.18.1.3.0'] except KeyError: agentaddr = varbinds['1.3.6.1.6.3.18.1.3.0'] - node = lookuptools.node_by_manager(agentaddr) + node = await lookuptools.node_by_manager(agentaddr) if node is None: raise exc.InvalidArgumentException( 'Unable to find a node with specified manager') - return confluent.core.handle_path( + return await confluent.core.handle_path( '/nodes/{0}/events/hardware/decode'.format(node), 'update', configmanager, varbinds, autostrip=False) diff --git a/confluent_server/confluent/auth.py b/confluent_server/confluent/auth.py index 2280785f..9793616f 100644 --- a/confluent_server/confluent/auth.py +++ b/confluent_server/confluent/auth.py @@ -305,12 +305,6 @@ async def check_user_passphrase(name, passphrase, operation=None, element=None, # throw it at the worker pool when implemented # maybe a distinct worker pool, wondering about starving out non-auth stuff salt, crypt = ucfg['cryptpass'] - # execute inside tpool to get greenthreads to give it a special thread - # world - # TODO(jbjohnso): util function to generically offload a call - # such a beast could be passed into pyghmi as a way for pyghmi to - # magically get offload of the crypto functions without having - # to explicitly get into the eventlet tpool game global authworkers global authcleaner if authworkers is None: diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 9400ac91..ffe1021c 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -570,21 +570,23 @@ class ConsoleHandler(object): await self._got_disconnected() -def disconnect_node(node, configmanager): +async def disconnect_node(node, configmanager): consk = (node, configmanager.tenant) if consk in _handled_consoles: - _handled_consoles[consk].close() + await _handled_consoles[consk].close() del _handled_consoles[consk] def _nodechange(added, deleting, renamed, configmanager): + async def _replace_node(old, new, cfm): + await disconnect_node(old, cfm) + await connect_node(new, cfm) for node in deleting: - eventlet.spawn(disconnect_node, node, configmanager) + tasks.spawn(disconnect_node(node, configmanager)) for node in renamed: - disconnect_node(node, configmanager) - eventlet.spawn(connect_node, renamed[node], configmanager) + tasks.spawn(_replace_node(node, renamed[node], configmanager)) for node in added: - eventlet.spawn(connect_node, node, configmanager) + tasks.spawn(connect_node(node, configmanager)) def _start_tenant_sessions(cfm): @@ -595,7 +597,7 @@ def _start_tenant_sessions(cfm): if manager and collective.get_myname() != manager: continue try: - connect_node(node, cfm) + await connect_node(node, cfm) except: _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) @@ -617,7 +619,7 @@ async def start_console_sessions(): configmodule.hook_new_configmanagers(_start_tenant_sessions) -def connect_node(node, configmanager, username=None, direct=True, width=80, +async def connect_node(node, configmanager, username=None, direct=True, width=80, height=24): attrval = configmanager.get_node_attributes(node, 'collective.manager') myc = attrval.get(node, {}).get('collective.manager', {}).get( diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 893793e1..0d34b332 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -1404,7 +1404,7 @@ class Staging: raise FileNotFoundError return directory -def handle_staging(pathcomponents, operation, configmanager, inputdata): +async def handle_staging(pathcomponents, operation, configmanager, inputdata): ''' e.g push_url: /confluent-api/staging/user/ ''' @@ -1433,10 +1433,11 @@ def handle_staging(pathcomponents, operation, configmanager, inputdata): with open(file, 'wb') as f: while remaining_length > 0: progress = (1 - (remaining_length/content_length)) * 100 + #TODO: ASYNC Need to change to aiohttp approach datachunk = filedata['wsgi.input'].read(min(chunk_size, remaining_length)) f.write(datachunk) remaining_length -= len(datachunk) - eventlet.sleep(0) + await asyncio.sleep(0) yield msg.FileUploadProgress(progress) yield msg.FileUploadProgress(100) @@ -1552,11 +1553,11 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip= if element != 'decode': raise exc.NotFoundException() if operation == 'update': - return alerts.decode_alert(inputdata, configmanager) + return await alerts.decode_alert(inputdata, configmanager) elif pathcomponents[0] == 'discovery': return handle_discovery(pathcomponents[1:], operation, configmanager, inputdata) elif pathcomponents[0] == 'staging': - return handle_staging(pathcomponents, operation, configmanager, inputdata) + return await handle_staging(pathcomponents, operation, configmanager, inputdata) else: raise exc.NotFoundException() diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index eba2bc34..fd0bdd89 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -549,11 +549,9 @@ async def register_remote_addrs(addresses, configmanager): except Exception: return addr, False return addr, True - #rpool = eventlet.greenpool.GreenPool(512) for count in iterate_addrs(addresses, True): yield msg.ConfluentResourceCount(count) - return # ASYNC - for result in rpool.imap(register_remote_addr, iterate_addrs(addresses)): + async for result in tasks.task_imap(register_remote_addr, iterate_addrs(addresses), max_concurrent=512): if result[1]: yield msg.CreatedResource(result[0]) else: @@ -894,7 +892,7 @@ async def detected(info): rechecker = tasks.spawn_task_after(300, _periodic_recheck, cfg) unknown_info[info['hwaddr']] = info info['discostatus'] = 'unidentfied' - #TODO, eventlet spawn after to recheck sooner, or somehow else + #TODO, spawn after to recheck sooner, or somehow else # influence periodic recheck to shorten delay? return nodename, info['maccount'] = await get_nodename(cfg, handler, info) @@ -1715,9 +1713,7 @@ def stop_autosense(): def start_autosense(): autosensors.add(tasks.spawn_task(slp.snoop(safe_detected, slp))) - #autosensors.add(eventlet.spawn(mdns.snoop, safe_detected, mdns)) tasks.spawn(pxe.snoop(safe_detected, pxe, get_node_guess_by_uuid)) - #autosensors.add(eventlet.spawn(pxe.snoop, safe_detected, pxe, get_node_guess_by_uuid)) tasks.spawn(remotescan()) diff --git a/confluent_server/confluent/discovery/protocols/pxe.py b/confluent_server/confluent/discovery/protocols/pxe.py index b06c4ce4..a026d38c 100644 --- a/confluent_server/confluent/discovery/protocols/pxe.py +++ b/confluent_server/confluent/discovery/protocols/pxe.py @@ -44,8 +44,8 @@ import os import socket import struct import time -import traceback import uuid +import confluent.tasks as tasks libc = ctypes.CDLL(ctypes.util.find_library('c')) @@ -871,12 +871,12 @@ async def reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile, ipinfo = 'without address, served from {0}'.format(myip) if relayipa: ipinfo += ' (relayed to {} via {})'.format(relayipa, requestor[0]) - eventlet.spawn(send_rsp, repview, replen, requestor, relayip, reqview, info, deferanswer, isboot, node, boottype, ipinfo, sock) + tasks.spawn(send_rsp(repview, replen, requestor, relayip, reqview, info, deferanswer, isboot, node, boottype, ipinfo, sock)) -def send_rsp(repview, replen, requestor, relayip, reqview, info, defertxid, isboot, node, boottype, ipinfo, sock): +async def send_rsp(repview, replen, requestor, relayip, reqview, info, defertxid, isboot, node, boottype, ipinfo, sock): if defertxid: - eventlet.sleep(0.5) + await asyncio.sleep(0.5) if defertxid in _recent_txids: log.log({'info': 'Skipping reply for {} over interface {} due to better offer being made over other interface'.format(node, info['netinfo']['ifidx'])}) return diff --git a/confluent_server/confluent/forwarder.py b/confluent_server/confluent/forwarder.py index 52f85acb..5752efa9 100644 --- a/confluent_server/confluent/forwarder.py +++ b/confluent_server/confluent/forwarder.py @@ -17,9 +17,9 @@ #This handles port forwarding for web interfaces on management devices #It will also hijack port 3900 and do best effort.. -import eventlet -import eventlet.green.select as select -import eventlet.green.socket as socket +import asyncio +import socket +import confluent.tasks as tasks forwardersbyclient = {} relaysbysession = {} sessionsbyip = {} @@ -28,24 +28,37 @@ sockhandler = {} vidtargetbypeer = {} vidforwarder = None -def handle_connection(incoming, outgoing): - while True: - r, _, _ = select.select((incoming, outgoing), (), (), 60) - for mysock in r: - data = mysock.recv(32768) - if not data: - incoming.close() - outgoing.close() - return - if mysock == incoming: - outgoing.sendall(data) - elif mysock == outgoing: - incoming.sendall(data) +async def handle_connection(incoming, outgoing): + async def _relay(reader, writer): + try: + while True: + data = await reader.read(32768) + if not data: + return + writer.write(data) + await writer.drain() + except (ConnectionError, OSError): + return + + inrdr, inwriter = await asyncio.open_connection(sock=incoming) + outrdr, outwriter = await asyncio.open_connection(sock=outgoing) + try: + done, pending = await asyncio.wait( + [asyncio.ensure_future(_relay(inrdr, outwriter)), + asyncio.ensure_future(_relay(outrdr, inwriter))], + return_when=asyncio.FIRST_COMPLETED) + for task in pending: + task.cancel() + finally: + inwriter.close() + outwriter.close() -def forward_port(sock, target, clientip, sessionid): +async def forward_port(sock, target, clientip, sessionid): + loop = asyncio.get_event_loop() + sock.setblocking(False) while True: - conn, cli = sock.accept() + conn, cli = await loop.sock_accept(sock) if cli[0] != clientip: conn.close() continue @@ -57,14 +70,19 @@ def forward_port(sock, target, clientip, sessionid): continue if sessionid not in relaysbysession: relaysbysession[sessionid] = {} - relaysbysession[sessionid][eventlet.spawn( - handle_connection, conn, client)] = conn + relaysbysession[sessionid][tasks.spawn( + handle_connection(conn, client))] = conn -def forward_video(): - sock = eventlet.listen(('::', 3900, 0, 0), family=socket.AF_INET6) +async def forward_video(): + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('::', 3900, 0, 0)) + sock.listen(50) + loop = asyncio.get_event_loop() + sock.setblocking(False) while True: - conn, cli = sock.accept() + conn, cli = await loop.sock_accept(sock) if cli[0] not in vidtargetbypeer or not sessionsbyip.get(cli[0], None): conn.close() continue @@ -76,7 +94,7 @@ def forward_video(): conn.close() vidclient.close() continue - eventlet.spawn_n(handle_connection, conn, vidclient) + tasks.spawn(handle_connection(conn, vidclient)) def close_session(sessionid): @@ -124,10 +142,10 @@ def get_port(addr, clientip, sessionid): newport += 1 continue forwardersbyclient[sessionid][addr] = newsock - sockhandler[newsock] = eventlet.spawn(forward_port, newsock, addr, - clientip, sessionid) + sockhandler[newsock] = tasks.spawn(forward_port(newsock, addr, + clientip, sessionid)) if not vidforwarder: - vidforwarder = eventlet.spawn(forward_video) + vidforwarder = tasks.spawn(forward_video()) vidtargetbypeer[clientip] = addr return forwardersbyclient[sessionid][addr].getsockname()[1] diff --git a/confluent_server/confluent/lookuptools.py b/confluent_server/confluent/lookuptools.py index fee9dcc8..bcdf95fd 100644 --- a/confluent_server/confluent/lookuptools.py +++ b/confluent_server/confluent/lookuptools.py @@ -25,13 +25,13 @@ # service should have a null tenant and a tenant entry that correlates) __author__ = 'jjohnson2' +import asyncio import confluent.config.configmanager as configmanager import itertools -from eventlet.support import greendns manager_to_nodemap = {} -def node_by_manager(manager): +async def node_by_manager(manager): """Lookup a node by manager Search for a node according to a given network address. @@ -46,7 +46,7 @@ def node_by_manager(manager): """ manageraddresses = [] - for tmpaddr in greendns.getaddrinfo(manager, None): + for tmpaddr in await asyncio.get_event_loop().getaddrinfo(manager, None): manageraddresses.append(tmpaddr[4][0]) cfm = configmanager.ConfigManager(None) if manager in manager_to_nodemap: @@ -68,7 +68,7 @@ def node_by_manager(manager): if currhm in manageraddresses: manager_to_nodemap[manager] = node return node - for curraddr in greendns.getaddrinfo(currhm, None): + for curraddr in await asyncio.get_event_loop().getaddrinfo(currhm, None): curraddr = curraddr[4][0] if curraddr in manageraddresses: manager_to_nodemap[manager] = node diff --git a/confluent_server/confluent/mountmanager.py b/confluent_server/confluent/mountmanager.py index a4cc8b29..279ce3dc 100644 --- a/confluent_server/confluent/mountmanager.py +++ b/confluent_server/confluent/mountmanager.py @@ -1,6 +1,5 @@ import asyncio -import eventlet import confluent.messages as msg import confluent.exceptions as exc import struct diff --git a/confluent_server/confluent/networking/macmap.py b/confluent_server/confluent/networking/macmap.py index c244c159..aa578e48 100644 --- a/confluent_server/confluent/networking/macmap.py +++ b/confluent_server/confluent/networking/macmap.py @@ -17,7 +17,7 @@ # This provides the implementation of locating MAC addresses on ethernet # switches. It is, essentially, a port of 'MacMap.pm' to confluent. # However, there are enhancements. -# For one, each switch interrogation is handled in an eventlet 'thread' +# For one, each switch interrogation is handled in an coroutine # For another, MAC addresses are checked in the dictionary on every # switch return, rather than waiting for all switches to check in # (which makes it more responsive when there is a missing or bad switch) diff --git a/confluent_server/confluent/plugins/hardwaremanagement/enos.py b/confluent_server/confluent/plugins/hardwaremanagement/enos.py index 866390bd..7a28e171 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/enos.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/enos.py @@ -22,8 +22,8 @@ # - One power supply is off. import re -import eventlet -import eventlet.queue as queue +import asyncio +import confluent.tasks as tasks import confluent.exceptions as exc import confluent.messages as msg import confluent.util as util @@ -44,8 +44,8 @@ def _run_method(method, workers, results, configmanager, nodes, element): nodes, ["switchuser", "switchpass", "secret.hardwaremanagementpassword", "secret.hardwaremanagementuser"], decrypt=True) for node in nodes: - workers.add(eventlet.spawn(method, configmanager, creds, - node, results, element)) + workers.add(tasks.spawn(method(configmanager, creds, + node, results, element))) def enos_login(node, configmanager, creds): @@ -91,7 +91,7 @@ def create(nodes, element, configmanager, inputdata): def retrieve(nodes, element, configmanager, inputdata): - results = queue.LightQueue() + results = asyncio.Queue() workers = set([]) if element == ["power", "state"]: for node in nodes: @@ -112,14 +112,14 @@ def retrieve(nodes, element, configmanager, inputdata): currtimeout = 10 while workers: try: - datum = results.get(10) + datum = await results.get() while datum: if datum: yield datum datum = results.get_nowait() - except queue.Empty: + except asyncio.QueueEmpty: pass - eventlet.sleep(0.001) + await asyncio.sleep(0.001) for t in list(workers): if t.dead: workers.discard(t) @@ -128,7 +128,7 @@ def retrieve(nodes, element, configmanager, inputdata): datum = results.get_nowait() if datum: yield datum - except queue.Empty: + except asyncio.QueueEmpty: pass diff --git a/confluent_server/confluent/runansible.py b/confluent_server/confluent/runansible.py index b82d8684..e9a9d7e6 100644 --- a/confluent_server/confluent/runansible.py +++ b/confluent_server/confluent/runansible.py @@ -17,10 +17,10 @@ try: import confluent.sshutil as sshutil - import eventlet - import eventlet.green.subprocess as subprocess + import confluent.tasks as tasks except ImportError: pass +import asyncio import shutil import json import msgpack @@ -40,7 +40,7 @@ class PlayRunner(object): self.complete = False def _start_playbooks(self): - self.worker = eventlet.spawn(self._really_run_playbooks) + self.worker = tasks.spawn(self._really_run_playbooks()) def get_available_results(self): avail = self.results @@ -79,7 +79,7 @@ class PlayRunner(object): 'results': self.get_available_results() } - def _really_run_playbooks(self): + async def _really_run_playbooks(self): global anspypath try: mypath = anspypath @@ -92,21 +92,21 @@ class PlayRunner(object): mypath = anspypath if not mypath: mypath = sys.executable - with open(os.devnull, 'w+') as devnull: - targnodes = ','.join(self.nodes) - for playfilename in self.playfiles: - worker = subprocess.Popen( - [mypath, __file__, targnodes, playfilename], - stdin=devnull, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stder = worker.communicate() - self.stderr += stder.decode('utf8') - current = memoryview(stdout) - while len(current): - sz = struct.unpack('=q', current[:8])[0] - result = msgpack.unpackb(current[8:8+sz], raw=False) - self.results.append(result) - current = current[8+sz:] + targnodes = ','.join(self.nodes) + for playfilename in self.playfiles: + worker = await asyncio.create_subprocess_exec( + mypath, __file__, targnodes, playfilename, + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) + stdout, stder = await worker.communicate() + self.stderr += stder.decode('utf8') + current = memoryview(stdout) + while len(current): + sz = struct.unpack('=q', current[:8])[0] + result = msgpack.unpackb(current[8:8+sz], raw=False) + self.results.append(result) + current = current[8+sz:] finally: self.complete = True diff --git a/confluent_server/confluent/selfservice.py b/confluent_server/confluent/selfservice.py index 86abab70..6695a5f5 100644 --- a/confluent_server/confluent/selfservice.py +++ b/confluent_server/confluent/selfservice.py @@ -366,7 +366,7 @@ async def handle_request(req, make_response, mimetype): pass if needlocalectl: try: - langinfo = util.run(['localectl', 'status'])[0].split(b'\n') + langinfo = (await util.check_output('localectl', 'status'))[0].split(b'\n') except Exception: langinfo = [] for line in langinfo: diff --git a/confluent_server/confluent/shellmodule.py b/confluent_server/confluent/shellmodule.py index d8cc224f..89bfb7e7 100644 --- a/confluent_server/confluent/shellmodule.py +++ b/confluent_server/confluent/shellmodule.py @@ -22,10 +22,10 @@ # only by the process owner and such an owner would be able to read a file # anyway. Regardless, it is advisable to 'unset' +import asyncio import confluent.interface.console as conapi -import eventlet -import eventlet.green.select as select -import eventlet.green.subprocess as subprocess +import confluent.tasks as tasks +#TODO: ASYNC: port shellmodule over if it matters import fcntl import os import pty @@ -45,7 +45,7 @@ class ExecConsole(conapi.Console): 'CONFLUENT_NODE': node, } - def relaydata(self): + async def relaydata(self): while self.subproc is not None: rdylist, _, _ = select.select( (self._master, self.subproc.stderr), (), (), @@ -55,7 +55,7 @@ class ExecConsole(conapi.Console): somedata = os.read(self._master, 128) while somedata: self._datacallback(somedata) - eventlet.sleep(0) + await asyncio.sleep(0) somedata = os.read(self._master, 128) except OSError as e: if e.errno == 5: @@ -69,7 +69,7 @@ class ExecConsole(conapi.Console): somedata = self.subproc.stderr.read() while somedata: self._datacallback(somedata) - eventlet.sleep(0) + await asyncio.sleep(0) somedata = self.subproc.stderr.read() except IOError as e: if e.errno != 11: @@ -95,12 +95,12 @@ class ExecConsole(conapi.Console): os.close(slave) fcntl.fcntl(master, fcntl.F_SETFL, os.O_NONBLOCK) fcntl.fcntl(self.subproc.stderr.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) - self.readerthread = eventlet.spawn(self.relaydata) + self.readerthread = tasks.spawn(self.relaydata()) def write(self, data): os.write(self._master, data) - def close(self): + async def close(self): try: os.close(self._master) except OSError: @@ -110,7 +110,7 @@ class ExecConsole(conapi.Console): self.subproc.terminate() waittime = 10 while self.subproc is not None and self.subproc.poll() is None: - eventlet.sleep(1) + await asyncio.sleep(1) waittime -= 1 if waittime == 0: break diff --git a/confluent_server/confluent/shellserver.py b/confluent_server/confluent/shellserver.py index 97f7858a..aa9101a7 100644 --- a/confluent_server/confluent/shellserver.py +++ b/confluent_server/confluent/shellserver.py @@ -19,6 +19,7 @@ # capacity for having a multiple of sessions per node active at a given time +import asyncio import confluent.consoleserver as consoleserver import confluent.exceptions as exc import confluent.messages as msg @@ -28,9 +29,9 @@ activesessions = {} _reaper = None -def reapsessions(): +async def reapsessions(): while True: - eventlet.sleep(30) + await asyncio.sleep(30) for clientid in activesessions: currcli = activesessions[clientid] for sesshdl in list(currcli): @@ -50,7 +51,7 @@ class _ShellHandler(consoleserver.ConsoleHandler): self.numusers = 0 global _reaper if _reaper is None: - _reaper = eventlet.spawn(reapsessions) + _reaper = tasks.spawn(reapsessions()) def check_collective(self, attrvalue): diff --git a/confluent_server/confluent/snmputil.py b/confluent_server/confluent/snmputil.py index fd82fa07..faac30d8 100644 --- a/confluent_server/confluent/snmputil.py +++ b/confluent_server/confluent/snmputil.py @@ -17,8 +17,8 @@ # This provides a simplified wrapper around snmp implementation roughly # mapping to the net-snmp commands -# net-snmp-python was considered as the API is cleaner, but the ability to -# patch pysnmp to have it be eventlet friendly has caused it's selection +# net-snmp-python was considered as the API is cleaner, but +# want clean asyncio support # This module simplifies the complex hlapi pysnmp interface import asyncio diff --git a/confluent_server/confluent/sshutil.py b/confluent_server/confluent/sshutil.py index 6ecb143a..89786131 100644 --- a/confluent_server/confluent/sshutil.py +++ b/confluent_server/confluent/sshutil.py @@ -6,7 +6,6 @@ import confluent.config.configmanager as cfm import confluent.collective.manager as collective import confluent.util as util import glob -import eventlet.green.os as os import shutil import tempfile @@ -39,7 +38,7 @@ async def assure_agent(): if agent_pid is None: try: agent_starting = True - sai = util.run(['ssh-agent'])[0] + sai = await util.check_output(['ssh-agent'])[0] for line in sai.split(b'\n'): if b';' not in line: continue diff --git a/confluent_server/confluent/syncfiles.py b/confluent_server/confluent/syncfiles.py index 16cf4c49..048fbe82 100644 --- a/confluent_server/confluent/syncfiles.py +++ b/confluent_server/confluent/syncfiles.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import glob import os import shutil @@ -21,7 +22,7 @@ import tempfile import confluent.sshutil as sshutil import confluent.util as util import confluent.noderange as noderange -import eventlet +import confluent.tasks as tasks import pwd import grp import sys @@ -194,8 +195,8 @@ def sync_list_to_node(sl, node, suffixes, peerip=None): targip = node if peerip: targip = peerip - output, stderr = util.run( - ['rsync', '-rvLD', targdir + '/', 'root@[{}]:/'.format(targip)]) + output, stderr = util.check_output( + 'rsync', '-rvLD', targdir + '/', 'root@[{}]:/'.format(targip)) except Exception as e: if 'CalledProcessError' not in repr(e): # https://github.com/eventlet/eventlet/issues/413 @@ -321,14 +322,14 @@ def start_syncfiles(nodename, cfg, suffixes, principals=[]): syncrunners[nodename].wait() else: return '503 Synchronization already in progress', 'Synchronization already in progress for {}'.format(nodename) - syncrunners[nodename] = eventlet.spawn( - sync_list_to_node, sl, nodename, suffixes, peerip) + syncrunners[nodename] = tasks.spawn( + sync_list_to_node(sl, nodename, suffixes, peerip)) if not cleaner: - cleaner = eventlet.spawn(cleanit) + cleaner = tasks.spawn(cleanit()) return '202 Queued', 'Background synchronization initiated' # backgrounded -def cleanit(): +async def cleanit(): toreap = {} while True: for nn in list(syncrunners): @@ -345,7 +346,7 @@ def cleanit(): toreap[nn] = 1 elif nn in toreap: del toreap[nn] - eventlet.sleep(30) + await asyncio.sleep(30) def get_syncresult(nodename):