2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-06-03 01:38:30 +00:00
Files
confluent/confluent_server/confluent/vinzmanager.py
T
2026-04-21 12:42:20 -04:00

314 lines
12 KiB
Python

import asyncio
import confluent.auth as auth
import confluent.messages as msg
import confluent.exceptions as exc
import confluent.tasks as tasks
import confluent.util as util
import confluent.config.configmanager as configmanager
import struct
import base64
import os
import pwd
import confluent.httpapi as httpapi
mountsbyuser = {}
_vinzfd = None
_vinztoken = None
import socket
import aiohmi.util.webclient as webclient
startingup = False
# Handle the vinz VNC session
async def assure_vinz():
global _vinzfd
global _vinztoken
global startingup
while startingup:
await asyncio.sleep(0.5)
try:
startingup = True
if _vinzfd is None:
_vinztoken = base64.b64encode(os.urandom(33), altchars=b'_-').decode()
os.environ['VINZ_TOKEN'] = _vinztoken
os.makedirs('/var/run/confluent/vinz/sessions', exist_ok=True)
os.chmod('/var/run/confluent/vinz', 0o711)
os.chmod('/var/run/confluent/vinz/sessions', 0o711)
_vinzfd = await asyncio.subprocess.create_subprocess_exec(
'/opt/confluent/bin/vinz',
'-c', '/var/run/confluent/vinz/control',
'-w', '127.0.0.1:4007',
'-a', '/var/run/confluent/vinz/approval',
# vinz supports unix domain websocket, however apache reverse proxy is dicey that way in some versions
'-d', '/var/run/confluent/vinz/sessions')
while not os.path.exists('/var/run/confluent/vinz/control'):
await asyncio.sleep(0.5)
tasks.spawn_task(monitor_requests())
finally:
startingup = False
_unix_by_nodename = {}
_nodeparms = {}
async def get_url(nodename, inputdata, nodeparmcallback=None):
_nodeparms[nodename] = nodeparmcallback
method = inputdata.inputbynode[nodename]
await assure_vinz()
if method == 'wss':
return f'/vinz/kvmsession/{nodename}'
elif method == 'unix':
if nodename not in _unix_by_nodename or not os.path.exists(_unix_by_nodename[nodename]):
_unix_by_nodename[nodename] = await request_session(nodename)
return _unix_by_nodename[nodename]
_usersessions = {}
async def close_session(sessionid):
sessioninfo = _usersessions.get(sessionid, None)
if not sessioninfo:
return
del _usersessions[sessionid]
nodename = sessioninfo['nodename']
wc = sessioninfo['webclient']
cfg = configmanager.ConfigManager(None)
c = cfg.get_node_attributes(
nodename,
['secret.hardwaremanagementuser',
'secret.hardwaremanagementpassword',
], decrypt=True)
bmcuser = c.get(nodename, {}).get(
'secret.hardwaremanagementuser', {}).get('value', None)
bmcpass = c.get(nodename, {}).get(
'secret.hardwaremanagementpassword', {}).get('value', None)
if not isinstance(bmcuser, str):
bmcuser = bmcuser.decode()
if not isinstance(bmcpass, str):
bmcpass = bmcpass.decode()
if bmcuser and bmcpass:
await wc.grab_json_response_with_status(
'/logout', {'data': [bmcuser, bmcpass]},
headers={
'Content-Type': 'application/json',
'Accept': 'application/json',
'X-XSRF-TOKEN': wc.cookies['XSRF-TOKEN']})
async def send_grant(conn, nodename, rqtype):
cloop = asyncio.get_event_loop()
parmcallback = _nodeparms.get(nodename, None)
cookies = {}
protos = []
passwd = None
sessionid = os.urandom(8).hex()
while sessionid in _usersessions:
sessionid = os.urandom(8).hex()
if parmcallback: # plugin that handles the specifics of the vnc wrapping
if rqtype == 1:
raise Exception("Plugin managed login data not supported with legacy grant request")
cxnmgr = await parmcallback()
_usersessions[sessionid] = {
'cxnmgr': cxnmgr,
'nodename': nodename,
}
url = cxnmgr.url
fprint = cxnmgr.fprint
cookies = cxnmgr.cookies
protos = cxnmgr.protos
host = cxnmgr.host
portnum = cxnmgr.portnum
passwd = cxnmgr.password
#url, fprint, cookies, protos = parmcallback(nodename)
else:
# original openbmc dialect
portnum = 443
cloop = asyncio.get_event_loop()
cfg = configmanager.ConfigManager(None)
c = cfg.get_node_attributes(
nodename,
['secret.hardwaremanagementuser',
'secret.hardwaremanagementpassword',
'hardwaremanagement.manager'], decrypt=True)
bmcuser = c.get(nodename, {}).get(
'secret.hardwaremanagementuser', {}).get('value', None)
bmcpass = c.get(nodename, {}).get(
'secret.hardwaremanagementpassword', {}).get('value', None)
host = c.get(nodename, {}).get(
'hardwaremanagement.manager', {}).get('value', None)
if bmcuser and bmcpass and host:
kv = util.TLSCertVerifier(cfg, nodename,
'pubkeys.tls_hardwaremanager').verify_cert
wc = webclient.WebConnection(host, 443, verifycallback=kv)
if not isinstance(bmcuser, str):
bmcuser = bmcuser.decode()
if not isinstance(bmcpass, str):
bmcpass = bmcpass.decode()
rsp = await wc.grab_json_response_with_status(
'/login', {'data': [bmcuser, bmcpass]},
headers={'Content-Type': 'application/json',
'Accept': 'application/json'})
for cky in wc.cookies:
if cky.key == 'SESSION' or cky.key == 'XSRF-TOKEN':
cookies[cky.key] = cky.value
if rqtype == 1:
# unfortunately, the original protocol failed to
# provide a means for separate tracking bmc side
# and confluent side
# chances are pretty good still
sessionid = cookies['SESSION']
sessiontok = cookies['XSRF-TOKEN']
protos.append(sessiontok)
_usersessions[sessionid] = {
'webclient': wc,
'nodename': nodename,
}
url = '/kvm/0'
fprintinfo = cfg.get_node_attributes(nodename, 'pubkeys.tls_hardwaremanager')
fprint = fprintinfo.get(
nodename, {}).get('pubkeys.tls_hardwaremanager', {}).get('value', None)
if not fprint:
return
if '$' in fprint:
fprint = fprint.split('$', 1)[1]
fprint = bytes.fromhex(fprint)
await cloop.sock_sendall(conn, struct.pack('!BI', rqtype, len(host)))
await cloop.sock_sendall(conn, host.encode())
await cloop.sock_sendall(conn, struct.pack('!I', len(sessionid)))
await cloop.sock_sendall(conn, sessionid.encode())
if rqtype == 1:
await cloop.sock_sendall(conn, struct.pack('!I', len(sessiontok)))
await cloop.sock_sendall(conn, sessiontok.encode())
await cloop.sock_sendall(conn, struct.pack('!I', len(fprint)))
await cloop.sock_sendall(conn, fprint)
await cloop.sock_sendall(conn, struct.pack('!I', len(url)))
await cloop.sock_sendall(conn, url.encode())
else: # newer TLV style protocol
await cloop.sock_sendall(conn, struct.pack('!H', portnum))
await cloop.sock_sendall(conn, struct.pack('!BI', 4, len(url)))
await cloop.sock_sendall(conn, url.encode())
for cook in cookies:
v = cookies[cook]
totlen = len(cook) + len(v) + 4
await cloop.sock_sendall(conn, struct.pack('!BIH', 1, totlen, len(cook.encode())))
await cloop.sock_sendall(conn, cook.encode())
await cloop.sock_sendall(conn, struct.pack('!H', len(v.encode())))
await cloop.sock_sendall(conn, v.encode())
for proto in protos:
await cloop.sock_sendall(conn, struct.pack('!BI', 2, len(proto.encode())))
await cloop.sock_sendall(conn, proto.encode())
await cloop.sock_sendall(conn, struct.pack('!BI', 3, len(fprint)))
await cloop.sock_sendall(conn, fprint)
if passwd:
await cloop.sock_sendall(conn, struct.pack('!BI', 5, len(passwd.encode()[:8])))
await cloop.sock_sendall(conn, passwd.encode()[:8])
await cloop.sock_sendall(conn, b'\xff')
async def recv_exact(conn, n):
#TODO:asyncmerge: review recv_exact usage
cloop = asyncio.get_event_loop()
retdata = b''
while len(retdata) < n:
currdata = await cloop.sock_recv(conn, n - len(retdata))
if not currdata:
raise Exception("Error receiving")
retdata += currdata
return retdata
async def evaluate_request(conn):
allow = False
authname = None
cloop = asyncio.get_event_loop()
try:
creds = conn.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED,
struct.calcsize('iII'))
pid, uid, gid = struct.unpack('iII', creds)
if uid != os.getuid():
return
rqcode, fieldlen = struct.unpack('!BI', await recv_exact(conn, 5))
authtoken = (await recv_exact(conn, fieldlen)).decode()
if authtoken != _vinztoken:
return
if rqcode == 2: # disconnect notification
fieldlen = struct.unpack('!I', await recv_exact(conn, 4))[0]
sessionid = (await recv_exact(conn, fieldlen)).decode()
await close_session(sessionid)
await cloop.sock_recv(conn, 1) # digest 0xff
# if rqcode == 3: # new form connection request
# this will generalize things, to allow describing
# arbitrary cookies and subprotocols
# for the websocket connection
if rqcode in (1, 3): # request for new connection
lenbytes = await recv_exact(conn, 4)
fieldlen = struct.unpack('!I', lenbytes)[0]
nodename = await recv_exact(conn, fieldlen)
nodename = nodename.decode()
idbyte = await cloop.sock_recv(conn, 1)
idtype = struct.unpack('!B', idbyte)[0]
if idtype == 1:
msgbytes = await recv_exact(conn, 4)
usernum = struct.unpack('!I', msgbytes)[0]
if usernum == 0: # root is a special guy
await send_grant(conn, nodename, rqcode)
return
try:
authname = pwd.getpwuid(usernum).pw_name
except Exception:
return
elif idtype == 2:
msgbytes = await recv_exact(conn, 4)
fieldlen = struct.unpack('!I', msgbytes)[0]
sessionid = await recv_exact(conn, fieldlen)
msgbytes = await recv_exact(conn, 4)
fieldlen = struct.unpack('!I', msgbytes)[0]
sessiontok = await recv_exact(conn, fieldlen)
try:
authname = httpapi.get_user_for_session(sessionid, sessiontok)
except Exception:
return
else:
return
await cloop.sock_recv(conn, 1) # should be 0xff
if authname:
allow = auth.authorize(authname, f'/nodes/{nodename}/console/ikvm')
if allow:
await send_grant(conn, nodename, rqcode)
finally:
conn.close()
async def monitor_requests():
cloop = asyncio.get_event_loop()
a = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
os.remove('/var/run/confluent/vinz/approval')
except Exception:
pass
a.bind('/var/run/confluent/vinz/approval')
os.chmod('/var/run/confluent/vinz/approval', 0o600)
a.listen(8)
a.setblocking(0)
while True:
conn, addr = await cloop.sock_accept(a)
tasks.spawn_task(evaluate_request(conn))
async def request_session(nodename):
await assure_vinz()
cloop = asyncio.get_event_loop()
a = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
a.setblocking(0)
await cloop.sock_connect(a, '/var/run/confluent/vinz/control')
nodename = nodename.encode()
await cloop.sock_sendall(a, struct.pack('!BI', 1, len(nodename)))
await cloop.sock_sendall(a, nodename)
await cloop.sock_sendall(a, b'\xff')
rsp = await cloop.sock_recv(a, 1)
retcode = struct.unpack('!B', rsp)[0]
if retcode != 1:
raise Exception("Bad return code")
rsp = await cloop.sock_recv(a, 4)
nlen = struct.unpack('!I', rsp)[0]
sockname = (await cloop.sock_recv(a, nlen)).decode('utf8')
retcode = await cloop.sock_recv(a, 1)
if retcode != b'\xff':
raise Exception("Unrecognized response")
return os.path.join('/var/run/confluent/vinz/sessions', sockname)