2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-01 15:03:31 +00:00

Fix async syncfiles

This commit is contained in:
Jarrod Johnson
2026-03-17 16:10:21 -04:00
parent 2ab85bb687
commit b4c4ac0861

View File

@@ -167,7 +167,7 @@ class SyncList(object):
self.optmap[f] = entopts
def sync_list_to_node(sl, node, suffixes, peerip=None):
async def sync_list_to_node(sl, node, suffixes, peerip=None):
targdir = tempfile.mkdtemp('.syncto{}'.format(node))
output = ''
try:
@@ -195,7 +195,7 @@ def sync_list_to_node(sl, node, suffixes, peerip=None):
targip = node
if peerip:
targip = peerip
output, stderr = util.check_output(
output, stderr = await util.check_output(
'rsync', '-rvLD', targdir + '/', 'root@[{}]:/'.format(targip))
except Exception as e:
if 'CalledProcessError' not in repr(e):
@@ -318,8 +318,12 @@ def start_syncfiles(nodename, cfg, suffixes, principals=[]):
if not (sl.appendmap or sl.mergemap or sl.replacemap or sl.appendoncemap):
return 200, 'OK', 'Empty synclist' # the synclist has no actual entries
if nodename in syncrunners:
if syncrunners[nodename].dead:
syncrunners[nodename].wait()
if syncrunners[nodename].done():
try:
syncrunners[nodename].result()
except Exception as e:
print(repr(e))
del syncrunners[nodename]
else:
return 503, 'Synchronization already in progress', 'Synchronization already in progress for {}'.format(nodename)
syncrunners[nodename] = tasks.spawn(
@@ -333,10 +337,10 @@ async def cleanit():
toreap = {}
while True:
for nn in list(syncrunners):
if syncrunners[nn].dead:
if syncrunners[nn].done():
if nn in toreap:
try:
syncrunners[nn].wait()
syncrunners[nn].result()
except Exception as e:
print(repr(e))
pass
@@ -352,8 +356,12 @@ async def cleanit():
def get_syncresult(nodename):
if nodename not in syncrunners:
return 204, 'Not Running', ''
if not syncrunners[nodename].dead:
if not syncrunners[nodename].done():
return 200, 'OK', ''
result = syncrunners[nodename].wait()
try:
result = syncrunners[nodename].result()
except Exception as e:
print(repr(e))
result = None
del syncrunners[nodename]
return 200, 'OK', result