diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index c1db2bc7..893793e1 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -1482,7 +1482,7 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip= return await disco.handle_api_request( configmanager, inputdata, operation, pathcomponents) elif pathcomponents[0] == 'networking': - return macmap.handle_api_request( + return await macmap.handle_api_request( configmanager, inputdata, operation, pathcomponents) elif pathcomponents[0] == 'version': return (msg.Attributes(kv={'version': confluent.__version__}),) diff --git a/confluent_server/confluent/networking/lldp.py b/confluent_server/confluent/networking/lldp.py index e9239bde..3bc0a5c0 100644 --- a/confluent_server/confluent/networking/lldp.py +++ b/confluent_server/confluent/networking/lldp.py @@ -363,7 +363,8 @@ async def update_switch_data(switch, configmanager, force=False, retexc=False): async def update_neighbors(configmanager, force=False, retexc=False): - return await _update_neighbors_backend(configmanager, force, retexc) + async for ans in _update_neighbors_backend(configmanager, force, retexc): + yield ans async def _update_neighbors_backend(configmanager, force, retexc): @@ -463,14 +464,14 @@ def list_info(parms, requestedparameter): results.add(_api_sanitize_string(candidate)) return [msg.ChildCollection(x + suffix) for x in util.natural_sort(results)] -def _handle_neighbor_query(pathcomponents, configmanager): +async def _handle_neighbor_query(pathcomponents, configmanager): choices, parms, listrequested, childcoll = _parameterize_path( pathcomponents) if not childcoll: # this means it's a single entry with by-peerid # guaranteed if (parms['by-peerid'] not in _neighbypeerid and _neighbypeerid.get('!!vintage', 0) < util.monotonic_time() - 60): - for x in update_neighbors(configmanager, retexc=True): + async for x in update_neighbors(configmanager, retexc=True): if isinstance(x, Exception): raise x if parms['by-peerid'] not in _neighbypeerid: @@ -481,9 +482,9 @@ def _handle_neighbor_query(pathcomponents, configmanager): if listrequested not in multi_selectors | single_selectors: raise exc.NotFoundException('{0} is not found'.format(listrequested)) if 'by-switch' in parms: - update_switch_data(parms['by-switch'], configmanager, retexc=True) + await update_switch_data(parms['by-switch'], configmanager, retexc=True) else: - for x in update_neighbors(configmanager, retexc=True): + async for x in update_neighbors(configmanager, retexc=True): if isinstance(x, Exception): raise x return list_info(parms, listrequested) diff --git a/confluent_server/confluent/networking/macmap.py b/confluent_server/confluent/networking/macmap.py index f1d7dec4..c75b0c8d 100644 --- a/confluent_server/confluent/networking/macmap.py +++ b/confluent_server/confluent/networking/macmap.py @@ -640,7 +640,7 @@ def get_node_fingerprints(nodename, configmanager): _namesmatch) -def handle_read_api_request(pathcomponents, configmanager): +async def handle_read_api_request(pathcomponents, configmanager): # TODO(jjohnson2): discovery core.py api handler design, apply it here # to make this a less tangled mess as it gets extended if len(pathcomponents) == 1: @@ -651,7 +651,7 @@ def handle_read_api_request(pathcomponents, configmanager): return [msg.ChildCollection(x + '/') for x in list_switches(configmanager)] else: - return _handle_neighbor_query(pathcomponents[2:], configmanager) + return await _handle_neighbor_query(pathcomponents[2:], configmanager) elif len(pathcomponents) == 2: if pathcomponents[-1] == 'macs': return [msg.ChildCollection(x) for x in (# 'by-node/', diff --git a/confluent_server/confluent/networking/nxapi.py b/confluent_server/confluent/networking/nxapi.py index f6f89674..0a63bc5b 100644 --- a/confluent_server/confluent/networking/nxapi.py +++ b/confluent_server/confluent/networking/nxapi.py @@ -101,7 +101,7 @@ class NxApiClient: raise Exception("Failed authenticating") rsp = rsp[0] self.authtoken = rsp['imdata'][0]['aaaLogin']['attributes']['token'] - self.wc.cookies['Apic-Cookie'] = self.authtoken + self.wc.cookies.update_cookies({'APIC-cookie': self.authtoken}) self.logged = True async def get_firmware(self): @@ -118,7 +118,8 @@ class NxApiClient: hwinfo = imdata['eqptCh']['children'] for component in hwinfo: add_sensedata(component, sensedata) - return sensedata + for sd in sensedata: + yield sd async def get_health(self): healthdata = {'health': 'ok', 'sensors': []} @@ -171,10 +172,10 @@ class NxApiClient: if url in self.cachedurls: if self.cachedurls[url][1] > time.monotonic() - cache: return self.cachedurls[url][0] - rsp = self.wc.grab_json_response_with_status(url) + rsp = await self.wc.grab_json_response_with_status(url) if rsp[1] == 403 and retry: - self.login() - return self.grab(url, cache, False) + await self.login() + return await self.grab(url, cache, False) if rsp[1] != 200: raise Exception("Error making request") self.cachedurls[url] = rsp[0], time.monotonic() diff --git a/confluent_server/confluent/plugins/hardwaremanagement/nxos.py b/confluent_server/confluent/plugins/hardwaremanagement/nxos.py index df5779e8..fb66b42d 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/nxos.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/nxos.py @@ -1,14 +1,12 @@ +import asyncio import confluent.networking.nxapi as nxapi -import eventlet -import eventlet.queue as queue -import eventlet.greenpool as greenpool import confluent.messages as msg import traceback -def retrieve_node(node, element, user, pwd, configmanager, inputdata, results): +async def retrieve_node(node, element, user, pwd, configmanager, inputdata, results): try: - retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results) + await retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results) except Exception as e: print(traceback.format_exc()) print(repr(e)) @@ -17,45 +15,43 @@ def simplify_name(name): return name.lower().replace(' ', '_').replace('/', '-').replace( '_-_', '-') -def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results): +async def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results): cli = nxapi.NxApiClient(node, user, pwd, configmanager) if element == ['power', 'state']: # client initted successfully, must be on - results.put(msg.PowerState(node, 'on')) + await results.put(msg.PowerState(node, 'on')) elif element == ['health', 'hardware']: - hinfo = cli.get_health() - results.put(msg.HealthSummary(hinfo.get('health', 'unknown'), name=node)) - results.put(msg.SensorReadings(hinfo.get('sensors', []), name=node)) + hinfo = await cli.get_health() + await results.put(msg.HealthSummary(hinfo.get('health', 'unknown'), name=node)) + await results.put(msg.SensorReadings(hinfo.get('sensors', []), name=node)) elif element[:3] == ['inventory', 'hardware', 'all']: if len(element) == 3: - results.put(msg.ChildCollection('all')) + await results.put(msg.ChildCollection('all')) return - invinfo = cli.get_inventory() + invinfo = await cli.get_inventory() if invinfo: - results.put(msg.KeyValueData({'inventory': invinfo}, node)) + await results.put(msg.KeyValueData({'inventory': invinfo}, node)) elif element[:3] == ['inventory', 'firmware', 'all']: if len(element) == 3: - results.put(msg.ChildCollection('all')) + await results.put(msg.ChildCollection('all')) return fwinfo = [] - for fwnam, fwdat in cli.get_firmware().items(): + for fwnam, fwdat in (await cli.get_firmware()).items(): fwinfo.append({fwnam: fwdat}) if fwinfo: - results.put(msg.Firmware(fwinfo, node)) + await results.put(msg.Firmware(fwinfo, node)) elif element == ['sensors', 'hardware', 'all']: - sensors = cli.get_sensors() - for sensor in sensors: - results.put(msg.ChildCollection(simplify_name(sensor['name']))) + async for sensor in cli.get_sensors(): + await results.put(msg.ChildCollection(simplify_name(sensor['name']))) elif element[:3] == ['sensors', 'hardware', 'all']: - sensors = cli.get_sensors() - for sensor in sensors: + async for sensor in cli.get_sensors(): if element[-1] == 'all' or simplify_name(sensor['name']) == element[-1]: - results.put(msg.SensorReadings([sensor], node)) + await results.put(msg.SensorReadings([sensor], node)) else: print(repr(element)) -def retrieve(nodes, element, configmanager, inputdata): - results = queue.LightQueue() +async def retrieve(nodes, element, configmanager, inputdata): + results = asyncio.Queue() workers = set([]) creds = configmanager.get_node_attributes( nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True) @@ -71,25 +67,25 @@ def retrieve(nodes, element, configmanager, inputdata): if not user or not pwd: yield msg.ConfluentTargetInvalidCredentials(node) continue - workers.add(eventlet.spawn(retrieve_node, node, element, user, pwd, configmanager, inputdata, results)) + workers.add(asyncio.create_task(retrieve_node(node, element, user, pwd, configmanager, inputdata, results))) while workers: try: - datum = results.get(block=True, timeout=10) + datum = await asyncio.wait_for(results.get(), timeout=10.0) while datum: if datum: yield datum datum = results.get_nowait() - except queue.Empty: + except asyncio.QueueEmpty: pass - eventlet.sleep(0.001) + await asyncio.sleep(0.001) for t in list(workers): - if t.dead: + if t.done(): workers.discard(t) try: while True: datum = results.get_nowait() if datum: yield datum - except queue.Empty: + except asyncio.QueueEmpty: pass