2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-01-11 18:42:29 +00:00
Files
2025-11-20 16:44:24 -05:00

126 lines
4.8 KiB
Python

#!/usr/bin/python3
import collections
import glob
import os
import select
import shlex
import shutil
import subprocess
import sys
import tempfile
sys.path.append('/opt/lib/confluent/python')
import confluent.sortutil as sortutil
import confluent.client as client
def prep_outdir(node):
    """Stage the per-node files in a fresh temporary directory.

    The directory receives a copy of every ``*.pem`` file found under
    ``/var/lib/confluent/public/site/tls`` plus the node's identity file
    (``/var/lib/confluent/private/identity_files/<node>.json``), stored as
    ``identity.json``.  Before the identity file is copied, ``confetty`` is
    asked to set ``deployment/ident_image=create`` on the node.
    NOTE(review): presumably that call is what (re)generates the identity
    file copied afterwards -- confirm against the confluent API.

    Args:
        node: Name of the node to stage files for.

    Returns:
        Path of the temporary directory.  The caller owns it and is
        responsible for removing it.

    Raises:
        subprocess.CalledProcessError: if ``confetty`` exits non-zero.
        OSError: if ``confetty`` cannot be run or a file copy fails.
    """
    tmpdir = tempfile.mkdtemp()
    try:
        for certfile in glob.glob('/var/lib/confluent/public/site/tls/*.pem'):
            shutil.copy2(
                certfile, os.path.join(tmpdir, os.path.basename(certfile)))
        # Pass an explicit argv so the node name is never re-tokenised.
        subprocess.check_call(
            ['confetty', 'set', f'/nodes/{node}/deployment/ident_image=create'])
        shutil.copy2(
            f'/var/lib/confluent/private/identity_files/{node}.json',
            os.path.join(tmpdir, 'identity.json'))
    except BaseException:
        # Nobody else knows about tmpdir yet, so a failure here would leak
        # it; remove it before letting the error propagate.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return tmpdir
def exec_bfb_install(host, nodetorshim, bfbfile, installprocs, pipedesc, all, poller):
    """Copy the BFB image and helper scripts to *host* and start the installs.

    A scratch directory is created on *host* with ``mktemp -d`` over ssh, the
    BFB file and the contents of
    ``/opt/lib/confluent/osdeploy/bluefield/hostscripts/`` are rsynced into
    it, and then for each node:

    * a local staging directory is built with ``prep_outdir``,
    * the ``bfb.cfg.template`` of the node's ``deployment.pendingprofile``
      (as reported by ``nodeattrib``) is added to it,
    * the staging directory is rsynced to ``<remotedir>/<node>/`` and then
      removed locally,
    * ``bfb-autoinstall`` is launched on *host* over ssh via ``run_cmdv``.

    Args:
        host: ssh/rsync destination that has the rshim devices.
        nodetorshim: Mapping of node name to rshim device name.
        bfbfile: Local path of the BFB image to install.
        installprocs: Not used by this function; kept for the existing
            call signature.
        pipedesc: Passed through to ``run_cmdv``.
        all: Passed through to ``run_cmdv`` (set of open pipes).
        poller: Passed through to ``run_cmdv`` (epoll object).

    Raises:
        subprocess.CalledProcessError: if ssh, rsync or nodeattrib fails.
        IndexError: if the nodeattrib output does not have the expected
            ``node: attribute: value`` shape.
    """
    # Explicit argv lists keep local paths with spaces or quotes intact.
    remotedir = subprocess.check_output(
        ['ssh', host, 'mktemp', '-d', '/tmp/bfb.XXXXXX']).decode().strip()
    bfbbasename = os.path.basename(bfbfile)
    subprocess.check_call(
        ['rsync', '-avz', '--info=progress2', bfbfile,
         f'{host}:{remotedir}/{bfbbasename}'])
    subprocess.check_call(
        ['rsync', '-avc', '--info=progress2',
         '/opt/lib/confluent/osdeploy/bluefield/hostscripts/',
         f'{host}:{remotedir}/'])
    for node, rshim in nodetorshim.items():
        nodeoutdir = prep_outdir(node)
        try:
            attrib = subprocess.check_output(
                ['nodeattrib', node, 'deployment.pendingprofile']).decode()
            # Output is split as "<node>: <attribute>: <value>"; keep the value.
            nodeprofile = attrib.strip().split(':', 2)[2].strip()
            shutil.copy2(
                f'/var/lib/confluent/public/os/{nodeprofile}/bfb.cfg.template',
                os.path.join(nodeoutdir, 'bfb.cfg.template'))
            subprocess.check_call(
                ['rsync', '-avz', f'{nodeoutdir}/',
                 f'{host}:{remotedir}/{node}/'])
        finally:
            # The staging directory holds the node's identity file; never
            # leave it behind, even when one of the steps above fails.
            shutil.rmtree(nodeoutdir)
        # ssh joins its arguments and hands them to the remote shell, so the
        # variable parts are quoted (a no-op for ordinary names).
        run_cmdv(
            node,
            ['ssh', host, 'sh', '/etc/confluent/functions', 'confluentpython',
             shlex.quote(f'{remotedir}/bfb-autoinstall'), shlex.quote(node),
             shlex.quote(f'{remotedir}/{bfbbasename}'), shlex.quote(rshim)],
            all, poller, pipedesc)
def run_cmdv(node, cmdv, all, poller, pipedesc):
    """Start *cmdv* for *node* and register its output pipes for polling.

    The process is started with stdin attached to ``/dev/null`` and with
    stdout and stderr as pipes.  Each pipe is recorded in *pipedesc* (keyed
    by file descriptor), added to *all* and registered with *poller* for
    ``EPOLLIN``.

    Args:
        node: Node name the command is run on behalf of; stored with each
            pipe descriptor and used in the error message.
        cmdv: Argument vector to execute.
        all: Set collecting every pipe that is still open.
        poller: ``select.epoll`` object to register the pipes with.
        pipedesc: Dict mapping a pipe's file descriptor to a dict with the
            keys ``node``, ``popen``, ``type`` (``'stdout'``/``'stderr'``)
            and ``file``.

    Returns:
        None.  If the executable cannot be found, a message is written to
        stderr and nothing is registered.

    Raises:
        OSError: for any start-up failure other than a missing executable.
    """
    try:
        nopen = subprocess.Popen(
            cmdv, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
    except FileNotFoundError:
        # FileNotFoundError is the OSError subclass for errno ENOENT (2).
        sys.stderr.write('{0}: Unable to find local executable file "{1}"\n'.format(node, cmdv[0]))
        return
    for kind, pipe in (('stdout', nopen.stdout), ('stderr', nopen.stderr)):
        pipedesc[pipe.fileno()] = {'node': node, 'popen': nopen,
                                   'type': kind, 'file': pipe}
        all.add(pipe)
        poller.register(pipe, select.EPOLLIN)
def _parse_node_args(args):
    """Turn ``<node>:<rshim>`` arguments into a ``{node: rshim}`` dict.

    Raises:
        ValueError: if an argument is not exactly ``<node>:<rshim>`` with
            both parts non-empty.
    """
    nodetorshim = {}
    for arg in args:
        parts = arg.split(':')
        if len(parts) != 2 or not parts[0] or not parts[1]:
            raise ValueError(
                'Invalid argument "{0}", expected <node>:<rshim>'.format(arg))
        nodetorshim[parts[0]] = parts[1]
    return nodetorshim


def _drain_pipe(desc, pernodeout, openpipes, poller, pipedesc, pendingexecs):
    """Consume the lines currently readable from one ready pipe.

    stdout lines are collected in *pernodeout* (per node) so the caller can
    print them grouped; stderr lines are written out immediately, prefixed
    with the node name.  When the pipe reports end-of-file and its process
    has exited, the pipe is unregistered and closed, and (for stdout pipes)
    the next queued command, if any, is started.

    Returns:
        The exit status of the process if this call retired the pipe,
        otherwise 0.
    """
    pipe = desc['file']
    node = desc['node']
    status = 0
    # A throwaway poller tells us whether another readline() would block.
    with select.epoll() as singlepoller:
        singlepoller.register(pipe, select.EPOLLIN)
        data = True
        while data and singlepoller.poll(0):
            data = pipe.readline()
            if data:
                if desc['type'] == 'stdout':
                    pernodeout.setdefault(node, []).append(data)
                else:
                    sys.stderr.write(
                        '{0}: {1}'.format(node, client.stringify(data)))
                    sys.stderr.flush()
                continue
            # Empty read means EOF; only retire the pipe once the process
            # has actually exited, otherwise try again on a later poll.
            ret = desc['popen'].poll()
            if ret is None:
                continue
            status |= ret
            openpipes.discard(pipe)
            poller.unregister(pipe)
            pipe.close()
            if desc['type'] == 'stdout' and pendingexecs:
                nextnode, cmdv = pendingexecs.popleft()
                run_cmdv(nextnode, cmdv, openpipes, poller, pipedesc)
    return status


def main(argv=None):
    """Command-line entry point.

    Usage: ``<prog> <host> <bfbfile> <node1:rshim1> [<node2:rshim2> ...]``

    Starts the installs through ``exec_bfb_install``, relays their output
    prefixed with the node name (stdout grouped per node in natural sort
    order, stderr as it arrives) and exits with the bitwise OR of the exit
    statuses of all started processes.

    Args:
        argv: Argument vector including the program name; defaults to
            ``sys.argv``.
    """
    argv = sys.argv if argv is None else argv
    # host, bfbfile and at least one node:rshim pair are all required.
    if len(argv) < 4:
        progname = argv[0] if argv else sys.argv[0]
        print(f'Usage: {progname} <host> <bfbfile> <node1:rshim1> [<node2:rshim2> ...]')
        sys.exit(1)
    host = argv[1]
    bfbfile = argv[2]
    try:
        nodetorshim = _parse_node_args(argv[3:])
    except ValueError as exc:
        sys.stderr.write('{0}\n'.format(exc))
        sys.exit(1)
    installprocs = {}
    pipedesc = {}
    openpipes = set()
    poller = select.epoll()
    exec_bfb_install(host, nodetorshim, bfbfile, installprocs, pipedesc,
                     openpipes, poller)
    # Nothing is queued here today; a deque keeps popleft() valid if
    # deferred commands are ever added.
    pendingexecs = collections.deque()
    exitcode = 0
    while openpipes:
        rdy = poller.poll(10)
        pernodeout = {}
        for fd, _event in rdy:
            exitcode |= _drain_pipe(pipedesc[fd], pernodeout, openpipes,
                                    poller, pipedesc, pendingexecs)
        for node in sortutil.natural_sort(pernodeout):
            for line in pernodeout[node]:
                sys.stdout.write(
                    '{0}: {1}'.format(node, client.stringify(line)))
        sys.stdout.flush()
    poller.close()
    # Surface remote failures to the caller instead of always exiting 0.
    sys.exit(exitcode)


if __name__ == '__main__':
    main()