diff --git a/confluent_server/confluent/networking/lldp.py b/confluent_server/confluent/networking/lldp.py index 9b933461..e9239bde 100644 --- a/confluent_server/confluent/networking/lldp.py +++ b/confluent_server/confluent/networking/lldp.py @@ -206,11 +206,11 @@ async def _extract_neighbor_data_https(switch, user, password, cfm, lldpdata): wc = webclient.WebConnection( switch, 443, verifycallback=kv, timeout=5) if backend == 'affluent': - return _extract_neighbor_data_affluent(switch, user, password, cfm, lldpdata, wc) + return await _extract_neighbor_data_affluent(switch, user, password, cfm, lldpdata, wc) elif backend == 'nxapi': return await _extract_neighbor_data_nxapi(switch, user, password, cfm, lldpdata, wc) elif backend == 'srlinux': - return _extract_neighbor_data_srlinux(switch, user, password, cfm, lldpdata, wc) + return await _extract_neighbor_data_srlinux(switch, user, password, cfm, lldpdata, wc) @@ -232,7 +232,8 @@ async def _extract_neighbor_data_nxapi(switch, user, password, cfm, lldpdata, wc async def _extract_neighbor_data_srlinux(switch, user, password, cfm, lldpdata, wc): cli = srlinux.SRLinuxClient(switch, user, password, cfm) - lldpinfo = cli.get_lldp() + await cli.login() + lldpinfo = await cli.get_lldp() for port in lldpinfo: portdata = lldpinfo[port] peerid = '{0}.{1}'.format( diff --git a/confluent_server/confluent/networking/macmap.py b/confluent_server/confluent/networking/macmap.py index 4f47bb58..f1d7dec4 100644 --- a/confluent_server/confluent/networking/macmap.py +++ b/confluent_server/confluent/networking/macmap.py @@ -160,17 +160,19 @@ async def _fast_map_switch(args): elif backend == 'nxapi': return await _nxapi_map_switch(switch, password, user, cfgm) elif backend == 'srlinux': - return _srlinux_map_switch(switch, password, user, cfgm) + return await _srlinux_map_switch(switch, password, user, cfgm) raise Exception("No fast backend match") async def _srlinux_map_switch(switch, password, user, cfgm): cli = srlinux.SRLinuxClient(switch, user, password, cfgm) + await cli.login() mt = await cli.get_mac_table() _macsbyswitch[switch] = mt _fast_backend_fixup(mt, switch) async def _nxapi_map_switch(switch, password, user, cfgm): cli = nxapi.NxApiClient(switch, user, password, cfgm) + await cli.login() mt = await cli.get_mac_table() _macsbyswitch[switch] = mt _fast_backend_fixup(mt, switch) @@ -738,20 +740,21 @@ async def rescan(cfg): async for _ in update_macmap(cfg): pass -async def get_stdin_reader(cloop): +async def get_stdin_reader(): + cloop = asyncio.get_event_loop() reader = asyncio.StreamReader() protocol = asyncio.StreamReaderProtocol(reader) await cloop.connect_read_pipe(lambda: protocol, sys.stdin) return reader -async def offloader_main(cloop): +async def offloader_main(): try: upacker = msgpack.Unpacker(encoding='utf8') except TypeError: upacker = msgpack.Unpacker(raw=False, strict_map_key=False) #currfl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL) #fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, currfl | os.O_NONBLOCK) - sreader = await get_stdin_reader(cloop) + sreader = await get_stdin_reader() while True: data = await sreader.read(512) upacker.feed(data) @@ -759,7 +762,7 @@ async def offloader_main(cloop): tasks.spawn(_snmp_map_switch_relay(*cmd)) sys.exit(0) -async def test_main(cloop): +async def test_main(): cg = cfm.ConfigManager(None) async for res in update_macmap(cg): print("map has updated") @@ -775,8 +778,7 @@ async def test_main(cloop): print(repr(_macsbyswitch)) if __name__ == '__main__': - cloop = asyncio.get_event_loop() if len(sys.argv) > 1 and sys.argv[1] == '-o': - cloop.run_until_complete(offloader_main(cloop)) + asyncio.run(offloader_main()) sys.exit(0) - cloop.run_until_complete(test_main(cloop)) + asyncio.run(test_main()) diff --git a/confluent_server/confluent/networking/srlinux.py b/confluent_server/confluent/networking/srlinux.py index 059e352e..415cc964 100644 --- a/confluent_server/confluent/networking/srlinux.py +++ b/confluent_server/confluent/networking/srlinux.py @@ -1,3 +1,4 @@ +import asyncio import confluent.util as util import aiohmi.util.webclient as webclient @@ -23,16 +24,15 @@ class SRLinuxClient: self.wc = webclient.SecureHTTPConnection(switch, port=443, verifycallback=cv) self.wc.set_basic_credentials(self.user, self.password) self.rpc_id = 1 - self.login() - def login(self): + async def login(self): # Just a quick query to validate that credentials are correct and device is reachable and TLS works out however it is supposed to - self._get_state('/system/information') + await self._get_state('/system/information') - def _rpc_call(self, method, params=None): + async def _rpc_call(self, method, params=None): """Make a JSON-RPC call to SR-Linux""" payload = { 'jsonrpc': '2.0', @@ -44,7 +44,7 @@ class SRLinuxClient: self.rpc_id += 1 - rsp = self.wc.grab_json_response_with_status('/jsonrpc', payload) + rsp = await self.wc.grab_json_response_with_status('/jsonrpc', payload) if rsp[1] != 200: raise Exception(f"Failed RPC call: {method}, status: {rsp[1]}") @@ -54,7 +54,7 @@ class SRLinuxClient: return result.get('result', {}) - def _get_state(self, path, datastore='state'): + async def _get_state(self, path, datastore='state'): """Get state data from SR-Linux using JSON-RPC get method""" params = { 'commands': [ @@ -64,13 +64,13 @@ class SRLinuxClient: } ] } - result = self._rpc_call('get', params) + result = await self._rpc_call('get', params) return result - def get_firmware(self): + async def get_firmware(self): """Get firmware/software version information""" firmdata = {} - result = self._get_state('/system/information') + result = await self._get_state('/system/information') for item in result: if 'version' in item: firmdata['SR-Linux'] = {'version': item['version']} @@ -79,10 +79,10 @@ class SRLinuxClient: firmdata['SR-Linux']['date'] = item['build-date'] return firmdata - def get_sensors(self): + async def get_sensors(self): """Get sensor readings from the device""" sensedata = [] - result = self._get_state('/platform/control/temperature') + result = await self._get_state('/platform/control/temperature') for item in result: for pcc in item: currreading = {} @@ -100,7 +100,7 @@ class SRLinuxClient: currreading['units'] = '°C' sensedata.append(currreading) - result = self._get_state('/platform/fan-tray') + result = await self._get_state('/platform/fan-tray') for item in result: for pft in item: currreading = {} @@ -118,7 +118,7 @@ class SRLinuxClient: currreading['units'] = '%' sensedata.append(currreading) - result = self._get_state('/platform/power-supply') + result = await self._get_state('/platform/power-supply') for item in result: for pps in item: for reading in item[pps]: @@ -153,9 +153,9 @@ class SRLinuxClient: - def get_health(self): + async def get_health(self): healthdata = {'health': 'ok', 'sensors': []} - sensors = self.get_sensors() + sensors = await self.get_sensors() for sensor in sensors: currhealth = sensor.get('health', 'ok') @@ -168,9 +168,9 @@ class SRLinuxClient: return healthdata - def get_inventory(self): + async def get_inventory(self): invdata = [] - results = self._get_state('/platform/chassis') + results = await self._get_state('/platform/chassis') for result in results: invinfo = {'name': 'System', 'present': True} invinfo['information'] = {'Manufacturer': 'Nokia'} @@ -188,9 +188,9 @@ class SRLinuxClient: invdata.append(invinfo) return invdata - def get_mac_table(self): + async def get_mac_table(self): macdict = {} - response = self._get_state('/network-instance/bridge-table/mac-table/mac') + response = await self._get_state('/network-instance/bridge-table/mac-table/mac') for datum in response: for niname in datum: for nin in datum[niname]: @@ -205,10 +205,10 @@ class SRLinuxClient: macdict.setdefault(macport, []).append(macaddr) return macdict - def get_lldp(self): + async def get_lldp(self): lldpbyport = {} - response = self._get_state('/system/lldp/interface') + response = await self._get_state('/system/lldp/interface') for datum in response: for intfname in datum: lldpallinfo = datum[intfname] @@ -232,7 +232,7 @@ class SRLinuxClient: return lldpbyport -if __name__ == '__main__': +async def main(): import sys import os from pprint import pprint @@ -243,9 +243,13 @@ if __name__ == '__main__': sys.exit(1) srl = SRLinuxClient(sys.argv[1], myuser, mypass, None) - pprint(srl.get_firmware()) - pprint(srl.get_inventory()) - pprint(srl.get_sensors()) - pprint(srl.get_health()) - pprint(srl.get_lldp()) - pprint(srl.get_mac_table()) + await srl.login() + pprint(await srl.get_firmware()) + pprint(await srl.get_inventory()) + pprint(await srl.get_sensors()) + pprint(await srl.get_health()) + pprint(await srl.get_lldp()) + pprint(await srl.get_mac_table()) + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/confluent_server/confluent/plugins/hardwaremanagement/srlinux.py b/confluent_server/confluent/plugins/hardwaremanagement/srlinux.py index d20fe6ac..edc54cf3 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/srlinux.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/srlinux.py @@ -1,13 +1,13 @@ +import asyncio import confluent.networking.srlinux as srlinux -import eventlet -import eventlet.queue as queue import confluent.messages as msg import traceback +import confluent.tasks as tasks -def retrieve_node(node, element, user, pwd, configmanager, inputdata, results): +async def retrieve_node(node, element, user, pwd, configmanager, inputdata, results): try: - retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results) + await retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results) except Exception as e: print(traceback.format_exc()) print(repr(e)) @@ -18,19 +18,20 @@ def simplify_name(name): '_-_', '-') -def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results): +async def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, results): cli = srlinux.SRLinuxClient(node, user, pwd, configmanager) + await cli.login() if element == ['power', 'state']: # client initted successfully, must be on results.put(msg.PowerState(node, 'on')) elif element == ['health', 'hardware']: - hinfo = cli.get_health() + hinfo = await cli.get_health() results.put(msg.HealthSummary(hinfo.get('health', 'unknown'), name=node)) results.put(msg.SensorReadings(hinfo.get('sensors', []), name=node)) elif element[:3] == ['inventory', 'hardware', 'all']: if len(element) == 3: results.put(msg.ChildCollection('all')) return - invinfo = cli.get_inventory() + invinfo = await cli.get_inventory() if invinfo: results.put(msg.KeyValueData({'inventory': invinfo}, node)) elif element[:3] == ['inventory', 'firmware', 'all']: @@ -38,16 +39,16 @@ def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, re results.put(msg.ChildCollection('all')) return fwinfo = [] - for fwnam, fwdat in cli.get_firmware().items(): + for fwnam, fwdat in (await cli.get_firmware()).items(): fwinfo.append({fwnam: fwdat}) if fwinfo: results.put(msg.Firmware(fwinfo, node)) elif element == ['sensors', 'hardware', 'all']: - sensors = cli.get_sensors() + sensors = await cli.get_sensors() for sensor in sensors: results.put(msg.ChildCollection(simplify_name(sensor['name']))) elif element[:3] == ['sensors', 'hardware', 'all']: - sensors = cli.get_sensors() + sensors = await cli.get_sensors() for sensor in sensors: if element[-1] == 'all' or simplify_name(sensor['name']) == element[-1]: results.put(msg.SensorReadings([sensor], node)) @@ -55,8 +56,8 @@ def retrieve_node_backend(node, element, user, pwd, configmanager, inputdata, re print(repr(element)) -def retrieve(nodes, element, configmanager, inputdata): - results = queue.LightQueue() +async def retrieve(nodes, element, configmanager, inputdata): + results = asyncio.Queue() workers = set([]) creds = configmanager.get_node_attributes( nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True) @@ -72,24 +73,24 @@ def retrieve(nodes, element, configmanager, inputdata): if not user or not pwd: yield msg.ConfluentTargetInvalidCredentials(node) continue - workers.add(eventlet.spawn(retrieve_node, node, element, user, pwd, configmanager, inputdata, results)) + workers.add(tasks.spawn(retrieve_node, node, element, user, pwd, configmanager, inputdata, results)) while workers: try: - datum = results.get(block=True, timeout=10) + datum = await asyncio.wait_for(results.get(), timeout=10.0) while datum: if datum: yield datum datum = results.get_nowait() - except queue.Empty: + except asyncio.QueueEmpty: pass - eventlet.sleep(0.001) + await asyncio.sleep(0.001) for t in list(workers): - if t.dead: + if t.done(): workers.discard(t) try: while True: datum = results.get_nowait() if datum: yield datum - except queue.Empty: + except asyncio.QueueEmpty: pass