From 131fa052e0730e0d79854f39ba425c5d81ab7566 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 8 Apr 2026 15:13:42 -0400 Subject: [PATCH] Rework merge to be async friendly --- confluent_server/confluent/config/configmanager.py | 7 ++----- confluent_server/confluent/selfservice.py | 4 +--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 626ba03d..e3dbd696 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -1026,12 +1026,9 @@ async def del_collective_member(name): _true_del_collective_member(name) if cfgstreams: _hasquorum = has_quorum() - pushes = eventlet.GreenPool() payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False) - for _ in pushes.starmap( - _push_rpc, - [(cfgstreams[s]['stream'], payload) for s in cfgstreams]): - pass + # Check health of collective prior to attempting + await asyncio.gather(*[_push_rpc(cfgstreams[s]['stream'], payload) for s in cfgstreams]) def _true_del_collective_member(name, sync=True): global cfgleader diff --git a/confluent_server/confluent/selfservice.py b/confluent_server/confluent/selfservice.py index edd46e85..682ff084 100644 --- a/confluent_server/confluent/selfservice.py +++ b/confluent_server/confluent/selfservice.py @@ -541,9 +541,7 @@ async def handle_request(req, make_response, mimetype): fname = '/var/lib/confluent/private/os/{}/{}'.format(profile, fname) fullpath = os.path.abspath(fname) if not fullpath.startswith('/var/lib/confluent/private/os/{}/'.format(profile)): - start_response('400 Bad Request', ()) - yield 'Bad Request' - return + return await make_response(mimetype, 400, 'Bad Request', body='Bad Request') try: with open(fname, 'rb') as privdata: return await make_response(mimetype, 200, 'OK', body=privdata.read())