From a09792f969f8ceefebce2c72a5ee77a92a0a0fd1 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 19 Jul 2018 15:49:05 -0400 Subject: [PATCH] Schedule periodic attempts to restart collective If collective is lost due to connectivity, this will cause occasional attempts to bring it back. --- confluent_server/confluent/collective/manager.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 868b1aed..1c6d118e 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -35,6 +35,7 @@ except ImportError: currentleader = None cfginitlock = None follower = None +retrythread = None class ContextBool(object): def __init__(self): @@ -168,6 +169,7 @@ def get_myname(): def handle_connection(connection, cert, request, local=False): global currentleader + global retrythread operation = request['operation'] if cert: cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) @@ -340,6 +342,9 @@ def handle_connection(connection, cert, request, local=False): connection.getpeername()[0]) connection.close() return + if retrythread: + retrythread.cancel() + retrythread = None cfm.update_collective_address(request['name'], connection.getpeername()[0]) tlvdata.send(connection, cfm._dump_keys(None, False)) @@ -409,6 +414,7 @@ def startup(): def start_collective(): global follower + global retrythread if follower: follower.kill() follower = None @@ -422,10 +428,7 @@ def start_collective(): if connect_to_leader(name=myname, leader=ldrcandidate): break else: - for member in cfm.list_collective(): - if member == myname: - continue - eventlet.spawn_n(try_assimilate, - cfm.get_collective_member(member)['address']) + retrythread = eventlet.spawn_after(30 + random.random(), + start_collective)