diff --git a/confluent_client/bin/nodeconsole b/confluent_client/bin/nodeconsole index 5b36c575..4c2ef1db 100755 --- a/confluent_client/bin/nodeconsole +++ b/confluent_client/bin/nodeconsole @@ -29,6 +29,7 @@ import confluent.client as client import confluent.sortutil as sortutil import confluent.logreader as logreader import confluent.vnc as vnc +import enum import time import select import signal @@ -106,6 +107,224 @@ oldfl = None streaming = False +class ModifiedKey: + def __init__(self, key, modifierstate=None): + self.key = key + self.modifierstate = modifierstate + +class SpecialKeys(enum.Enum): + F1 = 0xffbe + F2 = 0xffbf + F3 = 0xffc0 + F4 = 0xffc1 + F5 = 0xffc2 + F6 = 0xffc3 + F7 = 0xffc4 + F8 = 0xffc5 + F9 = 0xffc6 + F10 = 0xffc7 + F11 = 0xffc8 + F12 = 0xffc9 + UP = 0xff52 + DOWN = 0xff54 + LEFT = 0xff51 + RIGHT = 0xff53 + ALT = 0xffe9 + CTRL = 0xffe4 + SHIFT = 0xffe1 + BACKSPACE = 0xff08 + TAB = 0xff09 + ENTER = 0xff0d + SYSRQ = 0xff15 + DELETE = 0xffff + INSERT = 0xff63 + PAGE_UP = 0xff55 + PAGE_DOWN = 0xff56 + HOME = 0xff50 + END = 0xff57 + ESC = 0xff1b + + +async def watch_input(): + handler = await InputHandler.create() + while True: + await asyncio.sleep(1) + +class InputHandler: + + ss3keys = { + 'P': SpecialKeys.F1, + 'Q': SpecialKeys.F2, + 'R': SpecialKeys.F3, + 'S': SpecialKeys.F4 + } + + csikeys = { + 'H': SpecialKeys.HOME, + 'F': SpecialKeys.END, + '2~': SpecialKeys.INSERT, + '3~': SpecialKeys.DELETE, + '5~': SpecialKeys.PAGE_UP, + '6~': SpecialKeys.PAGE_DOWN, + '11~': SpecialKeys.F1, + '12~': SpecialKeys.F2, + '13~': SpecialKeys.F3, + '14~': SpecialKeys.F4, + '15~': SpecialKeys.F5, + '17~': SpecialKeys.F6, + '18~': SpecialKeys.F7, + '19~': SpecialKeys.F8, + '20~': SpecialKeys.F9, + '21~': SpecialKeys.F10, + '23~': SpecialKeys.F11, + '24~': SpecialKeys.F12, + '[A': SpecialKeys.UP, + '[B': SpecialKeys.DOWN, + '[C': SpecialKeys.RIGHT, + '[D': SpecialKeys.LEFT + } + + @classmethod + async def create(cls): + self = cls() + self.buffer = '' + self.inputcontext = None + currloop = asyncio.get_running_loop() + self.fd = sys.stdin.fileno() + self.seqtimeout = None + currloop.add_reader(self.fd, self.handle_input) + return self + + def handle_input(self): + data = os.read(self.fd, 1) + if not data: + return + asyncio.create_task(self.process_input(data)) + + async def timeout_sequence(self, timeout=3, flushbuffer=False): + await asyncio.sleep(timeout) + if flushbuffer: + await relay_keypresses(self.buffer) + self.reset_input_context() + + def reset_input_context(self): + self.inputcontext = None + self.buffer = '' + global focus_pending + if focus_pending: + focus_pending = False + redraw() + + async def process_input(self, data): + if self.inputcontext is None: # no escape or command sequence + if data == b'\x05': # Ctrl-E + if self.seqtimeout: + self.seqtimeout.cancel() + self.seqtimeout = asyncio.create_task(self.timeout_sequence(flushbuffer=True)) + self.inputcontext = 'command_sequence' + self.buffer = '\x05' + return + elif data == b'\x1b': # ESC + if self.seqtimeout: + self.seqtimeout.cancel() + self.seqtimeout = asyncio.create_task(self.timeout_sequence(0.2, flushbuffer=True)) + self.inputcontext = 'escape_sequence' + self.buffer = '\x1b' + return + if data[0] == 0x7f: # Backspace + await relay_keypresses(SpecialKeys.BACKSPACE) + elif data[0] == 0x09: # Tab + await relay_keypresses(SpecialKeys.TAB) + elif data[0] == 0x0d: # Enter + await relay_keypresses(SpecialKeys.ENTER) + elif data[0] == 0: + await relay_keypresses(' ', modifiers=[SpecialKeys.CTRL]) # Ctrl-Space) + elif data[0] < 0x20: # other control characters, this is ambiguous, but assume letters + await relay_keypresses(chr(data[0] + 0x60), modifiers=[SpecialKeys.CTRL]) + else: + await relay_keypresses(data.decode('utf-8', errors='ignore')) + return + elif self.inputcontext == 'escape_sequence': + await self.handle_esc_sequence(data) + elif self.inputcontext == 'command_sequence': + await self.handle_command_sequence(data) + + valid_commands = ('\x05cb', '\x05c.', '\x05c?', '\x05cfa', '\x05c\x1b[A', '\x05c\x1b[B', '\x05c\x1b[C', '\x05c\x1b[D') + + def starts_valid_command(self): + for cmd in self.valid_commands: + if cmd.startswith(self.buffer): + return True + return False + + async def handle_command_sequence(self, data): + if self.seqtimeout: + self.seqtimeout.cancel() + self.buffer += data.decode('utf-8', errors='ignore') + if len(self.buffer) < 2: + raise Exception('Command sequence buffer should have at least 2 characters') + if not self.buffer.startswith('\x05c'): # not a command + await relay_keypresses('e', modifiers=[SpecialKeys.CTRL]) + await relay_keypresses(self.buffer[1:]) + self.reset_input_context() + if '\x05cb' == self.buffer: # send break + # but we need to know more, so wait for the next key + self.seqtimeout = asyncio.create_task(self.timeout_sequence()) + elif len(self.buffer) == 4 and self.buffer.startswith('\x05cb'): # Ctrl-E, then c, then b + await relay_keypresses(self.buffer[3:], modifiers=[SpecialKeys.ALT, SpecialKeys.SYSRQ]) + self.reset_input_context() + elif self.buffer == '\x05c.': # Ctrl-E, then . + sys.exit(0) + elif self.buffer == '\x05cfa': # toggle focus ... + toggle_focus_all() + elif self.buffer == '\x05c?': # Ctrl-E, then ? + sys.stdout.write('Command sequences:\n') + sys.stdout.write('Ctrl-E, then c: Send SysRq\n') + sys.stdout.write('Ctrl-E, then .: Exit console\n') + sys.stdout.write('Ctrl-E, then ?: This help message\n') + self.reset_input_context() + elif self.buffer in ('\x05c\x1b[A', '\x05c\x1b[B', '\x05c\x1b[C', '\x05c\x1b[D'): # Ctrl-E, then cursor keys + arrowkey_map = {'A': 'Up', 'B': 'Down', 'C': 'Right', 'D': 'Left'} + arrowkey = arrowkey_map.get(self.buffer[-1], self.buffer[-1]) + global focus_pending + focus_pending = True + move_focus(arrowkey) + self.buffer = '\x05c' + self.seqtimeout = asyncio.create_task(self.timeout_sequence()) + elif self.starts_valid_command(): + self.seqtimeout = asyncio.create_task(self.timeout_sequence()) + elif len(self.buffer) > 2: + # unknown command sequence, abort + self.reset_input_context() + + async def handle_esc_sequence(self, data): + if self.seqtimeout: + self.seqtimeout.cancel() + self.buffer += data.decode('utf-8', errors='ignore') + if len(self.buffer) >= 2 and self.buffer.startswith('\x1b['): #CSI + if self.buffer[1:] in self.csikeys: + await relay_keypresses(self.csikeys[self.buffer[1:]]) + self.reset_input_context() + return + for cand in self.csikeys: + if cand.startswith(self.buffer[1:]): + # continue to wait for more input to disambiguate + self.seqtimeout = asyncio.create_task(self.timeout_sequence(0.2)) + break + else: + self.reset_input_context() + elif len(self.buffer) >= 2 and self.buffer.startswith('\x1bO'): #SS3 + if len(self.buffer) == 3: + if self.buffer[2:] in self.ss3keys: + await relay_keypresses(self.ss3keys[self.buffer[2:]]) + self.reset_input_context() + return + elif len(self.buffer) >= 2: # ESC-key is a way to do alt + await relay_keypresses(self.buffer[1:], modifiers=[SpecialKeys.ALT]) + self.reset_input_context() + + +vncclientsbynode = {} def get_coords(): sys.stdout.write('\x1b[6n') # sys.stdout.flush() @@ -404,7 +623,10 @@ def prep_node_tile(node): if currrowcell: cursor_down(currrowcell) if node in focused_nodes: - sys.stdout.write('\x1b[44m') + if focus_pending: + sys.stdout.write('\x1b[43m') + else: + sys.stdout.write('\x1b[44m') sys.stdout.write('▏' + node) if node in focused_nodes: sys.stdout.write('\x1b[0m') @@ -451,6 +673,8 @@ async def do_screenshot(): global resized global numrows sess = client.Command() + if streaming: + asyncio.create_task(watch_input()) if options.tile: imageformat = os.environ.get('CONFLUENT_IMAGE_PROTOCOL', 'kitty') if imageformat not in ('kitty', 'iterm'): @@ -478,6 +702,8 @@ async def do_screenshot(): currrowcell = 0 for node in allnodes: nodepositions[node] = currcolcell, currrowcell + if streaming: + init_focus(node) if currcol < cols: currcol += 1 currcolcell += cwidth @@ -563,20 +789,35 @@ async def grab_vncs(urlbynode): conserversequence = '\05c' -async def relay_keypress(key): - # Implement the logic for relaying the keypress asynchronously - pass +async def relay_keypresses(keys, modifiers=None): + if focus_pending: + return + if hasattr(keys, 'value'): + keys = [keys] + if isinstance(keys, str): + keys = list(bytearray(keys.encode('utf-8'))) + shifted = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ~!@#$%^&*()_+{}|:"<>?' + for i in range(len(keys)): + if not isinstance(keys[i], int): + continue + if chr(keys[i]) in shifted: + if modifiers: + modifiers.append(SpecialKeys.SHIFT) + else: + modifiers = [SpecialKeys.SHIFT] + if keys[0] == 0x1b: + keys[0] = SpecialKeys.ESC + for node in focused_nodes: + vnclient = vncclientsbynode.get(node) + if not vnclient: + sys.stderr.write(f"No VNC client for node {node}, unable to relay keypress\n") + continue + try: + await vnclient.send_keypresses(keys, modifiers) + except Exception as e: + sys.stderr.write(f"Error relaying keypress to VNC client: {e}\n") + continue -def get_command_sequence(nbytes, timeout=3): - seq = '' - fd = sys.stdin.fileno() - start_time = time.time() - deadline = start_time + timeout - while len(seq) < nbytes and (time.time() - start_time) < timeout: - if sys.stdin in select.select([sys.stdin], [], [], deadline - time.time())[0]: - chunk = os.read(fd, 1).decode('utf-8', errors='ignore') - seq += chunk - return seq single_focus_node = None focused_nodes = {} @@ -585,10 +826,30 @@ def init_focus(node): if not focused_nodes: focused_nodes[node] = True single_focus_node = node - +focus_pending = False def move_focus(direction): + global single_focus_node # Implement the logic to move focus in the specified direction - sys.stderr.write("Move focus: {}\n".format(direction)) + currcolcell, currrowcell = nodepositions[single_focus_node] + if direction == 'Up': + target = (currcolcell, currrowcell - cheight) + elif direction == 'Down': + target = (currcolcell, currrowcell + cheight) + elif direction == 'Left': + target = (currcolcell - cwidth, currrowcell) + elif direction == 'Right': + target = (currcolcell + cwidth, currrowcell) + else: + sys.stderr.write("Invalid direction for focus movement: {}\n".format(direction)) + return + for node, pos in nodepositions.items(): + if pos == target: + focused_nodes.clear() + focused_nodes[node] = True + single_focus_node = node + redraw() + return + def toggle_focus_all(): # Implement the logic to toggle focus all if len(focused_nodes) < 2: @@ -600,55 +861,6 @@ def toggle_focus_all(): continue del focused_nodes[node] -async def check_keypress(node): - if sys.stdin in select.select([sys.stdin], [], [], 0)[0]: - key = os.read(sys.stdin.fileno(), 1).decode('utf-8', errors='ignore') - if key == '\x05': # Ctrl-E - return await handle_command_sequence() - await relay_keypress(key) - -async def handle_command_sequence(): - - seq = get_command_sequence(1) - if seq != 'c': - # Not a conserver command sequence, relay the initial key and any additional keys - await relay_keypress('\x05') - for k in seq: - await relay_keypress(k) - return - cmd = get_command_sequence(1) - if cmd == '?': # Ctrl-E, then c, then ? - sys.stdout.write('Command sequences:\n') - sys.stdout.write('Ctrl-E, then c, then .: Exit console\n') - sys.stdout.write('Ctrl-E, then c, then fa: Toggle focus all\n') - sys.stdout.write('Ctrl-E, then c, then arrow keys: Move focus\n') - sys.stdout.write('Ctrl-E, then c, then ?: Show this help message\n') - sys.stdout.flush() - return True - if cmd == '.': # Ctrl-E, then c, then . - indirect_console() - sys.exit(0) - if cmd == 'f': - subcmd = get_command_sequence(1) - if subcmd == 'a': # Ctrl-E, then c, then fa - # Implement focus all toggle logic here - toggle_focus_all() - return - if cmd == '\x1b': # Ctrl-E, then c, then f, then ESC - while cmd == '\x1b': - cursor_cmd = get_command_sequence(2, 0.2) - if cursor_cmd == '[A': # Up arrow - move_focus('up') - elif cursor_cmd == '[B': # Down arrow - move_focus('down') - elif cursor_cmd == '[C': # Right arrow - move_focus('right') - elif cursor_cmd == '[D': # Left arrow - move_focus('left') - else: - break - cmd = get_command_sequence(1) - return async def do_vnc(node, url): global streaming @@ -657,6 +869,7 @@ async def do_vnc(node, url): while keeprunning: try: async with await vnc.VNCClient.create(url) as client: + vncclientsbynode[node] = client while True: # Retrieve pixels as a 3D numpy array try: @@ -680,9 +893,6 @@ async def do_vnc(node, url): imgdata = outfile.getbuffer() if imgdata: draw_node(node, imgdata, '', '', cwidth, cheight) - if streaming: - init_focus(node) - await check_keypress(node) else: keeprunning = False break diff --git a/confluent_client/confluent/vnc.py b/confluent_client/confluent/vnc.py index 2ac68deb..b1ee7e47 100644 --- a/confluent_client/confluent/vnc.py +++ b/confluent_client/confluent/vnc.py @@ -71,6 +71,30 @@ class VNCClient: await self._do_vnc_handshake() return self + async def send_keypresses(self, keys, modifierkeys=None): + payload = ByteStream() + for modkey in (modifierkeys or []): + payload.add_number(4, 1) # Key event + payload.add_number(1, 1) # Down + payload.add_number(0, 2) # Padding + payload.add_number(modkey.value, 4) + for key in keys: + keynumber = key.value if hasattr(key, 'value') else key + payload.add_number(4, 1) # Key event + payload.add_number(1, 1) # Down + payload.add_number(0, 2) # Padding + payload.add_number(keynumber, 4) + payload.add_number(4, 1) # Key event + payload.add_number(0, 1) # Up + payload.add_number(0, 2) # Padding + payload.add_number(keynumber, 4) + for modkey in (modifierkeys or []): + payload.add_number(4, 1) # Key event + payload.add_number(0, 1) # Up + payload.add_number(0, 2) # Padding + payload.add_number(modkey.value, 4) + payload.flush(self.writer) + async def _read_number(self, num_bytes): data = await self.reader.readexactly(num_bytes) return int.from_bytes(data, byteorder='big', signed=True)