2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-06-25 00:40:52 +00:00

Handle NXOS async and advance state of lldp/mac map with async

This commit is contained in:
Jarrod Johnson
2026-02-20 08:58:44 -05:00
parent 09089befc8
commit f28e6d32f3
5 changed files with 41 additions and 43 deletions
+1 -1
View File
@@ -1482,7 +1482,7 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip=
return await disco.handle_api_request(
configmanager, inputdata, operation, pathcomponents)
elif pathcomponents[0] == 'networking':
return macmap.handle_api_request(
return await macmap.handle_api_request(
configmanager, inputdata, operation, pathcomponents)
elif pathcomponents[0] == 'version':
return (msg.Attributes(kv={'version': confluent.__version__}),)
@@ -363,7 +363,8 @@ async def update_switch_data(switch, configmanager, force=False, retexc=False):
async def update_neighbors(configmanager, force=False, retexc=False):
return await _update_neighbors_backend(configmanager, force, retexc)
async for ans in _update_neighbors_backend(configmanager, force, retexc):
yield ans
async def _update_neighbors_backend(configmanager, force, retexc):
@@ -463,14 +464,14 @@ def list_info(parms, requestedparameter):
results.add(_api_sanitize_string(candidate))
return [msg.ChildCollection(x + suffix) for x in util.natural_sort(results)]
def _handle_neighbor_query(pathcomponents, configmanager):
async def _handle_neighbor_query(pathcomponents, configmanager):
choices, parms, listrequested, childcoll = _parameterize_path(
pathcomponents)
if not childcoll: # this means it's a single entry with by-peerid
# guaranteed
if (parms['by-peerid'] not in _neighbypeerid and
_neighbypeerid.get('!!vintage', 0) < util.monotonic_time() - 60):
for x in update_neighbors(configmanager, retexc=True):
async for x in update_neighbors(configmanager, retexc=True):
if isinstance(x, Exception):
raise x
if parms['by-peerid'] not in _neighbypeerid:
@@ -481,9 +482,9 @@ def _handle_neighbor_query(pathcomponents, configmanager):
if listrequested not in multi_selectors | single_selectors:
raise exc.NotFoundException('{0} is not found'.format(listrequested))
if 'by-switch' in parms:
update_switch_data(parms['by-switch'], configmanager, retexc=True)
await update_switch_data(parms['by-switch'], configmanager, retexc=True)
else:
for x in update_neighbors(configmanager, retexc=True):
async for x in update_neighbors(configmanager, retexc=True):
if isinstance(x, Exception):
raise x
return list_info(parms, listrequested)
@@ -640,7 +640,7 @@ def get_node_fingerprints(nodename, configmanager):
_namesmatch)
def handle_read_api_request(pathcomponents, configmanager):
async def handle_read_api_request(pathcomponents, configmanager):
# TODO(jjohnson2): discovery core.py api handler design, apply it here
# to make this a less tangled mess as it gets extended
if len(pathcomponents) == 1:
@@ -651,7 +651,7 @@ def handle_read_api_request(pathcomponents, configmanager):
return [msg.ChildCollection(x + '/')
for x in list_switches(configmanager)]
else:
return _handle_neighbor_query(pathcomponents[2:], configmanager)
return await _handle_neighbor_query(pathcomponents[2:], configmanager)
elif len(pathcomponents) == 2:
if pathcomponents[-1] == 'macs':
return [msg.ChildCollection(x) for x in (# 'by-node/',
@@ -101,7 +101,7 @@ class NxApiClient:
raise Exception("Failed authenticating")
rsp = rsp[0]
self.authtoken = rsp['imdata'][0]['aaaLogin']['attributes']['token']
self.wc.cookies['Apic-Cookie'] = self.authtoken
self.wc.cookies.update_cookies({'APIC-cookie': self.authtoken})
self.logged = True
async def get_firmware(self):
@@ -118,7 +118,8 @@ class NxApiClient:
hwinfo = imdata['eqptCh']['children']
for component in hwinfo:
add_sensedata(component, sensedata)
return sensedata
for sd in sensedata:
yield sd
async def get_health(self):
healthdata = {'health': 'ok', 'sensors': []}
@@ -171,10 +172,10 @@ class NxApiClient:
if url in self.cachedurls:
if self.cachedurls[url][1] > time.monotonic() - cache:
return self.cachedurls[url][0]
rsp = self.wc.grab_json_response_with_status(url)
rsp = await self.wc.grab_json_response_with_status(url)
if rsp[1] == 403 and retry:
self.login()
return self.grab(url, cache, False)
await self.login()
return await self.grab(url, cache, False)
if rsp[1] != 200:
raise Exception("Error making request")
self.cachedurls[url] = rsp[0], time.monotonic()
@@ -1,14 +1,12 @@
import asyncio
import confluent.networking.nxapi as nxapi
import eventlet
import eventlet.queue as queue
import eventlet.greenpool as greenpool
import confluent.messages as msg
import traceback
def retrieve_node(node, element, user, pwd, configmanager, inputdata, results):
async def retrieve_node(node, element, user, pwd, configmanager, inputdata, results):
try:
retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results)
await retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results)
except Exception as e:
print(traceback.format_exc())
print(repr(e))
@@ -17,45 +15,43 @@ def simplify_name(name):
return name.lower().replace(' ', '_').replace('/', '-').replace(
'_-_', '-')
def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results):
async def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results):
cli = nxapi.NxApiClient(node, user, pwd, configmanager)
if element == ['power', 'state']: # client initted successfully, must be on
results.put(msg.PowerState(node, 'on'))
await results.put(msg.PowerState(node, 'on'))
elif element == ['health', 'hardware']:
hinfo = cli.get_health()
results.put(msg.HealthSummary(hinfo.get('health', 'unknown'), name=node))
results.put(msg.SensorReadings(hinfo.get('sensors', []), name=node))
hinfo = await cli.get_health()
await results.put(msg.HealthSummary(hinfo.get('health', 'unknown'), name=node))
await results.put(msg.SensorReadings(hinfo.get('sensors', []), name=node))
elif element[:3] == ['inventory', 'hardware', 'all']:
if len(element) == 3:
results.put(msg.ChildCollection('all'))
await results.put(msg.ChildCollection('all'))
return
invinfo = cli.get_inventory()
invinfo = await cli.get_inventory()
if invinfo:
results.put(msg.KeyValueData({'inventory': invinfo}, node))
await results.put(msg.KeyValueData({'inventory': invinfo}, node))
elif element[:3] == ['inventory', 'firmware', 'all']:
if len(element) == 3:
results.put(msg.ChildCollection('all'))
await results.put(msg.ChildCollection('all'))
return
fwinfo = []
for fwnam, fwdat in cli.get_firmware().items():
for fwnam, fwdat in (await cli.get_firmware()).items():
fwinfo.append({fwnam: fwdat})
if fwinfo:
results.put(msg.Firmware(fwinfo, node))
await results.put(msg.Firmware(fwinfo, node))
elif element == ['sensors', 'hardware', 'all']:
sensors = cli.get_sensors()
for sensor in sensors:
results.put(msg.ChildCollection(simplify_name(sensor['name'])))
async for sensor in cli.get_sensors():
await results.put(msg.ChildCollection(simplify_name(sensor['name'])))
elif element[:3] == ['sensors', 'hardware', 'all']:
sensors = cli.get_sensors()
for sensor in sensors:
async for sensor in cli.get_sensors():
if element[-1] == 'all' or simplify_name(sensor['name']) == element[-1]:
results.put(msg.SensorReadings([sensor], node))
await results.put(msg.SensorReadings([sensor], node))
else:
print(repr(element))
def retrieve(nodes, element, configmanager, inputdata):
results = queue.LightQueue()
async def retrieve(nodes, element, configmanager, inputdata):
results = asyncio.Queue()
workers = set([])
creds = configmanager.get_node_attributes(
nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
@@ -71,25 +67,25 @@ def retrieve(nodes, element, configmanager, inputdata):
if not user or not pwd:
yield msg.ConfluentTargetInvalidCredentials(node)
continue
workers.add(eventlet.spawn(retrieve_node, node, element, user, pwd, configmanager, inputdata, results))
workers.add(asyncio.create_task(retrieve_node(node, element, user, pwd, configmanager, inputdata, results)))
while workers:
try:
datum = results.get(block=True, timeout=10)
datum = await asyncio.wait_for(results.get(), timeout=10.0)
while datum:
if datum:
yield datum
datum = results.get_nowait()
except queue.Empty:
except asyncio.QueueEmpty:
pass
eventlet.sleep(0.001)
await asyncio.sleep(0.001)
for t in list(workers):
if t.dead:
if t.done():
workers.discard(t)
try:
while True:
datum = results.get_nowait()
if datum:
yield datum
except queue.Empty:
except asyncio.QueueEmpty:
pass