diff --git a/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py b/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py index 1f35e71d..9a983faf 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/proxmox.py @@ -3,18 +3,26 @@ import asyncio import confluent.vinzmanager as vinzmanager import confluent.util as util import confluent.messages as msg +import confluent.tasks as tasks import aiohmi.util.webclient as webclient +import aiohmi.exceptions as pygexc import confluent.interface.console as conapi import io import urllib.parse as urlparse -import eventlet +import aiohttp +class CustomVerifier(aiohttp.Fingerprint): + def __init__(self, verifycallback): + self._certverify = verifycallback + + def check(self, transport): + sslobj = transport.get_extra_info("ssl_object") + cert = sslobj.getpeercert(binary_form=True) + if not self._certverify(cert): + transport.close() + raise pygexc.UnrecognizedCertificate('Unknown certificate', + cert) -try: - websocket = eventlet.import_patched('websocket') - wso = websocket.WebSocket -except Exception: - wso = object class RetainedIO(io.BytesIO): # Need to retain buffer after close @@ -60,55 +68,10 @@ class KvmConnHandler: consdata['fprint'] = self.pmxclient.fprint return KvmConnection(consdata) -class WrappedWebSocket(wso): - - def set_verify_callback(self, callback): - self._certverify = callback - - def connect(self, url, **options): - - add_tls = url.startswith('wss://') - if add_tls: - hostname, port, resource, _ = websocket._url.parse_url(url) - if hostname[0] != '[' and ':' in hostname: - hostname = '[{0}]'.format(hostname) - if resource[0] != '/': - resource = '/{0}'.format(resource) - url = 'ws://{0}:8006{1}'.format(hostname,resource) - else: - return super(WrappedWebSocket, self).connect(url, **options) - self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout) - self.sock, addrs = websocket._http.connect(url, self.sock_opt, websocket._http.proxy_info(**options), - options.pop('socket', None)) - self.sock = ssl.wrap_socket(self.sock, cert_reqs=ssl.CERT_NONE) - # The above is supersedeed by the _certverify, which provides - # known-hosts style cert validaiton - bincert = self.sock.getpeercert(binary_form=True) - if not self._certverify(bincert): - raise pygexc.UnrecognizedCertificate('Unknown certificate', bincert) - try: - try: - self.handshake_response = websocket._handshake.handshake(self.sock, *addrs, **options) - except TypeError: - self.handshake_response = websocket._handshake.handshake(self.sock, url, *addrs, **options) - if self.handshake_response.status in websocket._handshake.SUPPORTED_REDIRECT_STATUSES: - options['redirect_limit'] = options.pop('redirect_limit', 3) - 1 - if options['redirect_limit'] < 0: - raise Exception('Redirect limit hit') - url = self.handshake_response.headers['location'] - self.sock.close() - return self.connect(url, **options) - self.connected = True - except: - if self.sock: - self.sock.close() - self.sock = None - raise - - class PmxConsole(conapi.Console): def __init__(self, consdata, node, configmanager, apiclient): self.ws = None + self.clisess = None self.consdata = consdata self.nodeconfig = configmanager self.connected = False @@ -117,20 +80,27 @@ class PmxConsole(conapi.Console): self.recvr = None self.apiclient = apiclient - def recvdata(self): - while self.connected: - try: - pendingdata = self.ws.recv() - except websocket.WebSocketConnectionClosedException: - pendingdata = '' - if pendingdata == '': - self.datacallback(conapi.ConsoleEvent.Disconnect) - return - self.datacallback(pendingdata) + async def recvdata(self): + try: + while self.connected: + pendingdata = await self.ws.receive() + if pendingdata.type == aiohttp.WSMsgType.BINARY: + await self.datacallback(pendingdata.data) + continue + elif pendingdata.type == aiohttp.WSMsgType.TEXT: + await self.datacallback(pendingdata.data.encode()) + continue + elif pendingdata.type == aiohttp.WSMsgType.CLOSE: + await self.datacallback(conapi.ConsoleEvent.Disconnect) + return + else: + print("Unknown response in PmxConsole WSHandler") + except asyncio.CancelledError: + pass - def connect(self, callback): - if self.apiclient.get_vm_power(self.node) != 'on': - callback(conapi.ConsoleEvent.Disconnect) + async def connect(self, callback): + if await self.apiclient.get_vm_power(self.node) != 'on': + await callback(conapi.ConsoleEvent.Disconnect) return # socket = new WebSocket(socketURL, 'binary'); - subprotocol binary # client handshake is: @@ -147,8 +117,7 @@ class PmxConsole(conapi.Console): if '%' in self.bmc: prefix = self.bmc.split('%')[0] bmc = prefix + ']' - self.ws = WrappedWebSocket(host=bmc) - self.ws.set_verify_callback(kv) + self.ssl = CustomVerifier(kv) ticket = self.consdata['ticket'] user = self.consdata['user'] port = self.consdata['port'] @@ -157,33 +126,38 @@ class PmxConsole(conapi.Console): guest = self.consdata['guest'] pac = self.consdata['pac'] # fortunately, we terminate this on our end, but it does kind of reduce the value of the # 'ticket' approach, as the general cookie must be provided as cookie along with the VNC ticket - self.ws.connect(f'wss://{self.bmc}:8006/api2/json/nodes/{host}/{guest}/vncwebsocket?port={port}&vncticket={urlticket}', - host=bmc, cookie=f'PVEAuthCookie={pac}', # cookie='XSRF-TOKEN={0}; SESSION={1}'.format(wc.cookies['XSRF-TOKEN'], wc.cookies['SESSION']), - subprotocols=['binary']) - self.ws.send(f'{user}:{ticket}\n') - data = self.ws.recv() - if data == b'OK': - self.ws.recv() # swallow the 'starting serial terminal' message + cookies = aiohttp.CookieJar(unsafe=True, quote_cookie=False) + cookies.update_cookies({'PVEAuthCookie': pac}) + self.clisess = aiohttp.ClientSession(cookie_jar=cookies) + self.ws = await self.clisess.ws_connect( + f'wss://{self.bmc}:8006/api2/json/nodes/{host}/{guest}/vncwebsocket?port={port}&vncticket={urlticket}', + protocols=['binary'], ssl=self.ssl) + await self.ws.send_str(f'{user}:{ticket}\n') + data = await self.ws.receive() + if data.data == b'OK' or data.data == 'OK': + await self.ws.receive() # swallow the 'starting serial terminal' message self.connected = True - self.recvr = eventlet.spawn(self.recvdata) + self.recvr = tasks.spawn_task(self.recvdata()) else: - print(repr(data)) + print(repr(data.data)) return - def write(self, data): + async def write(self, data): try: dlen = str(len(data)) data = data.decode() - self.ws.send('0:' + dlen + ':' + data) - except websocket.WebSocketConnectionClosedException: - self.datacallback(conapi.ConsoleEvent.Disconnect) + await self.ws.send_str('0:' + dlen + ':' + data) + except Exception: + await self.datacallback(conapi.ConsoleEvent.Disconnect) - def close(self): + async def close(self): if self.recvr: - self.recvr.kill() + self.recvr.cancel() self.recvr = None if self.ws: - self.ws.close() + await self.ws.close() + if self.clisess: + await self.clisess.close() self.connected = False self.datacallback = None