2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-01-11 18:42:29 +00:00
Files
confluent/confluent_server/confluent/sshutil.py
Jarrod Johnson 595b628e08 Validate that the agent socket actually works
If the agent is killed with 'kill -9', recover
from that by reaping the now-dead socket.
2025-08-26 14:00:36 -04:00

258 lines
8.4 KiB
Python

#!/usr/bin/python
import base64
import confluent.config.configmanager as cfm
import confluent.collective.manager as collective
import confluent.util as util
import eventlet.green.subprocess as subprocess
import eventlet.green.socket as socket
import eventlet
import glob
import eventlet.green.os as os
import shutil
import tempfile
# PID of the ssh-agent launched by assure_agent(); None until one has been
# started.  prep_ssh_key() resets it to None when the agent socket is gone.
agent_pid = None
# Key names that prep_ssh_key() has already handled.  Used like a set: the
# stored value is always 1, only membership is checked.
ready_keys = {}
_sshver = None


def sshver():
    """Return the version of the local ``ssh`` client as a float.

    The version is parsed from the stderr output of ``ssh -V`` the first
    time this is called and cached in the module-level ``_sshver``.
    """
    global _sshver
    if _sshver is not None:
        return _sshver
    proc = subprocess.Popen(['ssh', '-V'], stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    banner = proc.communicate()[1]
    # The banner is expected to start with something like b'OpenSSH_8.9p1';
    # keep what sits between the first '_' and the following 'p'.
    product = banner.split()[0]
    release = product.split(b'_')[1]
    _sshver = float(release.split(b'p')[0])
    return _sshver
def normalize_uid():
    """Switch the effective uid to the owner of /etc/confluent.

    Returns the effective uid that was in force on entry, so the caller can
    put it back with ``os.seteuid()``.  Raises ``Exception`` when the
    effective uid still does not match the owner after the switch attempt.
    """
    previous = os.geteuid()
    owner = os.stat('/etc/confluent').st_uid
    if previous != owner:
        os.seteuid(owner)
    if os.geteuid() == owner:
        return previous
    raise Exception('Need to run as root or owner of /etc/confluent')
agent_starting = False


def assure_agent():
    """Make sure an ssh-agent has been started for this process.

    When ``agent_pid`` is still None, runs ``ssh-agent`` and copies every
    ``NAME=value;`` assignment from its output into ``os.environ``,
    recording ``SSH_AGENT_PID`` in ``agent_pid``.  Callers arriving while
    another start is in progress wait for it to finish first.

    Always returns True.
    """
    global agent_starting
    global agent_pid
    while agent_starting:
        eventlet.sleep(0.1)
    if agent_pid is not None:
        return True
    try:
        agent_starting = True
        # presumably util.run returns (stdout, stderr) as bytes -- only the
        # first element is consumed here
        agent_output = util.run(['ssh-agent'])[0]
        for statement in agent_output.split(b'\n'):
            if b';' not in statement:
                continue
            # Only the text before the first ';' can hold an assignment.
            assignment = statement.split(b';', 1)[0]
            if b'=' not in assignment:
                continue
            name, value = assignment.split(b'=', 1)
            if not isinstance(name, str):
                name = name.decode('utf8')
                value = value.decode('utf8')
            if name == 'SSH_AGENT_PID':
                agent_pid = int(value)
            os.environ[name] = value
    finally:
        agent_starting = False
    return True
def get_passphrase():
    """Return the passphrase used to protect confluent's SSH private keys.

    For ssh versions up to and including 7.6 the passphrase is empty.
    Otherwise it is the configuration master key, base64 encoded so it can
    be handed to ssh tools as text; the master key is initialized first if
    it has not been loaded yet.
    """
    if sshver() <= 7.6:
        return ''
    if cfm._masterkey is None:
        cfm.init_masterkey()
    encoded = base64.b64encode(cfm._masterkey)
    return encoded if isinstance(encoded, str) else encoded.decode('utf8')
class AlreadyExists(Exception):
    """Raised when the key being initialized is already present on disk."""
def initialize_ca():
    """Create the SSH certificate authority key pair for this instance.

    Generates an ed25519 key at /etc/confluent/ssh/ca, protected by
    ``get_passphrase()``, and publishes the public half as
    /var/lib/confluent/public/site/ssh/<myname>.ca.  File system work is
    done with the effective uid of the owner of /etc/confluent; the
    caller's effective uid is restored before returning or raising.

    Raises:
        AlreadyExists: /etc/confluent/ssh/ca.pub is already present.  The
            existing public key is still re-published before raising.
    """
    ouid = normalize_uid()
    try:
        myname = collective.get_myname()
        if os.path.exists('/etc/confluent/ssh/ca.pub'):
            # Already there: refresh the published copy, then tell the caller.
            cafilename = '/var/lib/confluent/public/site/ssh/{0}.ca'.format(
                myname)
            shutil.copy('/etc/confluent/ssh/ca.pub', cafilename)
            raise AlreadyExists()
        try:
            os.makedirs('/etc/confluent/ssh', mode=0o700)
        except OSError as e:
            if e.errno != 17:  # 17 is EEXIST: directory already present
                raise
    finally:
        # Restore the caller's euid on every exit path, including a failed
        # copy in the already-exists branch.
        os.seteuid(ouid)
    comment = '{0} SSH CA'.format(myname)
    subprocess.check_call(
        ['ssh-keygen', '-C', comment, '-t', 'ed25519', '-f',
         '/etc/confluent/ssh/ca', '-N', get_passphrase()],
        preexec_fn=normalize_uid)
    ouid = normalize_uid()
    try:
        try:
            os.makedirs('/var/lib/confluent/public/site/ssh/', mode=0o755)
        except OSError as e:
            if e.errno != 17:  # 17 is EEXIST: directory already present
                raise
        cafilename = '/var/lib/confluent/public/site/ssh/{0}.ca'.format(myname)
        shutil.copy('/etc/confluent/ssh/ca.pub', cafilename)
    finally:
        os.seteuid(ouid)
adding_key = False


def prep_ssh_key(keyname):
    """Load the private key file ``keyname`` into the ssh-agent.

    Only one caller works at a time; others wait on the module-level
    ``adding_key`` flag.  If an agent was started earlier, its socket is
    probed first: a socket that can no longer be connected to is removed
    together with its directory, and the agent state (``agent_pid``,
    ``ready_keys``) is reset so a new agent gets started.  Keys already
    recorded in ``ready_keys`` are not added again.

    The passphrase is handed to ``ssh-add`` through a temporary askpass
    script that echoes ``CONFLUENT_SSH_PASSPHRASE`` and deletes itself;
    ``DISPLAY`` and ``SSH_ASKPASS`` are set for the duration of the call
    and restored afterwards.

    Args:
        keyname: path of the private key file to add.

    Raises:
        subprocess.CalledProcessError: ``ssh-add`` exited non-zero.
        OSError: file system or socket operations failed.
    """
    global adding_key
    global agent_pid
    while adding_key:
        eventlet.sleep(0.1)
    adding_key = True
    try:
        if agent_pid:
            authsock = os.environ['SSH_AUTH_SOCK']
            if os.path.exists(authsock):
                # Create the socket outside the try so close() in the
                # finally clause always has an object to work on.
                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                try:
                    sock.connect(authsock)
                except Exception:
                    # Nothing is answering on the socket any more: reap it
                    # and its directory.
                    os.unlink(authsock)
                    os.rmdir(os.path.dirname(authsock))
                finally:
                    sock.close()
            if not os.path.exists(authsock):
                agent_pid = None
                ready_keys.clear()
        if keyname in ready_keys:
            return
        if not assure_agent():
            ready_keys[keyname] = 1
            return
        tmpdir = tempfile.mkdtemp()
        try:
            askpass = os.path.join(tmpdir, 'askpass.sh')
            with open(askpass, 'w') as ap:
                ap.write('#!/bin/sh\necho $CONFLUENT_SSH_PASSPHRASE\nrm {0}\n'.format(askpass))
            os.chmod(askpass, 0o700)
            os.environ['CONFLUENT_SSH_PASSPHRASE'] = get_passphrase()
            olddisplay = os.environ.get('DISPLAY', None)
            oldaskpass = os.environ.get('SSH_ASKPASS', None)
            os.environ['DISPLAY'] = 'NONE'
            os.environ['SSH_ASKPASS'] = askpass
            try:
                with open(os.devnull, 'wb') as devnull:
                    subprocess.check_output(['ssh-add', keyname], stdin=devnull, stderr=subprocess.PIPE)
            finally:
                # Scrub the passphrase and put the caller's values back.
                del os.environ['CONFLUENT_SSH_PASSPHRASE']
                del os.environ['DISPLAY']
                del os.environ['SSH_ASKPASS']
                if olddisplay:
                    os.environ['DISPLAY'] = olddisplay
                if oldaskpass:
                    os.environ['SSH_ASKPASS'] = oldaskpass
            ready_keys[keyname] = 1
        finally:
            shutil.rmtree(tmpdir)
    finally:
        # Release the flag on every exit path; otherwise one failure before
        # the key is added would leave all later callers waiting forever.
        adding_key = False
def sign_host_key(pubkey, nodename, principals=()):
    """Sign a host public key with the confluent SSH CA.

    Args:
        pubkey: public key material; it is written to a file opened in
            binary mode.
        nodename: used as the certificate identity and always included in
            the principal list.
        principals: additional principals for the certificate.

    Returns:
        The text of the certificate file produced by ``ssh-keygen``.
    """
    workdir = tempfile.mkdtemp()
    try:
        prep_ssh_key('/etc/confluent/ssh/ca')
        ready_keys['ca.pub'] = 1
        hostkeyfile = os.path.join(workdir, 'hostkey.pub')
        with open(hostkeyfile, 'wb') as keyout:
            keyout.write(pubkey)
        names = set(principals)
        names.add(nodename)
        principal_list = ','.join(sorted(names))
        # With ssh newer than 7.6, sign with '-Us' and the CA public key
        # file; older versions sign with '-s' and the private key file.
        if sshver() > 7.6:
            signflag = '-Us'
            cakey = '/etc/confluent/ssh/ca.pub'
        else:
            signflag = '-s'
            cakey = '/etc/confluent/ssh/ca'
        subprocess.check_call(
            ['ssh-keygen', signflag, cakey, '-I', nodename,
             '-n', principal_list, '-h', hostkeyfile])
        certfile = hostkeyfile.replace('.pub', '-cert.pub')
        with open(certfile) as cert:
            return cert.read()
    finally:
        shutil.rmtree(workdir)
def initialize_root_key(generate, automation=False):
    """Publish root's (or the automation) SSH public keys for deployment.

    The selected public keys are concatenated into
    /var/lib/confluent/public/site/ssh/<myname>.rootpubkey, or
    <myname>.automationpubkey when ``automation`` is true.

    Args:
        generate: when true, create a key if the relevant one is missing:
            /root/.ssh/id_ed25519 when root has no ``*.pub`` files, or
            /etc/confluent/ssh/automation in automation mode.
        automation: work with the confluent automation key instead of
            root's personal keys.

    Raises:
        AlreadyExists: ``automation`` and ``generate`` were both true and
            the automation key was already present; the public key is
            still published before raising.
    """
    myname = collective.get_myname()
    alreadyexist = False
    authorized = list(glob.glob('/root/.ssh/*.pub'))
    if generate and not authorized and not automation:
        subprocess.check_call(['ssh-keygen', '-t', 'ed25519', '-f', '/root/.ssh/id_ed25519', '-N', ''])
        authorized.extend(glob.glob('/root/.ssh/*.pub'))
    if automation and generate:
        if os.path.exists('/etc/confluent/ssh/automation'):
            alreadyexist = True
        else:
            saved_uid = normalize_uid()
            try:
                os.makedirs('/etc/confluent/ssh', mode=0o700)
            except OSError as err:
                if err.errno != 17:
                    raise
            finally:
                os.seteuid(saved_uid)
            keygen_cmd = [
                'ssh-keygen', '-t', 'ed25519',
                '-f', '/etc/confluent/ssh/automation', '-N', get_passphrase(),
                '-C', 'Confluent Automation by {}'.format(myname),
            ]
            subprocess.check_call(keygen_cmd, preexec_fn=normalize_uid)
        # In this mode only the automation key is published.
        authorized = ['/etc/confluent/ssh/automation.pub']
    saved_uid = normalize_uid()
    try:
        os.makedirs('/var/lib/confluent/public/site/ssh', mode=0o755)
    except OSError as err:
        if err.errno != 17:
            raise
    finally:
        os.seteuid(saved_uid)
    neededuid = os.stat('/etc/confluent').st_uid
    suffix = 'automationpubkey' if automation else 'rootpubkey'
    keyname = '/var/lib/confluent/public/site/ssh/{0}.{1}'.format(
        myname, suffix)
    if authorized:
        # Start from an empty file, then append each public key in turn.
        with open(keyname, 'w') as dest:
            for source in authorized:
                with open(source, 'r') as local_key:
                    dest.write(local_key.read())
    if os.path.exists(keyname):
        os.chmod(keyname, 0o644)
        os.chown(keyname, neededuid, -1)
    if alreadyexist:
        raise AlreadyExists()
def ca_exists():
    """Report whether the CA private key file /etc/confluent/ssh/ca exists."""
    ca_key_path = '/etc/confluent/ssh/ca'
    return os.path.exists(ca_key_path)
if __name__ == '__main__':
    # Manual smoke test: make sure root's key and the CA exist, then sign
    # this machine's own ed25519 host key and print the certificate.
    initialize_root_key(True)
    if not ca_exists():
        initialize_ca()
    # sign_host_key() writes the key to a file opened in binary mode, so it
    # has to be read as bytes; the with block also closes the handle.
    with open('/etc/ssh/ssh_host_ed25519_key.pub', 'rb') as hostkey:
        hostpub = hostkey.read()
    print(repr(sign_host_key(hostpub, collective.get_myname())))