From 2c75571a842fae7dbad0ceec19155257460992ba Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 4 Mar 2026 10:47:33 -0500 Subject: [PATCH] Fixes to allow a test deployment to complete under asyncio --- confluent_server/bin/osdeploy | 2 +- .../confluent/discovery/protocols/pxe.py | 14 +- confluent_server/confluent/httpapi.py | 6 +- confluent_server/confluent/netutil.py | 36 ++-- confluent_server/confluent/selfservice.py | 164 ++++++------------ confluent_server/confluent/sshutil.py | 12 +- confluent_server/confluent/syncfiles.py | 14 +- confluent_server/confluent/tasks.py | 2 +- confluent_server/confluent/util.py | 4 +- 9 files changed, 106 insertions(+), 148 deletions(-) diff --git a/confluent_server/bin/osdeploy b/confluent_server/bin/osdeploy index 9ca1a808..670d0fee 100644 --- a/confluent_server/bin/osdeploy +++ b/confluent_server/bin/osdeploy @@ -391,7 +391,7 @@ async def initialize(cmdset): sys.exit(rc) if not didsomething and (cmdset.k or cmdset.l or cmdset.g or cmdset.p): if cmdset.g: - updateboot('genesis-x86_64') + await updateboot('genesis-x86_64') sys.exit(0) if not didsomething: sys.stderr.write('Nothing was done, use initialize -i for ' diff --git a/confluent_server/confluent/discovery/protocols/pxe.py b/confluent_server/confluent/discovery/protocols/pxe.py index a026d38c..4b9b8bfc 100644 --- a/confluent_server/confluent/discovery/protocols/pxe.py +++ b/confluent_server/confluent/discovery/protocols/pxe.py @@ -383,7 +383,7 @@ def new_dhcp_packet(handler, nodeguess, cfg, net4): recv = ipfromint(recv) rqv = memoryview(data) if rqv[0] == 1: - process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv, client) + tasks.spawn(process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv, client)) def new_dhcp6_packet(handler, net6, cfg, nodeguess): @@ -466,9 +466,9 @@ async def process_dhcp6req(handler, rqv, addr, net, cfg, nodeguess): if ignoredisco.get(mac, 0) + 90 < time.time(): ignoredisco[mac] = time.time() handler(info) - consider_discover(info, req, net, cfg, None, nodeguess, addr) + await consider_discover(info, req, net, cfg, None, nodeguess, addr) -def process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv, client): +async def process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv, client): rq = bytearray(rqv) addrlen = rq[2] if addrlen > 16 or addrlen == 0: @@ -511,7 +511,7 @@ def process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv, client): and time.time() > ignoredisco.get(netaddr, 0) + 90): ignoredisco[netaddr] = time.time() handler(info) - consider_discover(info, rqinfo, net4, cfg, rqv, nodeguess, requestor=client) + await consider_discover(info, rqinfo, net4, cfg, rqv, nodeguess, requestor=client) @@ -942,13 +942,13 @@ def ack_request(pkt, rq, info, sock=None, requestor=None): else: send_raw_packet(repview, len(rply), rq, info) -def consider_discover(info, packet, sock, cfg, reqview, nodeguess, addr=None, requestor=None): +async def consider_discover(info, packet, sock, cfg, reqview, nodeguess, addr=None, requestor=None): if packet.get(53, None) == b'\x03': ack_request(packet, reqview, info, sock, requestor) elif info.get('hwaddr', None) in macmap: # and info.get('uuid', None): - check_reply(macmap[info['hwaddr']], info, packet, sock, cfg, reqview, addr, requestor) + await check_reply(macmap[info['hwaddr']], info, packet, sock, cfg, reqview, addr, requestor) elif info.get('uuid', None) in uuidmap: - check_reply(uuidmap[info['uuid']], info, packet, sock, cfg, reqview, addr, requestor) + await check_reply(uuidmap[info['uuid']], info, packet, sock, cfg, reqview, addr, requestor) elif packet.get(53, None) == b'\x03': ack_request(packet, reqview, info, sock, requestor) elif info.get('uuid', None) and info.get('hwaddr', None): diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index 48cec1cd..feb9bc5d 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -632,7 +632,7 @@ async def resourcehandler(request): # any 'yield' needs to become a write to the streamresponse #TODO:asyncmerge: Replace /confluent-api with '' in path # Needs testing for confluent header names with golang clients - async def make_response(mimetype, status=200, reason=None, headers=None, cookies=None): + async def make_response(mimetype, status=200, reason=None, headers=None, cookies=None, body=None): rspheaders = { 'Cache-Control': 'no-store', 'Pragma': 'no-cache', @@ -650,6 +650,10 @@ async def resourcehandler(request): rsp.cookies.update(cookies) rsp.content_type = mimetype await rsp.prepare(request) + if body: + if isinstance(body, str): + body = body.encode('utf8') + await rsp.write(body) return rsp try: if 'Sec-WebSocket-Version' in request.headers: diff --git a/confluent_server/confluent/netutil.py b/confluent_server/confluent/netutil.py index 375a1222..9e2f1a05 100644 --- a/confluent_server/confluent/netutil.py +++ b/confluent_server/confluent/netutil.py @@ -77,13 +77,13 @@ async def ip_on_same_subnet(first, second, prefix): first = first.replace('::ffff:', '') if second.startswith('::ffff:') and '.' in second: second = second.replace('::ffff:', '') - addrinf = (await asyncio.get_running_loop().getaddrinfo(first, None, 0, socket.SOCK_STREAM))[0] + addrinf = (await asyncio.get_running_loop().getaddrinfo(first, 0, type=socket.SOCK_STREAM))[0] fam = addrinf[0] if '%' in addrinf[-1][0]: return False ip = socket.inet_pton(fam, addrinf[-1][0]) ip = int(codecs.encode(bytes(ip), 'hex'), 16) - addrinf = (await asyncio.get_running_loop().getaddrinfo(second, None, 0, socket.SOCK_STREAM))[0] + addrinf = (await asyncio.get_running_loop().getaddrinfo(second, 0, type=socket.SOCK_STREAM))[0] if fam != addrinf[0]: return False txtaddr = addrinf[-1][0].split('%')[0] @@ -252,7 +252,7 @@ class NetManager(object): if ipv4addr: try: luaddr = ipv4addr.split('/', 1)[0] - for ai in await asyncio.get_running_loop().getaddrinfo(luaddr, 0, socket.AF_INET, socket.SOCK_STREAM): + for ai in await asyncio.get_running_loop().getaddrinfo(luaddr, 0, family=socket.AF_INET, type=socket.SOCK_STREAM): ipv4addr.replace(luaddr, ai[-1][0]) except socket.gaierror: pass @@ -260,7 +260,7 @@ class NetManager(object): currname = attribs.get('hostname', self.node).split()[0] if currname and currname not in self.consumednames4: try: - for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, socket.AF_INET, socket.SOCK_STREAM): + for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, family=socket.AF_INET, type=socket.SOCK_STREAM): ipv4addr = ai[-1][0] self.consumednames4.add(currname) except socket.gaierror: @@ -277,7 +277,7 @@ class NetManager(object): ipv6addr = attribs.get('ipv6_address', None) if ipv6addr: try: - for ai in await asyncio.get_running_loop().getaddrinfo(ipv6addr, 0, socket.AF_INET6, socket.SOCK_STREAM): + for ai in await asyncio.get_running_loop().getaddrinfo(ipv6addr, 0, family=socket.AF_INET6, type=socket.SOCK_STREAM): ipv6addr = ai[-1][0] except socket.gaierror: pass @@ -285,7 +285,7 @@ class NetManager(object): currname = attribs.get('hostname', self.node).split()[0] if currname and currname not in self.consumednames6: try: - for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, socket.AF_INET6, socket.SOCK_STREAM): + for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, family=socket.AF_INET6, type=socket.SOCK_STREAM): ipv6addr = ai[-1][0] self.consumednames6.add(currname) except socket.gaierror: @@ -371,11 +371,11 @@ async def get_full_net_config(configmanager, node, serverip=None): bmc6 = None if bmc: try: - bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM))[0][-1][0] + bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, family=socket.AF_INET, type=socket.SOCK_DGRAM))[0][-1][0] except Exception: pass try: - bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM))[0][-1][0] + bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, family=socket.AF_INET6, type=socket.SOCK_DGRAM))[0][-1][0] except Exception: pass attribs = {} @@ -419,7 +419,7 @@ async def get_full_net_config(configmanager, node, serverip=None): add_netmask(retattrs['default']) del nm.myattribs[None] else: - nnc = get_nic_config(configmanager, node, serverip=serverip) + nnc = await get_nic_config(configmanager, node, serverip=serverip) if nnc.get('ipv4_address', None): defaultnic['ipv4_address'] = '{}/{}'.format(nnc['ipv4_address'], nnc['prefix']) if nnc.get('ipv4_gateway', None): @@ -530,12 +530,12 @@ async def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, if bmc: try: if onlyfamily in (0, socket.AF_INET): - bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM))[0][-1][0] + bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, family=socket.AF_INET, type=socket.SOCK_DGRAM))[0][-1][0] except Exception: pass try: if onlyfamily in (0, socket.AF_INET6): - bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM))[0][-1][0] + bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, family=socket.AF_INET6, type=socket.SOCK_DGRAM))[0][-1][0] except Exception: pass cfgbyname = {} @@ -594,13 +594,13 @@ async def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, ip6bynodename = None try: if onlyfamily in (socket.AF_INET, 0): - for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, socket.AF_INET, socket.SOCK_DGRAM): + for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, family=socket.AF_INET, type=socket.SOCK_DGRAM): ipbynodename = addr[-1][0] except socket.gaierror: pass try: if onlyfamily in (socket.AF_INET6, 0): - for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, socket.AF_INET6, socket.SOCK_DGRAM): + for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, family=socket.AF_INET6, type=socket.SOCK_DGRAM): ip6bynodename = addr[-1][0] except socket.gaierror: pass @@ -647,7 +647,7 @@ async def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, if bmc6 and candip == bmc6: continue try: - for inf in await asyncio.get_running_loop().getaddrinfo(candip, 0, fam, socket.SOCK_STREAM): + for inf in await asyncio.get_running_loop().getaddrinfo(candip, 0, family=fam, type=socket.SOCK_STREAM): candipn = socket.inet_pton(fam, inf[-1][0]) if ((isremote and ipn_on_same_subnet(fam, clientipn, candipn, int(candprefix))) or ipn_on_same_subnet(fam, bootsvrip, candipn, prefix)): @@ -666,7 +666,7 @@ async def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None, except Exception as e: cfgdata['error_msg'] = "Error trying to evaluate net.*ipv4_address attribute value '{0}' on {1}: {2}".format(candip, node, str(e)) elif candgw: - for inf in await asyncio.get_running_loop().getaddrinfo(candgw, 0, fam, socket.SOCK_STREAM): + for inf in await asyncio.get_running_loop().getaddrinfo(candgw, 0, family=fam, type=socket.SOCK_STREAM): candgwn = socket.inet_pton(fam, inf[-1][0]) if ipn_on_same_subnet(fam, bootsvrip, candgwn, prefix): candgws.append((fam, candgwn, prefix)) @@ -818,7 +818,7 @@ async def get_prefix_len_for_ip(ip): plen = int(plen) myaddrs = await get_my_addresses() found = False - for inf in await asyncio.get_running_loop().getaddrinfo(ip, 0, 0, socket.SOCK_DGRAM): + for inf in await asyncio.get_running_loop().getaddrinfo(ip, 0, type=socket.SOCK_DGRAM): if plen: yield (inf[0], plen) return @@ -844,12 +844,12 @@ async def addresses_match(addr1, addr2): """ if '%' in addr1 or '%' in addr2: return False - for addrinfo in await asyncio.get_running_loop().getaddrinfo(addr1, 0, 0, socket.SOCK_STREAM): + for addrinfo in await asyncio.get_running_loop().getaddrinfo(addr1, 0, type=socket.SOCK_STREAM): rootaddr1 = socket.inet_pton(addrinfo[0], addrinfo[4][0]) if addrinfo[0] == socket.AF_INET6 and rootaddr1[:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff': # normalize to standard IPv4 rootaddr1 = rootaddr1[-4:] - for otherinfo in await asyncio.get_running_loop().getaddrinfo(addr2, 0, 0, socket.SOCK_STREAM): + for otherinfo in await asyncio.get_running_loop().getaddrinfo(addr2, 0, type=socket.SOCK_STREAM): otheraddr = socket.inet_pton(otherinfo[0], otherinfo[4][0]) if otherinfo[0] == socket.AF_INET6 and otheraddr[:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff': otheraddr = otheraddr[-4:] diff --git a/confluent_server/confluent/selfservice.py b/confluent_server/confluent/selfservice.py index 6695a5f5..42ae506d 100644 --- a/confluent_server/confluent/selfservice.py +++ b/confluent_server/confluent/selfservice.py @@ -48,7 +48,7 @@ def listdump(input): retval = '' for entry in input: retval += '- ' + entry + '\n' - return retval + return retval.encode() async def get_extra_names(nodename, cfg, myip=None, preferadjacent=False, addlocalhost=True): @@ -105,34 +105,30 @@ async def handle_request(req, make_response, mimetype): clientids = env.get('HTTP_CONFLUENT_IDS', None) if not clientids: rsp = await make_response(mimetype, 400, 'Bad Request') - await rsp.write( 'Bad Request') - return + await rsp.write(b'Bad Request') + return rsp for ids in clientids.split('/'): _, v = ids.split('=', 1) repname = disco.get_node_by_uuid_or_mac(v) if repname: rsp = await make_response(mimetype, 200, 'OK') - await rsp.write(repname) - return + await rsp.write(repname.encode()) + return rsp rsp = await make_response(mimetype, 404, 'Unknown') - return + return rsp if reqpath == '/self/registerapikey': crypthmac = env.get('HTTP_CONFLUENT_CRYPTHMAC', None) if int(env.get('CONTENT_LENGTH', 65)) > 64: rsp = await make_response(mimetype, 400, 'Bad Request') await rsp.write('Bad Request') - return + return rsp cryptkey = await req.read() if not (crypthmac and cryptkey): - rsp = make_response(mimetype, 401, 'Unauthorized') - await rsp.write('Unauthorized') - return + return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized') hmackey = cfg.get_node_attributes(nodename, ['secret.selfapiarmtoken'], decrypt=True) hmackey = hmackey.get(nodename, {}).get('secret.selfapiarmtoken', {}).get('value', None) if not hmackey: - rsp = await make_response(mimetype, 401, 'Unauthorized') - await rsp.write('Unauthorized') - return + return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized') if not isinstance(hmackey, bytes): hmackey = hmackey.encode('utf8') if not isinstance(cryptkey, bytes): @@ -140,47 +136,34 @@ async def handle_request(req, make_response, mimetype): try: crypthmac = base64.b64decode(crypthmac) except Exception: - rsp = await make_response(mimetype, 400, 'Bad Request') - await rsp.write('Bad Request') - return + return await make_response(mimetype, 400, 'Bad Request', body='Bad Request') righthmac = hmac.new(hmackey, cryptkey, hashlib.sha256).digest() if righthmac == crypthmac: if not isinstance(cryptkey, str): cryptkey = cryptkey.decode() cfgupdate = {nodename: {'crypted.selfapikey': {'hashvalue': cryptkey}}} - cfg.set_node_attributes(cfgupdate) - cfg.clear_node_attributes([nodename], ['secret.selfapiarmtoken']) - rsp = await make_response(mimetype, 200, 'OK') - await rsp.write('Accepted') - return - rsp = await make_response(mimetype, 401, 'Unauthorized') - await rsp.write('Unauthorized') - return + await cfg.set_node_attributes(cfgupdate) + await cfg.clear_node_attributes([nodename], ['secret.selfapiarmtoken']) + return await make_response(mimetype, 200, 'OK', body='Accepted') + return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized') apikey = req.headers.get('CONFLUENT_APIKEY', None) if not (nodename and apikey): - rsp = await make_response(mimetype, 401, 'Unauthorized') - await rsp.write('Unauthorized') - return + return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized') if len(apikey) > 48: - rsp = await make_response(mimetype, 401, 'Unauthorized') - await rsp.write('Unauthorized') - return + return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized') + return rsp ea = cfg.get_node_attributes(nodename, ['crypted.selfapikey', 'deployment.apiarmed']) eak = ea.get( nodename, {}).get('crypted.selfapikey', {}).get('hashvalue', None) if not eak: - rsp = await make_response(mimetype, 401, 'Unauthorized') - await rsp.write(b'Unauthorized') - return rsp + return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized') if not isinstance(eak, str): eak = eak.decode('utf8') salt = '$'.join(eak.split('$', 3)[:-1]) + '$' if crypt.crypt(apikey, salt) != eak: - rsp = await make_response(mimetype, 401, 'Unauthorized') - await rsp.write(b'Unauthorized') - return + return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized') if ea.get(nodename, {}).get('deployment.apiarmed', {}).get('value', None) == 'once': - cfg.set_node_attributes({nodename: {'deployment.apiarmed': ''}}) + await cfg.set_node_attributes({nodename: {'deployment.apiarmed': ''}}) myip = req.headers.get('X-Forwarded-Host', None) if myip and ']' in myip: myip = myip.split(']', 1)[0] @@ -198,18 +181,14 @@ async def handle_request(req, make_response, mimetype): elif retype == 'application/json': dumper = json.dumps else: - rsp = await make_response(mimetype, 406, 'Not supported') - await rsp.write(b'Unsupported content type in ACCEPT: ' + retype.encode()) - return + return await make_response(mimetype, 406, 'Not supported', body='Unsupported content type in ACCEPT: ' + retype) operation = req.method if operation not in ('HEAD', 'GET') and req.content_length > 0: reqbody = await req.read() if reqpath == '/self/register_discovered': rb = json.loads(reqbody) if not rb.get('path', None): - rsp = await make_response(mimetype, 400, 'Bad Requst') - await rsp.write('Missing Path') - return + return await make_response(mimetype, 400, 'Bad Request', body='Missing Path') targurl = '/affluent/systems/by-port/{0}/webaccess'.format(rb['path']) tlsverifier = util.TLSCertVerifier(cfg, nodename, 'pubkeys.tls_hardwaremanager') wc = webclient.WebConnection(nodename, 443, verifycallback=tlsverifier.verify_cert) @@ -239,13 +218,9 @@ async def handle_request(req, make_response, mimetype): elif rb['type'] == 'lenovo-smm2': rb['services'] = ['service:lenovo-smm2'] else: - rsp = await make_response(mimetype, 400, 'Unsupported Device') - await rsp.write('Unsupported device for remote discovery registration') - return + return await make_response(mimetype, 400, 'Unsupported Device', body='Unsupported device for remote discovery registration') await disco.detected(rb) - rsp = await make_response(mimetype, 200, 'OK') - await rsp.write('Registered') - return + return await make_response(mimetype, 200, 'OK', body='Registered') if reqpath == '/self/bmcconfig': hmattr = cfg.get_node_attributes(nodename, 'hardwaremanagement.*') hmattr = hmattr.get(nodename, {}) @@ -259,9 +234,7 @@ async def handle_request(req, make_response, mimetype): bmcaddr = hmattr.get('hardwaremanagement.manager', {}).get('value', None) if not bmcaddr: - rsp = await make_response(mimetype, 500, 'Internal Server Error') - await rsp.write('Missing value in hardwaremanagement.manager') - return + return await make_response(mimetype, 500, 'Internal Server Error', body='Missing value in hardwaremanagement.manager') bmcaddr = bmcaddr.split('/', 1)[0] bmcaddr = await asyncio.get_event_loop().getaddrinfo(bmcaddr, 0)[0] bmcaddr = bmcaddr[-1][0] @@ -271,8 +244,7 @@ async def handle_request(req, make_response, mimetype): res['prefixv4'] = netconfig['prefix'] res['bmcgw'] = netconfig.get('ipv4_gateway', None) # credential security results in user/password having to be deferred - mrsp = await make_response(mimetype, 200, 'OK') - await mrsp.write(dumper(res)) + return await make_response(mimetype, 200, 'OK', body=dumper(res)) elif reqpath == '/self/myattribs': cfd = cfg.get_node_attributes(nodename, '*', decrypt=True).get(nodename, {}) rsp = {} @@ -282,12 +254,10 @@ async def handle_request(req, make_response, mimetype): rsp[k] = cfd[k]['value'] if isinstance(rsp[k], bytes): rsp[k] = rsp[k].decode() - mrsp = await make_response(mimetype, 200, 'OK') - await mrsp.write(dumper(rsp)) + return await make_response(mimetype, 200, 'OK', body=dumper(rsp)) elif reqpath == '/self/netcfg': ncfg = await netutil.get_full_net_config(cfg, nodename, myip) - mrsp = await make_response(mimetype, 200, 'OK') - await mrsp.write(dumper(ncfg)) + return await make_response(mimetype, 200, 'OK', body=dumper(ncfg)) elif reqpath in ('/self/deploycfg', '/self/deploycfg2'): if 'CONFLUENT_MGTIFACE' in req.headers: nicname = req.headers['CONFLUENT_MGTIFACE'] @@ -421,30 +391,26 @@ async def handle_request(req, make_response, mimetype): ncfg['ntpservers'].append(ntpsrv) dnsdomain = deployinfo.get('dns.domain', {}).get('value', None) ncfg['dnsdomain'] = dnsdomain - mrsp = await make_response(mimetype, 200, 'OK') - await mrsp.write(dumper(ncfg).encode()) - return mrsp + return await make_response(mimetype, 200, 'OK', body=dumper(ncfg)) elif reqpath == '/self/sshcert' and reqbody: if not sshutil.ca_exists(): - mrsp = await make_response(mimetype, 500, 'Unconfigured') - await msrp.write(b'CA is not configured on this system (run ...)') - return mrsp + return await make_response(mimetype, 500, 'Unconfigured', body='CA is not configured on this system (run ...)') pals = await get_extra_names(nodename, cfg, myip) - cert = sshutil.sign_host_key(reqbody, nodename, pals) - mrsp = await make_response('text/plain', 200, 'OK') - await mrsp.write(cert) + cert = await sshutil.sign_host_key(reqbody, nodename, pals) + return await make_response('text/plain', 200, 'OK', body=cert.encode()) elif reqpath == '/self/nodelist': nodes, _ = await get_cluster_list(nodename, cfg) if isgeneric: mrsp = await make_response('text/plain', 200, 'OK') for node in util.natural_sort(nodes): - await mrsp.write(node + '\n') + await mrsp.write(f'{node}\n'.encode('utf-8')) else: mrsp = await make_response(retype, 200, 'OK') if retype == 'application/yaml': await mrsp.write(listdump(list(util.natural_sort(nodes)))) else: await mrsp.write(dumper(list(util.natural_sort(nodes)))) + return mrsp elif reqpath == '/self/remoteconfigbmc' and reqbody: try: reqbody = yamlload(reqbody) @@ -456,27 +422,25 @@ async def handle_request(req, make_response, mimetype): elif cfgmod == 'tsm': tsm.remote_nodecfg(nodename, cfg) else: - mrsp = await make_response(mimetype, 500, 'unsupported configmod') - await mrsp.write('Unsupported configmod "{}"'.format(cfgmod)) - mrsp = await make_response(mimetype, 200, 'Ok') - await mrsp.write('complete') + return await make_response(mimetype, 500, 'unsupported configmod', body='Unsupported configmod "{}"'.format(cfgmod)) + return await make_response(mimetype, 200, 'Ok', body='complete') elif reqpath == '/self/updatestatus' and reqbody: update = yamlload(reqbody) statusstr = update.get('state', None) statusdetail = update.get('state_detail', None) didstateupdate = False if statusstr or 'status' in update: - cfg.set_node_attributes({nodename: { + await cfg.set_node_attributes({nodename: { 'deployment.client_ip': {'value': clientip}}}) if statusstr: - cfg.set_node_attributes({nodename: {'deployment.state': statusstr}}) + await cfg.set_node_attributes({nodename: {'deployment.state': statusstr}}) didstateupdate = True if statusdetail: - cfg.set_node_attributes({nodename: {'deployment.state_detail': statusdetail}}) + await cfg.set_node_attributes({nodename: {'deployment.state_detail': statusdetail}}) didstateupdate = True if 'status' not in update and didstateupdate: mrsp = await make_response(mimetype, 200, 'Ok') - await mrsp.write('Accepted') + await mrsp.write(b'Accepted') return if update['status'] == 'staged': targattr = 'deployment.stagedprofile' @@ -503,18 +467,16 @@ async def handle_request(req, make_response, mimetype): currprof = currattr.get(targattr, {}).get('value', '') if currprof != pending: updates[targattr] = {'value': pending} - cfg.set_node_attributes({nodename: updates}) - mrsp = await make_response('text/plain', 200, 'OK') - await mrsp.write('OK') + await cfg.set_node_attributes({nodename: updates}) + return await make_response('text/plain', 200, 'OK', body='OK') else: - mrsp = await make_response('text/plain', 500, 'Error') - await mrsp.write('No pending profile detected, unable to accept status update') + return await make_response('text/plain', 500, 'Error', body='No pending profile detected, unable to accept status update') elif reqpath == '/self/saveapikey' and reqbody: if not isinstance(reqbody, str): reqbody = reqbody.decode('utf8') - cfg.set_node_attributes({ + await cfg.set_node_attributes({ nodename: {'deployment.sealedapikey': {'value': reqbody}}}) - mrsp = await make_response(mimetype, 200, 'OK') + return await make_response(mimetype, 200, 'OK', body='OK') elif reqpath.startswith('/self/remoteconfig/') and 'POST' == operation: scriptcat = reqpath.replace('/self/remoteconfig/', '') playlist = [] @@ -532,10 +494,9 @@ async def handle_request(req, make_response, mimetype): playlist.append(os.path.join(dirname, filename)) if playlist: runansible.run_playbooks(playlist, [nodename]) - mrsp = await make_response(mimetype, 202, 'Queued') + return await make_response(mimetype, 202, 'Queued', body='Queued') else: - mrsp = await make_response(mimetype, 200, 'OK') - return + return await make_response(mimetype, 200, 'OK', body='OK') elif reqpath.startswith('/self/remotesyncfiles'): if 'POST' == operation: pals = await get_extra_names(nodename, cfg, myip, preferadjacent=True, addlocalhost=False) @@ -543,25 +504,20 @@ async def handle_request(req, make_response, mimetype): pals = [clientip] result = syncfiles.start_syncfiles( nodename, cfg, json.loads(reqbody), pals) - mrsp = await make_response(mimetype, result[0]) - await mrsp.write(result[1]) - return + return await make_response(mimetype, result[0], result[1], body=result[2]) if 'GET' == operation: - status, output = syncfiles.get_syncresult(nodename) + statcode, status, output = syncfiles.get_syncresult(nodename) output = json.dumps(output) - mrsp = await make_response('application/json', status) - await mrsp.write(output) - return + return await make_response('application/json', statcode, status, body=output) elif reqpath.startswith('/self/remoteconfig/status'): rst = runansible.running_status.get(nodename, None) if not rst: - mrsp = await make_response(mimetype, 204, 'Not Running') - return + return await make_response(mimetype, 204, 'Not Running') mrsp = await make_response(mimetype, 200, 'OK') if rst.complete: del runansible.running_status[nodename] await mrsp.write(rst.dump_text()) - return + return mrsp elif reqpath.startswith('/self/scriptlist/'): scriptcat = reqpath.replace('/self/scriptlist/', '') slist, _ = get_scriptlist( @@ -572,6 +528,7 @@ async def handle_request(req, make_response, mimetype): await mrsp.write(yamldump(util.natural_sort(slist))) else: mrsp = await make_response(mimetype, 200, 'OK') + return mrsp elif reqpath.startswith('/self/profileprivate/pending/'): fname = reqpath.replace('/self/profileprivate/', '') deployinfo = cfg.get_node_attributes( @@ -580,22 +537,15 @@ async def handle_request(req, make_response, mimetype): profile = deployinfo.get( 'deployment.pendingprofile', {}).get('value', '') if not profile: - mrsp = await make_response(mimetype, 400, 'No pending profile') - await mrsp.write('No profile') - return + return await make_response(mimetype, 400, 'No pending profile', body='No profile') fname = '/var/lib/confluent/private/os/{}/{}'.format(profile, fname) try: with open(fname, 'rb') as privdata: - mrsp = await make_response(200, 'OK') - await mrsp.write(privdata.read()) - return + return await make_response(mimetype, 200, 'OK', body=privdata.read()) except IOError: - mrsp = await make_response(mimetype, 404, 'Not Found') - await mrsp.write('Not found') - return + return await make_response(mimetype, 404, 'Not Found', body='Not found') else: - mrsp = await make_response(mimetype, 404, 'Not Found') - await mrsp.write('Not found') + return await make_response(mimetype, 404, 'Not Found', body='Not found') def get_scriptlist(scriptcat, cfg, nodename, pathtemplate): if '..' in scriptcat: diff --git a/confluent_server/confluent/sshutil.py b/confluent_server/confluent/sshutil.py index 89786131..6f21dc8e 100644 --- a/confluent_server/confluent/sshutil.py +++ b/confluent_server/confluent/sshutil.py @@ -6,7 +6,9 @@ import confluent.config.configmanager as cfm import confluent.collective.manager as collective import confluent.util as util import glob +import os import shutil +import subprocess import tempfile agent_pid = None @@ -38,7 +40,7 @@ async def assure_agent(): if agent_pid is None: try: agent_starting = True - sai = await util.check_output(['ssh-agent'])[0] + sai = (await util.check_output(['ssh-agent']))[0] for line in sai.split(b'\n'): if b';' not in line: continue @@ -115,6 +117,7 @@ async def prep_ssh_key(keyname): adding_key = True if agent_pid: if os.path.exists(os.environ['SSH_AUTH_SOCK']): + sock = None try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(os.environ['SSH_AUTH_SOCK']) @@ -122,7 +125,8 @@ async def prep_ssh_key(keyname): os.unlink(os.environ['SSH_AUTH_SOCK']) os.rmdir(os.path.dirname(os.environ['SSH_AUTH_SOCK'])) finally: - sock.close() + if sock: + sock.close() if not os.path.exists(os.environ['SSH_AUTH_SOCK']): agent_pid = None ready_keys.clear() @@ -145,9 +149,7 @@ async def prep_ssh_key(keyname): os.environ['DISPLAY'] = 'NONE' os.environ['SSH_ASKPASS'] = askpass try: - with open(os.devnull, 'wb') as devnull: - #TODO:asyncmerge: capture stderr - await util.check_call('ssh-add', keyname) + await util.check_call('ssh-add', keyname, stdin=subprocess.DEVNULL) finally: del os.environ['CONFLUENT_SSH_PASSPHRASE'] del os.environ['DISPLAY'] diff --git a/confluent_server/confluent/syncfiles.py b/confluent_server/confluent/syncfiles.py index 048fbe82..fe965c59 100644 --- a/confluent_server/confluent/syncfiles.py +++ b/confluent_server/confluent/syncfiles.py @@ -313,20 +313,20 @@ def start_syncfiles(nodename, cfg, suffixes, principals=[]): raise Exception('Cannot perform syncfiles without profile assigned') synclist = '/var/lib/confluent/public/os/{}/syncfiles'.format(profile) if not os.path.exists(synclist): - return '200 OK', 'No synclist' # not running + return 200, 'OK', 'No synclist' # not running sl = SyncList(synclist, nodename, cfg) if not (sl.appendmap or sl.mergemap or sl.replacemap or sl.appendoncemap): - return '200 OK', 'Empty synclist' # the synclist has no actual entries + return 200, 'OK', 'Empty synclist' # the synclist has no actual entries if nodename in syncrunners: if syncrunners[nodename].dead: syncrunners[nodename].wait() else: - return '503 Synchronization already in progress', 'Synchronization already in progress for {}'.format(nodename) + return 503, 'Synchronization already in progress', 'Synchronization already in progress for {}'.format(nodename) syncrunners[nodename] = tasks.spawn( sync_list_to_node(sl, nodename, suffixes, peerip)) if not cleaner: cleaner = tasks.spawn(cleanit()) - return '202 Queued', 'Background synchronization initiated' # backgrounded + return 202,'Queued', 'Background synchronization initiated' # backgrounded async def cleanit(): @@ -351,9 +351,9 @@ async def cleanit(): def get_syncresult(nodename): if nodename not in syncrunners: - return ('204 Not Running', '') + return 204, 'Not Running', '' if not syncrunners[nodename].dead: - return ('200 OK', '') + return 200, 'OK', '' result = syncrunners[nodename].wait() del syncrunners[nodename] - return ('200 OK', result) + return 200, 'OK', result diff --git a/confluent_server/confluent/tasks.py b/confluent_server/confluent/tasks.py index 864bb07a..14e775cc 100644 --- a/confluent_server/confluent/tasks.py +++ b/confluent_server/confluent/tasks.py @@ -61,7 +61,7 @@ class TaskPile: yield task class TaskPool: - def __init__(self, max_concurrent): + def __init__(self, max_concurrent=128): self.max_concurrent = max_concurrent self._tasks = set() self._pending = [] diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index 96989610..08f98bed 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -63,13 +63,15 @@ async def check_call(*cmd, **kwargs): async def check_output(*cmd): + if len(cmd) == 1 and isinstance(cmd[0], (list, tuple)): + cmd = cmd[0] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) stdout, stderr = await process.communicate() retcode = process.returncode if retcode: raise subprocess.CalledProcessError( - retcode, process.args, + retcode, cmd, output=stdout, stderr=stderr) return stdout, stderr