mirror of
https://github.com/xcat2/confluent.git
synced 2026-04-14 12:51:27 +00:00
Convert vcenter console to async
This commit is contained in:
@@ -1,14 +1,14 @@
|
||||
|
||||
import asyncio
|
||||
import codecs
|
||||
import confluent.util as util
|
||||
import confluent.messages as msg
|
||||
import eventlet
|
||||
import confluent.tasks as tasks
|
||||
import struct
|
||||
import ssl
|
||||
|
||||
import aiohmi.util.webclient as webclient
|
||||
import eventlet.green.socket as socket
|
||||
import eventlet.green.ssl as ssl
|
||||
import eventlet
|
||||
import aiohmi.exceptions as pygexc
|
||||
import confluent.interface.console as conapi
|
||||
import io
|
||||
|
||||
@@ -35,75 +35,95 @@ class VmConsole(conapi.Console):
|
||||
self.tls = tls
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.socket = None
|
||||
self.reader = None
|
||||
self.writer = None
|
||||
self.nodeconfig = configmanager
|
||||
self.connected = False
|
||||
self.recvr = None
|
||||
|
||||
def connect(self, callback):
|
||||
async def connect(self, callback):
|
||||
try:
|
||||
self.socket = socket.create_connection((self.host, self.port))
|
||||
if self.tls:
|
||||
if not self.nodeconfig:
|
||||
raise Exception('config manager instance required for TLS operation')
|
||||
kv = util.TLSCertVerifier(
|
||||
self.nodeconfig, self.host, 'pubkeys.tls').verify_cert
|
||||
sslctx = ssl.create_default_context()
|
||||
sslctx.check_hostname = False
|
||||
sslctx.verify_mode = ssl.CERT_NONE
|
||||
self.reader, self.writer = await asyncio.open_connection(
|
||||
self.host, self.port, ssl=sslctx)
|
||||
# The above disables default validation, replaced by
|
||||
# known-hosts style cert validation below
|
||||
sslobj = self.writer.get_extra_info('ssl_object')
|
||||
bincert = sslobj.getpeercert(binary_form=True)
|
||||
if not kv(bincert):
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
raise pygexc.UnrecognizedCertificate('Unknown certificate', bincert)
|
||||
else:
|
||||
self.reader, self.writer = await asyncio.open_connection(
|
||||
self.host, self.port)
|
||||
except Exception:
|
||||
callback(conapi.ConsoleEvent.Disconnect)
|
||||
if self.tls:
|
||||
if not self.nodeconfig:
|
||||
raise Exception('config manager instance required for TLS operation')
|
||||
kv = util.TLSCertVerifier(
|
||||
self.nodeconfig, self.host, 'pubkeys.tls').verify_cert
|
||||
sock = ssl.wrap_socket(self.socket, cert_reqs=ssl.CERT_NONE)
|
||||
# The above is supersedeed by the _certverify, which provides
|
||||
# known-hosts style cert validaiton
|
||||
bincert = sock.getpeercert(binary_form=True)
|
||||
if not kv(bincert):
|
||||
raise pygexc.UnrecognizedCertificate('Unknown certificate', bincert)
|
||||
self.socket = sock
|
||||
await callback(conapi.ConsoleEvent.Disconnect)
|
||||
return
|
||||
self.connected = True
|
||||
self.datacallback = callback
|
||||
self.recvr = eventlet.spawn(self.recvdata)
|
||||
self.recvr = tasks.spawn_task(self.recvdata())
|
||||
|
||||
def write(self, data):
|
||||
self.socket.sendall(data)
|
||||
async def write(self, data):
|
||||
self.writer.write(data)
|
||||
await self.writer.drain()
|
||||
|
||||
def close(self):
|
||||
async def close(self):
|
||||
self.connected = False
|
||||
if self.socket:
|
||||
self.socket.close()
|
||||
if self.recvr:
|
||||
self.recvr.cancel()
|
||||
self.recvr = None
|
||||
if self.writer:
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
|
||||
def recvdata(self):
|
||||
while self.connected:
|
||||
try:
|
||||
pendingdata = self.socket.recv(1024)
|
||||
except Exception as e:
|
||||
pendingdata = b''
|
||||
if pendingdata == b'':
|
||||
self.connected = False
|
||||
self.datacallback(conapi.ConsoleEvent.Disconnect)
|
||||
return
|
||||
reply = b''
|
||||
while pendingdata and pendingdata[0] == 255:
|
||||
cmd = pendingdata[1]
|
||||
if cmd == 255:
|
||||
pendingdata = pendingdata[1:]
|
||||
break
|
||||
subcmd = pendingdata[2]
|
||||
if cmd == 253: # DO
|
||||
# binary, suppress go ohaed
|
||||
if subcmd in (0, 3):
|
||||
reply += b'\xff\xfb' + bytes([subcmd]) # will
|
||||
async def recvdata(self):
|
||||
try:
|
||||
while self.connected:
|
||||
try:
|
||||
pendingdata = await self.reader.read(1024)
|
||||
except Exception:
|
||||
pendingdata = b''
|
||||
if pendingdata == b'':
|
||||
self.connected = False
|
||||
await self.datacallback(conapi.ConsoleEvent.Disconnect)
|
||||
return
|
||||
reply = b''
|
||||
while pendingdata and pendingdata[0] == 255:
|
||||
cmd = pendingdata[1]
|
||||
if cmd == 255:
|
||||
pendingdata = pendingdata[1:]
|
||||
break
|
||||
subcmd = pendingdata[2]
|
||||
if cmd == 253: # DO
|
||||
# binary, suppress go ohaed
|
||||
if subcmd in (0, 3):
|
||||
reply += b'\xff\xfb' + bytes([subcmd]) # will
|
||||
else:
|
||||
reply += b'\xff\xfc' + bytes([subcmd]) # won't do anything else
|
||||
pendingdata = pendingdata[3:]
|
||||
elif cmd == 251: # will
|
||||
# binary, suppress go ahead, echo
|
||||
if subcmd in (0, 1, 3):
|
||||
reply += b'\xff\xfd' + bytes([subcmd]) # do the implemented things
|
||||
else:
|
||||
reply += B'\xff\xfe' + bytes([subcmd]) # don't do others'
|
||||
pendingdata = pendingdata[3:]
|
||||
else:
|
||||
reply += b'\xff\xfc' + bytes([subcmd]) # won't do anything else
|
||||
pendingdata = pendingdata[3:]
|
||||
elif cmd == 251: # will
|
||||
# binary, suppress go ahead, echo
|
||||
if subcmd in (0, 1, 3):
|
||||
reply += b'\xff\xfd' + bytes([subcmd]) # do the implemented things
|
||||
else:
|
||||
reply += B'\xff\xfe' + bytes([subcmd]) # don't do others'
|
||||
pendingdata = pendingdata[3:]
|
||||
else:
|
||||
raise Exception(repr(pendingdata[:3]))
|
||||
if reply:
|
||||
self.write(reply)
|
||||
if pendingdata:
|
||||
self.datacallback(pendingdata)
|
||||
raise Exception(repr(pendingdata[:3]))
|
||||
if reply:
|
||||
await self.write(reply)
|
||||
if pendingdata:
|
||||
await self.datacallback(pendingdata)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user