From 5fd0cf2b0b92e2301712ee095fefda1c478e8624 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 8 May 2024 17:18:07 -0400 Subject: [PATCH] Begin conversion of pxe to asyncio Also convert to 'natural' recvmsg now that we are requiring python high enough to have it. --- .../confluent/discovery/protocols/pxe.py | 90 +++++++++---------- 1 file changed, 43 insertions(+), 47 deletions(-) diff --git a/confluent_server/confluent/discovery/protocols/pxe.py b/confluent_server/confluent/discovery/protocols/pxe.py index 4a39654f..b4c98627 100644 --- a/confluent_server/confluent/discovery/protocols/pxe.py +++ b/confluent_server/confluent/discovery/protocols/pxe.py @@ -22,6 +22,7 @@ # option 97 = UUID (wireformat) +import asyncio import confluent.config.configmanager as cfm import confluent.collective.manager as collective import confluent.noderange as noderange @@ -273,58 +274,49 @@ def opts_to_dict(rq, optidx, expectype=1): def ipfromint(numb): return socket.inet_ntoa(struct.pack('I', numb)) -def proxydhcp(handler, nodeguess): +def relay_proxydhcp(sock, pktq): + sock.setblocking(0) + data, cmsgs, flags, peer = sock.recvmsg(9000, 9000) + if len(data) < 240: + return + try: + optidx = data.index(b'\x63\x82\x53\x63') + 4 + except ValueError: + return + for cmsg in cmsgs: + level, typ, cdata = cmsg + if level == socket.IPPROTO_IP and typ == IP_PKTINFO: + idx, recv = struct.unpack('II', cdata[:8]) + recv = ipfromint(recv) + break + else: + return + rq = memoryview(data) + hwlen = rq[2] + opts, disco = opts_to_dict(rq, optidx, 3) + disco['hwaddr'] = ':'.join(['{0:02x}'.format(x) for x in rq[28:28+hwlen]]) + node = None + if disco.get('hwaddr', None) in macmap: + node = macmap[disco['hwaddr']] + elif disco.get('uuid', None) in uuidmap: + node = uuidmap[disco['uuid']] + myipn = myipbypeer.get(data[28:28+hwlen], None) + skiplogging = True + pktq.put_nowait((disco, peer, myipn, idx, recv, node, opts, data)) + + +async def proxydhcp(handler, nodeguess): net4011 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) net4011.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) net4011.setsockopt(socket.IPPROTO_IP, IP_PKTINFO, 1) net4011.bind(('', 4011)) - rp = bytearray(300) - rpv = memoryview(rp) - rq = bytearray(2048) - data = pkttype.from_buffer(rq) - msg = msghdr() - cmsgarr = bytearray(cmsgsize) - cmsg = cmsgtype.from_buffer(cmsgarr) - iov = iovec() - iov.iov_base = ctypes.addressof(data) - iov.iov_len = 2048 - msg.msg_iov = ctypes.pointer(iov) - msg.msg_iovlen = 1 - msg.msg_control = ctypes.addressof(cmsg) - msg.msg_controllen = ctypes.sizeof(cmsg) - clientaddr = sockaddr_in() - msg.msg_name = ctypes.addressof(clientaddr) - msg.msg_namelen = ctypes.sizeof(clientaddr) + cloop = asyncio.get_running_loop() + pktq = asyncio.Queue() + cloop.add_reader(net4011, relay_proxydhcp, net4011, pktq) cfg = cfm.ConfigManager(None) while True: try: - ready = select.select([net4011], [], [], None) - if not ready or not ready[0]: - continue - i = recvmsg(net4011.fileno(), ctypes.pointer(msg), 0) - #nb, client = net4011.recvfrom_into(rq) - if i < 240: - continue - rqv = memoryview(rq)[:i] - client = (ipfromint(clientaddr.sin_addr.s_addr), socket.htons(clientaddr.sin_port)) - _, level, typ = struct.unpack('QII', cmsgarr[:16]) - if level == socket.IPPROTO_IP and typ == IP_PKTINFO: - idx, recv = struct.unpack('II', cmsgarr[16:24]) - recv = ipfromint(recv) - try: - optidx = rqv.tobytes().index(b'\x63\x82\x53\x63') + 4 - except ValueError: - continue - hwlen = rq[2] - opts, disco = opts_to_dict(rq, optidx, 3) - disco['hwaddr'] = ':'.join(['{0:02x}'.format(x) for x in rq[28:28+hwlen]]) - node = None - if disco.get('hwaddr', None) in macmap: - node = macmap[disco['hwaddr']] - elif disco.get('uuid', None) in uuidmap: - node = uuidmap[disco['uuid']] - myipn = myipbypeer.get(rqv[28:28+hwlen].tobytes(), None) - skiplogging = True + disco, client, myipn, idx, recv, node, opts, data = await pktq.get() netaddr = disco['hwaddr'] if time.time() > ignoredisco.get(netaddr, 0) + 90: skiplogging = False @@ -375,6 +367,9 @@ def proxydhcp(handler, nodeguess): 'for this boot method.'.format( node, profile, len(bootfile) - 127)}) continue + rp = bytearray(300) + rpv = memoryview(rp) + rqv = memoryview(data) rpv[:240] = rqv[:240].tobytes() rpv[0:1] = b'\x02' rpv[108:108 + len(bootfile)] = bootfile @@ -390,15 +385,16 @@ def proxydhcp(handler, nodeguess): def start_proxydhcp(handler, nodeguess=None): - eventlet.spawn_n(proxydhcp, handler, nodeguess) + util.spawn(proxydhcp(handler, nodeguess)) -def snoop(handler, protocol=None, nodeguess=None): +async def snoop(handler, protocol=None, nodeguess=None): #TODO(jjohnson2): ipv6 socket and multicast for DHCPv6, should that be #prominent #TODO(jjohnson2): enable unicast replies. This would suggest either # injection into the neigh table before OFFER or using SOCK_RAW. start_proxydhcp(handler, nodeguess) + global tracelog tracelog = log.Logger('trace') global attribwatcher cfg = cfm.ConfigManager(None)