#!/usr/libexec/platform-python # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2015 Lenovo # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import asyncio import base64 import optparse import os import shlex import subprocess import sys path = os.path.dirname(os.path.realpath(__file__)) path = os.path.realpath(os.path.join(path, '..', 'lib', 'python')) if path.startswith('/opt'): sys.path.append(path) import confluent.asynclient as client import confluent.client as synclient import confluent.sortutil as sortutil import confluent.logreader as logreader import confluent.vnc as vnc import enum import time import select import signal import socket import re import tty import termios import fcntl import confluent.screensqueeze as sq try: from PIL import Image, ImageDraw, ImageFont except ImportError: Image = None try: # sixel is optional, attempt to import but stub out if unavailable import io import sixel class DumbWriter(sixel.SixelWriter): def restore_position(self, output): return except ImportError: class DumbWriter(): def draw(self, imgfile): sys.stderr.write("PySixel not detected, Sixel format display not supported\n") confettypath = os.path.join(os.path.dirname(sys.argv[0]), 'confetty') argparser = optparse.OptionParser( usage="Usage: %prog [options] [kill][-- [passthroughoptions]]", epilog="Command sequences are available while connected to a console, hit " "ctrl-'e', then release ctrl, then 'c', then '?' for a full list. " "For example, ctrl-'e', then 'c', then '.' will exit the current " "console") argparser.add_option('-a', '--automation', type='string', default=None, help='Specify an automation script') argparser.add_option('-t', '--tile', action='store_true', default=False, help='Tile console windows in the terminal') argparser.add_option('-e', '--headless', action='store_true', default=False, help='Run in headless mode, which is designed for use with ' 'automation scripts and disables interactive features') argparser.add_option('-l', '--log', action='store_true', default=False, help='Enter log replay mode instead of showing a live console') argparser.add_option('-T', '--Timestamp', action='store_true', default=False, help= 'Dump log in stdout with timestamps') argparser.add_option('-s', '--screenshot', action='store_true', default=False, help='Attempt to grab screenshot and render using kitty image protocol') argparser.add_option('-i', '--interval', type='float', help='Interval in seconds to redraw the screenshot. Currently only ' 'works for one node') argparser.add_option('-v', '--video', action='store_true', default=False, help='Attempt to continuously stream video from nodes that support streaming console via confluent') argparser.add_option('-w','--windowed', action='store_true', default=False, help='Open terminal windows for each node. The ' 'environment variable NODECONSOLE_WINDOWED_COMMAND ' 'should be set, which should be a text string corresponding ' 'to a command that can be used to open a windowed console,' ' omitting the "nodeconsole " part of the ' 'command, for example, to open a set of consoles for a ' 'range of nodes in separate xterm windows, set ' 'NODECONSOLE_WINDOWED_COMMAND to "xterm -e". To open a ' 'set of consoles for a range of nodes in separate ' 'GNOME Terminal windows with a size of 100 columns and ' '31 rows, set NODECONSOLE_WINDOWED_COMMAND ' 'to "gnome-terminal --geometry 100x31 --" or in a WSL ' 'environment, to open a set of consoles for a range of ' 'nodes in separate Windows Terminal windows, with the ' 'title set for each node, set NODECONSOLE_WINDOWED_COMMAND' ' to "wt.exe wsl.exe -d AlmaLinux-8 ' '--shell-type login". If the NODECONSOLE_WINDOWED_COMMAND ' 'environment variable isn\'t set, xterm will be used by' 'default.') (options, args) = argparser.parse_args() automation_args = [] if options.automation: automation_args = ['-a', options.automation] if options.headless: automation_args += ['--headless'] oldtcattr = None oldfl = None streaming = False class ModifiedKey: def __init__(self, key, modifierstate=None): self.key = key self.modifierstate = modifierstate class SpecialKeys(enum.Enum): F1 = 0xffbe F2 = 0xffbf F3 = 0xffc0 F4 = 0xffc1 F5 = 0xffc2 F6 = 0xffc3 F7 = 0xffc4 F8 = 0xffc5 F9 = 0xffc6 F10 = 0xffc7 F11 = 0xffc8 F12 = 0xffc9 UP = 0xff52 DOWN = 0xff54 LEFT = 0xff51 RIGHT = 0xff53 ALT = 0xffe9 CTRL = 0xffe4 SHIFT = 0xffe1 BACKSPACE = 0xff08 TAB = 0xff09 ENTER = 0xff0d SYSRQ = 0xff15 DELETE = 0xffff INSERT = 0xff63 PAGE_UP = 0xff55 PAGE_DOWN = 0xff56 HOME = 0xff50 END = 0xff57 ESC = 0xff1b extratextbynode = {} focustext = '' async def do_power_action(action, setboot=None): targnodes = list(focused_nodes) if not targnodes: return for node in targnodes: extratextbynode[node] = 'Power action: ' redraw() try: paclient = client.Command() if setboot: noderange = ','.join(targnodes) async for res in paclient.update(f'/noderange/{noderange}/boot/nextdevice', {'nextdevice': setboot, 'mode': 'uefi'}): if 'error' in res: for node in targnodes: extratextbynode[node] += 'Failed to set boot device: {0}'.format(res['error']) return for node in res.get('databynode', {}): extratextbynode[node] += 'set boot device to: ' + res['databynode'][node].get('nextdevice', {}).get('value', 'Unknown') + ', ' noderange = ','.join(targnodes) async for res in paclient.update(f'/noderange/{noderange}/power/state', {'state': action}): if 'error' in res: for node in targnodes: extratextbynode[node] += 'Failed to power ' + action + ': {0}'.format(res['error']) return for node in res.get('databynode', {}): extratextbynode[node] += res['databynode'][node].get('state', {}).get('value', 'Unknown') finally: redraw() await asyncio.sleep(5) for node in targnodes: extratextbynode[node] = '' redraw() def set_extra_text(text): global focustext focustext = text redraw() async def watch_input(): global focustext focustext = 'Hit Ctrl-E for command mode' handler = await InputHandler.create() while True: await asyncio.sleep(5) if focustext == 'Hit Ctrl-E for command mode': focustext = '' redraw() class InputHandler: ss3keys = { 'P': SpecialKeys.F1, 'Q': SpecialKeys.F2, 'R': SpecialKeys.F3, 'S': SpecialKeys.F4, 'M': SpecialKeys.ENTER, } csikeys = { '[H': SpecialKeys.HOME, '[F': SpecialKeys.END, '[1;3': False, # Stub to detect alt-arrow '[2~': SpecialKeys.INSERT, '[3~': SpecialKeys.DELETE, '[5~': SpecialKeys.PAGE_UP, '[6~': SpecialKeys.PAGE_DOWN, '[11~': SpecialKeys.F1, '[12~': SpecialKeys.F2, '[13~': SpecialKeys.F3, '[14~': SpecialKeys.F4, '[15~': SpecialKeys.F5, '[17~': SpecialKeys.F6, '[18~': SpecialKeys.F7, '[19~': SpecialKeys.F8, '[20~': SpecialKeys.F9, '[21~': SpecialKeys.F10, '[23~': SpecialKeys.F11, '[24~': SpecialKeys.F12, '[A': SpecialKeys.UP, '[B': SpecialKeys.DOWN, '[C': SpecialKeys.RIGHT, '[D': SpecialKeys.LEFT } @classmethod async def create(cls): self = cls() self.buffer = '' self.modkeys = None self.inputcontext = None currloop = asyncio.get_running_loop() self.fd = sys.stdin.fileno() self.seqtimeout = None currloop.add_reader(self.fd, self.handle_input) return self def handle_input(self): data = os.read(self.fd, 1) if not data: return asyncio.create_task(self.process_input(data)) async def timeout_sequence(self, timeout=3, flushbuffer=False): await asyncio.sleep(timeout) if flushbuffer: if self.buffer == '\x05': await relay_keypresses('e', modifiers=[SpecialKeys.CTRL]) elif self.buffer == '\x1bO': await relay_keypresses('O', modifiers=[SpecialKeys.ALT, SpecialKeys.SHIFT]) else: await relay_keypresses(self.buffer) self.reset_input_context() def reset_input_context(self, escmode=False): self.inputcontext = None self.buffer = '' self.modkeys = None if not escmode: set_extra_text('') global focus_pending if focus_pending: focus_pending = False if focus_pending or escmode: redraw() async def process_input(self, data): if self.inputcontext is None: # no escape or command sequence if data == b'\x05': # Ctrl-E set_extra_text('Ctrl-E pressed, release Ctrl and hit "c" to enter command mode, Ctrl-E again to send Ctrl-E') # clear any previous command sequence text if self.seqtimeout: self.seqtimeout.cancel() self.seqtimeout = asyncio.create_task(self.timeout_sequence(flushbuffer=True)) self.inputcontext = 'command_sequence' self.buffer = '\x05' return elif data == b'\x1b': # ESC if self.seqtimeout: self.seqtimeout.cancel() self.seqtimeout = asyncio.create_task(self.timeout_sequence(0.2, flushbuffer=True)) self.inputcontext = 'escape_sequence' self.buffer = '\x1b' return if data[0] == 0x7f: # Backspace await relay_keypresses(SpecialKeys.BACKSPACE) elif data[0] == 0x09: # Tab await relay_keypresses(SpecialKeys.TAB) elif data[0] == 0x0d: # Enter await relay_keypresses(SpecialKeys.ENTER) elif data[0] == 0: await relay_keypresses(' ', modifiers=[SpecialKeys.CTRL]) # Ctrl-Space) elif data[0] < 0x20: # other control characters, this is ambiguous, but assume letters await relay_keypresses(chr(data[0] + 0x60), modifiers=[SpecialKeys.CTRL]) else: await relay_keypresses(data.decode('utf-8', errors='ignore')) return elif self.inputcontext == 'escape_sequence': await self.handle_esc_sequence(data) elif self.inputcontext == 'command_sequence': await self.handle_command_sequence(data) valid_commands = ('\x05cb', # send sysrq '\x05cpo', # power off system '\x05cps', # shutdown system gracefully '\x05cpb\r', # reboot system '\x05cpbs', # boot system to setup '\x05cpbn', # boot system to network '\x05c.', # exit console '\x05cq', # exit console alias '\x05c?', # help '\x05cfa', # toggle focus all '\x05c\x1b[A', '\x05c\x1b[B', '\x05c\x1b[C', '\x05c\x1b[D', # move focus with arrow keys '\x05cf\x1b[A', '\x05cf\x1b[B', '\x05cf\x1b[C', '\x05cf\x1b[D', # move focus with arrow keys ) def starts_valid_command(self): for cmd in self.valid_commands: if cmd.startswith(self.buffer): return True return False async def handle_command_sequence(self, data): if self.seqtimeout: self.seqtimeout.cancel() self.buffer += data.decode('utf-8', errors='ignore') if '\x1b' == self.buffer[-1:]: # user hit escape, or arrow key.. # give it a short timeout for arrows.. self.seqtimeout = asyncio.create_task(self.timeout_sequence(0.2)) return if '\x03' == self.buffer[-1:]: # Ctrl-C, abort command sequence immediately self.reset_input_context() return if self.buffer == '\x05\x05': # double ctrl-e, send a single ctrl-e to the console await relay_keypresses('e', modifiers=[SpecialKeys.CTRL]) self.reset_input_context() return if len(self.buffer) < 2: raise Exception('Command sequence buffer should have at least 2 characters') if not self.buffer.startswith('\x05c'): # not a command await relay_keypresses('e', modifiers=[SpecialKeys.CTRL]) await relay_keypresses(self.buffer[1:]) self.reset_input_context() if self.buffer == '\x05c': set_extra_text('Commands: q:exit, p:power, f:input focus, b:sysrq, ^c:abort') self.seqtimeout = asyncio.create_task(self.timeout_sequence(10)) # extend timeout to navigate options return if '\x05cb' == self.buffer: # send break # but we need to know more, so wait for the next key set_extra_text('Enter sysrq command key (or to abort)') self.seqtimeout = asyncio.create_task(self.timeout_sequence()) elif len(self.buffer) == 4 and self.buffer.startswith('\x05cb'): # Ctrl-E, then c, then b await relay_keypresses(self.buffer[3:], modifiers=[SpecialKeys.ALT, SpecialKeys.SYSRQ]) self.reset_input_context() elif self.buffer == '\x05c.' or self.buffer == '\x05cq': # Ctrl-E, then . or q asyncio.get_running_loop().call_soon(sys.exit, 0) return elif self.buffer == '\x05cf': # Ctrl-E, then c, then f set_extra_text('Focus commands: ⇅⇄:move focus, a:toggle focus all') self.seqtimeout = asyncio.create_task(self.timeout_sequence(10)) elif self.buffer == '\x05cfa': # toggle focus ... toggle_focus_all() self.reset_input_context() elif self.buffer == '\x05cp': set_extra_text('Power commands: o:power off, s:shutdown, b:boot') self.seqtimeout = asyncio.create_task(self.timeout_sequence(10)) elif self.buffer == '\x05cpb': set_extra_text('Boot options: : normal boot, s:boot to setup, n:boot to network, ^c to abort') self.seqtimeout = asyncio.create_task(self.timeout_sequence(10)) elif self.buffer == '\x05cpo': # Ctrl-E, then power off await do_power_action('off') return self.reset_input_context() elif self.buffer == '\x05cps': # Ctrl-E, then shutdown await do_power_action('shutdown') return self.reset_input_context() elif self.buffer == '\x05cpb\r': # Ctrl-E, then reboot await do_power_action('boot') return self.reset_input_context() elif self.buffer == '\x05cpbs': # Ctrl-E, then boot to setup await do_power_action('boot', 'setup') return self.reset_input_context() elif self.buffer == '\x05cpbn': # Ctrl-E, then boot to network await do_power_action('boot', 'network') return self.reset_input_context() elif self.buffer == '\x05c?': # Ctrl-E, then c? set_extra_text('q:exit,⇅⇄:focus,po:power off,ps:shutdown,pb:reboot,pbs:boot setup,pbn:boot network,fa:(un)focus all,bX:sysrq, then X') self.buffer = '\x05c' # go back to let the user enter stuff based on text self.seqtimeout = asyncio.create_task(self.timeout_sequence(10)) return elif self.buffer[-3:] in ('\x1b[A', '\x1b[B', '\x1b[C', '\x1b[D'): # Ctrl-E, then cursor keys arrowkey_map = {'A': 'Up', 'B': 'Down', 'C': 'Right', 'D': 'Left'} arrowkey = arrowkey_map.get(self.buffer[-1], self.buffer[-1]) global focus_pending focus_pending = True move_focus(arrowkey) self.buffer = '\x05c' self.seqtimeout = asyncio.create_task(self.timeout_sequence()) elif self.starts_valid_command(): self.seqtimeout = asyncio.create_task(self.timeout_sequence()) elif len(self.buffer) > 2: # unknown command sequence, abort self.reset_input_context() async def handle_esc_sequence(self, data): if self.seqtimeout: self.seqtimeout.cancel() self.buffer += data.decode('utf-8', errors='ignore') # Shift-enter if len(self.buffer) >= 2 and self.buffer.startswith('\x1b['): #CSI if self.buffer.endswith(';3~'): self.buffer = self.buffer.replace(';3~', '~') self.modkeys = [SpecialKeys.ALT] if '1;3' in self.buffer: self.buffer = self.buffer.replace('1;3', '') self.modkeys = [SpecialKeys.ALT] if self.buffer[1:] in self.csikeys: await relay_keypresses(self.csikeys[self.buffer[1:]], modifiers=self.modkeys) self.reset_input_context(escmode=True) return for cand in self.csikeys: if cand.startswith(self.buffer[1:]): # continue to wait for more input to disambiguate self.seqtimeout = asyncio.create_task(self.timeout_sequence(0.2)) break else: self.reset_input_context(escmode=True) elif len(self.buffer) >= 2 and self.buffer.startswith('\x1bO'): #SS3 if len(self.buffer) == 3: if self.buffer[2:] in self.ss3keys: await relay_keypresses(self.ss3keys[self.buffer[2:]]) self.reset_input_context(escmode=True) return if len(self.buffer) == 2: self.seqtimeout = asyncio.create_task(self.timeout_sequence(0.2)) return elif len(self.buffer) >= 2: # ESC-key is a way to do alt await relay_keypresses(self.buffer[1:], modifiers=[SpecialKeys.ALT]) self.reset_input_context(escmode=True) vncclientsbynode = {} def get_coords(): sys.stdout.write('\x1b[6n') # sys.stdout.flush() gotreply = select.select([sys.stdin,], [], [], 0.250)[0] if gotreply: response = '' while select.select([sys.stdin,], [], [], 0.1)[0] and 'R' not in response: response += sys.stdin.read() coords = response.replace('R', '').split('[')[1].split(';') #sys.stdout.write('\x1b[{}:{}H'.format(*coords)) console_direct_mode = False def direct_console(): global console_direct_mode global oldtcattr global oldfl if console_direct_mode: return False console_direct_mode = True oldtcattr = termios.tcgetattr(sys.stdin.fileno()) oldfl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL) tty.setraw(sys.stdin.fileno()) #fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, oldfl | os.O_NONBLOCK) return True def indirect_console(): global console_direct_mode global oldtcattr global oldfl if not console_direct_mode: return False console_direct_mode = False #fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, oldfl & ~os.O_NONBLOCK) termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, oldtcattr) return True def determine_tile_size(numnodes): # for now, smash everything to a common aspect ratio. 16:11 # is pretty much wrong for everything, making 4:3 a bit too wide # and 16:9 significantly too narrow, but it is serviceable # An improvement could come with us owning the scaling # instead of delegating to Kitty, which says if we specify both, # we get stretching. In theory we should be able to get aspect correct # from kitty by omitting, but: # then we don't know how much to move the cursor left after draw_image # Konsole won't scale at all with only partial scaling specified directed = direct_console() cheight, cwidth, pixwidth, pixheight = sq.get_screengeom(escfallback=True) if directed: indirect_console() # 16:12 is to roughly account for the 'titles' of the tiles ratio = (pixwidth / 16) / (pixheight / 12) bestdeviation = None bestdims = [] for i in range(1, numnodes + 1): number = numnodes while number % i != 0: number += 1 columns = i rows = number // i deviation = abs(ratio - (columns / rows)) if bestdeviation is None: bestdeviation = deviation bestdims = [columns, rows] elif deviation < bestdeviation: bestdeviation = deviation bestdims = [columns, rows] # ok, the above algorithm can still pick things like # 1 2 3 # 4 # So we will let it pick the number of rows, and # then see if we can chop columns and still fit while (bestdims[0] - 1) * bestdims[1] >= numnodes: bestdims[0] = bestdims[0] - 1 cellswide = cwidth // bestdims[0] cellshigh = cheight // bestdims[1] tilewidth = cellswide * pixwidth / cwidth tileheight = cellshigh * pixheight / cheight if tilewidth > (tileheight * 16 / 11): tilewidth = tileheight * 16 / 11 cellswide = int(tilewidth // (pixwidth / cwidth)) if tileheight > (tilewidth * 11 /16): tileheight = tilewidth * 11 / 16 cellshigh = int(tileheight // (pixheight / cheight)) bestdims = bestdims + [cellswide, cellshigh, cellshigh * bestdims[1]] # incur any scrolling we might get. This allows us to accurately # save/restore cursor or even get coordinates without scrolling fouling # the desired target sys.stdout.write('\n' * bestdims[4]) sys.stdout.flush() cursor_up(bestdims[4]) return bestdims cursor_saved = False def sticky_cursor(): global cursor_saved # get cursor restore_position directed = False if sys.stdin.isatty() and not cursor_saved: try: directed = direct_console() sys.stdout.write('\x1b7') cursor_saved = True finally: if directed: indirect_console() elif cursor_saved: try: directed = direct_console() sys.stdout.write('\x1b8') finally: if directed: indirect_console() def cursor_up(count=1): sys.stdout.write(f'\x1b[{count}A') def cursor_down(count=1): sys.stdout.write(f'\x1b[{count}B') def cursor_right(count=1): sys.stdout.write(f'\x1b[{count}C') def cursor_left(count=1): sys.stdout.write(f'\x1b[{count}D') def cursor_save(): sys.stdout.write('\x1b7') def cursor_restore(): sys.stdout.write('\x1b8') def cursor_hide(): sys.stdout.write('\x1b[?25l') def cursor_show(): sys.stdout.write('\x1b[?25h') def get_pix_dimensions(width, height): directed = direct_console() cheight, cwidth, pixwidth, pixheight = sq.get_screengeom(escfallback=True) if directed: indirect_console() imgwidth = int(pixwidth / cwidth * width) imgheight = int(pixheight / cheight * height) return imgwidth, imgheight def draw_text(text, width, height): if Image: maxfntsize = 256 imgwidth, imgheight = get_pix_dimensions(width, height) nerr = Image.new(mode='RGB', size=(imgwidth, imgheight), color='green') nd = ImageDraw.Draw(nerr) for txtpiece in text.split('\n'): fntsize = 8 scalefont = True try: txtfont = ImageFont.truetype('DejaVuSans.ttf', size=fntsize) except OSError: scalefont = False txtfont = ImageFont.load_default(fntsize) while scalefont and nd.textlength(txtpiece, font=txtfont) < int(imgwidth * 0.90): fntsize += 1 try: txtfont = ImageFont.truetype('DejaVuSans.ttf', size=fntsize) except OSError: txtfont = ImageFont.load_default(fntsize) fntsize -= 1 if fntsize < maxfntsize: maxfntsize = fntsize hmargin = int(imgwidth * 0.05) vmargin = int(imgheight * 0.10) nd.text((hmargin, vmargin), text, font=txtfont) nd.rectangle((0, 0, nerr.width - 1, nerr.height -1), outline='white') outfile = io.BytesIO() nerr.save(outfile, format='PNG') draw_image(outfile.getvalue(), width, height, doscale=False) else: sys.stdout.write(text) cursor_left(len(text)) def draw_image(bindata, width, height, doscale=True): imageformat = os.environ.get('CONFLUENT_IMAGE_PROTOCOL', 'kitty') if doscale and Image and width: binfile = io.BytesIO() binfile.write(bindata) binfile.seek(0) try: img = Image.open(binfile) except Exception as e: errstr = 'Error rendering image:\n' + str(e) return draw_text(errstr, width, height) imgwidth, imgheight = get_pix_dimensions(width, height) nimg = Image.new(mode='RGBA', size=(imgwidth, imgheight)) imgwidth -= 4 imgheight -= 4 hscalefact = imgwidth / img.width vscalefact = imgheight / img.height if hscalefact < vscalefact: rzwidth = imgwidth rzheight = int(img.height * hscalefact) else: rzwidth = int(img.width * vscalefact) rzheight = imgheight img = img.resize((rzwidth, rzheight)) nd = ImageDraw.Draw(nimg) nd.rectangle((1, 1, rzwidth + 2, rzheight + 2), outline='black') nd.rectangle((0, 0, rzwidth + 3, rzheight + 3), outline='white') nimg.paste(img, box=(2, 2)) outfile = io.BytesIO() nimg.save(outfile, format='PNG') bindata = outfile.getvalue() if imageformat == 'sixel': sixel_draw(bindata) elif imageformat == 'iterm': iterm_draw(bindata, width, height) else: kitty_draw(bindata, width, height) def sixel_draw(bindata): binfile = io.BytesIO() binfile.write(bindata) binfile.seek(0) DumbWriter().draw(binfile) def iterm_draw(bindata, width, height): data = base64.b64encode(bindata) if not height: height = 'auto' if not width: width = 'auto' datalen = len(bindata) sys.stdout.write( '\x1b]1337;File=inline=1;width={};height={};size={}:'.format(width,height,datalen)) sys.stdout.write(data.decode('utf8')) sys.stdout.write('\a') sys.stdout.flush() def kitty_draw(bindata, width, height): data = base64.b64encode(bindata) preamble = '\x1b_Ga=T,f=100,q=2' if height: preamble += f',r={height},c={width}' #sys.stdout.write(repr(preamble)) #sys.stdout.write('\xb[{}D'.format(len(repr(preamble)))) #return first = True while data: chunk, data = data[:4096], data[4096:] m = 1 if data else 0 if first: sys.stdout.write('{},m={};'.format(preamble, m)) else: sys.stdout.write('\x1b_Gm={};'.format(m)) sys.stdout.write(chunk.decode('utf8')) sys.stdout.write('\x1b\\') sys.stdout.flush() pass_through_args = [] killcon = False try: noderange = args[0] if len(args) > 1: if args[1] == 'kill': killcon = True pass_through_args = args[1:] args = args[:1] except IndexError: argparser.print_help() sys.exit(1) if len(args) != 1: argparser.print_help() sys.exit(1) if options.log: logname = args[0] if not os.path.exists(logname) and logname[0] != '/': logname = os.path.join('/var/log/confluent/consoles', logname) if not os.path.exists(logname): sys.stderr.write('Unable to locate {0} on local system\n'.format(logname)) sys.exit(1) logreader.replay_to_console(logname) sys.exit(0) if options.Timestamp: logname = args[0] if not os.path.exists(logname) and logname[0] != '/': logname = os.path.join('/var/log/confluent/consoles', logname) if not os.path.exists(logname): sys.stderr.write('Unable to locate {0} on local system\n'.format(logname)) sys.exit(1) logreader.dump_to_console(logname) sys.exit(0) def prep_node_tile(node): currcolcell, currrowcell = nodepositions[node] if currcolcell: cursor_right(currcolcell) if currrowcell: cursor_down(currrowcell) titletext = node if extratextbynode.get(node, None): titletext += ' ' + extratextbynode[node] if node in focused_nodes: if focustext and not extratextbynode.get(node, None): titletext += ' ' + focustext if focus_pending: sys.stdout.write('\x1b[43m') else: sys.stdout.write('\x1b[44m') titlebar = f'▏{titletext:<{cwidth - 1}}' if len(titlebar) > cwidth: titlebar = titlebar[:cwidth - 1] + '…' sys.stdout.write(titlebar) if node in focused_nodes: sys.stdout.write('\x1b[0m') cursor_left(len(titlebar)) cursor_down() def reset_cursor(node): currcolcell, currrowcell = nodepositions[node] if currcolcell: cursor_left(currcolcell) cursor_up(currrowcell + 1) nodepositions = {} numrows = 0 cwidth = 0 cheight = 0 imagedatabynode = {} def redraw(): for node in imagedatabynode: imgdata = imagedatabynode[node] if node in nodepositions: prep_node_tile(node) cursor_save() else: if options.interval is not None: if node != firstnodename: sys.stderr.write('Multiple nodes not supported for interval') sys.exit(1) sticky_cursor() sys.stdout.write('{}: '.format(node)) # one row is used by our own name, so cheight - 1 for that allowance draw_image(imgdata, cwidth, cheight - 1 if cheight else cheight) if node in nodepositions: cursor_restore() reset_cursor(node) else: sys.stdout.write('\n') sys.stdout.flush() resized = False async def do_screenshot(): global streaming global resized global numrows sess = client.Command() if streaming: asyncio.create_task(watch_input()) if options.tile: imageformat = os.environ.get('CONFLUENT_IMAGE_PROTOCOL', 'kitty') if imageformat not in ('kitty', 'iterm'): sys.stderr.write('Tiled screenshots only supported with kitty or iterm protocol') sys.exit(1) allnodes = [] numnodes = 0 async for res in sess.read('/noderange/{}/nodes/'.format(args[0])): allnodes.append(res['item']['href'].replace('/', '')) numnodes += 1 resized = False def do_resize(a=None, b=None): global resized if a: resized = True # on a window resize, clear the old stuff sys.stdout.write('\x1bc') global numrows global cwidth global cheight cols, rows, cwidth, cheight, numrows = determine_tile_size(numnodes) currcol = 1 currcolcell = 0 currrowcell = 0 for node in allnodes: nodepositions[node] = currcolcell, currrowcell if streaming: init_focus(node) if currcol < cols: currcol += 1 currcolcell += cwidth else: currcol = 1 currcolcell = 0 currrowcell += cheight if a: redraw() do_resize() signal.signal(signal.SIGWINCH, do_resize) elif options.interval is not None: sys.stdout.write('\x1bc') firstnodename = None dorefresh = True vnconly = set([]) if streaming: doexit = 0 async for res in sess.read('/noderange/{}/console/ikvm_methods'.format(args[0])): for node in res.get('databynode', {}): methods = res['databynode'][node].get('ikvm_methods', []) if 'vnc' not in methods and 'openbmc' not in methods: sys.stderr.write(f'Node {node} does not support video via confluent\n') doexit = 1 vnconly.add(node) if doexit: sys.exit(1) while dorefresh: if not streaming: async for res in sess.read('/noderange/{}/console/ikvm_screenshot'.format(args[0])): for node in res.get('databynode', {}): errorstr = '' if not firstnodename: firstnodename = node error = res['databynode'][node].get('error') if error and 'vnc available' in error: vnconly.add(node) continue elif error: errorstr = error imgdata = res['databynode'][node].get('image', {}).get('imgdata', None) if imgdata: if len(imgdata) < 32: # We were subjected to error errorstr = f'Unable to get screenshot' if errorstr or imgdata: imgdata = base64.b64decode(imgdata) draw_node(node, imgdata, errorstr, firstnodename, cwidth, cheight) urlbynode = {} for node in vnconly: async for res in sess.update(f'/nodes/{node}/console/ikvm', {'method': 'unix'}): url = res.get('item', {}).get('href') if url: urlbynode[node] = url await grab_vncs(urlbynode) if resized: do_resize(True) resized = False if options.interval is None: dorefresh = False else: dorefresh = True time.sleep(options.interval) sys.exit(0) async def grab_vncs(urlbynode): global streaming tasks = [] directed = False try: if streaming: directed = direct_console() for node in urlbynode: url = urlbynode[node] tasks.append(asyncio.create_task(do_vnc(node, url))) await asyncio.gather(*tasks) except Exception as e: sys.stderr.write(f"Error in grab_vncs: {e}\n") finally: if directed: indirect_console() conserversequence = '\05c' async def relay_keypresses(keys, modifiers=None): if focus_pending: return if hasattr(keys, 'value'): keys = [keys] if isinstance(keys, str): keys = list(bytearray(keys.encode('utf-8'))) shifted = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ~!@#$%^&*()_+{}|:"<>?' for i in range(len(keys)): if not isinstance(keys[i], int): continue if chr(keys[i]) in shifted: if modifiers: modifiers.append(SpecialKeys.SHIFT) else: modifiers = [SpecialKeys.SHIFT] if keys[0] == 0x1b: keys[0] = SpecialKeys.ESC for node in focused_nodes: vnclient = vncclientsbynode.get(node) if not vnclient: sys.stderr.write(f"No VNC client for node {node}, unable to relay keypress\n") continue try: await vnclient.send_keypresses(keys, modifiers) except Exception as e: sys.stderr.write(f"Error relaying keypress to VNC client: {e}\n") continue single_focus_node = None focused_nodes = {} def init_focus(node): global single_focus_node if not focused_nodes: focused_nodes[node] = True single_focus_node = node focus_pending = False def move_focus(direction): global single_focus_node # Implement the logic to move focus in the specified direction currcolcell, currrowcell = nodepositions[single_focus_node] if direction == 'Up': target = (currcolcell, currrowcell - cheight) elif direction == 'Down': target = (currcolcell, currrowcell + cheight) elif direction == 'Left': target = (currcolcell - cwidth, currrowcell) elif direction == 'Right': target = (currcolcell + cwidth, currrowcell) else: sys.stderr.write("Invalid direction for focus movement: {}\n".format(direction)) return for node, pos in nodepositions.items(): if pos == target: focused_nodes.clear() focused_nodes[node] = True single_focus_node = node redraw() return def toggle_focus_all(): # Implement the logic to toggle focus all if len(focused_nodes) < 2: for node in nodepositions: focused_nodes[node] = True return for node in list(focused_nodes): if node == single_focus_node: continue del focused_nodes[node] async def do_vnc(node, url): global streaming keeprunning = True retries = 5 while keeprunning: try: async with await vnc.VNCClient.create(url) as client: vncclientsbynode[node] = client while True: # Retrieve pixels as a 3D numpy array try: # replace the stock screenshot function with our own # ask for a video refresh, then do reads until the video is complete # stock screenshot function discards data, # or we just patch the screenshot instead.... # but OpenBMC sends an alpha heavy mouse overlay # that results in blackness # possibly use client side cursor to suppress the ick? image = await client.get_screenshot() except asyncio.TimeoutError: # need a better closed connection detector, a static screen triggers timeouts too # without the timeout, we lose track of proxmox reset # with the timeout, we falsely assume dead on stale break retries = 5 # Save as PNG using PIL/pillow outfile = io.BytesIO() image.save(outfile, format='PNG') imgdata = outfile.getvalue() if imgdata: draw_node(node, imgdata, '', '', cwidth, cheight) else: keeprunning = False break except ValueError as e: draw_node(node, None, str(e), '', cwidth, cheight) retries -= 1 if retries <= 0: keeprunning = False sys.stderr.write(f'VNC connection for node {node} failed with error: {e}\n') await asyncio.sleep(1) def draw_node(node, imgdata, errorstr, firstnodename, cwidth, cheight): imagedatabynode[node] = imgdata if node in nodepositions: prep_node_tile(node) cursor_save() else: if options.interval is not None: if node != firstnodename: sys.stderr.write('Multiple nodes not supported for interval') sys.exit(1) sticky_cursor() sys.stdout.write('{}: '.format(node)) # one row is used by our own name, so cheight - 1 for that allowance if errorstr: draw_text(errorstr, cwidth, cheight -1 if cheight else cheight) else: draw_image(imgdata, cwidth, cheight - 1 if cheight else cheight) if node in nodepositions: cursor_restore() reset_cursor(node) else: sys.stdout.write('\n') sys.stdout.flush() if options.screenshot or options.video: streaming = options.video try: cursor_hide() asyncio.run(do_screenshot()) except KeyboardInterrupt: pass finally: cursor_show() cursor_down(numrows) sys.stdout.write('\n') sys.exit(0) async def kill(noderange): sess = client.Command() envstring=os.environ.get('NODECONSOLE_WINDOWED_COMMAND') if not envstring: envstring = 'xterm' nodes = [] async for res in sess.read('/noderange/{0}/nodes/'.format(noderange)): node = res.get('item', {}).get('href', '/').replace('/', '') if not node: sys.stderr.write(res.get('error', repr(res)) + '\n') sys.exit(1) nodes.append(node) for node in nodes: command = "ps auxww | grep {0} | grep console | egrep '\\b{1}\\b' | grep -v grep | awk '{{print $2}}'".format(envstring, node) process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate() try: process_id = stdout.decode('utf-8').split()[0] except IndexError: sys.stderr.write(node + ": console window not found \n") continue subprocess.Popen(["kill", process_id], stdout=subprocess.PIPE, stderr=subprocess.PIPE) sys.exit(0) def handle_geometry(envlist, sizegeometry, side_pad=0, top_pad=0, first=False): if '-geometry' in envlist: g_index = envlist.index('-geometry') elif '-g' in envlist: g_index = envlist.index('-g') else: g_index = 0 if g_index: if first: envlist[g_index+1] = '{0}+{1}+{2}'.format(envlist[g_index+1],side_pad, top_pad) else: envlist[g_index+1] = '{0}+{1}+{2}'.format(sizegeometry,side_pad, top_pad) else: envlist.insert(1, '-geometry') envlist.insert(2, '{0}+{1}+{2}'.format(sizegeometry,side_pad, top_pad)) g_index = 1 return envlist # add funcltionality to close/kill all open consoles if killcon: asyncio.run(kill(noderange)) #added functionality for wcons if options.windowed: result=subprocess.Popen(['xwininfo', '-root'], stdout=subprocess.PIPE) rootinfo=result.communicate()[0] result.wait() for line in rootinfo.decode('utf-8').split('\n'): if 'Width' in line: screenwidth = int(line.split(':')[1]) if 'Height' in line: screenheight = int(line.split(':')[1]) envstring=os.environ.get('NODECONSOLE_WINDOWED_COMMAND') if not envstring: sizegeometry='100x31' corrected_x, corrected_y = (13,84) envlist = handle_geometry(['xterm'] + pass_through_args + ['-e'],sizegeometry, first=True) #envlist=['xterm', '-bg', 'black', '-fg', 'white', '-geometry', '{sizegeometry}+0+0'.format(sizegeometry=sizegeometry), '-e'] else: envlist=os.environ.get('NODECONSOLE_WINDOWED_COMMAND').split(' ') if envlist[0] == 'xterm': if '-geometry' in envlist: g_index = envlist.index('-geometry') elif '-g' in envlist: g_index = envlist.index('-g') else: g_index = 0 if g_index: envlist[g_index+1] = envlist[g_index+1] + '+0+0' else: envlist.insert(1, '-geometry') envlist.insert(2, '100x31+0+0') g_index = 1 nodes = [] sess = synclient.Command() for res in sess.read('/noderange/{0}/nodes/'.format(args[0])): node = res.get('item', {}).get('href', '/').replace('/', '') if not node: sys.stderr.write(res.get('error', repr(res)) + '\n') sys.exit(1) nodes.append(node) if options.tile and not envlist[0] == 'xterm': sys.stderr.write('[ERROR] UNSUPPORTED OPTIONS. \nWindowed and tiled consoles are only supported when using xterm \n') sys.exit(1) firstnode=nodes[0] nodes.pop(0) with open(os.devnull, 'wb') as devnull: xopen=subprocess.Popen(envlist + [confettypath, '-c', '/tmp/controlpath-{0}'.format(firstnode)] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(firstnode)] , stdin=devnull) time.sleep(2) s=socket.socket(socket.AF_UNIX) winid='' try: s.connect('/tmp/controlpath-{firstnode}'.format(firstnode=firstnode)) s.recv(64) s.send(b'GETWINID') winid=s.recv(64).decode('utf-8') except: time.sleep(2) # try to get id of first panel/xterm window using name win=subprocess.Popen(['xwininfo', '-tree', '-root'], stdout=subprocess.PIPE) wintr=win.communicate()[0] for line in wintr.decode('utf-8').split('\n'): if 'console: {firstnode}'.format(firstnode=firstnode) in line or 'confetty' in line: win_obj = [ele for ele in line.split(' ') if ele.strip()] winid = win_obj[0] if winid: firstnode_window=subprocess.Popen(['xwininfo', '-id', '{winid}'.format(winid=winid)], stdout=subprocess.PIPE) xinfo=firstnode_window.communicate()[0] xinfl = xinfo.decode('utf-8').split('\n') for line in xinfl: if 'Absolute upper-left X:' in line: side_pad = int(line.split(':')[1]) elif 'Absolute upper-left Y:' in line: top_pad = int(line.split(':')[1]) elif 'Width:' in line: window_width = int(line.split(':')[1]) elif 'Height' in line: window_height = int(line.split(':')[1]) elif '-geometry' in line: l = re.split(' |x|-|\\+', line) l_nosp = [ele for ele in l if ele.strip()] wmxo = int(l_nosp[1]) wmyo = int(l_nosp[2]) sizegeometry = str(wmxo) + 'x' + str(wmyo) else: pass window_width += side_pad*2 window_height += side_pad+top_pad screenwidth -= wmxo screenheight -= wmyo currx = window_width curry = 0 for node in sortutil.natural_sort(nodes): if options.tile and envlist[0] == 'xterm': corrected_x = currx corrected_y = curry xgeometry = '{0}+{1}+{2}'.format(sizegeometry, corrected_x, corrected_y) currx += window_width if currx + window_width >= screenwidth: currx=0 curry += window_height if curry > screenheight: curry =top_pad if not envstring: envlist= handle_geometry(envlist, sizegeometry, corrected_x, corrected_y) else: if g_index: envlist[g_index+1] = xgeometry elif envlist[0] == 'xterm': envlist=handle_geometry(envlist, sizegeometry, side_pad, top_pad) side_pad+=(side_pad+1) top_pad+=(top_pad+30) else: pass with open(os.devnull, 'wb') as devnull: xopen=subprocess.Popen(envlist + [confettypath, '-c', '/tmp/controlpath-{0}'.format(node)] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(node)] , stdin=devnull) sys.exit(0) #end of wcons if options.tile: null = open('/dev/null', 'w') nodes = [] sess = synclient.Command() for res in sess.read('/noderange/{0}/nodes/'.format(args[0])): node = res.get('item', {}).get('href', '/').replace('/', '') if not node: sys.stderr.write(res.get('error', repr(res)) + '\n') sys.exit(1) nodes.append(node) initial = True in_tmux = False pane = 0 sessname = 'nodeconsole_{0}'.format(os.getpid()) if os.environ.get("TMUX"): initial = False in_tmux = True subprocess.call(['tmux', 'rename-session', sessname]) for node in sortutil.natural_sort(nodes): panename = '{0}:{1}'.format(sessname, pane) if initial: initial = False confetty_cmd = [confettypath] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(node)] subprocess.call( ['tmux', 'new-session', '-d', '-s', sessname, '-x', '800', '-y', '800', ' '.join(shlex.quote(arg) for arg in confetty_cmd)]) else: subprocess.call(['tmux', 'select-pane', '-t', sessname]) subprocess.call(['tmux', 'set-option', '-t', panename, 'pane-border-status', 'top'], stderr=null) confetty_cmd = [confettypath] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(node)] subprocess.call( ['tmux', 'split', '-h', '-t', sessname, ' '.join(shlex.quote(arg) for arg in confetty_cmd)]) subprocess.call(['tmux', 'select-layout', '-t', sessname, 'tiled'], stdout=null) pane += 1 subprocess.call(['tmux', 'select-pane', '-t', sessname]) subprocess.call(['tmux', 'set-option', '-t', panename, 'pane-border-status', 'top'], stderr=null) if not in_tmux: os.execlp('tmux', 'tmux', 'attach', '-t', sessname) else: execl_args = [confettypath] + automation_args + ['start', '/nodes/{0}/console/session'.format(args[0])] os.execl(confettypath, *execl_args)