2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-14 12:51:27 +00:00

Fix proxmox console for async operation

This commit is contained in:
Jarrod Johnson
2026-02-26 10:49:33 -05:00
parent 61c063adf4
commit e6bcf3cf9a

View File

@@ -3,18 +3,26 @@ import asyncio
import confluent.vinzmanager as vinzmanager
import confluent.util as util
import confluent.messages as msg
import confluent.tasks as tasks
import aiohmi.util.webclient as webclient
import aiohmi.exceptions as pygexc
import confluent.interface.console as conapi
import io
import urllib.parse as urlparse
import eventlet
import aiohttp
class CustomVerifier(aiohttp.Fingerprint):
def __init__(self, verifycallback):
self._certverify = verifycallback
def check(self, transport):
sslobj = transport.get_extra_info("ssl_object")
cert = sslobj.getpeercert(binary_form=True)
if not self._certverify(cert):
transport.close()
raise pygexc.UnrecognizedCertificate('Unknown certificate',
cert)
try:
websocket = eventlet.import_patched('websocket')
wso = websocket.WebSocket
except Exception:
wso = object
class RetainedIO(io.BytesIO):
# Need to retain buffer after close
@@ -60,55 +68,10 @@ class KvmConnHandler:
consdata['fprint'] = self.pmxclient.fprint
return KvmConnection(consdata)
class WrappedWebSocket(wso):
def set_verify_callback(self, callback):
self._certverify = callback
def connect(self, url, **options):
add_tls = url.startswith('wss://')
if add_tls:
hostname, port, resource, _ = websocket._url.parse_url(url)
if hostname[0] != '[' and ':' in hostname:
hostname = '[{0}]'.format(hostname)
if resource[0] != '/':
resource = '/{0}'.format(resource)
url = 'ws://{0}:8006{1}'.format(hostname,resource)
else:
return super(WrappedWebSocket, self).connect(url, **options)
self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout)
self.sock, addrs = websocket._http.connect(url, self.sock_opt, websocket._http.proxy_info(**options),
options.pop('socket', None))
self.sock = ssl.wrap_socket(self.sock, cert_reqs=ssl.CERT_NONE)
# The above is supersedeed by the _certverify, which provides
# known-hosts style cert validaiton
bincert = self.sock.getpeercert(binary_form=True)
if not self._certverify(bincert):
raise pygexc.UnrecognizedCertificate('Unknown certificate', bincert)
try:
try:
self.handshake_response = websocket._handshake.handshake(self.sock, *addrs, **options)
except TypeError:
self.handshake_response = websocket._handshake.handshake(self.sock, url, *addrs, **options)
if self.handshake_response.status in websocket._handshake.SUPPORTED_REDIRECT_STATUSES:
options['redirect_limit'] = options.pop('redirect_limit', 3) - 1
if options['redirect_limit'] < 0:
raise Exception('Redirect limit hit')
url = self.handshake_response.headers['location']
self.sock.close()
return self.connect(url, **options)
self.connected = True
except:
if self.sock:
self.sock.close()
self.sock = None
raise
class PmxConsole(conapi.Console):
def __init__(self, consdata, node, configmanager, apiclient):
self.ws = None
self.clisess = None
self.consdata = consdata
self.nodeconfig = configmanager
self.connected = False
@@ -117,20 +80,27 @@ class PmxConsole(conapi.Console):
self.recvr = None
self.apiclient = apiclient
def recvdata(self):
while self.connected:
try:
pendingdata = self.ws.recv()
except websocket.WebSocketConnectionClosedException:
pendingdata = ''
if pendingdata == '':
self.datacallback(conapi.ConsoleEvent.Disconnect)
return
self.datacallback(pendingdata)
async def recvdata(self):
try:
while self.connected:
pendingdata = await self.ws.receive()
if pendingdata.type == aiohttp.WSMsgType.BINARY:
await self.datacallback(pendingdata.data)
continue
elif pendingdata.type == aiohttp.WSMsgType.TEXT:
await self.datacallback(pendingdata.data.encode())
continue
elif pendingdata.type == aiohttp.WSMsgType.CLOSE:
await self.datacallback(conapi.ConsoleEvent.Disconnect)
return
else:
print("Unknown response in PmxConsole WSHandler")
except asyncio.CancelledError:
pass
def connect(self, callback):
if self.apiclient.get_vm_power(self.node) != 'on':
callback(conapi.ConsoleEvent.Disconnect)
async def connect(self, callback):
if await self.apiclient.get_vm_power(self.node) != 'on':
await callback(conapi.ConsoleEvent.Disconnect)
return
# socket = new WebSocket(socketURL, 'binary'); - subprotocol binary
# client handshake is:
@@ -147,8 +117,7 @@ class PmxConsole(conapi.Console):
if '%' in self.bmc:
prefix = self.bmc.split('%')[0]
bmc = prefix + ']'
self.ws = WrappedWebSocket(host=bmc)
self.ws.set_verify_callback(kv)
self.ssl = CustomVerifier(kv)
ticket = self.consdata['ticket']
user = self.consdata['user']
port = self.consdata['port']
@@ -157,33 +126,38 @@ class PmxConsole(conapi.Console):
guest = self.consdata['guest']
pac = self.consdata['pac'] # fortunately, we terminate this on our end, but it does kind of reduce the value of the
# 'ticket' approach, as the general cookie must be provided as cookie along with the VNC ticket
self.ws.connect(f'wss://{self.bmc}:8006/api2/json/nodes/{host}/{guest}/vncwebsocket?port={port}&vncticket={urlticket}',
host=bmc, cookie=f'PVEAuthCookie={pac}', # cookie='XSRF-TOKEN={0}; SESSION={1}'.format(wc.cookies['XSRF-TOKEN'], wc.cookies['SESSION']),
subprotocols=['binary'])
self.ws.send(f'{user}:{ticket}\n')
data = self.ws.recv()
if data == b'OK':
self.ws.recv() # swallow the 'starting serial terminal' message
cookies = aiohttp.CookieJar(unsafe=True, quote_cookie=False)
cookies.update_cookies({'PVEAuthCookie': pac})
self.clisess = aiohttp.ClientSession(cookie_jar=cookies)
self.ws = await self.clisess.ws_connect(
f'wss://{self.bmc}:8006/api2/json/nodes/{host}/{guest}/vncwebsocket?port={port}&vncticket={urlticket}',
protocols=['binary'], ssl=self.ssl)
await self.ws.send_str(f'{user}:{ticket}\n')
data = await self.ws.receive()
if data.data == b'OK' or data.data == 'OK':
await self.ws.receive() # swallow the 'starting serial terminal' message
self.connected = True
self.recvr = eventlet.spawn(self.recvdata)
self.recvr = tasks.spawn_task(self.recvdata())
else:
print(repr(data))
print(repr(data.data))
return
def write(self, data):
async def write(self, data):
try:
dlen = str(len(data))
data = data.decode()
self.ws.send('0:' + dlen + ':' + data)
except websocket.WebSocketConnectionClosedException:
self.datacallback(conapi.ConsoleEvent.Disconnect)
await self.ws.send_str('0:' + dlen + ':' + data)
except Exception:
await self.datacallback(conapi.ConsoleEvent.Disconnect)
def close(self):
async def close(self):
if self.recvr:
self.recvr.kill()
self.recvr.cancel()
self.recvr = None
if self.ws:
self.ws.close()
await self.ws.close()
if self.clisess:
await self.clisess.close()
self.connected = False
self.datacallback = None