diff --git a/confluent_osdeploy/el8-diskless/profiles/default/scripts/syncfileclient b/confluent_osdeploy/el8-diskless/profiles/default/scripts/syncfileclient index 088fa9f7..5f2efc5e 100644 --- a/confluent_osdeploy/el8-diskless/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/el8-diskless/profiles/default/scripts/syncfileclient @@ -9,6 +9,7 @@ import os import shutil import pwd import grp +import sys from importlib.machinery import SourceFileLoader try: apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module() @@ -231,12 +232,14 @@ def synchronize(): status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) if status >= 300: sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) - sys.stderr.write(repr(rsp)) + sys.stderr.write(rsp.decode('utf8')) + sys.stderr.write('\n') + sys.stderr.flush() return status if status == 202: lastrsp = '' while status != 204: - time.sleep(1+(2*random.random(a))) + time.sleep(1+(2*random.random())) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -297,6 +300,8 @@ if __name__ == '__main__': status = synchronize() except Exception as e: sys.stderr.write(str(e)) + sys.stderr.write('\n') + sys.stderr.flush() status = 300 if status not in (204, 200): time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/el8/profiles/default/scripts/syncfileclient b/confluent_osdeploy/el8/profiles/default/scripts/syncfileclient index 088fa9f7..5f2efc5e 100644 --- a/confluent_osdeploy/el8/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/el8/profiles/default/scripts/syncfileclient @@ -9,6 +9,7 @@ import os import shutil import pwd import grp +import sys from importlib.machinery import SourceFileLoader try: apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module() @@ -231,12 +232,14 @@ def synchronize(): status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) if status >= 300: sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) - sys.stderr.write(repr(rsp)) + sys.stderr.write(rsp.decode('utf8')) + sys.stderr.write('\n') + sys.stderr.flush() return status if status == 202: lastrsp = '' while status != 204: - time.sleep(1+(2*random.random(a))) + time.sleep(1+(2*random.random())) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -297,6 +300,8 @@ if __name__ == '__main__': status = synchronize() except Exception as e: sys.stderr.write(str(e)) + sys.stderr.write('\n') + sys.stderr.flush() status = 300 if status not in (204, 200): time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/el9-diskless/profiles/default/scripts/syncfileclient b/confluent_osdeploy/el9-diskless/profiles/default/scripts/syncfileclient index 088fa9f7..5f2efc5e 100644 --- a/confluent_osdeploy/el9-diskless/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/el9-diskless/profiles/default/scripts/syncfileclient @@ -9,6 +9,7 @@ import os import shutil import pwd import grp +import sys from importlib.machinery import SourceFileLoader try: apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module() @@ -231,12 +232,14 @@ def synchronize(): status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) if status >= 300: sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) - sys.stderr.write(repr(rsp)) + sys.stderr.write(rsp.decode('utf8')) + sys.stderr.write('\n') + sys.stderr.flush() return status if status == 202: lastrsp = '' while status != 204: - time.sleep(1+(2*random.random(a))) + time.sleep(1+(2*random.random())) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -297,6 +300,8 @@ if __name__ == '__main__': status = synchronize() except Exception as e: sys.stderr.write(str(e)) + sys.stderr.write('\n') + sys.stderr.flush() status = 300 if status not in (204, 200): time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/genesis/profiles/default/scripts/syncfileclient b/confluent_osdeploy/genesis/profiles/default/scripts/syncfileclient index 088fa9f7..5f2efc5e 100644 --- a/confluent_osdeploy/genesis/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/genesis/profiles/default/scripts/syncfileclient @@ -9,6 +9,7 @@ import os import shutil import pwd import grp +import sys from importlib.machinery import SourceFileLoader try: apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module() @@ -231,12 +232,14 @@ def synchronize(): status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) if status >= 300: sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) - sys.stderr.write(repr(rsp)) + sys.stderr.write(rsp.decode('utf8')) + sys.stderr.write('\n') + sys.stderr.flush() return status if status == 202: lastrsp = '' while status != 204: - time.sleep(1+(2*random.random(a))) + time.sleep(1+(2*random.random())) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -297,6 +300,8 @@ if __name__ == '__main__': status = synchronize() except Exception as e: sys.stderr.write(str(e)) + sys.stderr.write('\n') + sys.stderr.flush() status = 300 if status not in (204, 200): time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/suse15-diskless/profiles/default/scripts/syncfileclient b/confluent_osdeploy/suse15-diskless/profiles/default/scripts/syncfileclient index 088fa9f7..5f2efc5e 100644 --- a/confluent_osdeploy/suse15-diskless/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/suse15-diskless/profiles/default/scripts/syncfileclient @@ -9,6 +9,7 @@ import os import shutil import pwd import grp +import sys from importlib.machinery import SourceFileLoader try: apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module() @@ -231,12 +232,14 @@ def synchronize(): status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) if status >= 300: sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) - sys.stderr.write(repr(rsp)) + sys.stderr.write(rsp.decode('utf8')) + sys.stderr.write('\n') + sys.stderr.flush() return status if status == 202: lastrsp = '' while status != 204: - time.sleep(1+(2*random.random(a))) + time.sleep(1+(2*random.random())) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -297,6 +300,8 @@ if __name__ == '__main__': status = synchronize() except Exception as e: sys.stderr.write(str(e)) + sys.stderr.write('\n') + sys.stderr.flush() status = 300 if status not in (204, 200): time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/suse15/profiles/hpc/scripts/syncfileclient b/confluent_osdeploy/suse15/profiles/hpc/scripts/syncfileclient index 088fa9f7..5f2efc5e 100644 --- a/confluent_osdeploy/suse15/profiles/hpc/scripts/syncfileclient +++ b/confluent_osdeploy/suse15/profiles/hpc/scripts/syncfileclient @@ -9,6 +9,7 @@ import os import shutil import pwd import grp +import sys from importlib.machinery import SourceFileLoader try: apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module() @@ -231,12 +232,14 @@ def synchronize(): status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) if status >= 300: sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) - sys.stderr.write(repr(rsp)) + sys.stderr.write(rsp.decode('utf8')) + sys.stderr.write('\n') + sys.stderr.flush() return status if status == 202: lastrsp = '' while status != 204: - time.sleep(1+(2*random.random(a))) + time.sleep(1+(2*random.random())) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -297,6 +300,8 @@ if __name__ == '__main__': status = synchronize() except Exception as e: sys.stderr.write(str(e)) + sys.stderr.write('\n') + sys.stderr.flush() status = 300 if status not in (204, 200): time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/suse15/profiles/server/scripts/syncfileclient b/confluent_osdeploy/suse15/profiles/server/scripts/syncfileclient index 088fa9f7..5f2efc5e 100644 --- a/confluent_osdeploy/suse15/profiles/server/scripts/syncfileclient +++ b/confluent_osdeploy/suse15/profiles/server/scripts/syncfileclient @@ -9,6 +9,7 @@ import os import shutil import pwd import grp +import sys from importlib.machinery import SourceFileLoader try: apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module() @@ -231,12 +232,14 @@ def synchronize(): status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) if status >= 300: sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) - sys.stderr.write(repr(rsp)) + sys.stderr.write(rsp.decode('utf8')) + sys.stderr.write('\n') + sys.stderr.flush() return status if status == 202: lastrsp = '' while status != 204: - time.sleep(1+(2*random.random(a))) + time.sleep(1+(2*random.random())) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -297,6 +300,8 @@ if __name__ == '__main__': status = synchronize() except Exception as e: sys.stderr.write(str(e)) + sys.stderr.write('\n') + sys.stderr.flush() status = 300 if status not in (204, 200): time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/ubuntu20.04-diskless/profiles/default/scripts/syncfileclient b/confluent_osdeploy/ubuntu20.04-diskless/profiles/default/scripts/syncfileclient index 088fa9f7..5f2efc5e 100644 --- a/confluent_osdeploy/ubuntu20.04-diskless/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/ubuntu20.04-diskless/profiles/default/scripts/syncfileclient @@ -9,6 +9,7 @@ import os import shutil import pwd import grp +import sys from importlib.machinery import SourceFileLoader try: apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module() @@ -231,12 +232,14 @@ def synchronize(): status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) if status >= 300: sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) - sys.stderr.write(repr(rsp)) + sys.stderr.write(rsp.decode('utf8')) + sys.stderr.write('\n') + sys.stderr.flush() return status if status == 202: lastrsp = '' while status != 204: - time.sleep(1+(2*random.random(a))) + time.sleep(1+(2*random.random())) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -297,6 +300,8 @@ if __name__ == '__main__': status = synchronize() except Exception as e: sys.stderr.write(str(e)) + sys.stderr.write('\n') + sys.stderr.flush() status = 300 if status not in (204, 200): time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/ubuntu20.04/profiles/default/scripts/syncfileclient b/confluent_osdeploy/ubuntu20.04/profiles/default/scripts/syncfileclient index 088fa9f7..5f2efc5e 100644 --- a/confluent_osdeploy/ubuntu20.04/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/ubuntu20.04/profiles/default/scripts/syncfileclient @@ -9,6 +9,7 @@ import os import shutil import pwd import grp +import sys from importlib.machinery import SourceFileLoader try: apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module() @@ -231,12 +232,14 @@ def synchronize(): status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) if status >= 300: sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) - sys.stderr.write(repr(rsp)) + sys.stderr.write(rsp.decode('utf8')) + sys.stderr.write('\n') + sys.stderr.flush() return status if status == 202: lastrsp = '' while status != 204: - time.sleep(1+(2*random.random(a))) + time.sleep(1+(2*random.random())) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -297,6 +300,8 @@ if __name__ == '__main__': status = synchronize() except Exception as e: sys.stderr.write(str(e)) + sys.stderr.write('\n') + sys.stderr.flush() status = 300 if status not in (204, 200): time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/ubuntu22.04/profiles/default/scripts/syncfileclient b/confluent_osdeploy/ubuntu22.04/profiles/default/scripts/syncfileclient index 088fa9f7..5f2efc5e 100644 --- a/confluent_osdeploy/ubuntu22.04/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/ubuntu22.04/profiles/default/scripts/syncfileclient @@ -9,6 +9,7 @@ import os import shutil import pwd import grp +import sys from importlib.machinery import SourceFileLoader try: apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module() @@ -231,12 +232,14 @@ def synchronize(): status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) if status >= 300: sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) - sys.stderr.write(repr(rsp)) + sys.stderr.write(rsp.decode('utf8')) + sys.stderr.write('\n') + sys.stderr.flush() return status if status == 202: lastrsp = '' while status != 204: - time.sleep(1+(2*random.random(a))) + time.sleep(1+(2*random.random())) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -297,6 +300,8 @@ if __name__ == '__main__': status = synchronize() except Exception as e: sys.stderr.write(str(e)) + sys.stderr.write('\n') + sys.stderr.flush() status = 300 if status not in (204, 200): time.sleep((random.random()*3)+2) diff --git a/confluent_server/confluent/selfservice.py b/confluent_server/confluent/selfservice.py index 68b02ec0..8b316d98 100644 --- a/confluent_server/confluent/selfservice.py +++ b/confluent_server/confluent/selfservice.py @@ -517,8 +517,8 @@ def handle_request(req, make_response): pals = get_extra_names(nodename, cfg, myip) result = syncfiles.start_syncfiles( nodename, cfg, json.loads(reqbody), pals) - start_response(result, ()) - yield '' + start_response(result[0], ()) + yield result[1] return if 'GET' == operation: status, output = syncfiles.get_syncresult(nodename) diff --git a/confluent_server/confluent/syncfiles.py b/confluent_server/confluent/syncfiles.py index df5574e3..16cf4c49 100644 --- a/confluent_server/confluent/syncfiles.py +++ b/confluent_server/confluent/syncfiles.py @@ -285,12 +285,11 @@ def mkpathorlink(source, destination, appendexist=False): syncrunners = {} - +cleaner = None def start_syncfiles(nodename, cfg, suffixes, principals=[]): + global cleaner peerip = None - if nodename in syncrunners: - return '503 Synchronization already in progress ' if 'myips' in suffixes: targips = suffixes['myips'] del suffixes['myips'] @@ -313,13 +312,41 @@ def start_syncfiles(nodename, cfg, suffixes, principals=[]): raise Exception('Cannot perform syncfiles without profile assigned') synclist = '/var/lib/confluent/public/os/{}/syncfiles'.format(profile) if not os.path.exists(synclist): - return '200 OK' # not running + return '200 OK', 'No synclist' # not running sl = SyncList(synclist, nodename, cfg) if not (sl.appendmap or sl.mergemap or sl.replacemap or sl.appendoncemap): - return '200 OK' # the synclist has no actual entries + return '200 OK', 'Empty synclist' # the synclist has no actual entries + if nodename in syncrunners: + if syncrunners[nodename].dead: + syncrunners[nodename].wait() + else: + return '503 Synchronization already in progress', 'Synchronization already in progress for {}'.format(nodename) syncrunners[nodename] = eventlet.spawn( sync_list_to_node, sl, nodename, suffixes, peerip) - return '202 Queued' # backgrounded + if not cleaner: + cleaner = eventlet.spawn(cleanit) + return '202 Queued', 'Background synchronization initiated' # backgrounded + + +def cleanit(): + toreap = {} + while True: + for nn in list(syncrunners): + if syncrunners[nn].dead: + if nn in toreap: + try: + syncrunners[nn].wait() + except Exception as e: + print(repr(e)) + pass + del syncrunners[nn] + del toreap[nn] + else: + toreap[nn] = 1 + elif nn in toreap: + del toreap[nn] + eventlet.sleep(30) + def get_syncresult(nodename): if nodename not in syncrunners: