diff --git a/confluent_server/confluent/plugins/hardwaremanagement/affluent.py b/confluent_server/confluent/plugins/hardwaremanagement/affluent.py index 3534e714..f3e5e7a4 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/affluent.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/affluent.py @@ -14,11 +14,10 @@ # limitations under the License. -import eventlet -import eventlet.queue as queue -import eventlet.green.socket as socket +import asyncio +import socket import confluent.exceptions as exc -webclient = eventlet.import_patched('pyghmi.util.webclient') +import aiohmi.util.webclient as webclient import confluent.messages as msg import confluent.util as util @@ -32,38 +31,38 @@ class SwitchSensor(object): class WebClient(object): def __init__(self, node, configmanager, creds): self.node = node - self.wc = webclient.SecureHTTPConnection(node, port=443, verifycallback=util.TLSCertVerifier( + self.wc = webclient.WebConnection(node, port=443, verifycallback=util.TLSCertVerifier( configmanager, node, 'pubkeys.tls_hardwaremanager').verify_cert) self.wc.set_basic_credentials(creds[node]['secret.hardwaremanagementuser']['value'], creds[node]['secret.hardwaremanagementpassword']['value']) - def fetch(self, url, results): + async def fetch(self, url, results): try: - rsp, status = self.wc.grab_json_response_with_status(url) + rsp, status = await self.wc.grab_json_response_with_status(url) except exc.PubkeyInvalid: - results.put(msg.ConfluentNodeError(self.node, + results.put_nowait(msg.ConfluentNodeError(self.node, 'Mismatch detected between ' 'target certificate fingerprint and ' 'pubkeys.tls_hardwaremanager attribute')) return {} except (socket.gaierror, socket.herror, TimeoutError) as e: - results.put(msg.ConfluentTargetTimeout(self.node, str(e))) + results.put_nowait(msg.ConfluentTargetTimeout(self.node, str(e))) return {} except OSError as e: if e.errno == 113: - results.put(msg.ConfluentTargetTimeout(self.node)) + results.put_nowait(msg.ConfluentTargetTimeout(self.node)) else: - results.put(msg.ConfluentTargetTimeout(self.node), str(e)) + results.put_nowait(msg.ConfluentTargetTimeout(self.node), str(e)) return {} except Exception as e: - results.put(msg.ConfluentNodeError(self.node, + results.put_nowait(msg.ConfluentNodeError(self.node, repr(e))) return {} if status == 401: - results.put(msg.ConfluentTargetInvalidCredentials(self.node, 'Unable to authenticate')) + results.put_nowait(msg.ConfluentTargetInvalidCredentials(self.node, 'Unable to authenticate')) return {} elif status != 200: #must be str not bytes - results.put(msg.ConfluentNodeError(self.node, 'Unknown error: {} while retrieving {}'.format(rsp, url))) + results.put_nowait(msg.ConfluentNodeError(self.node, 'Unknown error: {} while retrieving {}'.format(rsp, url))) return {} return rsp @@ -121,11 +120,11 @@ def _run_method(method, workers, results, configmanager, nodes, element): creds = configmanager.get_node_attributes( nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True) for node in nodes: - workers.add(eventlet.spawn(method, configmanager, creds, - node, results, element)) + workers.add(util.spawn(method(configmanager, creds, + node, results, element))) -def retrieve(nodes, element, configmanager, inputdata): - results = queue.LightQueue() +async def retrieve(nodes, element, configmanager, inputdata): + results = asyncio.Queue() workers = set([]) if element == ['power', 'state']: _run_method(retrieve_power, workers, results, configmanager, nodes, element) @@ -145,69 +144,69 @@ def retrieve(nodes, element, configmanager, inputdata): return while workers: try: - datum = results.get(block=True, timeout=10) + datum = await asyncio.wait_for(results.get(), 10.0) while datum: if datum: yield datum datum = results.get_nowait() - except queue.Empty: + except asyncio.QueueEmpty: pass - eventlet.sleep(0.001) + await asyncio.sleep(0) for t in list(workers): - if t.dead: + if t.done(): workers.discard(t) try: while True: datum = results.get_nowait() if datum: yield datum - except queue.Empty: + except asyncio.QueueEmpty: pass -def retrieve_inventory(configmanager, creds, node, results, element): +async def retrieve_inventory(configmanager, creds, node, results, element): if len(element) == 3: - results.put(msg.ChildCollection('all')) - results.put(msg.ChildCollection('system')) + results.put_nowait(msg.ChildCollection('all')) + results.put_nowait(msg.ChildCollection('system')) return wc = WebClient(node, configmanager, creds) - invinfo = wc.fetch('/affluent/inventory/hardware/all', results) + invinfo = await wc.fetch('/affluent/inventory/hardware/all', results) if invinfo: - results.put(msg.KeyValueData(invinfo, node)) + results.put_nowait(msg.KeyValueData(invinfo, node)) -def retrieve_firmware(configmanager, creds, node, results, element): +async def retrieve_firmware(configmanager, creds, node, results, element): if len(element) == 3: - results.put(msg.ChildCollection('all')) + results.put_nowait(msg.ChildCollection('all')) return wc = WebClient(node, configmanager, creds) - fwinfo = wc.fetch('/affluent/inventory/firmware/all', results) + fwinfo = await wc.fetch('/affluent/inventory/firmware/all', results) if fwinfo: - results.put(msg.Firmware(fwinfo, node)) + results.put_nowait(msg.Firmware(fwinfo, node)) -def list_sensors(configmanager, creds, node, results, element): +async def list_sensors(configmanager, creds, node, results, element): wc = WebClient(node, configmanager, creds) - sensors = wc.fetch('/affluent/sensors/hardware/all', results) + sensors = await wc.fetch('/affluent/sensors/hardware/all', results) for sensor in sensors['item']: - results.put(msg.ChildCollection(sensor)) + results.put_nowait(msg.ChildCollection(sensor)) -def retrieve_sensors(configmanager, creds, node, results, element): +async def retrieve_sensors(configmanager, creds, node, results, element): wc = WebClient(node, configmanager, creds) - sensors = wc.fetch('/affluent/sensors/hardware/all/{0}'.format(element[-1]), results) + sensors = await wc.fetch('/affluent/sensors/hardware/all/{0}'.format(element[-1]), results) if sensors: - results.put(msg.SensorReadings(sensors['sensors'], node)) + results.put_nowait(msg.SensorReadings(sensors['sensors'], node)) -def retrieve_power(configmanager, creds, node, results, element): +async def retrieve_power(configmanager, creds, node, results, element): wc = WebClient(node, configmanager, creds) - hinfo = wc.fetch('/affluent/health', results) + hinfo = await wc.fetch('/affluent/health', results) if hinfo: - results.put(msg.PowerState(node=node, state='on')) + results.put_nowait(msg.PowerState(node=node, state='on')) -def retrieve_health(configmanager, creds, node, results, element): +async def retrieve_health(configmanager, creds, node, results, element): wc = WebClient(node, configmanager, creds) - hinfo = wc.fetch('/affluent/health', results) + hinfo = await wc.fetch('/affluent/health', results) if hinfo: - results.put(msg.HealthSummary(hinfo.get('health', 'unknown'), name=node)) - results.put(msg.SensorReadings(hinfo.get('sensors', []), name=node)) + results.put_nowait(msg.HealthSummary(hinfo.get('health', 'unknown'), name=node)) + results.put_nowait(msg.SensorReadings(hinfo.get('sensors', []), name=node))