From fda2dd08d1377b7833dae9e53760ecc25f712303 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 1 Apr 2014 11:01:26 -0400 Subject: [PATCH] First pass at configuration change notification Add ability for code to add watchers on nodes and their attributes. This is likely to be reworked internally to better aggregate requests, but the code interface is potentially complete. --- confluent/config/configmanager.py | 140 ++++++++++++++++++++++++++++-- 1 file changed, 134 insertions(+), 6 deletions(-) diff --git a/confluent/config/configmanager.py b/confluent/config/configmanager.py index 88fc40dd..8a6d1e9c 100644 --- a/confluent/config/configmanager.py +++ b/confluent/config/configmanager.py @@ -65,8 +65,10 @@ import fcntl import math import operator import os +import random import re import string +import sys import threading @@ -363,6 +365,9 @@ class ConfigManager(object): _cfgdir = "/etc/confluent/cfg/" _cfgwriter = None _writepending = False + _attribwatchers = {} + _nodecollwatchers = {} + _notifierids = {} def __init__(self, tenant, decrypt=False): global _cfgstore @@ -382,6 +387,81 @@ class ConfigManager(object): self.tenant = tenant self._cfgstore = _cfgstore['tenant'][tenant] + def watch_attributes(self, nodes, attributes, callback): + """ + Watch a list of attributes for changes on a list of nodes + + :param nodes: An iterable of node names to be watching + :param attributes: An iterable of attribute names to be notified about + :param callback: A callback to process a notification + + Returns an identifier that can be used to unsubscribe from these + notifications using remove_watcher + """ + notifierid = random.randint(0, sys.maxint) + while notifierid in self._notifierids: + notifierid = random.randint(0, sys.maxint) + self._notifierids[notifierid] = { 'attriblist': [] } + if self.tenant not in self._attribwatchers: + self._attribwatchers[self.tenant] = {} + attribwatchers = self._attribwatchers[self.tenant] + for node in nodes: + if node not in attribwatchers: + attribwatchers[node] = {} + for attribute in attributes: + self._notifierids[notifierid]['attriblist'].append( + (node,attribute)) + if attribute not in attribwatchers[node]: + attribwatchers[node][attribute] = { + notifierid: callback + } + else: + attribwatchers[node][attribute][notifierid] = callback + return notifierid + + def watch_nodecollection(self, callback): + """ + Watch the nodecollection for addition or removal of nodes. + + A watcher is notified prior after node has been added and before node + is actually removed. + + :param callback: Function to call when a node is added or removed + + Returns an identifier that can be used to unsubscribe from these + notifications using remove_watcher + """ + # first provide an identifier for the calling code to + # use in case of cancellation. + # I anticipate no more than a handful of watchers of this sort, so + # this loop should not have to iterate too many times + notifierid = random.randint(0, sys.maxint) + while notifierid in self._notifierids: + notifierid = random.randint(0, sys.maxint) + # going to track that this is a nodecollection type watcher, + # but there is no additional data associated. + self.notifierids[notifierid] = set(['nodecollection']) + if self.tenant not in self._nodecollwatchers: + self._nodecollwatchers[self.tenant] = {} + self._nodecollwatchers[self.tenant][notifierid] = callback + return notifierid + + def remove_watcher(self, watcher): + # identifier of int would be a collection watcher + if watcher not in self._notifierids: + raise Exception("Invalid") + # return + if 'attriblist' in self._notifierids[watcher]: + attribwatchers = self._attribwatchers[self.tenant] + for nodeattrib in self._notifierids[watcher]['attriblist']: + node, attrib = nodeattrib + del attribwatchers[node][attrib][watcher] + elif 'nodecollection' in self.notifierids[watcher]: + del self._nodecollwatchers[self.tenant][watcher] + else: + raise Exception("Completely not a valid place to be") + del self._notifierids[watcher] + def get_user(self, name): """Get user information from DB @@ -430,6 +510,7 @@ class ConfigManager(object): raise Exception("Duplicate id requested") if 'users' not in self._cfgstore: self._cfgstore['users'] = { } + name = name.encode('utf-8') if name in self._cfgstore['users']: raise Exception("Duplicate username requested") _mark_dirtykey('users', name, self.tenant) @@ -518,6 +599,7 @@ class ConfigManager(object): return for attrib in groupcfg.iterkeys(): self._do_inheritance(nodecfg, attrib, node) + self._notif_attribwatchers([attrib], [node]) def _node_removed_from_group(self, node, group): try: @@ -534,6 +616,7 @@ class ConfigManager(object): _mark_dirtykey('nodes', node, self.tenant) del nodecfg[attrib] # remove invalid inherited data self._do_inheritance(nodecfg, attrib, node) + self._notif_attribwatchers([attrib], [node]) except KeyError: # inheritedfrom not set, move on pass @@ -560,6 +643,7 @@ class ConfigManager(object): copy.deepcopy(self._cfgstore['groups'][group][attrib]) nodecfg[attrib]['inheritedfrom'] = group self._refresh_nodecfg(nodecfg, attrib, nodename) + self._notif_attribwatchers([attrib], [nodename]) return if srcgroup is not None and group == srcgroup: # break out @@ -613,6 +697,7 @@ class ConfigManager(object): if 'groups' not in self._cfgstore: self._cfgstore['groups'] = {} for group in attribmap.iterkeys(): + group = group.encode('utf-8') _mark_dirtykey('groups', group, self.tenant) if group not in self._cfgstore['groups']: self._cfgstore['groups'][group] = {'nodes': set([])} @@ -627,7 +712,7 @@ class ConfigManager(object): if not isinstance(attribmap[group][attr], list): raise ValueError newdict = set(attribmap[group][attr]) - elif (isinstance(attribmap[group][attr], str) or + elif (isinstance(attribmap[group][attr], str) or isinstance(attribmap[group][attr], unicode)): newdict = { 'value': attribmap[group][attr] } else: @@ -643,6 +728,7 @@ class ConfigManager(object): for node in cfgobj['nodes']: nodecfg = self._cfgstore['nodes'][node] self._do_inheritance(nodecfg, attr, node, srcgroup=group) + self._notif_attribwatchers([attr], [node]) self._bg_sync_to_file() def _refresh_nodecfg(self, cfgobj, attrname, node): @@ -656,9 +742,37 @@ class ConfigManager(object): attrname in cfgobj['_expressionkeys']): if exprmgr is None: exprmgr = _ExpressionFormat(cfgobj, node) - self._recalculate_expressions(cfgobj, formatter=exprmgr) + self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node) + + def _notif_attribwatchers(self, attrnames, nodes): + if self.tenant not in self._attribwatchers: + return + notifdata = {} + attribwatchers = self._attribwatchers[self.tenant] + for node in nodes: + if node not in attribwatchers: + continue + attribwatchers = attribwatchers[node] + for attrname in attrnames: + if attrname not in attribwatchers: + continue + for notifierid in attribwatchers[attrname].iterkeys(): + if notifierid not in notifdata: + notifdata[notifierid] = { + 'nodes': [ node ], + 'attributes': [ attrname ], + 'callback': attribwatchers[attrname][notifierid] + } + for watcher in notifdata.itervalues(): + nodes = watcher['nodes'] + attributes = watcher['attributes'] + callback = watcher['callback'] + callback(nodes=nodes, attributes=attributes, configmanager=self) def del_nodes(self, nodes): + if self.tenant in self._nodecollwatchers: + for watcher in self._nodecollwatchers[self.tenant].itervalues(): + watcher(added=[], deleting=nodes, configmanager=self) if 'nodes' not in self._cfgstore: return for node in nodes: @@ -680,6 +794,7 @@ class ConfigManager(object): def clear_node_attributes(self, nodes, attributes): for node in nodes: + node = node.encode('utf-8') try: nodek = self._cfgstore['nodes'][node] except KeyError: @@ -692,12 +807,13 @@ class ConfigManager(object): _mark_dirtykey('nodes', node, self.tenant) del nodek[attrib] self._do_inheritance(nodek, attrib, node) + self._notif_attribwatchers([attrib], [node]) if ('_expressionkeys' in nodek and attrib in nodek['_expressionkeys']): recalcexpressions = True if recalcexpressions: exprmgr = _ExpressionFormat(nodek, node) - self._recalculate_expressions(nodek, formatter=exprmgr) + self._recalculate_expressions(nodek, formatter=exprmgr, node=node) self._bg_sync_to_file() def set_node_attributes(self, attribmap): @@ -706,11 +822,14 @@ class ConfigManager(object): # TODO(jbjohnso): multi mgr support, here if we have peers, # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result + newnodes = [] for node in attribmap.iterkeys(): + node = node.encode('utf-8') _mark_dirtykey('nodes', node, self.tenant) exprmgr = None _mark_dirtykey('nodes', node, self.tenant) if node not in self._cfgstore['nodes']: + newnodes.append(node) self._cfgstore['nodes'][node] = {} cfgobj = self._cfgstore['nodes'][node] recalcexpressions = False @@ -740,10 +859,18 @@ class ConfigManager(object): exprmgr = _ExpressionFormat(cfgobj, node) cfgobj[attrname] = _decode_attribute(attrname, cfgobj, formatter=exprmgr) + # if any code is watching these attributes, notify + # them of the change + self._notif_attribwatchers([attrname], [node]) if recalcexpressions: if exprmgr is None: exprmgr = _ExpressionFormat(cfgobj, node) - self._recalculate_expressions(cfgobj, formatter=exprmgr) + self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node) + if newnodes: + if self.tenant in self._nodecollwatchers: + nodecollwatchers = self._nodecollwatchers[self.tenant] + for watcher in nodecollwatchers[self.tenant].itervalues(): + watcher(added=newnodes, deleting=[], configmanager=self) self._bg_sync_to_file() #TODO: wait for synchronization to suceed/fail??) @@ -821,18 +948,19 @@ class ConfigManager(object): cls._writepending = False return cls._sync_to_file() - def _recalculate_expressions(self, cfgobj, formatter): + def _recalculate_expressions(self, cfgobj, formatter, node): for key in cfgobj.iterkeys(): if not isinstance(cfgobj[key],dict): continue if 'expression' in cfgobj[key]: cfgobj[key] = _decode_attribute(key, cfgobj, formatter=formatter) + self._notif_attribwatchers([key], [node]) elif ('cryptvalue' not in cfgobj[key] and 'value' not in cfgobj[key]): # recurse for nested structures, with some hint tha # it might indeed be a nested structure - _recalculate_expressions(cfgobj[key], formatter) + _recalculate_expressions(cfgobj[key], formatter, node) try: