2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-26 10:41:29 +00:00

Fixes to allow a test deployment to complete under asyncio

This commit is contained in:
Jarrod Johnson
2026-03-04 10:47:33 -05:00
parent a5e7fe93e4
commit 2c75571a84
9 changed files with 106 additions and 148 deletions

View File

@@ -391,7 +391,7 @@ async def initialize(cmdset):
sys.exit(rc)
if not didsomething and (cmdset.k or cmdset.l or cmdset.g or cmdset.p):
if cmdset.g:
updateboot('genesis-x86_64')
await updateboot('genesis-x86_64')
sys.exit(0)
if not didsomething:
sys.stderr.write('Nothing was done, use initialize -i for '

View File

@@ -383,7 +383,7 @@ def new_dhcp_packet(handler, nodeguess, cfg, net4):
recv = ipfromint(recv)
rqv = memoryview(data)
if rqv[0] == 1:
process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv, client)
tasks.spawn(process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv, client))
def new_dhcp6_packet(handler, net6, cfg, nodeguess):
@@ -466,9 +466,9 @@ async def process_dhcp6req(handler, rqv, addr, net, cfg, nodeguess):
if ignoredisco.get(mac, 0) + 90 < time.time():
ignoredisco[mac] = time.time()
handler(info)
consider_discover(info, req, net, cfg, None, nodeguess, addr)
await consider_discover(info, req, net, cfg, None, nodeguess, addr)
def process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv, client):
async def process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv, client):
rq = bytearray(rqv)
addrlen = rq[2]
if addrlen > 16 or addrlen == 0:
@@ -511,7 +511,7 @@ def process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv, client):
and time.time() > ignoredisco.get(netaddr, 0) + 90):
ignoredisco[netaddr] = time.time()
handler(info)
consider_discover(info, rqinfo, net4, cfg, rqv, nodeguess, requestor=client)
await consider_discover(info, rqinfo, net4, cfg, rqv, nodeguess, requestor=client)
@@ -942,13 +942,13 @@ def ack_request(pkt, rq, info, sock=None, requestor=None):
else:
send_raw_packet(repview, len(rply), rq, info)
def consider_discover(info, packet, sock, cfg, reqview, nodeguess, addr=None, requestor=None):
async def consider_discover(info, packet, sock, cfg, reqview, nodeguess, addr=None, requestor=None):
if packet.get(53, None) == b'\x03':
ack_request(packet, reqview, info, sock, requestor)
elif info.get('hwaddr', None) in macmap: # and info.get('uuid', None):
check_reply(macmap[info['hwaddr']], info, packet, sock, cfg, reqview, addr, requestor)
await check_reply(macmap[info['hwaddr']], info, packet, sock, cfg, reqview, addr, requestor)
elif info.get('uuid', None) in uuidmap:
check_reply(uuidmap[info['uuid']], info, packet, sock, cfg, reqview, addr, requestor)
await check_reply(uuidmap[info['uuid']], info, packet, sock, cfg, reqview, addr, requestor)
elif packet.get(53, None) == b'\x03':
ack_request(packet, reqview, info, sock, requestor)
elif info.get('uuid', None) and info.get('hwaddr', None):

View File

@@ -632,7 +632,7 @@ async def resourcehandler(request):
# any 'yield' needs to become a write to the streamresponse
#TODO:asyncmerge: Replace /confluent-api with '' in path
# Needs testing for confluent header names with golang clients
async def make_response(mimetype, status=200, reason=None, headers=None, cookies=None):
async def make_response(mimetype, status=200, reason=None, headers=None, cookies=None, body=None):
rspheaders = {
'Cache-Control': 'no-store',
'Pragma': 'no-cache',
@@ -650,6 +650,10 @@ async def resourcehandler(request):
rsp.cookies.update(cookies)
rsp.content_type = mimetype
await rsp.prepare(request)
if body:
if isinstance(body, str):
body = body.encode('utf8')
await rsp.write(body)
return rsp
try:
if 'Sec-WebSocket-Version' in request.headers:

View File

@@ -77,13 +77,13 @@ async def ip_on_same_subnet(first, second, prefix):
first = first.replace('::ffff:', '')
if second.startswith('::ffff:') and '.' in second:
second = second.replace('::ffff:', '')
addrinf = (await asyncio.get_running_loop().getaddrinfo(first, None, 0, socket.SOCK_STREAM))[0]
addrinf = (await asyncio.get_running_loop().getaddrinfo(first, 0, type=socket.SOCK_STREAM))[0]
fam = addrinf[0]
if '%' in addrinf[-1][0]:
return False
ip = socket.inet_pton(fam, addrinf[-1][0])
ip = int(codecs.encode(bytes(ip), 'hex'), 16)
addrinf = (await asyncio.get_running_loop().getaddrinfo(second, None, 0, socket.SOCK_STREAM))[0]
addrinf = (await asyncio.get_running_loop().getaddrinfo(second, 0, type=socket.SOCK_STREAM))[0]
if fam != addrinf[0]:
return False
txtaddr = addrinf[-1][0].split('%')[0]
@@ -252,7 +252,7 @@ class NetManager(object):
if ipv4addr:
try:
luaddr = ipv4addr.split('/', 1)[0]
for ai in await asyncio.get_running_loop().getaddrinfo(luaddr, 0, socket.AF_INET, socket.SOCK_STREAM):
for ai in await asyncio.get_running_loop().getaddrinfo(luaddr, 0, family=socket.AF_INET, type=socket.SOCK_STREAM):
ipv4addr.replace(luaddr, ai[-1][0])
except socket.gaierror:
pass
@@ -260,7 +260,7 @@ class NetManager(object):
currname = attribs.get('hostname', self.node).split()[0]
if currname and currname not in self.consumednames4:
try:
for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, socket.AF_INET, socket.SOCK_STREAM):
for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, family=socket.AF_INET, type=socket.SOCK_STREAM):
ipv4addr = ai[-1][0]
self.consumednames4.add(currname)
except socket.gaierror:
@@ -277,7 +277,7 @@ class NetManager(object):
ipv6addr = attribs.get('ipv6_address', None)
if ipv6addr:
try:
for ai in await asyncio.get_running_loop().getaddrinfo(ipv6addr, 0, socket.AF_INET6, socket.SOCK_STREAM):
for ai in await asyncio.get_running_loop().getaddrinfo(ipv6addr, 0, family=socket.AF_INET6, type=socket.SOCK_STREAM):
ipv6addr = ai[-1][0]
except socket.gaierror:
pass
@@ -285,7 +285,7 @@ class NetManager(object):
currname = attribs.get('hostname', self.node).split()[0]
if currname and currname not in self.consumednames6:
try:
for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, socket.AF_INET6, socket.SOCK_STREAM):
for ai in await asyncio.get_running_loop().getaddrinfo(currname, 0, family=socket.AF_INET6, type=socket.SOCK_STREAM):
ipv6addr = ai[-1][0]
self.consumednames6.add(currname)
except socket.gaierror:
@@ -371,11 +371,11 @@ async def get_full_net_config(configmanager, node, serverip=None):
bmc6 = None
if bmc:
try:
bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM))[0][-1][0]
bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, family=socket.AF_INET, type=socket.SOCK_DGRAM))[0][-1][0]
except Exception:
pass
try:
bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM))[0][-1][0]
bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, family=socket.AF_INET6, type=socket.SOCK_DGRAM))[0][-1][0]
except Exception:
pass
attribs = {}
@@ -419,7 +419,7 @@ async def get_full_net_config(configmanager, node, serverip=None):
add_netmask(retattrs['default'])
del nm.myattribs[None]
else:
nnc = get_nic_config(configmanager, node, serverip=serverip)
nnc = await get_nic_config(configmanager, node, serverip=serverip)
if nnc.get('ipv4_address', None):
defaultnic['ipv4_address'] = '{}/{}'.format(nnc['ipv4_address'], nnc['prefix'])
if nnc.get('ipv4_gateway', None):
@@ -530,12 +530,12 @@ async def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
if bmc:
try:
if onlyfamily in (0, socket.AF_INET):
bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET, socket.SOCK_DGRAM))[0][-1][0]
bmc4 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, family=socket.AF_INET, type=socket.SOCK_DGRAM))[0][-1][0]
except Exception:
pass
try:
if onlyfamily in (0, socket.AF_INET6):
bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, socket.AF_INET6, socket.SOCK_DGRAM))[0][-1][0]
bmc6 = (await asyncio.get_running_loop().getaddrinfo(bmc, 0, family=socket.AF_INET6, type=socket.SOCK_DGRAM))[0][-1][0]
except Exception:
pass
cfgbyname = {}
@@ -594,13 +594,13 @@ async def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
ip6bynodename = None
try:
if onlyfamily in (socket.AF_INET, 0):
for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, socket.AF_INET, socket.SOCK_DGRAM):
for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, family=socket.AF_INET, type=socket.SOCK_DGRAM):
ipbynodename = addr[-1][0]
except socket.gaierror:
pass
try:
if onlyfamily in (socket.AF_INET6, 0):
for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, socket.AF_INET6, socket.SOCK_DGRAM):
for addr in await asyncio.get_running_loop().getaddrinfo(node, 0, family=socket.AF_INET6, type=socket.SOCK_DGRAM):
ip6bynodename = addr[-1][0]
except socket.gaierror:
pass
@@ -647,7 +647,7 @@ async def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
if bmc6 and candip == bmc6:
continue
try:
for inf in await asyncio.get_running_loop().getaddrinfo(candip, 0, fam, socket.SOCK_STREAM):
for inf in await asyncio.get_running_loop().getaddrinfo(candip, 0, family=fam, type=socket.SOCK_STREAM):
candipn = socket.inet_pton(fam, inf[-1][0])
if ((isremote and ipn_on_same_subnet(fam, clientipn, candipn, int(candprefix)))
or ipn_on_same_subnet(fam, bootsvrip, candipn, prefix)):
@@ -666,7 +666,7 @@ async def get_nic_config(configmanager, node, ip=None, mac=None, ifidx=None,
except Exception as e:
cfgdata['error_msg'] = "Error trying to evaluate net.*ipv4_address attribute value '{0}' on {1}: {2}".format(candip, node, str(e))
elif candgw:
for inf in await asyncio.get_running_loop().getaddrinfo(candgw, 0, fam, socket.SOCK_STREAM):
for inf in await asyncio.get_running_loop().getaddrinfo(candgw, 0, family=fam, type=socket.SOCK_STREAM):
candgwn = socket.inet_pton(fam, inf[-1][0])
if ipn_on_same_subnet(fam, bootsvrip, candgwn, prefix):
candgws.append((fam, candgwn, prefix))
@@ -818,7 +818,7 @@ async def get_prefix_len_for_ip(ip):
plen = int(plen)
myaddrs = await get_my_addresses()
found = False
for inf in await asyncio.get_running_loop().getaddrinfo(ip, 0, 0, socket.SOCK_DGRAM):
for inf in await asyncio.get_running_loop().getaddrinfo(ip, 0, type=socket.SOCK_DGRAM):
if plen:
yield (inf[0], plen)
return
@@ -844,12 +844,12 @@ async def addresses_match(addr1, addr2):
"""
if '%' in addr1 or '%' in addr2:
return False
for addrinfo in await asyncio.get_running_loop().getaddrinfo(addr1, 0, 0, socket.SOCK_STREAM):
for addrinfo in await asyncio.get_running_loop().getaddrinfo(addr1, 0, type=socket.SOCK_STREAM):
rootaddr1 = socket.inet_pton(addrinfo[0], addrinfo[4][0])
if addrinfo[0] == socket.AF_INET6 and rootaddr1[:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff':
# normalize to standard IPv4
rootaddr1 = rootaddr1[-4:]
for otherinfo in await asyncio.get_running_loop().getaddrinfo(addr2, 0, 0, socket.SOCK_STREAM):
for otherinfo in await asyncio.get_running_loop().getaddrinfo(addr2, 0, type=socket.SOCK_STREAM):
otheraddr = socket.inet_pton(otherinfo[0], otherinfo[4][0])
if otherinfo[0] == socket.AF_INET6 and otheraddr[:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff':
otheraddr = otheraddr[-4:]

View File

@@ -48,7 +48,7 @@ def listdump(input):
retval = ''
for entry in input:
retval += '- ' + entry + '\n'
return retval
return retval.encode()
async def get_extra_names(nodename, cfg, myip=None, preferadjacent=False, addlocalhost=True):
@@ -105,34 +105,30 @@ async def handle_request(req, make_response, mimetype):
clientids = env.get('HTTP_CONFLUENT_IDS', None)
if not clientids:
rsp = await make_response(mimetype, 400, 'Bad Request')
await rsp.write( 'Bad Request')
return
await rsp.write(b'Bad Request')
return rsp
for ids in clientids.split('/'):
_, v = ids.split('=', 1)
repname = disco.get_node_by_uuid_or_mac(v)
if repname:
rsp = await make_response(mimetype, 200, 'OK')
await rsp.write(repname)
return
await rsp.write(repname.encode())
return rsp
rsp = await make_response(mimetype, 404, 'Unknown')
return
return rsp
if reqpath == '/self/registerapikey':
crypthmac = env.get('HTTP_CONFLUENT_CRYPTHMAC', None)
if int(env.get('CONTENT_LENGTH', 65)) > 64:
rsp = await make_response(mimetype, 400, 'Bad Request')
await rsp.write('Bad Request')
return
return rsp
cryptkey = await req.read()
if not (crypthmac and cryptkey):
rsp = make_response(mimetype, 401, 'Unauthorized')
await rsp.write('Unauthorized')
return
return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized')
hmackey = cfg.get_node_attributes(nodename, ['secret.selfapiarmtoken'], decrypt=True)
hmackey = hmackey.get(nodename, {}).get('secret.selfapiarmtoken', {}).get('value', None)
if not hmackey:
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write('Unauthorized')
return
return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized')
if not isinstance(hmackey, bytes):
hmackey = hmackey.encode('utf8')
if not isinstance(cryptkey, bytes):
@@ -140,47 +136,34 @@ async def handle_request(req, make_response, mimetype):
try:
crypthmac = base64.b64decode(crypthmac)
except Exception:
rsp = await make_response(mimetype, 400, 'Bad Request')
await rsp.write('Bad Request')
return
return await make_response(mimetype, 400, 'Bad Request', body='Bad Request')
righthmac = hmac.new(hmackey, cryptkey, hashlib.sha256).digest()
if righthmac == crypthmac:
if not isinstance(cryptkey, str):
cryptkey = cryptkey.decode()
cfgupdate = {nodename: {'crypted.selfapikey': {'hashvalue': cryptkey}}}
cfg.set_node_attributes(cfgupdate)
cfg.clear_node_attributes([nodename], ['secret.selfapiarmtoken'])
rsp = await make_response(mimetype, 200, 'OK')
await rsp.write('Accepted')
return
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write('Unauthorized')
return
await cfg.set_node_attributes(cfgupdate)
await cfg.clear_node_attributes([nodename], ['secret.selfapiarmtoken'])
return await make_response(mimetype, 200, 'OK', body='Accepted')
return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized')
apikey = req.headers.get('CONFLUENT_APIKEY', None)
if not (nodename and apikey):
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write('Unauthorized')
return
return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized')
if len(apikey) > 48:
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write('Unauthorized')
return
return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized')
return rsp
ea = cfg.get_node_attributes(nodename, ['crypted.selfapikey', 'deployment.apiarmed'])
eak = ea.get(
nodename, {}).get('crypted.selfapikey', {}).get('hashvalue', None)
if not eak:
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write(b'Unauthorized')
return rsp
return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized')
if not isinstance(eak, str):
eak = eak.decode('utf8')
salt = '$'.join(eak.split('$', 3)[:-1]) + '$'
if crypt.crypt(apikey, salt) != eak:
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write(b'Unauthorized')
return
return await make_response(mimetype, 401, 'Unauthorized', body='Unauthorized')
if ea.get(nodename, {}).get('deployment.apiarmed', {}).get('value', None) == 'once':
cfg.set_node_attributes({nodename: {'deployment.apiarmed': ''}})
await cfg.set_node_attributes({nodename: {'deployment.apiarmed': ''}})
myip = req.headers.get('X-Forwarded-Host', None)
if myip and ']' in myip:
myip = myip.split(']', 1)[0]
@@ -198,18 +181,14 @@ async def handle_request(req, make_response, mimetype):
elif retype == 'application/json':
dumper = json.dumps
else:
rsp = await make_response(mimetype, 406, 'Not supported')
await rsp.write(b'Unsupported content type in ACCEPT: ' + retype.encode())
return
return await make_response(mimetype, 406, 'Not supported', body='Unsupported content type in ACCEPT: ' + retype)
operation = req.method
if operation not in ('HEAD', 'GET') and req.content_length > 0:
reqbody = await req.read()
if reqpath == '/self/register_discovered':
rb = json.loads(reqbody)
if not rb.get('path', None):
rsp = await make_response(mimetype, 400, 'Bad Requst')
await rsp.write('Missing Path')
return
return await make_response(mimetype, 400, 'Bad Request', body='Missing Path')
targurl = '/affluent/systems/by-port/{0}/webaccess'.format(rb['path'])
tlsverifier = util.TLSCertVerifier(cfg, nodename, 'pubkeys.tls_hardwaremanager')
wc = webclient.WebConnection(nodename, 443, verifycallback=tlsverifier.verify_cert)
@@ -239,13 +218,9 @@ async def handle_request(req, make_response, mimetype):
elif rb['type'] == 'lenovo-smm2':
rb['services'] = ['service:lenovo-smm2']
else:
rsp = await make_response(mimetype, 400, 'Unsupported Device')
await rsp.write('Unsupported device for remote discovery registration')
return
return await make_response(mimetype, 400, 'Unsupported Device', body='Unsupported device for remote discovery registration')
await disco.detected(rb)
rsp = await make_response(mimetype, 200, 'OK')
await rsp.write('Registered')
return
return await make_response(mimetype, 200, 'OK', body='Registered')
if reqpath == '/self/bmcconfig':
hmattr = cfg.get_node_attributes(nodename, 'hardwaremanagement.*')
hmattr = hmattr.get(nodename, {})
@@ -259,9 +234,7 @@ async def handle_request(req, make_response, mimetype):
bmcaddr = hmattr.get('hardwaremanagement.manager', {}).get('value',
None)
if not bmcaddr:
rsp = await make_response(mimetype, 500, 'Internal Server Error')
await rsp.write('Missing value in hardwaremanagement.manager')
return
return await make_response(mimetype, 500, 'Internal Server Error', body='Missing value in hardwaremanagement.manager')
bmcaddr = bmcaddr.split('/', 1)[0]
bmcaddr = await asyncio.get_event_loop().getaddrinfo(bmcaddr, 0)[0]
bmcaddr = bmcaddr[-1][0]
@@ -271,8 +244,7 @@ async def handle_request(req, make_response, mimetype):
res['prefixv4'] = netconfig['prefix']
res['bmcgw'] = netconfig.get('ipv4_gateway', None)
# credential security results in user/password having to be deferred
mrsp = await make_response(mimetype, 200, 'OK')
await mrsp.write(dumper(res))
return await make_response(mimetype, 200, 'OK', body=dumper(res))
elif reqpath == '/self/myattribs':
cfd = cfg.get_node_attributes(nodename, '*', decrypt=True).get(nodename, {})
rsp = {}
@@ -282,12 +254,10 @@ async def handle_request(req, make_response, mimetype):
rsp[k] = cfd[k]['value']
if isinstance(rsp[k], bytes):
rsp[k] = rsp[k].decode()
mrsp = await make_response(mimetype, 200, 'OK')
await mrsp.write(dumper(rsp))
return await make_response(mimetype, 200, 'OK', body=dumper(rsp))
elif reqpath == '/self/netcfg':
ncfg = await netutil.get_full_net_config(cfg, nodename, myip)
mrsp = await make_response(mimetype, 200, 'OK')
await mrsp.write(dumper(ncfg))
return await make_response(mimetype, 200, 'OK', body=dumper(ncfg))
elif reqpath in ('/self/deploycfg', '/self/deploycfg2'):
if 'CONFLUENT_MGTIFACE' in req.headers:
nicname = req.headers['CONFLUENT_MGTIFACE']
@@ -421,30 +391,26 @@ async def handle_request(req, make_response, mimetype):
ncfg['ntpservers'].append(ntpsrv)
dnsdomain = deployinfo.get('dns.domain', {}).get('value', None)
ncfg['dnsdomain'] = dnsdomain
mrsp = await make_response(mimetype, 200, 'OK')
await mrsp.write(dumper(ncfg).encode())
return mrsp
return await make_response(mimetype, 200, 'OK', body=dumper(ncfg))
elif reqpath == '/self/sshcert' and reqbody:
if not sshutil.ca_exists():
mrsp = await make_response(mimetype, 500, 'Unconfigured')
await msrp.write(b'CA is not configured on this system (run ...)')
return mrsp
return await make_response(mimetype, 500, 'Unconfigured', body='CA is not configured on this system (run ...)')
pals = await get_extra_names(nodename, cfg, myip)
cert = sshutil.sign_host_key(reqbody, nodename, pals)
mrsp = await make_response('text/plain', 200, 'OK')
await mrsp.write(cert)
cert = await sshutil.sign_host_key(reqbody, nodename, pals)
return await make_response('text/plain', 200, 'OK', body=cert.encode())
elif reqpath == '/self/nodelist':
nodes, _ = await get_cluster_list(nodename, cfg)
if isgeneric:
mrsp = await make_response('text/plain', 200, 'OK')
for node in util.natural_sort(nodes):
await mrsp.write(node + '\n')
await mrsp.write(f'{node}\n'.encode('utf-8'))
else:
mrsp = await make_response(retype, 200, 'OK')
if retype == 'application/yaml':
await mrsp.write(listdump(list(util.natural_sort(nodes))))
else:
await mrsp.write(dumper(list(util.natural_sort(nodes))))
return mrsp
elif reqpath == '/self/remoteconfigbmc' and reqbody:
try:
reqbody = yamlload(reqbody)
@@ -456,27 +422,25 @@ async def handle_request(req, make_response, mimetype):
elif cfgmod == 'tsm':
tsm.remote_nodecfg(nodename, cfg)
else:
mrsp = await make_response(mimetype, 500, 'unsupported configmod')
await mrsp.write('Unsupported configmod "{}"'.format(cfgmod))
mrsp = await make_response(mimetype, 200, 'Ok')
await mrsp.write('complete')
return await make_response(mimetype, 500, 'unsupported configmod', body='Unsupported configmod "{}"'.format(cfgmod))
return await make_response(mimetype, 200, 'Ok', body='complete')
elif reqpath == '/self/updatestatus' and reqbody:
update = yamlload(reqbody)
statusstr = update.get('state', None)
statusdetail = update.get('state_detail', None)
didstateupdate = False
if statusstr or 'status' in update:
cfg.set_node_attributes({nodename: {
await cfg.set_node_attributes({nodename: {
'deployment.client_ip': {'value': clientip}}})
if statusstr:
cfg.set_node_attributes({nodename: {'deployment.state': statusstr}})
await cfg.set_node_attributes({nodename: {'deployment.state': statusstr}})
didstateupdate = True
if statusdetail:
cfg.set_node_attributes({nodename: {'deployment.state_detail': statusdetail}})
await cfg.set_node_attributes({nodename: {'deployment.state_detail': statusdetail}})
didstateupdate = True
if 'status' not in update and didstateupdate:
mrsp = await make_response(mimetype, 200, 'Ok')
await mrsp.write('Accepted')
await mrsp.write(b'Accepted')
return
if update['status'] == 'staged':
targattr = 'deployment.stagedprofile'
@@ -503,18 +467,16 @@ async def handle_request(req, make_response, mimetype):
currprof = currattr.get(targattr, {}).get('value', '')
if currprof != pending:
updates[targattr] = {'value': pending}
cfg.set_node_attributes({nodename: updates})
mrsp = await make_response('text/plain', 200, 'OK')
await mrsp.write('OK')
await cfg.set_node_attributes({nodename: updates})
return await make_response('text/plain', 200, 'OK', body='OK')
else:
mrsp = await make_response('text/plain', 500, 'Error')
await mrsp.write('No pending profile detected, unable to accept status update')
return await make_response('text/plain', 500, 'Error', body='No pending profile detected, unable to accept status update')
elif reqpath == '/self/saveapikey' and reqbody:
if not isinstance(reqbody, str):
reqbody = reqbody.decode('utf8')
cfg.set_node_attributes({
await cfg.set_node_attributes({
nodename: {'deployment.sealedapikey': {'value': reqbody}}})
mrsp = await make_response(mimetype, 200, 'OK')
return await make_response(mimetype, 200, 'OK', body='OK')
elif reqpath.startswith('/self/remoteconfig/') and 'POST' == operation:
scriptcat = reqpath.replace('/self/remoteconfig/', '')
playlist = []
@@ -532,10 +494,9 @@ async def handle_request(req, make_response, mimetype):
playlist.append(os.path.join(dirname, filename))
if playlist:
runansible.run_playbooks(playlist, [nodename])
mrsp = await make_response(mimetype, 202, 'Queued')
return await make_response(mimetype, 202, 'Queued', body='Queued')
else:
mrsp = await make_response(mimetype, 200, 'OK')
return
return await make_response(mimetype, 200, 'OK', body='OK')
elif reqpath.startswith('/self/remotesyncfiles'):
if 'POST' == operation:
pals = await get_extra_names(nodename, cfg, myip, preferadjacent=True, addlocalhost=False)
@@ -543,25 +504,20 @@ async def handle_request(req, make_response, mimetype):
pals = [clientip]
result = syncfiles.start_syncfiles(
nodename, cfg, json.loads(reqbody), pals)
mrsp = await make_response(mimetype, result[0])
await mrsp.write(result[1])
return
return await make_response(mimetype, result[0], result[1], body=result[2])
if 'GET' == operation:
status, output = syncfiles.get_syncresult(nodename)
statcode, status, output = syncfiles.get_syncresult(nodename)
output = json.dumps(output)
mrsp = await make_response('application/json', status)
await mrsp.write(output)
return
return await make_response('application/json', statcode, status, body=output)
elif reqpath.startswith('/self/remoteconfig/status'):
rst = runansible.running_status.get(nodename, None)
if not rst:
mrsp = await make_response(mimetype, 204, 'Not Running')
return
return await make_response(mimetype, 204, 'Not Running')
mrsp = await make_response(mimetype, 200, 'OK')
if rst.complete:
del runansible.running_status[nodename]
await mrsp.write(rst.dump_text())
return
return mrsp
elif reqpath.startswith('/self/scriptlist/'):
scriptcat = reqpath.replace('/self/scriptlist/', '')
slist, _ = get_scriptlist(
@@ -572,6 +528,7 @@ async def handle_request(req, make_response, mimetype):
await mrsp.write(yamldump(util.natural_sort(slist)))
else:
mrsp = await make_response(mimetype, 200, 'OK')
return mrsp
elif reqpath.startswith('/self/profileprivate/pending/'):
fname = reqpath.replace('/self/profileprivate/', '')
deployinfo = cfg.get_node_attributes(
@@ -580,22 +537,15 @@ async def handle_request(req, make_response, mimetype):
profile = deployinfo.get(
'deployment.pendingprofile', {}).get('value', '')
if not profile:
mrsp = await make_response(mimetype, 400, 'No pending profile')
await mrsp.write('No profile')
return
return await make_response(mimetype, 400, 'No pending profile', body='No profile')
fname = '/var/lib/confluent/private/os/{}/{}'.format(profile, fname)
try:
with open(fname, 'rb') as privdata:
mrsp = await make_response(200, 'OK')
await mrsp.write(privdata.read())
return
return await make_response(mimetype, 200, 'OK', body=privdata.read())
except IOError:
mrsp = await make_response(mimetype, 404, 'Not Found')
await mrsp.write('Not found')
return
return await make_response(mimetype, 404, 'Not Found', body='Not found')
else:
mrsp = await make_response(mimetype, 404, 'Not Found')
await mrsp.write('Not found')
return await make_response(mimetype, 404, 'Not Found', body='Not found')
def get_scriptlist(scriptcat, cfg, nodename, pathtemplate):
if '..' in scriptcat:

View File

@@ -6,7 +6,9 @@ import confluent.config.configmanager as cfm
import confluent.collective.manager as collective
import confluent.util as util
import glob
import os
import shutil
import subprocess
import tempfile
agent_pid = None
@@ -38,7 +40,7 @@ async def assure_agent():
if agent_pid is None:
try:
agent_starting = True
sai = await util.check_output(['ssh-agent'])[0]
sai = (await util.check_output(['ssh-agent']))[0]
for line in sai.split(b'\n'):
if b';' not in line:
continue
@@ -115,6 +117,7 @@ async def prep_ssh_key(keyname):
adding_key = True
if agent_pid:
if os.path.exists(os.environ['SSH_AUTH_SOCK']):
sock = None
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(os.environ['SSH_AUTH_SOCK'])
@@ -122,7 +125,8 @@ async def prep_ssh_key(keyname):
os.unlink(os.environ['SSH_AUTH_SOCK'])
os.rmdir(os.path.dirname(os.environ['SSH_AUTH_SOCK']))
finally:
sock.close()
if sock:
sock.close()
if not os.path.exists(os.environ['SSH_AUTH_SOCK']):
agent_pid = None
ready_keys.clear()
@@ -145,9 +149,7 @@ async def prep_ssh_key(keyname):
os.environ['DISPLAY'] = 'NONE'
os.environ['SSH_ASKPASS'] = askpass
try:
with open(os.devnull, 'wb') as devnull:
#TODO:asyncmerge: capture stderr
await util.check_call('ssh-add', keyname)
await util.check_call('ssh-add', keyname, stdin=subprocess.DEVNULL)
finally:
del os.environ['CONFLUENT_SSH_PASSPHRASE']
del os.environ['DISPLAY']

View File

@@ -313,20 +313,20 @@ def start_syncfiles(nodename, cfg, suffixes, principals=[]):
raise Exception('Cannot perform syncfiles without profile assigned')
synclist = '/var/lib/confluent/public/os/{}/syncfiles'.format(profile)
if not os.path.exists(synclist):
return '200 OK', 'No synclist' # not running
return 200, 'OK', 'No synclist' # not running
sl = SyncList(synclist, nodename, cfg)
if not (sl.appendmap or sl.mergemap or sl.replacemap or sl.appendoncemap):
return '200 OK', 'Empty synclist' # the synclist has no actual entries
return 200, 'OK', 'Empty synclist' # the synclist has no actual entries
if nodename in syncrunners:
if syncrunners[nodename].dead:
syncrunners[nodename].wait()
else:
return '503 Synchronization already in progress', 'Synchronization already in progress for {}'.format(nodename)
return 503, 'Synchronization already in progress', 'Synchronization already in progress for {}'.format(nodename)
syncrunners[nodename] = tasks.spawn(
sync_list_to_node(sl, nodename, suffixes, peerip))
if not cleaner:
cleaner = tasks.spawn(cleanit())
return '202 Queued', 'Background synchronization initiated' # backgrounded
return 202,'Queued', 'Background synchronization initiated' # backgrounded
async def cleanit():
@@ -351,9 +351,9 @@ async def cleanit():
def get_syncresult(nodename):
if nodename not in syncrunners:
return ('204 Not Running', '')
return 204, 'Not Running', ''
if not syncrunners[nodename].dead:
return ('200 OK', '')
return 200, 'OK', ''
result = syncrunners[nodename].wait()
del syncrunners[nodename]
return ('200 OK', result)
return 200, 'OK', result

View File

@@ -61,7 +61,7 @@ class TaskPile:
yield task
class TaskPool:
def __init__(self, max_concurrent):
def __init__(self, max_concurrent=128):
self.max_concurrent = max_concurrent
self._tasks = set()
self._pending = []

View File

@@ -63,13 +63,15 @@ async def check_call(*cmd, **kwargs):
async def check_output(*cmd):
if len(cmd) == 1 and isinstance(cmd[0], (list, tuple)):
cmd = cmd[0]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await process.communicate()
retcode = process.returncode
if retcode:
raise subprocess.CalledProcessError(
retcode, process.args,
retcode, cmd,
output=stdout, stderr=stderr)
return stdout, stderr