From a537a7ba5c1e17c83fd5de38c2d09d6e99746606 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 11 May 2026 13:55:54 -0400 Subject: [PATCH] Provide automation facility for nodeconsole Allow nodeconsole to walk console according to a script --- confluent_client/bin/confetty | 100 +++++++++++++++++++++++++++++++ confluent_client/bin/nodeconsole | 24 +++++--- 2 files changed, 116 insertions(+), 8 deletions(-) diff --git a/confluent_client/bin/confetty b/confluent_client/bin/confetty index 149d62ff..483e59b6 100755 --- a/confluent_client/bin/confetty +++ b/confluent_client/bin/confetty @@ -637,6 +637,7 @@ def startconsole(nodename): currfl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL) fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, currfl | os.O_NONBLOCK) inconsole = True + check_automation('') # give any leading 'sends' a chance def quitconfetty(code=0, fullexit=False, fixterm=True): @@ -854,6 +855,52 @@ def check_escape_seq(currinput, filehandle): currinput += filehandle.read() return currinput +automation_directives = [] +current_automation_directive = None +automation_map = { + '': '\x1b[A', + '': '\x1b[B', + '': '\x1b[C', + '': '\x1b[D', + '': '\r', + '': '\x1b', + '': '\t', +} + +def parse_automation_script(script): + global current_automation_directive + for line in script.splitlines(): + line = line.strip() + if not line or line.startswith('#'): + continue + if line.startswith('exit'): + automation_directives.append(('exit', None)) + continue + if ' ' not in line: + sys.stderr.write("Invalid line in automation script: %s\n" % line) + continue + directive, arg = line.split(' ', 1) + directive = directive.strip().lower() + if directive not in ('expect', 'send', 'forget'): + sys.stderr.write("Unknown directive in automation script: %s\n" % directive) + continue + arg = arg.strip() + if arg[0] not in ('"', "'"): + arg = '"' + arg + '"' + if arg[0] == "'" and arg[-1] == "'": + # do not process '<>' sequences in single quotes + arg = arg[1:-1] + arg = bytes(arg, "utf-8").decode("unicode_escape") + elif arg[0] == '"' and arg[-1] == '"': + arg = bytes(arg[1:-1], "utf-8").decode("unicode_escape") + for key, value in automation_map.items(): + arg = arg.replace(key, value) + arg = re.sub(r'', lambda m: os.environ[m.group(1)], arg) + automation_directives.append((directive, arg)) + if automation_directives: + current_automation_directive = automation_directives.pop(0) + + parser = optparse.OptionParser() parser.add_option("-s", "--server", dest="netserver", help="Confluent instance to connect to", @@ -861,12 +908,19 @@ parser.add_option("-s", "--server", dest="netserver", parser.add_option("-c", "--control", dest="controlpath", help="Path to offer terminal control", metavar="PATH") +parser.add_option('-a', '--automation', type='string', default=None, + help='Specify an automation script to run', metavar='SCRIPT') parser.add_option( '-m', '--mintime', default=0, help='Minimum time to run or else pause for input (used to keep a ' 'terminal from closing quickly on error)') opts, shellargs = parser.parse_args() +if opts.automation: + with open(opts.automation) as f: + parse_automation_script(f.read()) + + username = None passphrase = None def server_connect(): @@ -986,6 +1040,49 @@ fgcolor = None bgcolor = None fgshifted = False pendseq = '' +automation_check = '' + +def check_automation(data): + global automation_check + global current_automation_directive + if type(data) != str: + data = data.decode('utf-8', errors='ignore') + while current_automation_directive: + if current_automation_directive[0] == 'forget' and data: + current_automation_directive = None + automation_check = '' + if automation_directives: + current_automation_directive = automation_directives.pop(0) + return + if current_automation_directive[0] == 'expect': + expected = current_automation_directive[1] + combined = automation_check + data + if expected and expected in combined: + data = data[combined.rindex(expected) + len(expected):] + current_automation_directive = None + automation_check = '' + if automation_directives: + current_automation_directive = automation_directives.pop(0) + else: + # Check if there's potential start of expected data in the incoming data + combined = automation_check + data + automation_check = '' + for i in range(1, min(len(expected), len(combined)) + 1): + if expected.startswith(combined[-i:]): + automation_check = combined[-i:] + return # wait for next check + elif current_automation_directive[0] == 'send': + data = '' + automation_check = '' + if current_automation_directive[1]: + tlvdata.send(session.connection, current_automation_directive[1]) + current_automation_directive = None + if automation_directives: + current_automation_directive = automation_directives.pop(0) + elif current_automation_directive[0] == 'exit': + return True + return False + def consume_termdata(fh, bufferonly=False): global clearpowermessage global fgcolor, bgcolor, fgshifted, pendseq @@ -997,6 +1094,7 @@ def consume_termdata(fh, bufferonly=False): updatestatus(data) return '' if data is not None: + shouldexit = check_automation(data) indata = pendseq + client.stringify(data) pendseq = '' data = '' @@ -1083,6 +1181,8 @@ def consume_termdata(fh, bufferonly=False): # this scenario comfortable that it # will come out soon enough pass + if shouldexit: + quitconfetty(fullexit=True) else: deadline = 5 connected = False diff --git a/confluent_client/bin/nodeconsole b/confluent_client/bin/nodeconsole index 0a0c5409..7a49496c 100755 --- a/confluent_client/bin/nodeconsole +++ b/confluent_client/bin/nodeconsole @@ -20,6 +20,7 @@ import asyncio import base64 import optparse import os +import shlex import subprocess import sys path = os.path.dirname(os.path.realpath(__file__)) @@ -67,6 +68,8 @@ argparser = optparse.OptionParser( "ctrl-'e', then release ctrl, then 'c', then '?' for a full list. " "For example, ctrl-'e', then 'c', then '.' will exit the current " "console") +argparser.add_option('-a', '--automation', type='string', default=None, + help='Specify an automation script') argparser.add_option('-t', '--tile', action='store_true', default=False, help='Tile console windows in the terminal') argparser.add_option('-l', '--log', action='store_true', default=False, @@ -105,6 +108,10 @@ argparser.add_option('-w','--windowed', action='store_true', default=False, (options, args) = argparser.parse_args() +automation_args = [] +if options.automation: + automation_args = ['-a', options.automation] + oldtcattr = None oldfl = None @@ -1170,7 +1177,7 @@ if options.windowed: firstnode=nodes[0] nodes.pop(0) with open(os.devnull, 'wb') as devnull: - xopen=subprocess.Popen(envlist + [confettypath, '-c', '/tmp/controlpath-{0}'.format(firstnode), '-m', '5', 'start', '/nodes/{0}/console/session'.format(firstnode) ] , stdin=devnull) + xopen=subprocess.Popen(envlist + [confettypath, '-c', '/tmp/controlpath-{0}'.format(firstnode)] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(firstnode)] , stdin=devnull) time.sleep(2) s=socket.socket(socket.AF_UNIX) winid='' @@ -1242,7 +1249,7 @@ if options.windowed: else: pass with open(os.devnull, 'wb') as devnull: - xopen=subprocess.Popen(envlist + [confettypath, '-c', '/tmp/controlpath-{0}'.format(node), '-m', '5', 'start', '/nodes/{0}/console/session'.format(node)] , stdin=devnull) + xopen=subprocess.Popen(envlist + [confettypath, '-c', '/tmp/controlpath-{0}'.format(node)] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(node)] , stdin=devnull) sys.exit(0) #end of wcons if options.tile: @@ -1267,18 +1274,18 @@ if options.tile: panename = '{0}:{1}'.format(sessname, pane) if initial: initial = False + confetty_cmd = [confettypath] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(node)] subprocess.call( ['tmux', 'new-session', '-d', '-s', sessname, '-x', '800', '-y', - '800', '{0} -m 5 start /nodes/{1}/console/session'.format( - confettypath, node)]) + '800', ' '.join(shlex.quote(arg) for arg in confetty_cmd)]) else: subprocess.call(['tmux', 'select-pane', '-t', sessname]) subprocess.call(['tmux', 'set-option', '-t', panename, 'pane-border-status', 'top'], stderr=null) + confetty_cmd = [confettypath] + automation_args + ['-m', '5', 'start', '/nodes/{0}/console/session'.format(node)] subprocess.call( ['tmux', 'split', '-h', '-t', sessname, - '{0} -m 5 start /nodes/{1}/console/session'.format( - confettypath, node)]) + ' '.join(shlex.quote(arg) for arg in confetty_cmd)]) subprocess.call(['tmux', 'select-layout', '-t', sessname, 'tiled'], stdout=null) pane += 1 subprocess.call(['tmux', 'select-pane', '-t', sessname]) @@ -1286,5 +1293,6 @@ if options.tile: if not in_tmux: os.execlp('tmux', 'tmux', 'attach', '-t', sessname) else: - os.execl(confettypath, confettypath, 'start', - '/nodes/{0}/console/session'.format(args[0])) + execl_args = [confettypath] + automation_args + ['start', + '/nodes/{0}/console/session'.format(args[0])] + os.execl(confettypath, *execl_args)