2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-06-02 09:19:39 +00:00
Files
Jarrod Johnson a99f3de910 Implement a headless mode
For automation, this can make more sense.
2026-05-12 14:55:45 -04:00

1304 lines
50 KiB
Plaintext
Executable File

#!/usr/libexec/platform-python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2015 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import base64
import optparse
import os
import shlex
import subprocess
import sys
path = os.path.dirname(os.path.realpath(__file__))
path = os.path.realpath(os.path.join(path, '..', 'lib', 'python'))
if path.startswith('/opt'):
sys.path.append(path)
import confluent.asynclient as client
import confluent.client as synclient
import confluent.sortutil as sortutil
import confluent.logreader as logreader
import confluent.vnc as vnc
import enum
import time
import select
import signal
import socket
import re
import tty
import termios
import fcntl
import confluent.screensqueeze as sq
# Pillow is optional: when it cannot be imported, Image is set to None and the
# drawing helpers below test `Image` before attempting any scaling/rendering.
try:
    from PIL import Image, ImageDraw, ImageFont
except ImportError:
    Image = None
try:
    # sixel is optional, attempt to import but stub out if unavailable
    # NOTE: `io` is imported first, so it is available module-wide even when
    # the sixel import that follows fails.
    import io
    import sixel
    class DumbWriter(sixel.SixelWriter):
        # Override restore_position with a no-op.
        def restore_position(self, output):
            return
except ImportError:
    class DumbWriter():
        # Stand-in used when PySixel is missing: drawing only reports that
        # sixel output is unsupported.
        def draw(self, imgfile):
            sys.stderr.write("PySixel not detected, Sixel format display not supported\n")
# Path of the 'confetty' program, looked up next to this script.
confettypath = os.path.join(os.path.dirname(sys.argv[0]), 'confetty')

# Command line definition. Anything after the noderange (or after '--') is
# kept as positional arguments and interpreted further below.
argparser = optparse.OptionParser(
    usage="Usage: %prog [options] <noderange> [kill][-- [passthroughoptions]]",
    epilog="Command sequences are available while connected to a console, hit "
           "ctrl-'e', then release ctrl, then 'c', then '?' for a full list. "
           "For example, ctrl-'e', then 'c', then '.' will exit the current "
           "console")
argparser.add_option('-a', '--automation', type='string', default=None,
                     help='Specify an automation script')
argparser.add_option('-t', '--tile', action='store_true', default=False,
                     help='Tile console windows in the terminal')
argparser.add_option('-e', '--headless', action='store_true', default=False,
                     help='Run in headless mode, which is designed for use with '
                          'automation scripts and disables interactive features')
argparser.add_option('-l', '--log', action='store_true', default=False,
                     help='Enter log replay mode instead of showing a live console')
argparser.add_option('-T', '--Timestamp', action='store_true', default=False,
                     help= 'Dump log in stdout with timestamps')
argparser.add_option('-s', '--screenshot', action='store_true', default=False,
                     help='Attempt to grab screenshot and render using kitty image protocol')
argparser.add_option('-i', '--interval', type='float',
                     help='Interval in seconds to redraw the screenshot. Currently only '
                          'works for one node')
argparser.add_option('-v', '--video', action='store_true', default=False,
                     help='Attempt to continuously stream video from nodes that support streaming console via confluent')
argparser.add_option('-w','--windowed', action='store_true', default=False,
                     help='Open terminal windows for each node. The '
                          'environment variable NODECONSOLE_WINDOWED_COMMAND '
                          'should be set, which should be a text string corresponding '
                          'to a command that can be used to open a windowed console,'
                          ' omitting the "nodeconsole <noderange>" part of the '
                          'command, for example, to open a set of consoles for a '
                          'range of nodes in separate xterm windows, set '
                          'NODECONSOLE_WINDOWED_COMMAND to "xterm -e". To open a '
                          'set of consoles for a range of nodes in separate '
                          'GNOME Terminal windows with a size of 100 columns and '
                          '31 rows, set NODECONSOLE_WINDOWED_COMMAND '
                          'to "gnome-terminal --geometry 100x31 --" or in a WSL '
                          'environment, to open a set of consoles for a range of '
                          'nodes in separate Windows Terminal windows, with the '
                          'title set for each node, set NODECONSOLE_WINDOWED_COMMAND'
                          ' to "wt.exe wsl.exe -d AlmaLinux-8 '
                          '--shell-type login". If the NODECONSOLE_WINDOWED_COMMAND '
                          # trailing space added: the pieces previously joined
                          # into "bydefault."
                          'environment variable isn\'t set, xterm will be used by '
                          'default.')
(options, args) = argparser.parse_args()

# Options collected here mirror -a/--automation and -e/--headless in argument
# list form.
automation_args = []
if options.automation:
    automation_args = ['-a', options.automation]
if options.headless:
    automation_args += ['--headless']

# Terminal state saved by direct_console() so indirect_console() can undo it.
oldtcattr = None
oldfl = None
# True when continuously streaming video (-v) rather than taking screenshots.
streaming = False
class ModifiedKey:
    """A key paired with an optional modifier state."""

    def __init__(self, key, modifierstate=None):
        # Plain value holder: both arguments are stored unchanged.
        self.modifierstate = modifierstate
        self.key = key
class SpecialKeys(enum.Enum):
    # Non-printable keys that can be relayed to a remote console. Members are
    # passed as keys or modifiers to relay_keypresses().
    # NOTE(review): the numeric values look like X11/RFB keysym codes --
    # confirm against what confluent.vnc's send_keypresses expects.

    # Function keys
    F1 = 0xffbe
    F2 = 0xffbf
    F3 = 0xffc0
    F4 = 0xffc1
    F5 = 0xffc2
    F6 = 0xffc3
    F7 = 0xffc4
    F8 = 0xffc5
    F9 = 0xffc6
    F10 = 0xffc7
    F11 = 0xffc8
    F12 = 0xffc9
    # Arrow keys
    UP = 0xff52
    DOWN = 0xff54
    LEFT = 0xff51
    RIGHT = 0xff53
    # Modifiers
    ALT = 0xffe9
    CTRL = 0xffe4
    SHIFT = 0xffe1
    # Editing, navigation and miscellaneous keys
    BACKSPACE = 0xff08
    TAB = 0xff09
    ENTER = 0xff0d
    SYSRQ = 0xff15
    DELETE = 0xffff
    INSERT = 0xff63
    PAGE_UP = 0xff55
    PAGE_DOWN = 0xff56
    HOME = 0xff50
    END = 0xff57
    ESC = 0xff1b
# Per-node status text appended to that node's tile title (see prep_node_tile).
extratextbynode = {}
# Text appended to the title of focused nodes that have no per-node text.
focustext = ''
async def do_power_action(action, setboot=None):
    """Apply a power state change to every focused node.

    action is the value sent as the power 'state'; when setboot is given, the
    next boot device is set to it first. Progress is shown in each node's
    title text, left visible for five seconds, then cleared.
    """
    targets = list(focused_nodes)
    if not targets:
        return
    for name in targets:
        extratextbynode[name] = 'Power action: '
    redraw()
    nodelist = ','.join(targets)
    try:
        paclient = client.Command()
        if setboot:
            bootargs = {'nextdevice': setboot, 'mode': 'uefi'}
            async for res in paclient.update(f'/noderange/{nodelist}/boot/nextdevice', bootargs):
                if 'error' in res:
                    failure = 'Failed to set boot device: {0}'.format(res['error'])
                    for name in targets:
                        extratextbynode[name] += failure
                    return
                bynode = res.get('databynode', {})
                for name in bynode:
                    device = bynode[name].get('nextdevice', {}).get('value', 'Unknown')
                    extratextbynode[name] += 'set boot device to: ' + device + ', '
        async for res in paclient.update(f'/noderange/{nodelist}/power/state', {'state': action}):
            if 'error' in res:
                failure = 'Failed to power ' + action + ': {0}'.format(res['error'])
                for name in targets:
                    extratextbynode[name] += failure
                return
            bynode = res.get('databynode', {})
            for name in bynode:
                extratextbynode[name] += bynode[name].get('state', {}).get('value', 'Unknown')
    finally:
        # Show the outcome for a while, then wipe the per-node text again.
        redraw()
        await asyncio.sleep(5)
        for name in targets:
            extratextbynode[name] = ''
        redraw()
def set_extra_text(text):
    """Replace the focus title text and repaint so it becomes visible."""
    global focustext
    focustext = text
    redraw()
async def watch_input():
    # Start keyboard handling for streaming mode and show a short hint in the
    # focused title; the hint is removed once five seconds have passed, unless
    # something else has replaced it in the meantime.
    global focustext
    focustext = 'Hit Ctrl-E for command mode'
    # Creating the handler registers the stdin reader on the running loop.
    handler = await InputHandler.create()
    while True:
        await asyncio.sleep(5)
        if focustext == 'Hit Ctrl-E for command mode':
            focustext = ''
            redraw()
class InputHandler:
    """Turn raw stdin bytes into relayed key presses and console commands.

    Input is consumed one byte at a time. Three contexts exist:

    * None               - ordinary typing, relayed immediately
    * 'escape_sequence'  - an ESC was seen; CSI/SS3 sequences are collected
                           and mapped to SpecialKeys
    * 'command_sequence' - Ctrl-E was seen; 'c' plus further keys select a
                           local command (see valid_commands)

    A timeout task ends a sequence that is never completed.
    """

    # SS3 sequences (ESC O <char>), keyed by the final character
    ss3keys = {
        'P': SpecialKeys.F1,
        'Q': SpecialKeys.F2,
        'R': SpecialKeys.F3,
        'S': SpecialKeys.F4,
        'M': SpecialKeys.ENTER,
    }
    # CSI sequences, keyed by everything after the ESC
    csikeys = {
        '[H': SpecialKeys.HOME,
        '[F': SpecialKeys.END,
        '[1;3': False,  # Stub to detect alt-arrow
        '[2~': SpecialKeys.INSERT,
        '[3~': SpecialKeys.DELETE,
        '[5~': SpecialKeys.PAGE_UP,
        '[6~': SpecialKeys.PAGE_DOWN,
        '[11~': SpecialKeys.F1,
        '[12~': SpecialKeys.F2,
        '[13~': SpecialKeys.F3,
        '[14~': SpecialKeys.F4,
        '[15~': SpecialKeys.F5,
        '[17~': SpecialKeys.F6,
        '[18~': SpecialKeys.F7,
        '[19~': SpecialKeys.F8,
        '[20~': SpecialKeys.F9,
        '[21~': SpecialKeys.F10,
        '[23~': SpecialKeys.F11,
        '[24~': SpecialKeys.F12,
        '[A': SpecialKeys.UP,
        '[B': SpecialKeys.DOWN,
        '[C': SpecialKeys.RIGHT,
        '[D': SpecialKeys.LEFT
    }
    # Complete command sequences; used to recognise valid prefixes
    valid_commands = ('\x05cb',  # send sysrq
                      '\x05cpo',  # power off system
                      '\x05cps',  # shutdown system gracefully
                      '\x05cpb\r',  # reboot system
                      '\x05cpbs',  # boot system to setup
                      '\x05cpbn',  # boot system to network
                      '\x05c.',  # exit console
                      '\x05cq',  # exit console alias
                      '\x05c?',  # help
                      '\x05cfa',  # toggle focus all
                      '\x05c\x1b[A', '\x05c\x1b[B', '\x05c\x1b[C', '\x05c\x1b[D',  # move focus with arrow keys
                      '\x05cf\x1b[A', '\x05cf\x1b[B', '\x05cf\x1b[C', '\x05cf\x1b[D',  # move focus with arrow keys
                      )

    @classmethod
    async def create(cls):
        """Build a handler and register it as the stdin reader of the running loop."""
        self = cls()
        self.buffer = ''
        self.modkeys = None
        self.inputcontext = None
        currloop = asyncio.get_running_loop()
        self.fd = sys.stdin.fileno()
        self.seqtimeout = None
        currloop.add_reader(self.fd, self.handle_input)
        return self

    def handle_input(self):
        """Reader callback: fetch one byte and process it in its own task."""
        data = os.read(self.fd, 1)
        if not data:
            return
        asyncio.create_task(self.process_input(data))

    async def timeout_sequence(self, timeout=3, flushbuffer=False):
        """End the pending sequence after `timeout` seconds.

        With flushbuffer set, whatever was collected is relayed as key presses
        before the context is reset; otherwise it is discarded.
        """
        await asyncio.sleep(timeout)
        if flushbuffer:
            if self.buffer == '\x05':
                await relay_keypresses('e', modifiers=[SpecialKeys.CTRL])
            elif self.buffer == '\x1bO':
                await relay_keypresses('O', modifiers=[SpecialKeys.ALT, SpecialKeys.SHIFT])
            else:
                await relay_keypresses(self.buffer)
        self.reset_input_context()

    def reset_input_context(self, escmode=False):
        """Return to ordinary typing, clearing any partial sequence.

        escmode is set by the escape-sequence handler; in that case the title
        text is left alone.
        """
        global focus_pending
        self.inputcontext = None
        self.buffer = ''
        self.modkeys = None
        if not escmode:
            set_extra_text('')
        # Remember the flag before clearing it: the original cleared it first
        # and then tested it, so the "focus pending" highlight was never
        # repainted once the focus move completed.
        was_pending = focus_pending
        focus_pending = False
        if was_pending or escmode:
            redraw()

    async def process_input(self, data):
        """Dispatch one input byte according to the current context."""
        if self.inputcontext is None:  # no escape or command sequence
            if data == b'\x05':  # Ctrl-E
                set_extra_text('Ctrl-E pressed, release Ctrl and hit "c" to enter command mode, Ctrl-E again to send Ctrl-E')  # clear any previous command sequence text
                if self.seqtimeout:
                    self.seqtimeout.cancel()
                self.seqtimeout = asyncio.create_task(self.timeout_sequence(flushbuffer=True))
                self.inputcontext = 'command_sequence'
                self.buffer = '\x05'
                return
            elif data == b'\x1b':  # ESC
                if self.seqtimeout:
                    self.seqtimeout.cancel()
                self.seqtimeout = asyncio.create_task(self.timeout_sequence(0.2, flushbuffer=True))
                self.inputcontext = 'escape_sequence'
                self.buffer = '\x1b'
                return
            if data[0] == 0x7f:  # Backspace
                await relay_keypresses(SpecialKeys.BACKSPACE)
            elif data[0] == 0x09:  # Tab
                await relay_keypresses(SpecialKeys.TAB)
            elif data[0] == 0x0d:  # Enter
                await relay_keypresses(SpecialKeys.ENTER)
            elif data[0] == 0:
                await relay_keypresses(' ', modifiers=[SpecialKeys.CTRL])  # Ctrl-Space
            elif data[0] < 0x20:  # other control characters, this is ambiguous, but assume letters
                await relay_keypresses(chr(data[0] + 0x60), modifiers=[SpecialKeys.CTRL])
            else:
                # A lone byte of a multi-byte UTF-8 character decodes to ''
                # here; relaying an empty key list would fail, so skip it.
                text = data.decode('utf-8', errors='ignore')
                if text:
                    await relay_keypresses(text)
            return
        elif self.inputcontext == 'escape_sequence':
            await self.handle_esc_sequence(data)
        elif self.inputcontext == 'command_sequence':
            await self.handle_command_sequence(data)

    def starts_valid_command(self):
        """Return True when the buffer is a prefix of some known command."""
        for cmd in self.valid_commands:
            if cmd.startswith(self.buffer):
                return True
        return False

    async def handle_command_sequence(self, data):
        """Advance the Ctrl-E command state machine by one input byte."""
        global focus_pending
        if self.seqtimeout:
            self.seqtimeout.cancel()
        self.buffer += data.decode('utf-8', errors='ignore')
        if '\x1b' == self.buffer[-1:]:
            # user hit escape, or arrow key..
            # give it a short timeout for arrows..
            self.seqtimeout = asyncio.create_task(self.timeout_sequence(0.2))
            return
        if '\x03' == self.buffer[-1:]:  # Ctrl-C, abort command sequence immediately
            self.reset_input_context()
            return
        if self.buffer == '\x05\x05':  # double ctrl-e, send a single ctrl-e to the console
            await relay_keypresses('e', modifiers=[SpecialKeys.CTRL])
            self.reset_input_context()
            return
        if len(self.buffer) < 2:
            # Nothing decodable was added (for instance a stray non-ASCII
            # byte). Keep waiting under the initial Ctrl-E timeout rather than
            # raising inside the input task and leaving the context stuck.
            self.seqtimeout = asyncio.create_task(self.timeout_sequence(flushbuffer=True))
            return
        if not self.buffer.startswith('\x05c'):  # not a command
            await relay_keypresses('e', modifiers=[SpecialKeys.CTRL])
            await relay_keypresses(self.buffer[1:])
            self.reset_input_context()
            # Stop here: falling through with the now-empty buffer matched
            # "prefix of a valid command" and armed a stray timeout.
            return
        if self.buffer == '\x05c':
            set_extra_text('Commands: q:exit, p:power, f:input focus, b:sysrq, ^c:abort')
            self.seqtimeout = asyncio.create_task(self.timeout_sequence(10))  # extend timeout to navigate options
            return
        if '\x05cb' == self.buffer:  # send break
            # but we need to know more, so wait for the next key
            set_extra_text('Enter sysrq command key (or <ESC> to abort)')
            self.seqtimeout = asyncio.create_task(self.timeout_sequence())
        elif len(self.buffer) == 4 and self.buffer.startswith('\x05cb'):  # Ctrl-E, then c, then b
            await relay_keypresses(self.buffer[3:], modifiers=[SpecialKeys.ALT, SpecialKeys.SYSRQ])
            self.reset_input_context()
        elif self.buffer == '\x05c.' or self.buffer == '\x05cq':  # Ctrl-E, then . or q
            asyncio.get_running_loop().call_soon(sys.exit, 0)
            return
        elif self.buffer == '\x05cf':  # Ctrl-E, then c, then f
            set_extra_text('Focus commands: ⇅⇄:move focus, a:toggle focus all')
            self.seqtimeout = asyncio.create_task(self.timeout_sequence(10))
        elif self.buffer == '\x05cfa':  # toggle focus ...
            toggle_focus_all()
            self.reset_input_context()
        elif self.buffer == '\x05cp':
            set_extra_text('Power commands: o:power off, s:shutdown, b:boot')
            self.seqtimeout = asyncio.create_task(self.timeout_sequence(10))
        elif self.buffer == '\x05cpb':
            set_extra_text('Boot options: <enter>: normal boot, s:boot to setup, n:boot to network, ^c to abort')
            self.seqtimeout = asyncio.create_task(self.timeout_sequence(10))
        elif self.buffer == '\x05cpo':  # Ctrl-E, then power off
            await do_power_action('off')
            return self.reset_input_context()
        elif self.buffer == '\x05cps':  # Ctrl-E, then shutdown
            await do_power_action('shutdown')
            return self.reset_input_context()
        elif self.buffer == '\x05cpb\r':  # Ctrl-E, then reboot
            await do_power_action('boot')
            return self.reset_input_context()
        elif self.buffer == '\x05cpbs':  # Ctrl-E, then boot to setup
            await do_power_action('boot', 'setup')
            return self.reset_input_context()
        elif self.buffer == '\x05cpbn':  # Ctrl-E, then boot to network
            await do_power_action('boot', 'network')
            return self.reset_input_context()
        elif self.buffer == '\x05c?':  # Ctrl-E, then c?
            set_extra_text('q:exit,⇅⇄:focus,po:power off,ps:shutdown,pb<enter>:reboot,pbs:boot setup,pbn:boot network,fa:(un)focus all,bX:sysrq, then X')
            self.buffer = '\x05c'  # go back to let the user enter stuff based on text
            self.seqtimeout = asyncio.create_task(self.timeout_sequence(10))
            return
        elif self.buffer[-3:] in ('\x1b[A', '\x1b[B', '\x1b[C', '\x1b[D'):  # Ctrl-E, then cursor keys
            arrowkey_map = {'A': 'Up', 'B': 'Down', 'C': 'Right', 'D': 'Left'}
            arrowkey = arrowkey_map.get(self.buffer[-1], self.buffer[-1])
            focus_pending = True
            move_focus(arrowkey)
            # stay in command mode so further arrows keep moving the focus
            self.buffer = '\x05c'
            self.seqtimeout = asyncio.create_task(self.timeout_sequence())
        elif self.starts_valid_command():
            self.seqtimeout = asyncio.create_task(self.timeout_sequence())
        elif len(self.buffer) > 2:
            # unknown command sequence, abort
            self.reset_input_context()

    async def handle_esc_sequence(self, data):
        """Collect an escape sequence and relay the key it represents."""
        if self.seqtimeout:
            self.seqtimeout.cancel()
        self.buffer += data.decode('utf-8', errors='ignore')
        if len(self.buffer) >= 2 and self.buffer.startswith('\x1b['):  # CSI
            # ';3' / '1;3' parameters mark Alt; strip them and remember it
            if self.buffer.endswith(';3~'):
                self.buffer = self.buffer.replace(';3~', '~')
                self.modkeys = [SpecialKeys.ALT]
            if '1;3' in self.buffer:
                self.buffer = self.buffer.replace('1;3', '')
                self.modkeys = [SpecialKeys.ALT]
            if self.buffer[1:] in self.csikeys:
                await relay_keypresses(self.csikeys[self.buffer[1:]], modifiers=self.modkeys)
                self.reset_input_context(escmode=True)
                return
            for cand in self.csikeys:
                if cand.startswith(self.buffer[1:]):
                    # continue to wait for more input to disambiguate
                    self.seqtimeout = asyncio.create_task(self.timeout_sequence(0.2))
                    break
            else:
                # no known sequence can start this way, give up on it
                self.reset_input_context(escmode=True)
        elif len(self.buffer) >= 2 and self.buffer.startswith('\x1bO'):  # SS3
            if len(self.buffer) == 3:
                if self.buffer[2:] in self.ss3keys:
                    await relay_keypresses(self.ss3keys[self.buffer[2:]])
                self.reset_input_context(escmode=True)
                return
            if len(self.buffer) == 2:
                self.seqtimeout = asyncio.create_task(self.timeout_sequence(0.2))
                return
        elif len(self.buffer) >= 2:  # ESC-key is a way to do alt
            await relay_keypresses(self.buffer[1:], modifiers=[SpecialKeys.ALT])
            self.reset_input_context(escmode=True)
# Connected VNC clients by node name; filled in by do_vnc and used by
# relay_keypresses to deliver key events.
vncclientsbynode = {}
def get_coords():
    # Ask the terminal for the cursor position (ESC [ 6 n) and parse the
    # 'ESC [ row ; col R' style reply from stdin.
    # NOTE(review): the parsed coords are neither returned nor used, and no
    # caller is visible in this part of the file -- looks like leftover
    # debugging code; confirm before relying on it.
    sys.stdout.write('\x1b[6n') #
    sys.stdout.flush()
    gotreply = select.select([sys.stdin,], [], [], 0.250)[0]
    if gotreply:
        response = ''
        # keep reading until the terminating 'R' arrives or input dries up
        while select.select([sys.stdin,], [], [], 0.1)[0] and 'R' not in response:
            response += sys.stdin.read()
        coords = response.replace('R', '').split('[')[1].split(';')
        #sys.stdout.write('\x1b[{}:{}H'.format(*coords))
# True while stdin has been switched to raw mode by direct_console().
console_direct_mode = False


def direct_console():
    """Put stdin into raw mode, saving the previous terminal state.

    Returns True when this call performed the switch and False when raw mode
    was already active, so callers know whether they must undo it.
    """
    global console_direct_mode
    global oldtcattr
    global oldfl
    if not console_direct_mode:
        console_direct_mode = True
        stdinfd = sys.stdin.fileno()
        oldtcattr = termios.tcgetattr(stdinfd)
        oldfl = fcntl.fcntl(stdinfd, fcntl.F_GETFL)
        tty.setraw(stdinfd)
        #fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, oldfl | os.O_NONBLOCK)
        return True
    return False
def indirect_console():
    """Restore the terminal attributes saved by direct_console().

    Returns True when the terminal was restored, False when raw mode was not
    active to begin with.
    """
    global console_direct_mode
    global oldtcattr
    global oldfl
    if console_direct_mode:
        console_direct_mode = False
        #fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, oldfl & ~os.O_NONBLOCK)
        termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, oldtcattr)
        return True
    return False
def determine_tile_size(numnodes):
    """Choose a tile grid for numnodes consoles in the current terminal.

    Returns a list [columns, rows, cellswide, cellshigh, totalrows] where
    cellswide/cellshigh are the size of one tile in character cells and
    totalrows is the number of terminal rows the whole grid occupies. As a
    side effect, that many newlines are written (and the cursor moved back up)
    so any scrolling happens before tiles are drawn.
    """
    # A noderange that matched nothing used to leave the search below without
    # any candidate and crash on bestdims[0]; lay out a single tile instead.
    numnodes = max(numnodes, 1)
    # for now, smash everything to a common aspect ratio. 16:11
    # is pretty much wrong for everything, making 4:3 a bit too wide
    # and 16:9 significantly too narrow, but it is serviceable
    # An improvement could come with us owning the scaling
    # instead of delegating to Kitty, which says if we specify both,
    # we get stretching. In theory we should be able to get aspect correct
    # from kitty by omitting, but:
    # then we don't know how much to move the cursor left after draw_image
    # Konsole won't scale at all with only partial scaling specified
    directed = direct_console()
    cheight, cwidth, pixwidth, pixheight = sq.get_screengeom(escfallback=True)
    if directed:
        indirect_console()
    # 16:12 is to roughly account for the 'titles' of the tiles
    ratio = (pixwidth / 16) / (pixheight / 12)
    bestdeviation = None
    bestdims = []
    for columns in range(1, numnodes + 1):
        # round the node count up to the next multiple of the column count
        number = numnodes
        while number % columns != 0:
            number += 1
        rows = number // columns
        deviation = abs(ratio - (columns / rows))
        if bestdeviation is None or deviation < bestdeviation:
            bestdeviation = deviation
            bestdims = [columns, rows]
    # ok, the above algorithm can still pick things like
    # 1 2 3
    # 4
    # So we will let it pick the number of rows, and
    # then see if we can chop columns and still fit
    while (bestdims[0] - 1) * bestdims[1] >= numnodes:
        bestdims[0] = bestdims[0] - 1
    cellswide = cwidth // bestdims[0]
    cellshigh = cheight // bestdims[1]
    tilewidth = cellswide * pixwidth / cwidth
    tileheight = cellshigh * pixheight / cheight
    # shrink whichever dimension exceeds the 16:11 target
    if tilewidth > (tileheight * 16 / 11):
        tilewidth = tileheight * 16 / 11
        cellswide = int(tilewidth // (pixwidth / cwidth))
    if tileheight > (tilewidth * 11 / 16):
        tileheight = tilewidth * 11 / 16
        cellshigh = int(tileheight // (pixheight / cheight))
    bestdims = bestdims + [cellswide, cellshigh, cellshigh * bestdims[1]]
    # incur any scrolling we might get. This allows us to accurately
    # save/restore cursor or even get coordinates without scrolling fouling
    # the desired target
    sys.stdout.write('\n' * bestdims[4])
    sys.stdout.flush()
    cursor_up(bestdims[4])
    return bestdims
# Set once sticky_cursor() has stored a cursor position to return to.
cursor_saved = False


def sticky_cursor():
    """Save the cursor position on first use, restore it on later calls.

    The first call (only when stdin is a terminal) emits the save sequence;
    every call after a successful save emits the restore sequence instead.
    """
    global cursor_saved
    # get cursor restore_position
    if sys.stdin.isatty() and not cursor_saved:
        sequence, saving = '\x1b7', True
    elif cursor_saved:
        sequence, saving = '\x1b8', False
    else:
        return
    directed = False
    try:
        directed = direct_console()
        sys.stdout.write(sequence)
        if saving:
            cursor_saved = True
    finally:
        if directed:
            indirect_console()
def cursor_up(count=1):
    """Move the cursor up by count rows; counts below 1 do nothing.

    Terminals read a parameter of 0 as the default of 1, so emitting the
    sequence for a zero count would still move the cursor by one row.
    """
    if count < 1:
        return
    sys.stdout.write(f'\x1b[{count}A')
def cursor_down(count=1):
    """Move the cursor down by count rows; counts below 1 do nothing.

    Terminals read a parameter of 0 as the default of 1, so emitting the
    sequence for a zero count would still move the cursor by one row.
    """
    if count < 1:
        return
    sys.stdout.write(f'\x1b[{count}B')
def cursor_right(count=1):
    """Move the cursor right by count columns; counts below 1 do nothing.

    Terminals read a parameter of 0 as the default of 1, so emitting the
    sequence for a zero count would still move the cursor by one column.
    """
    if count < 1:
        return
    sys.stdout.write(f'\x1b[{count}C')
def cursor_left(count=1):
    """Move the cursor left by count columns; counts below 1 do nothing.

    Terminals read a parameter of 0 as the default of 1, so emitting the
    sequence for a zero count would still move the cursor by one column.
    """
    if count < 1:
        return
    sys.stdout.write(f'\x1b[{count}D')
def cursor_save():
    """Emit the 'save cursor' sequence (ESC 7)."""
    out = sys.stdout
    out.write('\x1b7')
def cursor_restore():
    """Emit the 'restore cursor' sequence (ESC 8)."""
    out = sys.stdout
    out.write('\x1b8')
def cursor_hide():
    """Emit the sequence that makes the cursor invisible."""
    out = sys.stdout
    out.write('\x1b[?25l')
def cursor_show():
    """Emit the sequence that makes the cursor visible again."""
    out = sys.stdout
    out.write('\x1b[?25h')
def get_pix_dimensions(width, height):
    """Convert a size in character cells to a size in pixels.

    Uses the terminal geometry reported by sq.get_screengeom and returns an
    (imgwidth, imgheight) tuple of ints.
    """
    switched = direct_console()
    termrows, termcols, pixwide, pixhigh = sq.get_screengeom(escfallback=True)
    if switched:
        indirect_console()
    return int(pixwide / termcols * width), int(pixhigh / termrows * height)
def draw_text(text, width, height):
    """Show text in place of an image, sized width x height character cells.

    With Pillow available, the text is rendered onto a green image using the
    largest font size (up to 256) at which every non-empty line stays within
    90% of the image width, and that image is sent through draw_image.
    Without Pillow the text is written directly and the cursor moved back.
    """
    if not Image:
        sys.stdout.write(text)
        if text:  # a zero count would still move the cursor one column
            cursor_left(len(text))
        return

    def load_font(size):
        # Returns (font, scalable); falls back to Pillow's default font when
        # DejaVuSans cannot be loaded.
        try:
            return ImageFont.truetype('DejaVuSans.ttf', size=size), True
        except OSError:
            return ImageFont.load_default(size), False

    maxfntsize = 256
    imgwidth, imgheight = get_pix_dimensions(width, height)
    nerr = Image.new(mode='RGB', size=(imgwidth, imgheight), color='green')
    nd = ImageDraw.Draw(nerr)
    targetwidth = int(imgwidth * 0.90)
    chosensize = None
    for txtpiece in text.split('\n'):
        if not txtpiece:
            # An empty line measures 0 at every size, which made the growth
            # loop below run forever.
            continue
        fntsize = 8
        txtfont, scalefont = load_font(fntsize)
        if scalefont:
            while (fntsize < maxfntsize
                   and nd.textlength(txtpiece, font=txtfont) < targetwidth):
                fntsize += 1
                txtfont, _ = load_font(fntsize)
            # step back to the last size that still fit
            fntsize -= 1
        if chosensize is None or fntsize < chosensize:
            chosensize = fntsize
    if chosensize is None:
        chosensize = 8
    # Draw with the size that fits the widest line. Previously the font left
    # over from the last line (one step too large) was used and the computed
    # minimum ignored.
    txtfont, _ = load_font(chosensize)
    hmargin = int(imgwidth * 0.05)
    vmargin = int(imgheight * 0.10)
    nd.text((hmargin, vmargin), text, font=txtfont)
    nd.rectangle((0, 0, nerr.width - 1, nerr.height - 1), outline='white')
    outfile = io.BytesIO()
    nerr.save(outfile, format='PNG')
    draw_image(outfile.getbuffer(), width, height, doscale=False)
def draw_image(bindata, width, height, doscale=True):
    """Render image data at the cursor using the configured protocol.

    The protocol comes from CONFLUENT_IMAGE_PROTOCOL ('kitty' unless set to
    'sixel' or 'iterm'). When scaling is requested, Pillow is available and a
    width is given, the image is first fitted (aspect preserved) into a
    width x height cell canvas with a two pixel frame.
    """
    protocol = os.environ.get('CONFLUENT_IMAGE_PROTOCOL', 'kitty')
    if doscale and Image and width:
        source = io.BytesIO()
        source.write(bindata)
        source.seek(0)
        try:
            img = Image.open(source)
        except Exception as e:
            return draw_text('Error rendering image:\n' + str(e), width, height)
        canvaswidth, canvasheight = get_pix_dimensions(width, height)
        canvas = Image.new(mode='RGBA', size=(canvaswidth, canvasheight))
        # reserve two pixels on every side for the frame
        availwidth = canvaswidth - 4
        availheight = canvasheight - 4
        hscale = availwidth / img.width
        vscale = availheight / img.height
        if hscale < vscale:
            rzwidth, rzheight = availwidth, int(img.height * hscale)
        else:
            rzwidth, rzheight = int(img.width * vscale), availheight
        img = img.resize((rzwidth, rzheight))
        pen = ImageDraw.Draw(canvas)
        pen.rectangle((1, 1, rzwidth + 2, rzheight + 2), outline='black')
        pen.rectangle((0, 0, rzwidth + 3, rzheight + 3), outline='white')
        canvas.paste(img, box=(2, 2))
        encoded = io.BytesIO()
        canvas.save(encoded, format='PNG')
        bindata = encoded.getbuffer()
    if protocol == 'sixel':
        sixel_draw(bindata)
    elif protocol == 'iterm':
        iterm_draw(bindata, width, height)
    else:
        kitty_draw(bindata, width, height)
def sixel_draw(bindata):
    """Hand the image data to DumbWriter as a rewound in-memory file."""
    imgfile = io.BytesIO()
    imgfile.write(bindata)
    imgfile.seek(0)
    writer = DumbWriter()
    writer.draw(imgfile)
def iterm_draw(bindata, width, height):
    """Write the image as an iTerm2 inline-file escape sequence.

    A falsy width or height is sent as 'auto'; size is the byte length of the
    undecoded image data.
    """
    payload = base64.b64encode(bindata).decode('utf8')
    shownheight = height if height else 'auto'
    shownwidth = width if width else 'auto'
    header = '\x1b]1337;File=inline=1;width={};height={};size={}:'.format(
        shownwidth, shownheight, len(bindata))
    out = sys.stdout
    out.write(header)
    out.write(payload)
    out.write('\a')
    out.flush()
def kitty_draw(bindata, width, height):
    """Write the image using the kitty graphics protocol.

    The base64 payload is sent in chunks of at most 4096 characters. The
    first chunk carries the control data (transmit-and-display, PNG format,
    quiet, plus rows/columns when height is truthy); continuation chunks
    carry only the 'm' key, with m=0 marking the last chunk.
    """
    data = base64.b64encode(bindata)
    preamble = '\x1b_Ga=T,f=100,q=2'
    if height:
        preamble += f',r={height},c={width}'
    first = True
    while data:
        chunk, data = data[:4096], data[4096:]
        m = 1 if data else 0
        if first:
            sys.stdout.write('{},m={};'.format(preamble, m))
            # Previously never cleared, so every chunk repeated the complete
            # control data and the continuation form below was unreachable.
            first = False
        else:
            sys.stdout.write('\x1b_Gm={};'.format(m))
        sys.stdout.write(chunk.decode('utf8'))
        sys.stdout.write('\x1b\\')
    sys.stdout.flush()
# Split the positional arguments: the first is the noderange (or log name),
# anything after it is passed through; a leading 'kill' requests that
# windowed consoles be closed.
pass_through_args = []
killcon = False
try:
    noderange = args[0]
    if len(args) > 1:
        if args[1] == 'kill':
            killcon = True
        pass_through_args = args[1:]
        args = args[:1]
except IndexError:
    argparser.print_help()
    sys.exit(1)
if len(args) != 1:
    argparser.print_help()
    sys.exit(1)

# Log modes work on a local file: either the given path or, for a relative
# name that does not exist, the file of that name in the console log directory.
if options.log or options.Timestamp:
    logname = args[0]
    if not os.path.exists(logname) and logname[0] != '/':
        logname = os.path.join('/var/log/confluent/consoles', logname)
    if not os.path.exists(logname):
        sys.stderr.write('Unable to locate {0} on local system\n'.format(logname))
        sys.exit(1)
    if options.log:
        # -l takes precedence when both flags are given
        logreader.replay_to_console(logname)
    else:
        logreader.dump_to_console(logname)
    sys.exit(0)
def prep_node_tile(node):
    """Move to a node's tile, draw its title bar, and stop below the title.

    The title is the node name followed by the node's own status text or, for
    focused nodes without one, the shared focus text. Focused nodes get a
    coloured bar (a different colour while a focus move is pending).
    """
    coloffset, rowoffset = nodepositions[node]
    if coloffset:
        cursor_right(coloffset)
    if rowoffset:
        cursor_down(rowoffset)
    focused = node in focused_nodes
    nodeextra = extratextbynode.get(node, None)
    title = node
    if nodeextra:
        title += ' ' + nodeextra
    if focused:
        if focustext and not nodeextra:
            title += ' ' + focustext
        sys.stdout.write('\x1b[43m' if focus_pending else '\x1b[44m')
    bar = f'▏{title:<{cwidth - 1}}'
    if len(bar) > cwidth:
        # too long for the tile: cut and mark the truncation
        bar = bar[:cwidth - 1] + '…'
    sys.stdout.write(bar)
    if focused:
        sys.stdout.write('\x1b[0m')
    cursor_left(len(bar))
    cursor_down()
def reset_cursor(node):
    """Undo the cursor movement made by prep_node_tile for this node."""
    coloffset, rowoffset = nodepositions[node]
    if coloffset:
        cursor_left(coloffset)
    # one extra row for the title line prep_node_tile stepped past
    cursor_up(rowoffset + 1)
# Tile origin of each node as (column offset, row offset) in character cells;
# only populated in tiled mode (see do_resize inside do_screenshot).
nodepositions = {}
# Total terminal rows occupied by the tile grid (0 until tiling is set up).
numrows = 0
# Size of a single tile in character cells.
cwidth = 0
cheight = 0
# Most recent image data handed to draw_node for each node, used by redraw().
imagedatabynode = {}
def redraw():
    """Repaint every node from the image data cached by draw_node.

    Tiled nodes get their title bar and image redrawn in place. Untiled
    output is prefixed with the node name; with --interval only a single
    node is supported, as in draw_node.
    """
    # The first node ever drawn plays the role of do_screenshot's
    # firstnodename, which is a local of that function and not visible here
    # (referencing it raised NameError).
    firstnode = next(iter(imagedatabynode), None)
    for node, imgdata in imagedatabynode.items():
        tiled = node in nodepositions
        if tiled:
            prep_node_tile(node)
            cursor_save()
        else:
            if options.interval is not None:
                if node != firstnode:
                    sys.stderr.write('Multiple nodes not supported for interval')
                    sys.exit(1)
                sticky_cursor()
            sys.stdout.write('{}: '.format(node))
        # draw_node stores None when only an error was shown for the node;
        # there is no image to repaint in that case.
        if imgdata:
            # one row is used by our own name, so cheight - 1 for that allowance
            draw_image(imgdata, cwidth, cheight - 1 if cheight else cheight)
        if tiled:
            cursor_restore()
            reset_cursor(node)
        else:
            sys.stdout.write('\n')
        sys.stdout.flush()
# Set by the SIGWINCH handler so the refresh loop re-lays out the tiles.
resized = False


async def do_screenshot():
    """Fetch and display screenshots or streaming video for the noderange.

    In tiled mode the layout is computed up front and recomputed on terminal
    resize. Nodes are then drawn either from one-shot screenshots or, for
    streaming and for nodes that only offer VNC, through grab_vncs. With
    --interval the cycle repeats; otherwise it runs once. Always ends the
    process through sys.exit.
    """
    global streaming
    global resized
    global numrows
    sess = client.Command()
    if streaming:
        # keep a reference so the input task cannot be garbage collected
        inputtask = asyncio.create_task(watch_input())
    if options.tile:
        imageformat = os.environ.get('CONFLUENT_IMAGE_PROTOCOL', 'kitty')
        if imageformat not in ('kitty', 'iterm'):
            sys.stderr.write('Tiled screenshots only supported with kitty or iterm protocol')
            sys.exit(1)
        allnodes = []
        numnodes = 0
        async for res in sess.read('/noderange/{}/nodes/'.format(args[0])):
            allnodes.append(res['item']['href'].replace('/', ''))
            numnodes += 1
        resized = False

        def do_resize(a=None, b=None):
            # Doubles as the SIGWINCH handler (a, b = signal number, frame);
            # a truthy first argument means "this is a resize, repaint".
            global resized
            global numrows
            global cwidth
            global cheight
            if a:
                resized = True
                # on a window resize, clear the old stuff
                sys.stdout.write('\x1bc')
            cols, rows, cwidth, cheight, numrows = determine_tile_size(numnodes)
            currcol = 1
            currcolcell = 0
            currrowcell = 0
            for node in allnodes:
                nodepositions[node] = currcolcell, currrowcell
                if streaming:
                    init_focus(node)
                if currcol < cols:
                    currcol += 1
                    currcolcell += cwidth
                else:
                    currcol = 1
                    currcolcell = 0
                    currrowcell += cheight
            if a:
                redraw()

        do_resize()
        signal.signal(signal.SIGWINCH, do_resize)
    elif options.interval is not None:
        sys.stdout.write('\x1bc')
    firstnodename = None
    dorefresh = True
    vnconly = set()
    if streaming:
        doexit = 0
        async for res in sess.read('/noderange/{}/console/ikvm_methods'.format(args[0])):
            for node in res.get('databynode', {}):
                methods = res['databynode'][node].get('ikvm_methods', [])
                if 'vnc' not in methods and 'openbmc' not in methods:
                    sys.stderr.write(f'Node {node} does not support video via confluent\n')
                    doexit = 1
                vnconly.add(node)
        if doexit:
            sys.exit(1)
    while dorefresh:
        if not streaming:
            async for res in sess.read('/noderange/{}/console/ikvm_screenshot'.format(args[0])):
                for node in res.get('databynode', {}):
                    errorstr = ''
                    if not firstnodename:
                        firstnodename = node
                    error = res['databynode'][node].get('error')
                    if error and 'vnc available' in error:
                        # no screenshot API, but VNC can be used instead
                        vnconly.add(node)
                        continue
                    elif error:
                        errorstr = error
                    imgdata = res['databynode'][node].get('image', {}).get('imgdata', None)
                    if imgdata:
                        if len(imgdata) < 32:  # We were subjected to error
                            errorstr = f'Unable to get screenshot'
                    if errorstr or imgdata:
                        # An error reply may carry no image at all; decoding
                        # None used to raise TypeError before the error text
                        # could be shown.
                        imgdata = base64.b64decode(imgdata) if imgdata else b''
                        draw_node(node, imgdata, errorstr, firstnodename, cwidth, cheight)
        urlbynode = {}
        for node in vnconly:
            async for res in sess.update(f'/nodes/{node}/console/ikvm', {'method': 'unix'}):
                url = res.get('item', {}).get('href')
                if url:
                    urlbynode[node] = url
        await grab_vncs(urlbynode)
        if resized:
            do_resize(True)
            resized = False
        if options.interval is None:
            dorefresh = False
        else:
            dorefresh = True
            # time.sleep() here stalled the whole event loop, including the
            # keyboard handling task
            await asyncio.sleep(options.interval)
    sys.exit(0)
async def grab_vncs(urlbynode):
    """Run one do_vnc task per node and wait for all of them to finish.

    While streaming, stdin is switched to raw mode for the duration and
    restored afterwards. Any exception is reported on stderr, not raised.
    """
    global streaming
    pending = []
    directed = False
    try:
        if streaming:
            directed = direct_console()
        for node, url in urlbynode.items():
            pending.append(asyncio.create_task(do_vnc(node, url)))
        await asyncio.gather(*pending)
    except Exception as e:
        sys.stderr.write(f"Error in grab_vncs: {e}\n")
    finally:
        if directed:
            indirect_console()
# Ctrl-E followed by 'c' ('\05' is the octal escape for 0x05).
conserversequence = '\05c'


async def relay_keypresses(keys, modifiers=None):
    """Send key presses to the VNC client of every focused node.

    keys may be a SpecialKeys member, a string (sent as its UTF-8 byte
    values) or a list of key codes. modifiers is an optional list of
    SpecialKeys; SHIFT is added when any character needs it. Nothing is sent
    while a focus move is pending or when there are no keys. Delivery errors
    are reported on stderr and do not stop delivery to the other nodes.
    """
    if focus_pending:
        return
    if hasattr(keys, 'value'):
        keys = [keys]
    elif isinstance(keys, str):
        keys = list(bytearray(keys.encode('utf-8')))
    else:
        # work on a copy; the first entry may be replaced below
        keys = list(keys)
    if not keys:
        # e.g. an undecodable input byte; indexing keys[0] used to raise
        return
    shifted = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ~!@#$%^&*()_+{}|:"<>?'
    # Copy so the caller's list is never modified, and add SHIFT only once
    # however many shifted characters there are.
    modifiers = list(modifiers) if modifiers else []
    needshift = any(isinstance(key, int) and chr(key) in shifted for key in keys)
    if needshift and SpecialKeys.SHIFT not in modifiers:
        modifiers.append(SpecialKeys.SHIFT)
    if not modifiers:
        modifiers = None
    if keys[0] == 0x1b:
        keys[0] = SpecialKeys.ESC
    for node in focused_nodes:
        vnclient = vncclientsbynode.get(node)
        if not vnclient:
            sys.stderr.write(f"No VNC client for node {node}, unable to relay keypress\n")
            continue
        try:
            await vnclient.send_keypresses(keys, modifiers)
        except Exception as e:
            sys.stderr.write(f"Error relaying keypress to VNC client: {e}\n")
            continue
# Node that keeps focus when "focus all" is toggled off, and from which
# focus moves start.
single_focus_node = None
# Nodes currently receiving keyboard input (used as an ordered set).
focused_nodes = {}


def init_focus(node):
    """Give focus to node unless some node is focused already."""
    global single_focus_node
    if focused_nodes:
        return
    single_focus_node = node
    focused_nodes[node] = True
# True between a focus move and the end of the command sequence; key presses
# are not relayed while it is set.
focus_pending = False


def move_focus(direction):
    """Move the single focus to the neighbouring tile in direction.

    direction is one of 'Up', 'Down', 'Left' or 'Right'. Nothing happens when
    there is no tile at the target position, or when no focused tile exists
    to start from.
    """
    global single_focus_node
    if single_focus_node not in nodepositions:
        # Not tiled, or focus never initialised: there is no grid to move
        # within (this used to raise KeyError).
        return
    currcolcell, currrowcell = nodepositions[single_focus_node]
    if direction == 'Up':
        target = (currcolcell, currrowcell - cheight)
    elif direction == 'Down':
        target = (currcolcell, currrowcell + cheight)
    elif direction == 'Left':
        target = (currcolcell - cwidth, currrowcell)
    elif direction == 'Right':
        target = (currcolcell + cwidth, currrowcell)
    else:
        sys.stderr.write("Invalid direction for focus movement: {}\n".format(direction))
        return
    for node, pos in nodepositions.items():
        if pos == target:
            focused_nodes.clear()
            focused_nodes[node] = True
            single_focus_node = node
            redraw()
            return
def toggle_focus_all():
    """Switch between focusing every tiled node and only the single focus node."""
    if len(focused_nodes) < 2:
        # at most one node focused: widen focus to every tile
        focused_nodes.update((node, True) for node in nodepositions)
        return
    # several nodes focused: drop all but the single focus node
    extras = [node for node in focused_nodes if node != single_focus_node]
    for node in extras:
        del focused_nodes[node]
async def do_vnc(node, url):
    # Keep a VNC session to `url` open for `node`, drawing each screenshot it
    # yields into the node's tile. The client is registered in
    # vncclientsbynode so relay_keypresses can reach it.
    # A screenshot timeout drops the connection and reconnects; a ValueError
    # is shown in the tile and retried, giving up after 5 consecutive failures.
    global streaming
    keeprunning = True
    retries = 5
    while keeprunning:
        try:
            async with await vnc.VNCClient.create(url) as client:
                vncclientsbynode[node] = client
                while True:
                    # Retrieve pixels as a 3D numpy array
                    try:
                        # replace the stock screenshot function with our own
                        # ask for a video refresh, then do reads until the video is complete
                        # stock screenshot function discards data,
                        # or we just patch the screenshot instead....
                        # but OpenBMC sends an alpha heavy mouse overlay
                        # that results in blackness
                        # possibly use client side cursor to suppress the ick?
                        image = await client.get_screenshot()
                    except asyncio.TimeoutError:
                        # need a better closed connection detector, a static screen triggers timeouts too
                        # without the timeout, we lose track of proxmox reset
                        # with the timeout, we falsely assume dead on stale
                        break
                    # a frame arrived, so the connection is healthy again
                    retries = 5
                    # Save as PNG using PIL/pillow
                    outfile = io.BytesIO()
                    image.save(outfile, format='PNG')
                    imgdata = outfile.getbuffer()
                    if imgdata:
                        draw_node(node, imgdata, '', '', cwidth, cheight)
                    else:
                        # an empty image ends the session for good
                        keeprunning = False
                        break
        except ValueError as e:
            # show the failure in place of the image and retry after a pause
            draw_node(node, None, str(e), '', cwidth, cheight)
            retries -= 1
            if retries <= 0:
                keeprunning = False
                sys.stderr.write(f'VNC connection for node {node} failed with error: {e}\n')
            await asyncio.sleep(1)
def draw_node(node, imgdata, errorstr, firstnodename, cwidth, cheight):
    """Render the latest image, or an error message, for one node.

    The image data is always remembered in ``imagedatabynode``.  A node that
    has an entry in ``nodepositions`` is drawn into its tile, with the cursor
    saved beforehand and restored afterwards.  Any other node is written
    inline, prefixed by ``'<node>: '`` and followed by a newline.

    When ``options.interval`` is set, only ``firstnodename`` may be drawn
    inline; any other node writes an error to stderr and exits with status 1.

    ``errorstr``, when non-empty, is drawn as text instead of ``imgdata``.
    """
    imagedatabynode[node] = imgdata
    if node not in nodepositions:
        if options.interval is not None:
            if node != firstnodename:
                sys.stderr.write('Multiple nodes not supported for interval')
                sys.exit(1)
            sticky_cursor()
        sys.stdout.write('{}: '.format(node))
    else:
        prep_node_tile(node)
        cursor_save()
    # The node name occupies one row, so a non-zero height loses one row.
    bodyheight = cheight - 1 if cheight else cheight
    if errorstr:
        draw_text(errorstr, cwidth, bodyheight)
    else:
        draw_image(imgdata, cwidth, bodyheight)
    if node not in nodepositions:
        sys.stdout.write('\n')
    else:
        cursor_restore()
        reset_cursor(node)
    sys.stdout.flush()
# Screenshot / video mode: run the asynchronous do_screenshot() driver with
# the cursor hidden, put the terminal back in order, then exit with status 0.
if options.screenshot or options.video:
# Module-level `streaming` takes the value of options.video.
streaming = options.video
try:
cursor_hide()
asyncio.run(do_screenshot())
except KeyboardInterrupt:
# An interrupt is swallowed; the cleanup below still runs.
pass
finally:
# Make the cursor visible again and move it down by numrows rows before
# emitting a final newline.
cursor_show()
cursor_down(numrows)
sys.stdout.write('\n')
sys.exit(0)
async def kill(noderange):
    """Kill the windowed console process of every node in *noderange*.

    The noderange is expanded through the confluent API.  For each node a
    ``ps``/``grep`` pipeline looks for a process whose command line mentions
    the terminal command (``NODECONSOLE_WINDOWED_COMMAND``, or ``xterm`` when
    that is unset or empty), the word ``console`` and the node name; the
    first PID found is passed to ``kill``.  Nodes with no match are reported
    on stderr.

    A lookup result without a node name is reported on stderr and the process
    exits with status 1.  Otherwise the process exits with status 0 once all
    nodes are handled, so this coroutine never returns normally.
    """
    session = client.Command()
    terminal = os.environ.get('NODECONSOLE_WINDOWED_COMMAND') or 'xterm'
    targets = []
    async for res in session.read('/noderange/{0}/nodes/'.format(noderange)):
        name = res.get('item', {}).get('href', '/').replace('/', '')
        if not name:
            sys.stderr.write(res.get('error', repr(res)) + '\n')
            sys.exit(1)
        targets.append(name)
    pipeline = "ps auxww | grep {0} | grep console | egrep '\\b{1}\\b' | grep -v grep | awk '{{print $2}}'"
    for name in targets:
        finder = subprocess.Popen(
            pipeline.format(terminal, name), shell=True,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        listing, _ = finder.communicate()
        pids = listing.decode('utf-8').split()
        if not pids:
            sys.stderr.write(name + ": console window not found \n")
            continue
        # Only the first matching PID is signalled.
        subprocess.Popen(["kill", pids[0]], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    sys.exit(0)
def handle_geometry(envlist, sizegeometry, side_pad=0, top_pad=0, first=False):
    """Set or insert the geometry argument of a terminal command line.

    ``envlist`` is modified in place and also returned.

    If ``-geometry`` (checked first) or ``-g`` is found at a non-zero index,
    the element following it is replaced by ``<size>+<side_pad>+<top_pad>``.
    ``<size>`` is the existing value when ``first`` is true and
    ``sizegeometry`` otherwise.  If neither flag is found, or the flag sits
    at index 0, ``-geometry`` and ``<sizegeometry>+<side_pad>+<top_pad>`` are
    inserted at positions 1 and 2.
    """
    flagpos = 0
    for flag in ('-geometry', '-g'):
        if flag in envlist:
            flagpos = envlist.index(flag)
            break
    if not flagpos:
        # No usable flag: splice a fresh geometry pair in after the command.
        envlist[1:1] = ['-geometry',
                        '{0}+{1}+{2}'.format(sizegeometry, side_pad, top_pad)]
        return envlist
    size = envlist[flagpos + 1] if first else sizegeometry
    envlist[flagpos + 1] = '{0}+{1}+{2}'.format(size, side_pad, top_pad)
    return envlist
# Add functionality to close/kill all open consoles:
# when killcon is set, run the kill() coroutine for the requested noderange.
if killcon:
asyncio.run(kill(noderange))
#added functionality for wcons
# Windowed mode: start one confetty console per node, each inside its own
# terminal window (xterm by default, or the command given in the
# NODECONSOLE_WINDOWED_COMMAND environment variable).
if options.windowed:
# Read the root window dimensions from the output of `xwininfo -root`.
result=subprocess.Popen(['xwininfo', '-root'], stdout=subprocess.PIPE)
rootinfo=result.communicate()[0]
result.wait()
for line in rootinfo.decode('utf-8').split('\n'):
if 'Width' in line:
screenwidth = int(line.split(':')[1])
if 'Height' in line:
screenheight = int(line.split(':')[1])
envstring=os.environ.get('NODECONSOLE_WINDOWED_COMMAND')
if not envstring:
# No override given: xterm with a 100x31 geometry plus pass_through_args.
sizegeometry='100x31'
corrected_x, corrected_y = (13,84)
envlist = handle_geometry(['xterm'] + pass_through_args + ['-e'],sizegeometry, first=True)
#envlist=['xterm', '-bg', 'black', '-fg', 'white', '-geometry', '{sizegeometry}+0+0'.format(sizegeometry=sizegeometry), '-e']
else:
# NOTE(review): split(' ') splits on single spaces only, so quoted
# arguments in the environment variable are not kept together.
envlist=os.environ.get('NODECONSOLE_WINDOWED_COMMAND').split(' ')
if envlist[0] == 'xterm':
# Same flag lookup as handle_geometry(): -geometry first, then -g; an index
# of 0 means "not found".
if '-geometry' in envlist:
g_index = envlist.index('-geometry')
elif '-g' in envlist:
g_index = envlist.index('-g')
else:
g_index = 0
if g_index:
# Keep the user's size and append a +0+0 position.
envlist[g_index+1] = envlist[g_index+1] + '+0+0'
else:
envlist.insert(1, '-geometry')
envlist.insert(2, '100x31+0+0')
g_index = 1
# Expand the noderange given in args[0] into individual node names.
nodes = []
sess = synclient.Command()
for res in sess.read('/noderange/{0}/nodes/'.format(args[0])):
node = res.get('item', {}).get('href', '/').replace('/', '')
if not node:
sys.stderr.write(res.get('error', repr(res)) + '\n')
sys.exit(1)
nodes.append(node)
if options.tile and not envlist[0] == 'xterm':
sys.stderr.write('[ERROR] UNSUPPORTED OPTIONS. \nWindowed and tiled consoles are only supported when using xterm \n')
sys.exit(1)
# The first node is launched separately; the remaining nodes are handled by
# the loop further down.
# NOTE(review): nodes[0] raises IndexError if the lookup returned no nodes.
firstnode=nodes[0]
nodes.pop(0)
with open(os.devnull, 'wb') as devnull:
xopen=subprocess.Popen(envlist + [confettypath, '-c', '/tmp/controlpath-{0}'.format(firstnode)] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(firstnode)] , stdin=devnull)
time.sleep(2)
# Connect to the same /tmp/controlpath-<node> path that was passed to
# confetty with -c above, and request the window id with GETWINID.
# NOTE(review): the socket `s` is never explicitly closed in this block.
s=socket.socket(socket.AF_UNIX)
winid=''
try:
s.connect('/tmp/controlpath-{firstnode}'.format(firstnode=firstnode))
s.recv(64)
s.send(b'GETWINID')
winid=s.recv(64).decode('utf-8')
# NOTE(review): a bare except also catches KeyboardInterrupt and SystemExit.
except:
time.sleep(2)
# try to get id of first panel/xterm window using name
win=subprocess.Popen(['xwininfo', '-tree', '-root'], stdout=subprocess.PIPE)
wintr=win.communicate()[0]
for line in wintr.decode('utf-8').split('\n'):
if 'console: {firstnode}'.format(firstnode=firstnode) in line or 'confetty' in line:
# The first whitespace-separated field of the matching line is taken as the id.
win_obj = [ele for ele in line.split(' ') if ele.strip()]
winid = win_obj[0]
if winid:
# Parse position, size and geometry of the first window from `xwininfo -id`.
firstnode_window=subprocess.Popen(['xwininfo', '-id', '{winid}'.format(winid=winid)], stdout=subprocess.PIPE)
xinfo=firstnode_window.communicate()[0]
xinfl = xinfo.decode('utf-8').split('\n')
for line in xinfl:
if 'Absolute upper-left X:' in line:
side_pad = int(line.split(':')[1])
elif 'Absolute upper-left Y:' in line:
top_pad = int(line.split(':')[1])
elif 'Width:' in line:
window_width = int(line.split(':')[1])
elif 'Height' in line:
window_height = int(line.split(':')[1])
elif '-geometry' in line:
# Split the geometry line on spaces, 'x', '-' and '+'; the second and third
# non-empty fields are used as the window size.
l = re.split(' |x|-|\\+', line)
l_nosp = [ele for ele in l if ele.strip()]
wmxo = int(l_nosp[1])
wmyo = int(l_nosp[2])
sizegeometry = str(wmxo) + 'x' + str(wmyo)
else:
pass
# NOTE(review): side_pad, top_pad, window_width, window_height, wmxo and wmyo
# are only assigned while parsing the xwininfo output above; confirm they
# are always set before the arithmetic below runs.
window_width += side_pad*2
window_height += side_pad+top_pad
screenwidth -= wmxo
screenheight -= wmyo
# The next window goes one window-width to the right of the first one.
currx = window_width
curry = 0
for node in sortutil.natural_sort(nodes):
if options.tile and envlist[0] == 'xterm':
# Tiled placement: fill a row left to right, then wrap to the next row.
corrected_x = currx
corrected_y = curry
xgeometry = '{0}+{1}+{2}'.format(sizegeometry, corrected_x, corrected_y)
currx += window_width
if currx + window_width >= screenwidth:
currx=0
curry += window_height
if curry > screenheight:
curry =top_pad
if not envstring:
envlist= handle_geometry(envlist, sizegeometry, corrected_x, corrected_y)
else:
if g_index:
envlist[g_index+1] = xgeometry
elif envlist[0] == 'xterm':
# Untiled xterm: every further window is offset more than the previous
# one (side_pad becomes 2*side_pad+1, top_pad becomes 2*top_pad+30).
envlist=handle_geometry(envlist, sizegeometry, side_pad, top_pad)
side_pad+=(side_pad+1)
top_pad+=(top_pad+30)
else:
pass
with open(os.devnull, 'wb') as devnull:
xopen=subprocess.Popen(envlist + [confettypath, '-c', '/tmp/controlpath-{0}'.format(node)] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(node)] , stdin=devnull)
sys.exit(0)
#end of wcons
# Tiled mode: run one confetty console per node in tmux panes.  Without
# options.tile, the process is replaced by a single confetty session for
# args[0].
if options.tile:
# Sink for tmux output that is deliberately discarded.
# NOTE(review): this file object is never explicitly closed.
null = open('/dev/null', 'w')
# Expand the noderange given in args[0] into individual node names.
nodes = []
sess = synclient.Command()
for res in sess.read('/noderange/{0}/nodes/'.format(args[0])):
node = res.get('item', {}).get('href', '/').replace('/', '')
if not node:
sys.stderr.write(res.get('error', repr(res)) + '\n')
sys.exit(1)
nodes.append(node)
# `initial` marks that the tmux session still has to be created.
initial = True
in_tmux = False
pane = 0
# Session name is made unique per invocation by the process id.
sessname = 'nodeconsole_{0}'.format(os.getpid())
if os.environ.get("TMUX"):
# TMUX is set in the environment: rename the current session instead of
# creating a new one.
initial = False
in_tmux = True
subprocess.call(['tmux', 'rename-session', sessname])
for node in sortutil.natural_sort(nodes):
panename = '{0}:{1}'.format(sessname, pane)
if initial:
initial = False
confetty_cmd = [confettypath] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(node)]
# tmux takes the command as one string, so each argument is shell-quoted
# before joining.  The session is created detached (-d) with an 800x800 size.
subprocess.call(
['tmux', 'new-session', '-d', '-s',
sessname, '-x', '800', '-y',
'800', ' '.join(shlex.quote(arg) for arg in confetty_cmd)])
else:
subprocess.call(['tmux', 'select-pane', '-t', sessname])
# stderr of the pane-border-status call is discarded.
subprocess.call(['tmux', 'set-option', '-t', panename, 'pane-border-status', 'top'], stderr=null)
confetty_cmd = [confettypath] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(node)]
subprocess.call(
['tmux', 'split', '-h', '-t', sessname,
' '.join(shlex.quote(arg) for arg in confetty_cmd)])
# Re-apply the tiled layout; stdout of this call is discarded.
subprocess.call(['tmux', 'select-layout', '-t', sessname, 'tiled'], stdout=null)
pane += 1
subprocess.call(['tmux', 'select-pane', '-t', sessname])
subprocess.call(['tmux', 'set-option', '-t', panename, 'pane-border-status', 'top'], stderr=null)
if not in_tmux:
# Replace this process with a tmux client attached to the new session.
os.execlp('tmux', 'tmux', 'attach', '-t', sessname)
else:
# Default: replace this process with confetty for the console of args[0].
execl_args = [confettypath] + automation_args + ['start',
'/nodes/{0}/console/session'.format(args[0])]
os.execl(confettypath, *execl_args)