2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-26 12:13:30 +00:00

Fix some async gaps

This commit is contained in:
Jarrod Johnson
2026-02-27 13:37:32 -05:00
parent 9d85a3c993
commit 580dd283cc
3 changed files with 6 additions and 6 deletions

View File

@@ -1333,7 +1333,7 @@ def _addchange(changeset, node, attrname):
changeset[node][attrname] = 1
def hook_new_configmanagers(callback):
async def hook_new_configmanagers(callback):
"""Register callback for new tenants
From the point when this function is called until the end,
@@ -1345,10 +1345,10 @@ def hook_new_configmanagers(callback):
:returns: identifier that can be used to cancel this registration
"""
#TODO(jbjohnso): actually live up to the promise of ongoing callbacks
callback(ConfigManager(None))
await callback(ConfigManager(None))
try:
for tenant in _cfgstore['tenant']:
callback(ConfigManager(tenant))
await callback(ConfigManager(tenant))
except KeyError:
pass

View File

@@ -589,7 +589,7 @@ def _nodechange(added, deleting, renamed, configmanager):
tasks.spawn(connect_node(node, configmanager))
def _start_tenant_sessions(cfm):
async def _start_tenant_sessions(cfm):
nodeattrs = cfm.get_node_attributes(cfm.list_nodes(), 'collective.manager')
for node in nodeattrs:
manager = nodeattrs[node].get('collective.manager', {}).get('value',
@@ -616,7 +616,7 @@ async def initialize():
async def start_console_sessions():
configmodule.hook_new_configmanagers(_start_tenant_sessions)
await configmodule.hook_new_configmanagers(_start_tenant_sessions)
async def connect_node(node, configmanager, username=None, direct=True, width=80,

View File

@@ -90,7 +90,7 @@ def create(nodes, element, configmanager, inputdata):
yield msg.ConfluentNodeError(node, "Not Implemented")
def retrieve(nodes, element, configmanager, inputdata):
async def retrieve(nodes, element, configmanager, inputdata):
results = asyncio.Queue()
workers = set([])
if element == ["power", "state"]: