2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-06 08:56:27 +00:00

Begin work on selfservice asyncio port

Have a deploycfg call be able to proceed through.
This commit is contained in:
Jarrod Johnson
2024-08-16 17:06:49 -04:00
parent 2f415caead
commit 6f776a657c
3 changed files with 139 additions and 145 deletions

View File

@@ -64,8 +64,8 @@ class CredServer(object):
hmackey = None
hmacval = None
cloop = asyncio.get_event_loop()
await cloop.sock_send(client, b'\xc2\xd1-\xa8\x80\xd8j\xba')
tlv = bytearray(cloop.sock_recv(client, 2))
await cloop.sock_sendall(client, b'\xc2\xd1-\xa8\x80\xd8j\xba')
tlv = bytearray(await cloop.sock_recv(client, 2))
if tlv[0] != 1:
client.close()
return
@@ -94,7 +94,7 @@ class CredServer(object):
if not isinstance(sealed, bytes):
sealed = sealed.encode('utf8')
reply = b'\x80' + struct.pack('>H', len(sealed) + 1) + sealed + b'\x00'
await cloop.sock_send(client, reply)
await cloop.sock_sendall(client, reply)
client.close()
return
if apiarmed not in ('once', 'continuous'):
@@ -104,10 +104,10 @@ class CredServer(object):
self.cfm.set_node_attributes({nodename: {'deployment.apiarmed': ''}})
client.close()
return
await cloop.sock_send(client, b'\x02\x20')
await cloop.sock_sendall(client, b'\x02\x20')
rttoken = os.urandom(32)
await cloop.sock_send(client, rttoken)
await cloop.sock_send(client, b'\x00\x00')
await cloop.sock_sendall(client, rttoken)
await cloop.sock_sendall(client, b'\x00\x00')
tlv = bytearray(await cloop.sock_recv(client, 2))
if tlv[0] != 3:
client.close()
@@ -127,9 +127,9 @@ class CredServer(object):
client.close()
return
cfgupdate = {nodename: {'crypted.selfapikey': {'hashvalue': echotoken}}}
self.cfm.set_node_attributes(cfgupdate)
await self.cfm.set_node_attributes(cfgupdate)
await cloop.sock_recv(client, 2) # drain end of message
await cloop.sock_send(client, b'\x05\x00') # report success
await cloop.sock_sendall(client, b'\x05\x00') # report success
if hmackey and apiarmed != 'continuous':
self.cfm.clear_node_attributes([nodename], ['secret.selfapiarmtoken'])
if apiarmed != 'continuous':
@@ -140,7 +140,7 @@ class CredServer(object):
except Exception:
pass
if disarm:
self.cfm.set_node_attributes(disarm)
await self.cfm.set_node_attributes(disarm)
async def main():

View File

@@ -652,7 +652,7 @@ async def resourcehandler_backend(req, make_response):
reqtype = None
reqpath = req.rel_url.path
if reqpath.startswith('/self/'):
return await selfservice.handle_request(req, make_response)
return await selfservice.handle_request(req, make_response, mimetype)
if reqpath.startswith('/boot/'):
request = reqpath.split('/')
if not request[0]:

View File

@@ -1,3 +1,4 @@
import asyncio
import confluent.runansible as runansible
import confluent.syncfiles as syncfiles
import confluent.config.configmanager as configmanager
@@ -6,8 +7,6 @@ import confluent.netutil as netutil
import confluent.noderange as noderange
import confluent.sshutil as sshutil
import confluent.util as util
import eventlet.green.socket as socket
import eventlet.green.subprocess as subprocess
import confluent.discovery.handlers.xcc as xcc
import confluent.discovery.handlers.tsm as tsm
import confluent.discovery.core as disco
@@ -26,8 +25,8 @@ except ImportError:
from yaml import SafeLoader
from yaml import SafeDumper
import confluent.discovery.protocols.ssdp as ssdp
import eventlet
webclient = eventlet.import_patched('pyghmi.util.webclient')
import subprocess
import aiohmi.util.webclient as webclient
currtz = 'UTC'
@@ -81,47 +80,47 @@ def get_extra_names(nodename, cfg, myip=None):
names.add(nip)
return names
def handle_request(req, make_response):
async def handle_request(req, make_response, mimetype):
global currtz
global keymap
global currlocale
global currtzvintage
configmanager.check_quorum()
cfg = configmanager.ConfigManager(None)
nodename = env.get('HTTP_CONFLUENT_NODENAME', None)
clientip = env.get('HTTP_X_FORWARDED_FOR', None)
if env['PATH_INFO'] == '/self/whoami':
reqpath = req.rel_url.path
nodename = req.headers.get('CONFLUENT_NODENAME', None)
clientip = req.headers.get('X-Forwarded-For', None)
if reqpath == '/self/whoami':
clientids = env.get('HTTP_CONFLUENT_IDS', None)
if not clientids:
start_response('400 Bad Request', [])
yield 'Bad Request'
rsp = await make_response(mimetype, 400, 'Bad Request')
await rsp.write( 'Bad Request')
return
for ids in clientids.split('/'):
_, v = ids.split('=', 1)
repname = disco.get_node_by_uuid_or_mac(v)
if repname:
start_response('200 OK', [])
yield repname
rsp = await make_response(mimetype, 200, 'OK')
await rsp.write(repname)
return
start_response('404 Unknown', [])
yield ''
rsp = await make_response(mimetype, 404, 'Unknown')
return
if env['PATH_INFO'] == '/self/registerapikey':
if reqpath == '/self/registerapikey':
crypthmac = env.get('HTTP_CONFLUENT_CRYPTHMAC', None)
if int(env.get('CONTENT_LENGTH', 65)) > 64:
start_response('400 Bad Request', [])
yield 'Bad Request'
rsp = await make_response(mimetype, 400, 'Bad Request')
await rsp.write('Bad Request')
return
cryptkey = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
cryptkey = await req.read()
if not (crypthmac and cryptkey):
start_response('401 Unauthorized', [])
yield 'Unauthorized'
rsp = make_response(mimetype, 401, 'Unauthorized')
await rsp.write('Unauthorized')
return
hmackey = cfg.get_node_attributes(nodename, ['secret.selfapiarmtoken'], decrypt=True)
hmackey = hmackey.get(nodename, {}).get('secret.selfapiarmtoken', {}).get('value', None)
if not hmackey:
start_response('401 Unauthorized', [])
yield 'Unauthorized'
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write('Unauthorized')
return
if not isinstance(hmackey, bytes):
hmackey = hmackey.encode('utf8')
@@ -130,53 +129,53 @@ def handle_request(req, make_response):
try:
crypthmac = base64.b64decode(crypthmac)
except Exception:
start_response('400 Bad Request', [])
yield 'Bad Request'
rsp = await make_response(mimetype, 400, 'Bad Request')
await rsp.write('Bad Request')
return
righthmac = hmac.new(hmackey, cryptkey, hashlib.sha256).digest()
if righthmac == crypthmac:
cfgupdate = {nodename: {'crypted.selfapikey': {'hashvalue': cryptkey}}}
cfg.set_node_attributes(cfgupdate)
cfg.clear_node_attributes([nodename], ['secret.selfapiarmtoken'])
start_response('200 OK', [])
yield 'Accepted'
rsp = await make_response(mimetype, 200, 'OK')
await rsp.write('Accepted')
return
start_response('401 Unauthorized', [])
yield 'Unauthorized'
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write('Unauthorized')
return
apikey = env.get('HTTP_CONFLUENT_APIKEY', None)
apikey = req.headers.get('CONFLUENT_APIKEY', None)
if not (nodename and apikey):
start_response('401 Unauthorized', [])
yield 'Unauthorized'
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write('Unauthorized')
return
if len(apikey) > 48:
start_response('401', [])
yield 'Unauthorized'
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write('Unauthorized')
return
ea = cfg.get_node_attributes(nodename, ['crypted.selfapikey', 'deployment.apiarmed'])
eak = ea.get(
nodename, {}).get('crypted.selfapikey', {}).get('hashvalue', None)
if not eak:
start_response('401 Unauthorized', [])
yield 'Unauthorized'
return
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write(b'Unauthorized')
return rsp
if not isinstance(eak, str):
eak = eak.decode('utf8')
salt = '$'.join(eak.split('$', 3)[:-1]) + '$'
if crypt.crypt(apikey, salt) != eak:
start_response('401 Unauthorized', [])
yield 'Unauthorized'
rsp = await make_response(mimetype, 401, 'Unauthorized')
await rsp.write(b'Unauthorized')
return
if ea.get(nodename, {}).get('deployment.apiarmed', {}).get('value', None) == 'once':
cfg.set_node_attributes({nodename: {'deployment.apiarmed': ''}})
myip = env.get('HTTP_X_FORWARDED_HOST', None)
myip = req.headers.get('X-Forwarded-Host', None)
if myip and ']' in myip:
myip = myip.split(']', 1)[0]
elif myip:
myip = myip.split(':', 1)[0]
if myip:
myip = myip.replace('[', '').replace(']', '')
retype = env.get('HTTP_ACCEPT', 'application/yaml')
retype = req.headers.get('Accept', 'application/yaml')
isgeneric = False
if retype == '*/*':
isgeneric = True
@@ -186,21 +185,21 @@ def handle_request(req, make_response):
elif retype == 'application/json':
dumper = json.dumps
else:
start_response('406 Not supported', [])
yield 'Unsupported content type in ACCEPT: ' + retype
rsp = await make_response(mimetype, 406, 'Not supported')
await rsp.write(b'Unsupported content type in ACCEPT: ' + retype.encode())
return
operation = env['REQUEST_METHOD']
if operation not in ('HEAD', 'GET') and 'CONTENT_LENGTH' in env and int(env['CONTENT_LENGTH']) > 0:
reqbody = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
if env['PATH_INFO'] == '/self/register_discovered':
operation = req.method
if operation not in ('HEAD', 'GET') and req.content_length > 0:
reqbody = await req.read()
if reqpath == '/self/register_discovered':
rb = json.loads(reqbody)
if not rb.get('path', None):
start_response('400 Bad Requst', [])
yield 'Missing Path'
rsp = await make_response(mimetype, 400, 'Bad Requst')
await rsp.write('Missing Path')
return
targurl = '/affluent/systems/by-port/{0}/webaccess'.format(rb['path'])
tlsverifier = util.TLSCertVerifier(cfg, nodename, 'pubkeys.tls_hardwaremanager')
wc = webclient.SecureHTTPConnection(nodename, 443, verifycallback=tlsverifier.verify_cert)
wc = webclient.WebConnection(nodename, 443, verifycallback=tlsverifier.verify_cert)
relaycreds = cfg.get_node_attributes(nodename, 'secret.*', decrypt=True)
relaycreds = relaycreds.get(nodename, {})
relayuser = relaycreds.get('secret.hardwaremanagementuser', {}).get('value', None)
@@ -208,10 +207,8 @@ def handle_request(req, make_response):
if not relayuser or not relaypass:
raise Exception('No credentials for {0}'.format(nodename))
wc.set_basic_credentials(relayuser, relaypass)
wc.request('GET', targurl)
rsp = wc.getresponse()
_ = rsp.read()
if rsp.status == 302:
rsp, status = await wc.grab_json_response_with_status(targurl)
if status == 302:
newurl = rsp.headers['Location']
newhost, newport = newurl.replace('https://', '').split('/')[0].split(':')
def verify_cert(certificate):
@@ -225,18 +222,18 @@ def handle_request(req, make_response):
if 'bay' in rb:
rb['enclosure.bay'] = rb['bay']
if rb['type'] == 'lenovo-xcc':
ssdp.check_fish(('/DeviceDescription.json', rb), newport, verify_cert)
await ssdp.check_fish(('/DeviceDescription.json', rb), newport, verify_cert)
elif rb['type'] == 'lenovo-smm2':
rb['services'] = ['service:lenovo-smm2']
else:
start_response('400 Unsupported Device', [])
yield 'Unsupported device for remote discovery registration'
rsp = await make_response(mimetype, 400, 'Unsupported Device')
await rsp.write('Unsupported device for remote discovery registration')
return
disco.detected(rb)
start_response('200 OK', [])
yield 'Registered'
await disco.detected(rb)
rsp = await make_response(mimetype, 200, 'OK')
await rsp.write('Registered')
return
if env['PATH_INFO'] == '/self/bmcconfig':
if reqpath == '/self/bmcconfig':
hmattr = cfg.get_node_attributes(nodename, 'hardwaremanagement.*')
hmattr = hmattr.get(nodename, {})
res = {}
@@ -249,7 +246,7 @@ def handle_request(req, make_response):
bmcaddr = hmattr.get('hardwaremanagement.manager', {}).get('value',
None)
bmcaddr = bmcaddr.split('/', 1)[0]
bmcaddr = socket.getaddrinfo(bmcaddr, 0)[0]
bmcaddr = await asyncio.get_event_loop().getaddrinfo(bmcaddr, 0)[0]
bmcaddr = bmcaddr[-1][0]
if '.' in bmcaddr: # ipv4 is allowed
netconfig = netutil.get_nic_config(cfg, nodename, ip=bmcaddr)
@@ -257,24 +254,24 @@ def handle_request(req, make_response):
res['prefixv4'] = netconfig['prefix']
res['bmcgw'] = netconfig.get('ipv4_gateway', None)
# credential security results in user/password having to be deferred
start_response('200 OK', (('Content-Type', retype),))
yield dumper(res)
elif env['PATH_INFO'] == '/self/myattribs':
mrsp = await make_response(mimetype, 200, 'OK')
await mrsp.write(dumper(res))
elif reqpath == '/self/myattribs':
cfd = cfg.get_node_attributes(nodename, '*').get(nodename, {})
rsp = {}
for k in cfd:
if k.startswith('secret') or k.startswith('crypt') or 'value' not in cfd[k] or not cfd[k]['value']:
continue
rsp[k] = cfd[k]['value']
start_response('200 OK', (('Conntent-Type', retype),))
yield dumper(rsp)
elif env['PATH_INFO'] == '/self/netcfg':
mrsp = await make_response(mimetype, 200, 'OK')
await mrsp.write(dumper(rsp))
elif reqpath == '/self/netcfg':
ncfg = netutil.get_full_net_config(cfg, nodename, myip)
start_response('200 OK', (('Content-Type', retype),))
yield dumper(ncfg)
elif env['PATH_INFO'] in ('/self/deploycfg', '/self/deploycfg2'):
if 'HTTP_CONFLUENT_MGTIFACE' in env:
nicname = env['HTTP_CONFLUENT_MGTIFACE']
mrsp = await make_response(mimetype, 200, 'OK')
await mrsp.write(dumper(ncfg))
elif reqpath in ('/self/deploycfg', '/self/deploycfg2'):
if 'CONFLUENT_MGTIFACE' in req.headers:
nicname = req.headers['CONFLUENT_MGTIFACE']
try:
ifidx = int(nicname)
except ValueError:
@@ -283,7 +280,7 @@ def handle_request(req, make_response):
ncfg = netutil.get_nic_config(cfg, nodename, ifidx=ifidx)
else:
ncfg = netutil.get_nic_config(cfg, nodename, serverip=myip)
if env['PATH_INFO'] == '/self/deploycfg':
if reqpath == '/self/deploycfg':
for key in list(ncfg):
if 'v6' in key:
del ncfg[key]
@@ -375,7 +372,8 @@ def handle_request(req, make_response):
continue
keymap = ckeymap
try:
tdc = util.run(['timedatectl'])[0].split(b'\n')
tdcout, tdcerr = await util.check_output('timedatectl')
tdc = tdcout.split(b'\n')
except subprocess.CalledProcessError:
tdc = []
currtzvintage = time.time()
@@ -404,30 +402,31 @@ def handle_request(req, make_response):
ncfg['ntpservers'].append(ntpsrv)
dnsdomain = deployinfo.get('dns.domain', {}).get('value', None)
ncfg['dnsdomain'] = dnsdomain
start_response('200 OK', (('Content-Type', retype),))
yield dumper(ncfg)
elif env['PATH_INFO'] == '/self/sshcert' and reqbody:
mrsp = await make_response(mimetype, 200, 'OK')
await mrsp.write(dumper(ncfg).encode())
return mrsp
elif reqpath == '/self/sshcert' and reqbody:
if not sshutil.ca_exists():
start_response('500 Unconfigured', ())
yield 'CA is not configured on this system (run ...)'
return
mrsp = await make_response(mimetype, 500, 'Unconfigured')
await msrp.write(b'CA is not configured on this system (run ...)')
return mrsp
pals = get_extra_names(nodename, cfg, myip)
cert = sshutil.sign_host_key(reqbody, nodename, pals)
start_response('200 OK', (('Content-Type', 'text/plain'),))
yield cert
elif env['PATH_INFO'] == '/self/nodelist':
mrsp = await make_response('text/plain', 200, 'OK')
await mrsp.write(cert)
elif reqpath == '/self/nodelist':
nodes, _ = get_cluster_list(nodename, cfg)
if isgeneric:
start_response('200 OK', (('Content-Type', 'text/plain'),))
mrsp = await make_response('text/plain', 200, 'OK')
for node in util.natural_sort(nodes):
yield node + '\n'
await mrsp.write(node + '\n')
else:
start_response('200 OK', (('Content-Type', retype),))
mrsp = await make_response(retype, 200, 'OK')
if retype == 'application/yaml':
yield listdump(list(util.natural_sort(nodes)))
await mrsp.write(listdump(list(util.natural_sort(nodes))))
else:
yield dumper(list(util.natural_sort(nodes)))
elif env['PATH_INFO'] == '/self/remoteconfigbmc' and reqbody:
await mrsp.write(dumper(list(util.natural_sort(nodes))))
elif reqpath == '/self/remoteconfigbmc' and reqbody:
try:
reqbody = yamlload(reqbody)
except Exception:
@@ -438,11 +437,11 @@ def handle_request(req, make_response):
elif cfgmod == 'tsm':
tsm.remote_nodecfg(nodename, cfg)
else:
start_response('500 unsupported configmod', ())
yield 'Unsupported configmod "{}"'.format(cfgmod)
start_response('200 Ok', ())
yield 'complete'
elif env['PATH_INFO'] == '/self/updatestatus' and reqbody:
mrsp = await make_response(mimetype, 500, 'unsupported configmod')
await mrsp.write('Unsupported configmod "{}"'.format(cfgmod))
mrsp = await make_response(mimetype, 200, 'Ok')
await mrsp.write('complete')
elif reqpath == '/self/updatestatus' and reqbody:
update = yamlload(reqbody)
statusstr = update.get('state', None)
statusdetail = update.get('state_detail', None)
@@ -454,8 +453,8 @@ def handle_request(req, make_response):
cfg.set_node_attributes({nodename: {'deployment.state_detail': statusdetail}})
didstateupdate = True
if 'status' not in update and didstateupdate:
start_response('200 Ok', ())
yield 'Accepted'
mrsp = await make_response(mimetype, 200, 'Ok')
await mrsp.write('Accepted')
return
if update['status'] == 'staged':
targattr = 'deployment.stagedprofile'
@@ -479,20 +478,19 @@ def handle_request(req, make_response):
if currprof != pending:
updates[targattr] = {'value': pending}
cfg.set_node_attributes({nodename: updates})
start_response('200 OK', (('Content-Type', 'text/plain'),))
yield 'OK'
mrsp = await make_response('text/plain', 200, 'OK')
await mrsp.write('OK')
else:
start_response('500 Error', (('Content-Type', 'text/plain'),))
yield 'No pending profile detected, unable to accept status update'
elif env['PATH_INFO'] == '/self/saveapikey' and reqbody:
mrsp = await make_response('text/plain', 500, 'Error')
await mrsp.write('No pending profile detected, unable to accept status update')
elif reqpath == '/self/saveapikey' and reqbody:
if not isinstance(reqbody, str):
reqbody = reqbody.decode('utf8')
cfg.set_node_attributes({
nodename: {'deployment.sealedapikey': {'value': reqbody}}})
start_response('200 OK', ())
yield ''
elif env['PATH_INFO'].startswith('/self/remoteconfig/') and 'POST' == operation:
scriptcat = env['PATH_INFO'].replace('/self/remoteconfig/', '')
mrsp = await make_response(mimetype, 200, 'OK')
elif reqpath.startswith('/self/remoteconfig/') and 'POST' == operation:
scriptcat = reqpath.replace('/self/remoteconfig/', '')
playlist = []
for privacy in ('public', 'private'):
slist, profile = get_scriptlist(
@@ -508,72 +506,68 @@ def handle_request(req, make_response):
playlist.append(os.path.join(dirname, filename))
if playlist:
runansible.run_playbooks(playlist, [nodename])
start_response('202 Queued', ())
yield ''
mrsp = await make_response(mimetype, 202, 'Queued')
else:
start_response('200 OK', ())
yield ''
mrsp = await make_response(mimetype, 200, 'OK')
return
elif env['PATH_INFO'].startswith('/self/remotesyncfiles'):
elif reqpath.startswith('/self/remotesyncfiles'):
if 'POST' == operation:
pals = get_extra_names(nodename, cfg, myip)
result = syncfiles.start_syncfiles(
nodename, cfg, json.loads(reqbody), pals)
start_response(result[0], ())
yield result[1]
mrsp = await make_response(mimetype, result[0])
await mrsp.write(result[1])
return
if 'GET' == operation:
status, output = syncfiles.get_syncresult(nodename)
output = json.dumps(output)
start_response(status, (('Content-Type', 'application/json'),))
yield output
mrsp = await make_response('application/json', status)
await mrsp.write(output)
return
elif env['PATH_INFO'].startswith('/self/remoteconfig/status'):
elif reqpath.startswith('/self/remoteconfig/status'):
rst = runansible.running_status.get(nodename, None)
if not rst:
start_response('204 Not Running', (('Content-Length', '0'),))
yield ''
mrsp = await make_response(mimetype, 204, 'Not Running')
return
start_response('200 OK', ())
mrsp = await make_response(mimetype, 200, 'OK')
if rst.complete:
del runansible.running_status[nodename]
yield rst.dump_text()
await mrsp.write(rst.dump_text())
return
elif env['PATH_INFO'].startswith('/self/scriptlist/'):
scriptcat = env['PATH_INFO'].replace('/self/scriptlist/', '')
elif reqpath.startswith('/self/scriptlist/'):
scriptcat = reqpath.replace('/self/scriptlist/', '')
slist, _ = get_scriptlist(
scriptcat, cfg, nodename,
'/var/lib/confluent/public/os/{0}/scripts/{1}')
if slist:
start_response('200 OK', (('Content-Type', 'application/yaml'),))
yield yamldump(util.natural_sort(slist))
mrsp = await make_response('application/yaml', 200, 'OK')
await mrsp.write(yamldump(util.natural_sort(slist)))
else:
start_response('200 OK', ())
yield ''
elif env['PATH_INFO'].startswith('/self/profileprivate/pending/'):
fname = env['PATH_INFO'].replace('/self/profileprivate/', '')
mrsp = await make_response(mimetype, 200, 'OK')
elif reqpath.startswith('/self/profileprivate/pending/'):
fname = reqpath.replace('/self/profileprivate/', '')
deployinfo = cfg.get_node_attributes(
nodename, ('deployment.*',))
deployinfo = deployinfo.get(nodename, {})
profile = deployinfo.get(
'deployment.pendingprofile', {}).get('value', '')
if not profile:
start_response('400 No pending profile', ())
yield 'No profile'
mrsp = await make_response(mimetype, 400, 'No pending profile')
await mrsp.write('No profile')
return
fname = '/var/lib/confluent/private/os/{}/{}'.format(profile, fname)
try:
with open(fname, 'rb') as privdata:
start_response('200 OK', ())
yield privdata.read()
mrsp = await make_response(200, 'OK')
await mrsp.write(privdata.read())
return
except IOError:
start_response('404 Not Found', ())
yield 'Not found'
mrsp = await make_response(mimetype, 404, 'Not Found')
await mrsp.write('Not found')
return
else:
start_response('404 Not Found', ())
yield 'Not found'
mrsp = await make_response(mimetype, 404, 'Not Found')
await mrsp.write('Not found')
def get_scriptlist(scriptcat, cfg, nodename, pathtemplate):
if '..' in scriptcat: