mirror of
https://github.com/xcat2/confluent.git
synced 2026-03-26 12:13:30 +00:00
Restore VNC console handling to async branch
This commit is contained in:
@@ -63,8 +63,8 @@ class KvmConnHandler:
|
||||
self.pmxclient = pmxclient
|
||||
self.node = node
|
||||
|
||||
def connect(self):
|
||||
consdata = self.pmxclient.get_vm_ikvm(self.node)
|
||||
async def connect(self):
|
||||
consdata = await self.pmxclient.get_vm_ikvm(self.node)
|
||||
consdata['fprint'] = self.pmxclient.fprint
|
||||
return KvmConnection(consdata)
|
||||
|
||||
@@ -404,7 +404,7 @@ async def update(nodes, element, configmanager, inputdata):
|
||||
elif element == ['console', 'ikvm']:
|
||||
try:
|
||||
currclient = clientsbynode[node]
|
||||
url = vinzmanager.get_url(node, inputdata, nodeparmcallback=KvmConnHandler(currclient, node).connect)
|
||||
url = await vinzmanager.get_url(node, inputdata, nodeparmcallback=KvmConnHandler(currclient, node).connect)
|
||||
except Exception as e:
|
||||
print(repr(e))
|
||||
return
|
||||
@@ -418,7 +418,7 @@ async def create(nodes, element, configmanager, inputdata):
|
||||
if element == ['console', 'ikvm']:
|
||||
try:
|
||||
currclient = clientsbynode[node]
|
||||
url = vinzmanager.get_url(node, inputdata, nodeparmcallback=KvmConnHandler(currclient, node).connect)
|
||||
url = await vinzmanager.get_url(node, inputdata, nodeparmcallback=KvmConnHandler(currclient, node).connect)
|
||||
except Exception as e:
|
||||
print(repr(e))
|
||||
return
|
||||
|
||||
@@ -3,6 +3,7 @@ import asyncio
|
||||
import confluent.auth as auth
|
||||
import confluent.messages as msg
|
||||
import confluent.exceptions as exc
|
||||
import confluent.tasks as tasks
|
||||
import confluent.util as util
|
||||
import confluent.config.configmanager as configmanager
|
||||
import struct
|
||||
@@ -43,7 +44,7 @@ async def assure_vinz():
|
||||
'-d', '/var/run/confluent/vinz/sessions')
|
||||
while not os.path.exists('/var/run/confluent/vinz/control'):
|
||||
await asyncio.sleep(0.5)
|
||||
util.spawn(monitor_requests)
|
||||
tasks.spawn_task(monitor_requests())
|
||||
finally:
|
||||
startingup = False
|
||||
|
||||
@@ -52,7 +53,7 @@ _nodeparms = {}
|
||||
async def get_url(nodename, inputdata, nodeparmcallback=None):
|
||||
_nodeparms[nodename] = nodeparmcallback
|
||||
method = inputdata.inputbynode[nodename]
|
||||
assure_vinz()
|
||||
await assure_vinz()
|
||||
if method == 'wss':
|
||||
return f'/vinz/kvmsession/{nodename}'
|
||||
elif method == 'unix':
|
||||
@@ -62,7 +63,7 @@ async def get_url(nodename, inputdata, nodeparmcallback=None):
|
||||
|
||||
|
||||
_usersessions = {}
|
||||
def close_session(sessionid):
|
||||
async def close_session(sessionid):
|
||||
sessioninfo = _usersessions.get(sessionid, None)
|
||||
if not sessioninfo:
|
||||
return
|
||||
@@ -84,7 +85,7 @@ def close_session(sessionid):
|
||||
if not isinstance(bmcpass, str):
|
||||
bmcpass = bmcpass.decode()
|
||||
if bmcuser and bmcpass:
|
||||
wc.grab_json_response_with_status(
|
||||
await wc.grab_json_response_with_status(
|
||||
'/logout', {'data': [bmcuser, bmcpass]},
|
||||
headers={
|
||||
'Content-Type': 'application/json',
|
||||
@@ -104,7 +105,7 @@ async def send_grant(conn, nodename, rqtype):
|
||||
if parmcallback: # plugin that handles the specifics of the vnc wrapping
|
||||
if rqtype == 1:
|
||||
raise Exception("Plugin managed login data not supported with legacy grant request")
|
||||
cxnmgr = parmcallback()
|
||||
cxnmgr = await parmcallback()
|
||||
_usersessions[sessionid] = {
|
||||
'cxnmgr': cxnmgr,
|
||||
'nodename': nodename,
|
||||
@@ -168,43 +169,44 @@ async def send_grant(conn, nodename, rqtype):
|
||||
if '$' in fprint:
|
||||
fprint = fprint.split('$', 1)[1]
|
||||
fprint = bytes.fromhex(fprint)
|
||||
await cloop.sock_send(conn, struct.pack('!BI', rqtype, len(host)))
|
||||
await cloop.sock_send(conn, host.encode())
|
||||
await cloop.sock_send(conn, struct.pack('!I', len(sessionid)))
|
||||
await cloop.sock_send(conn, sessionid.encode())
|
||||
await cloop.sock_sendall(conn, struct.pack('!BI', rqtype, len(host)))
|
||||
await cloop.sock_sendall(conn, host.encode())
|
||||
await cloop.sock_sendall(conn, struct.pack('!I', len(sessionid)))
|
||||
await cloop.sock_sendall(conn, sessionid.encode())
|
||||
if rqtype == 1:
|
||||
await cloop.sock_send(conn, struct.pack('!I', len(sessiontok)))
|
||||
await cloop.sock_send(conn, sessiontok.encode())
|
||||
await cloop.sock_send(conn, struct.pack('!I', len(fprint)))
|
||||
await cloop.sock_send(conn, fprint)
|
||||
await cloop.sock_send(conn, struct.pack('!I', len(url)))
|
||||
await cloop.sock_send(conn, url.encode())
|
||||
await cloop.sock_sendall(conn, struct.pack('!I', len(sessiontok)))
|
||||
await cloop.sock_sendall(conn, sessiontok.encode())
|
||||
await cloop.sock_sendall(conn, struct.pack('!I', len(fprint)))
|
||||
await cloop.sock_sendall(conn, fprint)
|
||||
await cloop.sock_sendall(conn, struct.pack('!I', len(url)))
|
||||
await cloop.sock_sendall(conn, url.encode())
|
||||
else: # newer TLV style protocol
|
||||
await cloop.sock_send(conn, struct.pack('!H', portnum))
|
||||
await cloop.sock_send(conn, struct.pack('!BI', 4, len(url)))
|
||||
await cloop.sock_send(conn, url.encode())
|
||||
await cloop.sock_sendall(conn, struct.pack('!H', portnum))
|
||||
await cloop.sock_sendall(conn, struct.pack('!BI', 4, len(url)))
|
||||
await cloop.sock_sendall(conn, url.encode())
|
||||
for cook in cookies:
|
||||
v = cookies[cook]
|
||||
totlen = len(cook) + len(v) + 4
|
||||
await cloop.sock_send(conn, struct.pack('!BIH', 1, totlen, len(cook.encode())))
|
||||
await cloop.sock_send(conn, cook.encode())
|
||||
await cloop.sock_send(conn, struct.pack('!H', len(v.encode())))
|
||||
await cloop.sock_send(conn, v.encode())
|
||||
await cloop.sock_sendall(conn, struct.pack('!BIH', 1, totlen, len(cook.encode())))
|
||||
await cloop.sock_sendall(conn, cook.encode())
|
||||
await cloop.sock_sendall(conn, struct.pack('!H', len(v.encode())))
|
||||
await cloop.sock_sendall(conn, v.encode())
|
||||
for proto in protos:
|
||||
await cloop.sock_send(conn, struct.pack('!BI', 2, len(proto.encode())))
|
||||
await cloop.sock_send(conn, proto.encode())
|
||||
await cloop.sock_send(conn, struct.pack('!BI', 3, len(fprint)))
|
||||
await cloop.sock_send(conn, fprint)
|
||||
await cloop.sock_sendall(conn, struct.pack('!BI', 2, len(proto.encode())))
|
||||
await cloop.sock_sendall(conn, proto.encode())
|
||||
await cloop.sock_sendall(conn, struct.pack('!BI', 3, len(fprint)))
|
||||
await cloop.sock_sendall(conn, fprint)
|
||||
if passwd:
|
||||
await cloop.sock_send(conn, struct.pack('!BI', 5, len(passwd.encode()[:8])))
|
||||
await cloop.sock_send(conn, passwd.encode()[:8])
|
||||
await cloop.sock_send(conn, b'\xff')
|
||||
await cloop.sock_sendall(conn, struct.pack('!BI', 5, len(passwd.encode()[:8])))
|
||||
await cloop.sock_sendall(conn, passwd.encode()[:8])
|
||||
await cloop.sock_sendall(conn, b'\xff')
|
||||
|
||||
def recv_exact(conn, n):
|
||||
async def recv_exact(conn, n):
|
||||
#TODO:asyncmerge: review recv_exact usage
|
||||
cloop = asyncio.get_event_loop()
|
||||
retdata = b''
|
||||
while len(retdata) < n:
|
||||
currdata = conn.recv(n - len(retdata))
|
||||
currdata = await cloop.sock_recv(conn, n - len(retdata))
|
||||
if not currdata:
|
||||
raise Exception("Error receiving")
|
||||
retdata += currdata
|
||||
@@ -220,15 +222,15 @@ async def evaluate_request(conn):
|
||||
pid, uid, gid = struct.unpack('iII', creds)
|
||||
if uid != os.getuid():
|
||||
return
|
||||
rqcode, fieldlen = struct.unpack('!BI', recv_exact(conn, 5))
|
||||
authtoken = recv_exact(conn, fieldlen).decode()
|
||||
rqcode, fieldlen = struct.unpack('!BI', await recv_exact(conn, 5))
|
||||
authtoken = (await recv_exact(conn, fieldlen)).decode()
|
||||
if authtoken != _vinztoken:
|
||||
return
|
||||
if rqcode == 2: # disconnect notification
|
||||
fieldlen = struct.unpack('!I', recv_exact(conn, 4))[0]
|
||||
sessionid = recv_exact(conn, fieldlen).decode()
|
||||
close_session(sessionid)
|
||||
conn.recv(1) # digest 0xff
|
||||
fieldlen = struct.unpack('!I', await recv_exact(conn, 4))[0]
|
||||
sessionid = (await recv_exact(conn, fieldlen)).decode()
|
||||
await close_session(sessionid)
|
||||
await cloop.sock_recv(conn, 1) # digest 0xff
|
||||
# if rqcode == 3: # new form connection request
|
||||
# this will generalize things, to allow describing
|
||||
# arbitrary cookies and subprotocols
|
||||
@@ -281,26 +283,28 @@ async def monitor_requests():
|
||||
a.bind('/var/run/confluent/vinz/approval')
|
||||
os.chmod('/var/run/confluent/vinz/approval', 0o600)
|
||||
a.listen(8)
|
||||
a.setblocking(0)
|
||||
while True:
|
||||
conn, addr = await cloop.sock_accept(a)
|
||||
util.spawn(evaluate_request(conn))
|
||||
tasks.spawn_task(evaluate_request(conn))
|
||||
|
||||
async def request_session(nodename):
|
||||
assure_vinz()
|
||||
await assure_vinz()
|
||||
cloop = asyncio.get_event_loop()
|
||||
a = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
a.connect('/var/run/confluent/vinz/control')
|
||||
a.setblocking(0)
|
||||
await cloop.sock_connect(a, '/var/run/confluent/vinz/control')
|
||||
nodename = nodename.encode()
|
||||
await cloop.sock_send(a, struct.pack('!BI', 1, len(nodename)))
|
||||
await cloop.sock_send(a, nodename)
|
||||
await cloop.sock_send(a, b'\xff')
|
||||
await cloop.sock_sendall(a, struct.pack('!BI', 1, len(nodename)))
|
||||
await cloop.sock_sendall(a, nodename)
|
||||
await cloop.sock_sendall(a, b'\xff')
|
||||
rsp = await cloop.sock_recv(a, 1)
|
||||
retcode = struct.unpack('!B', rsp)[0]
|
||||
if retcode != 1:
|
||||
raise Exception("Bad return code")
|
||||
rsp = await cloop.sock_recv(a, 4)
|
||||
nlen = struct.unpack('!I', rsp)[0]
|
||||
sockname = await cloop.sock_recv(a, nlen).decode('utf8')
|
||||
sockname = (await cloop.sock_recv(a, nlen)).decode('utf8')
|
||||
retcode = await cloop.sock_recv(a, 1)
|
||||
if retcode != b'\xff':
|
||||
raise Exception("Unrecognized response")
|
||||
|
||||
Reference in New Issue
Block a user