2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-05-08 01:37:18 +00:00

Convert neighutil to asyncio

This commit is contained in:
Jarrod Johnson
2024-08-16 14:29:04 -04:00
parent fab6a5a757
commit bf56d40fb5
4 changed files with 31 additions and 28 deletions
@@ -409,14 +409,14 @@ async def process_dhcp6req(handler, rqv, addr, net, cfg, nodeguess):
if disco['uuid'] == '03000200-0400-0500-0006-000700080009':
# Ignore common malformed dhcpv6 request from firmware
return
mac = neighutil.get_hwaddr(ip.split('%', 1)[0])
mac = await neighutil.get_hwaddr(ip.split('%', 1)[0])
if not mac:
net.sendto(b'\x00', addr)
tries = 5
while tries and not mac:
await asyncio.sleep(0.01)
tries -= 1
mac = neighutil.get_hwaddr(ip.split('%', 1)[0])
mac = await neighutil.get_hwaddr(ip.split('%', 1)[0])
info = {'hwaddr': mac, 'uuid': disco['uuid'],
'architecture': disco['arch'], 'services': ('pxe-client',)}
if ignoredisco.get(mac, 0) + 90 < time.time():
@@ -101,12 +101,12 @@ def _parse_SrvRply(parsed):
parsed['urls'].append(url)
def _parse_slp_packet(packet, peer, rsps, xidmap, defer=None, sock=None):
async def _parse_slp_packet(packet, peer, rsps, xidmap, defer=None, sock=None):
parsed = _parse_slp_header(packet)
if not parsed:
return
addr = peer[0]
mac = neighutil.get_hwaddr(addr)
mac = await neighutil.get_hwaddr(addr)
if mac:
identifier = mac
else:
@@ -421,7 +421,7 @@ async def rescan(handler):
for addr in scanned['addresses']:
if addr in known_peers:
break
macaddr = neighutil.get_hwaddr(addr[0])
macaddr = await neighutil.get_hwaddr(addr[0])
if not macaddr:
continue
known_peers.add(addr)
@@ -510,7 +510,7 @@ async def snoop(handler, protocol=None):
continue
if peer in deferpeers:
continue
mac = neighutil.get_hwaddr(peer[0])
mac = await neighutil.get_hwaddr(peer[0])
if not mac:
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
@@ -550,7 +550,7 @@ async def snoop(handler, protocol=None):
event=log.Events.stacktrace)
async def process_peer(newmacs, known_peers, peerbymacaddress, peer):
mac = neighutil.get_hwaddr(peer[0])
mac = await neighutil.get_hwaddr(peer[0])
if not mac:
return
known_peers.add(peer)
@@ -589,7 +589,7 @@ async def active_scan(handler, protocol=None):
for addr in scanned['addresses']:
if addr in known_peers:
break
macaddr = neighutil.get_hwaddr(addr[0])
macaddr = await neighutil.get_hwaddr(addr[0])
if not macaddr:
continue
if not scanned.get('hwaddr', None):
@@ -646,7 +646,7 @@ async def scan(srvtypes=_slp_services, addresses=None, localonly=False):
try:
srp = await asyncio.wait_for(pktq.get(), 1.0)
sock, rsp, peer = srp
_parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, sock)
await _parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, sock)
except asyncio.exceptions.TimeoutError:
break
cloop.remove_reader(net)
@@ -655,7 +655,7 @@ async def scan(srvtypes=_slp_services, addresses=None, localonly=False):
await asyncio.sleep(1.2) # already have a one second pause from select above
for defer in deferrals:
rsp, peer = defer
_parse_slp_packet(rsp, peer, rsps, xidmap)
await _parse_slp_packet(rsp, peer, rsps, xidmap)
# now to analyze and flesh out the responses
handleids = set([])
tsks = []
@@ -61,7 +61,7 @@ async def active_scan(handler, protocol=None):
addr = addr[0:1] + addr[2:]
if addr in known_peers:
break
hwaddr = neighutil.get_hwaddr(addr[0])
hwaddr = await neighutil.get_hwaddr(addr[0])
if not hwaddr:
continue
if not scanned.get('hwaddr', None):
@@ -216,7 +216,7 @@ async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
if peer in known_peers:
continue
recent_peers.add(peer)
mac = neighutil.get_hwaddr(peer[0])
mac = await neighutil.get_hwaddr(peer[0])
if mac == False:
# neighutil determined peer ip is not local, skip attempt
# to probe and critically, skip growing deferrednotifiers
@@ -312,7 +312,7 @@ async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
await asyncio.sleep(2.2)
for peerrsp in deferrednotifies:
peer, rsp = peerrsp
mac = neighutil.get_hwaddr(peer[0])
mac = await neighutil.get_hwaddr(peer[0])
if not mac:
continue
_process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler)
@@ -412,7 +412,7 @@ async def _find_service(service, target):
except asyncio.exceptions.TimeoutError:
break
s, rsp, peer = srp
if not neighutil.get_hwaddr(peer[0]):
if not await neighutil.get_hwaddr(peer[0]):
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
s.sendto(b'\x00', probepeer)
@@ -422,7 +422,7 @@ async def _find_service(service, target):
deferparse.append((rsp, peer))
srp = True
continue
_parse_ssdp(peer, rsp, peerdata)
await _parse_ssdp(peer, rsp, peerdata)
timeout = deadline - util.monotonic_time()
if timeout < 0:
timeout = 0
@@ -430,7 +430,7 @@ async def _find_service(service, target):
await asyncio.sleep(2.2)
for dp in deferparse:
rsp, peer = dp
_parse_ssdp(peer, rsp, peerdata)
await _parse_ssdp(peer, rsp, peerdata)
pooltargs = []
for nid in peerdata:
if peerdata[nid].get('services', [None])[0] == 'urn::service:affluent:1':
@@ -514,10 +514,10 @@ async def check_fish(urldata, port=443, verifycallback=None):
return data
return None
def _parse_ssdp(peer, rsp, peerdata):
async def _parse_ssdp(peer, rsp, peerdata):
nid = peer[0]
mac = None
mac = neighutil.get_hwaddr(peer[0])
mac = await neighutil.get_hwaddr(peer[0])
if mac:
nid = mac
headlines = rsp.split(b'\r\n')
+13 -10
View File
@@ -16,11 +16,11 @@
# A consolidated manage of neighbor table information management.
import asyncio
import confluent.netutil as netutil
import confluent.util as util
import os
import eventlet.semaphore as semaphore
import eventlet.green.socket as socket
import socket
import struct
@@ -33,24 +33,27 @@ neightime = 0
import re
neighlock = semaphore.Semaphore()
neighlock = asyncio.Lock()
def _update_neigh():
async def _update_neigh():
global neightable
global neightime
neightime = os.times()[4]
s = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE)
s.bind((0, 0))
s.settimeout(0)
# RTM_GETNEIGH
# nlmsghdr struct: u32 len, u16 type, u16 flags, u32 seq, u32 pid
nlhdr = b'\x1c\x00\x00\x00\x1e\x00\x01\x03\x00\x00\x00\x00\x00\x00\x00\x00'
# ndmsg struct u8 family u8 pad, u16 pad, s32 ifidx, u16 state, u8 flags, u8 type
ndmsg= b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
s.sendall(nlhdr + ndmsg)
cloop = asyncio.get_event_loop()
await cloop.sock_sendall(s, nlhdr + ndmsg)
#s.sendall(nlhdr + ndmsg)
neightable = {}
try:
while True:
pdata = s.recv(65536)
pdata = await cloop.sock_recv(s, 65536)
v = memoryview(pdata)
if struct.unpack('H', v[4:6])[0] == 3:
break
@@ -81,7 +84,7 @@ def _update_neigh():
s.close()
def get_hwaddr(ipaddr):
async def get_hwaddr(ipaddr):
if '%' in ipaddr:
ipaddr, _ = ipaddr.split('%', 1)
hwaddr = None
@@ -91,16 +94,16 @@ def get_hwaddr(ipaddr):
ipaddr = socket.inet_pton(socket.AF_INET6, ipaddr)
elif '.' in ipaddr:
ipaddr = socket.inet_pton(socket.AF_INET, ipaddr)
with neighlock:
async with neighlock:
updated = False
if os.times()[4] > (neightime + 30):
_update_neigh()
await _update_neigh()
updated = True
hwaddr = neightable.get(ipaddr, None)
if not hwaddr and not netutil.ipn_is_local(ipaddr):
hwaddr = False
if hwaddr == None and not updated:
_update_neigh()
await _update_neigh()
hwaddr = neightable.get(ipaddr, None)
if hwaddr:
hwaddr = ':'.join(['{:02x}'.format(x) for x in bytearray(hwaddr)])