mirror of
https://github.com/xcat2/confluent.git
synced 2026-03-03 20:39:17 +00:00
Begin conversion of pxe to asyncio
Also convert to 'natural' recvmsg now that we are requiring python high enough to have it.
This commit is contained in:
@@ -22,6 +22,7 @@
|
||||
|
||||
# option 97 = UUID (wireformat)
|
||||
|
||||
import asyncio
|
||||
import confluent.config.configmanager as cfm
|
||||
import confluent.collective.manager as collective
|
||||
import confluent.noderange as noderange
|
||||
@@ -273,58 +274,49 @@ def opts_to_dict(rq, optidx, expectype=1):
|
||||
def ipfromint(numb):
|
||||
return socket.inet_ntoa(struct.pack('I', numb))
|
||||
|
||||
def proxydhcp(handler, nodeguess):
|
||||
def relay_proxydhcp(sock, pktq):
|
||||
sock.setblocking(0)
|
||||
data, cmsgs, flags, peer = sock.recvmsg(9000, 9000)
|
||||
if len(data) < 240:
|
||||
return
|
||||
try:
|
||||
optidx = data.index(b'\x63\x82\x53\x63') + 4
|
||||
except ValueError:
|
||||
return
|
||||
for cmsg in cmsgs:
|
||||
level, typ, cdata = cmsg
|
||||
if level == socket.IPPROTO_IP and typ == IP_PKTINFO:
|
||||
idx, recv = struct.unpack('II', cdata[:8])
|
||||
recv = ipfromint(recv)
|
||||
break
|
||||
else:
|
||||
return
|
||||
rq = memoryview(data)
|
||||
hwlen = rq[2]
|
||||
opts, disco = opts_to_dict(rq, optidx, 3)
|
||||
disco['hwaddr'] = ':'.join(['{0:02x}'.format(x) for x in rq[28:28+hwlen]])
|
||||
node = None
|
||||
if disco.get('hwaddr', None) in macmap:
|
||||
node = macmap[disco['hwaddr']]
|
||||
elif disco.get('uuid', None) in uuidmap:
|
||||
node = uuidmap[disco['uuid']]
|
||||
myipn = myipbypeer.get(data[28:28+hwlen], None)
|
||||
skiplogging = True
|
||||
pktq.put_nowait((disco, peer, myipn, idx, recv, node, opts, data))
|
||||
|
||||
|
||||
async def proxydhcp(handler, nodeguess):
|
||||
net4011 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
net4011.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
net4011.setsockopt(socket.IPPROTO_IP, IP_PKTINFO, 1)
|
||||
net4011.bind(('', 4011))
|
||||
rp = bytearray(300)
|
||||
rpv = memoryview(rp)
|
||||
rq = bytearray(2048)
|
||||
data = pkttype.from_buffer(rq)
|
||||
msg = msghdr()
|
||||
cmsgarr = bytearray(cmsgsize)
|
||||
cmsg = cmsgtype.from_buffer(cmsgarr)
|
||||
iov = iovec()
|
||||
iov.iov_base = ctypes.addressof(data)
|
||||
iov.iov_len = 2048
|
||||
msg.msg_iov = ctypes.pointer(iov)
|
||||
msg.msg_iovlen = 1
|
||||
msg.msg_control = ctypes.addressof(cmsg)
|
||||
msg.msg_controllen = ctypes.sizeof(cmsg)
|
||||
clientaddr = sockaddr_in()
|
||||
msg.msg_name = ctypes.addressof(clientaddr)
|
||||
msg.msg_namelen = ctypes.sizeof(clientaddr)
|
||||
cloop = asyncio.get_running_loop()
|
||||
pktq = asyncio.Queue()
|
||||
cloop.add_reader(net4011, relay_proxydhcp, net4011, pktq)
|
||||
cfg = cfm.ConfigManager(None)
|
||||
while True:
|
||||
try:
|
||||
ready = select.select([net4011], [], [], None)
|
||||
if not ready or not ready[0]:
|
||||
continue
|
||||
i = recvmsg(net4011.fileno(), ctypes.pointer(msg), 0)
|
||||
#nb, client = net4011.recvfrom_into(rq)
|
||||
if i < 240:
|
||||
continue
|
||||
rqv = memoryview(rq)[:i]
|
||||
client = (ipfromint(clientaddr.sin_addr.s_addr), socket.htons(clientaddr.sin_port))
|
||||
_, level, typ = struct.unpack('QII', cmsgarr[:16])
|
||||
if level == socket.IPPROTO_IP and typ == IP_PKTINFO:
|
||||
idx, recv = struct.unpack('II', cmsgarr[16:24])
|
||||
recv = ipfromint(recv)
|
||||
try:
|
||||
optidx = rqv.tobytes().index(b'\x63\x82\x53\x63') + 4
|
||||
except ValueError:
|
||||
continue
|
||||
hwlen = rq[2]
|
||||
opts, disco = opts_to_dict(rq, optidx, 3)
|
||||
disco['hwaddr'] = ':'.join(['{0:02x}'.format(x) for x in rq[28:28+hwlen]])
|
||||
node = None
|
||||
if disco.get('hwaddr', None) in macmap:
|
||||
node = macmap[disco['hwaddr']]
|
||||
elif disco.get('uuid', None) in uuidmap:
|
||||
node = uuidmap[disco['uuid']]
|
||||
myipn = myipbypeer.get(rqv[28:28+hwlen].tobytes(), None)
|
||||
skiplogging = True
|
||||
disco, client, myipn, idx, recv, node, opts, data = await pktq.get()
|
||||
netaddr = disco['hwaddr']
|
||||
if time.time() > ignoredisco.get(netaddr, 0) + 90:
|
||||
skiplogging = False
|
||||
@@ -375,6 +367,9 @@ def proxydhcp(handler, nodeguess):
|
||||
'for this boot method.'.format(
|
||||
node, profile, len(bootfile) - 127)})
|
||||
continue
|
||||
rp = bytearray(300)
|
||||
rpv = memoryview(rp)
|
||||
rqv = memoryview(data)
|
||||
rpv[:240] = rqv[:240].tobytes()
|
||||
rpv[0:1] = b'\x02'
|
||||
rpv[108:108 + len(bootfile)] = bootfile
|
||||
@@ -390,15 +385,16 @@ def proxydhcp(handler, nodeguess):
|
||||
|
||||
|
||||
def start_proxydhcp(handler, nodeguess=None):
|
||||
eventlet.spawn_n(proxydhcp, handler, nodeguess)
|
||||
util.spawn(proxydhcp(handler, nodeguess))
|
||||
|
||||
|
||||
def snoop(handler, protocol=None, nodeguess=None):
|
||||
async def snoop(handler, protocol=None, nodeguess=None):
|
||||
#TODO(jjohnson2): ipv6 socket and multicast for DHCPv6, should that be
|
||||
#prominent
|
||||
#TODO(jjohnson2): enable unicast replies. This would suggest either
|
||||
# injection into the neigh table before OFFER or using SOCK_RAW.
|
||||
start_proxydhcp(handler, nodeguess)
|
||||
global tracelog
|
||||
tracelog = log.Logger('trace')
|
||||
global attribwatcher
|
||||
cfg = cfm.ConfigManager(None)
|
||||
|
||||
Reference in New Issue
Block a user