mirror of
https://github.com/xcat2/confluent.git
synced 2026-04-11 03:11:32 +00:00
Rework merge to be async friendly
This commit is contained in:
@@ -1026,12 +1026,9 @@ async def del_collective_member(name):
|
||||
_true_del_collective_member(name)
|
||||
if cfgstreams:
|
||||
_hasquorum = has_quorum()
|
||||
pushes = eventlet.GreenPool()
|
||||
payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False)
|
||||
for _ in pushes.starmap(
|
||||
_push_rpc,
|
||||
[(cfgstreams[s]['stream'], payload) for s in cfgstreams]):
|
||||
pass
|
||||
# Check health of collective prior to attempting
|
||||
await asyncio.gather(*[_push_rpc(cfgstreams[s]['stream'], payload) for s in cfgstreams])
|
||||
|
||||
def _true_del_collective_member(name, sync=True):
|
||||
global cfgleader
|
||||
|
||||
@@ -541,9 +541,7 @@ async def handle_request(req, make_response, mimetype):
|
||||
fname = '/var/lib/confluent/private/os/{}/{}'.format(profile, fname)
|
||||
fullpath = os.path.abspath(fname)
|
||||
if not fullpath.startswith('/var/lib/confluent/private/os/{}/'.format(profile)):
|
||||
start_response('400 Bad Request', ())
|
||||
yield 'Bad Request'
|
||||
return
|
||||
return await make_response(mimetype, 400, 'Bad Request', body='Bad Request')
|
||||
try:
|
||||
with open(fname, 'rb') as privdata:
|
||||
return await make_response(mimetype, 200, 'OK', body=privdata.read())
|
||||
|
||||
Reference in New Issue
Block a user