From bf56d40fb5899953d0514748098c9a55c80dbea5 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 16 Aug 2024 14:29:04 -0400 Subject: [PATCH] Convert neighutil to asyncio --- .../confluent/discovery/protocols/pxe.py | 4 ++-- .../confluent/discovery/protocols/slp.py | 16 ++++++------- .../confluent/discovery/protocols/ssdp.py | 16 ++++++------- confluent_server/confluent/neighutil.py | 23 +++++++++++-------- 4 files changed, 31 insertions(+), 28 deletions(-) diff --git a/confluent_server/confluent/discovery/protocols/pxe.py b/confluent_server/confluent/discovery/protocols/pxe.py index ec3db84e..ea6739e6 100644 --- a/confluent_server/confluent/discovery/protocols/pxe.py +++ b/confluent_server/confluent/discovery/protocols/pxe.py @@ -409,14 +409,14 @@ async def process_dhcp6req(handler, rqv, addr, net, cfg, nodeguess): if disco['uuid'] == '03000200-0400-0500-0006-000700080009': # Ignore common malformed dhcpv6 request from firmware return - mac = neighutil.get_hwaddr(ip.split('%', 1)[0]) + mac = await neighutil.get_hwaddr(ip.split('%', 1)[0]) if not mac: net.sendto(b'\x00', addr) tries = 5 while tries and not mac: await asyncio.sleep(0.01) tries -= 1 - mac = neighutil.get_hwaddr(ip.split('%', 1)[0]) + mac = await neighutil.get_hwaddr(ip.split('%', 1)[0]) info = {'hwaddr': mac, 'uuid': disco['uuid'], 'architecture': disco['arch'], 'services': ('pxe-client',)} if ignoredisco.get(mac, 0) + 90 < time.time(): diff --git a/confluent_server/confluent/discovery/protocols/slp.py b/confluent_server/confluent/discovery/protocols/slp.py index 40c9d936..76d534fd 100644 --- a/confluent_server/confluent/discovery/protocols/slp.py +++ b/confluent_server/confluent/discovery/protocols/slp.py @@ -101,12 +101,12 @@ def _parse_SrvRply(parsed): parsed['urls'].append(url) -def _parse_slp_packet(packet, peer, rsps, xidmap, defer=None, sock=None): +async def _parse_slp_packet(packet, peer, rsps, xidmap, defer=None, sock=None): parsed = _parse_slp_header(packet) if not parsed: return addr = peer[0] - mac = neighutil.get_hwaddr(addr) + mac = await neighutil.get_hwaddr(addr) if mac: identifier = mac else: @@ -421,7 +421,7 @@ async def rescan(handler): for addr in scanned['addresses']: if addr in known_peers: break - macaddr = neighutil.get_hwaddr(addr[0]) + macaddr = await neighutil.get_hwaddr(addr[0]) if not macaddr: continue known_peers.add(addr) @@ -510,7 +510,7 @@ async def snoop(handler, protocol=None): continue if peer in deferpeers: continue - mac = neighutil.get_hwaddr(peer[0]) + mac = await neighutil.get_hwaddr(peer[0]) if not mac: probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] try: @@ -550,7 +550,7 @@ async def snoop(handler, protocol=None): event=log.Events.stacktrace) async def process_peer(newmacs, known_peers, peerbymacaddress, peer): - mac = neighutil.get_hwaddr(peer[0]) + mac = await neighutil.get_hwaddr(peer[0]) if not mac: return known_peers.add(peer) @@ -589,7 +589,7 @@ async def active_scan(handler, protocol=None): for addr in scanned['addresses']: if addr in known_peers: break - macaddr = neighutil.get_hwaddr(addr[0]) + macaddr = await neighutil.get_hwaddr(addr[0]) if not macaddr: continue if not scanned.get('hwaddr', None): @@ -646,7 +646,7 @@ async def scan(srvtypes=_slp_services, addresses=None, localonly=False): try: srp = await asyncio.wait_for(pktq.get(), 1.0) sock, rsp, peer = srp - _parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, sock) + await _parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, sock) except asyncio.exceptions.TimeoutError: break cloop.remove_reader(net) @@ -655,7 +655,7 @@ async def scan(srvtypes=_slp_services, addresses=None, localonly=False): await asyncio.sleep(1.2) # already have a one second pause from select above for defer in deferrals: rsp, peer = defer - _parse_slp_packet(rsp, peer, rsps, xidmap) + await _parse_slp_packet(rsp, peer, rsps, xidmap) # now to analyze and flesh out the responses handleids = set([]) tsks = [] diff --git a/confluent_server/confluent/discovery/protocols/ssdp.py b/confluent_server/confluent/discovery/protocols/ssdp.py index 1c8813cc..c54a56b2 100644 --- a/confluent_server/confluent/discovery/protocols/ssdp.py +++ b/confluent_server/confluent/discovery/protocols/ssdp.py @@ -61,7 +61,7 @@ async def active_scan(handler, protocol=None): addr = addr[0:1] + addr[2:] if addr in known_peers: break - hwaddr = neighutil.get_hwaddr(addr[0]) + hwaddr = await neighutil.get_hwaddr(addr[0]) if not hwaddr: continue if not scanned.get('hwaddr', None): @@ -216,7 +216,7 @@ async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None): if peer in known_peers: continue recent_peers.add(peer) - mac = neighutil.get_hwaddr(peer[0]) + mac = await neighutil.get_hwaddr(peer[0]) if mac == False: # neighutil determined peer ip is not local, skip attempt # to probe and critically, skip growing deferrednotifiers @@ -312,7 +312,7 @@ async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None): await asyncio.sleep(2.2) for peerrsp in deferrednotifies: peer, rsp = peerrsp - mac = neighutil.get_hwaddr(peer[0]) + mac = await neighutil.get_hwaddr(peer[0]) if not mac: continue _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler) @@ -412,7 +412,7 @@ async def _find_service(service, target): except asyncio.exceptions.TimeoutError: break s, rsp, peer = srp - if not neighutil.get_hwaddr(peer[0]): + if not await neighutil.get_hwaddr(peer[0]): probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] try: s.sendto(b'\x00', probepeer) @@ -422,7 +422,7 @@ async def _find_service(service, target): deferparse.append((rsp, peer)) srp = True continue - _parse_ssdp(peer, rsp, peerdata) + await _parse_ssdp(peer, rsp, peerdata) timeout = deadline - util.monotonic_time() if timeout < 0: timeout = 0 @@ -430,7 +430,7 @@ async def _find_service(service, target): await asyncio.sleep(2.2) for dp in deferparse: rsp, peer = dp - _parse_ssdp(peer, rsp, peerdata) + await _parse_ssdp(peer, rsp, peerdata) pooltargs = [] for nid in peerdata: if peerdata[nid].get('services', [None])[0] == 'urn::service:affluent:1': @@ -514,10 +514,10 @@ async def check_fish(urldata, port=443, verifycallback=None): return data return None -def _parse_ssdp(peer, rsp, peerdata): +async def _parse_ssdp(peer, rsp, peerdata): nid = peer[0] mac = None - mac = neighutil.get_hwaddr(peer[0]) + mac = await neighutil.get_hwaddr(peer[0]) if mac: nid = mac headlines = rsp.split(b'\r\n') diff --git a/confluent_server/confluent/neighutil.py b/confluent_server/confluent/neighutil.py index cd08e8bd..f6210262 100644 --- a/confluent_server/confluent/neighutil.py +++ b/confluent_server/confluent/neighutil.py @@ -16,11 +16,11 @@ # A consolidated manage of neighbor table information management. +import asyncio import confluent.netutil as netutil import confluent.util as util import os -import eventlet.semaphore as semaphore -import eventlet.green.socket as socket +import socket import struct @@ -33,24 +33,27 @@ neightime = 0 import re -neighlock = semaphore.Semaphore() +neighlock = asyncio.Lock() -def _update_neigh(): +async def _update_neigh(): global neightable global neightime neightime = os.times()[4] s = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE) s.bind((0, 0)) + s.settimeout(0) # RTM_GETNEIGH # nlmsghdr struct: u32 len, u16 type, u16 flags, u32 seq, u32 pid nlhdr = b'\x1c\x00\x00\x00\x1e\x00\x01\x03\x00\x00\x00\x00\x00\x00\x00\x00' # ndmsg struct u8 family u8 pad, u16 pad, s32 ifidx, u16 state, u8 flags, u8 type ndmsg= b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' - s.sendall(nlhdr + ndmsg) + cloop = asyncio.get_event_loop() + await cloop.sock_sendall(s, nlhdr + ndmsg) + #s.sendall(nlhdr + ndmsg) neightable = {} try: while True: - pdata = s.recv(65536) + pdata = await cloop.sock_recv(s, 65536) v = memoryview(pdata) if struct.unpack('H', v[4:6])[0] == 3: break @@ -81,7 +84,7 @@ def _update_neigh(): s.close() -def get_hwaddr(ipaddr): +async def get_hwaddr(ipaddr): if '%' in ipaddr: ipaddr, _ = ipaddr.split('%', 1) hwaddr = None @@ -91,16 +94,16 @@ def get_hwaddr(ipaddr): ipaddr = socket.inet_pton(socket.AF_INET6, ipaddr) elif '.' in ipaddr: ipaddr = socket.inet_pton(socket.AF_INET, ipaddr) - with neighlock: + async with neighlock: updated = False if os.times()[4] > (neightime + 30): - _update_neigh() + await _update_neigh() updated = True hwaddr = neightable.get(ipaddr, None) if not hwaddr and not netutil.ipn_is_local(ipaddr): hwaddr = False if hwaddr == None and not updated: - _update_neigh() + await _update_neigh() hwaddr = neightable.get(ipaddr, None) if hwaddr: hwaddr = ':'.join(['{:02x}'.format(x) for x in bytearray(hwaddr)])