2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-27 19:07:46 +00:00

Rework vinzmanager for async operation

This commit is contained in:
Jarrod Johnson
2024-09-10 11:26:18 -04:00
parent dac383af59
commit feaa3bb7b4
2 changed files with 98 additions and 53 deletions

View File

@@ -0,0 +1,34 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2024 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This provides linkage between vinz and confluent, with support
# for getting session authorization from the BMC
import confluent.vinzmanager as vinzmanager
import confluent.messages as msg
async def create(nodes, element, configmanager, inputdata):
for node in nodes:
url = await vinzmanager.get_url(node, inputdata)
yield msg.ChildCollection(url)
async def update(nodes, element, configmanager, inputdata):
for node in nodes:
url = await vinzmanager.get_url(node, inputdata)
yield msg.ChildCollection(url)

View File

@@ -1,13 +1,11 @@
import asyncio
import confluent.auth as auth
import eventlet
import confluent.messages as msg
import confluent.exceptions as exc
import confluent.util as util
import confluent.config.configmanager as configmanager
import struct
import eventlet.green.socket as socket
import eventlet.green.subprocess as subprocess
import base64
import os
import pwd
@@ -15,11 +13,12 @@ import confluent.httpapi as httpapi
mountsbyuser = {}
_vinzfd = None
_vinztoken = None
webclient = eventlet.import_patched('pyghmi.util.webclient')
import socket
import aiohmi.util.webclient as webclient
# Handle the vinz VNC session
def assure_vinz():
async def assure_vinz():
global _vinzfd
global _vinztoken
if _vinzfd is None:
@@ -27,26 +26,26 @@ def assure_vinz():
os.environ['VINZ_TOKEN'] = _vinztoken
os.makedirs('/var/run/confluent/vinz/sessions', exist_ok=True)
_vinzfd = subprocess.Popen(
['/opt/confluent/bin/vinz',
_vinzfd = await asyncio.subprocess.create_subprocess_exec(
'/opt/confluent/bin/vinz',
'-c', '/var/run/confluent/vinz/control',
'-w', '127.0.0.1:4007',
'-a', '/var/run/confluent/vinz/approval',
# vinz supports unix domain websocket, however apache reverse proxy is dicey that way in some versions
'-d', '/var/run/confluent/vinz/sessions'])
'-d', '/var/run/confluent/vinz/sessions')
while not os.path.exists('/var/run/confluent/vinz/control'):
eventlet.sleep(0.5)
eventlet.spawn(monitor_requests)
await asyncio.sleep(0.5)
util.spawn(monitor_requests())
_unix_by_nodename = {}
def get_url(nodename, inputdata):
async def get_url(nodename, inputdata):
method = inputdata.inputbynode[nodename]
assure_vinz()
if method == 'wss':
return f'/vinz/kvmsession/{nodename}'
elif method == 'unix':
if nodename not in _unix_by_nodename or not os.path.exists(_unix_by_nodename[nodename]):
_unix_by_nodename[nodename] = request_session(nodename)
_unix_by_nodename[nodename] = await request_session(nodename)
return _unix_by_nodename[nodename]
@@ -81,7 +80,8 @@ def close_session(sessionid):
'X-XSRF-TOKEN': wc.cookies['XSRF-TOKEN']})
def send_grant(conn, nodename):
async def send_grant(conn, nodename):
cloop = asyncio.get_event_loop()
cfg = configmanager.ConfigManager(None)
c = cfg.get_node_attributes(
nodename,
@@ -97,7 +97,7 @@ def send_grant(conn, nodename):
if bmcuser and bmcpass and bmc:
kv = util.TLSCertVerifier(cfg, nodename,
'pubkeys.tls_hardwaremanager').verify_cert
wc = webclient.SecureHTTPConnection(bmc, 443, verifycallback=kv)
wc = webclient.WebConnection(bmc, 443, verifycallback=kv)
if not isinstance(bmcuser, str):
bmcuser = bmcuser.decode()
if not isinstance(bmcpass, str):
@@ -120,69 +120,79 @@ def send_grant(conn, nodename):
return
fprint = fprint.split('$', 1)[1]
fprint = bytes.fromhex(fprint)
conn.send(struct.pack('!BI', 1, len(bmc)))
conn.send(bmc.encode())
conn.send(struct.pack('!I', len(sessionid)))
conn.send(sessionid.encode())
conn.send(struct.pack('!I', len(sessiontok)))
conn.send(sessiontok.encode())
conn.send(struct.pack('!I', len(fprint)))
conn.send(fprint)
conn.send(struct.pack('!I', len(url)))
conn.send(url.encode())
conn.send(b'\xff')
await cloop.sock_send(conn, struct.pack('!BI', 1, len(bmc)))
await cloop.sock_send(conn, bmc.encode())
await cloop.sock_send(conn, struct.pack('!I', len(sessionid)))
await cloop.sock_send(conn, sessionid.encode())
await cloop.sock_send(conn, struct.pack('!I', len(sessiontok)))
await cloop.sock_send(conn, sessiontok.encode())
await cloop.sock_send(conn, struct.pack('!I', len(fprint)))
await cloop.sock_send(conn, fprint)
await cloop.sock_send(conn, struct.pack('!I', len(url)))
await cloop.sock_send(conn, url.encode())
await cloop.sock_send(conn, b'\xff')
def evaluate_request(conn):
async def evaluate_request(conn):
allow = False
authname = None
cloop = asyncio.get_event_loop()
try:
creds = conn.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED,
struct.calcsize('iII'))
pid, uid, gid = struct.unpack('iII', creds)
if uid != os.getuid():
return
rqcode, fieldlen = struct.unpack('!BI', conn.recv(5))
authtoken = conn.recv(fieldlen).decode()
recvdata = await cloop.sock_recv(conn, 5)
rqcode, fieldlen = struct.unpack('!BI', recvdata)
authtoken = await cloop.sock_recv(conn, fieldlen)
authtoken = authtoken.decode()
if authtoken != _vinztoken:
return
if rqcode == 2: # disconnect notification
fieldlen = struct.unpack('!I', conn.recv(4))[0]
sessionid = conn.recv(fieldlen).decode()
msglen = await cloop.sock_recv(4)
fieldlen = struct.unpack('!I', msglen)[0]
sessionid = (await cloop.sock_recv(conn, fieldlen)).decode()
close_session(sessionid)
conn.recv(1) # digest 0xff
await cloop.sock_recv(conn, 1) # digest 0xff
if rqcode == 1: # request for new connection
fieldlen = struct.unpack('!I', conn.recv(4))[0]
nodename = conn.recv(fieldlen).decode()
idtype = struct.unpack('!B', conn.recv(1))[0]
lenbytes = await cloop.sock_recv(conn, 4)
fieldlen = struct.unpack('!I', lenbytes)[0]
nodename = (await cloop.sock_recv(conn, fieldlen)).decode()
idbyte = await cloop.sock_recv(conn, 1)
idtype = struct.unpack('!B', idbyte)[0]
if idtype == 1:
usernum = struct.unpack('!I', conn.recv(4))[0]
msgbytes = cloop.sock_recv(conn, 4)
usernum = struct.unpack('!I', msgbytes)[0]
if usernum == 0: # root is a special guy
send_grant(conn, nodename)
await send_grant(conn, nodename)
return
try:
authname = pwd.getpwuid(usernum).pw_name
except Exception:
return
elif idtype == 2:
fieldlen = struct.unpack('!I', conn.recv(4))[0]
sessionid = conn.recv(fieldlen)
fieldlen = struct.unpack('!I', conn.recv(4))[0]
sessiontok = conn.recv(fieldlen)
msgbytes = await cloop.sock_recv(conn, 4)
fieldlen = struct.unpack('!I', msgbytes)[0]
sessionid = cloop.sock_recv(conn, fieldlen)
msgbytes = await cloop.sock_recv(conn, 4)
fieldlen = struct.unpack('!I', msgbytes)[0]
sessiontok = await cloop.sock_recv(conn, fieldlen)
try:
authname = httpapi.get_user_for_session(sessionid, sessiontok)
except Exception:
return
else:
return
conn.recv(1) # should be 0xff
await cloop.sock_recv(conn, 1) # should be 0xff
if authname:
allow = auth.authorize(authname, f'/nodes/{nodename}/console/ikvm')
if allow:
send_grant(conn, nodename)
await send_grant(conn, nodename)
finally:
conn.close()
def monitor_requests():
async def monitor_requests():
cloop = asyncio.get_event_loop()
a = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
os.remove('/var/run/confluent/vinz/approval')
@@ -192,25 +202,26 @@ def monitor_requests():
os.chmod('/var/run/confluent/vinz/approval', 0o600)
a.listen(8)
while True:
conn, addr = a.accept()
eventlet.spawn_n(evaluate_request, conn)
conn, addr = await cloop.sock_accept(a)
util.spawn(evaluate_request(conn))
def request_session(nodename):
async def request_session(nodename):
assure_vinz()
cloop = asyncio.get_event_loop()
a = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
a.connect('/var/run/confluent/vinz/control')
nodename = nodename.encode()
a.send(struct.pack('!BI', 1, len(nodename)))
a.send(nodename)
a.send(b'\xff')
rsp = a.recv(1)
await cloop.sock_send(a, struct.pack('!BI', 1, len(nodename)))
await cloop.sock_send(a, nodename)
await cloop.sock_send(a, b'\xff')
rsp = await cloop.sock_recv(a, 1)
retcode = struct.unpack('!B', rsp)[0]
if retcode != 1:
raise Exception("Bad return code")
rsp = a.recv(4)
rsp = await cloop.sock_recv(a, 4)
nlen = struct.unpack('!I', rsp)[0]
sockname = a.recv(nlen).decode('utf8')
retcode = a.recv(1)
sockname = await cloop.sock_recv(a, nlen).decode('utf8')
retcode = await cloop.sock_recv(a, 1)
if retcode != b'\xff':
raise Exception("Unrecognized response")
return os.path.join('/var/run/confluent/vinz/sessions', sockname)