diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index ddd06360..89df6b04 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -79,6 +79,7 @@ _masterkey = None _masterintegritykey = None _dirtylock = threading.RLock() _leaderlock = gthread.RLock() +_synclock = threading.RLock() _config_areas = ('nodegroups', 'nodes', 'usergroups', 'users') tracelog = None statelessmode = False @@ -548,7 +549,7 @@ def commit_clear(): os.remove(os.path.join(ConfigManager._cfgdir, cfg)) except OSError as oe: pass - ConfigManager._sync_to_file(fullsync=True) + ConfigManager.wait_for_sync(True) ConfigManager._bg_sync_to_file() cfgleader = None @@ -1877,8 +1878,8 @@ class ConfigManager(object): pass @classmethod - def wait_for_sync(cls): - cls._bg_sync_to_file() + def wait_for_sync(cls, fullsync=False): + cls._bg_sync_to_file(fullsync) if cls._cfgwriter is not None: cls._cfgwriter.join() @@ -1888,7 +1889,7 @@ class ConfigManager(object): sys.exit(0) @classmethod - def _bg_sync_to_file(cls): + def _bg_sync_to_file(cls, fullsync=False): if statelessmode: return with cls._syncstate: @@ -1899,88 +1900,89 @@ class ConfigManager(object): # if the thread is exiting, join it to let it close, just in case if cls._cfgwriter is not None: cls._cfgwriter.join() - cls._cfgwriter = threading.Thread(target=cls._sync_to_file) + cls._cfgwriter = threading.Thread(target=cls._sync_to_file, args=fullsync) cls._cfgwriter.start() @classmethod def _sync_to_file(cls, fullsync=False): - if statelessmode: - return - with open(os.path.join(cls._cfgdir, 'transactioncount'), 'w') as f: - f.write(struct.pack('!Q', _txcount)) - if fullsync or 'dirtyglobals' in _cfgstore: - if fullsync: - dirtyglobals = _cfgstore['globals'] - else: - with _dirtylock: - dirtyglobals = copy.deepcopy(_cfgstore['dirtyglobals']) - del _cfgstore['dirtyglobals'] - _mkpath(cls._cfgdir) - globalf = dbm.open(os.path.join(cls._cfgdir, "globals"), 'c', 384) # 0600 - try: - for globalkey in dirtyglobals: - if globalkey in _cfgstore['globals']: - globalf[globalkey] = \ - cPickle.dumps(_cfgstore['globals'][globalkey]) - else: - if globalkey in globalf: - del globalf[globalkey] - finally: - globalf.close() - if fullsync or 'collectivedirty' in _cfgstore: - collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"), - 'c', 384) - try: + with _synclock: + if statelessmode: + return + with open(os.path.join(cls._cfgdir, 'transactioncount'), 'w') as f: + f.write(struct.pack('!Q', _txcount)) + if fullsync or 'dirtyglobals' in _cfgstore: if fullsync: - colls = _cfgstore['collective'] + dirtyglobals = _cfgstore['globals'] else: with _dirtylock: - colls = copy.deepcopy(_cfgstore['collectivedirty']) - del _cfgstore['collectivedirty'] - for coll in colls: - if coll in _cfgstore['collective']: - collectivef[coll] = cPickle.dumps( - _cfgstore['collective'][coll]) - else: - if coll in collectivef: - del globalf[coll] - finally: - collectivef.close() - if fullsync: - pathname = cls._cfgdir - currdict = _cfgstore['main'] - for category in currdict: - _mkpath(pathname) - dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600 + dirtyglobals = copy.deepcopy(_cfgstore['dirtyglobals']) + del _cfgstore['dirtyglobals'] + _mkpath(cls._cfgdir) + globalf = dbm.open(os.path.join(cls._cfgdir, "globals"), 'c', 384) # 0600 try: - for ck in currdict[category]: - dbf[ck] = cPickle.dumps(currdict[category][ck]) + for globalkey in dirtyglobals: + if globalkey in _cfgstore['globals']: + globalf[globalkey] = \ + cPickle.dumps(_cfgstore['globals'][globalkey]) + else: + if globalkey in globalf: + del globalf[globalkey] finally: - dbf.close() - elif 'dirtykeys' in _cfgstore: - with _dirtylock: - currdirt = copy.deepcopy(_cfgstore['dirtykeys']) - del _cfgstore['dirtykeys'] - for tenant in currdirt: - dkdict = currdirt[tenant] - if tenant is None: - pathname = cls._cfgdir - currdict = _cfgstore['main'] - else: - pathname = os.path.join(cls._cfgdir, 'tenants', tenant) - currdict = _cfgstore['tenant'][tenant] - for category in dkdict: + globalf.close() + if fullsync or 'collectivedirty' in _cfgstore: + collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"), + 'c', 384) + try: + if fullsync: + colls = _cfgstore['collective'] + else: + with _dirtylock: + colls = copy.deepcopy(_cfgstore['collectivedirty']) + del _cfgstore['collectivedirty'] + for coll in colls: + if coll in _cfgstore['collective']: + collectivef[coll] = cPickle.dumps( + _cfgstore['collective'][coll]) + else: + if coll in collectivef: + del globalf[coll] + finally: + collectivef.close() + if fullsync: + pathname = cls._cfgdir + currdict = _cfgstore['main'] + for category in currdict: _mkpath(pathname) dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600 try: - for ck in dkdict[category]: - if ck not in currdict[category]: - if ck in dbf: - del dbf[ck] - else: - dbf[ck] = cPickle.dumps(currdict[category][ck]) + for ck in currdict[category]: + dbf[ck] = cPickle.dumps(currdict[category][ck]) finally: dbf.close() + elif 'dirtykeys' in _cfgstore: + with _dirtylock: + currdirt = copy.deepcopy(_cfgstore['dirtykeys']) + del _cfgstore['dirtykeys'] + for tenant in currdirt: + dkdict = currdirt[tenant] + if tenant is None: + pathname = cls._cfgdir + currdict = _cfgstore['main'] + else: + pathname = os.path.join(cls._cfgdir, 'tenants', tenant) + currdict = _cfgstore['tenant'][tenant] + for category in dkdict: + _mkpath(pathname) + dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600 + try: + for ck in dkdict[category]: + if ck not in currdict[category]: + if ck in dbf: + del dbf[ck] + else: + dbf[ck] = cPickle.dumps(currdict[category][ck]) + finally: + dbf.close() willrun = False with cls._syncstate: if cls._writepending: