diff --git a/confluent_server/bin/confluent_selfcheck b/confluent_server/bin/confluent_selfcheck index a4315fee..fbfa68ac 100755 --- a/confluent_server/bin/confluent_selfcheck +++ b/confluent_server/bin/confluent_selfcheck @@ -366,7 +366,7 @@ async def main(): for nic in glob.glob("/sys/class/net/*/ifindex"): idx = int(open(nic, "r").read()) nicname = nic.split('/')[-2] - ncfg = netutil.get_nic_config(cfg, args.node, ifidx=idx) + ncfg = await netutil.get_nic_config(cfg, args.node, ifidx=idx) if ncfg['ipv4_address']: targsships.append(ncfg['ipv4_address']) if ncfg['ipv4_address'] or ncfg['ipv4_method'] == 'dhcp': diff --git a/confluent_server/bin/osdeploy b/confluent_server/bin/osdeploy index 488dee27..c73a8089 100644 --- a/confluent_server/bin/osdeploy +++ b/confluent_server/bin/osdeploy @@ -173,7 +173,7 @@ async def local_node_trust_setup(): myprincipals.add(myshortname) if domain: myprincipals.add('{0}.{1}'.format(myshortname, domain)) - for addr in netutil.get_my_addresses(): + for addr in await netutil.get_my_addresses(): addr = socket.inet_ntop(addr[0], addr[1]) myprincipals.add(addr) for pubkey in glob.glob('/etc/ssh/ssh_host_*_key.pub'): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 87d905ff..d980e09a 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -1097,12 +1097,12 @@ def get_collective_member(name): return _cfgstore.get('collective', {}).get(name, None) -def get_collective_member_by_address(address): +async def get_collective_member_by_address(address): if _cfgstore is None: init() for name in _cfgstore.get('collective', {}): currdrone = _cfgstore['collective'][name] - if netutil.addresses_match(address, currdrone['address']): + if await netutil.addresses_match(address, currdrone['address']): return currdrone diff --git a/confluent_server/confluent/credserver.py b/confluent_server/confluent/credserver.py index 3f8c2b0a..ea137b52 100644 --- a/confluent_server/confluent/credserver.py +++ b/confluent_server/confluent/credserver.py @@ -39,10 +39,10 @@ libc = ctypes.CDLL(ctypes.util.find_library('c')) # 128, len, len, key - sealed key -def address_is_somewhat_trusted(address, nodename, cfm): - if netutil.ip_on_same_subnet(address.split('%')[0], 'fe80::', 64): +async def address_is_somewhat_trusted(address, nodename, cfm): + if await netutil.ip_on_same_subnet(address.split('%')[0], 'fe80::', 64): return True - if netutil.address_is_local(address): + if await netutil.address_is_local(address): return True authnets = cfm.get_node_attributes(nodename, 'trusted.subnets') authnets = authnets.get(nodename, {}).get('trusted.subnets', {}).get('value', None) @@ -51,7 +51,7 @@ def address_is_somewhat_trusted(address, nodename, cfm): for anet in authnet.split(): na, plen = anet.split('/') plen = int(plen) - if netutil.ip_on_same_subnet(address, na, plen): + if await netutil.ip_on_same_subnet(address, na, plen): return True return False @@ -85,7 +85,7 @@ class CredServer(object): apiarmed = apimats.get(nodename, {}).get('deployment.apiarmed', {}).get( 'value', None) if not hmackey: - if not address_is_somewhat_trusted(peer[0], nodename, self.cfm): + if not await address_is_somewhat_trusted(peer[0], nodename, self.cfm): client.close() return if not apiarmed: diff --git a/confluent_server/confluent/discovery/handlers/bmc.py b/confluent_server/confluent/discovery/handlers/bmc.py index c3a2b6bd..eedf9ab5 100644 --- a/confluent_server/confluent/discovery/handlers/bmc.py +++ b/confluent_server/confluent/discovery/handlers/bmc.py @@ -162,7 +162,7 @@ class NodeHandler(generic.NodeHandler): newip = newipinfo[-1][0] if ':' in newip: raise exc.NotImplementedException('IPv6 remote config TODO') - netconfig = netutil.get_nic_config(cfg, nodename, ip=newip) + netconfig = await netutil.get_nic_config(cfg, nodename, ip=newip) plen = netconfig['prefix'] newip = '{0}/{1}'.format(newip, plen) currcfg = ic.get_net_configuration() diff --git a/confluent_server/confluent/discovery/handlers/redfishbmc.py b/confluent_server/confluent/discovery/handlers/redfishbmc.py index 6d494a15..64a20bad 100644 --- a/confluent_server/confluent/discovery/handlers/redfishbmc.py +++ b/confluent_server/confluent/discovery/handlers/redfishbmc.py @@ -300,8 +300,8 @@ class NodeHandler(generic.NodeHandler): else: raise Exception("Unable to detect active NIC of multi-nic bmc") actualnics = [actualnic] - currnet = wc.grab_json_response(actualnics[0]) - netconfig = netutil.get_nic_config(self.configmanager, nodename, ip=newip) + currnet = await wc.grab_json_response(actualnics[0]) + netconfig = await netutil.get_nic_config(self.configmanager, nodename, ip=newip) newconfig = { "Address": newip, "SubnetMask": netutil.cidr_to_mask(netconfig['prefix']), diff --git a/confluent_server/confluent/discovery/handlers/smm.py b/confluent_server/confluent/discovery/handlers/smm.py index c08207b9..b586cb8c 100644 --- a/confluent_server/confluent/discovery/handlers/smm.py +++ b/confluent_server/confluent/discovery/handlers/smm.py @@ -117,7 +117,7 @@ class NodeHandler(bmchandler.NodeHandler): 'ifConfig').find('v4IPAddr').text if currip == smmip: return - netconfig = netutil.get_nic_config(cfg, nodename, ip=smmip) + netconfig = await netutil.get_nic_config(cfg, nodename, ip=smmip) netmask = netutil.cidr_to_mask(netconfig['prefix']) setdata = 'set=ifIndex:0,v4DHCPEnabled:0,v4IPAddr:{0},v4NetMask:{1}'.format(smmip, netmask) gateway = netconfig.get('ipv4_gateway', None) diff --git a/confluent_server/confluent/discovery/handlers/tsm.py b/confluent_server/confluent/discovery/handlers/tsm.py index c46a8d05..dbfb30ca 100644 --- a/confluent_server/confluent/discovery/handlers/tsm.py +++ b/confluent_server/confluent/discovery/handlers/tsm.py @@ -212,7 +212,7 @@ class NodeHandler(generic.NodeHandler): if net['channel_number'] == self.channel: # we have found the interface to potentially manipulate if net['ipv4_address'] != newip: - netconfig = netutil.get_nic_config(self.configmanager, nodename, ip=newip) + netconfig = await netutil.get_nic_config(self.configmanager, nodename, ip=newip) newmask = netutil.cidr_to_mask(netconfig['prefix']) net['ipv4_address'] = newip net['ipv4_subnet'] = newmask diff --git a/confluent_server/confluent/discovery/handlers/xcc.py b/confluent_server/confluent/discovery/handlers/xcc.py index e3b88183..86685eea 100644 --- a/confluent_server/confluent/discovery/handlers/xcc.py +++ b/confluent_server/confluent/discovery/handlers/xcc.py @@ -619,11 +619,11 @@ class NodeHandler(immhandler.NodeHandler): newip = newipinfo[-1][0] if ':' in newip: raise exc.NotImplementedException('IPv6 remote config TODO') - netconfig = netutil.get_nic_config(self.configmanager, nodename, ip=targbmc) + netconfig = await netutil.get_nic_config(self.configmanager, nodename, ip=targbmc) newmask = netutil.cidr_to_mask(netconfig['prefix']) currinfo = await wc.grab_json_response('/api/providers/logoninfo') currip = currinfo.get('items', [{}])[0].get('ipv4_address', '') - curreth1 = wc.grab_json_response('/api/dataset/imm_ethernet') + curreth1 = await wc.grab_json_response('/api/dataset/imm_ethernet') if curreth1: if self.ipaddr.startswith('fe80::'): ipkey = 'ipv6_link_local_address' @@ -634,7 +634,7 @@ class NodeHandler(immhandler.NodeHandler): nic1ip = curreth1.get('items', [{}])[0].get(ipkey, None) if nic1ip != self.ipaddr: # check second nic instead - curreth2 = wc.grab_json_response('/api/dataset/imm_ethernet_2') + curreth2 = await wc.grab_json_response('/api/dataset/imm_ethernet_2') if curreth2 and curreth2.get('items', [{}])[0].get('if_second_port_exist', 0): nic2ip = curreth2.get('items', [{}])[0].get(ipkey + '_2', None) if nic2ip != self.ipaddr: @@ -650,7 +650,7 @@ class NodeHandler(immhandler.NodeHandler): } if netconfig['ipv4_gateway']: statargs['ENET_IPv4GatewayIPAddr'] = netconfig['ipv4_gateway'] - elif not netutil.address_is_local(newip): + elif not await netutil.address_is_local(newip): raise exc.InvalidArgumentException('Will not remotely configure a device with no gateway') if attribsuffix: for currkey in list(statargs): diff --git a/confluent_server/confluent/discovery/protocols/pxe.py b/confluent_server/confluent/discovery/protocols/pxe.py index ee5b43eb..b06c4ce4 100644 --- a/confluent_server/confluent/discovery/protocols/pxe.py +++ b/confluent_server/confluent/discovery/protocols/pxe.py @@ -569,7 +569,7 @@ def get_deployment_profile(node, cfg, cfd=None): staticassigns = {} myipbypeer = {} -def check_reply(node, info, packet, sock, cfg, reqview, addr, requestor): +async def check_reply(node, info, packet, sock, cfg, reqview, addr, requestor): if not requestor: requestor = ('0.0.0.0', None) if requestor[0] == '0.0.0.0' and not info.get('uuid', None): @@ -590,16 +590,16 @@ def check_reply(node, info, packet, sock, cfg, reqview, addr, requestor): if packet['vci'] and packet['vci'].startswith('PXEClient'): log.log({'info': 'IPv6 PXE boot attempt by {0}, but IPv6 PXE is not supported, try IPv6 HTTP boot or IPv4 boot'.format(node)}) return - return reply_dhcp6(node, addr, cfg, packet, cfd, profile, sock) + return await reply_dhcp6(node, addr, cfg, packet, cfd, profile, sock) else: - return reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile, sock, requestor) + return await reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile, sock, requestor) -def reply_dhcp6(node, addr, cfg, packet, cfd, profile, sock): - myaddrs = netutil.get_my_addresses(addr[-1], socket.AF_INET6) +async def reply_dhcp6(node, addr, cfg, packet, cfd, profile, sock): + myaddrs = await netutil.get_my_addresses(addr[-1], socket.AF_INET6) if not myaddrs: log.log({'info': 'Unable to provide IPv6 boot services to {0}, no viable IPv6 configuration on interface index "{1}" to respond through.'.format(node, addr[-1])}) return - niccfg = netutil.get_nic_config(cfg, node, ifidx=addr[-1], onlyfamily=socket.AF_INET6) + niccfg = await netutil.get_nic_config(cfg, node, ifidx=addr[-1], onlyfamily=socket.AF_INET6) ipv6addr = niccfg.get('ipv6_address', None) ipv6prefix = niccfg.get('ipv6_prefix', None) ipv6method = niccfg.get('ipv6_method', 'static') @@ -679,7 +679,7 @@ def get_my_duid(): _recent_txids = {} -def reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile, sock=None, requestor=None): +async def reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile, sock=None, requestor=None): replen = 275 # default is going to be 286 # while myipn is describing presumed destination, it's really # vague in the face of aliases, need to convert to ifidx and evaluate @@ -724,7 +724,7 @@ def reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile, sock=N relayipa = socket.inet_ntoa(relayip) gateway = None netmask = None - niccfg = netutil.get_nic_config(cfg, node, ifidx=info['netinfo']['ifidx'], relayipn=relayip, onlyfamily=socket.AF_INET) + niccfg = await netutil.get_nic_config(cfg, node, ifidx=info['netinfo']['ifidx'], relayipn=relayip, onlyfamily=socket.AF_INET) nicerr = niccfg.get('error_msg', False) if nicerr: log.log({'error': nicerr}) diff --git a/confluent_server/confluent/discovery/protocols/ssdp.py b/confluent_server/confluent/discovery/protocols/ssdp.py index aa31ba6b..fc89aaaa 100644 --- a/confluent_server/confluent/discovery/protocols/ssdp.py +++ b/confluent_server/confluent/discovery/protocols/ssdp.py @@ -293,7 +293,7 @@ async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None): msecs = int(currtime * 1000 % 1000) reply = 'HTTP/1.1 200 OK\r\nNODENAME: {0}\r\nCURRTIME: {1}\r\nCURRMSECS: {2}\r\n'.format(node, seconds, msecs) theip = peer[0].split('%', 1)[0] - if netutil.ip_on_same_subnet(theip, 'fe80::', 64): + if await netutil.ip_on_same_subnet(theip, 'fe80::', 64): if '%' in peer[0]: ifidx = peer[0].split('%', 1)[1] iface = await cloop.getaddrinfo(peer[0], 0, socket.AF_INET6, socket.SOCK_DGRAM)[0][-1][-1] @@ -301,11 +301,11 @@ async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None): ifidx = '{}'.format(peer[-1]) iface = peer[-1] reply += 'MGTIFACE: {0}\r\n'.format(ifidx) - ncfg = netutil.get_nic_config( + ncfg = await netutil.get_nic_config( cfg, node, ifidx=iface) if ncfg.get('matchesnodename', None): reply += 'DEFAULTNET: 1\r\n' - elif not netutil.address_is_local(peer[0]): + elif not await netutil.address_is_local(peer[0]): continue if not isinstance(reply, bytes): reply = reply.encode('utf8') diff --git a/confluent_server/confluent/neighutil.py b/confluent_server/confluent/neighutil.py index 9ab17189..1f21d077 100644 --- a/confluent_server/confluent/neighutil.py +++ b/confluent_server/confluent/neighutil.py @@ -101,7 +101,7 @@ async def get_hwaddr(ipaddr): await _update_neigh() updated = True hwaddr = neightable.get(ipaddr, None) - if not hwaddr and not netutil.ipn_is_local(ipaddr): + if not hwaddr and not await netutil.ipn_is_local(ipaddr): hwaddr = False if hwaddr == None and not updated: await _update_neigh() diff --git a/confluent_server/confluent/netutil.py b/confluent_server/confluent/netutil.py index e3fc5b3f..41fb3354 100644 --- a/confluent_server/confluent/netutil.py +++ b/confluent_server/confluent/netutil.py @@ -16,6 +16,7 @@ # this will implement noderange grammar +import asyncio import confluent.exceptions as exc import codecs try: @@ -24,13 +25,10 @@ except ImportError: psutil = None import netifaces import struct -import eventlet.green.socket as socket -import eventlet.support.greendns import os -getaddrinfo = eventlet.support.greendns.getaddrinfo +import socket +import confluent.tasks as tasks -eventlet.support.greendns.resolver.clear() -eventlet.support.greendns.resolver._resolver.lifetime = 1 def msg_align(len): return (len + 3) & ~3 @@ -74,18 +72,18 @@ def ipn_on_same_subnet(fam, first, second, prefix): second = struct.unpack('!I', second)[0] return (first & mask == second & mask) -def ip_on_same_subnet(first, second, prefix): +async def ip_on_same_subnet(first, second, prefix): if first.startswith('::ffff:') and '.' in first: first = first.replace('::ffff:', '') if second.startswith('::ffff:') and '.' in second: second = second.replace('::ffff:', '') - addrinf = socket.getaddrinfo(first, None, 0, socket.SOCK_STREAM)[0] + addrinf = (await asyncio.get_running_loop().getaddrinfo(first, None, 0, socket.SOCK_STREAM))[0] fam = addrinf[0] if '%' in addrinf[-1][0]: return False ip = socket.inet_pton(fam, addrinf[-1][0]) ip = int(codecs.encode(bytes(ip), 'hex'), 16) - addrinf = socket.getaddrinfo(second, None, 0, socket.SOCK_STREAM)[0] + addrinf = (await asyncio.get_running_loop().getaddrinfo(second, None, 0, socket.SOCK_STREAM))[0] if fam != addrinf[0]: return False txtaddr = addrinf[-1][0].split('%')[0] @@ -101,10 +99,10 @@ def ip_on_same_subnet(first, second, prefix): return ip & mask == oip & mask -def ipn_is_local(ipn): +async def ipn_is_local(ipn): if len(ipn) > 5 and ipn.startswith(b'\xfe\x80'): return True - for addr in get_my_addresses(): + for addr in await get_my_addresses(): if len(addr[1]) != len(ipn): continue if ipn_on_same_subnet(addr[0], ipn, addr[1], addr[2]): @@ -112,25 +110,25 @@ def ipn_is_local(ipn): return False -def address_is_local(address): +async def address_is_local(address): if psutil: ifas = psutil.net_if_addrs() for iface in ifas: for addr in ifas[iface]: if addr.family in (socket.AF_INET, socket.AF_INET6): cidr = mask_to_cidr(addr.netmask) - if ip_on_same_subnet(addr.address, address, cidr): + if await ip_on_same_subnet(addr.address, address, cidr): return True else: for iface in netifaces.interfaces(): for i4 in netifaces.ifaddresses(iface).get(2, []): cidr = mask_to_cidr(i4['netmask']) - if ip_on_same_subnet(i4['addr'], address, cidr): + if await ip_on_same_subnet(i4['addr'], address, cidr): return True for i6 in netifaces.ifaddresses(iface).get(10, []): cidr = int(i6['netmask'].split('/')[1]) laddr = i6['addr'].split('%')[0] - if ip_on_same_subnet(laddr, address, cidr): + if await ip_on_same_subnet(laddr, address, cidr): return True return False @@ -146,7 +144,7 @@ def _rebuildidxmap(): pass -def myiptonets(svrip): +async def myiptonets(svrip): fam = socket.AF_INET if ':' in svrip: fam = socket.AF_INET6 @@ -159,7 +157,7 @@ def myiptonets(svrip): continue addr = addr.address addr = addr.split('%')[0] - if addresses_match(addr, svrip): + if await addresses_match(addr, svrip): relevantnic = iface break else: @@ -170,7 +168,7 @@ def myiptonets(svrip): for addr in netifaces.ifaddresses(iface).get(fam, []): addr = addr.get('addr', '') addr = addr.split('%')[0] - if addresses_match(addr, svrip): + if await addresses_match(addr, svrip): relevantnic = iface break else: @@ -220,13 +218,12 @@ class NetManager(object): self.consumednames4 = set([]) self.consumednames6 = set([]) - @property - def allmyaddrs(self): + async def allmyaddrs(self): if not self._allmyaddrs: - self._allmyaddrs = get_my_addresses() + self._allmyaddrs = await get_my_addresses() return self._allmyaddrs - def process_attribs(self, netname, attribs): + async def process_attribs(self, netname, attribs): self.myattribs[netname] = {} ipv4addr = None ipv6addr = None @@ -255,7 +252,7 @@ class NetManager(object): if ipv4addr: try: luaddr = ipv4addr.split('/', 1)[0] - for ai in socket.getaddrinfo(luaddr, 0, socket.AF_INET, socket.SOCK_STREAM): + for ai in await asyncio.get_running_loop().getaddrinfo(luaddr, 0, socket.AF_INET, socket.SOCK_STREAM): ipv4addr.replace(luaddr, ai[-1][0]) except socket.gaierror: pass @@ -263,7 +260,7 @@ class NetManager(object): currname = attribs.get('hostname', self.node).split()[0] if currname and currname not in self.consumednames4: try: - for ai in socket.getaddrinfo(currname, 0, socket.AF_INET, socket.SOCK_STREAM): + for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, socket.AF_INET, socket.SOCK_STREAM): ipv4addr = ai[-1][0] self.consumednames4.add(currname) except socket.gaierror: @@ -280,7 +277,7 @@ class NetManager(object): ipv6addr = attribs.get('ipv6_address', None) if ipv6addr: try: - for ai in socket.getaddrinfo(ipv6addr, 0, socket.AF_INET6, socket.SOCK_STREAM): + for ai in await asyncio.get_running_loop().getaddrinfo(ipv6addr, 0, socket.AF_INET6, socket.SOCK_STREAM): ipv6addr = ai[-1][0] except socket.gaierror: pass @@ -288,7 +285,7 @@ class NetManager(object): currname = attribs.get('hostname', self.node).split()[0] if currname and currname not in self.consumednames6: try: - for ai in socket.getaddrinfo(currname, 0, socket.AF_INET6, socket.SOCK_STREAM): + for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, socket.AF_INET6, socket.SOCK_STREAM): ipv6addr = ai[-1][0] self.consumednames6.add(currname) except socket.gaierror: @@ -327,7 +324,7 @@ class NetManager(object): if '/' not in myattribs.get('ipv6_address', '/'): ipn = socket.inet_pton(socket.AF_INET6, myattribs['ipv6_address']) plen = 64 - for addr in self.allmyaddrs: + for addr in await self.allmyaddrs(): if addr[0] != socket.AF_INET6: continue if ipn_on_same_subnet(addr[0], ipn, addr[1], addr[2]): @@ -336,7 +333,7 @@ class NetManager(object): if '/' not in myattribs.get('ipv4_address', '/'): ipn = socket.inet_pton(socket.AF_INET, myattribs['ipv4_address']) plen = 16 - for addr in self.allmyaddrs: + for addr in await self.allmyaddrs(): if addr[0] != socket.AF_INET: continue if ipn_on_same_subnet(addr[0], ipn, addr[1], addr[2]): @@ -346,8 +343,8 @@ class NetManager(object): myattribs['current_nic'] = False -def get_flat_net_config(configmanager, node): - fnc = get_full_net_config(configmanager, node) +async def get_flat_net_config(configmanager, node): + fnc = await get_full_net_config(configmanager, node) dft = fnc.get('default', {}) if dft: ret = [dft] @@ -364,7 +361,7 @@ def add_netmask(ncfg): plen = ncfg['ipv4_address'].split('/', 1)[1] ncfg['ipv4_netmask'] = cidr_to_mask(int(plen)) -def get_full_net_config(configmanager, node, serverip=None): +async def get_full_net_config(configmanager, node, serverip=None): cfd = configmanager.get_node_attributes(node, ['net.*']) cfd = cfd.get(node, {}) bmc = configmanager.get_node_attributes( @@ -374,11 +371,11 @@ def get_full_net_config(configmanager, node, serverip=None): bmc6 = None if bmc: try: - bmc4 = socket.getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM)[0][-1][0] + bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM))[0][-1][0] except Exception: pass try: - bmc6 = socket.getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM)[0][-1][0] + bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM))[0][-1][0] except Exception: pass attribs = {} @@ -400,16 +397,16 @@ def get_full_net_config(configmanager, node, serverip=None): attribs[iface][attrib] = val myaddrs = [] if serverip: - myaddrs = get_addresses_by_serverip(serverip) + myaddrs = await get_addresses_by_serverip(serverip) nm = NetManager(myaddrs, node, configmanager) defaultnic = {} - ppool = eventlet.greenpool.GreenPool(64) + ppool = tasks.TaskPool() if None in attribs: - ppool.spawn(nm.process_attribs, None, attribs[None]) + ppool.schedule(nm.process_attribs, None, attribs[None]) del attribs[None] for netname in sorted(attribs): - ppool.spawn(nm.process_attribs, netname, attribs[netname]) - ppool.waitall() + ppool.schedule(nm.process_attribs, netname, attribs[netname]) + await ppool.waitall() for iface in list(nm.myattribs): if bmc4 and nm.myattribs[iface].get('ipv4_address', None) == bmc4: del nm.myattribs[iface] @@ -477,7 +474,7 @@ def noneify(cfgdata): # that mac address # the ip as reported by recvmsg to match the subnet of that net.* interface # if switch and port available, that should match. -def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, +async def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, serverip=None, relayipn=b'\x00\x00\x00\x00', clientip=None, onlyfamily=None): """Fetch network configuration parameters for a nic @@ -533,12 +530,12 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, if bmc: try: if onlyfamily in (0, socket.AF_INET): - bmc4 = socket.getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM)[0][-1][0] + bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM))[0][-1][0] except Exception: pass try: if onlyfamily in (0, socket.AF_INET6): - bmc6 = socket.getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM)[0][-1][0] + bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM))[0][-1][0] except Exception: pass cfgbyname = {} @@ -564,7 +561,7 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, myaddrs = [] if ifidx is not None: dhcprequested = False - myaddrs = get_my_addresses(ifidx, family=onlyfamily) + myaddrs = await get_my_addresses(ifidx, family=onlyfamily) v4broken = True v6broken = True for addr in myaddrs: @@ -579,7 +576,7 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, isremote = False if serverip is not None: dhcprequested = False - myaddrs = get_addresses_by_serverip(serverip) + myaddrs = await get_addresses_by_serverip(serverip) if serverfam == socket.AF_INET6 and ipn_on_same_subnet(serverfam, serveripn, llaipn, 64): isremote = False elif clientfam: @@ -597,13 +594,13 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, ip6bynodename = None try: if onlyfamily in (socket.AF_INET, 0): - for addr in socket.getaddrinfo(node, 0, socket.AF_INET, socket.SOCK_DGRAM): + for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, socket.AF_INET, socket.SOCK_DGRAM): ipbynodename = addr[-1][0] except socket.gaierror: pass try: if onlyfamily in (socket.AF_INET6, 0): - for addr in socket.getaddrinfo(node, 0, socket.AF_INET6, socket.SOCK_DGRAM): + for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, socket.AF_INET6, socket.SOCK_DGRAM): ip6bynodename = addr[-1][0] except socket.gaierror: pass @@ -650,7 +647,7 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, if bmc6 and candip == bmc6: continue try: - for inf in socket.getaddrinfo(candip, 0, fam, socket.SOCK_STREAM): + for inf in await asyncio.get_running_loop().getaddrinfo(candip, 0, fam, socket.SOCK_STREAM): candipn = socket.inet_pton(fam, inf[-1][0]) if ((isremote and ipn_on_same_subnet(fam, clientipn, candipn, int(candprefix))) or ipn_on_same_subnet(fam, bootsvrip, candipn, prefix)): @@ -669,7 +666,7 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, except Exception as e: cfgdata['error_msg'] = "Error trying to evaluate net.*ipv4_address attribute value '{0}' on {1}: {2}".format(candip, node, str(e)) elif candgw: - for inf in socket.getaddrinfo(candgw, 0, fam, socket.SOCK_STREAM): + for inf in await asyncio.get_running_loop().getaddrinfo(candgw, 0, fam, socket.SOCK_STREAM): candgwn = socket.inet_pton(fam, inf[-1][0]) if ipn_on_same_subnet(fam, bootsvrip, candgwn, prefix): candgws.append((fam, candgwn, prefix)) @@ -731,7 +728,7 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, cfgdata['ipv{}_gateway'.format(nver)] = socket.inet_ntop(fam, candgwn) return noneify(cfgdata) if ip is not None: - for prefixinfo in get_prefix_len_for_ip(ip): + for prefixinfo in await get_prefix_len_for_ip(ip): fam, prefix = prefixinfo ip = ip.split('/', 1)[0] if fam == socket.AF_INET: @@ -747,14 +744,14 @@ def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, if gw is None or not gw: continue gwn = socket.inet_pton(fam, gw) - ip = socket.getaddrinfo(ip, 0, proto=socket.IPPROTO_TCP, family=fam)[-1][-1][0] + ip = (await asyncio.get_running_loop().getaddrinfo(ip, 0, proto=socket.IPPROTO_TCP, family=fam))[-1][-1][0] ipn = socket.inet_pton(fam, ip) if ipn_on_same_subnet(fam, ipn, gwn, prefix): cfgdata['ipv{}_gateway'.format(nver)] = gw break return noneify(cfgdata) -def get_addresses_by_serverip(serverip): +async def get_addresses_by_serverip(serverip): if '.' in serverip: fam = socket.AF_INET elif ':' in serverip: @@ -763,15 +760,15 @@ def get_addresses_by_serverip(serverip): raise ValueError('"{0}" is not a valid ip argument'.format(serverip)) ipbytes = socket.inet_pton(fam, serverip) if ipbytes[:8] == b'\xfe\x80\x00\x00\x00\x00\x00\x00': - myaddrs = get_my_addresses(matchlla=ipbytes) + myaddrs = await get_my_addresses(matchlla=ipbytes) else: - myaddrs = [x for x in get_my_addresses() if x[1] == ipbytes] + myaddrs = [x for x in await get_my_addresses() if x[1] == ipbytes] return myaddrs nlhdrsz = struct.calcsize('IHHII') ifaddrsz = struct.calcsize('BBBBI') -def get_my_addresses(idx=0, family=0, matchlla=None): +async def get_my_addresses(idx=0, family=0, matchlla=None): # RTM_GETADDR = 22 # nlmsghdr struct: u32 len, u16 type, u16 flags, u32 seq, u32 pid nlhdr = struct.pack('IHHII', nlhdrsz + ifaddrsz, 22, 0x301, 0, 0) @@ -779,10 +776,10 @@ def get_my_addresses(idx=0, family=0, matchlla=None): ifaddrmsg = struct.pack('BBBBI', family, 0, 0, 0, idx) s = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE) s.bind((0, 0)) - s.sendall(nlhdr + ifaddrmsg) + await asyncio.get_event_loop().sock_sendall(s, nlhdr + ifaddrmsg) addrs = [] while True: - pdata = s.recv(65536) + pdata = await asyncio.get_event_loop().sock_recv(s, 65536) v = memoryview(pdata) if struct.unpack('H', v[4:6])[0] == 3: # netlink done message break @@ -798,7 +795,7 @@ def get_my_addresses(idx=0, family=0, matchlla=None): if rtalen < 4: break if rta[4:rtalen].tobytes() == matchlla: - return get_my_addresses(idx=ridx) + return await get_my_addresses(idx=ridx) rta = rta[msg_align(rtalen):] elif (ridx == idx or not idx) and scope == 0: rta = v[nlhdrsz+ifaddrsz:length] @@ -813,14 +810,14 @@ def get_my_addresses(idx=0, family=0, matchlla=None): return addrs -def get_prefix_len_for_ip(ip): +async def get_prefix_len_for_ip(ip): plen = None if '/' in ip: ip, plen = ip.split('/', 1) plen = int(plen) - myaddrs = get_my_addresses() + myaddrs = await get_my_addresses() found = False - for inf in socket.getaddrinfo(ip, 0, 0, socket.SOCK_DGRAM): + for inf in await asyncio.get_running_loop().getaddrinfo(ip, 0, 0, socket.SOCK_DGRAM): if plen: yield (inf[0], plen) return @@ -833,7 +830,7 @@ def get_prefix_len_for_ip(ip): if not found: raise exc.NotImplementedException("Non local addresses not supported") -def addresses_match(addr1, addr2): +async def addresses_match(addr1, addr2): """Check two network addresses for similarity Is it zero padded in one place, not zero padded in another? Is one place by name and another by IP?? @@ -846,12 +843,12 @@ def addresses_match(addr1, addr2): """ if '%' in addr1 or '%' in addr2: return False - for addrinfo in socket.getaddrinfo(addr1, 0, 0, socket.SOCK_STREAM): + for addrinfo in await asyncio.get_running_loop().getaddrinfo(addr1, 0, 0, socket.SOCK_STREAM): rootaddr1 = socket.inet_pton(addrinfo[0], addrinfo[4][0]) if addrinfo[0] == socket.AF_INET6 and rootaddr1[:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff': # normalize to standard IPv4 rootaddr1 = rootaddr1[-4:] - for otherinfo in socket.getaddrinfo(addr2, 0, 0, socket.SOCK_STREAM): + for otherinfo in await asyncio.get_running_loop().getaddrinfo(addr2, 0, 0, socket.SOCK_STREAM): otheraddr = socket.inet_pton(otherinfo[0], otherinfo[4][0]) if otherinfo[0] == socket.AF_INET6 and otheraddr[:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff': otheraddr = otheraddr[-4:] diff --git a/confluent_server/confluent/plugins/deployment/identimage.py b/confluent_server/confluent/plugins/deployment/identimage.py index 27d6848a..9801da1f 100644 --- a/confluent_server/confluent/plugins/deployment/identimage.py +++ b/confluent_server/confluent/plugins/deployment/identimage.py @@ -53,10 +53,10 @@ async def create_ident_image(node, configmanager): # restricted by 'managercandidates' ident['deploy_servers'] = [] ident['confluent_uuid'] = cfm.get_global('confluent_uuid') - for myaddr in netutil.get_my_addresses(): + for myaddr in await netutil.get_my_addresses(): myaddr = socket.inet_ntop(myaddr[0], myaddr[1]) ident['deploy_servers'].append(myaddr) - ident['net_cfgs'] = netutil.get_flat_net_config(configmanager, node) + ident['net_cfgs'] = await netutil.get_flat_net_config(configmanager, node) with open(os.path.join(tmpd, 'cnflnt.yml'), 'w') as yamlout: yaml.safe_dump(ident, yamlout, default_flow_style=False) with open(os.path.join(tmpd, 'cnflnt.jsn'), 'w') as jsonout: diff --git a/confluent_server/confluent/selfservice.py b/confluent_server/confluent/selfservice.py index ec3f5b02..86abab70 100644 --- a/confluent_server/confluent/selfservice.py +++ b/confluent_server/confluent/selfservice.py @@ -51,7 +51,7 @@ def listdump(input): return retval -def get_extra_names(nodename, cfg, myip=None, preferadjacent=False, addlocalhost=True): +async def get_extra_names(nodename, cfg, myip=None, preferadjacent=False, addlocalhost=True): if addlocalhost: names = set(['127.0.0.1', '::1', 'localhost', 'localhost.localdomain']) else: @@ -71,8 +71,8 @@ def get_extra_names(nodename, cfg, myip=None, preferadjacent=False, addlocalhost if domain and domain not in currname: names.add('{0}.{1}'.format(currname, domain)) if myip: - ncfgs = [netutil.get_nic_config(cfg, nodename, serverip=myip)] - fncfg = netutil.get_full_net_config(cfg, nodename, serverip=myip) + ncfgs = [await netutil.get_nic_config(cfg, nodename, serverip=myip)] + fncfg = await netutil.get_full_net_config(cfg, nodename, serverip=myip) ncfgs.append(fncfg.get('default', {})) for ent in fncfg.get('extranets', []): ncfgs.append(fncfg['extranets'][ent]) @@ -82,7 +82,7 @@ def get_extra_names(nodename, cfg, myip=None, preferadjacent=False, addlocalhost for nip in (ncfg.get('ipv4_address', None), ncfg.get('ipv6_address', None)): if nip: nip = nip.split('/', 1)[0] - if not preferadjacent or netutil.address_is_local(nip): + if not preferadjacent or await netutil.address_is_local(nip): names.add(nip) addall = False else: @@ -266,7 +266,7 @@ async def handle_request(req, make_response, mimetype): bmcaddr = await asyncio.get_event_loop().getaddrinfo(bmcaddr, 0)[0] bmcaddr = bmcaddr[-1][0] if '.' in bmcaddr: # ipv4 is allowed - netconfig = netutil.get_nic_config(cfg, nodename, ip=bmcaddr) + netconfig = await netutil.get_nic_config(cfg, nodename, ip=bmcaddr) res['bmcipv4'] = bmcaddr res['prefixv4'] = netconfig['prefix'] res['bmcgw'] = netconfig.get('ipv4_gateway', None) @@ -285,7 +285,7 @@ async def handle_request(req, make_response, mimetype): mrsp = await make_response(mimetype, 200, 'OK') await mrsp.write(dumper(rsp)) elif reqpath == '/self/netcfg': - ncfg = netutil.get_full_net_config(cfg, nodename, myip) + ncfg = await netutil.get_full_net_config(cfg, nodename, myip) mrsp = await make_response(mimetype, 200, 'OK') await mrsp.write(dumper(ncfg)) elif reqpath in ('/self/deploycfg', '/self/deploycfg2'): @@ -296,9 +296,9 @@ async def handle_request(req, make_response, mimetype): except ValueError: with open('/sys/class/net/{}/ifindex'.format(nicname), 'r') as nici: ifidx = int(nici.read()) - ncfg = netutil.get_nic_config(cfg, nodename, ifidx=ifidx) + ncfg = await netutil.get_nic_config(cfg, nodename, ifidx=ifidx) else: - ncfg = netutil.get_nic_config(cfg, nodename, serverip=myip, clientip=clientip) + ncfg = await netutil.get_nic_config(cfg, nodename, serverip=myip, clientip=clientip) if reqpath == '/self/deploycfg': for key in list(ncfg): if 'v6' in key: @@ -429,12 +429,12 @@ async def handle_request(req, make_response, mimetype): mrsp = await make_response(mimetype, 500, 'Unconfigured') await msrp.write(b'CA is not configured on this system (run ...)') return mrsp - pals = get_extra_names(nodename, cfg, myip) + pals = await get_extra_names(nodename, cfg, myip) cert = sshutil.sign_host_key(reqbody, nodename, pals) mrsp = await make_response('text/plain', 200, 'OK') await mrsp.write(cert) elif reqpath == '/self/nodelist': - nodes, _ = get_cluster_list(nodename, cfg) + nodes, _ = await get_cluster_list(nodename, cfg) if isgeneric: mrsp = await make_response('text/plain', 200, 'OK') for node in util.natural_sort(nodes): @@ -538,7 +538,7 @@ async def handle_request(req, make_response, mimetype): return elif reqpath.startswith('/self/remotesyncfiles'): if 'POST' == operation: - pals = get_extra_names(nodename, cfg, myip, preferadjacent=True, addlocalhost=False) + pals = await get_extra_names(nodename, cfg, myip, preferadjacent=True, addlocalhost=False) if clientip in pals: pals = [clientip] result = syncfiles.start_syncfiles( @@ -622,7 +622,7 @@ def get_scriptlist(scriptcat, cfg, nodename, pathtemplate): return slist, profile -def get_cluster_list(nodename=None, cfg=None): +async def get_cluster_list(nodename=None, cfg=None): if cfg is None: cfg = configmanager.ConfigManager(None) nodes = None @@ -642,7 +642,7 @@ def get_cluster_list(nodename=None, cfg=None): domaininfo = cfg.get_node_attributes(node, 'dns.domain') domain = domaininfo.get(node, {}).get('dns.domain', {}).get( 'value', None) - for extraname in get_extra_names(node, cfg): + for extraname in await get_extra_names(node, cfg): nodes.add(extraname) if autonodes: for mgr in configmanager.list_collective(): diff --git a/confluent_server/confluent/tasks.py b/confluent_server/confluent/tasks.py index 9d8548a0..8ba3e8b8 100644 --- a/confluent_server/confluent/tasks.py +++ b/confluent_server/confluent/tasks.py @@ -74,6 +74,13 @@ class TaskPool: self._tasks.add(currtask) currtask.add_done_callback(self._done_callback) return tholder + + async def waitall(self): + while self._tasks or self._pending: + if self._tasks: + done, _ = await asyncio.wait(self._tasks, return_when=asyncio.FIRST_COMPLETED) + for task in done: + self._tasks.discard(task) tasksitter = None logtrace = None