# Mirror of https://github.com/xcat2/confluent.git (synced 2026-03-06 14:29:18 +00:00)
# Upstream change note: when a non-readable file was encountered, confluent
# would cryptically report rsync failure.  Check for the usual culprit,
# unreadable files, if rsync fails, causing the error to manifest with
# clearer text.
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2021 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

|
import errno
import glob
import os
import shutil
import tempfile

import eventlet
import eventlet.green.subprocess as subprocess

import confluent.sshutil as sshutil

|
def mkdirp(path):
    """Create *path* and any missing parent directories, mkdir -p style.

    An already-existing directory is not an error; any other OSError
    (permissions, bad path component, ...) propagates to the caller.
    """
    try:
        os.makedirs(path)
    except OSError as e:
        # EEXIST just means the directory is already present; use the
        # symbolic errno constant rather than the magic number 17.
        if e.errno != errno.EEXIST:
            raise
|
class SyncList(object):
    """Parsed representation of a profile's 'syncfiles' manifest.

    The manifest is a newline-delimited list of source specs, optionally
    written as 'source -> destination', with '#' starting a comment.  A
    'MERGE:' section header switches subsequent entries into the merge
    map; entries before any header are straight replacements.
    """

    def __init__(self, filename):
        self.replacemap = {}
        self.appendmap = {}
        self.mergemap = {}
        with open(filename, 'r') as manifest:
            lines = manifest.read().split('\n')
        # Entries accumulate into whichever map the current section selects
        active = self.replacemap
        for line in lines:
            # Discard everything from the first '#' onward (comment)
            if '#' in line:
                line = line[:line.index('#')]
            # Reject characters reserved for future manifest syntax
            for reserved in '!@$%^&()|{}':
                if reserved in line:
                    raise Exception(
                        'Special character "{}" reserved for future use'.format(reserved))
            line = line.strip()
            if not line:
                continue
            if line[-1] == ':':
                # Section header; only MERGE: is recognized today
                if line == 'MERGE:':
                    active = self.mergemap
                else:
                    raise Exception(
                        'Section "{}" is not currently supported in syncfiles'.format(line[:-1]))
                continue
            if '->' in line:
                src, dst = line.split('->')
                src = src.strip()
                dst = dst.strip()
            else:
                src = line
                dst = None
            active[src] = dst
|
def sync_list_to_node(sl, node, suffixes):
    # Stage every entry of the SyncList *sl* into a scratch directory, then
    # rsync the staged tree to the root of *node* over ssh as root.
    # Returns the rsync output on success.  On rsync failure, probes the
    # staged tree for unreadable files (the usual culprit) so the error
    # manifests with clearer text than a bare rsync exit status.
    targdir = tempfile.mkdtemp('.syncto{}'.format(node))
    output = ''
    try:
        for ent in sl.replacemap:
            stage_ent(sl.replacemap, ent, targdir)
        if 'append' in suffixes:
            # Suffixes are node-root-relative; strip leading slashes so
            # os.path.join below does not discard targdir.
            while suffixes['append'] and suffixes['append'][0] == '/':
                suffixes['append'] = suffixes['append'][1:]
            for ent in sl.appendmap:
                stage_ent(sl.appendmap, ent,
                          os.path.join(targdir, suffixes['append']))
        if 'merge' in suffixes:
            while suffixes['merge'] and suffixes['merge'][0] == '/':
                suffixes['merge'] = suffixes['merge'][1:]
            for ent in sl.mergemap:
                # True requests append-on-collision (merge) staging
                stage_ent(sl.mergemap, ent,
                          os.path.join(targdir, suffixes['merge']), True)
        sshutil.prep_ssh_key('/etc/confluent/ssh/automation')
        output = subprocess.check_output(
            ['rsync', '-rvL', targdir + '/', 'root@{}:/'.format(node)], timeout=86400)
    except Exception as e:
        if 'CalledProcessError' not in repr(e):
            # https://github.com/eventlet/eventlet/issues/413
            # for some reason, can't catch the calledprocesserror normally
            # for this exception, implement a hack workaround
            raise
        # rsync failed; scan the staged tree for files we cannot read and
        # report those explicitly instead of a cryptic rsync error.
        unreadablefiles = []
        for root, dirnames, filenames in os.walk(targdir):
            for filename in filenames:
                filename = os.path.join(root, filename)
                try:
                    with open(filename, 'r') as _:
                        pass
                except OSError as e:
                    # Report the node-relative path, not the scratch path
                    unreadablefiles.append(filename.replace(targdir, ''))
        if unreadablefiles:
            raise Exception("Syncing failed due to unreadable files: " + ','.join(unreadablefiles))
        else:
            raise
    finally:
        # Always clean up the scratch staging directory
        shutil.rmtree(targdir)
    return output
|
def stage_ent(currmap, ent, targdir, appendexist=False):
    """Expand one syncfile entry and stage its matches under *targdir*.

    *ent* may hold several whitespace-separated glob patterns; a mapped
    destination of None means "same path as the source".  *appendexist*
    is forwarded to mkpathorlink to request append-on-collision staging.
    """
    dst = currmap[ent]
    # Collect every filesystem match across all patterns in the entry
    matches = [m for pattern in ent.split() for m in glob.glob(pattern)]
    if not matches:
        raise Exception('No matching files for "{}"'.format(ent))
    if dst is None:  # this is to indicate source and destination as one
        dst = os.path.dirname(matches[0]) + '/'
    # Destination is node-root-relative; drop leading slashes so the
    # join below lands inside targdir
    while dst and dst[0] == '/':
        dst = dst[1:]
    if len(matches) > 1 and dst[-1] != '/':
        raise Exception(
            'Multiple files match {}, {} needs a trailing slash to indicate a directory'.format(ent, dst))
    staged = os.path.join(targdir, dst)
    for match in matches:
        mkpathorlink(match, staged, appendexist)
|
def mkpathorlink(source, destination, appendexist=False):
    """Mirror *source* into the staging tree at *destination*.

    Directories are recreated and recursed into; regular files become
    symlinks back to the source.  When *appendexist* is set and the
    destination already exists, the source content is appended after the
    existing content in a real file instead (a symlink cannot represent
    the merged result).
    """
    if os.path.isdir(source):
        mkdirp(destination)
        for ent in os.listdir(source):
            currsrc = os.path.join(source, ent)
            currdst = os.path.join(destination, ent)
            mkpathorlink(currsrc, currdst)
    else:
        if destination[-1] == '/':
            # Trailing slash: destination names a directory to put the
            # file in, keeping the source's basename
            mkdirp(destination)
            destination = os.path.join(destination, os.path.basename(source))
        else:
            mkdirp(os.path.dirname(destination))
        if appendexist and os.path.exists(destination):
            # Preserve the existing content, then rewrite destination as
            # existing + source.  mkstemp replaces the race-prone,
            # deprecated mktemp: the scratch file is created atomically
            # with a private mode rather than merely named.
            tmpfd, tmpnam = tempfile.mkstemp()
            os.close(tmpfd)
            shutil.copy(destination, tmpnam)
            os.remove(destination)
            with open(destination, 'w') as realdest:
                with open(tmpnam) as olddest:
                    realdest.write(olddest.read())
                with open(source) as sourcedata:
                    realdest.write(sourcedata.read())
            os.remove(tmpnam)
        else:
            os.symlink(source, destination)
|
syncrunners = {}  # nodename -> eventlet greenthread running sync_list_to_node
|
|
|
|
|
|
def start_syncfiles(nodename, cfg, suffixes):
    """Kick off a background syncfiles run for *nodename*.

    Resolves the node's deployment profile (pending first, then staged,
    then active), loads that profile's syncfiles manifest, and spawns the
    sync in a green thread tracked in syncrunners.  Returns an HTTP-style
    status string describing what was (or was not) started.
    """
    attribs = cfg.get_node_attributes(
        nodename, ('deployment.*',))
    attribs = attribs.get(nodename, {})
    # First non-empty profile in precedence order wins
    profile = ''
    for attrname in ('deployment.pendingprofile',
                     'deployment.stagedprofile',
                     'deployment.profile'):
        profile = attribs.get(attrname, {}).get('value', '')
        if profile:
            break
    if not profile:
        raise Exception('Cannot perform syncfiles without profile assigned')
    synclist = '/var/lib/confluent/public/os/{}/syncfiles'.format(profile)
    if not os.path.exists(synclist):
        return '200 OK'  # not running
    sl = SyncList(synclist)
    if not (sl.appendmap or sl.mergemap or sl.replacemap):
        return '200 OK'  # the synclist has no actual entries
    syncrunners[nodename] = eventlet.spawn(
        sync_list_to_node, sl, nodename, suffixes)
    return '202 Queued'  # backgrounded
|
|
def get_syncresult(nodename):
    """Report the status of a previously started syncfiles run.

    Returns a (status, detail) tuple: '204 Not Running' when no sync was
    started for *nodename*, '200 OK' with empty detail while the sync is
    still in flight, and '200 OK' with the sync output once finished (at
    which point the tracking entry is dropped).
    """
    runner = syncrunners.get(nodename)
    if runner is None:
        return ('204 Not Running', '')
    if not runner.dead:
        # Greenthread still executing
        return ('200 OK', '')
    # Completed; harvest the result and forget the greenthread
    result = runner.wait()
    del syncrunners[nodename]
    return ('200 OK', result)