2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-02-26 02:09:17 +00:00

Draft of converting tsmsol to asyncio

This commit is contained in:
Jarrod Johnson
2026-02-25 16:19:05 -05:00
parent 36898ba570
commit 61c063adf4

View File

@@ -19,19 +19,29 @@
# specification. consoleserver or shellserver would be equally likely
# to use this.
import asyncio
import confluent.exceptions as cexc
import confluent.interface.console as conapi
import confluent.log as log
import confluent.tasks as tasks
import confluent.util as util
import aiohmi.exceptions as pygexc
import aiohmi.redfish.command as rcmd
import eventlet
import eventlet.green.ssl as ssl
try:
websocket = eventlet.import_patched('websocket')
wso = websocket.WebSocket
except Exception:
wso = object
import aiohmi.util.webclient as webclient
import aiohttp
class CustomVerifier(aiohttp.Fingerprint):
def __init__(self, verifycallback):
self._certverify = verifycallback
def check(self, transport):
sslobj = transport.get_extra_info("ssl_object")
cert = sslobj.getpeercert(binary_form=True)
if not self._certverify(cert):
transport.close()
raise pygexc.UnrecognizedCertificate('Unknown certificate',
cert)
def get_conn_params(node, configdata):
if 'secret.hardwaremanagementuser' in configdata:
@@ -56,53 +66,6 @@ _configattributes = ('secret.hardwaremanagementuser',
'secret.hardwaremanagementpassword',
'hardwaremanagement.manager')
class WrappedWebSocket(wso):
def set_verify_callback(self, callback):
self._certverify = callback
def connect(self, url, **options):
add_tls = url.startswith('wss://')
if add_tls:
hostname, port, resource, _ = websocket._url.parse_url(url)
if hostname[0] != '[' and ':' in hostname:
hostname = '[{0}]'.format(hostname)
if resource[0] != '/':
resource = '/{0}'.format(resource)
url = 'ws://{0}:443{1}'.format(hostname,resource)
else:
return super(WrappedWebSocket, self).connect(url, **options)
self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout)
self.sock, addrs = websocket._http.connect(url, self.sock_opt, websocket._http.proxy_info(**options),
options.pop('socket', None))
self.sock = ssl.wrap_socket(self.sock, cert_reqs=ssl.CERT_NONE)
# The above is supersedeed by the _certverify, which provides
# known-hosts style cert validaiton
bincert = self.sock.getpeercert(binary_form=True)
if not self._certverify(bincert):
raise pygexc.UnrecognizedCertificate('Unknown certificate', bincert)
try:
try:
self.handshake_response = websocket._handshake.handshake(self.sock, *addrs, **options)
except TypeError:
self.handshake_response = websocket._handshake.handshake(self.sock, url, *addrs, **options)
if self.handshake_response.status in websocket._handshake.SUPPORTED_REDIRECT_STATUSES:
options['redirect_limit'] = options.pop('redirect_limit', 3) - 1
if options['redirect_limit'] < 0:
raise Exception('Redirect limit hit')
url = self.handshake_response.headers['location']
self.sock.close()
return self.connect(url, **options)
self.connected = True
except:
if self.sock:
self.sock.close()
self.sock = None
raise
class TsmConsole(conapi.Console):
@@ -121,44 +84,68 @@ class TsmConsole(conapi.Console):
self.datacallback = None
self.nodeconfig = config
self.connected = False
self.recvr = None
def recvdata(self):
while self.connected:
pendingdata = self.ws.recv()
if pendingdata == '':
self.datacallback(conapi.ConsoleEvent.Disconnect)
return
self.datacallback(pendingdata)
async def recvdata(self):
try:
while self.connected:
pendingdata = await self.ws.receive()
if pendingdata.type == aiohttp.WSMsgType.BINARY:
await self.datacallback(pendingdata.data)
continue
elif pendingdata.type == aiohttp.WSMsgType.TEXT:
await self.datacallback(pendingdata.data.encode())
continue
elif pendingdata.type == aiohttp.WSMsgType.CLOSE:
await self.datacallback(conapi.ConsoleEvent.Disconnect)
return
else:
print("Unknown response in WSConsoleHandler")
except asyncio.CancelledError:
pass
def connect(self, callback):
async def connect(self, callback):
self.datacallback = callback
rc = rcmd.Command(self.origbmc, self.username,
self.password,
verifycallback=lambda x: True)
wc = rc.oem.wc
kv = util.TLSCertVerifier(
self.nodeconfig, self.node, 'pubkeys.tls_hardwaremanager').verify_cert
wc = webclient.WebConnection(self.origbmc, 443, verifycallback=kv)
try:
rc = rcmd.Command(self.origbmc, self.username,
self.password,
verifycallback=kv)
await rc.await_redirect()
except Exception as e:
raise cexc.TargetEndpointUnreachable(str(e))
bmc = self.bmc
if '%' in self.bmc:
prefix = self.bmc.split('%')[0]
bmc = prefix + ']'
self.ws = WrappedWebSocket(host=bmc)
kv = util.TLSCertVerifier(
self.nodeconfig, self.node, 'pubkeys.tls_hardwaremanager').verify_cert
self.ws.set_verify_callback(kv)
self.ws.connect('wss://{0}/sol?CSRFTOKEN={1}'.format(self.bmc, rc.oem.csrftok), host=bmc, cookie='QSESSIONID={0}'.format(wc.cookies['QSESSIONID']))
self.ssl = CustomVerifier(kv)
self.clisess = aiohttp.ClientSession(cookie_jar=rc.oem.wc.cookies)
self.ws = await self.clisess.ws_connect(
'wss://{0}/sol?CSRFTOKEN={1}'.format(self.bmc, rc.oem.csrftok),
ssl=self.ssl)
self.connected = True
eventlet.spawn_n(self.recvdata)
self.recvr = tasks.spawn_task(self.recvdata())
return
def write(self, data):
self.ws.send(data)
async def write(self, data):
try:
await self.ws.send_str(data.decode())
except Exception as e:
print(repr(e))
await self.datacallback(conapi.ConsoleEvent.Disconnect)
def close(self):
async def close(self):
if self.recvr:
self.recvr.cancel()
self.recvr = None
if self.ws:
self.ws.close()
await self.ws.close()
self.connected = False
self.datacallback = None
def create(nodes, element, configmanager, inputdata):
async def create(nodes, element, configmanager, inputdata):
if len(nodes) == 1:
return TsmConsole(nodes[0], configmanager)
yield TsmConsole(nodes[0], configmanager)