2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-05-01 04:47:45 +00:00

Honor 'Done' message to avoid incurring a delay after task is done.

This commit is contained in:
Jarrod Johnson
2026-04-29 09:57:36 -04:00
parent 34bc45aa9e
commit 27b951b7cb
2 changed files with 27 additions and 22 deletions

View File

@@ -405,21 +405,23 @@ async def perform_requests(operator, nodes, element, cfg, inputdata, realop):
bundle = []
datum = await asyncio.wait_for(resultdata.get(), timeout=10.0)
while datum:
if datum != 'Done':
if isinstance(datum, Exception):
raise datum
if (hasattr(datum, 'kvpairs') and datum.kvpairs and
len(datum.kvpairs) == 1):
bundle.append((list(datum.kvpairs)[0], datum))
numnodes -= 1
else:
yield datum
timeout = 0.1 if numnodes else 0.001
datum = await asyncio.wait_for(resultdata.get(), timeout=timeout)
if datum == 'Done':
datum = None
break
if isinstance(datum, Exception):
raise datum
if (hasattr(datum, 'kvpairs') and datum.kvpairs and
len(datum.kvpairs) == 1):
bundle.append((list(datum.kvpairs)[0], datum))
numnodes -= 1
else:
yield datum
timeout = 0.1 if numnodes else 0.001
datum = await asyncio.wait_for(resultdata.get(), timeout=timeout)
except asyncio.QueueEmpty:
pass
except asyncio.TimeoutError:
print("odd timeout?" + repr(element) + repr(nodes))
# the timeout is just a sign to push out pending bundled stuff
pass
finally:
for datum in sorted(

View File

@@ -266,21 +266,24 @@ async def perform_requests(operator, nodes, element, cfg, inputdata, realop):
bundle = []
datum = await asyncio.wait_for(resultdata.get(), timeout=10.0)
while datum:
if datum != 'Done':
if isinstance(datum, Exception):
raise datum
if (hasattr(datum, 'kvpairs') and datum.kvpairs and
len(datum.kvpairs) == 1):
bundle.append((list(datum.kvpairs)[0], datum))
numnodes -= 1
else:
yield datum
if datum == 'Done':
datum = None
break
if isinstance(datum, Exception):
raise datum
if (hasattr(datum, 'kvpairs') and datum.kvpairs and
len(datum.kvpairs) == 1):
bundle.append((list(datum.kvpairs)[0], datum))
numnodes -= 1
else:
yield datum
timeout = 0.1 if numnodes else 0.001
datum = await asyncio.wait_for(resultdata.get(), timeout=timeout)
except asyncio.QueueEmpty:
pass
except asyncio.TimeoutError:
print("odd timeout?" + repr(element) + repr(nodes))
# the timeout is just a sign to push out pending bundled stuff
pass
finally:
for datum in sorted(
bundle, key=lambda x: util.naturalize_string(x[0])):