mirror of
https://github.com/xcat2/confluent.git
synced 2026-01-11 18:42:29 +00:00
126 lines
4.8 KiB
Python
126 lines
4.8 KiB
Python
#!/usr/bin/python3
|
|
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import glob
|
|
import shutil
|
|
import shlex
|
|
import subprocess
|
|
import select
|
|
|
|
sys.path.append('/opt/lib/confluent/python')
|
|
|
|
import confluent.sortutil as sortutil
|
|
import confluent.client as client
|
|
|
|
|
|
def prep_outdir(node):
    """Stage the per-node deployment files in a fresh temporary directory.

    The directory receives a copy of every ``*.pem`` file found under
    ``/var/lib/confluent/public/site/tls/`` and the node's identity file,
    stored as ``identity.json``.  The identity file is copied only after
    ``confetty set /nodes/<node>/deployment/ident_image=create`` has
    completed successfully.

    Args:
        node: Name of the confluent node to stage files for.

    Returns:
        Path of the populated temporary directory.  The caller is
        responsible for removing it.

    Raises:
        subprocess.CalledProcessError: If the ``confetty`` command exits
            with a non-zero status.
        OSError: If ``confetty`` cannot be executed or a file cannot be
            copied.
    """
    tmpdir = tempfile.mkdtemp()
    try:
        for certfile in glob.glob('/var/lib/confluent/public/site/tls/*.pem'):
            basename = os.path.basename(certfile)
            destfile = os.path.join(tmpdir, basename)
            shutil.copy2(certfile, destfile)
        # NOTE(review): presumably this is what (re)generates the identity
        # file copied on the next line -- confirm against confluent docs.
        subprocess.check_call(shlex.split(f'confetty set /nodes/{node}/deployment/ident_image=create'))
        shutil.copy2(f'/var/lib/confluent/private/identity_files/{node}.json', os.path.join(tmpdir, 'identity.json'))
    except BaseException:
        # Nobody else knows about tmpdir yet, so a failure here would
        # otherwise leave the partially populated directory behind.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return tmpdir
|
|
|
|
def exec_bfb_install(host, nodetorshim, bfbfile, installprocs, pipedesc, all, poller):
    """Push the BFB image to *host* and start one install per node.

    A temporary directory is created on *host* with ``ssh ... mktemp -d``;
    the BFB file and the contents of
    ``/opt/lib/confluent/osdeploy/bluefield/hostscripts/`` are rsynced into
    it.  Then, for every node in *nodetorshim*:

    1. ``prep_outdir`` stages the node's files locally.
    2. The node's ``deployment.pendingprofile`` is read with ``nodeattrib``
       and that profile's ``bfb.cfg.template`` is added to the staged files.
    3. The staged files are rsynced to ``<remotedir>/<node>/`` on *host* and
       the local staging directory is removed.
    4. ``bfb-autoinstall`` is started on *host* through ``run_cmdv``, which
       does not wait for it to finish.

    Args:
        host: Host used as the ssh/rsync destination.
        nodetorshim: Mapping of node name to the rshim value handed to
            ``bfb-autoinstall`` as its last argument.
        bfbfile: Local path of the BFB file to transfer.
        installprocs: Not used by this function; kept so existing callers
            continue to work.
        pipedesc: Passed through to ``run_cmdv``.
        all: Passed through to ``run_cmdv``.
        poller: Passed through to ``run_cmdv``.

    Raises:
        subprocess.CalledProcessError: If an ssh, rsync or nodeattrib
            command exits with a non-zero status.
        IndexError: If the ``nodeattrib`` output has fewer than three
            colon-separated fields.
        OSError: If a local file cannot be copied or removed.
    """
    remotedir = subprocess.check_output(shlex.split(f'ssh {host} mktemp -d /tmp/bfb.XXXXXX')).decode().strip()
    bfbbasename = os.path.basename(bfbfile)
    subprocess.check_call(shlex.split(f'rsync -avz --info=progress2 {bfbfile} {host}:{remotedir}/{bfbbasename}'))
    subprocess.check_call(shlex.split(f'rsync -avc --info=progress2 /opt/lib/confluent/osdeploy/bluefield/hostscripts/ {host}:{remotedir}/'))
    for node, rshim in nodetorshim.items():
        nodeoutdir = prep_outdir(node)
        try:
            # The third colon-separated field of the nodeattrib output is
            # taken as the profile name.
            nodeprofile = subprocess.check_output(shlex.split(f'nodeattrib {node} deployment.pendingprofile')).decode().strip().split(':', 2)[2].strip()
            shutil.copy2(f'/var/lib/confluent/public/os/{nodeprofile}/bfb.cfg.template', os.path.join(nodeoutdir, 'bfb.cfg.template'))
            subprocess.check_call(shlex.split(f'rsync -avz {nodeoutdir}/ {host}:{remotedir}/{node}/'))
        except BaseException:
            # The staging directory holds the node's identity file; do not
            # leave it behind when the transfer fails part-way.
            shutil.rmtree(nodeoutdir, ignore_errors=True)
            raise
        shutil.rmtree(nodeoutdir)
        run_cmdv(node, shlex.split(f'ssh {host} sh /etc/confluent/functions confluentpython {remotedir}/bfb-autoinstall {node} {remotedir}/{bfbbasename} {rshim}'), all, poller, pipedesc)
|
|
|
|
|
|
def run_cmdv(node, cmdv, all, poller, pipedesc):
    """Launch *cmdv* without waiting for it and register its output pipes.

    The child gets no stdin; its stdout and stderr are captured through
    pipes.  For each of the two pipes an entry keyed by the pipe's file
    descriptor is stored in *pipedesc* (``node``, ``popen``, ``type`` and
    ``file``), the pipe object is added to *all*, and it is registered with
    *poller* for ``EPOLLIN``.

    If launching fails with errno 2, a message naming ``cmdv[0]`` is written
    to stderr and the function returns without registering anything.  Any
    other ``OSError`` propagates to the caller.

    Args:
        node: Node name recorded alongside the pipes.
        cmdv: Argument vector to execute.
        all: Set collecting the pipe objects that are still open.
        poller: Object providing ``register(file, eventmask)``.
        pipedesc: Dict mapping file descriptor to the pipe's description.
    """
    try:
        proc = subprocess.Popen(
            cmdv,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except OSError as err:
        if err.errno != 2:
            raise
        sys.stderr.write(
            '{0}: Unable to find local executable file "{1}"\n'.format(
                node, cmdv[0]))
        return
    for kind, stream in (('stdout', proc.stdout), ('stderr', proc.stderr)):
        pipedesc[stream.fileno()] = {
            'node': node,
            'popen': proc,
            'type': kind,
            'file': stream,
        }
        all.add(stream)
        poller.register(stream, select.EPOLLIN)
|
|
|
|
if __name__ == '__main__':
    # Command-line driver: start the BFB install for every <node>:<rshim>
    # pair through <host>, relay each child's output prefixed with its node
    # name, and exit with the OR of all child exit statuses.

    # At least one <node>:<rshim> pair is required, as the usage text says.
    if len(sys.argv) < 4:
        print(f'Usage: {sys.argv[0]} <host> <bfbfile> <node1:rshim1> [<node2:rshim2> ...]')
        sys.exit(1)

    host = sys.argv[1]
    bfbfile = sys.argv[2]
    nodetorshim = {}
    for arg in sys.argv[3:]:
        fields = arg.split(':')
        if len(fields) != 2:
            # Report a malformed pair clearly instead of dying with an
            # unpacking traceback.
            sys.stderr.write(
                f'Invalid argument "{arg}": expected <node>:<rshim>\n')
            sys.exit(1)
        node, rshim = fields
        nodetorshim[node] = rshim

    installprocs = {}
    pipedesc = {}    # file descriptor -> description of the pipe
    openpipes = set()  # pipe objects that have not reached end-of-file yet
    poller = select.epoll()

    exec_bfb_install(host, nodetorshim, bfbfile, installprocs, pipedesc, openpipes, poller)
    # Queue of (node, cmdv) pairs to start when a running command finishes.
    # Nothing in this script adds to it at present.
    pendingexecs = []
    exitcode = 0
    while openpipes:
        rdy = poller.poll(10)
        pernodeout = {}
        for fd, _events in rdy:
            desc = pipedesc[fd]
            r = desc['file']
            node = desc['node']
            data = True
            # A dedicated poller on this one pipe lets us drain every line
            # that is already available without blocking on readline().
            singlepoller = select.epoll()
            singlepoller.register(r, select.EPOLLIN)
            while data and singlepoller.poll(0):
                data = r.readline()
                if data:
                    if desc['type'] == 'stdout':
                        # stdout is buffered per node so it can be printed
                        # in sorted node order below.
                        if node not in pernodeout:
                            pernodeout[node] = []
                        pernodeout[node].append(data)
                    else:
                        # stderr is relayed immediately.
                        data = client.stringify(data)
                        sys.stderr.write('{0}: {1}'.format(node, data))
                        sys.stderr.flush()
                else:
                    # End of file on this pipe; retire it once the child
                    # has actually exited.
                    pop = desc['popen']
                    ret = pop.poll()
                    if ret is not None:
                        exitcode = exitcode | ret
                        openpipes.discard(r)
                        poller.unregister(r)
                        r.close()
                        if desc['type'] == 'stdout' and pendingexecs:
                            # pendingexecs is a list, so take from the
                            # front with pop(0).
                            node, cmdv = pendingexecs.pop(0)
                            run_cmdv(node, cmdv, openpipes, poller, pipedesc)
            singlepoller.close()
        for node in sortutil.natural_sort(pernodeout):
            for line in pernodeout[node]:
                line = client.stringify(line)
                sys.stdout.write('{0}: {1}'.format(node, line))
        sys.stdout.flush()
    # Propagate child failures to the caller instead of always exiting 0.
    sys.exit(exitcode)
|
|
|
|
|