2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-01-10 18:12:30 +00:00

Attribute feature enhancement

Add expression functions upper, lower, block_number, and block_offset.

Add an 'id.index' auto-attribute to
yield a number for nodes.
This commit is contained in:
Jarrod Johnson
2025-11-17 11:58:04 -05:00
parent d3e7a49f92
commit 53760ab5dd
3 changed files with 120 additions and 3 deletions

View File

@@ -716,6 +716,7 @@ def become_leader(connection):
if reassimilate is not None:
reassimilate.kill()
reassimilate = eventlet.spawn(reassimilate_missing)
cfm._init_indexes()
cfm._ready = True
if _assimilate_missing(skipaddr):
schedule_rebalance()

View File

@@ -450,6 +450,9 @@ node = {
#IBM Flex)''',
# 'appliesto': ['system'],
# },
'id.index': {
'description': 'Confluent generated numeric index for the node.',
},
'id.model': {
'description': 'The model number of a node. In scenarios where there '
'is both a name and a model number, it is generally '

View File

@@ -164,6 +164,45 @@ def _mkpath(pathname):
raise
def _count_freeindexes(freeindexes):
count = 0
for idx in freeindexes:
if isinstance(idx, list):
for subidx in range(idx[0], idx[1] + 1):
count += 1
else:
count += 1
return count
def _is_free_index(freeindexes, idx):
for freeidx in freeindexes:
if isinstance(freeidx, list):
if freeidx[0] <= idx <= freeidx[1]:
return True
else:
if freeidx == idx:
return True
return False
def _remove_free_index(freeindexes, idx):
for i, freeidx in enumerate(freeindexes):
if isinstance(freeidx, list):
if freeidx[0] <= idx <= freeidx[1]:
if freeidx[0] == freeidx[1]:
del freeindexes[i]
elif freeidx[0] == idx:
freeindexes[i][0] += 1
elif freeidx[1] == idx:
freeindexes[i][1] -= 1
else:
freeindexes.insert(i + 1, [idx + 1, freeidx[1]])
freeindexes[i][1] = idx - 1
return
else:
if freeidx == idx:
del freeindexes[i]
return
def _derive_keys(password, salt):
#implement our specific combination of pbkdf2 transforms to get at
#key. We bump the iterations up because we can afford to
@@ -1220,6 +1259,16 @@ class _ExpressionFormat(string.Formatter):
arg1 = self._handle_ast_node(node.args[0])
arg2 = self._handle_ast_node(node.args[1])
return baseval.replace(arg1, arg2)
elif fun_name == 'upper':
return baseval.upper()
elif fun_name == 'lower':
return baseval.lower()
elif fun_name == 'block_number':
chunk_size = self._handle_ast_node(node.args[0])
return (int(baseval) - 1) // chunk_size + 1
elif fun_name == 'block_offset':
chunk_size = self._handle_ast_node(node.args[0])
return (int(baseval) - 1) % chunk_size + 1
else:
raise ValueError("Unsupported function in expression")
else:
@@ -2225,7 +2274,7 @@ class ConfigManager(object):
watcher = self._nodecollwatchers[self.tenant][watcher]
watcher(added=(), deleting=nodes, renamed=(), configmanager=self)
changeset = {}
for node in nodes:
for node in confluent.util.natural_sort(nodes):
# set a reserved attribute for the sake of the change notification
# framework to trigger on
changeset[node] = {'_nodedeleted': 1}
@@ -2233,6 +2282,29 @@ class ConfigManager(object):
if node in self._cfgstore['nodes']:
self._sync_groups_to_node(node=node, groups=[],
changeset=changeset)
nidx = self._cfgstore['nodes'][node].get('id.index', {}).get('value', None)
if nidx is not None:
currmaxidx = get_global('max_node_index')
freeindexes = get_global('free_node_indexes')
if not freeindexes:
freeindexes = []
if nidx == currmaxidx - 1:
currmaxidx = currmaxidx - 1
while _is_free_index(freeindexes, currmaxidx - 1):
_remove_free_index(freeindexes, currmaxidx - 1)
currmaxidx = currmaxidx - 1
set_global('max_node_index', currmaxidx)
else:
lastindex = freeindexes[-1] if freeindexes else [-2, -2]
if not isinstance(lastindex, list):
lastindex = [lastindex, lastindex]
if nidx == lastindex[1] + 1:
lastindex[1] = nidx
if freeindexes:
freeindexes[-1] = lastindex
else:
freeindexes.append(nidx)
set_global('free_node_indexes', freeindexes)
del self._cfgstore['nodes'][node]
_mark_dirtykey('nodes', node, self.tenant)
self._notif_attribwatchers(changeset)
@@ -2510,12 +2582,29 @@ class ConfigManager(object):
attrname, node)
raise ValueError(errstr)
attribmap[node][attrname] = attrval
for node in attribmap:
for node in confluent.util.natural_sort(attribmap):
node = confluent.util.stringify(node)
exprmgr = None
if node not in self._cfgstore['nodes']:
newnodes.append(node)
self._cfgstore['nodes'][node] = {}
freeindexes = get_global('free_node_indexes')
if not freeindexes:
freeindexes = []
if _count_freeindexes(freeindexes) > 128: # tend to leave freed indexes disused until a lot have accumulated
if isinstance(freeindexes[0], list):
nidx = freeindexes[0][0]
freeindexes[0][0] = nidx + 1
if freeindexes[0][0] == freeindexes[0][1]:
freeindexes[0] = freeindexes[0][0]
else:
nidx = freeindexes.pop(0)
set_global('free_node_indexes', freeindexes)
else:
nidx = get_global('max_node_index')
if nidx is None:
nidx = 0
set_global('max_node_index', nidx + 1)
self._cfgstore['nodes'][node] = {'id.index': {'value': nidx}}
cfgobj = self._cfgstore['nodes'][node]
recalcexpressions = False
for attrname in attribmap[node]:
@@ -3148,6 +3237,29 @@ def get_globals():
bkupglobals[globvar] = _cfgstore['globals'][globvar]
return bkupglobals
def _init_indexes():
maxidx = get_global('max_node_index')
if maxidx is not None:
return
maxidx = 0
maincfgstore = _cfgstore['main']
nodes_without_index = []
for node in confluent.util.natural_sort(maincfgstore.get('nodes', {})):
nidx = maincfgstore['nodes'][node].get('id.index', {}).get('value', None)
if nidx is not None:
if nidx >= maxidx:
maxidx = nidx + 1
else:
nodes_without_index.append(node)
for node in nodes_without_index:
maincfgstore['nodes'][node]['id.index'] = {'value': maxidx}
maxidx += 1
_mark_dirtykey('nodes', node, None)
set_global('max_node_index', maxidx)
set_global('free_node_indexes', [])
ConfigManager._bg_sync_to_file()
def init(stateless=False):
global _cfgstore
global _ready
@@ -3160,6 +3272,7 @@ def init(stateless=False):
_cfgstore = {}
members = list(list_collective())
if len(members) < 2:
_init_indexes()
_ready = True