2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-06-18 01:20:47 +00:00

Fix for potential hangs on race condition with task exit

This commit is contained in:
Jarrod Johnson
2025-04-08 14:10:40 -04:00
parent 0e3543c4aa
commit 672bc73756
2 changed files with 17 additions and 8 deletions
@@ -59,9 +59,12 @@ def update(nodes, element, configmanager, inputdata):
for encmgr in baysbyencmgr:
gp.spawn_n(reseat_bays, encmgr, baysbyencmgr[encmgr], configmanager, rspq)
while gp.running():
nrsp = rspq.get()
if nrsp is not None:
yield nrsp
try:
nrsp = rspq.get(timeout=0.1)
if nrsp is not None:
yield nrsp
except queue.Empty:
continue
while not rspq.empty():
nrsp = rspq.get()
if nrsp is not None:
@@ -53,9 +53,12 @@ def retrieve(nodes, element, configmanager, inputdata):
for pdu in relpdus:
gp.spawn(readpdu, pdu, relpdus[pdu], configmanager, rspq)
while gp.running():
nrsp = rspq.get()
if not isinstance(nrsp, TaskDone):
try:
nrsp = rspq.get(timeout=0.1)
if nrsp is not None and not isinstance(nrsp, TaskDone):
yield nrsp
except queue.Empty:
continue
while not rspq.empty():
nrsp = rspq.get()
if not isinstance(nrsp, TaskDone):
@@ -115,9 +118,12 @@ def update(nodes, element, configmanager, inputdata):
for pdu in relpdus:
gp.spawn(updatepdu, pdu, relpdus[pdu], configmanager, inputdata, rspq)
while gp.running():
nrsp = rspq.get()
if not isinstance(nrsp, TaskDone):
yield nrsp
try:
nrsp = rspq.get(timeout=0.1)
if nrsp is not None and not isinstance(nrsp, TaskDone):
yield nrsp
except queue.Empty:
continue
while not rspq.empty():
nrsp = rspq.get()
if not isinstance(nrsp, TaskDone):