From 39cd8a3bcbcd311d1f5d817baff3b735b2366ee1 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 15 Apr 2026 09:58:57 -0400 Subject: [PATCH] Correct async style in various parts of configmanager and dependent core --- .../confluent/config/configmanager.py | 26 ++++++++--------- confluent_server/confluent/core.py | 28 +++++++++---------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index e3dbd696..9d875400 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -1655,7 +1655,7 @@ class ConfigManager(object): :param attributemap: The mapping of keys to values to set """ if cfgleader: - return exec_on_leader('_rpc_master_set_usergroup', self.tenant, + return await exec_on_leader('_rpc_master_set_usergroup', self.tenant, groupname, attributemap) if cfgstreams: await exec_on_followers('_rpc_set_usergroup', self.tenant, groupname, @@ -1686,7 +1686,7 @@ class ConfigManager(object): "Administrator" """ if cfgleader: - return exec_on_leader('_rpc_master_create_usergroup', self.tenant, + return await exec_on_leader('_rpc_master_create_usergroup', self.tenant, groupname, role) if cfgstreams: await exec_on_followers('_rpc_create_usergroup', self.tenant, groupname, @@ -1715,7 +1715,7 @@ class ConfigManager(object): async def del_usergroup(self, name): if cfgleader: - return exec_on_leader('_rpc_master_del_usergroup', self.tenant, name) + return await exec_on_leader('_rpc_master_del_usergroup', self.tenant, name) if cfgstreams: await exec_on_followers('_rpc_del_usergroup', self.tenant, name) self._true_del_usergroup(name) @@ -1733,7 +1733,7 @@ class ConfigManager(object): :param attributemap: A dict of key values to set """ if cfgleader: - return exec_on_leader('_rpc_master_set_user', self.tenant, name, + return await exec_on_leader('_rpc_master_set_user', self.tenant, name, attributemap) if cfgstreams: await exec_on_followers('_rpc_set_user', self.tenant, name, attributemap) @@ -1766,7 +1766,7 @@ class ConfigManager(object): async def del_user(self, name): if cfgleader: - return exec_on_leader('_rpc_master_del_user', self.tenant, name) + return await exec_on_leader('_rpc_master_del_user', self.tenant, name) if cfgstreams: await exec_on_followers('_rpc_del_user', self.tenant, name) self._true_del_user(name) @@ -2004,8 +2004,8 @@ class ConfigManager(object): continue # next node, this node already in self._node_added_to_group(node, group, changeset) - def add_group_attributes(self, attribmap): - self.set_group_attributes(attribmap, autocreate=True) + async def add_group_attributes(self, attribmap): + await self.set_group_attributes(attribmap, autocreate=True) async def set_group_attributes(self, attribmap, autocreate=False, merge="replace", keydata=None, skipped=None): for group in attribmap: @@ -2025,8 +2025,8 @@ class ConfigManager(object): if 'expression' in curr[attrib]: ExpressionChecker().format(curr[attrib]['expression']) if cfgleader: # currently config slave to another - return exec_on_leader('_rpc_master_set_group_attributes', - self.tenant, attribmap, autocreate) + return await exec_on_leader('_rpc_master_set_group_attributes', + self.tenant, attribmap, autocreate) if cfgstreams: await exec_on_followers('_rpc_set_group_attributes', self.tenant, attribmap, autocreate) @@ -2155,7 +2155,7 @@ class ConfigManager(object): async def clear_group_attributes(self, groups, attributes): if cfgleader: - return exec_on_leader('_rpc_master_clear_group_attributes', + return await exec_on_leader('_rpc_master_clear_group_attributes', self.tenant, groups, attributes) if cfgstreams: await exec_on_followers('_rpc_clear_group_attributes', self.tenant, @@ -2330,7 +2330,7 @@ class ConfigManager(object): async def del_groups(self, groups): if cfgleader: - return exec_on_leader('_rpc_master_del_groups', self.tenant, + return await exec_on_leader('_rpc_master_del_groups', self.tenant, groups) if cfgstreams: await exec_on_followers('_rpc_del_groups', self.tenant, groups) @@ -2349,7 +2349,7 @@ class ConfigManager(object): async def clear_node_attributes(self, nodes, attributes, warnings=None): if cfgleader: - mywarnings = exec_on_leader('_rpc_master_clear_node_attributes', + mywarnings = await exec_on_leader('_rpc_master_clear_node_attributes', self.tenant, nodes, attributes) if mywarnings and warnings is not None: warnings.extend(mywarnings) @@ -2456,7 +2456,7 @@ class ConfigManager(object): async def rename_nodegroups(self, renamemap): if cfgleader: - return exec_on_leader('_rpc_master_rename_nodegroups', self.tenant, renamemap) + return await exec_on_leader('_rpc_master_rename_nodegroups', self.tenant, renamemap) if cfgstreams: await exec_on_followers('_rpc_rename_nodegroups', self.tenant, renamemap) self._true_rename_groups(renamemap) diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index e00c1616..597f23c5 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -748,19 +748,19 @@ async def iterate_resources(fancydict): yield msg.ChildCollection(resource) -def delete_user(user, configmanager): - configmanager.del_user(user) +async def delete_user(user, configmanager): + await configmanager.del_user(user) yield msg.DeletedResource(user) -def delete_usergroup(usergroup, configmanager): - configmanager.del_usergroup(usergroup) +async def delete_usergroup(usergroup, configmanager): + await configmanager.del_usergroup(usergroup) yield msg.DeletedResource(usergroup) -def delete_nodegroup_collection(collectionpath, configmanager): +async def delete_nodegroup_collection(collectionpath, configmanager): if len(collectionpath) == 2: # just the nodegroup group = collectionpath[-1] - configmanager.del_groups([group]) + await configmanager.del_groups([group]) yield msg.DeletedResource(group) else: raise Exception("Not implemented") @@ -808,7 +808,7 @@ def enumerate_node_collection(collectionpath, configmanager): return iterate_resources(collection) -def create_group(inputdata, configmanager): +async def create_group(inputdata, configmanager): try: groupname = inputdata['name'] del inputdata['name'] @@ -816,7 +816,7 @@ def create_group(inputdata, configmanager): except KeyError: raise exc.InvalidArgumentException() try: - configmanager.add_group_attributes(attribmap) + await configmanager.add_group_attributes(attribmap) except ValueError as e: raise exc.InvalidArgumentException(str(e)) yield msg.CreatedResource(groupname) @@ -861,7 +861,7 @@ async def enumerate_collections(collections): yield msg.ChildCollection(collection) -def handle_nodegroup_request(configmanager, inputdata, +async def handle_nodegroup_request(configmanager, inputdata, pathcomponents, operation): iscollection = False routespec = None @@ -1464,16 +1464,16 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip= if not pathcomponents: # root collection list return enumerate_collections(rootcollections) elif pathcomponents[0] == 'noderange': - return await handle_node_request(configmanager, inputdata, operation, + return await handle_node_request(configmanager, inputdata, operation, pathcomponents, autostrip) elif pathcomponents[0] == 'deployment': - return handle_deployment(configmanager, inputdata, pathcomponents, + return await handle_deployment(configmanager, inputdata, pathcomponents, operation) elif pathcomponents[0] == 'storage': - return handle_storage(configmanager, inputdata, pathcomponents, + return await handle_storage(configmanager, inputdata, pathcomponents, operation) elif pathcomponents[0] == 'nodegroups': - return handle_nodegroup_request(configmanager, inputdata, + return await handle_nodegroup_request(configmanager, inputdata, pathcomponents, operation) elif pathcomponents[0] == 'nodes': @@ -1542,7 +1542,7 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip= inputdata = msg.get_input_message( pathcomponents, operation, inputdata, configmanager=configmanager) - update_user(user, inputdata.attribs, configmanager) + await update_user(user, inputdata.attribs, configmanager) return show_user(user, configmanager) elif pathcomponents[0] == 'events': try: