From bdb7f064d6876df6a6c5260102fc04115351f2b1 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 31 May 2024 17:22:26 -0400 Subject: [PATCH] Rework a number of subprecess calls and osdeploy Some subprocess calls were reworked to use asyncio friendly variants. Also, osdeploy initialize was checked, and reworked the ssh and tls handling. osdeploy import was also reworked to functional with async only. --- confluent_server/bin/osdeploy | 85 +++++++++----- confluent_server/confluent/certutil.py | 69 ++++++------ confluent_server/confluent/core.py | 12 +- confluent_server/confluent/discovery/core.py | 4 +- confluent_server/confluent/mountmanager.py | 16 +-- confluent_server/confluent/osimage.py | 104 ++++++++++-------- .../plugins/deployment/identimage.py | 10 +- confluent_server/confluent/sshutil.py | 61 +++++----- confluent_server/confluent/util.py | 22 +++- 9 files changed, 215 insertions(+), 168 deletions(-) diff --git a/confluent_server/bin/osdeploy b/confluent_server/bin/osdeploy index 0c6f340f..abb83d79 100644 --- a/confluent_server/bin/osdeploy +++ b/confluent_server/bin/osdeploy @@ -17,10 +17,9 @@ path = os.path.realpath(os.path.join(path, '..', 'lib', 'python')) if path.startswith('/opt'): sys.path.append(path) import confluent.collective.manager as collective -import eventlet.green.subprocess as subprocess import confluent.selfservice as selfservice import confluent.util as util -import confluent.client as client +import confluent.asynclient as client import confluent.sshutil as sshutil import confluent.certutil as certutil import confluent.netutil as netutil @@ -67,7 +66,7 @@ async def main(args): if cmdset.command == 'list': return oslist() if cmdset.command == 'import': - return osimport(cmdset.imagefile, custname=cmdset.n) + return await osimport(cmdset.imagefile, custname=cmdset.n) if cmdset.command == 'importcheck': return osimport(cmdset.imagefile, checkonly=True) if cmdset.command == 'initialize': @@ -161,7 +160,7 @@ def init_confluent_myname(): os._exit(0) -def local_node_trust_setup(): +async def local_node_trust_setup(): init_confluent_myname() allnodes, domain = selfservice.get_cluster_list() myname = collective.get_myname() @@ -187,7 +186,12 @@ def local_node_trust_setup(): with open(certfile, 'w') as certout: certout.write(cert) if restorecon: - subprocess.check_call(['/usr/sbin/restorecon', certfile]) + rcproc = await asyncio.create_subprocess_exec( + '/usr/sbin/restorecon', certfile) + rc = await rcproc.wait() + if rc != 0: + raise Exception("Failure to restorecon") + #subprocess.check_call(['/usr/sbin/restorecon', certfile]) with open('/etc/ssh/sshd_config', 'r') as sshconf: currconfig = sshconf.read().split('\n') for conline in currconfig: @@ -205,12 +209,17 @@ def local_node_trust_setup(): for node in util.natural_sort(allnodes): equivout.write(node + '\n') if restorecon: - subprocess.check_call( - ['/usr/sbin/restorecon', - '/etc/ssh/shosts.equiv', '/root/.shosts']) + rcproc = await asyncio.create_subprocess_exec( + '/usr/sbin/restorecon', '/etc/ssh/shosts.equiv', '/root/.shosts') + rc = await rcproc.wait() + if rc != 0: + raise Exception('Unable to restorecon') + #subprocess.check_call( + # ['/usr/sbin/restorecon', + # '/etc/ssh/shosts.equiv', '/root/.shosts']) -def install_tftp_content(): +async def install_tftp_content(): tftplocation = None candidates = ('/tftpboot', '/var/lib/tftpboot', '/srv/tftpboot', '/srv/tftp') for cand in candidates: @@ -225,7 +234,11 @@ def install_tftp_content(): emprint('/tftpboot is detected as tftp directory, will not try to automatically enable tftp, as it is presumed to be externally managed') else: try: - subprocess.check_call(['systemctl', 'enable', 'tftp.socket', '--now']) + tfproc = await asyncio.create_subprocess_exec('systemctl', 'enable', 'tftp.socket', '--now') + rc = await tfproc.wait() + if rc != 0: + raise Exception('{0}'.format(rc)) + #subprocess.check_call(['systemctl', 'enable', 'tftp.socket', '--now']) print('TFTP service is enabled and running') except Exception: emprint('Unable to automatically enable and start tftp.socket, tftp server may already be running outside of systemd control') @@ -301,20 +314,28 @@ async def initialize(cmdset): 'passphrase protected ssh key easier.\n') sys.exit(1) init_confluent_myname() - sshutil.initialize_root_key(False) + await sshutil.initialize_root_key(False) if cmdset.t: didsomething = True init_confluent_myname() - certutil.create_certificate() + await certutil.create_certificate() if os.path.exists('/usr/lib/systemd/system/httpd.service'): try: - subprocess.check_call(['systemctl', 'try-restart', 'httpd']) + hrproc = await asyncio.create_subprocess_exec('systemctl', 'try-restart', 'httpd') + rc = await hrproc.wait() + if rc != 0: + raise Exception('Failed restarting HTTP') + #subprocess.check_call(['systemctl', 'try-restart', 'httpd']) print('HTTP server has been restarted if it was running') except Exception: emprint('New HTTPS certificates generated, restart the web server manually') elif os.path.exists('/usr/lib/systemd/system/apache2.service'): try: - subprocess.check_call(['systemctl', 'try-restart', 'apache2']) + hrproc = await asyncio.create_subprocess_exec('systemctl', 'try-restart', 'apache2') + rc = await hrproc.wait() + if rc != 0: + raise Exception('Failed restarting HTTP') + # subprocess.check_call(['systemctl', 'try-restart', 'apache2']) print('HTTP server has been restarted if it was running') except Exception: emprint('New HTTPS certificates generated, restart the web server manually') @@ -324,20 +345,20 @@ async def initialize(cmdset): didsomething = True init_confluent_myname() try: - sshutil.initialize_ca() + await sshutil.initialize_ca() except sshutil.AlreadyExists: emprint('Skipping generation of SSH CA, already present and would likely be more problematic to regenerate than to reuse (if absolutely sure you want to discard old CA, then delete /etc/confluent/ssh/ca* and restart confluent)') if cmdset.a: didsomething = True init_confluent_myname() try: - sshutil.initialize_root_key(True, True) + await sshutil.initialize_root_key(True, True) except sshutil.AlreadyExists: emprint('Skipping generation of new automation key, already present and regeneration usually causes more problems. (If absolutely certain, delete /etc/confluent/ssh/automation* and restart confluent)') if cmdset.p: - install_tftp_content() + await install_tftp_content() if cmdset.l: - local_node_trust_setup() + await local_node_trust_setup() if cmdset.k: cas = set([]) cakeys = set([]) @@ -411,15 +432,20 @@ async def initialize(cmdset): for fname in files: topack.append(os.path.join(currd, fname)) with open(tmpname, 'wb') as initramfs: - packit = subprocess.Popen(['cpio', '-H', 'newc', '-o'], - stdout=initramfs, stdin=subprocess.PIPE) + packit = await asyncio.subprocess.create_subprocess_exec( + 'cpio', '-H', 'newc', '-o', + stdin=asyncio.subprocess.PIPE, stdout=initramfs) + #packit = subprocess.Popen(['cpio', '-H', 'newc', '-o'], + # stdout=initramfs, stdin=subprocess.PIPE) for packfile in topack: if not isinstance(packfile, bytes): packfile = packfile.encode('utf8') packit.stdin.write(packfile) packit.stdin.write(b'\n') + await packit.stdin.drain() packit.stdin.close() - res = packit.wait() + await packit.stdin.wait_closed() + res = await packit.wait() if res: sys.stderr.write('Error occurred while packing site initramfs') sys.exit(1) @@ -439,7 +465,9 @@ async def initialize(cmdset): if totar: tmptarname = tmpname.replace('cpio', 'tgz') tarcmd = ['tar', '-czf', tmptarname] + totar - subprocess.check_call(tarcmd) + tarproc = await asyncio.subprocess.create_subprocess_exec(*tarcmd) + await tarproc.wait() + #subprocess.check_call(tarcmd) os.rename(tmptarname, '/var/lib/confluent/public/site/initramfs.tgz') oum = os.umask(0o22) try: @@ -450,7 +478,9 @@ async def initialize(cmdset): print('Site initramfs content packed successfully') if not os.path.exists('/etc/confluent/srvcert.pem'): - subprocess.check_call(['collective', 'gencert']) + gcproc = await asyncio.subprocess.create_subprocess_exec('collective', 'gencert') + await gcproc.wait() + #subprocess.check_call(['collective', 'gencert']) # TODO: check selinux and segetbool for httpd_can_network_connect # httpd available and enabled? @@ -502,7 +532,7 @@ def oslist(): print("") -def osimport(imagefile, checkonly=False, custname=None): +async def osimport(imagefile, checkonly=False, custname=None): c = client.Command() imagefile = os.path.abspath(imagefile) if c.unixdomain: @@ -519,7 +549,7 @@ def osimport(imagefile, checkonly=False, custname=None): apiargs = {'filename': imagefile} if custname: apiargs['custname'] = custname - for rsp in c.create(apipath, apiargs): + async for rsp in c.create(apipath, apiargs): if 'target' in rsp: importing = True shortname = rsp['name'] @@ -544,7 +574,7 @@ def osimport(imagefile, checkonly=False, custname=None): print(repr(rsp)) try: while importing: - for rsp in c.read('/deployment/importing/{0}'.format(shortname)): + async for rsp in c.read('/deployment/importing/{0}'.format(shortname)): if 'progress' in rsp: sys.stdout.write('{0}: {1:.2f}% \r'.format(rsp['phase'], rsp['progress'])) @@ -564,7 +594,8 @@ def osimport(imagefile, checkonly=False, custname=None): time.sleep(0.5) finally: if shortname: - list(c.delete('/deployment/importing/{0}'.format(shortname))) + async for x in c.delete('/deployment/importing/{0}'.format(shortname)): + pass if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main(sys.argv)) diff --git a/confluent_server/confluent/certutil.py b/confluent_server/confluent/certutil.py index 9a478787..9b1193c4 100644 --- a/confluent_server/confluent/certutil.py +++ b/confluent_server/confluent/certutil.py @@ -4,7 +4,6 @@ import confluent.util as util from os.path import exists import shutil import socket -import eventlet.green.subprocess as subprocess import tempfile def mkdirp(targ): @@ -31,8 +30,8 @@ def normalize_uid(): raise Exception('Need to run as root or owner of /etc/confluent') return curruid -def get_ip_addresses(): - lines, _ = util.run(['ip', 'addr']) +async def get_ip_addresses(): + lines, _ = await util.check_output('ip', 'addr') if not isinstance(lines, str): lines = lines.decode('utf8') for line in lines.split('\n'): @@ -83,11 +82,11 @@ def get_certificate_paths(): return keypath, certpath -def assure_tls_ca(): +async def assure_tls_ca(): keyout, certout = ('/etc/confluent/tls/cakey.pem', '/etc/confluent/tls/cacert.pem') if not os.path.exists(certout): #create_simple_ca(keyout, certout) - create_full_ca(certout) + await create_full_ca(certout) fname = '/var/lib/confluent/public/site/tls/{0}.pem'.format( collective.get_myname()) ouid = normalize_uid() @@ -99,8 +98,8 @@ def assure_tls_ca(): raise try: shutil.copy2('/etc/confluent/tls/cacert.pem', fname) - hv, _ = util.run( - ['openssl', 'x509', '-in', '/etc/confluent/tls/cacert.pem', '-hash', '-noout']) + hv, _ = await util.check_output( + 'openssl', 'x509', '-in', '/etc/confluent/tls/cacert.pem', '-hash', '-noout') if not isinstance(hv, str): hv = hv.decode('utf8') hv = hv.strip() @@ -125,7 +124,7 @@ def substitute_cfg(setting, key, val, newval, cfgfile, line): return True return False -def create_full_ca(certout): +async def create_full_ca(certout): mkdirp('/etc/confluent/tls/ca/private') keyout = '/etc/confluent/tls/ca/private/cakey.pem' csrout = '/etc/confluent/tls/ca/ca.csr' @@ -163,23 +162,23 @@ def create_full_ca(certout): continue cfgfile.write(line.strip() + '\n') cfgfile.write('\n[CACert]\nbasicConstraints = CA:true\n\n[ca_confluent]\n') - subprocess.check_call( - ['openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', - keyout]) - subprocess.check_call( - ['openssl', 'req', '-new', '-key', keyout, '-out', csrout, '-subj', subj]) - subprocess.check_call( - ['openssl', 'ca', '-config', newcfg, '-batch', '-selfsign', + await util.check_call( + 'openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', + keyout) + await util.check_call( + 'openssl', 'req', '-new', '-key', keyout, '-out', csrout, '-subj', subj) + await util.check_call( + 'openssl', 'ca', '-config', newcfg, '-batch', '-selfsign', '-extensions', 'CACert', '-extfile', newcfg, '-notext', '-startdate', '19700101010101Z', '-enddate', '21000101010101Z', '-keyfile', - keyout, '-out', '/etc/confluent/tls/ca/cacert.pem', '-in', csrout] + keyout, '-out', '/etc/confluent/tls/ca/cacert.pem', '-in', csrout ) shutil.copy2('/etc/confluent/tls/ca/cacert.pem', certout) #openssl ca -config openssl.cnf -selfsign -keyfile cakey.pem -startdate 20150214120000Z -enddate 20160214120000Z #20160107071311Z -enddate 20170106071311Z -def create_simple_ca(keyout, certout): +async def create_simple_ca(keyout, certout): try: os.makedirs('/etc/confluent/tls') except OSError as e: @@ -189,39 +188,39 @@ def create_simple_ca(keyout, certout): tmphdl, tmpconfig = tempfile.mkstemp() os.close(tmphdl) shutil.copy2(sslcfg, tmpconfig) - subprocess.check_call( - ['openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', - keyout]) + await util.check_call( + 'openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', + keyout) try: subj = '/CN=Confluent TLS Certificate authority ({0})'.format(socket.gethostname()) if len(subj) > 68: subj = subj[:68] with open(tmpconfig, 'a') as cfgfile: cfgfile.write('\n[CACert]\nbasicConstraints = CA:true\n') - subprocess.check_call([ + await util.check_call( 'openssl', 'req', '-new', '-x509', '-key', keyout, '-days', '27300', '-out', certout, '-subj', subj, '-extensions', 'CACert', '-config', tmpconfig - ]) + ) finally: os.remove(tmpconfig) -def create_certificate(keyout=None, certout=None, csrout=None): +async def create_certificate(keyout=None, certout=None, csrout=None): if not keyout: keyout, certout = get_certificate_paths() if not keyout: raise Exception('Unable to locate TLS certificate path automatically') - assure_tls_ca() + await assure_tls_ca() shortname = socket.gethostname().split('.')[0] longname = shortname # socket.getfqdn() if not csrout: - subprocess.check_call( - ['openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', - keyout]) - san = ['IP:{0}'.format(x) for x in get_ip_addresses()] + await util.check_call( + 'openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', + keyout) + san = ['IP:{0}'.format(x) async for x in get_ip_addresses()] # It is incorrect to put IP addresses as DNS type. However # there exists non-compliant clients that fail with them as IP - san.extend(['DNS:{0}'.format(x) for x in get_ip_addresses()]) + san.extend(['DNS:{0}'.format(x) async for x in get_ip_addresses()]) san.append('DNS:{0}'.format(shortname)) #san.append('DNS:{0}'.format(longname)) san = ','.join(san) @@ -242,11 +241,11 @@ def create_certificate(keyout=None, certout=None, csrout=None): cfgfile.write('\n[SAN]\nsubjectAltName={0}'.format(san)) with open(extconfig, 'a') as cfgfile: cfgfile.write('\nbasicConstraints=CA:false\nsubjectAltName={0}'.format(san)) - subprocess.check_call([ + await util.check_call( 'openssl', 'req', '-new', '-key', keyout, '-out', csrout, '-subj', '/CN={0}'.format(longname), '-extensions', 'SAN', '-config', tmpconfig - ]) + ) else: # when used manually, allow the csr SAN to stand # may add explicit subj/SAN argument, in which case we would skip copy @@ -258,13 +257,13 @@ def create_certificate(keyout=None, certout=None, csrout=None): # simple style CA in effect, make a random serial number and # hope for the best, and accept inability to backdate the cert serialnum = '0x' + ''.join(['{:02x}'.format(x) for x in bytearray(os.urandom(20))]) - subprocess.check_call([ + await util.check_call( 'openssl', 'x509', '-req', '-in', csrout, '-CA', '/etc/confluent/tls/cacert.pem', '-CAkey', '/etc/confluent/tls/cakey.pem', '-set_serial', serialnum, '-out', certout, '-days', '27300', '-extfile', extconfig - ]) + ) else: # we moved to a 'proper' CA, mainly for access to backdating # start of certs for finicky system clocks @@ -279,12 +278,12 @@ def create_certificate(keyout=None, certout=None, csrout=None): os.close(tmphdl) cacfgfile = tmpcafile # with realcalock: # if we put it in server, we must lock it - subprocess.check_call([ + await util.check_call( 'openssl', 'ca', '-config', cacfgfile, '-in', csrout, '-out', certout, '-batch', '-notext', '-startdate', '19700101010101Z', '-enddate', '21000101010101Z', '-extfile', extconfig - ]) + ) finally: os.remove(tmpconfig) if needcsr: diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index f34c2554..161b43c3 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -191,7 +191,7 @@ def handle_storage(configmanager, inputdata, pathcomponents, operation): for rsp in mountmanager.handle_request(configmanager, inputdata, pathcomponents[2:], operation): yield rsp -def handle_deployment(configmanager, inputdata, pathcomponents, +async def handle_deployment(configmanager, inputdata, pathcomponents, operation): if len(pathcomponents) == 1: yield msg.ChildCollection('distributions/') @@ -228,12 +228,12 @@ def handle_deployment(configmanager, inputdata, pathcomponents, yield msg.ChildCollection('info') if operation == 'update': if 'updateboot' in inputdata: - osimage.update_boot(profname) + await osimage.update_boot(profname) yield msg.KeyValueData({'updated': profname}) return elif 'rebase' in inputdata: try: - updated, customized = osimage.rebase_profile(profname) + updated, customized = await osimage.rebase_profile(profname) except osimage.ManifestMissing: raise exc.InvalidArgumentException('Specified profile {0} does not have a manifest.yaml for rebase'.format(profname)) for upd in updated: @@ -260,10 +260,12 @@ def handle_deployment(configmanager, inputdata, pathcomponents, return elif operation == 'create': if inputdata.get('custname', None): - importer = osimage.MediaImporter(inputdata['filename'], + importer = osimage.MediaImporter() + await importer.init(inputdata['filename'], configmanager, inputdata['custname']) else: - importer = osimage.MediaImporter(inputdata['filename'], + importer = osimage.MediaImporter() + await importer.init(inputdata['filename'], configmanager) yield msg.KeyValueData({'target': importer.targpath, 'name': importer.importkey}) diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index 66055a40..0fe31f26 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -90,8 +90,6 @@ import struct #import eventlet.green.socket as socket import socket import socket as nsocket -import subprocess -#import eventlet.green.subprocess as subprocess #webclient = eventlet.import_patched('pyghmi.util.webclient') @@ -1454,7 +1452,7 @@ async def discover_node(cfg, handler, info, nodename, manual): else: bmcaddr = bmcaddr.split('/', 1)[0] await wait_for_connection(bmcaddr) - subprocess.check_call(['/opt/confluent/bin/nodeconfig', nodename] + nodeconfig) + await util.check_call('/opt/confluent/bin/nodeconfig', nodename, nodeconfig) log.log({'info': 'Configured {0} ({1})'.format(nodename, handler.devname)}) diff --git a/confluent_server/confluent/mountmanager.py b/confluent_server/confluent/mountmanager.py index 36f654d2..02ae1003 100644 --- a/confluent_server/confluent/mountmanager.py +++ b/confluent_server/confluent/mountmanager.py @@ -1,26 +1,26 @@ +import asyncio import eventlet import confluent.messages as msg import confluent.exceptions as exc import struct import eventlet.green.socket as socket -import eventlet.green.subprocess as subprocess import os mountsbyuser = {} _browserfsd = None -def assure_browserfs(): +async def assure_browserfs(): global _browserfsd if _browserfsd is None: os.makedirs('/var/run/confluent/browserfs/mount', exist_ok=True) - _browserfsd = subprocess.Popen( - ['/opt/confluent/bin/browserfs', + _browserfsd = await asyncio.subprocess.create_subprocess_exec( + '/opt/confluent/bin/browserfs', '-c', '/var/run/confluent/browserfs/control', '-s', '127.0.0.1:4006', # browserfs supports unix domain websocket, however apache reverse proxy is dicey that way in some versions - '-w', '/var/run/confluent/browserfs/mount']) + '-w', '/var/run/confluent/browserfs/mount') while not os.path.exists('/var/run/confluent/browserfs/control'): - eventlet.sleep(0.5) + await asyncio.sleep(0.5) def handle_request(configmanager, inputdata, pathcomponents, operation): @@ -50,8 +50,8 @@ def handle_request(configmanager, inputdata, pathcomponents, operation): 'authtoken': currmount['authtoken'] }) -def requestmount(subdir, filename): - assure_browserfs() +async def requestmount(subdir, filename): + await assure_browserfs() a = socket.socket(socket.AF_UNIX) a.connect('/var/run/confluent/browserfs/control') subname = subdir.encode() diff --git a/confluent_server/confluent/osimage.py b/confluent_server/confluent/osimage.py index e0c1a8cb..02363408 100644 --- a/confluent_server/confluent/osimage.py +++ b/confluent_server/confluent/osimage.py @@ -1,7 +1,6 @@ #!/usr/bin/python +import asyncio import eventlet -import eventlet.green.select as select -import eventlet.green.subprocess as subprocess from fnmatch import fnmatch import glob import logging @@ -21,6 +20,7 @@ if __name__ == '__main__': import confluent.exceptions as exc import confluent.messages as msg +import confluent.util as util COPY = 1 EXTRACT = 2 @@ -64,7 +64,7 @@ def symlink(src, targ): raise -def update_boot(profilename): +async def update_boot(profilename): if profilename.startswith('/var/lib/confluent/public'): profiledir = profilename else: @@ -78,11 +78,11 @@ def update_boot(profilename): label = profile.get('label', profname) ostype = profile.get('ostype', 'linux') if ostype == 'linux': - update_boot_linux(profiledir, profile, label) + await update_boot_linux(profiledir, profile, label) elif ostype == 'esxi': - update_boot_esxi(profiledir, profile, label) + await update_boot_esxi(profiledir, profile, label) -def update_boot_esxi(profiledir, profile, label): +async def update_boot_esxi(profiledir, profile, label): profname = os.path.basename(profiledir) kernelargs = profile.get('kernelargs', '') oum = os.umask(0o22) @@ -149,9 +149,9 @@ def update_boot_esxi(profiledir, profile, label): 'chain boot/efi/boot/bootx64.efi -c /confluent-public/os/{0}/boot/boot.cfg'.format(pname)) finally: ipxeout.close() - subprocess.check_call( - ['/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir), - '{0}/boot.img'.format(profiledir), profname], preexec_fn=relax_umask) + await util.check_call( + '/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir), + '{0}/boot.img'.format(profiledir), profname, preexec_fn=relax_umask) def find_glob(loc, fileglob): @@ -162,7 +162,7 @@ def find_glob(loc, fileglob): return None -def update_boot_linux(profiledir, profile, label): +async def update_boot_linux(profiledir, profile, label): profname = os.path.basename(profiledir) kernelargs = profile.get('kernelargs', '') grubcfg = "set timeout=5\nmenuentry '" @@ -200,9 +200,9 @@ def update_boot_linux(profiledir, profile, label): ipxeout.write('imgload kernel\nimgexec kernel\n') finally: ipxeout.close() - subprocess.check_call( - ['/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir), - '{0}/boot.img'.format(profiledir), profname], preexec_fn=relax_umask) + await util.check_call( + '/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir), + '{0}/boot.img'.format(profiledir), profname, preexec_fn=relax_umask) def extract_entries(entries, flags=0, callback=None, totalsize=None, extractlist=None): @@ -551,7 +551,7 @@ def check_rhel(isoinfo): return {'name': 'rhel-{0}-{1}'.format(ver, arch), 'method': EXTRACT, 'category': 'el{0}'.format(major)} -def scan_iso(archive): +async def scan_iso(archive): filesizes = {} filecontents = {} dfd = os.dup(archive.fileno()) @@ -561,7 +561,7 @@ def scan_iso(archive): for ent in reader: if str(ent).endswith('TRANS.TBL'): continue - eventlet.sleep(0) + await asyncio.sleep(0) filesizes[str(ent)] = ent.size if str(ent) in READFILES: filecontents[str(ent)] = b'' @@ -572,13 +572,13 @@ def scan_iso(archive): return filesizes, filecontents -def fingerprint(archive): +async def fingerprint(archive): archive.seek(0) header = archive.read(32768) archive.seek(32769) if archive.read(6) == b'CD001\x01': # ISO image - isoinfo = scan_iso(archive) + isoinfo = await scan_iso(archive) name = None for fun in globals(): if fun.startswith('check_'): @@ -599,12 +599,12 @@ def fingerprint(archive): return imginfo, None, None -def import_image(filename, callback, backend=False, mfd=None, custtargpath=None, custdistpath=None, custname=''): +async def import_image(filename, callback, backend=False, mfd=None, custtargpath=None, custdistpath=None, custname=''): if mfd: archive = os.fdopen(int(mfd), 'rb') else: archive = open(filename, 'rb') - identity = fingerprint(archive) + identity = await fingerprint(archive) if not identity: return -1 identity, imginfo, funname = identity @@ -689,17 +689,20 @@ def copy_file(src, dst): makedirs(newdir, 0o755) shutil.copy2(src, dst) -def get_hash(fname): +async def get_hash(fname): currhash = hashlib.sha512() with open(fname, 'rb') as currf: currd = currf.read(2048) + await asyncio.sleep(0) while currd: currhash.update(currd) currd = currf.read(2048) + await asyncio.sleep(0) + return currhash.hexdigest() -def rebase_profile(dirname): +async def rebase_profile(dirname): if dirname.startswith('/var/lib/confluent/public'): profiledir = dirname else: @@ -712,7 +715,7 @@ def rebase_profile(dirname): except IOError: raise ManifestMissing() distdir = manifest['distdir'] - newdisthashes = get_hashes(distdir) + newdisthashes = await get_hashes(distdir) olddisthashes = manifest['disthashes'] customized = [] newmanifest = [] @@ -737,7 +740,7 @@ def rebase_profile(dirname): newmanifest.append(updatecandidate) for nf in newmanifest: nfname = os.path.join(profiledir, nf) - currhash = get_hash(nfname) + currhash = await get_hash(nfname) manifest['disthashes'][nf] = currhash with open('{0}/manifest.yaml'.format(profiledir), 'w') as yout: yout.write('# This manifest enables rebase to know original source of profile data and if any customizations have been done\n') @@ -753,21 +756,20 @@ def rebase_profile(dirname): -def get_hashes(dirname): +async def get_hashes(dirname): hashmap = {} for dname, _, fnames in os.walk(dirname): for fname in fnames: if fname == 'profile.yaml': continue fullname = os.path.join(dname, fname) - currhash = hashlib.sha512() subname = fullname.replace(dirname + '/', '') if os.path.isfile(fullname): - hashmap[subname] = get_hash(fullname) + hashmap[subname] = await get_hash(fullname) return hashmap -def generate_stock_profiles(defprofile, distpath, targpath, osname, +async def generate_stock_profiles(defprofile, distpath, targpath, osname, profilelist, customname): osd, osversion, arch = osname.split('-') bootupdates = [] @@ -782,7 +784,7 @@ def generate_stock_profiles(defprofile, distpath, targpath, osname, continue oumask = os.umask(0o22) shutil.copytree(srcname, dirname) - hmap = get_hashes(dirname) + hmap = await get_hashes(dirname) profdata = None try: os.makedirs('{0}/boot/initramfs'.format(dirname), 0o755) @@ -819,18 +821,18 @@ def generate_stock_profiles(defprofile, distpath, targpath, osname, '/var/lib/confluent/public/site/initramfs.cpio', '{0}/boot/initramfs/site.cpio'.format(dirname)) os.symlink(distpath, '{0}/distribution'.format(dirname)) - subprocess.check_call( - ['sh', '{0}/initprofile.sh'.format(dirname), - targpath, dirname]) - bootupdates.append(eventlet.spawn(update_boot, dirname)) + await util.check_call( + 'sh', '{0}/initprofile.sh'.format(dirname), + targpath, dirname) + bootupdates.append(util.spawn(update_boot(dirname))) profilelist.append(profname) for upd in bootupdates: - upd.wait() + await upd class MediaImporter(object): - def __init__(self, media, cfm=None, customname=None, checkonly=False): + async def init(self, media, cfm=None, customname=None, checkonly=False): self.worker = None if not os.path.exists('/var/lib/confluent/public'): raise Exception('`osdeploy initialize` must be executed before importing any media') @@ -844,7 +846,7 @@ class MediaImporter(object): else: medfile = open(media, 'rb') try: - identity = fingerprint(medfile) + identity = await fingerprint(medfile) finally: if not self.medfile: medfile.close() @@ -886,27 +888,33 @@ class MediaImporter(object): importing[importkey] = self self.filename = os.path.abspath(media) self.error = '' - self.importer = eventlet.spawn(self.importmedia) + self.importer = util.spawn(self.importmedia()) def stop(self): - if self.worker and self.worker.poll() is None: + if self.worker and self.worker.returncode is None: self.worker.kill() @property def progress(self): return {'phase': self.phase, 'progress': self.percent, 'profiles': self.profiles, 'error': self.error} - def importmedia(self): + async def importmedia(self): if self.medfile: os.environ['CONFLUENT_MEDIAFD'] = '{0}'.format(self.medfile.fileno()) with open(os.devnull, 'w') as devnull: - self.worker = subprocess.Popen( - [sys.executable, __file__, self.filename, '-b', self.targpath, self.distpath, self.customname], - stdin=devnull, stdout=subprocess.PIPE, close_fds=False) + self.worker = await asyncio.create_subprocess_exec( + sys.executable, __file__, self.filename, '-b', + self.targpath, self.distpath, self.customname, + stdout=asyncio.subprocess.PIPE, close_fds=False) wkr = self.worker currline = b'' - while wkr.poll() is None: - currline += wkr.stdout.read(1) + while wkr.returncode is None: + try: + await asyncio.wait_for(wkr.wait(), 0.001) + except asyncio.TimeoutError: + pass + nb = await wkr.stdout.read(128) + currline += nb if b'\r' in currline: if b'%' in currline: val = currline.split(b'%')[0].strip() @@ -920,7 +928,7 @@ class MediaImporter(object): self.percent = 100.0 return currline = b'' - a = wkr.stdout.read(1) + a = await wkr.stdout.read(1) while a: currline += a if b'\r' in currline: @@ -935,12 +943,12 @@ class MediaImporter(object): self.phase = 'error' return currline = b'' - a = wkr.stdout.read(1) + a = await wkr.stdout.read(1) if self.oscategory: defprofile = '/opt/confluent/lib/osdeploy/{0}'.format( self.oscategory) try: - generate_stock_profiles(defprofile, self.distpath, self.targpath, + await generate_stock_profiles(defprofile, self.distpath, self.targpath, self.osname, self.profiles, self.customname) except Exception as e: self.phase = 'error' @@ -968,7 +976,7 @@ if __name__ == '__main__': os.umask(0o022) if len(sys.argv) > 2: mfd = os.environ.get('CONFLUENT_MEDIAFD', None) - sys.exit(import_image(sys.argv[1], callback=printit, backend=True, mfd=mfd, custtargpath=sys.argv[3], custdistpath=sys.argv[4], custname=sys.argv[5])) + asyncio.get_event_loop().run_until_complete(import_image(sys.argv[1], callback=printit, backend=True, mfd=mfd, custtargpath=sys.argv[3], custdistpath=sys.argv[4], custname=sys.argv[5])) else: - sys.exit(import_image(sys.argv[1], callback=printit)) + asyncio.get_event_loop().run_until_complete(import_image(sys.argv[1], callback=printit)) diff --git a/confluent_server/confluent/plugins/deployment/identimage.py b/confluent_server/confluent/plugins/deployment/identimage.py index 4959a63e..ffc8f3c6 100644 --- a/confluent_server/confluent/plugins/deployment/identimage.py +++ b/confluent_server/confluent/plugins/deployment/identimage.py @@ -20,7 +20,7 @@ # to use this. import confluent.messages as msg import confluent.netutil as netutil -import eventlet.green.subprocess as subprocess +import confluent.util as util import os import shutil import tempfile @@ -41,7 +41,7 @@ def create_apikey(): return newpass -def create_ident_image(node, configmanager): +async def create_ident_image(node, configmanager): tmpd = tempfile.mkdtemp() ident = { 'nodename': node } apikey = create_apikey() @@ -63,13 +63,13 @@ def create_ident_image(node, configmanager): imgname = '/var/lib/confluent/private/identity_images/{0}.img'.format(node) if os.path.exists(imgname): os.remove(imgname) - subprocess.check_call(['/opt/confluent/bin/dir2img', tmpd, imgname, 'cnflnt_idnt']) + await util.check_call('/opt/confluent/bin/dir2img', tmpd, imgname, 'cnflnt_idnt') shutil.rmtree(tmpd) -def update(nodes, element, configmanager, inputdata): +async def update(nodes, element, configmanager, inputdata): for node in nodes: - create_ident_image(node, configmanager) + await create_ident_image(node, configmanager) yield msg.CreatedResource( 'nodes/{0}/deployment/ident_image'.format(node)) diff --git a/confluent_server/confluent/sshutil.py b/confluent_server/confluent/sshutil.py index cf17f37a..e0d71fea 100644 --- a/confluent_server/confluent/sshutil.py +++ b/confluent_server/confluent/sshutil.py @@ -1,11 +1,10 @@ #!/usr/bin/python +import asyncio import base64 import confluent.config.configmanager as cfm import confluent.collective.manager as collective import confluent.util as util -import eventlet.green.subprocess as subprocess -import eventlet import glob import os import shutil @@ -15,12 +14,10 @@ agent_pid = None ready_keys = {} _sshver = None -def sshver(): +async def sshver(): global _sshver if _sshver is None: - p = subprocess.Popen(['ssh', '-V'], stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - _, output = p.communicate() + _, output =await util.check_output('ssh', '-V') _sshver = float(output.split()[0].split(b'_')[1].split(b'p')[0]) return _sshver @@ -34,11 +31,11 @@ def normalize_uid(): return curruid agent_starting = False -def assure_agent(): +async def assure_agent(): global agent_starting global agent_pid while agent_starting: - eventlet.sleep(0.1) + await asyncio.sleep(0.1) if agent_pid is None: try: agent_starting = True @@ -60,8 +57,8 @@ def assure_agent(): agent_starting = False return True -def get_passphrase(): - if sshver() <= 7.6: +async def get_passphrase(): + if await sshver() <= 7.6: return '' # convert the master key to base64 # for use in ssh passphrase context @@ -75,7 +72,7 @@ def get_passphrase(): class AlreadyExists(Exception): pass -def initialize_ca(): +async def initialize_ca(): ouid = normalize_uid() # if already there, skip, make warning myname = collective.get_myname() @@ -92,9 +89,9 @@ def initialize_ca(): finally: os.seteuid(ouid) comment = '{0} SSH CA'.format(myname) - subprocess.check_call( - ['ssh-keygen', '-C', comment, '-t', 'ed25519', '-f', - '/etc/confluent/ssh/ca', '-N', get_passphrase()], + await util.check_call( + 'ssh-keygen', '-C', comment, '-t', 'ed25519', '-f', + '/etc/confluent/ssh/ca', '-N', await get_passphrase(), preexec_fn=normalize_uid) ouid = normalize_uid() try: @@ -111,15 +108,15 @@ def initialize_ca(): adding_key = False -def prep_ssh_key(keyname): +async def prep_ssh_key(keyname): global adding_key while adding_key: - eventlet.sleep(0.1) + await asyncio.sleep(0.1) adding_key = True if keyname in ready_keys: adding_key = False return - if not assure_agent(): + if not await assure_agent(): ready_keys[keyname] = 1 adding_key = False return @@ -129,14 +126,14 @@ def prep_ssh_key(keyname): with open(askpass, 'w') as ap: ap.write('#!/bin/sh\necho $CONFLUENT_SSH_PASSPHRASE\nrm {0}\n'.format(askpass)) os.chmod(askpass, 0o700) - os.environ['CONFLUENT_SSH_PASSPHRASE'] = get_passphrase() + os.environ['CONFLUENT_SSH_PASSPHRASE'] = await get_passphrase() olddisplay = os.environ.get('DISPLAY', None) oldaskpass = os.environ.get('SSH_ASKPASS', None) os.environ['DISPLAY'] = 'NONE' os.environ['SSH_ASKPASS'] = askpass try: with open(os.devnull, 'wb') as devnull: - subprocess.check_output(['ssh-add', keyname], stdin=devnull, stderr=devnull) + await util.check_call('ssh-add', keyname) finally: del os.environ['CONFLUENT_SSH_PASSPHRASE'] del os.environ['DISPLAY'] @@ -150,10 +147,10 @@ def prep_ssh_key(keyname): adding_key = False shutil.rmtree(tmpdir) -def sign_host_key(pubkey, nodename, principals=()): +async def sign_host_key(pubkey, nodename, principals=()): tmpdir = tempfile.mkdtemp() try: - prep_ssh_key('/etc/confluent/ssh/ca') + await prep_ssh_key('/etc/confluent/ssh/ca') ready_keys['ca.pub'] = 1 pkeyname = os.path.join(tmpdir, 'hostkey.pub') with open(pkeyname, 'wb') as pubfile: @@ -161,25 +158,25 @@ def sign_host_key(pubkey, nodename, principals=()): principals = set(principals) principals.add(nodename) principals = ','.join(sorted(principals)) - flags = '-Us' if sshver() > 7.6 else '-s' + flags = '-Us' if await sshver() > 7.6 else '-s' keyname = '/etc/confluent/ssh/ca.pub' if flags == '-Us' else '/etc/confluent/ssh/ca' - subprocess.check_call( - ['ssh-keygen', flags, keyname, '-I', nodename, - '-n', principals, '-h', pkeyname]) + await util.check_call( + 'ssh-keygen', flags, keyname, '-I', nodename, + '-n', principals, '-h', pkeyname) certname = pkeyname.replace('.pub', '-cert.pub') with open(certname) as cert: return cert.read() finally: shutil.rmtree(tmpdir) -def initialize_root_key(generate, automation=False): +async def initialize_root_key(generate, automation=False): authorized = [] myname = collective.get_myname() alreadyexist = False for currkey in glob.glob('/root/.ssh/*.pub'): authorized.append(currkey) if generate and not authorized and not automation: - subprocess.check_call(['ssh-keygen', '-t', 'ed25519', '-f', '/root/.ssh/id_ed25519', '-N', '']) + await util.check_call('ssh-keygen', '-t', 'ed25519', '-f', '/root/.ssh/id_ed25519', '-N', '') for currkey in glob.glob('/root/.ssh/*.pub'): authorized.append(currkey) if automation and generate: @@ -194,10 +191,10 @@ def initialize_root_key(generate, automation=False): raise finally: os.seteuid(ouid) - subprocess.check_call( - ['ssh-keygen', '-t', 'ed25519', - '-f','/etc/confluent/ssh/automation', '-N', get_passphrase(), - '-C', 'Confluent Automation by {}'.format(myname)], + await util.check_call( + 'ssh-keygen', '-t', 'ed25519', + '-f','/etc/confluent/ssh/automation', '-N', await get_passphrase(), + '-C', 'Confluent Automation by {}'.format(myname), preexec_fn=normalize_uid) authorized = ['/etc/confluent/ssh/automation.pub'] ouid = normalize_uid() @@ -213,6 +210,8 @@ def initialize_root_key(generate, automation=False): suffix = 'automationpubkey' else: suffix = 'rootpubkey' + # if myname suffix is rootpubkey, and file exists, zero it + # append instead of replace for auth in authorized: shutil.copy( auth, diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index 4ba4c7eb..041d7460 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -16,6 +16,7 @@ # limitations under the License. # Various utility functions that do not neatly fit into one category or another +import asyncio import base64 import confluent.exceptions as cexc import confluent.log as log @@ -26,9 +27,9 @@ import re import socket import ssl import struct -import eventlet.green.subprocess as subprocess import asyncio import random +import subprocess def mkdirp(path, mode=0o777): @@ -39,6 +40,12 @@ def mkdirp(path, mode=0o777): raise +async def check_call(*cmd, **kwargs): + subproc = await asyncio.create_subprocess_exec(*cmd, **kwargs) + rc = await subproc.wait() + if rc != 0: + raise subprocess.CalledProcessError(rc, cmd) + async def _sleep_and_run(sleeptime, func, args): await asyncio.sleep(sleeptime) await func(*args) @@ -68,12 +75,15 @@ async def _run(coro, taskid): return ret -def run(cmd): - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - retcode = process.poll() +async def check_output(*cmd): + process = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + stdout, stderr = await process.communicate() + retcode = process.returncode if retcode: - raise subprocess.CalledProcessError(retcode, process.args, output=stdout, stderr=stderr) + raise subprocess.CalledProcessError( + retcode, process.args, + output=stdout, stderr=stderr) return stdout, stderr