diff --git a/confluent_server/confluent/plugins/hardwaremanagement/vcenter.py b/confluent_server/confluent/plugins/hardwaremanagement/vcenter.py index 1f1cf82a..79cbe37d 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/vcenter.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/vcenter.py @@ -1,14 +1,14 @@ +import asyncio import codecs import confluent.util as util import confluent.messages as msg -import eventlet +import confluent.tasks as tasks import struct +import ssl import aiohmi.util.webclient as webclient -import eventlet.green.socket as socket -import eventlet.green.ssl as ssl -import eventlet +import aiohmi.exceptions as pygexc import confluent.interface.console as conapi import io @@ -35,75 +35,95 @@ class VmConsole(conapi.Console): self.tls = tls self.host = host self.port = port - self.socket = None + self.reader = None + self.writer = None self.nodeconfig = configmanager + self.connected = False + self.recvr = None - def connect(self, callback): + async def connect(self, callback): try: - self.socket = socket.create_connection((self.host, self.port)) + if self.tls: + if not self.nodeconfig: + raise Exception('config manager instance required for TLS operation') + kv = util.TLSCertVerifier( + self.nodeconfig, self.host, 'pubkeys.tls').verify_cert + sslctx = ssl.create_default_context() + sslctx.check_hostname = False + sslctx.verify_mode = ssl.CERT_NONE + self.reader, self.writer = await asyncio.open_connection( + self.host, self.port, ssl=sslctx) + # The above disables default validation, replaced by + # known-hosts style cert validation below + sslobj = self.writer.get_extra_info('ssl_object') + bincert = sslobj.getpeercert(binary_form=True) + if not kv(bincert): + self.writer.close() + await self.writer.wait_closed() + raise pygexc.UnrecognizedCertificate('Unknown certificate', bincert) + else: + self.reader, self.writer = await asyncio.open_connection( + self.host, self.port) except Exception: - callback(conapi.ConsoleEvent.Disconnect) - if self.tls: - if not self.nodeconfig: - raise Exception('config manager instance required for TLS operation') - kv = util.TLSCertVerifier( - self.nodeconfig, self.host, 'pubkeys.tls').verify_cert - sock = ssl.wrap_socket(self.socket, cert_reqs=ssl.CERT_NONE) - # The above is supersedeed by the _certverify, which provides - # known-hosts style cert validaiton - bincert = sock.getpeercert(binary_form=True) - if not kv(bincert): - raise pygexc.UnrecognizedCertificate('Unknown certificate', bincert) - self.socket = sock + await callback(conapi.ConsoleEvent.Disconnect) + return self.connected = True self.datacallback = callback - self.recvr = eventlet.spawn(self.recvdata) + self.recvr = tasks.spawn_task(self.recvdata()) - def write(self, data): - self.socket.sendall(data) + async def write(self, data): + self.writer.write(data) + await self.writer.drain() - def close(self): + async def close(self): self.connected = False - if self.socket: - self.socket.close() + if self.recvr: + self.recvr.cancel() + self.recvr = None + if self.writer: + self.writer.close() + await self.writer.wait_closed() - def recvdata(self): - while self.connected: - try: - pendingdata = self.socket.recv(1024) - except Exception as e: - pendingdata = b'' - if pendingdata == b'': - self.connected = False - self.datacallback(conapi.ConsoleEvent.Disconnect) - return - reply = b'' - while pendingdata and pendingdata[0] == 255: - cmd = pendingdata[1] - if cmd == 255: - pendingdata = pendingdata[1:] - break - subcmd = pendingdata[2] - if cmd == 253: # DO - # binary, suppress go ohaed - if subcmd in (0, 3): - reply += b'\xff\xfb' + bytes([subcmd]) # will + async def recvdata(self): + try: + while self.connected: + try: + pendingdata = await self.reader.read(1024) + except Exception: + pendingdata = b'' + if pendingdata == b'': + self.connected = False + await self.datacallback(conapi.ConsoleEvent.Disconnect) + return + reply = b'' + while pendingdata and pendingdata[0] == 255: + cmd = pendingdata[1] + if cmd == 255: + pendingdata = pendingdata[1:] + break + subcmd = pendingdata[2] + if cmd == 253: # DO + # binary, suppress go ohaed + if subcmd in (0, 3): + reply += b'\xff\xfb' + bytes([subcmd]) # will + else: + reply += b'\xff\xfc' + bytes([subcmd]) # won't do anything else + pendingdata = pendingdata[3:] + elif cmd == 251: # will + # binary, suppress go ahead, echo + if subcmd in (0, 1, 3): + reply += b'\xff\xfd' + bytes([subcmd]) # do the implemented things + else: + reply += B'\xff\xfe' + bytes([subcmd]) # don't do others' + pendingdata = pendingdata[3:] else: - reply += b'\xff\xfc' + bytes([subcmd]) # won't do anything else - pendingdata = pendingdata[3:] - elif cmd == 251: # will - # binary, suppress go ahead, echo - if subcmd in (0, 1, 3): - reply += b'\xff\xfd' + bytes([subcmd]) # do the implemented things - else: - reply += B'\xff\xfe' + bytes([subcmd]) # don't do others' - pendingdata = pendingdata[3:] - else: - raise Exception(repr(pendingdata[:3])) - if reply: - self.write(reply) - if pendingdata: - self.datacallback(pendingdata) + raise Exception(repr(pendingdata[:3])) + if reply: + await self.write(reply) + if pendingdata: + await self.datacallback(pendingdata) + except asyncio.CancelledError: + pass