2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-06-17 00:50:46 +00:00

Move SRLinux support to asyncio style

This commit is contained in:
Jarrod Johnson
2026-02-19 16:08:32 -05:00
parent ce2487dcb8
commit ca0c89aa07
4 changed files with 65 additions and 57 deletions
@@ -206,11 +206,11 @@ async def _extract_neighbor_data_https(switch, user, password, cfm, lldpdata):
wc = webclient.WebConnection(
switch, 443, verifycallback=kv, timeout=5)
if backend == 'affluent':
return _extract_neighbor_data_affluent(switch, user, password, cfm, lldpdata, wc)
return await _extract_neighbor_data_affluent(switch, user, password, cfm, lldpdata, wc)
elif backend == 'nxapi':
return await _extract_neighbor_data_nxapi(switch, user, password, cfm, lldpdata, wc)
elif backend == 'srlinux':
return _extract_neighbor_data_srlinux(switch, user, password, cfm, lldpdata, wc)
return await _extract_neighbor_data_srlinux(switch, user, password, cfm, lldpdata, wc)
@@ -232,7 +232,8 @@ async def _extract_neighbor_data_nxapi(switch, user, password, cfm, lldpdata, wc
async def _extract_neighbor_data_srlinux(switch, user, password, cfm, lldpdata, wc):
cli = srlinux.SRLinuxClient(switch, user, password, cfm)
lldpinfo = cli.get_lldp()
await cli.login()
lldpinfo = await cli.get_lldp()
for port in lldpinfo:
portdata = lldpinfo[port]
peerid = '{0}.{1}'.format(
@@ -160,17 +160,19 @@ async def _fast_map_switch(args):
elif backend == 'nxapi':
return await _nxapi_map_switch(switch, password, user, cfgm)
elif backend == 'srlinux':
return _srlinux_map_switch(switch, password, user, cfgm)
return await _srlinux_map_switch(switch, password, user, cfgm)
raise Exception("No fast backend match")
async def _srlinux_map_switch(switch, password, user, cfgm):
cli = srlinux.SRLinuxClient(switch, user, password, cfgm)
await cli.login()
mt = await cli.get_mac_table()
_macsbyswitch[switch] = mt
_fast_backend_fixup(mt, switch)
async def _nxapi_map_switch(switch, password, user, cfgm):
cli = nxapi.NxApiClient(switch, user, password, cfgm)
await cli.login()
mt = await cli.get_mac_table()
_macsbyswitch[switch] = mt
_fast_backend_fixup(mt, switch)
@@ -738,20 +740,21 @@ async def rescan(cfg):
async for _ in update_macmap(cfg):
pass
async def get_stdin_reader(cloop):
async def get_stdin_reader():
cloop = asyncio.get_event_loop()
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await cloop.connect_read_pipe(lambda: protocol, sys.stdin)
return reader
async def offloader_main(cloop):
async def offloader_main():
try:
upacker = msgpack.Unpacker(encoding='utf8')
except TypeError:
upacker = msgpack.Unpacker(raw=False, strict_map_key=False)
#currfl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
#fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, currfl | os.O_NONBLOCK)
sreader = await get_stdin_reader(cloop)
sreader = await get_stdin_reader()
while True:
data = await sreader.read(512)
upacker.feed(data)
@@ -759,7 +762,7 @@ async def offloader_main(cloop):
tasks.spawn(_snmp_map_switch_relay(*cmd))
sys.exit(0)
async def test_main(cloop):
async def test_main():
cg = cfm.ConfigManager(None)
async for res in update_macmap(cg):
print("map has updated")
@@ -775,8 +778,7 @@ async def test_main(cloop):
print(repr(_macsbyswitch))
if __name__ == '__main__':
cloop = asyncio.get_event_loop()
if len(sys.argv) > 1 and sys.argv[1] == '-o':
cloop.run_until_complete(offloader_main(cloop))
asyncio.run(offloader_main())
sys.exit(0)
cloop.run_until_complete(test_main(cloop))
asyncio.run(test_main())
@@ -1,3 +1,4 @@
import asyncio
import confluent.util as util
import aiohmi.util.webclient as webclient
@@ -23,16 +24,15 @@ class SRLinuxClient:
self.wc = webclient.SecureHTTPConnection(switch, port=443, verifycallback=cv)
self.wc.set_basic_credentials(self.user, self.password)
self.rpc_id = 1
self.login()
def login(self):
async def login(self):
# Just a quick query to validate that credentials are correct and device is reachable and TLS works out however it is supposed to
self._get_state('/system/information')
await self._get_state('/system/information')
def _rpc_call(self, method, params=None):
async def _rpc_call(self, method, params=None):
"""Make a JSON-RPC call to SR-Linux"""
payload = {
'jsonrpc': '2.0',
@@ -44,7 +44,7 @@ class SRLinuxClient:
self.rpc_id += 1
rsp = self.wc.grab_json_response_with_status('/jsonrpc', payload)
rsp = await self.wc.grab_json_response_with_status('/jsonrpc', payload)
if rsp[1] != 200:
raise Exception(f"Failed RPC call: {method}, status: {rsp[1]}")
@@ -54,7 +54,7 @@ class SRLinuxClient:
return result.get('result', {})
def _get_state(self, path, datastore='state'):
async def _get_state(self, path, datastore='state'):
"""Get state data from SR-Linux using JSON-RPC get method"""
params = {
'commands': [
@@ -64,13 +64,13 @@ class SRLinuxClient:
}
]
}
result = self._rpc_call('get', params)
result = await self._rpc_call('get', params)
return result
def get_firmware(self):
async def get_firmware(self):
"""Get firmware/software version information"""
firmdata = {}
result = self._get_state('/system/information')
result = await self._get_state('/system/information')
for item in result:
if 'version' in item:
firmdata['SR-Linux'] = {'version': item['version']}
@@ -79,10 +79,10 @@ class SRLinuxClient:
firmdata['SR-Linux']['date'] = item['build-date']
return firmdata
def get_sensors(self):
async def get_sensors(self):
"""Get sensor readings from the device"""
sensedata = []
result = self._get_state('/platform/control/temperature')
result = await self._get_state('/platform/control/temperature')
for item in result:
for pcc in item:
currreading = {}
@@ -100,7 +100,7 @@ class SRLinuxClient:
currreading['units'] = '°C'
sensedata.append(currreading)
result = self._get_state('/platform/fan-tray')
result = await self._get_state('/platform/fan-tray')
for item in result:
for pft in item:
currreading = {}
@@ -118,7 +118,7 @@ class SRLinuxClient:
currreading['units'] = '%'
sensedata.append(currreading)
result = self._get_state('/platform/power-supply')
result = await self._get_state('/platform/power-supply')
for item in result:
for pps in item:
for reading in item[pps]:
@@ -153,9 +153,9 @@ class SRLinuxClient:
def get_health(self):
async def get_health(self):
healthdata = {'health': 'ok', 'sensors': []}
sensors = self.get_sensors()
sensors = await self.get_sensors()
for sensor in sensors:
currhealth = sensor.get('health', 'ok')
@@ -168,9 +168,9 @@ class SRLinuxClient:
return healthdata
def get_inventory(self):
async def get_inventory(self):
invdata = []
results = self._get_state('/platform/chassis')
results = await self._get_state('/platform/chassis')
for result in results:
invinfo = {'name': 'System', 'present': True}
invinfo['information'] = {'Manufacturer': 'Nokia'}
@@ -188,9 +188,9 @@ class SRLinuxClient:
invdata.append(invinfo)
return invdata
def get_mac_table(self):
async def get_mac_table(self):
macdict = {}
response = self._get_state('/network-instance/bridge-table/mac-table/mac')
response = await self._get_state('/network-instance/bridge-table/mac-table/mac')
for datum in response:
for niname in datum:
for nin in datum[niname]:
@@ -205,10 +205,10 @@ class SRLinuxClient:
macdict.setdefault(macport, []).append(macaddr)
return macdict
def get_lldp(self):
async def get_lldp(self):
lldpbyport = {}
response = self._get_state('/system/lldp/interface')
response = await self._get_state('/system/lldp/interface')
for datum in response:
for intfname in datum:
lldpallinfo = datum[intfname]
@@ -232,7 +232,7 @@ class SRLinuxClient:
return lldpbyport
if __name__ == '__main__':
async def main():
import sys
import os
from pprint import pprint
@@ -243,9 +243,13 @@ if __name__ == '__main__':
sys.exit(1)
srl = SRLinuxClient(sys.argv[1], myuser, mypass, None)
pprint(srl.get_firmware())
pprint(srl.get_inventory())
pprint(srl.get_sensors())
pprint(srl.get_health())
pprint(srl.get_lldp())
pprint(srl.get_mac_table())
await srl.login()
pprint(await srl.get_firmware())
pprint(await srl.get_inventory())
pprint(await srl.get_sensors())
pprint(await srl.get_health())
pprint(await srl.get_lldp())
pprint(await srl.get_mac_table())
if __name__ == '__main__':
asyncio.run(main())
@@ -1,13 +1,13 @@
import asyncio
import confluent.networking.srlinux as srlinux
import eventlet
import eventlet.queue as queue
import confluent.messages as msg
import traceback
import confluent.tasks as tasks
def retrieve_node(node, element, user, pwd, configmanager, inputdata, results):
async def retrieve_node(node, element, user, pwd, configmanager, inputdata, results):
try:
retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results)
await retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results)
except Exception as e:
print(traceback.format_exc())
print(repr(e))
@@ -18,19 +18,20 @@ def simplify_name(name):
'_-_', '-')
def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results):
async def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results):
cli = srlinux.SRLinuxClient(node, user, pwd, configmanager)
await cli.login()
if element == ['power', 'state']: # client initted successfully, must be on
results.put(msg.PowerState(node, 'on'))
elif element == ['health', 'hardware']:
hinfo = cli.get_health()
hinfo = await cli.get_health()
results.put(msg.HealthSummary(hinfo.get('health', 'unknown'), name=node))
results.put(msg.SensorReadings(hinfo.get('sensors', []), name=node))
elif element[:3] == ['inventory', 'hardware', 'all']:
if len(element) == 3:
results.put(msg.ChildCollection('all'))
return
invinfo = cli.get_inventory()
invinfo = await cli.get_inventory()
if invinfo:
results.put(msg.KeyValueData({'inventory': invinfo}, node))
elif element[:3] == ['inventory', 'firmware', 'all']:
@@ -38,16 +39,16 @@ def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, re
results.put(msg.ChildCollection('all'))
return
fwinfo = []
for fwnam, fwdat in cli.get_firmware().items():
for fwnam, fwdat in (await cli.get_firmware()).items():
fwinfo.append({fwnam: fwdat})
if fwinfo:
results.put(msg.Firmware(fwinfo, node))
elif element == ['sensors', 'hardware', 'all']:
sensors = cli.get_sensors()
sensors = await cli.get_sensors()
for sensor in sensors:
results.put(msg.ChildCollection(simplify_name(sensor['name'])))
elif element[:3] == ['sensors', 'hardware', 'all']:
sensors = cli.get_sensors()
sensors = await cli.get_sensors()
for sensor in sensors:
if element[-1] == 'all' or simplify_name(sensor['name']) == element[-1]:
results.put(msg.SensorReadings([sensor], node))
@@ -55,8 +56,8 @@ def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, re
print(repr(element))
def retrieve(nodes, element, configmanager, inputdata):
results = queue.LightQueue()
async def retrieve(nodes, element, configmanager, inputdata):
results = asyncio.Queue()
workers = set([])
creds = configmanager.get_node_attributes(
nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
@@ -72,24 +73,24 @@ def retrieve(nodes, element, configmanager, inputdata):
if not user or not pwd:
yield msg.ConfluentTargetInvalidCredentials(node)
continue
workers.add(eventlet.spawn(retrieve_node, node, element, user, pwd, configmanager, inputdata, results))
workers.add(tasks.spawn(retrieve_node, node, element, user, pwd, configmanager, inputdata, results))
while workers:
try:
datum = results.get(block=True, timeout=10)
datum = await asyncio.wait_for(results.get(), timeout=10.0)
while datum:
if datum:
yield datum
datum = results.get_nowait()
except queue.Empty:
except asyncio.QueueEmpty:
pass
eventlet.sleep(0.001)
await asyncio.sleep(0.001)
for t in list(workers):
if t.dead:
if t.done():
workers.discard(t)
try:
while True:
datum = results.get_nowait()
if datum:
yield datum
except queue.Empty:
except asyncio.QueueEmpty:
pass