diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 2519cc39..cd099993 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -716,6 +716,7 @@ def become_leader(connection): if reassimilate is not None: reassimilate.kill() reassimilate = eventlet.spawn(reassimilate_missing) + cfm._init_indexes() cfm._ready = True if _assimilate_missing(skipaddr): schedule_rebalance() diff --git a/confluent_server/confluent/config/attributes.py b/confluent_server/confluent/config/attributes.py index dc5b8d40..84e527b2 100644 --- a/confluent_server/confluent/config/attributes.py +++ b/confluent_server/confluent/config/attributes.py @@ -450,6 +450,9 @@ node = { #IBM Flex)''', # 'appliesto': ['system'], # }, + 'id.index': { + 'description': 'Confluent generated numeric index for the node.', + }, 'id.model': { 'description': 'The model number of a node. In scenarios where there ' 'is both a name and a model number, it is generally ' diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 1810641a..fd8a97a7 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -164,6 +164,45 @@ def _mkpath(pathname): raise +def _count_freeindexes(freeindexes): + count = 0 + for idx in freeindexes: + if isinstance(idx, list): + for subidx in range(idx[0], idx[1] + 1): + count += 1 + else: + count += 1 + return count + +def _is_free_index(freeindexes, idx): + for freeidx in freeindexes: + if isinstance(freeidx, list): + if freeidx[0] <= idx <= freeidx[1]: + return True + else: + if freeidx == idx: + return True + return False + +def _remove_free_index(freeindexes, idx): + for i, freeidx in enumerate(freeindexes): + if isinstance(freeidx, list): + if freeidx[0] <= idx <= freeidx[1]: + if freeidx[0] == freeidx[1]: + del freeindexes[i] + elif freeidx[0] == idx: + freeindexes[i][0] += 1 + elif freeidx[1] == idx: + freeindexes[i][1] -= 1 + else: + freeindexes.insert(i + 1, [idx + 1, freeidx[1]]) + freeindexes[i][1] = idx - 1 + return + else: + if freeidx == idx: + del freeindexes[i] + return + def _derive_keys(password, salt): #implement our specific combination of pbkdf2 transforms to get at #key. We bump the iterations up because we can afford to @@ -1220,6 +1259,16 @@ class _ExpressionFormat(string.Formatter): arg1 = self._handle_ast_node(node.args[0]) arg2 = self._handle_ast_node(node.args[1]) return baseval.replace(arg1, arg2) + elif fun_name == 'upper': + return baseval.upper() + elif fun_name == 'lower': + return baseval.lower() + elif fun_name == 'block_number': + chunk_size = self._handle_ast_node(node.args[0]) + return (int(baseval) - 1) // chunk_size + 1 + elif fun_name == 'block_offset': + chunk_size = self._handle_ast_node(node.args[0]) + return (int(baseval) - 1) % chunk_size + 1 else: raise ValueError("Unsupported function in expression") else: @@ -2225,7 +2274,7 @@ class ConfigManager(object): watcher = self._nodecollwatchers[self.tenant][watcher] watcher(added=(), deleting=nodes, renamed=(), configmanager=self) changeset = {} - for node in nodes: + for node in confluent.util.natural_sort(nodes): # set a reserved attribute for the sake of the change notification # framework to trigger on changeset[node] = {'_nodedeleted': 1} @@ -2233,6 +2282,29 @@ class ConfigManager(object): if node in self._cfgstore['nodes']: self._sync_groups_to_node(node=node, groups=[], changeset=changeset) + nidx = self._cfgstore['nodes'][node].get('id.index', {}).get('value', None) + if nidx is not None: + currmaxidx = get_global('max_node_index') + freeindexes = get_global('free_node_indexes') + if not freeindexes: + freeindexes = [] + if nidx == currmaxidx - 1: + currmaxidx = currmaxidx - 1 + while _is_free_index(freeindexes, currmaxidx - 1): + _remove_free_index(freeindexes, currmaxidx - 1) + currmaxidx = currmaxidx - 1 + set_global('max_node_index', currmaxidx) + else: + lastindex = freeindexes[-1] if freeindexes else [-2, -2] + if not isinstance(lastindex, list): + lastindex = [lastindex, lastindex] + if nidx == lastindex[1] + 1: + lastindex[1] = nidx + if freeindexes: + freeindexes[-1] = lastindex + else: + freeindexes.append(nidx) + set_global('free_node_indexes', freeindexes) del self._cfgstore['nodes'][node] _mark_dirtykey('nodes', node, self.tenant) self._notif_attribwatchers(changeset) @@ -2510,12 +2582,29 @@ class ConfigManager(object): attrname, node) raise ValueError(errstr) attribmap[node][attrname] = attrval - for node in attribmap: + for node in confluent.util.natural_sort(attribmap): node = confluent.util.stringify(node) exprmgr = None if node not in self._cfgstore['nodes']: newnodes.append(node) - self._cfgstore['nodes'][node] = {} + freeindexes = get_global('free_node_indexes') + if not freeindexes: + freeindexes = [] + if _count_freeindexes(freeindexes) > 128: # tend to leave freed indexes disused until a lot have accumulated + if isinstance(freeindexes[0], list): + nidx = freeindexes[0][0] + freeindexes[0][0] = nidx + 1 + if freeindexes[0][0] == freeindexes[0][1]: + freeindexes[0] = freeindexes[0][0] + else: + nidx = freeindexes.pop(0) + set_global('free_node_indexes', freeindexes) + else: + nidx = get_global('max_node_index') + if nidx is None: + nidx = 0 + set_global('max_node_index', nidx + 1) + self._cfgstore['nodes'][node] = {'id.index': {'value': nidx}} cfgobj = self._cfgstore['nodes'][node] recalcexpressions = False for attrname in attribmap[node]: @@ -3148,6 +3237,29 @@ def get_globals(): bkupglobals[globvar] = _cfgstore['globals'][globvar] return bkupglobals +def _init_indexes(): + maxidx = get_global('max_node_index') + if maxidx is not None: + return + maxidx = 0 + maincfgstore = _cfgstore['main'] + nodes_without_index = [] + for node in confluent.util.natural_sort(maincfgstore.get('nodes', {})): + nidx = maincfgstore['nodes'][node].get('id.index', {}).get('value', None) + if nidx is not None: + if nidx >= maxidx: + maxidx = nidx + 1 + else: + nodes_without_index.append(node) + for node in nodes_without_index: + maincfgstore['nodes'][node]['id.index'] = {'value': maxidx} + maxidx += 1 + _mark_dirtykey('nodes', node, None) + set_global('max_node_index', maxidx) + set_global('free_node_indexes', []) + ConfigManager._bg_sync_to_file() + + def init(stateless=False): global _cfgstore global _ready @@ -3160,6 +3272,7 @@ def init(stateless=False): _cfgstore = {} members = list(list_collective()) if len(members) < 2: + _init_indexes() _ready = True