2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-12 02:49:18 +00:00

Merge master into asyncio

This commit is contained in:
Jarrod Johnson
2024-03-22 15:52:04 -04:00
14 changed files with 107 additions and 31 deletions

View File

@@ -161,3 +161,4 @@ async def main():
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

View File

@@ -689,6 +689,9 @@ async def updateattrib(session, updateargs, nodetype, noderange, options, dictas
for attrib in updateargs[1:]:
keydata[attrib] = None
async for res in session.update(targpath, keydata):
for node in res.get('databynode', {}):
for warnmsg in res['databynode'][node].get('_warnings', []):
sys.stderr.write('Warning: ' + warnmsg + '\n')
if 'error' in res:
if 'errorcode' in res:
exitcode = res['errorcode']

View File

@@ -326,8 +326,9 @@ def install_to_disk(imgpath):
biggestsize = fs['initsize']
if fs['device'].startswith('/dev/mapper'):
oldvgname = fs['device'].rsplit('/', 1)[-1]
# if node has - then /dev/mapper will double up the hypen
if '_' in oldvgname and '-' in oldvgname.split('_')[-1]:
oldvgname = oldvgname.rsplit('-', 1)[0]
oldvgname = oldvgname.rsplit('-', 1)[0].replace('--', '-')
osname = oldvgname.split('_')[0]
nodename = socket.gethostname().split('.')[0]
vgname = '{}_{}'.format(osname, nodename)

View File

@@ -4,4 +4,5 @@ confluent_mgr=$(grep ^deploy_server $deploycfg|awk '{print $2}')
confluent_profile=$(grep ^profile: $deploycfg|awk '{print $2}')
export deploycfg confluent_mgr confluent_profile
curl -f https://$confluent_mgr/confluent-public/os/$confluent_profile/scripts/post.sh > /tmp/post.sh
. /tmp/post.sh
bash /tmp/post.sh
true

View File

@@ -2,7 +2,10 @@
echo "Confluent first boot is running"
HOME=$(getent passwd $(whoami)|cut -d: -f 6)
export HOME
seems a potentially relevant thing to put i... by Jarrod Johnson
(
exec >> /target/var/log/confluent/confluent-firstboot.log
exec 2>> /target/var/log/confluent/confluent-firstboot.log
chmod 600 /target/var/log/confluent/confluent-firstboot.log
cp -a /etc/confluent/ssh/* /etc/ssh/
systemctl restart sshd
rootpw=$(grep ^rootpassword: /etc/confluent/confluent.deploycfg |awk '{print $2}')
@@ -18,7 +21,10 @@ done
hostnamectl set-hostname $(grep ^NODENAME: /etc/confluent/confluent.info | awk '{print $2}')
touch /etc/cloud/cloud-init.disabled
source /etc/confluent/functions
confluent_profile=$(grep ^profile: /etc/confluent/confluent.deploycfg|awk '{print $2}')
export confluent_mgr confluent_profile
run_remote_parts firstboot.d
run_remote_config firstboot.d
curl --capath /etc/confluent/tls -f -H "CONFLUENT_NODENAME: $nodename" -H "CONFLUENT_APIKEY: $confluent_apikey" -X POST -d "status: complete" https://$confluent_mgr/confluent-api/self/updatestatus
) &
tail --pid $! -n 0 -F /target/var/log/confluent/confluent-post.log > /dev/console

View File

@@ -8,7 +8,6 @@ chmod go-rwx /etc/confluent/*
for i in /custom-installation/ssh/*.ca; do
echo '@cert-authority *' $(cat $i) >> /target/etc/ssh/ssh_known_hosts
done
cp -a /etc/ssh/ssh_host* /target/etc/confluent/ssh/
cp -a /etc/ssh/sshd_config.d/confluent.conf /target/etc/confluent/ssh/sshd_config.d/
sshconf=/target/etc/ssh/ssh_config
@@ -19,10 +18,15 @@ echo 'Host *' >> $sshconf
echo ' HostbasedAuthentication yes' >> $sshconf
echo ' EnableSSHKeysign yes' >> $sshconf
echo ' HostbasedKeyTypes *ed25519*' >> $sshconf
cp /etc/confluent/functions /target/etc/confluent/functions
source /etc/confluent/functions
mkdir -p /target/var/log/confluent
cp /var/log/confluent/* /target/var/log/confluent/
(
exec >> /target/var/log/confluent/confluent-post.log
exec 2>> /target/var/log/confluent/confluent-post.log
chmod 600 /target/var/log/confluent/confluent-post.log
curl -f https://$confluent_mgr/confluent-public/os/$confluent_profile/scripts/firstboot.sh > /target/etc/confluent/firstboot.sh
curl -f https://$confluent_mgr/confluent-public/os/$confluent_profile/scripts/functions > /target/etc/confluent/functions
source /target/etc/confluent/functions
chmod +x /target/etc/confluent/firstboot.sh
cp /tmp/allnodes /target/root/.shosts
cp /tmp/allnodes /target/etc/ssh/shosts.equiv
@@ -84,6 +88,8 @@ chroot /target bash -c "source /etc/confluent/functions; run_remote_parts post.d
source /target/etc/confluent/functions
run_remote_config post
python3 /opt/confluent/bin/apiclient /confluent-api/self/updatestatus -d 'status: staged'
umount /target/sys /target/dev /target/proc
) &
tail --pid $! -n 0 -F /target/var/log/confluent/confluent-post.log > /dev/console

View File

@@ -1,5 +1,16 @@
#!/bin/bash
deploycfg=/custom-installation/confluent/confluent.deploycfg
mkdir -p /var/log/confluent
mkdir -p /opt/confluent/bin
mkdir -p /etc/confluent
cp /custom-installation/confluent/confluent.info /custom-installation/confluent/confluent.apikey /etc/confluent/
cat /custom-installation/tls/*.pem >> /etc/confluent/ca.pem
cp /custom-installation/confluent/bin/apiclient /opt/confluent/bin
cp $deploycfg /etc/confluent/
(
exec >> /var/log/confluent/confluent-pre.log
exec 2>> /var/log/confluent/confluent-pre.log
chmod 600 /var/log/confluent/confluent-pre.log
cryptboot=$(grep encryptboot: $deploycfg|sed -e 's/^encryptboot: //')
if [ "$cryptboot" != "" ] && [ "$cryptboot" != "none" ] && [ "$cryptboot" != "null" ]; then
@@ -23,7 +34,17 @@ echo HostbasedAuthentication yes >> /etc/ssh/sshd_config.d/confluent.conf
echo HostbasedUsesNameFromPacketOnly yes >> /etc/ssh/sshd_config.d/confluent.conf
echo IgnoreRhosts no >> /etc/ssh/sshd_config.d/confluent.conf
systemctl restart sshd
mkdir -p /etc/confluent
export nodename confluent_profile confluent_mgr
curl -f https://$confluent_mgr/confluent-public/os/$confluent_profile/scripts/functions > /etc/confluent/functions
. /etc/confluent/functions
run_remote_parts pre.d
curl -f -X POST -H "CONFLUENT_NODENAME: $nodename" -H "CONFLUENT_APIKEY: $apikey" https://$confluent_mgr/confluent-api/self/nodelist > /tmp/allnodes
curl -f https://$confluent_mgr/confluent-public/os/$confluent_profile/scripts/getinstalldisk > /custom-installation/getinstalldisk
python3 /custom-installation/getinstalldisk
if [ ! -e /tmp/installdisk ]; then
curl -f https://$confluent_mgr/confluent-public/os/$confluent_profile/scripts/getinstalldisk > /custom-installation/getinstalldisk
python3 /custom-installation/getinstalldisk
fi
sed -i s!%%INSTALLDISK%%!/dev/$(cat /tmp/installdisk)! /autoinstall.yaml
) &
tail --pid $! -n 0 -F /var/log/confluent/confluent-pre.log > /dev/console

View File

@@ -4,4 +4,5 @@ confluent_mgr=$(grep ^deploy_server $deploycfg|awk '{print $2}')
confluent_profile=$(grep ^profile: $deploycfg|awk '{print $2}')
export deploycfg confluent_mgr confluent_profile
curl -f https://$confluent_mgr/confluent-public/os/$confluent_profile/scripts/post.sh > /tmp/post.sh
. /tmp/post.sh
bash /tmp/post.sh
true

View File

@@ -3,5 +3,12 @@ sed -i 's/label: ubuntu/label: Ubuntu/' $2/profile.yaml && \
ln -s $1/casper/vmlinuz $2/boot/kernel && \
ln -s $1/casper/initrd $2/boot/initramfs/distribution && \
mkdir -p $2/boot/efi/boot && \
ln -s $1/EFI/boot/* $2/boot/efi/boot
if [ -d $1/EFI/boot/ ]; then
ln -s $1/EFI/boot/* $2/boot/efi/boot
elif [ -d $1/efi/boot/ ]; then
ln -s $1/efi/boot/* $2/boot/efi/boot
else
echo "Unrecogrized boot contents in media" >&2
exit 1
fi

View File

@@ -1089,6 +1089,11 @@ class _ExpressionFormat(string.Formatter):
self._nodename = nodename
self._numbers = None
def _vformat(self, format_string, args, kwargs, used_args, recursion_depth,
auto_arg_index=False):
return super()._vformat(format_string, args, kwargs, used_args,
recursion_depth, auto_arg_index)
def get_field(self, field_name, args, kwargs):
return field_name, field_name
@@ -2197,16 +2202,16 @@ class ConfigManager(object):
self._notif_attribwatchers(changeset)
self._bg_sync_to_file()
def clear_node_attributes(self, nodes, attributes):
def clear_node_attributes(self, nodes, attributes, warnings=None):
if cfgleader:
return exec_on_leader('_rpc_master_clear_node_attributes',
self.tenant, nodes, attributes)
if cfgstreams:
exec_on_followers('_rpc_clear_node_attributes', self.tenant,
nodes, attributes)
self._true_clear_node_attributes(nodes, attributes)
self._true_clear_node_attributes(nodes, attributes, warnings)
def _true_clear_node_attributes(self, nodes, attributes):
def _true_clear_node_attributes(self, nodes, attributes, warnings):
# accumulate all changes into a changeset and push in one go
changeset = {}
realattributes = []
@@ -2229,8 +2234,17 @@ class ConfigManager(object):
# delete it and check for inheritence to backfil data
del nodek[attrib]
self._do_inheritance(nodek, attrib, node, changeset)
if not warnings is None:
if attrib in nodek:
warnings.append('The attribute "{}" was defined specifically for the node and clearing now has a value inherited from the group "{}"'.format(attrib, nodek[attrib]['inheritedfrom']))
_addchange(changeset, node, attrib)
_mark_dirtykey('nodes', node, self.tenant)
elif attrib in nodek:
if not warnings is None:
warnings.append('The attribute "{0}" is inherited from group "{1}", leaving the inherited value alone (use "{0}=" with no value to explicitly blank the value if desired)'.format(attrib, nodek[attrib]['inheritedfrom']))
else:
if not warnings is None:
warnings.append('Attribute "{}" is either already cleared, or does not match a defined attribute (if referencing an attribute group, try a wildcard)'.format(attrib))
if ('_expressionkeys' in nodek and
attrib in nodek['_expressionkeys']):
recalcexpressions = True

View File

@@ -247,6 +247,10 @@ class NodeHandler(immhandler.NodeHandler):
if rsp.status == 200:
pwdchanged = True
password = newpassword
wc.set_header('Authorization', 'Bearer ' + rspdata['access_token'])
if '_csrf_token' in wc.cookies:
wc.set_header('X-XSRF-TOKEN', wc.cookies['_csrf_token'])
wc.grab_json_response_with_status('/api/providers/logout')
else:
if rspdata.get('locktime', 0) > 0:
raise LockedUserException(
@@ -280,6 +284,7 @@ class NodeHandler(immhandler.NodeHandler):
rsp.read()
if rsp.status != 200:
return (None, None)
wc.grab_json_response_with_status('/api/providers/logout')
self._currcreds = (username, newpassword)
wc.set_basic_credentials(username, newpassword)
pwdchanged = True
@@ -434,6 +439,7 @@ class NodeHandler(immhandler.NodeHandler):
'/api/function',
{'USER_UserModify': '{0},{1},,1,4,0,0,0,0,,8,,,'.format(uid, username)})
if status == 200 and rsp.get('return', 0) == 13:
wc.grab_json_response('/api/providers/logout')
wc.set_basic_credentials(self._currcreds[0], self._currcreds[1])
status = 503
while status != 200:
@@ -442,10 +448,13 @@ class NodeHandler(immhandler.NodeHandler):
{'UserName': username}, method='PATCH')
if status != 200:
rsp = json.loads(rsp)
if rsp.get('error', {}).get('code', 'Unknown') in ('Base.1.8.GeneralError', 'Base.1.12.GeneralError'):
eventlet.sleep(10)
if rsp.get('error', {}).get('code', 'Unknown') in ('Base.1.8.GeneralError', 'Base.1.12.GeneralError', 'Base.1.14.GeneralError'):
eventlet.sleep(4)
else:
break
self.tmppasswd = None
self._currcreds = (username, passwd)
return
self.tmppasswd = None
wc.grab_json_response('/api/providers/logout')
self._currcreds = (username, passwd)
@@ -632,3 +641,4 @@ def remote_nodecfg(nodename, cfm):
info = {'addresses': [ipaddr]}
nh = NodeHandler(info, cfm)
nh.config(nodename)

View File

@@ -411,9 +411,7 @@ def check_ubuntu(isoinfo):
]
return {'name': 'ubuntu-{0}-{1}'.format(ver, arch),
'method': EXTRACT|COPY,
'extractlist': ['casper/vmlinuz', 'casper/initrd',
'efi/boot/bootx64.efi', 'efi/boot/grubx64.efi'
],
'extractlist': exlist,
'copyto': 'install.iso',
'category': 'ubuntu{0}'.format(major)}

View File

@@ -21,16 +21,16 @@ import confluent.util as util
from fnmatch import fnmatch
def retrieve(nodes, element, configmanager, inputdata):
def retrieve(nodes, element, configmanager, inputdata, clearwarnbynode=None):
configmanager.check_quorum()
if nodes is not None:
return retrieve_nodes(nodes, element, configmanager, inputdata)
return retrieve_nodes(nodes, element, configmanager, inputdata, clearwarnbynode)
elif element[0] == 'nodegroups':
return retrieve_nodegroup(
element[1], element[3], configmanager, inputdata)
element[1], element[3], configmanager, inputdata, clearwarnbynode)
def retrieve_nodegroup(nodegroup, element, configmanager, inputdata):
def retrieve_nodegroup(nodegroup, element, configmanager, inputdata, clearwarnbynode):
try:
grpcfg = configmanager.get_nodegroup_attributes(nodegroup)
except KeyError:
@@ -106,10 +106,12 @@ def retrieve_nodegroup(nodegroup, element, configmanager, inputdata):
raise Exception("BUGGY ATTRIBUTE FOR NODEGROUP")
def retrieve_nodes(nodes, element, configmanager, inputdata):
def retrieve_nodes(nodes, element, configmanager, inputdata, clearwarnbynode):
attributes = configmanager.get_node_attributes(nodes)
if element[-1] == 'all':
for node in util.natural_sort(nodes):
if clearwarnbynode and node in clearwarnbynode:
yield msg.Attributes(node, {'_warnings': clearwarnbynode[node]})
theattrs = set(allattributes.node).union(set(attributes[node]))
for attribute in sorted(theattrs):
if attribute in attributes[node]: # have a setting for it
@@ -266,6 +268,7 @@ def update_nodes(nodes, element, configmanager, inputdata):
namemap[node] = rename['rename']
configmanager.rename_nodes(namemap)
return yield_rename_resources(namemap, isnode=True)
clearwarnbynode = {}
for node in nodes:
updatenode = inputdata.get_attributes(node, allattributes.node)
clearattribs = []
@@ -299,10 +302,11 @@ def update_nodes(nodes, element, configmanager, inputdata):
markup = (e.text[:e.offset-1] + '-->' + e.text[e.offset-1] + '<--' + e.text[e.offset:]).strip()
raise exc.InvalidArgumentException('Syntax error in attribute name: "{0}"'.format(markup))
if len(clearattribs) > 0:
configmanager.clear_node_attributes([node], clearattribs)
clearwarnbynode[node] = []
configmanager.clear_node_attributes([node], clearattribs, warnings=clearwarnbynode[node])
updatedict[node] = updatenode
try:
configmanager.set_node_attributes(updatedict)
except ValueError as e:
raise exc.InvalidArgumentException(str(e))
return retrieve(nodes, element, configmanager, inputdata)
return retrieve(nodes, element, configmanager, inputdata, clearwarnbynode)

View File

@@ -24,6 +24,7 @@ import confluent.noderange as noderange
import eventlet
import pwd
import grp
import sys
def mkdirp(path):
try:
@@ -193,9 +194,8 @@ def sync_list_to_node(sl, node, suffixes, peerip=None):
targip = node
if peerip:
targip = peerip
#BOOO, need stderr!!!
output = util.run(
['rsync', '-rvLD', targdir + '/', 'root@[{}]:/'.format(targip)])[0]
output, stderr = util.run(
['rsync', '-rvLD', targdir + '/', 'root@[{}]:/'.format(targip)])
except Exception as e:
if 'CalledProcessError' not in repr(e):
# https://github.com/eventlet/eventlet/issues/413
@@ -215,6 +215,9 @@ def sync_list_to_node(sl, node, suffixes, peerip=None):
raise Exception("Syncing failed due to unreadable files: " + ','.join(unreadablefiles))
elif hasattr(e, 'stderr') and e.stderr and b'Permission denied, please try again.' in e.stderr:
raise Exception('Syncing failed due to authentication error, is the confluent automation key not set up (osdeploy initialize -a) or is there some process replacing authorized_keys on the host?')
elif hasattr(e, 'stderr') and e.stderr:
sys.stderr.write(e.stderr.decode('utf8'))
raise
else:
raise
finally: