2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-02-25 01:39:22 +00:00

Port mdns to the async architecture similar to ssdp

This commit is contained in:
Jarrod Johnson
2026-02-20 10:17:24 -05:00
parent 9b2eaa90b5
commit f2de24015b

View File

@@ -36,15 +36,12 @@ import confluent.noderange as noderange
import confluent.util as util
import confluent.log as log
import confluent.netutil as netutil
import eventlet
import eventlet.green.select as select
import eventlet.green.socket as socket
import eventlet.greenpool as gp
import confluent.tasks as tasks
import socket
import os
import time
import struct
import traceback
import confluent.tasks as tasks
import aiohmi.util.webclient as webclient
@@ -154,43 +151,58 @@ async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
# errno 98 can happen if aliased, skip for now
raise
peerbymacaddress = {}
newmacs = set([])
deferrednotifies = []
machandlers = {}
pktq = asyncio.Queue()
cloop = asyncio.get_running_loop()
cloop.add_reader(net4, _relay_pkt, net4, pktq)
cloop.add_reader(net6, _relay_pkt, net6, pktq)
while True:
try:
newmacs = set([])
deferrednotifies = []
machandlers = {}
r = select.select((net4, net6), (), (), 60)
if r:
r = r[0]
recent_peers = set([])
while r and len(deferrednotifies) < 256:
for s in r:
(rsp, peer) = s.recvfrom(9000)
if peer in recent_peers:
newmacs.clear()
deferrednotifies.clear()
machandlers.clear()
timeout = None
srp = await pktq.get()
recent_peers.clear()
while srp and len(deferrednotifies) < 256:
srp = None
if timeout is None:
srp = await pktq.get()
else:
try:
srp = await asyncio.wait_for(pktq.get(), timeout=timeout)
except asyncio.exceptions.TimeoutError:
break
timeout = 0.2
s, rsp, peer = srp
if peer in recent_peers:
continue
mac = await neighutil.get_hwaddr(peer[0])
if mac == False:
continue
if not mac:
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
s.setblocking(1)
s.sendto(b'\x00', probepeer)
except Exception:
continue
mac = neighutil.get_hwaddr(peer[0])
if not mac:
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
s.sendto(b'\x00', probepeer)
except Exception:
continue
deferrednotifies.append((peer, rsp))
datum = _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler)
if datum == 2:
recent_peers.add(peer)
r = select.select((net4, net6), (), (), 1.5)
if r:
r = r[0]
deferrednotifies.append((peer, rsp))
continue
datum = _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler)
if datum == 2:
recent_peers.add(peer)
if deferrednotifies:
await asyncio.sleep(2.2)
for peerrsp in deferrednotifies:
peer, rsp = peerrsp
mac = neighutil.get_hwaddr(peer[0])
mac = await neighutil.get_hwaddr(peer[0])
if not mac:
continue
_process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler)
for mac in newmacs:
for mac in list(newmacs):
thehandler = machandlers.get(mac, None)
if thehandler:
thehandler(peerbymacaddress[mac])
@@ -214,7 +226,15 @@ def get_sockets():
net4 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
net4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return net4, net6
def _relay_pkt(sock, pktq):
sock.setblocking(0)
try:
rsp, peer = sock.recvfrom(9000)
except socket.error:
return
pktq.put_nowait((sock, rsp, peer))
async def active_scan(handler, protocol=None):
net4, net6 = get_sockets()
for idx in util.list_interface_indexes():
@@ -237,31 +257,40 @@ async def active_scan(handler, protocol=None):
if se.errno != 101 and se.errno != 1:
raise
deadline = util.monotonic_time() + 2
r, _, _ = select.select((net4, net6), (), (), 2)
pktq = asyncio.Queue()
cloop = asyncio.get_running_loop()
cloop.add_reader(net4, _relay_pkt, net4, pktq)
cloop.add_reader(net6, _relay_pkt, net6, pktq)
peerdata = {}
deferparse = []
while r:
for s in r:
(rsp, peer) = s.recvfrom(9000)
if not neighutil.get_hwaddr(peer[0]):
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
s.sendto(b'\x00', probepeer)
except Exception:
continue
deferparse.append((rsp, peer))
srp = True
timeout = 2
while timeout and srp and len(deferparse) < 256:
try:
srp = await asyncio.wait_for(pktq.get(), timeout)
except asyncio.exceptions.TimeoutError:
break
s, rsp, peer = srp
if not await neighutil.get_hwaddr(peer[0]):
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
s.setblocking(1)
s.sendto(b'\x00', probepeer)
except Exception:
srp = True
continue
_parse_mdns(peer, rsp, peerdata, '_obmc_console._tcp.local')
deferparse.append((rsp, peer))
srp = True
continue
await _parse_mdns(peer, rsp, peerdata, '_obmc_console._tcp.local')
timeout = deadline - util.monotonic_time()
if timeout < 0:
timeout = 0
r, _, _ = select.select((net4, net6), (), (), timeout)
if deferparse:
await asyncio.sleep(2.2)
for dp in deferparse:
rsp, peer = dp
_parse_mdns(peer, rsp, peerdata, '_obmc_console._tcp.local')
querypool = gp.GreenPool()
await _parse_mdns(peer, rsp, peerdata, '_obmc_console._tcp.local')
pooltargs = []
for nid in peerdata:
if '/redfish/v1/' not in peerdata[nid].get('urls', ()) and '/redfish/v1' not in peerdata[nid].get('urls', ()):
@@ -274,9 +303,16 @@ async def active_scan(handler, protocol=None):
# or we drop support for those devices
#else:
# pooltargs.append(('/redfish/v1/', peerdata[nid]))
for pi in querypool.imap(check_fish, pooltargs):
if pi is not None:
handler(pi)
tsks = []
for targ in pooltargs:
tsks.append(tasks.spawn_task(check_fish(targ)))
while tsks:
done, tsks = await asyncio.wait(tsks, return_when=asyncio.FIRST_COMPLETED)
for dt in done:
dt = await dt
if dt is None:
continue
handler(dt)
async def check_fish(urldata, port=443, verifycallback=None):
if not verifycallback:
@@ -373,14 +409,14 @@ def _mdns_to_dict(rsp):
return retval
def _parse_mdns(peer, rsp, peerdata, srvname):
async def _parse_mdns(peer, rsp, peerdata, srvname):
parsed = _mdns_to_dict(rsp)
if not parsed:
return
if parsed.get('ttl', 0) == 0:
return
nid = peer[0]
mac = neighutil.get_hwaddr(peer[0])
mac = await neighutil.get_hwaddr(peer[0])
if mac:
nid = mac
if nid in peerdata:
@@ -399,10 +435,10 @@ def _parse_mdns(peer, rsp, peerdata, srvname):
peerdata[nid] = peerdatum
def _parse_ssdp(peer, rsp, peerdata):
async def _parse_ssdp(peer, rsp, peerdata):
nid = peer[0]
mac = None
mac = neighutil.get_hwaddr(peer[0])
mac = await neighutil.get_hwaddr(peer[0])
if mac:
nid = mac
headlines = rsp.split(b'\r\n')