2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-02 07:23:35 +00:00

Rework netutil for async and dependent functions

This commit is contained in:
Jarrod Johnson
2026-02-26 17:27:04 -05:00
parent 84f3614f0a
commit 3636e14628
16 changed files with 110 additions and 106 deletions

View File

@@ -366,7 +366,7 @@ async def main():
for nic in glob.glob("/sys/class/net/*/ifindex"):
idx = int(open(nic, "r").read())
nicname = nic.split('/')[-2]
ncfg = netutil.get_nic_config(cfg, args.node, ifidx=idx)
ncfg = await netutil.get_nic_config(cfg, args.node, ifidx=idx)
if ncfg['ipv4_address']:
targsships.append(ncfg['ipv4_address'])
if ncfg['ipv4_address'] or ncfg['ipv4_method'] == 'dhcp':

View File

@@ -173,7 +173,7 @@ async def local_node_trust_setup():
myprincipals.add(myshortname)
if domain:
myprincipals.add('{0}.{1}'.format(myshortname, domain))
for addr in netutil.get_my_addresses():
for addr in await netutil.get_my_addresses():
addr = socket.inet_ntop(addr[0], addr[1])
myprincipals.add(addr)
for pubkey in glob.glob('/etc/ssh/ssh_host_*_key.pub'):

View File

@@ -1097,12 +1097,12 @@ def get_collective_member(name):
return _cfgstore.get('collective', {}).get(name, None)
def get_collective_member_by_address(address):
async def get_collective_member_by_address(address):
if _cfgstore is None:
init()
for name in _cfgstore.get('collective', {}):
currdrone = _cfgstore['collective'][name]
if netutil.addresses_match(address, currdrone['address']):
if await netutil.addresses_match(address, currdrone['address']):
return currdrone

View File

@@ -39,10 +39,10 @@ libc = ctypes.CDLL(ctypes.util.find_library('c'))
# 128, len, len, key - sealed key
def address_is_somewhat_trusted(address, nodename, cfm):
if netutil.ip_on_same_subnet(address.split('%')[0], 'fe80::', 64):
async def address_is_somewhat_trusted(address, nodename, cfm):
if await netutil.ip_on_same_subnet(address.split('%')[0], 'fe80::', 64):
return True
if netutil.address_is_local(address):
if await netutil.address_is_local(address):
return True
authnets = cfm.get_node_attributes(nodename, 'trusted.subnets')
authnets = authnets.get(nodename, {}).get('trusted.subnets', {}).get('value', None)
@@ -51,7 +51,7 @@ def address_is_somewhat_trusted(address, nodename, cfm):
for anet in authnet.split():
na, plen = anet.split('/')
plen = int(plen)
if netutil.ip_on_same_subnet(address, na, plen):
if await netutil.ip_on_same_subnet(address, na, plen):
return True
return False
@@ -85,7 +85,7 @@ class CredServer(object):
apiarmed = apimats.get(nodename, {}).get('deployment.apiarmed', {}).get(
'value', None)
if not hmackey:
if not address_is_somewhat_trusted(peer[0], nodename, self.cfm):
if not await address_is_somewhat_trusted(peer[0], nodename, self.cfm):
client.close()
return
if not apiarmed:

View File

@@ -162,7 +162,7 @@ class NodeHandler(generic.NodeHandler):
newip = newipinfo[-1][0]
if ':' in newip:
raise exc.NotImplementedException('IPv6 remote config TODO')
netconfig = netutil.get_nic_config(cfg, nodename, ip=newip)
netconfig = await netutil.get_nic_config(cfg, nodename, ip=newip)
plen = netconfig['prefix']
newip = '{0}/{1}'.format(newip, plen)
currcfg = ic.get_net_configuration()

View File

@@ -300,8 +300,8 @@ class NodeHandler(generic.NodeHandler):
else:
raise Exception("Unable to detect active NIC of multi-nic bmc")
actualnics = [actualnic]
currnet = wc.grab_json_response(actualnics[0])
netconfig = netutil.get_nic_config(self.configmanager, nodename, ip=newip)
currnet = await wc.grab_json_response(actualnics[0])
netconfig = await netutil.get_nic_config(self.configmanager, nodename, ip=newip)
newconfig = {
"Address": newip,
"SubnetMask": netutil.cidr_to_mask(netconfig['prefix']),

View File

@@ -117,7 +117,7 @@ class NodeHandler(bmchandler.NodeHandler):
'ifConfig').find('v4IPAddr').text
if currip == smmip:
return
netconfig = netutil.get_nic_config(cfg, nodename, ip=smmip)
netconfig = await netutil.get_nic_config(cfg, nodename, ip=smmip)
netmask = netutil.cidr_to_mask(netconfig['prefix'])
setdata = 'set=ifIndex:0,v4DHCPEnabled:0,v4IPAddr:{0},v4NetMask:{1}'.format(smmip, netmask)
gateway = netconfig.get('ipv4_gateway', None)

View File

@@ -212,7 +212,7 @@ class NodeHandler(generic.NodeHandler):
if net['channel_number'] == self.channel:
# we have found the interface to potentially manipulate
if net['ipv4_address'] != newip:
netconfig = netutil.get_nic_config(self.configmanager, nodename, ip=newip)
netconfig = await netutil.get_nic_config(self.configmanager, nodename, ip=newip)
newmask = netutil.cidr_to_mask(netconfig['prefix'])
net['ipv4_address'] = newip
net['ipv4_subnet'] = newmask

View File

@@ -619,11 +619,11 @@ class NodeHandler(immhandler.NodeHandler):
newip = newipinfo[-1][0]
if ':' in newip:
raise exc.NotImplementedException('IPv6 remote config TODO')
netconfig = netutil.get_nic_config(self.configmanager, nodename, ip=targbmc)
netconfig = await netutil.get_nic_config(self.configmanager, nodename, ip=targbmc)
newmask = netutil.cidr_to_mask(netconfig['prefix'])
currinfo = await wc.grab_json_response('/api/providers/logoninfo')
currip = currinfo.get('items', [{}])[0].get('ipv4_address', '')
curreth1 = wc.grab_json_response('/api/dataset/imm_ethernet')
curreth1 = await wc.grab_json_response('/api/dataset/imm_ethernet')
if curreth1:
if self.ipaddr.startswith('fe80::'):
ipkey = 'ipv6_link_local_address'
@@ -634,7 +634,7 @@ class NodeHandler(immhandler.NodeHandler):
nic1ip = curreth1.get('items', [{}])[0].get(ipkey, None)
if nic1ip != self.ipaddr:
# check second nic instead
curreth2 = wc.grab_json_response('/api/dataset/imm_ethernet_2')
curreth2 = await wc.grab_json_response('/api/dataset/imm_ethernet_2')
if curreth2 and curreth2.get('items', [{}])[0].get('if_second_port_exist', 0):
nic2ip = curreth2.get('items', [{}])[0].get(ipkey + '_2', None)
if nic2ip != self.ipaddr:
@@ -650,7 +650,7 @@ class NodeHandler(immhandler.NodeHandler):
}
if netconfig['ipv4_gateway']:
statargs['ENET_IPv4GatewayIPAddr'] = netconfig['ipv4_gateway']
elif not netutil.address_is_local(newip):
elif not await netutil.address_is_local(newip):
raise exc.InvalidArgumentException('Will not remotely configure a device with no gateway')
if attribsuffix:
for currkey in list(statargs):

View File

@@ -569,7 +569,7 @@ def get_deployment_profile(node, cfg, cfd=None):
staticassigns = {}
myipbypeer = {}
def check_reply(node, info, packet, sock, cfg, reqview, addr, requestor):
async def check_reply(node, info, packet, sock, cfg, reqview, addr, requestor):
if not requestor:
requestor = ('0.0.0.0', None)
if requestor[0] == '0.0.0.0' and not info.get('uuid', None):
@@ -590,16 +590,16 @@ def check_reply(node, info, packet, sock, cfg, reqview, addr, requestor):
if packet['vci'] and packet['vci'].startswith('PXEClient'):
log.log({'info': 'IPv6 PXE boot attempt by {0}, but IPv6 PXE is not supported, try IPv6 HTTP boot or IPv4 boot'.format(node)})
return
return reply_dhcp6(node, addr, cfg, packet, cfd, profile, sock)
return await reply_dhcp6(node, addr, cfg, packet, cfd, profile, sock)
else:
return reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile, sock, requestor)
return await reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile, sock, requestor)
def reply_dhcp6(node, addr, cfg, packet, cfd, profile, sock):
myaddrs = netutil.get_my_addresses(addr[-1], socket.AF_INET6)
async def reply_dhcp6(node, addr, cfg, packet, cfd, profile, sock):
myaddrs = await netutil.get_my_addresses(addr[-1], socket.AF_INET6)
if not myaddrs:
log.log({'info': 'Unable to provide IPv6 boot services to {0}, no viable IPv6 configuration on interface index "{1}" to respond through.'.format(node, addr[-1])})
return
niccfg = netutil.get_nic_config(cfg, node, ifidx=addr[-1], onlyfamily=socket.AF_INET6)
niccfg = await netutil.get_nic_config(cfg, node, ifidx=addr[-1], onlyfamily=socket.AF_INET6)
ipv6addr = niccfg.get('ipv6_address', None)
ipv6prefix = niccfg.get('ipv6_prefix', None)
ipv6method = niccfg.get('ipv6_method', 'static')
@@ -679,7 +679,7 @@ def get_my_duid():
_recent_txids = {}
def reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile, sock=None, requestor=None):
async def reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile, sock=None, requestor=None):
replen = 275 # default is going to be 286
# while myipn is describing presumed destination, it's really
# vague in the face of aliases, need to convert to ifidx and evaluate
@@ -724,7 +724,7 @@ def reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile, sock=N
relayipa = socket.inet_ntoa(relayip)
gateway = None
netmask = None
niccfg = netutil.get_nic_config(cfg, node, ifidx=info['netinfo']['ifidx'], relayipn=relayip, onlyfamily=socket.AF_INET)
niccfg = await netutil.get_nic_config(cfg, node, ifidx=info['netinfo']['ifidx'], relayipn=relayip, onlyfamily=socket.AF_INET)
nicerr = niccfg.get('error_msg', False)
if nicerr:
log.log({'error': nicerr})

View File

@@ -293,7 +293,7 @@ async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
msecs = int(currtime * 1000 % 1000)
reply = 'HTTP/1.1 200 OK\r\nNODENAME: {0}\r\nCURRTIME: {1}\r\nCURRMSECS: {2}\r\n'.format(node, seconds, msecs)
theip = peer[0].split('%', 1)[0]
if netutil.ip_on_same_subnet(theip, 'fe80::', 64):
if await netutil.ip_on_same_subnet(theip, 'fe80::', 64):
if '%' in peer[0]:
ifidx = peer[0].split('%', 1)[1]
iface = await cloop.getaddrinfo(peer[0], 0, socket.AF_INET6, socket.SOCK_DGRAM)[0][-1][-1]
@@ -301,11 +301,11 @@ async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
ifidx = '{}'.format(peer[-1])
iface = peer[-1]
reply += 'MGTIFACE: {0}\r\n'.format(ifidx)
ncfg = netutil.get_nic_config(
ncfg = await netutil.get_nic_config(
cfg, node, ifidx=iface)
if ncfg.get('matchesnodename', None):
reply += 'DEFAULTNET: 1\r\n'
elif not netutil.address_is_local(peer[0]):
elif not await netutil.address_is_local(peer[0]):
continue
if not isinstance(reply, bytes):
reply = reply.encode('utf8')

View File

@@ -101,7 +101,7 @@ async def get_hwaddr(ipaddr):
await _update_neigh()
updated = True
hwaddr = neightable.get(ipaddr, None)
if not hwaddr and not netutil.ipn_is_local(ipaddr):
if not hwaddr and not await netutil.ipn_is_local(ipaddr):
hwaddr = False
if hwaddr == None and not updated:
await _update_neigh()

View File

@@ -16,6 +16,7 @@
# this will implement noderange grammar
import asyncio
import confluent.exceptions as exc
import codecs
try:
@@ -24,13 +25,10 @@ except ImportError:
psutil = None
import netifaces
import struct
import eventlet.green.socket as socket
import eventlet.support.greendns
import os
getaddrinfo = eventlet.support.greendns.getaddrinfo
import socket
import confluent.tasks as tasks
eventlet.support.greendns.resolver.clear()
eventlet.support.greendns.resolver._resolver.lifetime = 1
def msg_align(len):
return (len + 3) & ~3
@@ -74,18 +72,18 @@ def ipn_on_same_subnet(fam, first, second, prefix):
second = struct.unpack('!I', second)[0]
return (first & mask == second & mask)
def ip_on_same_subnet(first, second, prefix):
async def ip_on_same_subnet(first, second, prefix):
if first.startswith('::ffff:') and '.' in first:
first = first.replace('::ffff:', '')
if second.startswith('::ffff:') and '.' in second:
second = second.replace('::ffff:', '')
addrinf = socket.getaddrinfo(first, None, 0, socket.SOCK_STREAM)[0]
addrinf = (await asyncio.get_running_loop().getaddrinfo(first, None, 0, socket.SOCK_STREAM))[0]
fam = addrinf[0]
if '%' in addrinf[-1][0]:
return False
ip = socket.inet_pton(fam, addrinf[-1][0])
ip = int(codecs.encode(bytes(ip), 'hex'), 16)
addrinf = socket.getaddrinfo(second, None, 0, socket.SOCK_STREAM)[0]
addrinf = (await asyncio.get_running_loop().getaddrinfo(second, None, 0, socket.SOCK_STREAM))[0]
if fam != addrinf[0]:
return False
txtaddr = addrinf[-1][0].split('%')[0]
@@ -101,10 +99,10 @@ def ip_on_same_subnet(first, second, prefix):
return ip & mask == oip & mask
def ipn_is_local(ipn):
async def ipn_is_local(ipn):
if len(ipn) > 5 and ipn.startswith(b'\xfe\x80'):
return True
for addr in get_my_addresses():
for addr in await get_my_addresses():
if len(addr[1]) != len(ipn):
continue
if ipn_on_same_subnet(addr[0], ipn, addr[1], addr[2]):
@@ -112,25 +110,25 @@ def ipn_is_local(ipn):
return False
def address_is_local(address):
async def address_is_local(address):
if psutil:
ifas = psutil.net_if_addrs()
for iface in ifas:
for addr in ifas[iface]:
if addr.family in (socket.AF_INET, socket.AF_INET6):
cidr = mask_to_cidr(addr.netmask)
if ip_on_same_subnet(addr.address, address, cidr):
if await ip_on_same_subnet(addr.address, address, cidr):
return True
else:
for iface in netifaces.interfaces():
for i4 in netifaces.ifaddresses(iface).get(2, []):
cidr = mask_to_cidr(i4['netmask'])
if ip_on_same_subnet(i4['addr'], address, cidr):
if await ip_on_same_subnet(i4['addr'], address, cidr):
return True
for i6 in netifaces.ifaddresses(iface).get(10, []):
cidr = int(i6['netmask'].split('/')[1])
laddr = i6['addr'].split('%')[0]
if ip_on_same_subnet(laddr, address, cidr):
if await ip_on_same_subnet(laddr, address, cidr):
return True
return False
@@ -146,7 +144,7 @@ def _rebuildidxmap():
pass
def myiptonets(svrip):
async def myiptonets(svrip):
fam = socket.AF_INET
if ':' in svrip:
fam = socket.AF_INET6
@@ -159,7 +157,7 @@ def myiptonets(svrip):
continue
addr = addr.address
addr = addr.split('%')[0]
if addresses_match(addr, svrip):
if await addresses_match(addr, svrip):
relevantnic = iface
break
else:
@@ -170,7 +168,7 @@ def myiptonets(svrip):
for addr in netifaces.ifaddresses(iface).get(fam, []):
addr = addr.get('addr', '')
addr = addr.split('%')[0]
if addresses_match(addr, svrip):
if await addresses_match(addr, svrip):
relevantnic = iface
break
else:
@@ -220,13 +218,12 @@ class NetManager(object):
self.consumednames4 = set([])
self.consumednames6 = set([])
@property
def allmyaddrs(self):
async def allmyaddrs(self):
if not self._allmyaddrs:
self._allmyaddrs = get_my_addresses()
self._allmyaddrs = await get_my_addresses()
return self._allmyaddrs
def process_attribs(self, netname, attribs):
async def process_attribs(self, netname, attribs):
self.myattribs[netname] = {}
ipv4addr = None
ipv6addr = None
@@ -255,7 +252,7 @@ class NetManager(object):
if ipv4addr:
try:
luaddr = ipv4addr.split('/', 1)[0]
for ai in socket.getaddrinfo(luaddr, 0, socket.AF_INET, socket.SOCK_STREAM):
for ai in await asyncio.get_running_loop().getaddrinfo(luaddr, 0, socket.AF_INET, socket.SOCK_STREAM):
ipv4addr.replace(luaddr, ai[-1][0])
except socket.gaierror:
pass
@@ -263,7 +260,7 @@ class NetManager(object):
currname = attribs.get('hostname', self.node).split()[0]
if currname and currname not in self.consumednames4:
try:
for ai in socket.getaddrinfo(currname, 0, socket.AF_INET, socket.SOCK_STREAM):
for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, socket.AF_INET, socket.SOCK_STREAM):
ipv4addr = ai[-1][0]
self.consumednames4.add(currname)
except socket.gaierror:
@@ -280,7 +277,7 @@ class NetManager(object):
ipv6addr = attribs.get('ipv6_address', None)
if ipv6addr:
try:
for ai in socket.getaddrinfo(ipv6addr, 0, socket.AF_INET6, socket.SOCK_STREAM):
for ai in await asyncio.get_running_loop().getaddrinfo(ipv6addr, 0, socket.AF_INET6, socket.SOCK_STREAM):
ipv6addr = ai[-1][0]
except socket.gaierror:
pass
@@ -288,7 +285,7 @@ class NetManager(object):
currname = attribs.get('hostname', self.node).split()[0]
if currname and currname not in self.consumednames6:
try:
for ai in socket.getaddrinfo(currname, 0, socket.AF_INET6, socket.SOCK_STREAM):
for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, socket.AF_INET6, socket.SOCK_STREAM):
ipv6addr = ai[-1][0]
self.consumednames6.add(currname)
except socket.gaierror:
@@ -327,7 +324,7 @@ class NetManager(object):
if '/' not in myattribs.get('ipv6_address', '/'):
ipn = socket.inet_pton(socket.AF_INET6, myattribs['ipv6_address'])
plen = 64
for addr in self.allmyaddrs:
for addr in await self.allmyaddrs():
if addr[0] != socket.AF_INET6:
continue
if ipn_on_same_subnet(addr[0], ipn, addr[1], addr[2]):
@@ -336,7 +333,7 @@ class NetManager(object):
if '/' not in myattribs.get('ipv4_address', '/'):
ipn = socket.inet_pton(socket.AF_INET, myattribs['ipv4_address'])
plen = 16
for addr in self.allmyaddrs:
for addr in await self.allmyaddrs():
if addr[0] != socket.AF_INET:
continue
if ipn_on_same_subnet(addr[0], ipn, addr[1], addr[2]):
@@ -346,8 +343,8 @@ class NetManager(object):
myattribs['current_nic'] = False
def get_flat_net_config(configmanager, node):
fnc = get_full_net_config(configmanager, node)
async def get_flat_net_config(configmanager, node):
fnc = await get_full_net_config(configmanager, node)
dft = fnc.get('default', {})
if dft:
ret = [dft]
@@ -364,7 +361,7 @@ def add_netmask(ncfg):
plen = ncfg['ipv4_address'].split('/', 1)[1]
ncfg['ipv4_netmask'] = cidr_to_mask(int(plen))
def get_full_net_config(configmanager, node, serverip=None):
async def get_full_net_config(configmanager, node, serverip=None):
cfd = configmanager.get_node_attributes(node, ['net.*'])
cfd = cfd.get(node, {})
bmc = configmanager.get_node_attributes(
@@ -374,11 +371,11 @@ def get_full_net_config(configmanager, node, serverip=None):
bmc6 = None
if bmc:
try:
bmc4 = socket.getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM)[0][-1][0]
bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM))[0][-1][0]
except Exception:
pass
try:
bmc6 = socket.getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM)[0][-1][0]
bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM))[0][-1][0]
except Exception:
pass
attribs = {}
@@ -400,16 +397,16 @@ def get_full_net_config(configmanager, node, serverip=None):
attribs[iface][attrib] = val
myaddrs = []
if serverip:
myaddrs = get_addresses_by_serverip(serverip)
myaddrs = await get_addresses_by_serverip(serverip)
nm = NetManager(myaddrs, node, configmanager)
defaultnic = {}
ppool = eventlet.greenpool.GreenPool(64)
ppool = tasks.TaskPool()
if None in attribs:
ppool.spawn(nm.process_attribs, None, attribs[None])
ppool.schedule(nm.process_attribs, None, attribs[None])
del attribs[None]
for netname in sorted(attribs):
ppool.spawn(nm.process_attribs, netname, attribs[netname])
ppool.waitall()
ppool.schedule(nm.process_attribs, netname, attribs[netname])
await ppool.waitall()
for iface in list(nm.myattribs):
if bmc4 and nm.myattribs[iface].get('ipv4_address', None) == bmc4:
del nm.myattribs[iface]
@@ -477,7 +474,7 @@ def noneify(cfgdata):
# that mac address
# the ip as reported by recvmsg to match the subnet of that net.* interface
# if switch and port available, that should match.
def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
async def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
serverip=None, relayipn=b'\x00\x00\x00\x00',
clientip=None, onlyfamily=None):
"""Fetch network configuration parameters for a nic
@@ -533,12 +530,12 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
if bmc:
try:
if onlyfamily in (0, socket.AF_INET):
bmc4 = socket.getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM)[0][-1][0]
bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM))[0][-1][0]
except Exception:
pass
try:
if onlyfamily in (0, socket.AF_INET6):
bmc6 = socket.getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM)[0][-1][0]
bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM))[0][-1][0]
except Exception:
pass
cfgbyname = {}
@@ -564,7 +561,7 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
myaddrs = []
if ifidx is not None:
dhcprequested = False
myaddrs = get_my_addresses(ifidx, family=onlyfamily)
myaddrs = await get_my_addresses(ifidx, family=onlyfamily)
v4broken = True
v6broken = True
for addr in myaddrs:
@@ -579,7 +576,7 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
isremote = False
if serverip is not None:
dhcprequested = False
myaddrs = get_addresses_by_serverip(serverip)
myaddrs = await get_addresses_by_serverip(serverip)
if serverfam == socket.AF_INET6 and ipn_on_same_subnet(serverfam, serveripn, llaipn, 64):
isremote = False
elif clientfam:
@@ -597,13 +594,13 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
ip6bynodename = None
try:
if onlyfamily in (socket.AF_INET, 0):
for addr in socket.getaddrinfo(node, 0, socket.AF_INET, socket.SOCK_DGRAM):
for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, socket.AF_INET, socket.SOCK_DGRAM):
ipbynodename = addr[-1][0]
except socket.gaierror:
pass
try:
if onlyfamily in (socket.AF_INET6, 0):
for addr in socket.getaddrinfo(node, 0, socket.AF_INET6, socket.SOCK_DGRAM):
for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, socket.AF_INET6, socket.SOCK_DGRAM):
ip6bynodename = addr[-1][0]
except socket.gaierror:
pass
@@ -650,7 +647,7 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
if bmc6 and candip == bmc6:
continue
try:
for inf in socket.getaddrinfo(candip, 0, fam, socket.SOCK_STREAM):
for inf in await asyncio.get_running_loop().getaddrinfo(candip, 0, fam, socket.SOCK_STREAM):
candipn = socket.inet_pton(fam, inf[-1][0])
if ((isremote and ipn_on_same_subnet(fam, clientipn, candipn, int(candprefix)))
or ipn_on_same_subnet(fam, bootsvrip, candipn, prefix)):
@@ -669,7 +666,7 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
except Exception as e:
cfgdata['error_msg'] = "Error trying to evaluate net.*ipv4_address attribute value '{0}' on {1}: {2}".format(candip, node, str(e))
elif candgw:
for inf in socket.getaddrinfo(candgw, 0, fam, socket.SOCK_STREAM):
for inf in await asyncio.get_running_loop().getaddrinfo(candgw, 0, fam, socket.SOCK_STREAM):
candgwn = socket.inet_pton(fam, inf[-1][0])
if ipn_on_same_subnet(fam, bootsvrip, candgwn, prefix):
candgws.append((fam, candgwn, prefix))
@@ -731,7 +728,7 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
cfgdata['ipv{}_gateway'.format(nver)] = socket.inet_ntop(fam, candgwn)
return noneify(cfgdata)
if ip is not None:
for prefixinfo in get_prefix_len_for_ip(ip):
for prefixinfo in await get_prefix_len_for_ip(ip):
fam, prefix = prefixinfo
ip = ip.split('/', 1)[0]
if fam == socket.AF_INET:
@@ -747,14 +744,14 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
if gw is None or not gw:
continue
gwn = socket.inet_pton(fam, gw)
ip = socket.getaddrinfo(ip, 0, proto=socket.IPPROTO_TCP, family=fam)[-1][-1][0]
ip = (await asyncio.get_running_loop().getaddrinfo(ip, 0, proto=socket.IPPROTO_TCP, family=fam))[-1][-1][0]
ipn = socket.inet_pton(fam, ip)
if ipn_on_same_subnet(fam, ipn, gwn, prefix):
cfgdata['ipv{}_gateway'.format(nver)] = gw
break
return noneify(cfgdata)
def get_addresses_by_serverip(serverip):
async def get_addresses_by_serverip(serverip):
if '.' in serverip:
fam = socket.AF_INET
elif ':' in serverip:
@@ -763,15 +760,15 @@ def get_addresses_by_serverip(serverip):
raise ValueError('"{0}" is not a valid ip argument'.format(serverip))
ipbytes = socket.inet_pton(fam, serverip)
if ipbytes[:8] == b'\xfe\x80\x00\x00\x00\x00\x00\x00':
myaddrs = get_my_addresses(matchlla=ipbytes)
myaddrs = await get_my_addresses(matchlla=ipbytes)
else:
myaddrs = [x for x in get_my_addresses() if x[1] == ipbytes]
myaddrs = [x for x in await get_my_addresses() if x[1] == ipbytes]
return myaddrs
nlhdrsz = struct.calcsize('IHHII')
ifaddrsz = struct.calcsize('BBBBI')
def get_my_addresses(idx=0, family=0, matchlla=None):
async def get_my_addresses(idx=0, family=0, matchlla=None):
# RTM_GETADDR = 22
# nlmsghdr struct: u32 len, u16 type, u16 flags, u32 seq, u32 pid
nlhdr = struct.pack('IHHII', nlhdrsz + ifaddrsz, 22, 0x301, 0, 0)
@@ -779,10 +776,10 @@ def get_my_addresses(idx=0, family=0, matchlla=None):
ifaddrmsg = struct.pack('BBBBI', family, 0, 0, 0, idx)
s = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE)
s.bind((0, 0))
s.sendall(nlhdr + ifaddrmsg)
await asyncio.get_event_loop().sock_sendall(s, nlhdr + ifaddrmsg)
addrs = []
while True:
pdata = s.recv(65536)
pdata = await asyncio.get_event_loop().sock_recv(s, 65536)
v = memoryview(pdata)
if struct.unpack('H', v[4:6])[0] == 3: # netlink done message
break
@@ -798,7 +795,7 @@ def get_my_addresses(idx=0, family=0, matchlla=None):
if rtalen < 4:
break
if rta[4:rtalen].tobytes() == matchlla:
return get_my_addresses(idx=ridx)
return await get_my_addresses(idx=ridx)
rta = rta[msg_align(rtalen):]
elif (ridx == idx or not idx) and scope == 0:
rta = v[nlhdrsz+ifaddrsz:length]
@@ -813,14 +810,14 @@ def get_my_addresses(idx=0, family=0, matchlla=None):
return addrs
def get_prefix_len_for_ip(ip):
async def get_prefix_len_for_ip(ip):
plen = None
if '/' in ip:
ip, plen = ip.split('/', 1)
plen = int(plen)
myaddrs = get_my_addresses()
myaddrs = await get_my_addresses()
found = False
for inf in socket.getaddrinfo(ip, 0, 0, socket.SOCK_DGRAM):
for inf in await asyncio.get_running_loop().getaddrinfo(ip, 0, 0, socket.SOCK_DGRAM):
if plen:
yield (inf[0], plen)
return
@@ -833,7 +830,7 @@ def get_prefix_len_for_ip(ip):
if not found:
raise exc.NotImplementedException("Non local addresses not supported")
def addresses_match(addr1, addr2):
async def addresses_match(addr1, addr2):
"""Check two network addresses for similarity
Is it zero padded in one place, not zero padded in another? Is one place by name and another by IP??
@@ -846,12 +843,12 @@ def addresses_match(addr1, addr2):
"""
if '%' in addr1 or '%' in addr2:
return False
for addrinfo in socket.getaddrinfo(addr1, 0, 0, socket.SOCK_STREAM):
for addrinfo in await asyncio.get_running_loop().getaddrinfo(addr1, 0, 0, socket.SOCK_STREAM):
rootaddr1 = socket.inet_pton(addrinfo[0], addrinfo[4][0])
if addrinfo[0] == socket.AF_INET6 and rootaddr1[:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff':
# normalize to standard IPv4
rootaddr1 = rootaddr1[-4:]
for otherinfo in socket.getaddrinfo(addr2, 0, 0, socket.SOCK_STREAM):
for otherinfo in await asyncio.get_running_loop().getaddrinfo(addr2, 0, 0, socket.SOCK_STREAM):
otheraddr = socket.inet_pton(otherinfo[0], otherinfo[4][0])
if otherinfo[0] == socket.AF_INET6 and otheraddr[:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff':
otheraddr = otheraddr[-4:]

View File

@@ -53,10 +53,10 @@ async def create_ident_image(node, configmanager):
# restricted by 'managercandidates'
ident['deploy_servers'] = []
ident['confluent_uuid'] = cfm.get_global('confluent_uuid')
for myaddr in netutil.get_my_addresses():
for myaddr in await netutil.get_my_addresses():
myaddr = socket.inet_ntop(myaddr[0], myaddr[1])
ident['deploy_servers'].append(myaddr)
ident['net_cfgs'] = netutil.get_flat_net_config(configmanager, node)
ident['net_cfgs'] = await netutil.get_flat_net_config(configmanager, node)
with open(os.path.join(tmpd, 'cnflnt.yml'), 'w') as yamlout:
yaml.safe_dump(ident, yamlout, default_flow_style=False)
with open(os.path.join(tmpd, 'cnflnt.jsn'), 'w') as jsonout:

View File

@@ -51,7 +51,7 @@ def listdump(input):
return retval
def get_extra_names(nodename, cfg, myip=None, preferadjacent=False, addlocalhost=True):
async def get_extra_names(nodename, cfg, myip=None, preferadjacent=False, addlocalhost=True):
if addlocalhost:
names = set(['127.0.0.1', '::1', 'localhost', 'localhost.localdomain'])
else:
@@ -71,8 +71,8 @@ def get_extra_names(nodename, cfg, myip=None, preferadjacent=False, addlocalhost
if domain and domain not in currname:
names.add('{0}.{1}'.format(currname, domain))
if myip:
ncfgs = [netutil.get_nic_config(cfg, nodename, serverip=myip)]
fncfg = netutil.get_full_net_config(cfg, nodename, serverip=myip)
ncfgs = [await netutil.get_nic_config(cfg, nodename, serverip=myip)]
fncfg = await netutil.get_full_net_config(cfg, nodename, serverip=myip)
ncfgs.append(fncfg.get('default', {}))
for ent in fncfg.get('extranets', []):
ncfgs.append(fncfg['extranets'][ent])
@@ -82,7 +82,7 @@ def get_extra_names(nodename, cfg, myip=None, preferadjacent=False, addlocalhost
for nip in (ncfg.get('ipv4_address', None), ncfg.get('ipv6_address', None)):
if nip:
nip = nip.split('/', 1)[0]
if not preferadjacent or netutil.address_is_local(nip):
if not preferadjacent or await netutil.address_is_local(nip):
names.add(nip)
addall = False
else:
@@ -266,7 +266,7 @@ async def handle_request(req, make_response, mimetype):
bmcaddr = await asyncio.get_event_loop().getaddrinfo(bmcaddr, 0)[0]
bmcaddr = bmcaddr[-1][0]
if '.' in bmcaddr: # ipv4 is allowed
netconfig = netutil.get_nic_config(cfg, nodename, ip=bmcaddr)
netconfig = await netutil.get_nic_config(cfg, nodename, ip=bmcaddr)
res['bmcipv4'] = bmcaddr
res['prefixv4'] = netconfig['prefix']
res['bmcgw'] = netconfig.get('ipv4_gateway', None)
@@ -285,7 +285,7 @@ async def handle_request(req, make_response, mimetype):
mrsp = await make_response(mimetype, 200, 'OK')
await mrsp.write(dumper(rsp))
elif reqpath == '/self/netcfg':
ncfg = netutil.get_full_net_config(cfg, nodename, myip)
ncfg = await netutil.get_full_net_config(cfg, nodename, myip)
mrsp = await make_response(mimetype, 200, 'OK')
await mrsp.write(dumper(ncfg))
elif reqpath in ('/self/deploycfg', '/self/deploycfg2'):
@@ -296,9 +296,9 @@ async def handle_request(req, make_response, mimetype):
except ValueError:
with open('/sys/class/net/{}/ifindex'.format(nicname), 'r') as nici:
ifidx = int(nici.read())
ncfg = netutil.get_nic_config(cfg, nodename, ifidx=ifidx)
ncfg = await netutil.get_nic_config(cfg, nodename, ifidx=ifidx)
else:
ncfg = netutil.get_nic_config(cfg, nodename, serverip=myip, clientip=clientip)
ncfg = await netutil.get_nic_config(cfg, nodename, serverip=myip, clientip=clientip)
if reqpath == '/self/deploycfg':
for key in list(ncfg):
if 'v6' in key:
@@ -429,12 +429,12 @@ async def handle_request(req, make_response, mimetype):
mrsp = await make_response(mimetype, 500, 'Unconfigured')
await msrp.write(b'CA is not configured on this system (run ...)')
return mrsp
pals = get_extra_names(nodename, cfg, myip)
pals = await get_extra_names(nodename, cfg, myip)
cert = sshutil.sign_host_key(reqbody, nodename, pals)
mrsp = await make_response('text/plain', 200, 'OK')
await mrsp.write(cert)
elif reqpath == '/self/nodelist':
nodes, _ = get_cluster_list(nodename, cfg)
nodes, _ = await get_cluster_list(nodename, cfg)
if isgeneric:
mrsp = await make_response('text/plain', 200, 'OK')
for node in util.natural_sort(nodes):
@@ -538,7 +538,7 @@ async def handle_request(req, make_response, mimetype):
return
elif reqpath.startswith('/self/remotesyncfiles'):
if 'POST' == operation:
pals = get_extra_names(nodename, cfg, myip, preferadjacent=True, addlocalhost=False)
pals = await get_extra_names(nodename, cfg, myip, preferadjacent=True, addlocalhost=False)
if clientip in pals:
pals = [clientip]
result = syncfiles.start_syncfiles(
@@ -622,7 +622,7 @@ def get_scriptlist(scriptcat, cfg, nodename, pathtemplate):
return slist, profile
def get_cluster_list(nodename=None, cfg=None):
async def get_cluster_list(nodename=None, cfg=None):
if cfg is None:
cfg = configmanager.ConfigManager(None)
nodes = None
@@ -642,7 +642,7 @@ def get_cluster_list(nodename=None, cfg=None):
domaininfo = cfg.get_node_attributes(node, 'dns.domain')
domain = domaininfo.get(node, {}).get('dns.domain', {}).get(
'value', None)
for extraname in get_extra_names(node, cfg):
for extraname in await get_extra_names(node, cfg):
nodes.add(extraname)
if autonodes:
for mgr in configmanager.list_collective():

View File

@@ -74,6 +74,13 @@ class TaskPool:
self._tasks.add(currtask)
currtask.add_done_callback(self._done_callback)
return tholder
async def waitall(self):
while self._tasks or self._pending:
if self._tasks:
done, _ = await asyncio.wait(self._tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
self._tasks.discard(task)
tasksitter = None
logtrace = None