diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 0f4a91e0..420dda28 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -159,17 +159,87 @@ def _do_notifier(cfg, watcher, callback): logException() +def _rpc_master_set_user(tenant, name, attributemap): + ConfigManager(tenant).set_user(name, attributemap) + + +def _rpc_set_user(tenant, name, attributemap): + ConfigManager(tenant)._true_set_user(name, attributemap) + + def _rpc_master_set_node_attributes(tenant, attribmap, autocreate): - c = ConfigManager(tenant) - c.send_to_followers('_rpc_set_node_attributes', tenant, - attribmap, autocreate) - c._true_set_node_attributes(attribmap, autocreate) + ConfigManager(tenant).set_node_attributes(attribmap, autocreate) + + +def _rpc_master_set_group_attributes(tenant, attribmap, autocreate): + ConfigManager(tenant).set_group_attributes(attribmap, autocreate) + + +def _rpc_master_del_user(tenant, name): + ConfigManager(tenant).del_user(name) + + +def _rpc_del_user(tenant, name): + ConfigManager(tenant)._true_del_user(name) + + +def _rpc_master_create_user(tenant, *args): + ConfigManager(tenant).create_user(*args) + +def _rpc_create_user(tenant, *args): + ConfigManager(tenant)._true_create_user(*args) + +def _rpc_master_del_groups(tenant, groups): + ConfigManager(tenant).del_groups(groups) + +def _rpc_del_groups(tenant, groups): + ConfigManager(tenant)._true_del_groups(groups) + + +def _rpc_master_del_nodes(tenant, nodes): + ConfigManager(tenant).del_nodes(nodes) + + +def _rpc_del_nodes(tenant, nodes) + ConfigManager(tenant)._true_del_nodes(nodes) def _rpc_set_node_attributes(tenant, attribmap, autocreate): ConfigManager(tenant)._true_set_node_attributes(attribmap, autocreate) +def _rpc_set_group_attributes(tenant, attribmap, autocreate): + ConfigManager(tenant)._true_set_group_attributes(attribmap, autocreate) + +def exec_on_leader(function, *args): + xid = os.urandom(8) + while xid in _pendingchangesets: + xid = os.urandom(8) + _pendingchangesets[xid] = event.Event() + rpcpayload = cPickle.dumps({'function': function, 'args': args, + 'xid': xid}) + rpclen = len(rpcpayload) + cfgleader.sendall(struct.pack('!Q', rpclen)) + cfgleader.sendall(rpcpayload) + _pendingchangesets[xid].wait() + del _pendingchangesets[xid] + return + + +def exec_on_followers(fnname, *args): + global _txcount + _txcount += 1 + if len(cfgstreams) < (len(_cfgstore['collective']) // 2) : + # the leader counts in addition to registered streams + raise Exception("collective does not have quorum") + pushes = eventlet.GreenPool() + payload = cPickle.dumps({'function': fnname, 'args': args, + 'txcount': _txcount}) + for res in pushes.starmap( + _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): + pass + + def logException(): global tracelog if tracelog is None: @@ -903,6 +973,14 @@ class ConfigManager(object): :param name: The login name of the user :param attributemap: A dict of key values to set """ + if cfgleader: + return exec_on_leader('_rpc_master_set_user', self.tenant, name, + attributemap) + if cfgstreams: + exec_on_followers('_rpc_set_user', self.tenant, name) + self._true_set_user(name, attributemap) + + def _true_set_user(self, name, attributemap): user = self._cfgstore['users'][name] for attribute in attributemap: if attribute == 'password': @@ -919,6 +997,13 @@ class ConfigManager(object): self._bg_sync_to_file() def del_user(self, name): + if cfgleader: + return exec_on_leader('_rpc_master_del_user', name) + if cfgstreams: + exec_on_followers('_rpc_del_user', name) + self._true_del_user(name) + + def _true_del_user(self, name): if name in self._cfgstore['users']: del self._cfgstore['users'][name] _mark_dirtykey('users', name, self.tenant) @@ -936,6 +1021,16 @@ class ConfigManager(object): :param uid: Custom identifier number if desired. Defaults to random. :param displayname: Optional long format name for UI consumption """ + if cfgleader: + return exec_on_leader('_rpc_master_create_user', self.tenant, + name, role, uid, displayname, attributemap) + if cfgstreams: + exec_on_followers('_rpc_set_group_attributes', self.tenant, name, + role, uid, displayname, attributemap) + self._true_create_user(name, role, uid, displayname, attributemap) + + def _true_create_user(self, name, role="Administrator", uid=None, + displayname=None, attributemap=None): if 'idmap' not in _cfgstore['main']: _cfgstore['main']['idmap'] = {} if uid is None: @@ -1133,6 +1228,15 @@ class ConfigManager(object): self.set_group_attributes(attribmap, autocreate=True) def set_group_attributes(self, attribmap, autocreate=False): + if cfgleader: # currently config slave to another + return exec_on_leader('_rpc_master_set_group_attributes', + self.tenant, attribmap, autocreate) + if cfgstreams: + exec_on_followers('_rpc_set_group_attributes', self.tenant, + attribmap, autocreate) + self._true_set_group_attributes(attribmap, autocreate) + + def _true_set_group_attributes(self, attribmap, autocreate=False): changeset = {} for group in attribmap: if group == '': @@ -1326,8 +1430,15 @@ class ConfigManager(object): callback = watcher['callback'] eventlet.spawn_n(_do_notifier, self, watcher, callback) - def del_nodes(self, nodes): + if cfgleader: # slaved to a collective + return exec_on_loader('_rpc_master_del_nodes', self.tenant, + nodes) + if cfgstreams: + exec_on_followers('_rpc_del_nodes', self.tenant, nodes) + self._true_del_nodes(nodes) + + def _true_del_nodes(self, nodes): if self.tenant in self._nodecollwatchers: for watcher in self._nodecollwatchers[self.tenant].itervalues(): watcher(added=[], deleting=nodes, configmanager=self) @@ -1346,6 +1457,14 @@ class ConfigManager(object): self._bg_sync_to_file() def del_groups(self, groups): + if cfgleader: + return exec_on_leader('_rpc_master_del_groups', self.tenant, + groups) + if cfgstreams: + exec_on_followers('_rpc_del_groups', self.tenant, groups) + self._true_del_groups(groups) + + def _true_del_groups(self, groups): changeset = {} for group in groups: if group in self._cfgstore['nodegroups']: @@ -1397,39 +1516,12 @@ class ConfigManager(object): attribmap[node]['groups'] = [] self.set_node_attributes(attribmap, autocreate=True) - def set_leader_node_attributes(self, attribmap, autocreate): - xid = os.urandom(8) - while xid in _pendingchangesets: - xid = os.urandom(8) - _pendingchangesets[xid] = event.Event() - rpcpayload = cPickle.dumps({'function': '_rpc_master_set_node_attributes', - 'args': (self.tenant, attribmap, - autocreate), 'xid': xid}) - rpclen = len(rpcpayload) - cfgleader.sendall(struct.pack('!Q', rpclen)) - cfgleader.sendall(rpcpayload) - _pendingchangesets[xid].wait() - del _pendingchangesets[xid] - return - - def send_to_followers(self, fnname, *args): - global _txcount - _txcount += 1 - if len(cfgstreams) < (len(_cfgstore['collective']) // 2) : - # the leader counts in addition to registered streams - raise Exception("collective does not have quorum") - pushes = eventlet.GreenPool() - payload = cPickle.dumps({'function': fnname, 'args': args, - 'txcount': _txcount}) - for res in pushes.starmap( - _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): - pass - def set_node_attributes(self, attribmap, autocreate=False): if cfgleader: # currently config slave to another - return self.set_leader_node_attributes(attribmap, autocreate) + return exec_on_leader('_rpc_master_set_node_attributes', + self.tenant, attribmap, autocreate) if cfgstreams: - self.send_to_followers('_rpc_set_node_attributes', + exec_on_followers('_rpc_set_node_attributes', self.tenant, attribmap, autocreate) self._true_set_node_attributes(attribmap, autocreate)