diff --git a/confluent_server/bin/confluentdbutil b/confluent_server/bin/confluentdbutil index be35e5aa..25a5acf8 100755 --- a/confluent_server/bin/confluentdbutil +++ b/confluent_server/bin/confluentdbutil @@ -69,7 +69,11 @@ if args[0] == 'restore': if options.interactivepassword: password = getpass.getpass('Enter password to restore backup: ') try: + cfm.init(True) + cfm.statelessmode = True cfm.restore_db_from_directory(dumpdir, password) + cfm.statelessmode = False + cfm.ConfigManager.wait_for_sync(True) if owner != 0: for targdir in os.walk('/etc/confluent'): os.chown(targdir[0], owner, group) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 58f4bc59..68a8c94a 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -565,6 +565,8 @@ def _load_dict_from_dbm(dpath, tdb): currdict[tks] = cPickle.loads(dbe[tk]) # nosec tk = dbe.nextkey(tk) except dbm.error: + if os.path.exists(tdb): + raise return @@ -2604,7 +2606,13 @@ class ConfigManager(object): with _dirtylock: dirtyglobals = copy.deepcopy(_cfgstore['dirtyglobals']) del _cfgstore['dirtyglobals'] - globalf = dbm.open(os.path.join(cls._cfgdir, "globals"), 'c', 384) # 0600 + try: + globalf = dbm.open(os.path.join(cls._cfgdir, "globals"), 'c', 384) # 0600 + except dbm.error: + if not fullsync: + raise + os.remove(os.path.join(cls._cfgdir, "globals")) + globalf = dbm.open(os.path.join(cls._cfgdir, "globals"), 'c', 384) # 0600 try: for globalkey in dirtyglobals: if globalkey in _cfgstore['globals']: @@ -2617,8 +2625,15 @@ class ConfigManager(object): globalf.close() if fullsync or 'collectivedirty' in _cfgstore: if len(_cfgstore.get('collective', ())) > 1: - collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"), - 'c', 384) + try: + collectivef = dbm.open(os.path.join(cls._cfgdir, 'collective'), + 'c', 384) + except dbm.error: + if not fullsync: + raise + os.remove(os.path.join(cls._cfgdir, 'collective')) + collectivef = dbm.open(os.path.join(cls._cfgdir, 'collective'), + 'c', 384) try: if fullsync: colls = _cfgstore['collective'] @@ -2645,7 +2660,13 @@ class ConfigManager(object): currdict = _cfgstore['main'] for category in currdict: _mkpath(pathname) - dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600 + try: + dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600 + except dbm.error: + if not fullsync: + raise + os.remove(os.path.join(pathname, category)) + dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600 try: for ck in currdict[category]: dbf[ck] = cPickle.dumps(currdict[category][ck], protocol=cPickle.HIGHEST_PROTOCOL) @@ -2665,7 +2686,13 @@ class ConfigManager(object): currdict = _cfgstore['tenant'][tenant] for category in dkdict: _mkpath(pathname) - dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600 + try: + dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600 + except dbm.error: + if not fullsync: + raise + os.remove(os.path.join(pathname, category)) + dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600 try: for ck in dkdict[category]: if ck not in currdict[category]: diff --git a/confluent_server/confluent/main.py b/confluent_server/confluent/main.py index 5f35b2c8..207d3783 100644 --- a/confluent_server/confluent/main.py +++ b/confluent_server/confluent/main.py @@ -29,6 +29,10 @@ import atexit import confluent.auth as auth import confluent.config.conf as conf import confluent.config.configmanager as configmanager +try: + import anydbm as dbm +except ModuleNotFoundError: + import dbm import confluent.consoleserver as consoleserver import confluent.core as confluentcore import confluent.httpapi as httpapi @@ -62,8 +66,10 @@ import os import glob import signal import socket +import subprocess import time import traceback +import tempfile import uuid @@ -232,8 +238,21 @@ def sanity_check(): assure_ownership('/etc/confluent/srvcert.pem') +def migrate_db(): + tdir = tempfile.mkdtemp() + subprocess.check_call(['python3', '-c', 'pass']) + subprocess.check_call(['python2', '/opt/confluent/bin/confluentdbutil', 'dump', '-u', tdir]) + subprocess.check_call(['python3', '/opt/confluent/bin/confluentdbutil', 'restore', '-u', tdir]) + subprocess.check_call(['rm', '-rf', tdir]) + configmanager.init() + + def run(args): setlimits() + try: + configmanager.ConfigManager(None) + except dbm.error: + migrate_db() try: signal.signal(signal.SIGUSR1, dumptrace) except AttributeError: