From 50e530ebdedd5c457aa9c1a6deb23945bf761d4b Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 22 Jan 2026 14:40:44 -0500 Subject: [PATCH] Replace pyghmi with aiohmi in various plugins, remove some eventlet usage --- .../confluent/discovery/protocols/mdns.py | 3 +- confluent_server/confluent/networking/lldp.py | 59 +++--- .../plugins/hardwaremanagement/cnos.py | 172 +++++++++--------- .../plugins/hardwaremanagement/cooltera.py | 108 ++--------- .../plugins/hardwaremanagement/enclosure.py | 2 +- .../plugins/hardwaremanagement/enos.py | 1 - .../plugins/hardwaremanagement/proxmox.py | 96 +++++----- confluent_server/confluent/tasks.py | 23 +++ 8 files changed, 197 insertions(+), 267 deletions(-) diff --git a/confluent_server/confluent/discovery/protocols/mdns.py b/confluent_server/confluent/discovery/protocols/mdns.py index fa766b24..2e09075f 100644 --- a/confluent_server/confluent/discovery/protocols/mdns.py +++ b/confluent_server/confluent/discovery/protocols/mdns.py @@ -44,7 +44,8 @@ import time import struct import traceback -webclient = eventlet.import_patched('pyghmi.util.webclient') +import aiohmi.util.webclient as webclient + mcastv4addr = '224.0.0.251' mcastv6addr = 'ff02::fb' diff --git a/confluent_server/confluent/networking/lldp.py b/confluent_server/confluent/networking/lldp.py index fe18fe62..533ed184 100644 --- a/confluent_server/confluent/networking/lldp.py +++ b/confluent_server/confluent/networking/lldp.py @@ -33,6 +33,7 @@ if __name__ == '__main__': import sys import confluent.config.configmanager as cfm +import asyncio import base64 import confluent.networking.nxapi as nxapi import confluent.exceptions as exc @@ -41,11 +42,11 @@ import confluent.messages as msg import confluent.snmputil as snmp import confluent.networking.netutil as netutil import confluent.util as util -import eventlet -from eventlet.greenpool import GreenPool -import eventlet.semaphore +import confluent.tasks as tasks import re -webclient = 
async def detect_backend(switch, verifier):
    """Identify which HTTPS management API a switch speaks.

    Probes '/affluent/' then '/api/' and memoizes the answer in the
    module-level _fastbackends cache keyed by switch name.

    :param switch: hostname/address of the switch
    :param verifier: TLS certificate verification callback
    :return: 'affluent', 'nxapi', or None when neither API is recognized
    """
    # Must be 'async def': the body awaits webclient calls.  The original
    # patch left this as a plain 'def', which is a SyntaxError.
    backend = _fastbackends.get(switch, None)
    if backend:
        return backend
    wc = webclient.WebConnection(
        switch, 443, verifycallback=verifier, timeout=5)
    apicheck, retcode = await wc.grab_json_response_with_status('/affluent/')
    if retcode == 401 and apicheck.startswith(b'{}'):
        _fastbackends[switch] = 'affluent'
    else:
        apicheck, retcode = await wc.grab_json_response_with_status('/api/')
        if retcode == 400 and apicheck.startswith(b'{"imdata":['):
            _fastbackends[switch] = 'nxapi'
    return _fastbackends.get(switch, None)


async def _extract_neighbor_data_https(switch, user, password, cfm, lldpdata):
    """Gather LLDP neighbor data over HTTPS, dispatching on detected backend.

    Raises a generic Exception when no HTTPS backend is identified so the
    caller can fall back to SNMP.
    """
    kv = util.TLSCertVerifier(
        cfm, switch, 'pubkeys.tls_hardwaremanager').verify_cert
    backend = await detect_backend(switch, kv)
    if not backend:
        raise Exception("No HTTPS backend identified")
    wc = webclient.WebConnection(
        switch, 443, verifycallback=kv, timeout=5)
    if backend == 'affluent':
        # Must await: _extract_neighbor_data_affluent is now a coroutine
        # function; returning it unawaited (as the patch did) means the
        # affluent path silently never runs.
        return await _extract_neighbor_data_affluent(
            switch, user, password, cfm, lldpdata, wc)
    elif backend == 'nxapi':
        return await _extract_neighbor_data_nxapi(
            switch, user, password, cfm, lldpdata, wc)
nxapi.NxApiClient(switch, user, password, cfm) - lldpinfo = cli.get_lldp() + lldpinfo = await cli.get_lldp() for port in lldpinfo: portdata = lldpinfo[port] peerid = '{0}.{1}'.format( @@ -221,9 +222,9 @@ def _extract_neighbor_data_nxapi(switch, user, password, cfm, lldpdata, wc): lldpdata[port] = portdata _neighdata[switch] = lldpdata -def _extract_neighbor_data_affluent(switch, user, password, cfm, lldpdata, wc): +async def _extract_neighbor_data_affluent(switch, user, password, cfm, lldpdata, wc): wc.set_basic_credentials(user, password) - neighdata = wc.grab_json_response('/affluent/lldp/all') + neighdata = await wc.grab_json_response('/affluent/lldp/all') chassisid = neighdata['chassis']['id'] _chassisidbyswitch[switch] = chassisid, for record in neighdata['neighbors']: @@ -250,7 +251,7 @@ def _extract_neighbor_data_affluent(switch, user, password, cfm, lldpdata, wc): _neighdata[switch] = lldpdata -def _extract_neighbor_data_b(args): +async def _extract_neighbor_data_b(args): """Build LLDP data about elements connected to switch args are carried as a tuple, because of eventlet convenience @@ -268,7 +269,7 @@ def _extract_neighbor_data_b(args): return lldpdata = {'!!vintage': now} try: - return _extract_neighbor_data_https(switch, user, password, cfm, lldpdata) + return await _extract_neighbor_data_https(switch, user, password, cfm, lldpdata) except Exception as e: pass conn = snmp.Session(switch, password, user, privacy_protocol=privproto) @@ -327,19 +328,19 @@ def _extract_neighbor_data_b(args): _neighdata[switch] = lldpdata -def update_switch_data(switch, configmanager, force=False, retexc=False): +async def update_switch_data(switch, configmanager, force=False, retexc=False): switchcreds = netutil.get_switchcreds(configmanager, (switch,))[0] - ndr = _extract_neighbor_data(switchcreds + (force, retexc)) + ndr = await _extract_neighbor_data(switchcreds + (force, retexc)) if retexc and isinstance(ndr, Exception): raise ndr return _neighdata.get(switch, {}) -def 
update_neighbors(configmanager, force=False, retexc=False): - return _update_neighbors_backend(configmanager, force, retexc) +async def update_neighbors(configmanager, force=False, retexc=False): + return await _update_neighbors_backend(configmanager, force, retexc) -def _update_neighbors_backend(configmanager, force, retexc): +async def _update_neighbors_backend(configmanager, force, retexc): global _neighdata global _neighbypeerid vintage = _neighdata.get('!!vintage', 0) @@ -351,23 +352,22 @@ def _update_neighbors_backend(configmanager, force, retexc): switches = netutil.list_switches(configmanager) switchcreds = netutil.get_switchcreds(configmanager, switches) switchcreds = [ x + (force, retexc) for x in switchcreds] - pool = GreenPool(64) - for ans in pool.imap(_extract_neighbor_data, switchcreds): + async for ans in tasks.task_imap(_extract_neighbor_data, switchcreds, max_concurrent=64): yield ans -def _extract_neighbor_data(args): +async def _extract_neighbor_data(args): # single switch neighbor data update switch = args[0] if switch not in _updatelocks: - _updatelocks[switch] = eventlet.semaphore.Semaphore() + _updatelocks[switch] = asyncio.Semaphore() if _updatelocks[switch].locked(): while _updatelocks[switch].locked(): - eventlet.sleep(1) + await asyncio.sleep(1) return try: - with _updatelocks[switch]: - return _extract_neighbor_data_b(args) + async with _updatelocks[switch]: + return await _extract_neighbor_data_b(args) except Exception as e: yieldexc = False if len(args) >= 7: @@ -382,6 +382,7 @@ if __name__ == '__main__': # (should do three argument form for snmpv3 test import sys _extract_neighbor_data((sys.argv[1], sys.argv[2], None, True)) + asyncio.run(_extract_neighbor_data((sys.argv[1], sys.argv[2], None, True))) print(repr(_neighdata)) diff --git a/confluent_server/confluent/plugins/hardwaremanagement/cnos.py b/confluent_server/confluent/plugins/hardwaremanagement/cnos.py index 1963a94d..a398749b 100644 --- 
async def cnos_login(node, configmanager, creds):
    """Authenticate to a CNOS switch over HTTPS.

    Returns the logged-in WebConnection on success; raises
    TargetEndpointBadCredentials when the switch rejects the credentials.
    """
    verifier = util.TLSCertVerifier(
        configmanager, node, 'pubkeys.tls_hardwaremanager').verify_cert
    wc = webclient.WebConnection(node, port=443, verifycallback=verifier)
    wc.set_basic_credentials(
        creds[node]['secret.hardwaremanagementuser']['value'],
        creds[node]['secret.hardwaremanagementpassword']['value'])
    body, status, headers = await wc.grab_response_with_status(
        '/nos/api/login/')
    if status == 401:
        # CNOS gives 401 on the first attempt; retry once with the same
        # credentials.
        body, status, headers = await wc.grab_response_with_status(
            '/nos/api/login/')
    if 200 <= status < 300:
        return wc
    raise exc.TargetEndpointBadCredentials('Unable to authenticate')
currtimeout = 10 - while workers: - try: - datum = results.get(10) - while datum: - if datum: - yield datum - datum = results.get_nowait() - except queue.Empty: - pass - eventlet.sleep(0.001) - for t in list(workers): - if t.dead: - workers.discard(t) - try: - while True: - datum = results.get_nowait() - if datum: - yield datum - except queue.Empty: - pass -def retrieve_inventory(configmanager, creds, node, results, element): + +async def retrieve_inventory(configmanager, creds, node, results, element): if len(element) == 3: - results.put(msg.ChildCollection('all')) - results.put(msg.ChildCollection('system')) + await results.put(msg.ChildCollection('all')) + await results.put(msg.ChildCollection('system')) return - wc = cnos_login(node, configmanager, creds) - sysinfo = wc.grab_json_response('/nos/api/sysinfo/inventory') + wc = await cnos_login(node, configmanager, creds) + sysinfo = await wc.grab_json_response('/nos/api/sysinfo/inventory') invinfo = { 'inventory': [{ 'name': 'System', @@ -138,39 +85,39 @@ def retrieve_inventory(configmanager, creds, node, results, element): } }] } - results.put(msg.KeyValueData(invinfo, node)) + await results.put(msg.KeyValueData(invinfo, node)) -def retrieve_firmware(configmanager, creds, node, results, element): +async def retrieve_firmware(configmanager, creds, node, results, element): if len(element) == 3: - results.put(msg.ChildCollection('all')) + await results.put(msg.ChildCollection('all')) return - wc = cnos_login(node, configmanager, creds) - sysinfo = wc.grab_json_response('/nos/api/sysinfo/inventory') + wc = await cnos_login(node, configmanager, creds) + sysinfo = await wc.grab_json_response('/nos/api/sysinfo/inventory') items = [{ 'Software': {'version': sysinfo['Software Revision']}, }, { 'BIOS': {'version': sysinfo['BIOS Revision']}, }] - results.put(msg.Firmware(items, node)) + await results.put(msg.Firmware(items, node)) -def retrieve_health(configmanager, creds, node, results): - wc = cnos_login(node, 
configmanager, creds) - hinfo = wc.grab_json_response('/nos/api/sysinfo/globalhealthstatus') +async def retrieve_health(configmanager, creds, node, results): + wc = await cnos_login(node, configmanager, creds) + hinfo = await wc.grab_json_response('/nos/api/sysinfo/globalhealthstatus') summary = hinfo['status'].lower() if summary == 'noncritical': summary = 'warning' - results.put(msg.HealthSummary(summary, name=node)) + await results.put(msg.HealthSummary(summary, name=node)) state = None badreadings = [] if summary != 'ok': # temperature or dump or fans or psu - wc.grab_json_response('/nos/api/sysinfo/panic_dump') - switchinfo = wc.grab_json_response('/nos/api/sysinfo/panic_dump') + await wc.grab_json_response('/nos/api/sysinfo/panic_dump') + switchinfo = await wc.grab_json_response('/nos/api/sysinfo/panic_dump') if switchinfo: badreadings.append( SwitchSensor('Panicdump', ['Present'], health='warning')) - switchinfo = wc.grab_json_response('/nos/api/sysinfo/temperatures') + switchinfo = await wc.grab_json_response('/nos/api/sysinfo/temperatures') for temp in switchinfo: if temp == 'Temperature threshold': continue @@ -181,17 +128,68 @@ def retrieve_health(configmanager, creds, node, results): tempval = switchinfo[temp]['Temp'] badreadings.append( SwitchSensor(temp, [], value=tempval, health=temphealth)) - switchinfo = wc.grab_json_response('/nos/api/sysinfo/fans') + switchinfo = await wc.grab_json_response('/nos/api/sysinfo/fans') for fan in switchinfo: if switchinfo[fan]['speed-rpm'] < 100: badreadings.append( SwitchSensor(fan, [], value=switchinfo[fan]['speed-rpm'], health='critical')) - switchinfo = wc.grab_json_response('/nos/api/sysinfo/power') + switchinfo = await wc.grab_json_response('/nos/api/sysinfo/power') for psu in switchinfo: if switchinfo[psu]['State'] != 'Normal ON': psuname = switchinfo[psu]['Name'] badreadings.append( SwitchSensor(psuname, states=[switchinfo[psu]['State']], health='critical')) - results.put(msg.SensorReadings(badreadings, 
async def retrieve(nodes, element, configmanager, inputdata):
    """Handle read requests against CNOS switches.

    Power state is hardcoded 'on' (a reachable switch is running).  Health,
    hardware-inventory and firmware queries fan out one worker task per
    node; workers post msg objects to a shared queue that is drained here
    and yielded to the caller.
    """
    # NOTE(review): this module's patch imports tasks via
    # 'from confluent_server.confluent import tasks'; every other converted
    # module uses 'import confluent.tasks as tasks'.  confluent_server is
    # the source-tree directory, not the installed package -- confirm and
    # align the import path.
    results = asyncio.Queue()
    workers = set([])
    if element == ['power', 'state']:
        for node in nodes:
            yield msg.PowerState(node=node, state='on')
        return
    elif element == ['health', 'hardware']:
        creds = configmanager.get_node_attributes(
            nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
        for node in nodes:
            # Fixed: the patch was missing a closing parenthesis here,
            # which made the whole module a SyntaxError.
            workers.add(tasks.spawn(retrieve_health(configmanager, creds,
                                                    node, results)))
    elif element[:3] == ['inventory', 'hardware', 'all']:
        creds = configmanager.get_node_attributes(
            nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
        for node in nodes:
            workers.add(tasks.spawn(retrieve_inventory(configmanager,
                                                       creds, node, results, element)))
    elif element[:3] == ['inventory', 'firmware', 'all']:
        creds = configmanager.get_node_attributes(
            nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
        for node in nodes:
            workers.add(tasks.spawn(retrieve_firmware(configmanager,
                                                      creds, node, results, element)))
    else:
        for node in nodes:
            yield msg.ConfluentNodeError(node, 'Not Implemented')
        return
    while workers:
        try:
            # asyncio.wait_for raises asyncio.TimeoutError (not QueueEmpty)
            # when the queue stays empty; catch both so a quiet 10 seconds
            # loops back to reap workers instead of crashing.
            datum = await asyncio.wait_for(results.get(), timeout=10)
            while datum:
                if datum:
                    yield datum
                datum = results.get_nowait()
        except (asyncio.TimeoutError, asyncio.QueueEmpty):
            pass
        await asyncio.sleep(0.001)
        for t in list(workers):
            if t.done():
                workers.discard(t)
    # All workers finished; drain anything still queued.
    try:
        while True:
            datum = results.get_nowait()
            if datum:
                yield datum
    except asyncio.QueueEmpty:
        pass
b/confluent_server/confluent/plugins/hardwaremanagement/cooltera.py @@ -16,17 +16,9 @@ from xml.etree.ElementTree import fromstring as rfromstring import confluent.util as util import confluent.messages as msg import confluent.exceptions as exc -import eventlet.green.time as time -import eventlet.green.socket as socket -import eventlet.greenpool as greenpool -import eventlet -wc = eventlet.import_patched('pyghmi.util.webclient') -try: - import Cookie - httplib = eventlet.import_patched('httplib') -except ImportError: - httplib = eventlet.import_patched('http.client') - import http.cookies as Cookie +import confluent.tasks as tasks +import aiohmi.util.webclient as wc +import time sensorsbymodel = { 'FS1350': ['alarms', 'dt', 'duty', 'dw', 'mode', 'p3state', 'primflow', 'ps1', 'ps1a', 'ps1b', 'ps2', 'ps3', 'ps4', 'ps5a', 'ps5b', 'ps5c', 'pumpspeed1', 'pumpspeed2', 'pumpspeed3', 'rh', 'sdp', 'secflow', 'setpoint', 't1', 't2', 't2a', 't2b', 't2c', 't3', 't3', 't4', 't5', 'valve', 'valve2'], @@ -104,81 +96,6 @@ def simplify_name(name): return name.lower().replace(' ', '_').replace('/', '-').replace( '_-_', '-') -pdupool = greenpool.GreenPool(128) - -class WebResponse(httplib.HTTPResponse): - def _check_close(self): - return True - -class WebConnection(wc.SecureHTTPConnection): - response_class = WebResponse - def __init__(self, host, secure, verifycallback): - if secure: - port = 443 - else: - port = 80 - wc.SecureHTTPConnection.__init__(self, host, port, verifycallback=verifycallback) - self.secure = secure - self.cookies = {} - - def connect(self): - if self.secure: - return super(WebConnection, self).connect() - addrinfo = socket.getaddrinfo(self.host, self.port)[0] - # workaround problems of too large mtu, moderately frequent occurance - # in this space - plainsock = socket.socket(addrinfo[0]) - plainsock.settimeout(self.mytimeout) - try: - plainsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_MAXSEG, 1456) - except socket.error: - pass - 
plainsock.connect(addrinfo[4]) - self.sock = plainsock - - def getresponse(self): - try: - rsp = super(WebConnection, self).getresponse() - try: - hdrs = [x.split(':', 1) for x in rsp.msg.headers] - except AttributeError: - hdrs = rsp.msg.items() - for hdr in hdrs: - if hdr[0] == 'Set-Cookie': - c = Cookie.BaseCookie(hdr[1]) - for k in c: - self.cookies[k] = c[k].value - except httplib.BadStatusLine: - self.broken = True - raise - return rsp - - def request(self, method, url, body=None): - headers = {} - if body: - headers['Content-Length'] = len(body) - cookies = [] - for cookie in self.cookies: - cookies.append('{0}={1}'.format(cookie, self.cookies[cookie])) - headers['Cookie'] = ';'.join(cookies) - headers['Host'] = 'pdu.cluster.net' - headers['Accept'] = '*/*' - headers['Accept-Language'] = 'en-US,en;q=0.9' - headers['Connection'] = 'close' - headers['Referer'] = 'http://pdu.cluster.net/setting_admin.htm' - return super(WebConnection, self).request(method, url, body, headers) - - def grab_response(self, url, body=None, method=None): - if method is None: - method = 'GET' if body is None else 'POST' - if body: - self.request(method, url, body) - else: - self.request(method, url) - rsp = self.getresponse() - body = rsp.read() - return body, rsp.status - class CoolteraClient(object): def __init__(self, cdu, configmanager): self.node = cdu @@ -201,7 +118,7 @@ class CoolteraClient(object): cv = util.TLSCertVerifier( self.configmanager, self.node, 'pubkeys.tls_hardwaremanager').verify_cert - self._wc = WebConnection(target, secure=True, verifycallback=cv) + self._wc = wc.WebConnection(target, 443, verifycallback=cv) return self._wc @@ -231,14 +148,12 @@ def xml2stateinfo(statdata): _sensors_by_node = {} -def read_sensors(element, node, configmanager): +async def read_sensors(element, node, configmanager): category, name = element[-2:] - justnames = False if len(element) == 3: # just get names category = name name = 'all' - justnames = True for sensor in sensors: yield 
msg.ChildCollection(simplify_name(sensors[sensor][0])) return @@ -247,9 +162,7 @@ def read_sensors(element, node, configmanager): sn = _sensors_by_node.get(node, None) if not sn or sn[1] < time.time(): cc = CoolteraClient(node, configmanager) - cc.wc.request('GET', '/status.xml') - rsp = cc.wc.getresponse() - statdata = rsp.read() + statdata, status, hdrs = await cc.wc.grab_response_with_status('/status.xml') statinfo = xml2stateinfo(statdata) _sensors_by_node[node] = (statinfo, time.time() + 1) sn = _sensors_by_node.get(node, None) @@ -257,12 +170,13 @@ def read_sensors(element, node, configmanager): yield msg.SensorReadings(sn[0], name=node) -def retrieve(nodes, element, configmanager, inputdata): +async def retrieve(nodes, element, configmanager, inputdata): if element[0] == 'sensors': - gp = greenpool.GreenPile(pdupool) + taskargs = [] for node in nodes: - gp.spawn(read_sensors, element, node, configmanager) - for rsp in gp: + taskargs.append((element, node, configmanager)) + gp = tasks.starmap(read_sensors, taskargs) + async for rsp in gp: for datum in rsp: yield datum return diff --git a/confluent_server/confluent/plugins/hardwaremanagement/enclosure.py b/confluent_server/confluent/plugins/hardwaremanagement/enclosure.py index 434b9aef..5478e1d3 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/enclosure.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/enclosure.py @@ -13,7 +13,7 @@ # limitations under the License. 
import confluent.core as core import confluent.messages as msg -import pyghmi.exceptions as pygexc +import aiohmi.exceptions as pygexc import confluent.exceptions as exc diff --git a/confluent_server/confluent/plugins/hardwaremanagement/enos.py b/confluent_server/confluent/plugins/hardwaremanagement/enos.py index f568fae2..866390bd 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/enos.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/enos.py @@ -25,7 +25,6 @@ import re import eventlet import eventlet.queue as queue import confluent.exceptions as exc -webclient = eventlet.import_patched("pyghmi.util.webclient") import confluent.messages as msg import confluent.util as util import confluent.plugins.shell.ssh as ssh diff --git a/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py b/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py index 49368c3c..215d4a71 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py @@ -1,18 +1,12 @@ +import asyncio import confluent.vinzmanager as vinzmanager -import codecs import confluent.util as util import confluent.messages as msg -import eventlet -import json -import struct -webclient = eventlet.import_patched('pyghmi.util.webclient') -import eventlet.green.socket as socket -import eventlet +import aiohmi.util.webclient as webclient import confluent.interface.console as conapi import io import urllib.parse as urlparse -import eventlet.green.ssl as ssl import eventlet @@ -211,20 +205,20 @@ class PmxApiClient: except Exception: pass self.server = server - self.wc = webclient.SecureHTTPConnection(server, port=8006, verifycallback=cv) + self.wc = webclient.WebConnection(server, port=8006, verifycallback=cv) self.fprint = configmanager.get_node_attributes(server, 'pubkeys.tls').get(server, {}).get('pubkeys.tls', {}).get('value', None) self.vmmap = {} self.login() self.vmlist = {} 
self.vmbyid = {} - def login(self): + async def login(self): loginform = { 'username': self.user, 'password': self.password, } loginbody = urlparse.urlencode(loginform) - rsp = self.wc.grab_json_response_with_status('/api2/json/access/ticket', loginbody) + rsp = await self.wc.grab_json_response_with_status('/api2/json/access/ticket', loginbody) self.wc.cookies['PVEAuthCookie'] = rsp[0]['data']['ticket'] self.pac = rsp[0]['data']['ticket'] self.wc.set_header('CSRFPreventionToken', rsp[0]['data']['CSRFPreventionToken']) @@ -233,23 +227,23 @@ class PmxApiClient: def get_screenshot(self, vm, outfile): raise Exception("Not implemented") - def map_vms(self): - rsp = self.wc.grab_json_response('/api2/json/cluster/resources') + async def map_vms(self): + rsp = await self.wc.grab_json_response('/api2/json/cluster/resources') for datum in rsp.get('data', []): if datum['type'] == 'qemu': self.vmmap[datum['name']] = (datum['node'], datum['id']) return self.vmmap - def get_vm(self, vm): + async def get_vm(self, vm): if vm not in self.vmmap: - self.map_vms() + await self.map_vms() return self.vmmap[vm] - def get_vm_inventory(self, vm): - host, guest = self.get_vm(vm) - cfg = self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending') + async def get_vm_inventory(self, vm): + host, guest = await self.get_vm(vm) + cfg = await self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending') myuuid = None sysinfo = {'name': 'System', 'present': True, 'information': { 'Product name': 'Proxmox qemu virtual machine', @@ -281,15 +275,15 @@ class PmxApiClient: yield msg.KeyValueData({'inventory': invitems}, vm) - def get_vm_ikvm(self, vm): - return self.get_vm_consproxy(vm, 'vnc') + async def get_vm_ikvm(self, vm): + return await self.get_vm_consproxy(vm, 'vnc') - def get_vm_serial(self, vm): - return self.get_vm_consproxy(vm, 'term') + async def get_vm_serial(self, vm): + return await self.get_vm_consproxy(vm, 'term') - def get_vm_consproxy(self, vm, constype): - host, 
guest = self.get_vm(vm) - rsp = self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/{constype}proxy', method='POST') + async def get_vm_consproxy(self, vm, constype): + host, guest = await self.get_vm(vm) + rsp = await self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/{constype}proxy', method='POST') consdata = rsp[0]['data'] consdata['server'] = self.server consdata['host'] = host @@ -297,9 +291,9 @@ class PmxApiClient: consdata['pac'] = self.pac return consdata - def get_vm_bootdev(self, vm): - host, guest = self.get_vm(vm) - cfg = self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending') + async def get_vm_bootdev(self, vm): + host, guest = await self.get_vm(vm) + cfg = await self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending') for datum in cfg['data']: if datum['key'] == 'boot': bootseq = datum.get('pending', datum['value']) @@ -312,9 +306,9 @@ class PmxApiClient: return 'default' - def get_vm_power(self, vm): - host, guest = self.get_vm(vm) - rsp = self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/status/current') + async def get_vm_power(self, vm): + host, guest = await self.get_vm(vm) + rsp = await self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/status/current') rsp = rsp['data'] currstatus = rsp["qmpstatus"] # stopped, "running" if currstatus == 'running': @@ -323,15 +317,15 @@ class PmxApiClient: return 'off' raise Exception("Unknnown response to status query") - def set_vm_power(self, vm, state): - host, guest = self.get_vm(vm) + async def set_vm_power(self, vm, state): + host, guest = await self.get_vm(vm) current = None newstate = '' targstate = state if targstate == 'boot': targstate = 'on' if state == 'boot': - current = self.get_vm_power(vm) + current = await self.get_vm_power(vm) if current == 'on': state = 'reset' newstate = 'reset' @@ -342,27 +336,27 @@ class PmxApiClient: elif state == 'off': state = 'stop' if state == 'reset': # check for pending 
config - cfg = self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending') + cfg = await self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending') for datum in cfg['data']: if datum['key'] == 'boot' and 'pending' in datum: - self.set_vm_power(vm, 'off') - self.set_vm_power(vm, 'on') + await self.set_vm_power(vm, 'off') + await self.set_vm_power(vm, 'on') state = '' newstate = 'reset' if state: - rsp = self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/status/{state}', method='POST') + rsp = await self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/status/{state}', method='POST') if state and state != 'reset': - newstate = self.get_vm_power(vm) + newstate = await self.get_vm_power(vm) while newstate != targstate: - eventlet.sleep(0.1) - newstate = self.get_vm_power(vm) + await asyncio.sleep(0.1) + newstate = await self.get_vm_power(vm) return newstate, current - def set_vm_bootdev(self, vm, bootdev): - host, guest = self.get_vm(vm) + async def set_vm_bootdev(self, vm, bootdev): + host, guest = await self.get_vm(vm) if bootdev not in ('net', 'network', 'default'): raise Exception('Requested boot device not supported') - cfg = self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending') + cfg = await self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending') nonnetdevs = [] netdevs = [] for datum in cfg['data']: @@ -383,7 +377,7 @@ class PmxApiClient: neworder = 'order=' + ';'.join(newbootdevs) self.wc.set_header('Content-Type', 'application/json') try: - self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/config', {'boot': neworder}, method='PUT') + await self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/config', {'boot': neworder}, method='PUT') finally: del self.wc.stdheaders['Content-Type'] @@ -420,16 +414,16 @@ def retrieve(nodes, element, configmanager, inputdata): # good background for the webui, and kitty yield 
async def _await_or_drain(res):
    # Accept either a coroutine or an async generator from a worker: legacy
    # call sites (e.g. cooltera's read_sensors) pass async generator
    # functions, whose yielded items are collected into a list.  A plain
    # 'await' on an async generator would raise TypeError.
    if hasattr(res, '__aiter__'):
        return [item async for item in res]
    return await res


async def task_starmap(coro, iterable, max_concurrent=256):
    """Run coro(*args) for each argument tuple in iterable.

    Yields results in completion order (not input order).  Concurrency is
    capped at max_concurrent via a semaphore.  Remaining tasks are
    cancelled if the consumer abandons the generator or a task raises, so
    tasks are never leaked.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def sem_coro(args):
        async with semaphore:
            return await _await_or_drain(coro(*args))

    pending = [asyncio.create_task(sem_coro(item)) for item in iterable]
    try:
        for completed in asyncio.as_completed(pending):
            yield await completed
    finally:
        # No-op for finished tasks; cancels stragglers on early exit/error.
        for task in pending:
            task.cancel()


async def task_imap(coro, iterable, max_concurrent=256):
    """Run coro(item) for each item in iterable.

    Yields results in completion order (not input order).  Concurrency is
    capped at max_concurrent via a semaphore; remaining tasks are cancelled
    on early exit or error.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def sem_coro(item):
        async with semaphore:
            return await _await_or_drain(coro(item))

    pending = [asyncio.create_task(sem_coro(item)) for item in iterable]
    try:
        for completed in asyncio.as_completed(pending):
            yield await completed
    finally:
        for task in pending:
            task.cancel()


# Short aliases: cooltera.py calls tasks.starmap(), which would otherwise
# be an AttributeError against the task_* names above.
starmap = task_starmap
imap = task_imap