2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-28 21:23:28 +00:00

Fix confluent_selfcheck for asyncio

Most dramatically, rework the SSH checks to avoid os.fork, which
ruins threading and, by extension, the getaddrinfo behavior.
This commit is contained in:
Jarrod Johnson
2026-03-19 17:21:30 -04:00
parent 07a6eb32ed
commit 40da956a06

View File

@@ -23,6 +23,7 @@ import pwd
import signal
import confluent.collective.manager as collective
import confluent.noderange as noderange
import subprocess
def check_sysctl_tuning():
with open('/proc/sys/net/ipv4/tcp_sack', 'r') as f:
@@ -73,7 +74,7 @@ def webserver_listening():
return False
def certificates_missing_ips(conn):
async def certificates_missing_ips(conn):
# check if the tls can verify by the right CAs, then further
# check if all ip addresses are in the certificate offered
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
@@ -83,9 +84,8 @@ def certificates_missing_ips(conn):
sock = ctx.wrap_socket(conn)
crt = sock.getpeercert()
sans = crt.get('subjectAltName', [])
ips = certutil.get_ip_addresses()
missing_ips = []
for ip in ips:
async for ip in certutil.get_ip_addresses():
for san in sans:
field, val = san
if val[-1] == '\n':
@@ -174,6 +174,61 @@ async def lookup_node(node):
return await cloop.getaddrinfo(node, 0)
except Exception:
return None
async def check_ssh_to_node(targsships):
    """Check that the confluent automation key can ssh to each target as root.

    The function permanently switches the process to the ``confluent``
    user and finishes by exiting the interpreter, so it is only suitable
    for a process dedicated to this check.

    :param targsships: iterable of host names/addresses handed to ``ssh``
    :raises SystemExit: always, with status 0, after every target was tried
    """
    # Start from clean sshutil agent bookkeeping.
    # NOTE(review): presumably this makes prep_ssh_key start a fresh agent
    # owned by the confluent user -- confirm against sshutil.
    sshutil.ready_keys = {}
    sshutil.agent_pid = None
    cuser = pwd.getpwnam('confluent')
    # Change the group before the user: once the uid has been dropped the
    # process is normally no longer allowed to change its gid.
    os.setgid(cuser.pw_gid)
    os.setuid(cuser.pw_uid)
    try:
        await sshutil.prep_ssh_key('/etc/confluent/ssh/automation')
        for targ in targsships:
            srun = subprocess.run(
                ['ssh', '-Tn', '-o', 'BatchMode=yes', '-l', 'root',
                 '-o', 'StrictHostKeyChecking=yes', targ, 'true'],
                stdin=subprocess.DEVNULL, stderr=subprocess.PIPE)
            if srun.returncode == 0:
                print(f'Confluent automation access to {targ} seems OK')
            elif b'Host key verification failed' in srun.stderr:
                emprint(f'Confluent ssh unable to verify host key for {targ}, check /etc/ssh/ssh_known_hosts. (Example resolution: osdeploy initialize -k)')
            elif b'ermission denied' in srun.stderr:
                # Leading letter omitted on purpose so both 'Permission'
                # and 'permission' match.
                emprint(f'Confluent user unable to ssh in to {targ}, check /root/.ssh/authorized_keys on the target system versus /etc/confluent/ssh/automation.pub (Example resolution: osdeploy initialize -a)')
            else:
                emprint('Unknown error attempting confluent automation ssh:')
                sys.stderr.buffer.write(srun.stderr)
    finally:
        # Terminate the agent even when key loading or ssh itself raised;
        # otherwise the agent process would be left behind.
        if sshutil.agent_pid:
            os.kill(int(sshutil.agent_pid), signal.SIGTERM)
    sys.exit(0)
def _key_is_unprotected(exc):
    """Return True if *exc* is a CalledProcessError whose stderr mentions 'UNPROTECTED'."""
    if type(exc).__name__ != 'CalledProcessError':
        return False
    # stderr is None when the failing command's stderr was not captured.
    errout = getattr(exc, 'stderr', None) or b''
    if isinstance(errout, bytes):
        errout = errout.decode(errors='replace')
    return 'UNPROTECTED' in errout


async def check_confluent_ssh():
    """Check that the SSH CA key and the automation key can be loaded.

    The function permanently switches the process to the ``confluent``
    user and finishes by exiting the interpreter, so it is only suitable
    for a process dedicated to this check.  Each key is reported as ``OK``
    or with a hint on how to resolve the failure.

    :raises SystemExit: always, with status 0, after both keys were checked
    """
    # Start from clean sshutil agent bookkeeping.
    # NOTE(review): presumably this makes prep_ssh_key start a fresh agent
    # owned by the confluent user -- confirm against sshutil.
    sshutil.ready_keys = {}
    sshutil.agent_pid = None
    cuser = pwd.getpwnam('confluent')
    # Change the group before the user: once the uid has been dropped the
    # process is normally no longer allowed to change its gid.
    os.setgid(cuser.pw_gid)
    os.setuid(cuser.pw_uid)
    # (progress label, key path, message when loading fails for a reason
    # other than file permissions)
    keychecks = (
        ('Checking SSH Certificate authority: ',
         '/etc/confluent/ssh/ca',
         'Failed to load SSH authority key, deployed servers will not have host certificates for known_hosts and users may be unable to ssh between nodes without a password (Example resolution: osdeploy initialize -s)'),
        ('Checking confluent SSH automation key: ',
         '/etc/confluent/ssh/automation',
         'Failed to load confluent automation key, syncfiles and profile ansible plays will not work (Example resolution: osdeploy initialize -a)'),
    )
    try:
        for label, keypath, failmsg in keychecks:
            fprint(label)
            try:
                await sshutil.prep_ssh_key(keypath)
                print('OK')
            except Exception as e:
                # Deliberately broad: any failure to load the key is
                # reported as a finding rather than aborting the check.
                if _key_is_unprotected(e):
                    emprint(f'Permissions incorrect on {keypath} (Example resolution: chmod 600 {keypath})')
                else:
                    emprint(failmsg)
    finally:
        # Terminate the agent on every exit path so it is never left behind.
        if sshutil.agent_pid:
            os.kill(int(sshutil.agent_pid), signal.SIGTERM)
    sys.exit(0)
async def main():
ap = argparse.ArgumentParser(description='Run configuration checks for a system running confluent service')
@@ -198,7 +253,7 @@ async def main():
if conn:
print('Running')
fprint('Web Certificate: ')
cert = certificates_missing_ips(conn)
cert = await certificates_missing_ips(conn)
if cert:
cert = ', '.join(cert)
emprint('Addresses missing from certificate: {0} (Example resolution: osdeploy initialize -t)'.format(cert))
@@ -252,37 +307,8 @@ async def main():
emprint('No matching public key found for root user (Example resolution: osdeploy initialize -u)')
else:
emprint('No trusted ssh keys for root user, passwordless SSH from managers to nodes may not work (Example resolution: osdeploy initialize -u)')
if sshutil.sshver() > 7.6:
child = os.fork()
if child > 0:
pid, extcode = os.waitpid(child, 0)
else:
sshutil.ready_keys = {}
sshutil.agent_pid = None
cuser = pwd.getpwnam('confluent')
os.setgid(cuser.pw_gid)
os.setuid(cuser.pw_uid)
fprint('Checking SSH Certificate authority: ')
try:
sshutil.prep_ssh_key('/etc/confluent/ssh/ca')
print('OK')
except Exception as e:
if type(e).__name__ == 'CalledProcessError' and 'UNPROTECTED' in e.stderr.decode():
emprint('Permissions incorrect on /etc/confluent/ssh/ca (Example resolution: chmod 600 /etc/confluent/ssh/ca)')
else:
emprint('Failed to load SSH authority key, deployed servers will not have host certificates for known_hosts and users may be unable to ssh between nodes without a password (Example resolution: osdeploy initialize -s)')
fprint('Checking confluent SSH automation key: ')
try:
sshutil.prep_ssh_key('/etc/confluent/ssh/automation')
print('OK')
except Exception as e:
if type(e).__name__ == 'CalledProcessError' and 'UNPROTECTED' in e.stderr.decode():
emprint('Permissions incorrect on /etc/confluent/ssh/automation (Example resolution: chmod 600 /etc/confluent/ssh/automation)')
else:
emprint('Failed to load confluent automation key, syncfiles and profile ansible plays will not work (Example resolution: osdeploy initialize -a)')
if sshutil.agent_pid:
os.kill(int(sshutil.agent_pid), signal.SIGTERM)
sys.exit(0)
if await sshutil.sshver() > 7.6:
subprocess.run([sys.executable, __file__, '--check-ssh'])
fprint('Checking for blocked insecure boot: ')
if insecure_boot_attempts():
emprint('Some nodes are attempting network boot using PXE or HTTP boot, but the node is not configured to allow this (Example resolution: nodegroupattrib everything deployment.useinsecureprotocols=firmware)')
@@ -304,8 +330,8 @@ async def main():
allok = True
uuidok = False
macok = False
valid_nodes = [node['item']['href'][:-1] async for node in sess.read('/nodes/')] #get all valid nodes
async for rsp in sess.read(f'/nodes/{args.node}/attributes/all'):
valid_nodes = [node['item']['href'][:-1] for node in sess.read('/nodes/')] #get all valid nodes
for rsp in sess.read(f'/nodes/{args.node}/attributes/all'):
if rsp.get('errorcode', None) == 404:
emprint(f'There is no node named "{args.node}"')
allok = False
@@ -399,34 +425,7 @@ async def main():
print("OK")
if args.automation:
print(f'Checking confluent automation access to {args.node}...')
child = os.fork()
if child > 0:
pid, extcode = os.waitpid(child, 0)
else:
sshutil.ready_keys = {}
sshutil.agent_pid = None
cuser = pwd.getpwnam('confluent')
os.setgid(cuser.pw_gid)
os.setuid(cuser.pw_uid)
sshutil.prep_ssh_key('/etc/confluent/ssh/automation')
for targ in targsships:
srun = subprocess.run(
['ssh', '-Tn', '-o', 'BatchMode=yes', '-l', 'root',
'-o', 'StrictHostKeyChecking=yes', targ, 'true'],
stdin=subprocess.DEVNULL, stderr=subprocess.PIPE)
if srun.returncode == 0:
print(f'Confluent automation access to {targ} seems OK')
else:
if b'Host key verification failed' in srun.stderr:
emprint(f'Confluent ssh unable to verify host key for {targ}, check /etc/ssh/ssh_known_hosts. (Example resolution: osdeploy initialize -k)')
elif b'ermission denied' in srun.stderr:
emprint(f'Confluent user unable to ssh in to {targ}, check /root/.ssh/authorized_keys on the target system versus /etc/confluent/ssh/automation.pub (Example resolution: osdeploy initialize -a)')
else:
emprint('Unknown error attempting confluent automation ssh:')
sys.stderr.buffer.write(srun.stderr)
if sshutil.agent_pid:
os.kill(int(sshutil.agent_pid), signal.SIGTERM)
sys.exit(0)
subprocess.run([sys.executable, __file__, '--check-ssh'] + targsships)
else:
print("Skipping node checks, no node specified (Example: confluent_selfcheck -n n1)")
# possible checks:
@@ -434,4 +433,10 @@ async def main():
# arping -D for mgt own ip addresses? check for dupes, also check for bleed through from one nic to another
# iterate through profiles, use mtools to extract site initramfs, check if outdated
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
if len(sys.argv) >= 2 and sys.argv[1] == '--check-ssh':
if len(sys.argv) >= 3:
asyncio.run(check_ssh_to_node(sys.argv[2:]))
else:
asyncio.run(check_confluent_ssh())
else:
asyncio.run(main())