diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 114ea25c..9bdfd700 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -329,7 +329,7 @@ class ConsoleHandler(object): async def _connect_backend(self): if self._console: - self._console.close() + await self._console.close() self._console = None self.connectstate = 'connecting' await self._send_rcpts({'connectstate': self.connectstate}) @@ -499,13 +499,13 @@ class ConsoleHandler(object): self._disconnect() - def reopen(self): - self._got_disconnected() + async def reopen(self): + await self._got_disconnected() async def _handle_console_output(self, data): if type(data) == int: if data == conapi.ConsoleEvent.Disconnect: - self._got_disconnected() + await self._got_disconnected() return elif data in (b'', u''): # ignore empty strings from a cconsole provider @@ -816,7 +816,7 @@ class ConsoleSession(object): Returns False if no data buffered yet""" return self.conshdl.get_buffer_age() - def reopen(self): + async def reopen(self): """Reopen the session This can be useful if there is suspicion that the remote console is @@ -825,7 +825,7 @@ class ConsoleSession(object): automatically detecting an unusable console in the underlying technology that cannot be unambiguously autodetected. """ - self.conshdl.reopen() + await self.conshdl.reopen() async def destroy(self): if self.registered: diff --git a/confluent_server/confluent/plugins/console/openbmc.py b/confluent_server/confluent/plugins/console/openbmc.py index 519ca2d4..db1ca309 100644 --- a/confluent_server/confluent/plugins/console/openbmc.py +++ b/confluent_server/confluent/plugins/console/openbmc.py @@ -23,16 +23,24 @@ import confluent.exceptions as cexc import confluent.interface.console as conapi import confluent.log as log import confluent.util as util -import pyghmi.exceptions as pygexc -import pyghmi.redfish.command as rcmd -import pyghmi.util.webclient as webclient -import eventlet -import eventlet.green.ssl as ssl -try: - websocket = eventlet.import_patched('websocket') - wso = websocket.WebSocket -except Exception: - wso = object +import aiohmi.exceptions as pygexc +import aiohmi.redfish.command as rcmd +import aiohmi.util.webclient as webclient +import aiohttp + +class CustomVerifier(aiohttp.Fingerprint): + def __init__(self, verifycallback): + self._certverify = verifycallback + + def check(self, transport): + sslobj = transport.get_extra_info("ssl_object") + cert = sslobj.getpeercert(binary_form=True) + if not self._certverify(cert): + transport.close() + raise pygexc.UnrecognizedCertificate('Unknown certificate', + cert) + + def get_conn_params(node, configdata): if 'secret.hardwaremanagementuser' in configdata: @@ -56,54 +64,11 @@ def get_conn_params(node, configdata): _configattributes = ('secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword', 'hardwaremanagement.manager') - -class WrappedWebSocket(wso): - - def set_verify_callback(self, callback): - self._certverify = callback - - def connect(self, url, **options): - add_tls = url.startswith('wss://') - if add_tls: - hostname, port, resource, _ = websocket._url.parse_url(url) - if hostname[0] != '[' and ':' in hostname: - hostname = '[{0}]'.format(hostname) - if resource[0] != '/': - resource = '/{0}'.format(resource) - url = 'ws://{0}:443{1}'.format(hostname,resource) - else: - return super(WrappedWebSocket, self).connect(url, **options) - self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout) - self.sock, addrs = websocket._http.connect(url, self.sock_opt, websocket._http.proxy_info(**options), - options.pop('socket', None)) - self.sock = ssl.wrap_socket(self.sock, cert_reqs=ssl.CERT_NONE) - # The above is supersedeed by the _certverify, which provides - # known-hosts style cert validaiton - bincert = self.sock.getpeercert(binary_form=True) - if not self._certverify(bincert): - raise pygexc.UnrecognizedCertificate('Unknown certificate', bincert) - try: - self.handshake_response = websocket._handshake.handshake(self.sock, *addrs, **options) - if self.handshake_response.status in websocket._handshake.SUPPORTED_REDIRECT_STATUSES: - options['redirect_limit'] = options.pop('redirect_limit', 3) - 1 - if options['redirect_limit'] < 0: - raise Exception('Redirect limit hit') - url = self.handshake_response.headers['location'] - self.sock.close() - return self.connect(url, **options) - self.connected = True - except: - if self.sock: - self.sock.close() - self.sock = None - raise - - -class TsmConsole(conapi.Console): +class OpenBmcConsole(conapi.Console): def __init__(self, node, config): self.node = node @@ -121,21 +86,27 @@ class TsmConsole(conapi.Console): self.connected = False - def recvdata(self): + async def recvdata(self): while self.connected: - pendingdata = self.ws.recv() - if pendingdata == '': + pendingdata = await self.ws.receive() + if pendingdata.type == aiohttp.WSMsgType.BINARY: + self.datacallback(pendingdata.data) + continue + elif pendingdata.type == aiohttp.WSMsgType.CLOSE: self.datacallback(conapi.ConsoleEvent.Disconnect) return - self.datacallback(pendingdata) + else: + print("Unknown response in WSConsoleHandler") - def connect(self, callback): + + async def connect(self, callback): self.datacallback = callback kv = util.TLSCertVerifier( self.nodeconfig, self.node, 'pubkeys.tls_hardwaremanager').verify_cert - wc = webclient.SecureHTTPConnection(self.origbmc, 443, verifycallback=kv) + + wc = webclient.WebConnection(self.origbmc, 443, verifycallback=kv) try: - rsp = wc.grab_json_response_with_status('/login', {'data': [self.username.decode('utf8'), self.password.decode("utf8")]}, headers={'Content-Type': 'application/json', 'Accept': 'application/json'}) + rsp = await wc.grab_json_response_with_status('/login', {'data': [self.username.decode('utf8'), self.password.decode("utf8")]}, headers={'Content-Type': 'application/json', 'Accept': 'application/json'}) except Exception as e: raise cexc.TargetEndpointUnreachable(str(e)) if rsp[1] > 400: @@ -144,22 +115,27 @@ class TsmConsole(conapi.Console): if '%' in self.bmc: prefix = self.bmc.split('%')[0] bmc = prefix + ']' - self.ws = WrappedWebSocket(host=bmc) - self.ws.set_verify_callback(kv) - self.ws.connect('wss://{0}/console0'.format(self.bmc), host=bmc, cookie='XSRF-TOKEN={0}; SESSION={1}'.format(wc.cookies['XSRF-TOKEN'], wc.cookies['SESSION']), subprotocols=[wc.cookies['XSRF-TOKEN']]) + self.ssl = CustomVerifier(kv) + self.clisess = aiohttp.ClientSession(cookie_jar=wc.cookies) + protos = [] + for ck in wc.cookies: + if ck.key == 'XSRF-TOKEN': + protos = [ck.value] + self.ws = await self.clisess.ws_connect('wss://{0}/console0'.format(self.bmc), protocols=protos, ssl=self.ssl) + #self.ws.connect('wss://{0}/console0'.format(self.bmc), host=bmc, cookie='XSRF-TOKEN={0}; SESSION={1}'.format(wc.cookies['XSRF-TOKEN'], wc.cookies['SESSION']), subprotocols=[wc.cookies['XSRF-TOKEN']]) self.connected = True - eventlet.spawn_n(self.recvdata) + util.spawn(self.recvdata()) return - def write(self, data): - self.ws.send(data) + async def write(self, data): + await self.ws.send_str(data.decode()) - def close(self): + async def close(self): if self.ws: - self.ws.close() + await self.ws.close() self.connected = False self.datacallback = None -def create(nodes, element, configmanager, inputdata): +async def create(nodes, element, configmanager, inputdata): if len(nodes) == 1: - return TsmConsole(nodes[0], configmanager) + yield OpenBmcConsole(nodes[0], configmanager) diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index b1a70753..0c84461a 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -328,7 +328,7 @@ async def term_interact(authdata, authname, ccons, cfm, connection, consession, consession.send_break() continue elif data['operation'] == 'reopen': - consession.reopen() + await consession.reopen() continue elif data['operation'] == 'pause': ccons.xmit = False