diff --git a/confluent_server/bin/osdeploy b/confluent_server/bin/osdeploy index 0c6f340f..abb83d79 100644 --- a/confluent_server/bin/osdeploy +++ b/confluent_server/bin/osdeploy @@ -17,10 +17,9 @@ path = os.path.realpath(os.path.join(path, '..', 'lib', 'python')) if path.startswith('/opt'): sys.path.append(path) import confluent.collective.manager as collective -import eventlet.green.subprocess as subprocess import confluent.selfservice as selfservice import confluent.util as util -import confluent.client as client +import confluent.asynclient as client import confluent.sshutil as sshutil import confluent.certutil as certutil import confluent.netutil as netutil @@ -67,7 +66,7 @@ async def main(args): if cmdset.command == 'list': return oslist() if cmdset.command == 'import': - return osimport(cmdset.imagefile, custname=cmdset.n) + return await osimport(cmdset.imagefile, custname=cmdset.n) if cmdset.command == 'importcheck': return osimport(cmdset.imagefile, checkonly=True) if cmdset.command == 'initialize': @@ -161,7 +160,7 @@ def init_confluent_myname(): os._exit(0) -def local_node_trust_setup(): +async def local_node_trust_setup(): init_confluent_myname() allnodes, domain = selfservice.get_cluster_list() myname = collective.get_myname() @@ -187,7 +186,12 @@ def local_node_trust_setup(): with open(certfile, 'w') as certout: certout.write(cert) if restorecon: - subprocess.check_call(['/usr/sbin/restorecon', certfile]) + rcproc = await asyncio.create_subprocess_exec( + '/usr/sbin/restorecon', certfile) + rc = await rcproc.wait() + if rc != 0: + raise Exception("Failure to restorecon") + #subprocess.check_call(['/usr/sbin/restorecon', certfile]) with open('/etc/ssh/sshd_config', 'r') as sshconf: currconfig = sshconf.read().split('\n') for conline in currconfig: @@ -205,12 +209,17 @@ def local_node_trust_setup(): for node in util.natural_sort(allnodes): equivout.write(node + '\n') if restorecon: - subprocess.check_call( - ['/usr/sbin/restorecon', - '/etc/ssh/shosts.equiv', '/root/.shosts']) + rcproc = await asyncio.create_subprocess_exec( + '/usr/sbin/restorecon', '/etc/ssh/shosts.equiv', '/root/.shosts') + rc = await rcproc.wait() + if rc != 0: + raise Exception('Unable to restorecon') + #subprocess.check_call( + # ['/usr/sbin/restorecon', + # '/etc/ssh/shosts.equiv', '/root/.shosts']) -def install_tftp_content(): +async def install_tftp_content(): tftplocation = None candidates = ('/tftpboot', '/var/lib/tftpboot', '/srv/tftpboot', '/srv/tftp') for cand in candidates: @@ -225,7 +234,11 @@ def install_tftp_content(): emprint('/tftpboot is detected as tftp directory, will not try to automatically enable tftp, as it is presumed to be externally managed') else: try: - subprocess.check_call(['systemctl', 'enable', 'tftp.socket', '--now']) + tfproc = await asyncio.create_subprocess_exec('systemctl', 'enable', 'tftp.socket', '--now') + rc = await tfproc.wait() + if rc != 0: + raise Exception('{0}'.format(rc)) + #subprocess.check_call(['systemctl', 'enable', 'tftp.socket', '--now']) print('TFTP service is enabled and running') except Exception: emprint('Unable to automatically enable and start tftp.socket, tftp server may already be running outside of systemd control') @@ -301,20 +314,28 @@ async def initialize(cmdset): 'passphrase protected ssh key easier.\n') sys.exit(1) init_confluent_myname() - sshutil.initialize_root_key(False) + await sshutil.initialize_root_key(False) if cmdset.t: didsomething = True init_confluent_myname() - certutil.create_certificate() + await certutil.create_certificate() if os.path.exists('/usr/lib/systemd/system/httpd.service'): try: - subprocess.check_call(['systemctl', 'try-restart', 'httpd']) + hrproc = await asyncio.create_subprocess_exec('systemctl', 'try-restart', 'httpd') + rc = await hrproc.wait() + if rc != 0: + raise Exception('Failed restarting HTTP') + #subprocess.check_call(['systemctl', 'try-restart', 'httpd']) print('HTTP server has been restarted if it was running') except Exception: emprint('New HTTPS certificates generated, restart the web server manually') elif os.path.exists('/usr/lib/systemd/system/apache2.service'): try: - subprocess.check_call(['systemctl', 'try-restart', 'apache2']) + hrproc = await asyncio.create_subprocess_exec('systemctl', 'try-restart', 'apache2') + rc = await hrproc.wait() + if rc != 0: + raise Exception('Failed restarting HTTP') + # subprocess.check_call(['systemctl', 'try-restart', 'apache2']) print('HTTP server has been restarted if it was running') except Exception: emprint('New HTTPS certificates generated, restart the web server manually') @@ -324,20 +345,20 @@ async def initialize(cmdset): didsomething = True init_confluent_myname() try: - sshutil.initialize_ca() + await sshutil.initialize_ca() except sshutil.AlreadyExists: emprint('Skipping generation of SSH CA, already present and would likely be more problematic to regenerate than to reuse (if absolutely sure you want to discard old CA, then delete /etc/confluent/ssh/ca* and restart confluent)') if cmdset.a: didsomething = True init_confluent_myname() try: - sshutil.initialize_root_key(True, True) + await sshutil.initialize_root_key(True, True) except sshutil.AlreadyExists: emprint('Skipping generation of new automation key, already present and regeneration usually causes more problems. (If absolutely certain, delete /etc/confluent/ssh/automation* and restart confluent)') if cmdset.p: - install_tftp_content() + await install_tftp_content() if cmdset.l: - local_node_trust_setup() + await local_node_trust_setup() if cmdset.k: cas = set([]) cakeys = set([]) @@ -411,15 +432,20 @@ async def initialize(cmdset): for fname in files: topack.append(os.path.join(currd, fname)) with open(tmpname, 'wb') as initramfs: - packit = subprocess.Popen(['cpio', '-H', 'newc', '-o'], - stdout=initramfs, stdin=subprocess.PIPE) + packit = await asyncio.subprocess.create_subprocess_exec( + 'cpio', '-H', 'newc', '-o', + stdin=asyncio.subprocess.PIPE, stdout=initramfs) + #packit = subprocess.Popen(['cpio', '-H', 'newc', '-o'], + # stdout=initramfs, stdin=subprocess.PIPE) for packfile in topack: if not isinstance(packfile, bytes): packfile = packfile.encode('utf8') packit.stdin.write(packfile) packit.stdin.write(b'\n') + await packit.stdin.drain() packit.stdin.close() - res = packit.wait() + await packit.stdin.wait_closed() + res = await packit.wait() if res: sys.stderr.write('Error occurred while packing site initramfs') sys.exit(1) @@ -439,7 +465,9 @@ async def initialize(cmdset): if totar: tmptarname = tmpname.replace('cpio', 'tgz') tarcmd = ['tar', '-czf', tmptarname] + totar - subprocess.check_call(tarcmd) + tarproc = await asyncio.subprocess.create_subprocess_exec(*tarcmd) + await tarproc.wait() + #subprocess.check_call(tarcmd) os.rename(tmptarname, '/var/lib/confluent/public/site/initramfs.tgz') oum = os.umask(0o22) try: @@ -450,7 +478,9 @@ async def initialize(cmdset): print('Site initramfs content packed successfully') if not os.path.exists('/etc/confluent/srvcert.pem'): - subprocess.check_call(['collective', 'gencert']) + gcproc = await asyncio.subprocess.create_subprocess_exec('collective', 'gencert') + await gcproc.wait() + #subprocess.check_call(['collective', 'gencert']) # TODO: check selinux and segetbool for httpd_can_network_connect # httpd available and enabled? @@ -502,7 +532,7 @@ def oslist(): print("") -def osimport(imagefile, checkonly=False, custname=None): +async def osimport(imagefile, checkonly=False, custname=None): c = client.Command() imagefile = os.path.abspath(imagefile) if c.unixdomain: @@ -519,7 +549,7 @@ def osimport(imagefile, checkonly=False, custname=None): apiargs = {'filename': imagefile} if custname: apiargs['custname'] = custname - for rsp in c.create(apipath, apiargs): + async for rsp in c.create(apipath, apiargs): if 'target' in rsp: importing = True shortname = rsp['name'] @@ -544,7 +574,7 @@ def osimport(imagefile, checkonly=False, custname=None): print(repr(rsp)) try: while importing: - for rsp in c.read('/deployment/importing/{0}'.format(shortname)): + async for rsp in c.read('/deployment/importing/{0}'.format(shortname)): if 'progress' in rsp: sys.stdout.write('{0}: {1:.2f}% \r'.format(rsp['phase'], rsp['progress'])) @@ -564,7 +594,8 @@ def osimport(imagefile, checkonly=False, custname=None): time.sleep(0.5) finally: if shortname: - list(c.delete('/deployment/importing/{0}'.format(shortname))) + async for x in c.delete('/deployment/importing/{0}'.format(shortname)): + pass if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main(sys.argv)) diff --git a/confluent_server/confluent/certutil.py b/confluent_server/confluent/certutil.py index 9a478787..9b1193c4 100644 --- a/confluent_server/confluent/certutil.py +++ b/confluent_server/confluent/certutil.py @@ -4,7 +4,6 @@ import confluent.util as util from os.path import exists import shutil import socket -import eventlet.green.subprocess as subprocess import tempfile def mkdirp(targ): @@ -31,8 +30,8 @@ def normalize_uid(): raise Exception('Need to run as root or owner of /etc/confluent') return curruid -def get_ip_addresses(): - lines, _ = util.run(['ip', 'addr']) +async def get_ip_addresses(): + lines, _ = await util.check_output('ip', 'addr') if not isinstance(lines, str): lines = lines.decode('utf8') for line in lines.split('\n'): @@ -83,11 +82,11 @@ def get_certificate_paths(): return keypath, certpath -def assure_tls_ca(): +async def assure_tls_ca(): keyout, certout = ('/etc/confluent/tls/cakey.pem', '/etc/confluent/tls/cacert.pem') if not os.path.exists(certout): #create_simple_ca(keyout, certout) - create_full_ca(certout) + await create_full_ca(certout) fname = '/var/lib/confluent/public/site/tls/{0}.pem'.format( collective.get_myname()) ouid = normalize_uid() @@ -99,8 +98,8 @@ def assure_tls_ca(): raise try: shutil.copy2('/etc/confluent/tls/cacert.pem', fname) - hv, _ = util.run( - ['openssl', 'x509', '-in', '/etc/confluent/tls/cacert.pem', '-hash', '-noout']) + hv, _ = await util.check_output( + 'openssl', 'x509', '-in', '/etc/confluent/tls/cacert.pem', '-hash', '-noout') if not isinstance(hv, str): hv = hv.decode('utf8') hv = hv.strip() @@ -125,7 +124,7 @@ def substitute_cfg(setting, key, val, newval, cfgfile, line): return True return False -def create_full_ca(certout): +async def create_full_ca(certout): mkdirp('/etc/confluent/tls/ca/private') keyout = '/etc/confluent/tls/ca/private/cakey.pem' csrout = '/etc/confluent/tls/ca/ca.csr' @@ -163,23 +162,23 @@ def create_full_ca(certout): continue cfgfile.write(line.strip() + '\n') cfgfile.write('\n[CACert]\nbasicConstraints = CA:true\n\n[ca_confluent]\n') - subprocess.check_call( - ['openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', - keyout]) - subprocess.check_call( - ['openssl', 'req', '-new', '-key', keyout, '-out', csrout, '-subj', subj]) - subprocess.check_call( - ['openssl', 'ca', '-config', newcfg, '-batch', '-selfsign', + await util.check_call( + 'openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', + keyout) + await util.check_call( + 'openssl', 'req', '-new', '-key', keyout, '-out', csrout, '-subj', subj) + await util.check_call( + 'openssl', 'ca', '-config', newcfg, '-batch', '-selfsign', '-extensions', 'CACert', '-extfile', newcfg, '-notext', '-startdate', '19700101010101Z', '-enddate', '21000101010101Z', '-keyfile', - keyout, '-out', '/etc/confluent/tls/ca/cacert.pem', '-in', csrout] + keyout, '-out', '/etc/confluent/tls/ca/cacert.pem', '-in', csrout ) shutil.copy2('/etc/confluent/tls/ca/cacert.pem', certout) #openssl ca -config openssl.cnf -selfsign -keyfile cakey.pem -startdate 20150214120000Z -enddate 20160214120000Z #20160107071311Z -enddate 20170106071311Z -def create_simple_ca(keyout, certout): +async def create_simple_ca(keyout, certout): try: os.makedirs('/etc/confluent/tls') except OSError as e: @@ -189,39 +188,39 @@ def create_simple_ca(keyout, certout): tmphdl, tmpconfig = tempfile.mkstemp() os.close(tmphdl) shutil.copy2(sslcfg, tmpconfig) - subprocess.check_call( - ['openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', - keyout]) + await util.check_call( + 'openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', + keyout) try: subj = '/CN=Confluent TLS Certificate authority ({0})'.format(socket.gethostname()) if len(subj) > 68: subj = subj[:68] with open(tmpconfig, 'a') as cfgfile: cfgfile.write('\n[CACert]\nbasicConstraints = CA:true\n') - subprocess.check_call([ + await util.check_call( 'openssl', 'req', '-new', '-x509', '-key', keyout, '-days', '27300', '-out', certout, '-subj', subj, '-extensions', 'CACert', '-config', tmpconfig - ]) + ) finally: os.remove(tmpconfig) -def create_certificate(keyout=None, certout=None, csrout=None): +async def create_certificate(keyout=None, certout=None, csrout=None): if not keyout: keyout, certout = get_certificate_paths() if not keyout: raise Exception('Unable to locate TLS certificate path automatically') - assure_tls_ca() + await assure_tls_ca() shortname = socket.gethostname().split('.')[0] longname = shortname # socket.getfqdn() if not csrout: - subprocess.check_call( - ['openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', - keyout]) - san = ['IP:{0}'.format(x) for x in get_ip_addresses()] + await util.check_call( + 'openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out', + keyout) + san = ['IP:{0}'.format(x) async for x in get_ip_addresses()] # It is incorrect to put IP addresses as DNS type. However # there exists non-compliant clients that fail with them as IP - san.extend(['DNS:{0}'.format(x) for x in get_ip_addresses()]) + san.extend(['DNS:{0}'.format(x) async for x in get_ip_addresses()]) san.append('DNS:{0}'.format(shortname)) #san.append('DNS:{0}'.format(longname)) san = ','.join(san) @@ -242,11 +241,11 @@ def create_certificate(keyout=None, certout=None, csrout=None): cfgfile.write('\n[SAN]\nsubjectAltName={0}'.format(san)) with open(extconfig, 'a') as cfgfile: cfgfile.write('\nbasicConstraints=CA:false\nsubjectAltName={0}'.format(san)) - subprocess.check_call([ + await util.check_call( 'openssl', 'req', '-new', '-key', keyout, '-out', csrout, '-subj', '/CN={0}'.format(longname), '-extensions', 'SAN', '-config', tmpconfig - ]) + ) else: # when used manually, allow the csr SAN to stand # may add explicit subj/SAN argument, in which case we would skip copy @@ -258,13 +257,13 @@ def create_certificate(keyout=None, certout=None, csrout=None): # simple style CA in effect, make a random serial number and # hope for the best, and accept inability to backdate the cert serialnum = '0x' + ''.join(['{:02x}'.format(x) for x in bytearray(os.urandom(20))]) - subprocess.check_call([ + await util.check_call( 'openssl', 'x509', '-req', '-in', csrout, '-CA', '/etc/confluent/tls/cacert.pem', '-CAkey', '/etc/confluent/tls/cakey.pem', '-set_serial', serialnum, '-out', certout, '-days', '27300', '-extfile', extconfig - ]) + ) else: # we moved to a 'proper' CA, mainly for access to backdating # start of certs for finicky system clocks @@ -279,12 +278,12 @@ def create_certificate(keyout=None, certout=None, csrout=None): os.close(tmphdl) cacfgfile = tmpcafile # with realcalock: # if we put it in server, we must lock it - subprocess.check_call([ + await util.check_call( 'openssl', 'ca', '-config', cacfgfile, '-in', csrout, '-out', certout, '-batch', '-notext', '-startdate', '19700101010101Z', '-enddate', '21000101010101Z', '-extfile', extconfig - ]) + ) finally: os.remove(tmpconfig) if needcsr: diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index f34c2554..161b43c3 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -191,7 +191,7 @@ def handle_storage(configmanager, inputdata, pathcomponents, operation): for rsp in mountmanager.handle_request(configmanager, inputdata, pathcomponents[2:], operation): yield rsp -def handle_deployment(configmanager, inputdata, pathcomponents, +async def handle_deployment(configmanager, inputdata, pathcomponents, operation): if len(pathcomponents) == 1: yield msg.ChildCollection('distributions/') @@ -228,12 +228,12 @@ def handle_deployment(configmanager, inputdata, pathcomponents, yield msg.ChildCollection('info') if operation == 'update': if 'updateboot' in inputdata: - osimage.update_boot(profname) + await osimage.update_boot(profname) yield msg.KeyValueData({'updated': profname}) return elif 'rebase' in inputdata: try: - updated, customized = osimage.rebase_profile(profname) + updated, customized = await osimage.rebase_profile(profname) except osimage.ManifestMissing: raise exc.InvalidArgumentException('Specified profile {0} does not have a manifest.yaml for rebase'.format(profname)) for upd in updated: @@ -260,10 +260,12 @@ def handle_deployment(configmanager, inputdata, pathcomponents, return elif operation == 'create': if inputdata.get('custname', None): - importer = osimage.MediaImporter(inputdata['filename'], + importer = osimage.MediaImporter() + await importer.init(inputdata['filename'], configmanager, inputdata['custname']) else: - importer = osimage.MediaImporter(inputdata['filename'], + importer = osimage.MediaImporter() + await importer.init(inputdata['filename'], configmanager) yield msg.KeyValueData({'target': importer.targpath, 'name': importer.importkey}) diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index 66055a40..0fe31f26 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -90,8 +90,6 @@ import struct #import eventlet.green.socket as socket import socket import socket as nsocket -import subprocess -#import eventlet.green.subprocess as subprocess #webclient = eventlet.import_patched('pyghmi.util.webclient') @@ -1454,7 +1452,7 @@ async def discover_node(cfg, handler, info, nodename, manual): else: bmcaddr = bmcaddr.split('/', 1)[0] await wait_for_connection(bmcaddr) - subprocess.check_call(['/opt/confluent/bin/nodeconfig', nodename] + nodeconfig) + await util.check_call('/opt/confluent/bin/nodeconfig', nodename, nodeconfig) log.log({'info': 'Configured {0} ({1})'.format(nodename, handler.devname)}) diff --git a/confluent_server/confluent/mountmanager.py b/confluent_server/confluent/mountmanager.py index 36f654d2..02ae1003 100644 --- a/confluent_server/confluent/mountmanager.py +++ b/confluent_server/confluent/mountmanager.py @@ -1,26 +1,26 @@ +import asyncio import eventlet import confluent.messages as msg import confluent.exceptions as exc import struct import eventlet.green.socket as socket -import eventlet.green.subprocess as subprocess import os mountsbyuser = {} _browserfsd = None -def assure_browserfs(): +async def assure_browserfs(): global _browserfsd if _browserfsd is None: os.makedirs('/var/run/confluent/browserfs/mount', exist_ok=True) - _browserfsd = subprocess.Popen( - ['/opt/confluent/bin/browserfs', + _browserfsd = await asyncio.subprocess.create_subprocess_exec( + '/opt/confluent/bin/browserfs', '-c', '/var/run/confluent/browserfs/control', '-s', '127.0.0.1:4006', # browserfs supports unix domain websocket, however apache reverse proxy is dicey that way in some versions - '-w', '/var/run/confluent/browserfs/mount']) + '-w', '/var/run/confluent/browserfs/mount') while not os.path.exists('/var/run/confluent/browserfs/control'): - eventlet.sleep(0.5) + await asyncio.sleep(0.5) def handle_request(configmanager, inputdata, pathcomponents, operation): @@ -50,8 +50,8 @@ def handle_request(configmanager, inputdata, pathcomponents, operation): 'authtoken': currmount['authtoken'] }) -def requestmount(subdir, filename): - assure_browserfs() +async def requestmount(subdir, filename): + await assure_browserfs() a = socket.socket(socket.AF_UNIX) a.connect('/var/run/confluent/browserfs/control') subname = subdir.encode() diff --git a/confluent_server/confluent/osimage.py b/confluent_server/confluent/osimage.py index e0c1a8cb..02363408 100644 --- a/confluent_server/confluent/osimage.py +++ b/confluent_server/confluent/osimage.py @@ -1,7 +1,6 @@ #!/usr/bin/python +import asyncio import eventlet -import eventlet.green.select as select -import eventlet.green.subprocess as subprocess from fnmatch import fnmatch import glob import logging @@ -21,6 +20,7 @@ if __name__ == '__main__': import confluent.exceptions as exc import confluent.messages as msg +import confluent.util as util COPY = 1 EXTRACT = 2 @@ -64,7 +64,7 @@ def symlink(src, targ): raise -def update_boot(profilename): +async def update_boot(profilename): if profilename.startswith('/var/lib/confluent/public'): profiledir = profilename else: @@ -78,11 +78,11 @@ def update_boot(profilename): label = profile.get('label', profname) ostype = profile.get('ostype', 'linux') if ostype == 'linux': - update_boot_linux(profiledir, profile, label) + await update_boot_linux(profiledir, profile, label) elif ostype == 'esxi': - update_boot_esxi(profiledir, profile, label) + await update_boot_esxi(profiledir, profile, label) -def update_boot_esxi(profiledir, profile, label): +async def update_boot_esxi(profiledir, profile, label): profname = os.path.basename(profiledir) kernelargs = profile.get('kernelargs', '') oum = os.umask(0o22) @@ -149,9 +149,9 @@ def update_boot_esxi(profiledir, profile, label): 'chain boot/efi/boot/bootx64.efi -c /confluent-public/os/{0}/boot/boot.cfg'.format(pname)) finally: ipxeout.close() - subprocess.check_call( - ['/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir), - '{0}/boot.img'.format(profiledir), profname], preexec_fn=relax_umask) + await util.check_call( + '/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir), + '{0}/boot.img'.format(profiledir), profname, preexec_fn=relax_umask) def find_glob(loc, fileglob): @@ -162,7 +162,7 @@ def find_glob(loc, fileglob): return None -def update_boot_linux(profiledir, profile, label): +async def update_boot_linux(profiledir, profile, label): profname = os.path.basename(profiledir) kernelargs = profile.get('kernelargs', '') grubcfg = "set timeout=5\nmenuentry '" @@ -200,9 +200,9 @@ def update_boot_linux(profiledir, profile, label): ipxeout.write('imgload kernel\nimgexec kernel\n') finally: ipxeout.close() - subprocess.check_call( - ['/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir), - '{0}/boot.img'.format(profiledir), profname], preexec_fn=relax_umask) + await util.check_call( + '/opt/confluent/bin/dir2img', '{0}/boot'.format(profiledir), + '{0}/boot.img'.format(profiledir), profname, preexec_fn=relax_umask) def extract_entries(entries, flags=0, callback=None, totalsize=None, extractlist=None): @@ -551,7 +551,7 @@ def check_rhel(isoinfo): return {'name': 'rhel-{0}-{1}'.format(ver, arch), 'method': EXTRACT, 'category': 'el{0}'.format(major)} -def scan_iso(archive): +async def scan_iso(archive): filesizes = {} filecontents = {} dfd = os.dup(archive.fileno()) @@ -561,7 +561,7 @@ def scan_iso(archive): for ent in reader: if str(ent).endswith('TRANS.TBL'): continue - eventlet.sleep(0) + await asyncio.sleep(0) filesizes[str(ent)] = ent.size if str(ent) in READFILES: filecontents[str(ent)] = b'' @@ -572,13 +572,13 @@ def scan_iso(archive): return filesizes, filecontents -def fingerprint(archive): +async def fingerprint(archive): archive.seek(0) header = archive.read(32768) archive.seek(32769) if archive.read(6) == b'CD001\x01': # ISO image - isoinfo = scan_iso(archive) + isoinfo = await scan_iso(archive) name = None for fun in globals(): if fun.startswith('check_'): @@ -599,12 +599,12 @@ def fingerprint(archive): return imginfo, None, None -def import_image(filename, callback, backend=False, mfd=None, custtargpath=None, custdistpath=None, custname=''): +async def import_image(filename, callback, backend=False, mfd=None, custtargpath=None, custdistpath=None, custname=''): if mfd: archive = os.fdopen(int(mfd), 'rb') else: archive = open(filename, 'rb') - identity = fingerprint(archive) + identity = await fingerprint(archive) if not identity: return -1 identity, imginfo, funname = identity @@ -689,17 +689,20 @@ def copy_file(src, dst): makedirs(newdir, 0o755) shutil.copy2(src, dst) -def get_hash(fname): +async def get_hash(fname): currhash = hashlib.sha512() with open(fname, 'rb') as currf: currd = currf.read(2048) + await asyncio.sleep(0) while currd: currhash.update(currd) currd = currf.read(2048) + await asyncio.sleep(0) + return currhash.hexdigest() -def rebase_profile(dirname): +async def rebase_profile(dirname): if dirname.startswith('/var/lib/confluent/public'): profiledir = dirname else: @@ -712,7 +715,7 @@ def rebase_profile(dirname): except IOError: raise ManifestMissing() distdir = manifest['distdir'] - newdisthashes = get_hashes(distdir) + newdisthashes = await get_hashes(distdir) olddisthashes = manifest['disthashes'] customized = [] newmanifest = [] @@ -737,7 +740,7 @@ def rebase_profile(dirname): newmanifest.append(updatecandidate) for nf in newmanifest: nfname = os.path.join(profiledir, nf) - currhash = get_hash(nfname) + currhash = await get_hash(nfname) manifest['disthashes'][nf] = currhash with open('{0}/manifest.yaml'.format(profiledir), 'w') as yout: yout.write('# This manifest enables rebase to know original source of profile data and if any customizations have been done\n') @@ -753,21 +756,20 @@ def rebase_profile(dirname): -def get_hashes(dirname): +async def get_hashes(dirname): hashmap = {} for dname, _, fnames in os.walk(dirname): for fname in fnames: if fname == 'profile.yaml': continue fullname = os.path.join(dname, fname) - currhash = hashlib.sha512() subname = fullname.replace(dirname + '/', '') if os.path.isfile(fullname): - hashmap[subname] = get_hash(fullname) + hashmap[subname] = await get_hash(fullname) return hashmap -def generate_stock_profiles(defprofile, distpath, targpath, osname, +async def generate_stock_profiles(defprofile, distpath, targpath, osname, profilelist, customname): osd, osversion, arch = osname.split('-') bootupdates = [] @@ -782,7 +784,7 @@ def generate_stock_profiles(defprofile, distpath, targpath, osname, continue oumask = os.umask(0o22) shutil.copytree(srcname, dirname) - hmap = get_hashes(dirname) + hmap = await get_hashes(dirname) profdata = None try: os.makedirs('{0}/boot/initramfs'.format(dirname), 0o755) @@ -819,18 +821,18 @@ def generate_stock_profiles(defprofile, distpath, targpath, osname, '/var/lib/confluent/public/site/initramfs.cpio', '{0}/boot/initramfs/site.cpio'.format(dirname)) os.symlink(distpath, '{0}/distribution'.format(dirname)) - subprocess.check_call( - ['sh', '{0}/initprofile.sh'.format(dirname), - targpath, dirname]) - bootupdates.append(eventlet.spawn(update_boot, dirname)) + await util.check_call( + 'sh', '{0}/initprofile.sh'.format(dirname), + targpath, dirname) + bootupdates.append(util.spawn(update_boot(dirname))) profilelist.append(profname) for upd in bootupdates: - upd.wait() + await upd class MediaImporter(object): - def __init__(self, media, cfm=None, customname=None, checkonly=False): + async def init(self, media, cfm=None, customname=None, checkonly=False): self.worker = None if not os.path.exists('/var/lib/confluent/public'): raise Exception('`osdeploy initialize` must be executed before importing any media') @@ -844,7 +846,7 @@ class MediaImporter(object): else: medfile = open(media, 'rb') try: - identity = fingerprint(medfile) + identity = await fingerprint(medfile) finally: if not self.medfile: medfile.close() @@ -886,27 +888,33 @@ class MediaImporter(object): importing[importkey] = self self.filename = os.path.abspath(media) self.error = '' - self.importer = eventlet.spawn(self.importmedia) + self.importer = util.spawn(self.importmedia()) def stop(self): - if self.worker and self.worker.poll() is None: + if self.worker and self.worker.returncode is None: self.worker.kill() @property def progress(self): return {'phase': self.phase, 'progress': self.percent, 'profiles': self.profiles, 'error': self.error} - def importmedia(self): + async def importmedia(self): if self.medfile: os.environ['CONFLUENT_MEDIAFD'] = '{0}'.format(self.medfile.fileno()) with open(os.devnull, 'w') as devnull: - self.worker = subprocess.Popen( - [sys.executable, __file__, self.filename, '-b', self.targpath, self.distpath, self.customname], - stdin=devnull, stdout=subprocess.PIPE, close_fds=False) + self.worker = await asyncio.create_subprocess_exec( + sys.executable, __file__, self.filename, '-b', + self.targpath, self.distpath, self.customname, + stdout=asyncio.subprocess.PIPE, close_fds=False) wkr = self.worker currline = b'' - while wkr.poll() is None: - currline += wkr.stdout.read(1) + while wkr.returncode is None: + try: + await asyncio.wait_for(wkr.wait(), 0.001) + except asyncio.TimeoutError: + pass + nb = await wkr.stdout.read(128) + currline += nb if b'\r' in currline: if b'%' in currline: val = currline.split(b'%')[0].strip() @@ -920,7 +928,7 @@ class MediaImporter(object): self.percent = 100.0 return currline = b'' - a = wkr.stdout.read(1) + a = await wkr.stdout.read(1) while a: currline += a if b'\r' in currline: @@ -935,12 +943,12 @@ class MediaImporter(object): self.phase = 'error' return currline = b'' - a = wkr.stdout.read(1) + a = await wkr.stdout.read(1) if self.oscategory: defprofile = '/opt/confluent/lib/osdeploy/{0}'.format( self.oscategory) try: - generate_stock_profiles(defprofile, self.distpath, self.targpath, + await generate_stock_profiles(defprofile, self.distpath, self.targpath, self.osname, self.profiles, self.customname) except Exception as e: self.phase = 'error' @@ -968,7 +976,7 @@ if __name__ == '__main__': os.umask(0o022) if len(sys.argv) > 2: mfd = os.environ.get('CONFLUENT_MEDIAFD', None) - sys.exit(import_image(sys.argv[1], callback=printit, backend=True, mfd=mfd, custtargpath=sys.argv[3], custdistpath=sys.argv[4], custname=sys.argv[5])) + asyncio.get_event_loop().run_until_complete(import_image(sys.argv[1], callback=printit, backend=True, mfd=mfd, custtargpath=sys.argv[3], custdistpath=sys.argv[4], custname=sys.argv[5])) else: - sys.exit(import_image(sys.argv[1], callback=printit)) + asyncio.get_event_loop().run_until_complete(import_image(sys.argv[1], callback=printit)) diff --git a/confluent_server/confluent/plugins/deployment/identimage.py b/confluent_server/confluent/plugins/deployment/identimage.py index 4959a63e..ffc8f3c6 100644 --- a/confluent_server/confluent/plugins/deployment/identimage.py +++ b/confluent_server/confluent/plugins/deployment/identimage.py @@ -20,7 +20,7 @@ # to use this. import confluent.messages as msg import confluent.netutil as netutil -import eventlet.green.subprocess as subprocess +import confluent.util as util import os import shutil import tempfile @@ -41,7 +41,7 @@ def create_apikey(): return newpass -def create_ident_image(node, configmanager): +async def create_ident_image(node, configmanager): tmpd = tempfile.mkdtemp() ident = { 'nodename': node } apikey = create_apikey() @@ -63,13 +63,13 @@ def create_ident_image(node, configmanager): imgname = '/var/lib/confluent/private/identity_images/{0}.img'.format(node) if os.path.exists(imgname): os.remove(imgname) - subprocess.check_call(['/opt/confluent/bin/dir2img', tmpd, imgname, 'cnflnt_idnt']) + await util.check_call('/opt/confluent/bin/dir2img', tmpd, imgname, 'cnflnt_idnt') shutil.rmtree(tmpd) -def update(nodes, element, configmanager, inputdata): +async def update(nodes, element, configmanager, inputdata): for node in nodes: - create_ident_image(node, configmanager) + await create_ident_image(node, configmanager) yield msg.CreatedResource( 'nodes/{0}/deployment/ident_image'.format(node)) diff --git a/confluent_server/confluent/sshutil.py b/confluent_server/confluent/sshutil.py index cf17f37a..e0d71fea 100644 --- a/confluent_server/confluent/sshutil.py +++ b/confluent_server/confluent/sshutil.py @@ -1,11 +1,10 @@ #!/usr/bin/python +import asyncio import base64 import confluent.config.configmanager as cfm import confluent.collective.manager as collective import confluent.util as util -import eventlet.green.subprocess as subprocess -import eventlet import glob import os import shutil @@ -15,12 +14,10 @@ agent_pid = None ready_keys = {} _sshver = None -def sshver(): +async def sshver(): global _sshver if _sshver is None: - p = subprocess.Popen(['ssh', '-V'], stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - _, output = p.communicate() + _, output =await util.check_output('ssh', '-V') _sshver = float(output.split()[0].split(b'_')[1].split(b'p')[0]) return _sshver @@ -34,11 +31,11 @@ def normalize_uid(): return curruid agent_starting = False -def assure_agent(): +async def assure_agent(): global agent_starting global agent_pid while agent_starting: - eventlet.sleep(0.1) + await asyncio.sleep(0.1) if agent_pid is None: try: agent_starting = True @@ -60,8 +57,8 @@ def assure_agent(): agent_starting = False return True -def get_passphrase(): - if sshver() <= 7.6: +async def get_passphrase(): + if await sshver() <= 7.6: return '' # convert the master key to base64 # for use in ssh passphrase context @@ -75,7 +72,7 @@ def get_passphrase(): class AlreadyExists(Exception): pass -def initialize_ca(): +async def initialize_ca(): ouid = normalize_uid() # if already there, skip, make warning myname = collective.get_myname() @@ -92,9 +89,9 @@ def initialize_ca(): finally: os.seteuid(ouid) comment = '{0} SSH CA'.format(myname) - subprocess.check_call( - ['ssh-keygen', '-C', comment, '-t', 'ed25519', '-f', - '/etc/confluent/ssh/ca', '-N', get_passphrase()], + await util.check_call( + 'ssh-keygen', '-C', comment, '-t', 'ed25519', '-f', + '/etc/confluent/ssh/ca', '-N', await get_passphrase(), preexec_fn=normalize_uid) ouid = normalize_uid() try: @@ -111,15 +108,15 @@ def initialize_ca(): adding_key = False -def prep_ssh_key(keyname): +async def prep_ssh_key(keyname): global adding_key while adding_key: - eventlet.sleep(0.1) + await asyncio.sleep(0.1) adding_key = True if keyname in ready_keys: adding_key = False return - if not assure_agent(): + if not await assure_agent(): ready_keys[keyname] = 1 adding_key = False return @@ -129,14 +126,14 @@ def prep_ssh_key(keyname): with open(askpass, 'w') as ap: ap.write('#!/bin/sh\necho $CONFLUENT_SSH_PASSPHRASE\nrm {0}\n'.format(askpass)) os.chmod(askpass, 0o700) - os.environ['CONFLUENT_SSH_PASSPHRASE'] = get_passphrase() + os.environ['CONFLUENT_SSH_PASSPHRASE'] = await get_passphrase() olddisplay = os.environ.get('DISPLAY', None) oldaskpass = os.environ.get('SSH_ASKPASS', None) os.environ['DISPLAY'] = 'NONE' os.environ['SSH_ASKPASS'] = askpass try: with open(os.devnull, 'wb') as devnull: - subprocess.check_output(['ssh-add', keyname], stdin=devnull, stderr=devnull) + await util.check_call('ssh-add', keyname) finally: del os.environ['CONFLUENT_SSH_PASSPHRASE'] del os.environ['DISPLAY'] @@ -150,10 +147,10 @@ def prep_ssh_key(keyname): adding_key = False shutil.rmtree(tmpdir) -def sign_host_key(pubkey, nodename, principals=()): +async def sign_host_key(pubkey, nodename, principals=()): tmpdir = tempfile.mkdtemp() try: - prep_ssh_key('/etc/confluent/ssh/ca') + await prep_ssh_key('/etc/confluent/ssh/ca') ready_keys['ca.pub'] = 1 pkeyname = os.path.join(tmpdir, 'hostkey.pub') with open(pkeyname, 'wb') as pubfile: @@ -161,25 +158,25 @@ def sign_host_key(pubkey, nodename, principals=()): principals = set(principals) principals.add(nodename) principals = ','.join(sorted(principals)) - flags = '-Us' if sshver() > 7.6 else '-s' + flags = '-Us' if await sshver() > 7.6 else '-s' keyname = '/etc/confluent/ssh/ca.pub' if flags == '-Us' else '/etc/confluent/ssh/ca' - subprocess.check_call( - ['ssh-keygen', flags, keyname, '-I', nodename, - '-n', principals, '-h', pkeyname]) + await util.check_call( + 'ssh-keygen', flags, keyname, '-I', nodename, + '-n', principals, '-h', pkeyname) certname = pkeyname.replace('.pub', '-cert.pub') with open(certname) as cert: return cert.read() finally: shutil.rmtree(tmpdir) -def initialize_root_key(generate, automation=False): +async def initialize_root_key(generate, automation=False): authorized = [] myname = collective.get_myname() alreadyexist = False for currkey in glob.glob('/root/.ssh/*.pub'): authorized.append(currkey) if generate and not authorized and not automation: - subprocess.check_call(['ssh-keygen', '-t', 'ed25519', '-f', '/root/.ssh/id_ed25519', '-N', '']) + await util.check_call('ssh-keygen', '-t', 'ed25519', '-f', '/root/.ssh/id_ed25519', '-N', '') for currkey in glob.glob('/root/.ssh/*.pub'): authorized.append(currkey) if automation and generate: @@ -194,10 +191,10 @@ def initialize_root_key(generate, automation=False): raise finally: os.seteuid(ouid) - subprocess.check_call( - ['ssh-keygen', '-t', 'ed25519', - '-f','/etc/confluent/ssh/automation', '-N', get_passphrase(), - '-C', 'Confluent Automation by {}'.format(myname)], + await util.check_call( + 'ssh-keygen', '-t', 'ed25519', + '-f','/etc/confluent/ssh/automation', '-N', await get_passphrase(), + '-C', 'Confluent Automation by {}'.format(myname), preexec_fn=normalize_uid) authorized = ['/etc/confluent/ssh/automation.pub'] ouid = normalize_uid() @@ -213,6 +210,8 @@ def initialize_root_key(generate, automation=False): suffix = 'automationpubkey' else: suffix = 'rootpubkey' + # if myname suffix is rootpubkey, and file exists, zero it + # append instead of replace for auth in authorized: shutil.copy( auth, diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index 4ba4c7eb..041d7460 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -16,6 +16,7 @@ # limitations under the License. # Various utility functions that do not neatly fit into one category or another +import asyncio import base64 import confluent.exceptions as cexc import confluent.log as log @@ -26,9 +27,9 @@ import re import socket import ssl import struct -import eventlet.green.subprocess as subprocess import asyncio import random +import subprocess def mkdirp(path, mode=0o777): @@ -39,6 +40,12 @@ def mkdirp(path, mode=0o777): raise +async def check_call(*cmd, **kwargs): + subproc = await asyncio.create_subprocess_exec(*cmd, **kwargs) + rc = await subproc.wait() + if rc != 0: + raise subprocess.CalledProcessError(rc, cmd) + async def _sleep_and_run(sleeptime, func, args): await asyncio.sleep(sleeptime) await func(*args) @@ -68,12 +75,15 @@ async def _run(coro, taskid): return ret -def run(cmd): - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - retcode = process.poll() +async def check_output(*cmd): + process = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + stdout, stderr = await process.communicate() + retcode = process.returncode if retcode: - raise subprocess.CalledProcessError(retcode, process.args, output=stdout, stderr=stderr) + raise subprocess.CalledProcessError( + retcode, process.args, + output=stdout, stderr=stderr) return stdout, stderr