mirror of
https://github.com/xcat2/confluent.git
synced 2026-05-07 17:27:16 +00:00
Switch to homegrown async vnc implementation
The pip ones didn't support tight. Further, when switching to streaming, they were a bit hiccupy with performance.
This commit is contained in:
@@ -28,6 +28,7 @@ if path.startswith('/opt'):
|
||||
import confluent.client as client
|
||||
import confluent.sortutil as sortutil
|
||||
import confluent.logreader as logreader
|
||||
import confluent.vnc as vnc
|
||||
import time
|
||||
import select
|
||||
import signal
|
||||
@@ -498,19 +499,16 @@ async def do_screenshot(streaming=False):
|
||||
if errorstr or imgdata:
|
||||
imgdata = base64.b64decode(imgdata)
|
||||
draw_node(node, imgdata, errorstr, firstnodename, cwidth, cheight)
|
||||
if asyncvnc:
|
||||
urlbynode = {}
|
||||
for node in vnconly:
|
||||
for res in sess.update(f'/nodes/{node}/console/ikvm', {'method': 'unix'}):
|
||||
url = res.get('item', {}).get('href')
|
||||
if url:
|
||||
urlbynode[node] = url
|
||||
await grab_vncs(urlbynode, cwidth, cheight, streaming)
|
||||
if resized:
|
||||
do_resize(True)
|
||||
resized = False
|
||||
elif vnconly:
|
||||
sys.stderr.write("Require asyncvnc installed to do VNC screenshotting\n")
|
||||
urlbynode = {}
|
||||
for node in vnconly:
|
||||
for res in sess.update(f'/nodes/{node}/console/ikvm', {'method': 'unix'}):
|
||||
url = res.get('item', {}).get('href')
|
||||
if url:
|
||||
urlbynode[node] = url
|
||||
await grab_vncs(urlbynode, cwidth, cheight, streaming)
|
||||
if resized:
|
||||
do_resize(True)
|
||||
resized = False
|
||||
if options.interval is None:
|
||||
dorefresh = False
|
||||
else:
|
||||
@@ -518,10 +516,7 @@ async def do_screenshot(streaming=False):
|
||||
time.sleep(options.interval)
|
||||
sys.exit(0)
|
||||
|
||||
try:
|
||||
import asyncio, asyncvnc
|
||||
except ImportError:
|
||||
asyncvnc = None
|
||||
import asyncio
|
||||
|
||||
async def grab_vncs(urlbynode, cwidth, cheight, streaming=False):
|
||||
tasks = []
|
||||
@@ -530,16 +525,12 @@ async def grab_vncs(urlbynode, cwidth, cheight, streaming=False):
|
||||
tasks.append(asyncio.create_task(do_vnc(node, url, cwidth, cheight, streaming)))
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
async def my_opener(host, port):
|
||||
# really, host is the unix
|
||||
return await asyncio.open_unix_connection(host)
|
||||
|
||||
async def do_vnc(node, url, cwidth, cheight, streaming=False):
|
||||
keeprunning = True
|
||||
retries = 5
|
||||
while keeprunning:
|
||||
try:
|
||||
async with asyncvnc.connect(url, opener=my_opener) as client:
|
||||
async with vnc.VNCClient.create(url) as client:
|
||||
while True:
|
||||
# Retrieve pixels as a 3D numpy array
|
||||
try:
|
||||
@@ -550,7 +541,7 @@ async def do_vnc(node, url, cwidth, cheight, streaming=False):
|
||||
# but OpenBMC sends an alpha heavy mouse overlay
|
||||
# that results in blackness
|
||||
# possibly use client side cursor to suppress the ick?
|
||||
pixels = await asyncio.wait_for(client.screenshot(), 4)
|
||||
image = await client.get_screenshot()
|
||||
except asyncio.TimeoutError:
|
||||
# need a better closed connection detector, a static screen triggers timeouts too
|
||||
# without the timeout, we lose track of proxmox reset
|
||||
@@ -558,7 +549,6 @@ async def do_vnc(node, url, cwidth, cheight, streaming=False):
|
||||
break
|
||||
retries = 5
|
||||
# Save as PNG using PIL/pillow
|
||||
image = Image.fromarray(pixels)
|
||||
outfile = io.BytesIO()
|
||||
image.save(outfile, format='PNG')
|
||||
imgdata = outfile.getbuffer()
|
||||
@@ -567,7 +557,6 @@ async def do_vnc(node, url, cwidth, cheight, streaming=False):
|
||||
if not streaming:
|
||||
keeprunning = False
|
||||
break
|
||||
await asyncio.sleep(0)
|
||||
except ValueError as e:
|
||||
draw_node(node, None, str(e), '', cwidth, cheight)
|
||||
retries -= 1
|
||||
|
||||
@@ -0,0 +1,237 @@
|
||||
import asyncio
|
||||
from PIL import Image
|
||||
import io
|
||||
import zlib
|
||||
|
||||
# This results in an RGBA organization of pixels
|
||||
MYPIXFORMAT = bytearray([
|
||||
32, # bits per pixel
|
||||
24, # depth
|
||||
0, # big endian
|
||||
1, # true color
|
||||
0, 255, # red max
|
||||
0, 255, # green max
|
||||
0, 255, # blue max
|
||||
0, 8, 16, # red shift, green shift, blue shift
|
||||
0, 0, 0 # padding
|
||||
])
|
||||
|
||||
class ByteStream:
|
||||
def __init__(self):
|
||||
self.buffer = b''
|
||||
|
||||
def add_number(self, number, num_bytes):
|
||||
data = number.to_bytes(num_bytes, byteorder='big')
|
||||
self.buffer += data
|
||||
|
||||
def extend(self, data):
|
||||
self.buffer += data
|
||||
|
||||
def get_bytes(self):
|
||||
return self.buffer
|
||||
|
||||
def clear(self):
|
||||
self.buffer = b''
|
||||
|
||||
def flush(self, writer):
|
||||
writer.write(self.buffer)
|
||||
self.clear()
|
||||
|
||||
class VNCClient:
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
await self.close()
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
async def create(cls, url):
|
||||
self = cls()
|
||||
if url.startswith('unix://'):
|
||||
url = url.replace('unix://', '')
|
||||
if url.startswith('/'):
|
||||
self.reader, self.writer = await asyncio.open_unix_connection(url)
|
||||
elif url.startswith('@'):
|
||||
url = '\0' + url[1:]
|
||||
self.reader, self.writer = await asyncio.open_unix_connection(url)
|
||||
elif url.startswith('tcp://'):
|
||||
url = url.replace('tcp://', '')
|
||||
host, port = url.split(':')
|
||||
self.reader, self.writer = await asyncio.open_connection(host, int(port))
|
||||
else:
|
||||
raise ValueError('Unsupported URL: {}'.format(url))
|
||||
self.receiver = None
|
||||
self.framebuffer = None
|
||||
self.copytext = None
|
||||
self._updating = True
|
||||
self.decompressor = zlib.decompressobj()
|
||||
await self._do_vnc_handshake()
|
||||
return self
|
||||
|
||||
async def _read_number(self, num_bytes):
|
||||
data = await self.reader.readexactly(num_bytes)
|
||||
return int.from_bytes(data, byteorder='big')
|
||||
|
||||
def _write_number(self, number, num_bytes):
|
||||
data = number.to_bytes(num_bytes, byteorder='big')
|
||||
self.writer.write(data)
|
||||
return data
|
||||
|
||||
async def get_screenshot(self):
|
||||
while self._updating:
|
||||
await asyncio.sleep(0.1)
|
||||
await asyncio.sleep(0)
|
||||
if self.framebuffer is None:
|
||||
raise Exception('No framebuffer data available')
|
||||
self._updating = True
|
||||
return self.framebuffer.copy()
|
||||
|
||||
async def _do_vnc_handshake(self):
|
||||
rfbver = await self.reader.readline()
|
||||
if not rfbver.startswith(b'RFB 003.008'):
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
raise Exception('Unsupported RFB version')
|
||||
self.writer.write(b'RFB 003.008\n')
|
||||
numsectypes = await self._read_number(1)
|
||||
if not numsectypes:
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
raise Exception('No security types supported by the server')
|
||||
sectypes = await self.reader.readexactly(numsectypes)
|
||||
sectypes = bytearray(sectypes)
|
||||
secresult = 1
|
||||
if 1 in sectypes:
|
||||
self.writer.write(b'\x01')
|
||||
await self.writer.drain()
|
||||
secresult = await self._read_number(4) # Security result
|
||||
if secresult != 0:
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
raise Exception('VNC authentication failed')
|
||||
self.writer.write(b'\x01') # Share display
|
||||
self.width = await self._read_number(2)
|
||||
self.height = await self._read_number(2)
|
||||
pixformat = await self.reader.readexactly(16)
|
||||
name_length = await self._read_number(4)
|
||||
self.name = await self.reader.readexactly(name_length)
|
||||
payload = ByteStream()
|
||||
if pixformat != MYPIXFORMAT:
|
||||
payload.add_number(0, 1) # Set pixel format
|
||||
payload.add_number(0, 3) # Padding
|
||||
payload.extend(MYPIXFORMAT)
|
||||
payload.flush(self.writer)
|
||||
self.receiver = asyncio.create_task(self._receive_loop())
|
||||
payload.add_number(2, 1) # Set encodings
|
||||
payload.add_number(0, 1) # Padding
|
||||
payload.add_number(2, 2) # Number of encodings
|
||||
payload.add_number(6, 4) # zlib
|
||||
payload.add_number(7, 4) # tight
|
||||
#payload.add_number(-223, 4) # desktopsize
|
||||
payload.flush(self.writer)
|
||||
self._request_screen_update(incremental=False)
|
||||
|
||||
def _request_screen_update(self, incremental=True):
|
||||
incremental = 1 if incremental else 0
|
||||
payload = ByteStream()
|
||||
payload.add_number(3, 1) # Framebuffer update request
|
||||
payload.add_number(incremental, 1) # Incremental
|
||||
payload.add_number(0, 2) # x position
|
||||
payload.add_number(0, 2) # y position
|
||||
payload.add_number(self.width, 2) # width
|
||||
payload.add_number(self.height, 2) # height
|
||||
payload.flush(self.writer)
|
||||
|
||||
async def _receive_loop(self):
|
||||
while True:
|
||||
try:
|
||||
message_type = await self._read_number(1)
|
||||
if message_type == 0: # Framebuffer update
|
||||
await self._handle_framebuffer_update()
|
||||
elif message_type == 1: # Set color map entries
|
||||
raise NotImplementedError('Set color map entries not implemented')
|
||||
elif message_type == 2: # Bell
|
||||
pass
|
||||
elif message_type == 3: # Server cut text
|
||||
padding = await self._read_number(3)
|
||||
length = await self._read_number(4)
|
||||
self.copytext = await self.reader.readexactly(length)
|
||||
else:
|
||||
raise Exception(f'Unknown message type: {message_type}')
|
||||
except Exception as e:
|
||||
print(f"Error in receive loop: {e}")
|
||||
break
|
||||
|
||||
async def _handle_framebuffer_update(self):
|
||||
_ = await self._read_number(1) # Padding
|
||||
num_rects = await self._read_number(2)
|
||||
self._updating = True
|
||||
for _ in range(num_rects):
|
||||
await self._handle_rectangle()
|
||||
self._updating = False
|
||||
self._request_screen_update(incremental=True)
|
||||
|
||||
async def _handle_rectangle(self):
|
||||
if self.framebuffer == None:
|
||||
self.framebuffer = Image.new('RGBA', (self.width, self.height))
|
||||
x = await self._read_number(2)
|
||||
y = await self._read_number(2)
|
||||
width = await self._read_number(2)
|
||||
height = await self._read_number(2)
|
||||
encoding_type = await self._read_number(4)
|
||||
pixel_data = None
|
||||
if encoding_type == 6:
|
||||
compressed_data_length = await self._read_number(4)
|
||||
compressed_data = await self.reader.readexactly(compressed_data_length)
|
||||
# Decompress the data using zlib and store it in the framebuffer
|
||||
pixel_data = self.decompressor.decompress(compressed_data)
|
||||
elif encoding_type == 0:
|
||||
pixel_data = await self.reader.readexactly(width * height * 4) # Assuming 32 bits per pixel
|
||||
if encoding_type == -223: # desktopsize
|
||||
self.width = width
|
||||
self.height = height
|
||||
self.framebuffer = Image.new('RGBA', (self.width, self.height))
|
||||
elif pixel_data:
|
||||
pixel_data = bytearray(pixel_data)
|
||||
for i in range(3, len(pixel_data), 4):
|
||||
pixel_data[i] = 0xff
|
||||
img = Image.frombytes('RGBA', (width, height), bytes(pixel_data))
|
||||
self.framebuffer.paste(img, (x, y))
|
||||
elif encoding_type == 6: # zlib
|
||||
compressed_data_length = await self._read_number(4)
|
||||
compressed_data = await self.reader.readexactly(compressed_data_length)
|
||||
# Decompress the data using zlib and store it in the framebuffer
|
||||
pixel_data = self.decompressor.decompress(compressed_data)
|
||||
img = Image.frombytes('RGBA', (width, height), pixel_data)
|
||||
self.framebuffer.paste(img, (x, y))
|
||||
elif encoding_type == 7: # tight
|
||||
# Best document I could see was:
|
||||
# https://github.com/TurboVNC/tightvnc/blob/main/vnc_winsrc/rfb/rfbproto.h
|
||||
tightheader = await self._read_number(1)
|
||||
streamid = tightheader & 0x0F
|
||||
if streamid:
|
||||
raise NotImplementedError('tight encoding with streamid not implemented')
|
||||
comptype = (tightheader >> 4) & 0x0F
|
||||
if comptype != 9:
|
||||
raise NotImplementedError(f'tight encoding with comptype {comptype} not implemented')
|
||||
compressed_data_length = await self._read_tight_length()
|
||||
compressed_data = await self.reader.readexactly(compressed_data_length)
|
||||
jpgimg = io.BytesIO(compressed_data)
|
||||
img = Image.open(jpgimg)
|
||||
self.framebuffer.paste(img, (x, y))
|
||||
else:
|
||||
raise Exception(f'Unsupported encoding type: {encoding_type}')
|
||||
|
||||
async def _read_tight_length(self):
|
||||
length = 0
|
||||
for i in range(3):
|
||||
byte = await self._read_number(1)
|
||||
length |= ((byte & 0x7F) << (i * 7))
|
||||
if not (byte & 0x80):
|
||||
break
|
||||
return length
|
||||
async def close(self):
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
Reference in New Issue
Block a user