2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-01-12 02:52:30 +00:00

Rework a number of subprecess calls and osdeploy

Some subprocess calls were reworked to use asyncio friendly
variants.

Also, osdeploy initialize was checked, and reworked the ssh and tls
handling.

osdeploy import was also reworked to functional with async only.
This commit is contained in:
Jarrod Johnson
2024-05-31 17:22:26 -04:00
parent 85c8268ad8
commit bdb7f064d6
9 changed files with 215 additions and 168 deletions

View File

@@ -17,10 +17,9 @@ path = os.path.realpath(os.path.join(path, '..', 'lib', 'python'))
if path.startswith('/opt'):
sys.path.append(path)
import confluent.collective.manager as collective
import eventlet.green.subprocess as subprocess
import confluent.selfservice as selfservice
import confluent.util as util
import confluent.client as client
import confluent.asynclient as client
import confluent.sshutil as sshutil
import confluent.certutil as certutil
import confluent.netutil as netutil
@@ -67,7 +66,7 @@ async def main(args):
if cmdset.command == 'list':
return oslist()
if cmdset.command == 'import':
return osimport(cmdset.imagefile, custname=cmdset.n)
return await osimport(cmdset.imagefile, custname=cmdset.n)
if cmdset.command == 'importcheck':
return osimport(cmdset.imagefile, checkonly=True)
if cmdset.command == 'initialize':
@@ -161,7 +160,7 @@ def init_confluent_myname():
os._exit(0)
def local_node_trust_setup():
async def local_node_trust_setup():
init_confluent_myname()
allnodes, domain = selfservice.get_cluster_list()
myname = collective.get_myname()
@@ -187,7 +186,12 @@ def local_node_trust_setup():
with open(certfile, 'w') as certout:
certout.write(cert)
if restorecon:
subprocess.check_call(['/usr/sbin/restorecon', certfile])
rcproc = await asyncio.create_subprocess_exec(
'/usr/sbin/restorecon', certfile)
rc = await rcproc.wait()
if rc != 0:
raise Exception("Failure to restorecon")
#subprocess.check_call(['/usr/sbin/restorecon', certfile])
with open('/etc/ssh/sshd_config', 'r') as sshconf:
currconfig = sshconf.read().split('\n')
for conline in currconfig:
@@ -205,12 +209,17 @@ def local_node_trust_setup():
for node in util.natural_sort(allnodes):
equivout.write(node + '\n')
if restorecon:
subprocess.check_call(
['/usr/sbin/restorecon',
'/etc/ssh/shosts.equiv', '/root/.shosts'])
rcproc = await asyncio.create_subprocess_exec(
'/usr/sbin/restorecon', '/etc/ssh/shosts.equiv', '/root/.shosts')
rc = await rcproc.wait()
if rc != 0:
raise Exception('Unable to restorecon')
#subprocess.check_call(
# ['/usr/sbin/restorecon',
# '/etc/ssh/shosts.equiv', '/root/.shosts'])
def install_tftp_content():
async def install_tftp_content():
tftplocation = None
candidates = ('/tftpboot', '/var/lib/tftpboot', '/srv/tftpboot', '/srv/tftp')
for cand in candidates:
@@ -225,7 +234,11 @@ def install_tftp_content():
emprint('/tftpboot is detected as tftp directory, will not try to automatically enable tftp, as it is presumed to be externally managed')
else:
try:
subprocess.check_call(['systemctl', 'enable', 'tftp.socket', '--now'])
tfproc = await asyncio.create_subprocess_exec('systemctl', 'enable', 'tftp.socket', '--now')
rc = await tfproc.wait()
if rc != 0:
raise Exception('{0}'.format(rc))
#subprocess.check_call(['systemctl', 'enable', 'tftp.socket', '--now'])
print('TFTP service is enabled and running')
except Exception:
emprint('Unable to automatically enable and start tftp.socket, tftp server may already be running outside of systemd control')
@@ -301,20 +314,28 @@ async def initialize(cmdset):
'passphrase protected ssh key easier.\n')
sys.exit(1)
init_confluent_myname()
sshutil.initialize_root_key(False)
await sshutil.initialize_root_key(False)
if cmdset.t:
didsomething = True
init_confluent_myname()
certutil.create_certificate()
await certutil.create_certificate()
if os.path.exists('/usr/lib/systemd/system/httpd.service'):
try:
subprocess.check_call(['systemctl', 'try-restart', 'httpd'])
hrproc = await asyncio.create_subprocess_exec('systemctl', 'try-restart', 'httpd')
rc = await hrproc.wait()
if rc != 0:
raise Exception('Failed restarting HTTP')
#subprocess.check_call(['systemctl', 'try-restart', 'httpd'])
print('HTTP server has been restarted if it was running')
except Exception:
emprint('New HTTPS certificates generated, restart the web server manually')
elif os.path.exists('/usr/lib/systemd/system/apache2.service'):
try:
subprocess.check_call(['systemctl', 'try-restart', 'apache2'])
hrproc = await asyncio.create_subprocess_exec('systemctl', 'try-restart', 'apache2')
rc = await hrproc.wait()
if rc != 0:
raise Exception('Failed restarting HTTP')
# subprocess.check_call(['systemctl', 'try-restart', 'apache2'])
print('HTTP server has been restarted if it was running')
except Exception:
emprint('New HTTPS certificates generated, restart the web server manually')
@@ -324,20 +345,20 @@ async def initialize(cmdset):
didsomething = True
init_confluent_myname()
try:
sshutil.initialize_ca()
await sshutil.initialize_ca()
except sshutil.AlreadyExists:
emprint('Skipping generation of SSH CA, already present and would likely be more problematic to regenerate than to reuse (if absolutely sure you want to discard old CA, then delete /etc/confluent/ssh/ca* and restart confluent)')
if cmdset.a:
didsomething = True
init_confluent_myname()
try:
sshutil.initialize_root_key(True, True)
await sshutil.initialize_root_key(True, True)
except sshutil.AlreadyExists:
emprint('Skipping generation of new automation key, already present and regeneration usually causes more problems. (If absolutely certain, delete /etc/confluent/ssh/automation* and restart confluent)')
if cmdset.p:
install_tftp_content()
await install_tftp_content()
if cmdset.l:
local_node_trust_setup()
await local_node_trust_setup()
if cmdset.k:
cas = set([])
cakeys = set([])
@@ -411,15 +432,20 @@ async def initialize(cmdset):
for fname in files:
topack.append(os.path.join(currd, fname))
with open(tmpname, 'wb') as initramfs:
packit = subprocess.Popen(['cpio', '-H', 'newc', '-o'],
stdout=initramfs, stdin=subprocess.PIPE)
packit = await asyncio.subprocess.create_subprocess_exec(
'cpio', '-H', 'newc', '-o',
stdin=asyncio.subprocess.PIPE, stdout=initramfs)
#packit = subprocess.Popen(['cpio', '-H', 'newc', '-o'],
# stdout=initramfs, stdin=subprocess.PIPE)
for packfile in topack:
if not isinstance(packfile, bytes):
packfile = packfile.encode('utf8')
packit.stdin.write(packfile)
packit.stdin.write(b'\n')
await packit.stdin.drain()
packit.stdin.close()
res = packit.wait()
await packit.stdin.wait_closed()
res = await packit.wait()
if res:
sys.stderr.write('Error occurred while packing site initramfs')
sys.exit(1)
@@ -439,7 +465,9 @@ async def initialize(cmdset):
if totar:
tmptarname = tmpname.replace('cpio', 'tgz')
tarcmd = ['tar', '-czf', tmptarname] + totar
subprocess.check_call(tarcmd)
tarproc = await asyncio.subprocess.create_subprocess_exec(*tarcmd)
await tarproc.wait()
#subprocess.check_call(tarcmd)
os.rename(tmptarname, '/var/lib/confluent/public/site/initramfs.tgz')
oum = os.umask(0o22)
try:
@@ -450,7 +478,9 @@ async def initialize(cmdset):
print('Site initramfs content packed successfully')
if not os.path.exists('/etc/confluent/srvcert.pem'):
subprocess.check_call(['collective', 'gencert'])
gcproc = await asyncio.subprocess.create_subprocess_exec('collective', 'gencert')
await gcproc.wait()
#subprocess.check_call(['collective', 'gencert'])
# TODO: check selinux and segetbool for httpd_can_network_connect
# httpd available and enabled?
@@ -502,7 +532,7 @@ def oslist():
print("")
def osimport(imagefile, checkonly=False, custname=None):
async def osimport(imagefile, checkonly=False, custname=None):
c = client.Command()
imagefile = os.path.abspath(imagefile)
if c.unixdomain:
@@ -519,7 +549,7 @@ def osimport(imagefile, checkonly=False, custname=None):
apiargs = {'filename': imagefile}
if custname:
apiargs['custname'] = custname
for rsp in c.create(apipath, apiargs):
async for rsp in c.create(apipath, apiargs):
if 'target' in rsp:
importing = True
shortname = rsp['name']
@@ -544,7 +574,7 @@ def osimport(imagefile, checkonly=False, custname=None):
print(repr(rsp))
try:
while importing:
for rsp in c.read('/deployment/importing/{0}'.format(shortname)):
async for rsp in c.read('/deployment/importing/{0}'.format(shortname)):
if 'progress' in rsp:
sys.stdout.write('{0}: {1:.2f}% \r'.format(rsp['phase'],
rsp['progress']))
@@ -564,7 +594,8 @@ def osimport(imagefile, checkonly=False, custname=None):
time.sleep(0.5)
finally:
if shortname:
list(c.delete('/deployment/importing/{0}'.format(shortname)))
async for x in c.delete('/deployment/importing/{0}'.format(shortname)):
pass
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main(sys.argv))

View File

@@ -4,7 +4,6 @@ import confluent.util as util
from os.path import exists
import shutil
import socket
import eventlet.green.subprocess as subprocess
import tempfile
def mkdirp(targ):
@@ -31,8 +30,8 @@ def normalize_uid():
raise Exception('Need to run as root or owner of /etc/confluent')
return curruid
def get_ip_addresses():
lines, _ = util.run(['ip', 'addr'])
async def get_ip_addresses():
lines, _ = await util.check_output('ip', 'addr')
if not isinstance(lines, str):
lines = lines.decode('utf8')
for line in lines.split('\n'):
@@ -83,11 +82,11 @@ def get_certificate_paths():
return keypath, certpath
def assure_tls_ca():
async def assure_tls_ca():
keyout, certout = ('/etc/confluent/tls/cakey.pem', '/etc/confluent/tls/cacert.pem')
if not os.path.exists(certout):
#create_simple_ca(keyout, certout)
create_full_ca(certout)
await create_full_ca(certout)
fname = '/var/lib/confluent/public/site/tls/{0}.pem'.format(
collective.get_myname())
ouid = normalize_uid()
@@ -99,8 +98,8 @@ def assure_tls_ca():
raise
try:
shutil.copy2('/etc/confluent/tls/cacert.pem', fname)
hv, _ = util.run(
['openssl', 'x509', '-in', '/etc/confluent/tls/cacert.pem', '-hash', '-noout'])
hv, _ = await util.check_output(
'openssl', 'x509', '-in', '/etc/confluent/tls/cacert.pem', '-hash', '-noout')
if not isinstance(hv, str):
hv = hv.decode('utf8')
hv = hv.strip()
@@ -125,7 +124,7 @@ def substitute_cfg(setting, key, val, newval, cfgfile, line):
return True
return False
def create_full_ca(certout):
async def create_full_ca(certout):
mkdirp('/etc/confluent/tls/ca/private')
keyout = '/etc/confluent/tls/ca/private/cakey.pem'
csrout = '/etc/confluent/tls/ca/ca.csr'
@@ -163,23 +162,23 @@ def create_full_ca(certout):
continue
cfgfile.write(line.strip() + '\n')
cfgfile.write('\n[CACert]\nbasicConstraints = CA:true\n\n[ca_confluent]\n')
subprocess.check_call(
['openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out',
keyout])
subprocess.check_call(
['openssl', 'req', '-new', '-key', keyout, '-out', csrout, '-subj', subj])
subprocess.check_call(
['openssl', 'ca', '-config', newcfg, '-batch', '-selfsign',
await util.check_call(
'openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out',
keyout)
await util.check_call(
'openssl', 'req', '-new', '-key', keyout, '-out', csrout, '-subj', subj)
await util.check_call(
'openssl', 'ca', '-config', newcfg, '-batch', '-selfsign',
'-extensions', 'CACert', '-extfile', newcfg,
'-notext', '-startdate',
'19700101010101Z', '-enddate', '21000101010101Z', '-keyfile',
keyout, '-out', '/etc/confluent/tls/ca/cacert.pem', '-in', csrout]
keyout, '-out', '/etc/confluent/tls/ca/cacert.pem', '-in', csrout
)
shutil.copy2('/etc/confluent/tls/ca/cacert.pem', certout)
#openssl ca -config openssl.cnf -selfsign -keyfile cakey.pem -startdate 20150214120000Z -enddate 20160214120000Z
#20160107071311Z -enddate 20170106071311Z
def create_simple_ca(keyout, certout):
async def create_simple_ca(keyout, certout):
try:
os.makedirs('/etc/confluent/tls')
except OSError as e:
@@ -189,39 +188,39 @@ def create_simple_ca(keyout, certout):
tmphdl, tmpconfig = tempfile.mkstemp()
os.close(tmphdl)
shutil.copy2(sslcfg, tmpconfig)
subprocess.check_call(
['openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out',
keyout])
await util.check_call(
'openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out',
keyout)
try:
subj = '/CN=Confluent TLS Certificate authority ({0})'.format(socket.gethostname())
if len(subj) > 68:
subj = subj[:68]
with open(tmpconfig, 'a') as cfgfile:
cfgfile.write('\n[CACert]\nbasicConstraints = CA:true\n')
subprocess.check_call([
await util.check_call(
'openssl', 'req', '-new', '-x509', '-key', keyout, '-days',
'27300', '-out', certout, '-subj', subj,
'-extensions', 'CACert', '-config', tmpconfig
])
)
finally:
os.remove(tmpconfig)
def create_certificate(keyout=None, certout=None, csrout=None):
async def create_certificate(keyout=None, certout=None, csrout=None):
if not keyout:
keyout, certout = get_certificate_paths()
if not keyout:
raise Exception('Unable to locate TLS certificate path automatically')
assure_tls_ca()
await assure_tls_ca()
shortname = socket.gethostname().split('.')[0]
longname = shortname # socket.getfqdn()
if not csrout:
subprocess.check_call(
['openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out',
keyout])
san = ['IP:{0}'.format(x) for x in get_ip_addresses()]
await util.check_call(
'openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out',
keyout)
san = ['IP:{0}'.format(x) async for x in get_ip_addresses()]
# It is incorrect to put IP addresses as DNS type. However
# there exists non-compliant clients that fail with them as IP
san.extend(['DNS:{0}'.format(x) for x in get_ip_addresses()])
san.extend(['DNS:{0}'.format(x) async for x in get_ip_addresses()])
san.append('DNS:{0}'.format(shortname))
#san.append('DNS:{0}'.format(longname))
san = ','.join(san)
@@ -242,11 +241,11 @@ def create_certificate(keyout=None, certout=None, csrout=None):
cfgfile.write('\n[SAN]\nsubjectAltName={0}'.format(san))
with open(extconfig, 'a') as cfgfile:
cfgfile.write('\nbasicConstraints=CA:false\nsubjectAltName={0}'.format(san))
subprocess.check_call([
await util.check_call(
'openssl', 'req', '-new', '-key', keyout, '-out', csrout, '-subj',
'/CN={0}'.format(longname),
'-extensions', 'SAN', '-config', tmpconfig
])
)
else:
# when used manually, allow the csr SAN to stand
# may add explicit subj/SAN argument, in which case we would skip copy
@@ -258,13 +257,13 @@ def create_certificate(keyout=None, certout=None, csrout=None):
# simple style CA in effect, make a random serial number and
# hope for the best, and accept inability to backdate the cert
serialnum = '0x' + ''.join(['{:02x}'.format(x) for x in bytearray(os.urandom(20))])
subprocess.check_call([
await util.check_call(
'openssl', 'x509', '-req', '-in', csrout,
'-CA', '/etc/confluent/tls/cacert.pem',
'-CAkey', '/etc/confluent/tls/cakey.pem',
'-set_serial', serialnum, '-out', certout, '-days', '27300',
'-extfile', extconfig
])
)
else:
# we moved to a 'proper' CA, mainly for access to backdating
# start of certs for finicky system clocks
@@ -279,12 +278,12 @@ def create_certificate(keyout=None, certout=None, csrout=None):
os.close(tmphdl)
cacfgfile = tmpcafile
# with realcalock: # if we put it in server, we must lock it
subprocess.check_call([
await util.check_call(
'openssl', 'ca', '-config', cacfgfile,
'-in', csrout, '-out', certout, '-batch', '-notext',
'-startdate', '19700101010101Z', '-enddate', '21000101010101Z',
'-extfile', extconfig
])
)
finally:
os.remove(tmpconfig)
if needcsr:

View File

@@ -191,7 +191,7 @@ def handle_storage(configmanager, inputdata, pathcomponents, operation):
for rsp in mountmanager.handle_request(configmanager, inputdata, pathcomponents[2:], operation):
yield rsp
def handle_deployment(configmanager, inputdata, pathcomponents,
async def handle_deployment(configmanager, inputdata, pathcomponents,
operation):
if len(pathcomponents) == 1:
yield msg.ChildCollection('distributions/')
@@ -228,12 +228,12 @@ def handle_deployment(configmanager, inputdata, pathcomponents,
yield msg.ChildCollection('info')
if operation == 'update':
if 'updateboot' in inputdata:
osimage.update_boot(profname)
await osimage.update_boot(profname)
yield msg.KeyValueData({'updated': profname})
return
elif 'rebase' in inputdata:
try:
updated, customized = osimage.rebase_profile(profname)
updated, customized = await osimage.rebase_profile(profname)
except osimage.ManifestMissing:
raise exc.InvalidArgumentException('Specified profile {0} does not have a manifest.yaml for rebase'.format(profname))
for upd in updated:
@@ -260,10 +260,12 @@ def handle_deployment(configmanager, inputdata, pathcomponents,
return
elif operation == 'create':
if inputdata.get('custname', None):
importer = osimage.MediaImporter(inputdata['filename'],
importer = osimage.MediaImporter()
await importer.init(inputdata['filename'],
configmanager, inputdata['custname'])
else:
importer = osimage.MediaImporter(inputdata['filename'],
importer = osimage.MediaImporter()
await importer.init(inputdata['filename'],
configmanager)
yield msg.KeyValueData({'target': importer.targpath,
'name': importer.importkey})

View File

@@ -90,8 +90,6 @@ import struct
#import eventlet.green.socket as socket
import socket
import socket as nsocket
import subprocess
#import eventlet.green.subprocess as subprocess
#webclient = eventlet.import_patched('pyghmi.util.webclient')
@@ -1454,7 +1452,7 @@ async def discover_node(cfg, handler, info, nodename, manual):
else:
bmcaddr = bmcaddr.split('/', 1)[0]
await wait_for_connection(bmcaddr)
subprocess.check_call(['/opt/confluent/bin/nodeconfig', nodename] + nodeconfig)
await util.check_call('/opt/confluent/bin/nodeconfig', nodename, nodeconfig)
log.log({'info': 'Configured {0} ({1})'.format(nodename,
handler.devname)})

View File

@@ -1,26 +1,26 @@
import asyncio
import eventlet
import confluent.messages as msg
import confluent.exceptions as exc
import struct
import eventlet.green.socket as socket
import eventlet.green.subprocess as subprocess
import os
mountsbyuser = {}
_browserfsd = None
def assure_browserfs():
async def assure_browserfs():
global _browserfsd
if _browserfsd is None:
os.makedirs('/var/run/confluent/browserfs/mount', exist_ok=True)
_browserfsd = subprocess.Popen(
['/opt/confluent/bin/browserfs',
_browserfsd = await asyncio.subprocess.create_subprocess_exec(
'/opt/confluent/bin/browserfs',
'-c', '/var/run/confluent/browserfs/control',
'-s', '127.0.0.1:4006',
# browserfs supports unix domain websocket, however apache reverse proxy is dicey that way in some versions
'-w', '/var/run/confluent/browserfs/mount'])
'-w', '/var/run/confluent/browserfs/mount')
while not os.path.exists('/var/run/confluent/browserfs/control'):
eventlet.sleep(0.5)
await asyncio.sleep(0.5)
def handle_request(configmanager, inputdata, pathcomponents, operation):
@@ -50,8 +50,8 @@ def handle_request(configmanager, inputdata, pathcomponents, operation):
'authtoken': currmount['authtoken']
})
def requestmount(subdir, filename):
assure_browserfs()
async def requestmount(subdir, filename):
await assure_browserfs()
a = socket.socket(socket.AF_UNIX)
a.connect('/var/run/confluent/browserfs/control')
subname = subdir.encode()

View File

@@ -1,7 +1,6 @@
#!/usr/bin/python
import asyncio
import eventlet
import eventlet.green.select as select
import eventlet.green.subprocess as subprocess
from fnmatch import fnmatch
import glob
import logging
@@ -21,6 +20,7 @@ if __name__ == '__main__':
import confluent.exceptions as exc
import confluent.messages as msg
import confluent.util as util
COPY = 1
EXTRACT = 2
@@ -64,7 +64,7 @@ def symlink(src, targ):
raise
def update_boot(profilename):
async def update_boot(profilename):
if profilename.startswith('/var/lib/confluent/public'):
profiledir = profilename
else:
@@ -78,11 +78,11 @@ def update_boot(profilename):
label = profile.get('label', profname)
ostype = profile.get('ostype', 'linux')
if ostype == 'linux':
update_boot_linux(profiledir, profile, label)
await update_boot_linux(profiledir, profile, label)
elif ostype == 'esxi':
update_boot_esxi(profiledir, profile, label)
await update_boot_esxi(profiledir, profile, label)
def update_boot_esxi(profiledir, profile, label):
async def update_boot_esxi(profiledir, profile, label):
profname = os.path.basename(profiledir)
kernelargs = profile.get('kernelargs', '')
oum = os.umask(0o22)
@@ -149,9 +149,9 @@ def update_boot_esxi(profiledir, profile, label):
'chain boot/efi/boot/bootx64.efi -c /confluent-public/os/{0}/boot/boot.cfg'.format(pname))
finally:
ipxeout.close()
subprocess.check_call(
['/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir),
'{0}/boot.img'.format(profiledir), profname], preexec_fn=relax_umask)
await util.check_call(
'/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir),
'{0}/boot.img'.format(profiledir), profname, preexec_fn=relax_umask)
def find_glob(loc, fileglob):
@@ -162,7 +162,7 @@ def find_glob(loc, fileglob):
return None
def update_boot_linux(profiledir, profile, label):
async def update_boot_linux(profiledir, profile, label):
profname = os.path.basename(profiledir)
kernelargs = profile.get('kernelargs', '')
grubcfg = "set timeout=5\nmenuentry '"
@@ -200,9 +200,9 @@ def update_boot_linux(profiledir, profile, label):
ipxeout.write('imgload kernel\nimgexec kernel\n')
finally:
ipxeout.close()
subprocess.check_call(
['/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir),
'{0}/boot.img'.format(profiledir), profname], preexec_fn=relax_umask)
await util.check_call(
'/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir),
'{0}/boot.img'.format(profiledir), profname, preexec_fn=relax_umask)
def extract_entries(entries, flags=0, callback=None, totalsize=None, extractlist=None):
@@ -551,7 +551,7 @@ def check_rhel(isoinfo):
return {'name': 'rhel-{0}-{1}'.format(ver, arch), 'method': EXTRACT, 'category': 'el{0}'.format(major)}
def scan_iso(archive):
async def scan_iso(archive):
filesizes = {}
filecontents = {}
dfd = os.dup(archive.fileno())
@@ -561,7 +561,7 @@ def scan_iso(archive):
for ent in reader:
if str(ent).endswith('TRANS.TBL'):
continue
eventlet.sleep(0)
await asyncio.sleep(0)
filesizes[str(ent)] = ent.size
if str(ent) in READFILES:
filecontents[str(ent)] = b''
@@ -572,13 +572,13 @@ def scan_iso(archive):
return filesizes, filecontents
def fingerprint(archive):
async def fingerprint(archive):
archive.seek(0)
header = archive.read(32768)
archive.seek(32769)
if archive.read(6) == b'CD001\x01':
# ISO image
isoinfo = scan_iso(archive)
isoinfo = await scan_iso(archive)
name = None
for fun in globals():
if fun.startswith('check_'):
@@ -599,12 +599,12 @@ def fingerprint(archive):
return imginfo, None, None
def import_image(filename, callback, backend=False, mfd=None, custtargpath=None, custdistpath=None, custname=''):
async def import_image(filename, callback, backend=False, mfd=None, custtargpath=None, custdistpath=None, custname=''):
if mfd:
archive = os.fdopen(int(mfd), 'rb')
else:
archive = open(filename, 'rb')
identity = fingerprint(archive)
identity = await fingerprint(archive)
if not identity:
return -1
identity, imginfo, funname = identity
@@ -689,17 +689,20 @@ def copy_file(src, dst):
makedirs(newdir, 0o755)
shutil.copy2(src, dst)
def get_hash(fname):
async def get_hash(fname):
currhash = hashlib.sha512()
with open(fname, 'rb') as currf:
currd = currf.read(2048)
await asyncio.sleep(0)
while currd:
currhash.update(currd)
currd = currf.read(2048)
await asyncio.sleep(0)
return currhash.hexdigest()
def rebase_profile(dirname):
async def rebase_profile(dirname):
if dirname.startswith('/var/lib/confluent/public'):
profiledir = dirname
else:
@@ -712,7 +715,7 @@ def rebase_profile(dirname):
except IOError:
raise ManifestMissing()
distdir = manifest['distdir']
newdisthashes = get_hashes(distdir)
newdisthashes = await get_hashes(distdir)
olddisthashes = manifest['disthashes']
customized = []
newmanifest = []
@@ -737,7 +740,7 @@ def rebase_profile(dirname):
newmanifest.append(updatecandidate)
for nf in newmanifest:
nfname = os.path.join(profiledir, nf)
currhash = get_hash(nfname)
currhash = await get_hash(nfname)
manifest['disthashes'][nf] = currhash
with open('{0}/manifest.yaml'.format(profiledir), 'w') as yout:
yout.write('# This manifest enables rebase to know original source of profile data and if any customizations have been done\n')
@@ -753,21 +756,20 @@ def rebase_profile(dirname):
def get_hashes(dirname):
async def get_hashes(dirname):
hashmap = {}
for dname, _, fnames in os.walk(dirname):
for fname in fnames:
if fname == 'profile.yaml':
continue
fullname = os.path.join(dname, fname)
currhash = hashlib.sha512()
subname = fullname.replace(dirname + '/', '')
if os.path.isfile(fullname):
hashmap[subname] = get_hash(fullname)
hashmap[subname] = await get_hash(fullname)
return hashmap
def generate_stock_profiles(defprofile, distpath, targpath, osname,
async def generate_stock_profiles(defprofile, distpath, targpath, osname,
profilelist, customname):
osd, osversion, arch = osname.split('-')
bootupdates = []
@@ -782,7 +784,7 @@ def generate_stock_profiles(defprofile, distpath, targpath, osname,
continue
oumask = os.umask(0o22)
shutil.copytree(srcname, dirname)
hmap = get_hashes(dirname)
hmap = await get_hashes(dirname)
profdata = None
try:
os.makedirs('{0}/boot/initramfs'.format(dirname), 0o755)
@@ -819,18 +821,18 @@ def generate_stock_profiles(defprofile, distpath, targpath, osname,
'/var/lib/confluent/public/site/initramfs.cpio',
'{0}/boot/initramfs/site.cpio'.format(dirname))
os.symlink(distpath, '{0}/distribution'.format(dirname))
subprocess.check_call(
['sh', '{0}/initprofile.sh'.format(dirname),
targpath, dirname])
bootupdates.append(eventlet.spawn(update_boot, dirname))
await util.check_call(
'sh', '{0}/initprofile.sh'.format(dirname),
targpath, dirname)
bootupdates.append(util.spawn(update_boot(dirname)))
profilelist.append(profname)
for upd in bootupdates:
upd.wait()
await upd
class MediaImporter(object):
def __init__(self, media, cfm=None, customname=None, checkonly=False):
async def init(self, media, cfm=None, customname=None, checkonly=False):
self.worker = None
if not os.path.exists('/var/lib/confluent/public'):
raise Exception('`osdeploy initialize` must be executed before importing any media')
@@ -844,7 +846,7 @@ class MediaImporter(object):
else:
medfile = open(media, 'rb')
try:
identity = fingerprint(medfile)
identity = await fingerprint(medfile)
finally:
if not self.medfile:
medfile.close()
@@ -886,27 +888,33 @@ class MediaImporter(object):
importing[importkey] = self
self.filename = os.path.abspath(media)
self.error = ''
self.importer = eventlet.spawn(self.importmedia)
self.importer = util.spawn(self.importmedia())
def stop(self):
if self.worker and self.worker.poll() is None:
if self.worker and self.worker.returncode is None:
self.worker.kill()
@property
def progress(self):
return {'phase': self.phase, 'progress': self.percent, 'profiles': self.profiles, 'error': self.error}
def importmedia(self):
async def importmedia(self):
if self.medfile:
os.environ['CONFLUENT_MEDIAFD'] = '{0}'.format(self.medfile.fileno())
with open(os.devnull, 'w') as devnull:
self.worker = subprocess.Popen(
[sys.executable, __file__, self.filename, '-b', self.targpath, self.distpath, self.customname],
stdin=devnull, stdout=subprocess.PIPE, close_fds=False)
self.worker = await asyncio.create_subprocess_exec(
sys.executable, __file__, self.filename, '-b',
self.targpath, self.distpath, self.customname,
stdout=asyncio.subprocess.PIPE, close_fds=False)
wkr = self.worker
currline = b''
while wkr.poll() is None:
currline += wkr.stdout.read(1)
while wkr.returncode is None:
try:
await asyncio.wait_for(wkr.wait(), 0.001)
except asyncio.TimeoutError:
pass
nb = await wkr.stdout.read(128)
currline += nb
if b'\r' in currline:
if b'%' in currline:
val = currline.split(b'%')[0].strip()
@@ -920,7 +928,7 @@ class MediaImporter(object):
self.percent = 100.0
return
currline = b''
a = wkr.stdout.read(1)
a = await wkr.stdout.read(1)
while a:
currline += a
if b'\r' in currline:
@@ -935,12 +943,12 @@ class MediaImporter(object):
self.phase = 'error'
return
currline = b''
a = wkr.stdout.read(1)
a = await wkr.stdout.read(1)
if self.oscategory:
defprofile = '/opt/confluent/lib/osdeploy/{0}'.format(
self.oscategory)
try:
generate_stock_profiles(defprofile, self.distpath, self.targpath,
await generate_stock_profiles(defprofile, self.distpath, self.targpath,
self.osname, self.profiles, self.customname)
except Exception as e:
self.phase = 'error'
@@ -968,7 +976,7 @@ if __name__ == '__main__':
os.umask(0o022)
if len(sys.argv) > 2:
mfd = os.environ.get('CONFLUENT_MEDIAFD', None)
sys.exit(import_image(sys.argv[1], callback=printit, backend=True, mfd=mfd, custtargpath=sys.argv[3], custdistpath=sys.argv[4], custname=sys.argv[5]))
asyncio.get_event_loop().run_until_complete(import_image(sys.argv[1], callback=printit, backend=True, mfd=mfd, custtargpath=sys.argv[3], custdistpath=sys.argv[4], custname=sys.argv[5]))
else:
sys.exit(import_image(sys.argv[1], callback=printit))
asyncio.get_event_loop().run_until_complete(import_image(sys.argv[1], callback=printit))

View File

@@ -20,7 +20,7 @@
# to use this.
import confluent.messages as msg
import confluent.netutil as netutil
import eventlet.green.subprocess as subprocess
import confluent.util as util
import os
import shutil
import tempfile
@@ -41,7 +41,7 @@ def create_apikey():
return newpass
def create_ident_image(node, configmanager):
async def create_ident_image(node, configmanager):
tmpd = tempfile.mkdtemp()
ident = { 'nodename': node }
apikey = create_apikey()
@@ -63,13 +63,13 @@ def create_ident_image(node, configmanager):
imgname = '/var/lib/confluent/private/identity_images/{0}.img'.format(node)
if os.path.exists(imgname):
os.remove(imgname)
subprocess.check_call(['/opt/confluent/bin/dir2img', tmpd, imgname, 'cnflnt_idnt'])
await util.check_call('/opt/confluent/bin/dir2img', tmpd, imgname, 'cnflnt_idnt')
shutil.rmtree(tmpd)
def update(nodes, element, configmanager, inputdata):
async def update(nodes, element, configmanager, inputdata):
for node in nodes:
create_ident_image(node, configmanager)
await create_ident_image(node, configmanager)
yield msg.CreatedResource(
'nodes/{0}/deployment/ident_image'.format(node))

View File

@@ -1,11 +1,10 @@
#!/usr/bin/python
import asyncio
import base64
import confluent.config.configmanager as cfm
import confluent.collective.manager as collective
import confluent.util as util
import eventlet.green.subprocess as subprocess
import eventlet
import glob
import os
import shutil
@@ -15,12 +14,10 @@ agent_pid = None
ready_keys = {}
_sshver = None
def sshver():
async def sshver():
global _sshver
if _sshver is None:
p = subprocess.Popen(['ssh', '-V'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
_, output = p.communicate()
_, output =await util.check_output('ssh', '-V')
_sshver = float(output.split()[0].split(b'_')[1].split(b'p')[0])
return _sshver
@@ -34,11 +31,11 @@ def normalize_uid():
return curruid
agent_starting = False
def assure_agent():
async def assure_agent():
global agent_starting
global agent_pid
while agent_starting:
eventlet.sleep(0.1)
await asyncio.sleep(0.1)
if agent_pid is None:
try:
agent_starting = True
@@ -60,8 +57,8 @@ def assure_agent():
agent_starting = False
return True
def get_passphrase():
if sshver() <= 7.6:
async def get_passphrase():
if await sshver() <= 7.6:
return ''
# convert the master key to base64
# for use in ssh passphrase context
@@ -75,7 +72,7 @@ def get_passphrase():
class AlreadyExists(Exception):
pass
def initialize_ca():
async def initialize_ca():
ouid = normalize_uid()
# if already there, skip, make warning
myname = collective.get_myname()
@@ -92,9 +89,9 @@ def initialize_ca():
finally:
os.seteuid(ouid)
comment = '{0} SSH CA'.format(myname)
subprocess.check_call(
['ssh-keygen', '-C', comment, '-t', 'ed25519', '-f',
'/etc/confluent/ssh/ca', '-N', get_passphrase()],
await util.check_call(
'ssh-keygen', '-C', comment, '-t', 'ed25519', '-f',
'/etc/confluent/ssh/ca', '-N', await get_passphrase(),
preexec_fn=normalize_uid)
ouid = normalize_uid()
try:
@@ -111,15 +108,15 @@ def initialize_ca():
adding_key = False
def prep_ssh_key(keyname):
async def prep_ssh_key(keyname):
global adding_key
while adding_key:
eventlet.sleep(0.1)
await asyncio.sleep(0.1)
adding_key = True
if keyname in ready_keys:
adding_key = False
return
if not assure_agent():
if not await assure_agent():
ready_keys[keyname] = 1
adding_key = False
return
@@ -129,14 +126,14 @@ def prep_ssh_key(keyname):
with open(askpass, 'w') as ap:
ap.write('#!/bin/sh\necho $CONFLUENT_SSH_PASSPHRASE\nrm {0}\n'.format(askpass))
os.chmod(askpass, 0o700)
os.environ['CONFLUENT_SSH_PASSPHRASE'] = get_passphrase()
os.environ['CONFLUENT_SSH_PASSPHRASE'] = await get_passphrase()
olddisplay = os.environ.get('DISPLAY', None)
oldaskpass = os.environ.get('SSH_ASKPASS', None)
os.environ['DISPLAY'] = 'NONE'
os.environ['SSH_ASKPASS'] = askpass
try:
with open(os.devnull, 'wb') as devnull:
subprocess.check_output(['ssh-add', keyname], stdin=devnull, stderr=devnull)
await util.check_call('ssh-add', keyname)
finally:
del os.environ['CONFLUENT_SSH_PASSPHRASE']
del os.environ['DISPLAY']
@@ -150,10 +147,10 @@ def prep_ssh_key(keyname):
adding_key = False
shutil.rmtree(tmpdir)
def sign_host_key(pubkey, nodename, principals=()):
async def sign_host_key(pubkey, nodename, principals=()):
tmpdir = tempfile.mkdtemp()
try:
prep_ssh_key('/etc/confluent/ssh/ca')
await prep_ssh_key('/etc/confluent/ssh/ca')
ready_keys['ca.pub'] = 1
pkeyname = os.path.join(tmpdir, 'hostkey.pub')
with open(pkeyname, 'wb') as pubfile:
@@ -161,25 +158,25 @@ def sign_host_key(pubkey, nodename, principals=()):
principals = set(principals)
principals.add(nodename)
principals = ','.join(sorted(principals))
flags = '-Us' if sshver() > 7.6 else '-s'
flags = '-Us' if await sshver() > 7.6 else '-s'
keyname = '/etc/confluent/ssh/ca.pub' if flags == '-Us' else '/etc/confluent/ssh/ca'
subprocess.check_call(
['ssh-keygen', flags, keyname, '-I', nodename,
'-n', principals, '-h', pkeyname])
await util.check_call(
'ssh-keygen', flags, keyname, '-I', nodename,
'-n', principals, '-h', pkeyname)
certname = pkeyname.replace('.pub', '-cert.pub')
with open(certname) as cert:
return cert.read()
finally:
shutil.rmtree(tmpdir)
def initialize_root_key(generate, automation=False):
async def initialize_root_key(generate, automation=False):
authorized = []
myname = collective.get_myname()
alreadyexist = False
for currkey in glob.glob('/root/.ssh/*.pub'):
authorized.append(currkey)
if generate and not authorized and not automation:
subprocess.check_call(['ssh-keygen', '-t', 'ed25519', '-f', '/root/.ssh/id_ed25519', '-N', ''])
await util.check_call('ssh-keygen', '-t', 'ed25519', '-f', '/root/.ssh/id_ed25519', '-N', '')
for currkey in glob.glob('/root/.ssh/*.pub'):
authorized.append(currkey)
if automation and generate:
@@ -194,10 +191,10 @@ def initialize_root_key(generate, automation=False):
raise
finally:
os.seteuid(ouid)
subprocess.check_call(
['ssh-keygen', '-t', 'ed25519',
'-f','/etc/confluent/ssh/automation', '-N', get_passphrase(),
'-C', 'Confluent Automation by {}'.format(myname)],
await util.check_call(
'ssh-keygen', '-t', 'ed25519',
'-f','/etc/confluent/ssh/automation', '-N', await get_passphrase(),
'-C', 'Confluent Automation by {}'.format(myname),
preexec_fn=normalize_uid)
authorized = ['/etc/confluent/ssh/automation.pub']
ouid = normalize_uid()
@@ -213,6 +210,8 @@ def initialize_root_key(generate, automation=False):
suffix = 'automationpubkey'
else:
suffix = 'rootpubkey'
# if myname suffix is rootpubkey, and file exists, zero it
# append instead of replace
for auth in authorized:
shutil.copy(
auth,

View File

@@ -16,6 +16,7 @@
# limitations under the License.
# Various utility functions that do not neatly fit into one category or another
import asyncio
import base64
import confluent.exceptions as cexc
import confluent.log as log
@@ -26,9 +27,9 @@ import re
import socket
import ssl
import struct
import eventlet.green.subprocess as subprocess
import asyncio
import random
import subprocess
def mkdirp(path, mode=0o777):
@@ -39,6 +40,12 @@ def mkdirp(path, mode=0o777):
raise
async def check_call(*cmd, **kwargs):
subproc = await asyncio.create_subprocess_exec(*cmd, **kwargs)
rc = await subproc.wait()
if rc != 0:
raise subprocess.CalledProcessError(rc, cmd)
async def _sleep_and_run(sleeptime, func, args):
await asyncio.sleep(sleeptime)
await func(*args)
@@ -68,12 +75,15 @@ async def _run(coro, taskid):
return ret
def run(cmd):
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
retcode = process.poll()
async def check_output(*cmd):
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await process.communicate()
retcode = process.returncode
if retcode:
raise subprocess.CalledProcessError(retcode, process.args, output=stdout, stderr=stderr)
raise subprocess.CalledProcessError(
retcode, process.args,
output=stdout, stderr=stderr)
return stdout, stderr