From f2de24015babf4504a583f80eb3e875e3ed325ee Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 20 Feb 2026 10:17:24 -0500 Subject: [PATCH] Port mdns to the async architecture similar to ssdp --- .../confluent/discovery/protocols/mdns.py | 146 +++++++++++------- 1 file changed, 91 insertions(+), 55 deletions(-) diff --git a/confluent_server/confluent/discovery/protocols/mdns.py b/confluent_server/confluent/discovery/protocols/mdns.py index 7f63e59f..e359a9b0 100644 --- a/confluent_server/confluent/discovery/protocols/mdns.py +++ b/confluent_server/confluent/discovery/protocols/mdns.py @@ -36,15 +36,12 @@ import confluent.noderange as noderange import confluent.util as util import confluent.log as log import confluent.netutil as netutil -import eventlet -import eventlet.green.select as select -import eventlet.green.socket as socket -import eventlet.greenpool as gp +import confluent.tasks as tasks +import socket import os import time import struct import traceback -import confluent.tasks as tasks import aiohmi.util.webclient as webclient @@ -154,43 +151,58 @@ async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None): # errno 98 can happen if aliased, skip for now raise peerbymacaddress = {} + newmacs = set([]) + deferrednotifies = [] + machandlers = {} + pktq = asyncio.Queue() + cloop = asyncio.get_running_loop() + cloop.add_reader(net4, _relay_pkt, net4, pktq) + cloop.add_reader(net6, _relay_pkt, net6, pktq) while True: try: - newmacs = set([]) - deferrednotifies = [] - machandlers = {} - r = select.select((net4, net6), (), (), 60) - if r: - r = r[0] - recent_peers = set([]) - while r and len(deferrednotifies) < 256: - for s in r: - (rsp, peer) = s.recvfrom(9000) - if peer in recent_peers: + newmacs.clear() + deferrednotifies.clear() + machandlers.clear() + timeout = None + srp = await pktq.get() + recent_peers.clear() + while srp and len(deferrednotifies) < 256: + srp = None + if timeout is None: + srp = await pktq.get() + else: + try: + srp = await asyncio.wait_for(pktq.get(), timeout=timeout) + except asyncio.exceptions.TimeoutError: + break + timeout = 0.2 + s, rsp, peer = srp + if peer in recent_peers: + continue + mac = await neighutil.get_hwaddr(peer[0]) + if mac == False: + continue + if not mac: + probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] + try: + s.setblocking(1) + s.sendto(b'\x00', probepeer) + except Exception: continue - mac = neighutil.get_hwaddr(peer[0]) - if not mac: - probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] - try: - s.sendto(b'\x00', probepeer) - except Exception: - continue - deferrednotifies.append((peer, rsp)) - datum = _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler) - if datum == 2: - recent_peers.add(peer) - r = select.select((net4, net6), (), (), 1.5) - if r: - r = r[0] + deferrednotifies.append((peer, rsp)) + continue + datum = _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler) + if datum == 2: + recent_peers.add(peer) if deferrednotifies: await asyncio.sleep(2.2) for peerrsp in deferrednotifies: peer, rsp = peerrsp - mac = neighutil.get_hwaddr(peer[0]) + mac = await neighutil.get_hwaddr(peer[0]) if not mac: continue _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler) - for mac in newmacs: + for mac in list(newmacs): thehandler = machandlers.get(mac, None) if thehandler: thehandler(peerbymacaddress[mac]) @@ -214,7 +226,15 @@ def get_sockets(): net4 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) net4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return net4, net6 - + +def _relay_pkt(sock, pktq): + sock.setblocking(0) + try: + rsp, peer = sock.recvfrom(9000) + except socket.error: + return + pktq.put_nowait((sock, rsp, peer)) + async def active_scan(handler, protocol=None): net4, net6 = get_sockets() for idx in util.list_interface_indexes(): @@ -237,31 +257,40 @@ async def active_scan(handler, protocol=None): if se.errno != 101 and se.errno != 1: raise deadline = util.monotonic_time() + 2 - r, _, _ = select.select((net4, net6), (), (), 2) + pktq = asyncio.Queue() + cloop = asyncio.get_running_loop() + cloop.add_reader(net4, _relay_pkt, net4, pktq) + cloop.add_reader(net6, _relay_pkt, net6, pktq) peerdata = {} deferparse = [] - while r: - for s in r: - (rsp, peer) = s.recvfrom(9000) - if not neighutil.get_hwaddr(peer[0]): - probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] - try: - s.sendto(b'\x00', probepeer) - except Exception: - continue - deferparse.append((rsp, peer)) + srp = True + timeout = 2 + while timeout and srp and len(deferparse) < 256: + try: + srp = await asyncio.wait_for(pktq.get(), timeout) + except asyncio.exceptions.TimeoutError: + break + s, rsp, peer = srp + if not await neighutil.get_hwaddr(peer[0]): + probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] + try: + s.setblocking(1) + s.sendto(b'\x00', probepeer) + except Exception: + srp = True continue - _parse_mdns(peer, rsp, peerdata, '_obmc_console._tcp.local') + deferparse.append((rsp, peer)) + srp = True + continue + await _parse_mdns(peer, rsp, peerdata, '_obmc_console._tcp.local') timeout = deadline - util.monotonic_time() if timeout < 0: timeout = 0 - r, _, _ = select.select((net4, net6), (), (), timeout) if deferparse: await asyncio.sleep(2.2) for dp in deferparse: rsp, peer = dp - _parse_mdns(peer, rsp, peerdata, '_obmc_console._tcp.local') - querypool = gp.GreenPool() + await _parse_mdns(peer, rsp, peerdata, '_obmc_console._tcp.local') pooltargs = [] for nid in peerdata: if '/redfish/v1/' not in peerdata[nid].get('urls', ()) and '/redfish/v1' not in peerdata[nid].get('urls', ()): @@ -274,9 +303,16 @@ async def active_scan(handler, protocol=None): # or we drop support for those devices #else: # pooltargs.append(('/redfish/v1/', peerdata[nid])) - for pi in querypool.imap(check_fish, pooltargs): - if pi is not None: - handler(pi) + tsks = [] + for targ in pooltargs: + tsks.append(tasks.spawn_task(check_fish(targ))) + while tsks: + done, tsks = await asyncio.wait(tsks, return_when=asyncio.FIRST_COMPLETED) + for dt in done: + dt = await dt + if dt is None: + continue + handler(dt) async def check_fish(urldata, port=443, verifycallback=None): if not verifycallback: @@ -373,14 +409,14 @@ def _mdns_to_dict(rsp): return retval -def _parse_mdns(peer, rsp, peerdata, srvname): +async def _parse_mdns(peer, rsp, peerdata, srvname): parsed = _mdns_to_dict(rsp) if not parsed: return if parsed.get('ttl', 0) == 0: return nid = peer[0] - mac = neighutil.get_hwaddr(peer[0]) + mac = await neighutil.get_hwaddr(peer[0]) if mac: nid = mac if nid in peerdata: @@ -399,10 +435,10 @@ def _parse_mdns(peer, rsp, peerdata, srvname): peerdata[nid] = peerdatum -def _parse_ssdp(peer, rsp, peerdata): +async def _parse_ssdp(peer, rsp, peerdata): nid = peer[0] mac = None - mac = neighutil.get_hwaddr(peer[0]) + mac = await neighutil.get_hwaddr(peer[0]) if mac: nid = mac headlines = rsp.split(b'\r\n')