2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-28 13:13:33 +00:00

Merge branch 'master' into async

This commit is contained in:
Jarrod Johnson
2024-08-14 16:26:55 -04:00
6 changed files with 138 additions and 45 deletions

View File

@@ -405,20 +405,36 @@ class NetworkManager(object):
else:
cname = stgs.get('connection_name', None)
iname = list(cfg['interfaces'])[0]
if not cname:
cname = iname
ctype = self.devtypes.get(iname, None)
if not ctype:
sys.stderr.write("Warning, no device found for interface_name ({0}), skipping setup\n".format(iname))
return
if stgs.get('vlan_id', None):
vlan = stgs['vlan_id']
if ctype == 'infiniband':
vlan = '0x{0}'.format(vlan) if not vlan.startswith('0x') else vlan
cmdargs['infiniband.parent'] = iname
cmdargs['infiniband.p-key'] = vlan
iname = '{0}.{1}'.format(iname, vlan[2:])
elif ctype == 'ethernet':
ctype = 'vlan'
cmdargs['vlan.parent'] = iname
cmdargs['vlan.id'] = vlan
iname = '{0}.{1}'.format(iname, vlan)
else:
sys.stderr.write("Warning, unknown interface_name ({0}) device type ({1}) for VLAN/PKEY, skipping setup\n".format(iname, ctype))
return
cname = iname if not cname else cname
u = self.uuidbyname.get(cname, None)
cargs = []
for arg in cmdargs:
cargs.append(arg)
cargs.append(cmdargs[arg])
if u:
cargs.append('connection.interface-name')
cargs.append(iname)
subprocess.check_call(['nmcli', 'c', 'm', u] + cargs)
subprocess.check_call(['nmcli', 'c', 'm', u, 'connection.interface-name', iname] + cargs)
subprocess.check_call(['nmcli', 'c', 'u', u])
else:
subprocess.check_call(['nmcli', 'c', 'add', 'type', self.devtypes[iname], 'con-name', cname, 'connection.interface-name', iname] + cargs)
subprocess.check_call(['nmcli', 'c', 'add', 'type', ctype, 'con-name', cname, 'connection.interface-name', iname] + cargs)
self.read_connections()
u = self.uuidbyname.get(cname, None)
if u:
@@ -501,6 +517,8 @@ if __name__ == '__main__':
netname_to_interfaces['default']['interfaces'] -= netname_to_interfaces[netn]['interfaces']
if not netname_to_interfaces['default']['interfaces']:
del netname_to_interfaces['default']
# Make sure VLAN/PKEY connections are created last
netname_to_interfaces = dict(sorted(netname_to_interfaces.items(), key=lambda item: 'vlan_id' in item[1]['settings']))
rm_tmp_llas(tmpllas)
if os.path.exists('/usr/sbin/netplan'):
nm = NetplanManager(dc)

View File

@@ -1,7 +1,7 @@
#!/usr/bin/python2
#!/usr/bin/python3
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2017 Lenovo
# Copyright 2017,2024 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ import confluent.config.conf as conf
import confluent.main as main
argparser = optparse.OptionParser(
usage="Usage: %prog [options] [dump|restore] [path]")
usage="Usage: %prog [options] [dump|restore|merge] [path]")
argparser.add_option('-p', '--password',
help='Password to use to protect/unlock a protected dump')
argparser.add_option('-i', '--interactivepassword', help='Prompt for password',
@@ -52,13 +52,13 @@ argparser.add_option('-s', '--skipkeys', action='store_true',
'data is needed. keys do not change and as such '
'they do not require incremental backup')
(options, args) = argparser.parse_args()
if len(args) != 2 or args[0] not in ('dump', 'restore'):
if len(args) != 2 or args[0] not in ('dump', 'restore', 'merge'):
argparser.print_help()
sys.exit(1)
dumpdir = args[1]
if args[0] == 'restore':
if args[0] in ('restore', 'merge'):
pid = main.is_running()
if pid is not None:
print("Confluent is running, must shut down to restore db")
@@ -70,10 +70,22 @@ if args[0] == 'restore':
if options.interactivepassword:
password = getpass.getpass('Enter password to restore backup: ')
try:
cfm.init(True)
cfm.statelessmode = True
dp = cfm.restore_db_from_directory(dumpdir, password)
stateless = args[0] == 'restore'
cfm.init(stateless)
cfm.statelessmode = stateless
skipped = {'nodes': [], 'nodegroups': []}
dp = cfm.restore_db_from_directory(
dumpdir, password,
merge="skip" if args[0] == 'merge' else False, skipped=skipped)
asyncio.get_event_loop().run_until_complete(dp)
if skipped['nodes']:
skippedn = ','.join(skipped['nodes'])
print('The following nodes were skipped during merge: '
'{}'.format(skippedn))
if skipped['nodegroups']:
skippedn = ','.join(skipped['nodegroups'])
print('The following node groups were skipped during merge: '
'{}'.format(skippedn))
cfm.statelessmode = False
cfm.ConfigManager.wait_for_sync(True)
if owner != 0:

View File

@@ -469,9 +469,13 @@ node = {
'net.interface_names': {
'description': 'Interface name or comma delimited list of names to match for this interface. It is generally recommended '
'to leave this blank unless needing to set up interfaces that are not on a common subnet with a confluent server, '
'as confluent servers provide autodetection for matching the correct network definition to an interface.'
'as confluent servers provide autodetection for matching the correct network definition to an interface. '
'This would be the default name per the deployed OS and can be a comma delimited list to denote members of '
'a team'
'a team or a single interface for VLAN/PKEY connections.'
},
'net.vlan_id': {
'description': 'Ethernet VLAN or InfiniBand PKEY to use for this connection. '
'Specify the parent device using net.interface_names.'
},
'net.ipv4_address': {
'description': 'When configuring static, use this address. If '

View File

@@ -1914,7 +1914,7 @@ class ConfigManager(object):
def add_group_attributes(self, attribmap):
self.set_group_attributes(attribmap, autocreate=True)
async def set_group_attributes(self, attribmap, autocreate=False):
async def set_group_attributes(self, attribmap, autocreate=False, merge="replace", keydata=None, skipped=None):
for group in attribmap:
curr = attribmap[group]
for attrib in curr:
@@ -1935,11 +1935,11 @@ class ConfigManager(object):
if cfgstreams:
await exec_on_followers('_rpc_set_group_attributes', self.tenant,
attribmap, autocreate)
self._true_set_group_attributes(attribmap, autocreate)
self._true_set_group_attributes(attribmap, autocreate, merge=merge, keydata=keydata, skipped=skipped)
def _true_set_group_attributes(self, attribmap, autocreate=False):
def _true_set_group_attributes(self, attribmap, autocreate=False, merge="replace", keydata=None, skipped=None):
changeset = {}
for group in attribmap:
for group in list(attribmap):
if group == '':
raise ValueError('"{0}" is not a valid group name'.format(
group))
@@ -1952,6 +1952,11 @@ class ConfigManager(object):
group))
if not autocreate and group not in self._cfgstore['nodegroups']:
raise ValueError("{0} group does not exist".format(group))
if merge == 'skip' and group in self._cfgstore['nodegroups']:
if skipped is not None:
skipped.append(group)
del attribmap[group]
continue
for attr in list(attribmap[group]):
# first do a pass to normalize out any aliased attribute names
if attr in _attraliases:
@@ -2026,6 +2031,9 @@ class ConfigManager(object):
newdict = {'value': attribmap[group][attr]}
else:
newdict = attribmap[group][attr]
if keydata and attr.startswith('secret.') and 'cryptvalue' in newdict:
newdict['value'] = decrypt_value(newdict['cryptvalue'], keydata['cryptkey'], keydata['integritykey'])
del newdict['cryptvalue']
if 'value' in newdict and attr.startswith("secret."):
newdict['cryptvalue'] = crypt_value(newdict['value'])
del newdict['value']
@@ -2362,7 +2370,7 @@ class ConfigManager(object):
async def set_node_attributes(self, attribmap, autocreate=False):
async def set_node_attributes(self, attribmap, autocreate=False, merge="replace", keydata=None, skipped=None):
for node in attribmap:
curr = attribmap[node]
for attrib in curr:
@@ -2383,14 +2391,14 @@ class ConfigManager(object):
if cfgstreams:
await exec_on_followers('_rpc_set_node_attributes',
self.tenant, attribmap, autocreate)
self._true_set_node_attributes(attribmap, autocreate)
self._true_set_node_attributes(attribmap, autocreate, merge, keydata, skipped)
def _true_set_node_attributes(self, attribmap, autocreate):
def _true_set_node_attributes(self, attribmap, autocreate, merge="replace", keydata=None, skipped=None):
newnodes = []
changeset = {}
# first do a sanity check of the input upfront
# this mitigates risk of arguments being partially applied
for node in attribmap:
for node in list(attribmap):
node = confluent.util.stringify(node)
if node == '':
raise ValueError('"{0}" is not a valid node name'.format(node))
@@ -2403,6 +2411,11 @@ class ConfigManager(object):
'"{0}" is not a valid node name'.format(node))
if autocreate is False and node not in self._cfgstore['nodes']:
raise ValueError("node {0} does not exist".format(node))
if merge == "skip" and node in self._cfgstore['nodes']:
del attribmap[node]
if skipped is not None:
skipped.append(node)
continue
if 'groups' not in attribmap[node] and node not in self._cfgstore['nodes']:
attribmap[node]['groups'] = []
for attrname in list(attribmap[node]):
@@ -2473,6 +2486,9 @@ class ConfigManager(object):
# add check here, skip None attributes
if newdict is None:
continue
if keydata and attrname.startswith('secret.') and 'cryptvalue' in newdict:
newdict['value'] = decrypt_value(newdict['cryptvalue'], keydata['cryptkey'], keydata['integritykey'])
del newdict['cryptvalue']
if 'value' in newdict and attrname.startswith("secret."):
newdict['cryptvalue'] = crypt_value(newdict['value'])
del newdict['value']
@@ -2513,19 +2529,21 @@ class ConfigManager(object):
self._bg_sync_to_file()
#TODO: wait for synchronization to suceed/fail??)
async def _load_from_json(self, jsondata, sync=True):
async def _load_from_json(self, jsondata, sync=True, merge=False, keydata=None, skipped=None):
self.inrestore = True
try:
await self._load_from_json_backend(jsondata, sync=True)
await self._load_from_json_backend(jsondata, sync=True, merge=merge, keydata=keydata, skipped=skipped)
finally:
self.inrestore = False
async def _load_from_json_backend(self, jsondata, sync=True):
async def _load_from_json_backend(self, jsondata, sync=True, merge=False, keydata=None, skipped=None):
"""Load fresh configuration data from jsondata
:param jsondata: String of jsondata
:return:
"""
if not skipped:
skipped = {'nodes': None, 'nodegroups': None}
dumpdata = json.loads(jsondata)
tmpconfig = {}
for confarea in _config_areas:
@@ -2573,20 +2591,27 @@ class ConfigManager(object):
pass
# Now we have to iterate through each fixed up element, using the
# set attribute to flesh out inheritence and expressions
_cfgstore['main']['idmap'] = {}
if (not merge) or _cfgstore.get('main', {}).get('idmap', None) is None:
_cfgstore['main']['idmap'] = {}
attribmerge = merge if merge else "replace"
for confarea in _config_areas:
self._cfgstore[confarea] = {}
if not merge or confarea not in self._cfgstore:
self._cfgstore[confarea] = {}
if confarea not in tmpconfig:
continue
if confarea == 'nodes':
await self.set_node_attributes(tmpconfig[confarea], True)
await self.set_node_attributes(tmpconfig[confarea], True, merge=attribmerge, keydata=keydata, skipped=skipped['nodes'])
elif confarea == 'nodegroups':
await self.set_group_attributes(tmpconfig[confarea], True)
await self.set_group_attributes(tmpconfig[confarea], True, merge=attribmerge, keydata=keydata, skipped=skipped['nodegroups'])
elif confarea == 'usergroups':
if merge:
continue
for usergroup in tmpconfig[confarea]:
role = tmpconfig[confarea][usergroup].get('role', 'Administrator')
await self.create_usergroup(usergroup, role=role)
elif confarea == 'users':
if merge:
continue
for user in tmpconfig[confarea]:
ucfg = tmpconfig[confarea][user]
uid = ucfg.get('id', None)
@@ -2886,7 +2911,7 @@ def _restore_keys(jsond, password, newpassword=None, sync=True):
newpassword = keyfile.read()
set_global('master_privacy_key', _format_key(cryptkey,
password=newpassword), sync)
if integritykey:
if integritykey:
set_global('master_integrity_key', _format_key(integritykey,
password=newpassword), sync)
_masterkey = cryptkey
@@ -2921,12 +2946,22 @@ def _dump_keys(password, dojson=True):
return keydata
async def restore_db_from_directory(location, password):
async def restore_db_from_directory(location, password, merge=False, skipped=None):
kdd = None
try:
with open(os.path.join(location, 'keys.json'), 'r') as cfgfile:
keydata = cfgfile.read()
json.loads(keydata)
_restore_keys(keydata, password)
kdd = json.loads(keydata)
if merge:
if 'cryptkey' in kdd:
kdd['cryptkey'] = _parse_key(kdd['cryptkey'], password)
if 'integritykey' in kdd:
kdd['integritykey'] = _parse_key(kdd['integritykey'], password)
else:
kdd['integritykey'] = None # GCM
else:
kdd = None
_restore_keys(keydata, password)
except IOError as e:
if e.errno == 2:
raise Exception("Cannot restore without keys, this may be a "
@@ -2950,6 +2985,26 @@ async def restore_db_from_directory(location, password):
with open(os.path.join(location, 'main.json'), 'r') as cfgfile:
cfgdata = cfgfile.read()
await ConfigManager(tenant=None)._load_from_json(cfgdata)
if not merge:
try:
moreglobals = json.load(open(os.path.join(location, 'globals.json')))
for globvar in moreglobals:
set_global(globvar, moreglobals[globvar])
except IOError as e:
if e.errno != 2:
raise
try:
collective = json.load(open(os.path.join(location, 'collective.json')))
_cfgstore['collective'] = {}
for coll in collective:
await add_collective_member(coll, collective[coll]['address'],
collective[coll]['fingerprint'])
except IOError as e:
if e.errno != 2:
raise
with open(os.path.join(location, 'main.json'), 'r') as cfgfile:
cfgdata = cfgfile.read()
await ConfigManager(tenant=None)._load_from_json(cfgdata, merge=merge, keydata=kdd, skipped=skipped)
ConfigManager.wait_for_sync(True)

View File

@@ -193,6 +193,9 @@ class NetManager(object):
iname = attribs.get('interface_names', None)
if iname:
myattribs['interface_names'] = iname
vlanid = attribs.get('vlan_id', None)
if vlanid:
myattribs['vlan_id'] = vlanid
teammod = attribs.get('team_mode', None)
if teammod:
myattribs['team_mode'] = teammod

View File

@@ -210,17 +210,18 @@ async def initialize_root_key(generate, automation=False):
suffix = 'automationpubkey'
else:
suffix = 'rootpubkey'
# if myname suffix is rootpubkey, and file exists, zero it
# append instead of replace
keyname = '/var/lib/confluent/public/site/ssh/{0}.{1}'.format(
myname, suffix)
if authorized:
with open(keyname, 'w'):
pass
for auth in authorized:
shutil.copy(
auth,
'/var/lib/confluent/public/site/ssh/{0}.{1}'.format(
myname, suffix))
os.chmod('/var/lib/confluent/public/site/ssh/{0}.{1}'.format(
myname, suffix), 0o644)
os.chown('/var/lib/confluent/public/site/ssh/{0}.{1}'.format(
myname, suffix), neededuid, -1)
with open(auth, 'r') as local_key:
with open(keyname, 'a') as dest:
dest.write(local_key.read())
if os.path.exists(keyname):
os.chmod(keyname, 0o644)
os.chown(keyname, neededuid, -1)
if alreadyexist:
raise AlreadyExists()