From b7f6c158ea71c1507b3e7e3dd1c9b8187f718497 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Sat, 2 May 2026 12:37:52 -0400 Subject: [PATCH] Switch to homegrown async vnc implementation The pip ones didn't support tight. Further, when switching to streaming, they were a bit hiccupy with performance. --- confluent_client/bin/nodeconsole | 39 ++--- confluent_client/confluent/vnc.py | 237 ++++++++++++++++++++++++++++++ 2 files changed, 251 insertions(+), 25 deletions(-) create mode 100644 confluent_client/confluent/vnc.py diff --git a/confluent_client/bin/nodeconsole b/confluent_client/bin/nodeconsole index 097cf017..cd7bdb65 100755 --- a/confluent_client/bin/nodeconsole +++ b/confluent_client/bin/nodeconsole @@ -28,6 +28,7 @@ if path.startswith('/opt'): import confluent.client as client import confluent.sortutil as sortutil import confluent.logreader as logreader +import confluent.vnc as vnc import time import select import signal @@ -498,19 +499,16 @@ async def do_screenshot(streaming=False): if errorstr or imgdata: imgdata = base64.b64decode(imgdata) draw_node(node, imgdata, errorstr, firstnodename, cwidth, cheight) - if asyncvnc: - urlbynode = {} - for node in vnconly: - for res in sess.update(f'/nodes/{node}/console/ikvm', {'method': 'unix'}): - url = res.get('item', {}).get('href') - if url: - urlbynode[node] = url - await grab_vncs(urlbynode, cwidth, cheight, streaming) - if resized: - do_resize(True) - resized = False - elif vnconly: - sys.stderr.write("Require asyncvnc installed to do VNC screenshotting\n") + urlbynode = {} + for node in vnconly: + for res in sess.update(f'/nodes/{node}/console/ikvm', {'method': 'unix'}): + url = res.get('item', {}).get('href') + if url: + urlbynode[node] = url + await grab_vncs(urlbynode, cwidth, cheight, streaming) + if resized: + do_resize(True) + resized = False if options.interval is None: dorefresh = False else: @@ -518,10 +516,7 @@ async def do_screenshot(streaming=False): time.sleep(options.interval) sys.exit(0) -try: - import asyncio, asyncvnc -except ImportError: - asyncvnc = None +import asyncio async def grab_vncs(urlbynode, cwidth, cheight, streaming=False): tasks = [] @@ -530,16 +525,12 @@ async def grab_vncs(urlbynode, cwidth, cheight, streaming=False): tasks.append(asyncio.create_task(do_vnc(node, url, cwidth, cheight, streaming))) await asyncio.gather(*tasks) -async def my_opener(host, port): - # really, host is the unix - return await asyncio.open_unix_connection(host) - async def do_vnc(node, url, cwidth, cheight, streaming=False): keeprunning = True retries = 5 while keeprunning: try: - async with asyncvnc.connect(url, opener=my_opener) as client: + async with vnc.VNCClient.create(url) as client: while True: # Retrieve pixels as a 3D numpy array try: @@ -550,7 +541,7 @@ async def do_vnc(node, url, cwidth, cheight, streaming=False): # but OpenBMC sends an alpha heavy mouse overlay # that results in blackness # possibly use client side cursor to suppress the ick? - pixels = await asyncio.wait_for(client.screenshot(), 4) + image = await client.get_screenshot() except asyncio.TimeoutError: # need a better closed connection detector, a static screen triggers timeouts too # without the timeout, we lose track of proxmox reset @@ -558,7 +549,6 @@ async def do_vnc(node, url, cwidth, cheight, streaming=False): break retries = 5 # Save as PNG using PIL/pillow - image = Image.fromarray(pixels) outfile = io.BytesIO() image.save(outfile, format='PNG') imgdata = outfile.getbuffer() @@ -567,7 +557,6 @@ async def do_vnc(node, url, cwidth, cheight, streaming=False): if not streaming: keeprunning = False break - await asyncio.sleep(0) except ValueError as e: draw_node(node, None, str(e), '', cwidth, cheight) retries -= 1 diff --git a/confluent_client/confluent/vnc.py b/confluent_client/confluent/vnc.py new file mode 100644 index 00000000..253e28c9 --- /dev/null +++ b/confluent_client/confluent/vnc.py @@ -0,0 +1,237 @@ +import asyncio +from PIL import Image +import io +import zlib + +# This results in an RGBA organization of pixels +MYPIXFORMAT = bytearray([ + 32, # bits per pixel + 24, # depth + 0, # big endian + 1, # true color + 0, 255, # red max + 0, 255, # green max + 0, 255, # blue max + 0, 8, 16, # red shift, green shift, blue shift + 0, 0, 0 # padding +]) + +class ByteStream: + def __init__(self): + self.buffer = b'' + + def add_number(self, number, num_bytes): + data = number.to_bytes(num_bytes, byteorder='big') + self.buffer += data + + def extend(self, data): + self.buffer += data + + def get_bytes(self): + return self.buffer + + def clear(self): + self.buffer = b'' + + def flush(self, writer): + writer.write(self.buffer) + self.clear() + +class VNCClient: + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + return False + + @classmethod + async def create(cls, url): + self = cls() + if url.startswith('unix://'): + url = url.replace('unix://', '') + if url.startswith('/'): + self.reader, self.writer = await asyncio.open_unix_connection(url) + elif url.startswith('@'): + url = '\0' + url[1:] + self.reader, self.writer = await asyncio.open_unix_connection(url) + elif url.startswith('tcp://'): + url = url.replace('tcp://', '') + host, port = url.split(':') + self.reader, self.writer = await asyncio.open_connection(host, int(port)) + else: + raise ValueError('Unsupported URL: {}'.format(url)) + self.receiver = None + self.framebuffer = None + self.copytext = None + self._updating = True + self.decompressor = zlib.decompressobj() + await self._do_vnc_handshake() + return self + + async def _read_number(self, num_bytes): + data = await self.reader.readexactly(num_bytes) + return int.from_bytes(data, byteorder='big') + + def _write_number(self, number, num_bytes): + data = number.to_bytes(num_bytes, byteorder='big') + self.writer.write(data) + return data + + async def get_screenshot(self): + while self._updating: + await asyncio.sleep(0.1) + await asyncio.sleep(0) + if self.framebuffer is None: + raise Exception('No framebuffer data available') + self._updating = True + return self.framebuffer.copy() + + async def _do_vnc_handshake(self): + rfbver = await self.reader.readline() + if not rfbver.startswith(b'RFB 003.008'): + self.writer.close() + await self.writer.wait_closed() + raise Exception('Unsupported RFB version') + self.writer.write(b'RFB 003.008\n') + numsectypes = await self._read_number(1) + if not numsectypes: + self.writer.close() + await self.writer.wait_closed() + raise Exception('No security types supported by the server') + sectypes = await self.reader.readexactly(numsectypes) + sectypes = bytearray(sectypes) + secresult = 1 + if 1 in sectypes: + self.writer.write(b'\x01') + await self.writer.drain() + secresult = await self._read_number(4) # Security result + if secresult != 0: + self.writer.close() + await self.writer.wait_closed() + raise Exception('VNC authentication failed') + self.writer.write(b'\x01') # Share display + self.width = await self._read_number(2) + self.height = await self._read_number(2) + pixformat = await self.reader.readexactly(16) + name_length = await self._read_number(4) + self.name = await self.reader.readexactly(name_length) + payload = ByteStream() + if pixformat != MYPIXFORMAT: + payload.add_number(0, 1) # Set pixel format + payload.add_number(0, 3) # Padding + payload.extend(MYPIXFORMAT) + payload.flush(self.writer) + self.receiver = asyncio.create_task(self._receive_loop()) + payload.add_number(2, 1) # Set encodings + payload.add_number(0, 1) # Padding + payload.add_number(2, 2) # Number of encodings + payload.add_number(6, 4) # zlib + payload.add_number(7, 4) # tight + #payload.add_number(-223, 4) # desktopsize + payload.flush(self.writer) + self._request_screen_update(incremental=False) + + def _request_screen_update(self, incremental=True): + incremental = 1 if incremental else 0 + payload = ByteStream() + payload.add_number(3, 1) # Framebuffer update request + payload.add_number(incremental, 1) # Incremental + payload.add_number(0, 2) # x position + payload.add_number(0, 2) # y position + payload.add_number(self.width, 2) # width + payload.add_number(self.height, 2) # height + payload.flush(self.writer) + + async def _receive_loop(self): + while True: + try: + message_type = await self._read_number(1) + if message_type == 0: # Framebuffer update + await self._handle_framebuffer_update() + elif message_type == 1: # Set color map entries + raise NotImplementedError('Set color map entries not implemented') + elif message_type == 2: # Bell + pass + elif message_type == 3: # Server cut text + padding = await self._read_number(3) + length = await self._read_number(4) + self.copytext = await self.reader.readexactly(length) + else: + raise Exception(f'Unknown message type: {message_type}') + except Exception as e: + print(f"Error in receive loop: {e}") + break + + async def _handle_framebuffer_update(self): + _ = await self._read_number(1) # Padding + num_rects = await self._read_number(2) + self._updating = True + for _ in range(num_rects): + await self._handle_rectangle() + self._updating = False + self._request_screen_update(incremental=True) + + async def _handle_rectangle(self): + if self.framebuffer == None: + self.framebuffer = Image.new('RGBA', (self.width, self.height)) + x = await self._read_number(2) + y = await self._read_number(2) + width = await self._read_number(2) + height = await self._read_number(2) + encoding_type = await self._read_number(4) + pixel_data = None + if encoding_type == 6: + compressed_data_length = await self._read_number(4) + compressed_data = await self.reader.readexactly(compressed_data_length) + # Decompress the data using zlib and store it in the framebuffer + pixel_data = self.decompressor.decompress(compressed_data) + elif encoding_type == 0: + pixel_data = await self.reader.readexactly(width * height * 4) # Assuming 32 bits per pixel + if encoding_type == -223: # desktopsize + self.width = width + self.height = height + self.framebuffer = Image.new('RGBA', (self.width, self.height)) + elif pixel_data: + pixel_data = bytearray(pixel_data) + for i in range(3, len(pixel_data), 4): + pixel_data[i] = 0xff + img = Image.frombytes('RGBA', (width, height), bytes(pixel_data)) + self.framebuffer.paste(img, (x, y)) + elif encoding_type == 6: # zlib + compressed_data_length = await self._read_number(4) + compressed_data = await self.reader.readexactly(compressed_data_length) + # Decompress the data using zlib and store it in the framebuffer + pixel_data = self.decompressor.decompress(compressed_data) + img = Image.frombytes('RGBA', (width, height), pixel_data) + self.framebuffer.paste(img, (x, y)) + elif encoding_type == 7: # tight + # Best document I could see was: + # https://github.com/TurboVNC/tightvnc/blob/main/vnc_winsrc/rfb/rfbproto.h + tightheader = await self._read_number(1) + streamid = tightheader & 0x0F + if streamid: + raise NotImplementedError('tight encoding with streamid not implemented') + comptype = (tightheader >> 4) & 0x0F + if comptype != 9: + raise NotImplementedError(f'tight encoding with comptype {comptype} not implemented') + compressed_data_length = await self._read_tight_length() + compressed_data = await self.reader.readexactly(compressed_data_length) + jpgimg = io.BytesIO(compressed_data) + img = Image.open(jpgimg) + self.framebuffer.paste(img, (x, y)) + else: + raise Exception(f'Unsupported encoding type: {encoding_type}') + + async def _read_tight_length(self): + length = 0 + for i in range(3): + byte = await self._read_number(1) + length |= ((byte & 0x7F) << (i * 7)) + if not (byte & 0x80): + break + return length + async def close(self): + self.writer.close() + await self.writer.wait_closed() \ No newline at end of file