mirror of
https://github.com/xcat2/confluent.git
synced 2026-02-19 14:14:26 +00:00
Have OpenBMC work with async changes
This commit is contained in:
@@ -329,7 +329,7 @@ class ConsoleHandler(object):
|
||||
|
||||
async def _connect_backend(self):
|
||||
if self._console:
|
||||
self._console.close()
|
||||
await self._console.close()
|
||||
self._console = None
|
||||
self.connectstate = 'connecting'
|
||||
await self._send_rcpts({'connectstate': self.connectstate})
|
||||
@@ -499,13 +499,13 @@ class ConsoleHandler(object):
|
||||
self._disconnect()
|
||||
|
||||
|
||||
def reopen(self):
|
||||
self._got_disconnected()
|
||||
async def reopen(self):
|
||||
await self._got_disconnected()
|
||||
|
||||
async def _handle_console_output(self, data):
|
||||
if type(data) == int:
|
||||
if data == conapi.ConsoleEvent.Disconnect:
|
||||
self._got_disconnected()
|
||||
await self._got_disconnected()
|
||||
return
|
||||
elif data in (b'', u''):
|
||||
# ignore empty strings from a cconsole provider
|
||||
@@ -816,7 +816,7 @@ class ConsoleSession(object):
|
||||
Returns False if no data buffered yet"""
|
||||
return self.conshdl.get_buffer_age()
|
||||
|
||||
def reopen(self):
|
||||
async def reopen(self):
|
||||
"""Reopen the session
|
||||
|
||||
This can be useful if there is suspicion that the remote console is
|
||||
@@ -825,7 +825,7 @@ class ConsoleSession(object):
|
||||
automatically detecting an unusable console in the underlying
|
||||
technology that cannot be unambiguously autodetected.
|
||||
"""
|
||||
self.conshdl.reopen()
|
||||
await self.conshdl.reopen()
|
||||
|
||||
async def destroy(self):
|
||||
if self.registered:
|
||||
|
||||
@@ -23,16 +23,24 @@ import confluent.exceptions as cexc
|
||||
import confluent.interface.console as conapi
|
||||
import confluent.log as log
|
||||
import confluent.util as util
|
||||
import pyghmi.exceptions as pygexc
|
||||
import pyghmi.redfish.command as rcmd
|
||||
import pyghmi.util.webclient as webclient
|
||||
import eventlet
|
||||
import eventlet.green.ssl as ssl
|
||||
try:
|
||||
websocket = eventlet.import_patched('websocket')
|
||||
wso = websocket.WebSocket
|
||||
except Exception:
|
||||
wso = object
|
||||
import aiohmi.exceptions as pygexc
|
||||
import aiohmi.redfish.command as rcmd
|
||||
import aiohmi.util.webclient as webclient
|
||||
import aiohttp
|
||||
|
||||
class CustomVerifier(aiohttp.Fingerprint):
|
||||
def __init__(self, verifycallback):
|
||||
self._certverify = verifycallback
|
||||
|
||||
def check(self, transport):
|
||||
sslobj = transport.get_extra_info("ssl_object")
|
||||
cert = sslobj.getpeercert(binary_form=True)
|
||||
if not self._certverify(cert):
|
||||
transport.close()
|
||||
raise pygexc.UnrecognizedCertificate('Unknown certificate',
|
||||
cert)
|
||||
|
||||
|
||||
|
||||
def get_conn_params(node, configdata):
|
||||
if 'secret.hardwaremanagementuser' in configdata:
|
||||
@@ -56,54 +64,11 @@ def get_conn_params(node, configdata):
|
||||
_configattributes = ('secret.hardwaremanagementuser',
|
||||
'secret.hardwaremanagementpassword',
|
||||
'hardwaremanagement.manager')
|
||||
|
||||
class WrappedWebSocket(wso):
|
||||
|
||||
def set_verify_callback(self, callback):
|
||||
self._certverify = callback
|
||||
|
||||
def connect(self, url, **options):
|
||||
add_tls = url.startswith('wss://')
|
||||
if add_tls:
|
||||
hostname, port, resource, _ = websocket._url.parse_url(url)
|
||||
if hostname[0] != '[' and ':' in hostname:
|
||||
hostname = '[{0}]'.format(hostname)
|
||||
if resource[0] != '/':
|
||||
resource = '/{0}'.format(resource)
|
||||
url = 'ws://{0}:443{1}'.format(hostname,resource)
|
||||
else:
|
||||
return super(WrappedWebSocket, self).connect(url, **options)
|
||||
self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout)
|
||||
self.sock, addrs = websocket._http.connect(url, self.sock_opt, websocket._http.proxy_info(**options),
|
||||
options.pop('socket', None))
|
||||
self.sock = ssl.wrap_socket(self.sock, cert_reqs=ssl.CERT_NONE)
|
||||
# The above is supersedeed by the _certverify, which provides
|
||||
# known-hosts style cert validaiton
|
||||
bincert = self.sock.getpeercert(binary_form=True)
|
||||
if not self._certverify(bincert):
|
||||
raise pygexc.UnrecognizedCertificate('Unknown certificate', bincert)
|
||||
try:
|
||||
self.handshake_response = websocket._handshake.handshake(self.sock, *addrs, **options)
|
||||
if self.handshake_response.status in websocket._handshake.SUPPORTED_REDIRECT_STATUSES:
|
||||
options['redirect_limit'] = options.pop('redirect_limit', 3) - 1
|
||||
if options['redirect_limit'] < 0:
|
||||
raise Exception('Redirect limit hit')
|
||||
url = self.handshake_response.headers['location']
|
||||
self.sock.close()
|
||||
return self.connect(url, **options)
|
||||
self.connected = True
|
||||
except:
|
||||
if self.sock:
|
||||
self.sock.close()
|
||||
self.sock = None
|
||||
raise
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class TsmConsole(conapi.Console):
|
||||
class OpenBmcConsole(conapi.Console):
|
||||
|
||||
def __init__(self, node, config):
|
||||
self.node = node
|
||||
@@ -121,21 +86,27 @@ class TsmConsole(conapi.Console):
|
||||
self.connected = False
|
||||
|
||||
|
||||
def recvdata(self):
|
||||
async def recvdata(self):
|
||||
while self.connected:
|
||||
pendingdata = self.ws.recv()
|
||||
if pendingdata == '':
|
||||
pendingdata = await self.ws.receive()
|
||||
if pendingdata.type == aiohttp.WSMsgType.BINARY:
|
||||
self.datacallback(pendingdata.data)
|
||||
continue
|
||||
elif pendingdata.type == aiohttp.WSMsgType.CLOSE:
|
||||
self.datacallback(conapi.ConsoleEvent.Disconnect)
|
||||
return
|
||||
self.datacallback(pendingdata)
|
||||
else:
|
||||
print("Unknown response in WSConsoleHandler")
|
||||
|
||||
def connect(self, callback):
|
||||
|
||||
async def connect(self, callback):
|
||||
self.datacallback = callback
|
||||
kv = util.TLSCertVerifier(
|
||||
self.nodeconfig, self.node, 'pubkeys.tls_hardwaremanager').verify_cert
|
||||
wc = webclient.SecureHTTPConnection(self.origbmc, 443, verifycallback=kv)
|
||||
|
||||
wc = webclient.WebConnection(self.origbmc, 443, verifycallback=kv)
|
||||
try:
|
||||
rsp = wc.grab_json_response_with_status('/login', {'data': [self.username.decode('utf8'), self.password.decode("utf8")]}, headers={'Content-Type': 'application/json', 'Accept': 'application/json'})
|
||||
rsp = await wc.grab_json_response_with_status('/login', {'data': [self.username.decode('utf8'), self.password.decode("utf8")]}, headers={'Content-Type': 'application/json', 'Accept': 'application/json'})
|
||||
except Exception as e:
|
||||
raise cexc.TargetEndpointUnreachable(str(e))
|
||||
if rsp[1] > 400:
|
||||
@@ -144,22 +115,27 @@ class TsmConsole(conapi.Console):
|
||||
if '%' in self.bmc:
|
||||
prefix = self.bmc.split('%')[0]
|
||||
bmc = prefix + ']'
|
||||
self.ws = WrappedWebSocket(host=bmc)
|
||||
self.ws.set_verify_callback(kv)
|
||||
self.ws.connect('wss://{0}/console0'.format(self.bmc), host=bmc, cookie='XSRF-TOKEN={0}; SESSION={1}'.format(wc.cookies['XSRF-TOKEN'], wc.cookies['SESSION']), subprotocols=[wc.cookies['XSRF-TOKEN']])
|
||||
self.ssl = CustomVerifier(kv)
|
||||
self.clisess = aiohttp.ClientSession(cookie_jar=wc.cookies)
|
||||
protos = []
|
||||
for ck in wc.cookies:
|
||||
if ck.key == 'XSRF-TOKEN':
|
||||
protos = [ck.value]
|
||||
self.ws = await self.clisess.ws_connect('wss://{0}/console0'.format(self.bmc), protocols=protos, ssl=self.ssl)
|
||||
#self.ws.connect('wss://{0}/console0'.format(self.bmc), host=bmc, cookie='XSRF-TOKEN={0}; SESSION={1}'.format(wc.cookies['XSRF-TOKEN'], wc.cookies['SESSION']), subprotocols=[wc.cookies['XSRF-TOKEN']])
|
||||
self.connected = True
|
||||
eventlet.spawn_n(self.recvdata)
|
||||
util.spawn(self.recvdata())
|
||||
return
|
||||
|
||||
def write(self, data):
|
||||
self.ws.send(data)
|
||||
async def write(self, data):
|
||||
await self.ws.send_str(data.decode())
|
||||
|
||||
def close(self):
|
||||
async def close(self):
|
||||
if self.ws:
|
||||
self.ws.close()
|
||||
await self.ws.close()
|
||||
self.connected = False
|
||||
self.datacallback = None
|
||||
|
||||
def create(nodes, element, configmanager, inputdata):
|
||||
async def create(nodes, element, configmanager, inputdata):
|
||||
if len(nodes) == 1:
|
||||
return TsmConsole(nodes[0], configmanager)
|
||||
yield OpenBmcConsole(nodes[0], configmanager)
|
||||
|
||||
@@ -328,7 +328,7 @@ async def term_interact(authdata, authname, ccons, cfm, connection, consession,
|
||||
consession.send_break()
|
||||
continue
|
||||
elif data['operation'] == 'reopen':
|
||||
consession.reopen()
|
||||
await consession.reopen()
|
||||
continue
|
||||
elif data['operation'] == 'pause':
|
||||
ccons.xmit = False
|
||||
|
||||
Reference in New Issue
Block a user