Collective config sync: add a StreamHandler that waits on the peer socket,
sends zero-length frames as keepalives and gives up once the expiry time has
passed; serialize _push_rpc writers with _rpclock; have the follower send a
zero-length frame as an ACK.

NOTE(review): this patch was recovered from a whitespace-collapsed copy.
Leading indentation was reconstructed from Python syntax and the hunk line
counts; verify the context lines against configmanager.py before applying.

diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py
index 1a0b0152..efc5f630 100644
--- a/confluent_server/confluent/config/configmanager.py
+++ b/confluent_server/confluent/config/configmanager.py
@@ -61,6 +61,7 @@ import cPickle
 import errno
 import eventlet
 import eventlet.event as event
+import eventlet.green.select as select
 import eventlet.green.threading as gthread
 import fnmatch
 import json
@@ -80,6 +81,7 @@ _masterintegritykey = None
 _dirtylock = threading.RLock()
 _leaderlock = gthread.RLock()
 _synclock = threading.RLock()
+_rpclock = gthread.RLock()
 _followerlocks = {}
 _config_areas = ('nodegroups', 'nodes', 'usergroups', 'users')
 tracelog = None
@@ -311,8 +313,10 @@ def init_masterkey(password=None, autogen=True):
 
 
 def _push_rpc(stream, payload):
-    stream.sendall(struct.pack('!Q', len(payload)))
-    stream.sendall(payload)
+    with _rpclock:
+        stream.sendall(struct.pack('!Q', len(payload)))
+        if len(payload):
+            stream.sendall(payload)
 
 
 def decrypt_value(cryptvalue,
@@ -491,10 +495,8 @@ def relay_slaved_requests(name, listener):
                 except Exception:
                     pass
             cfgstreams[name] = listener
-            try:
-                msg = listener.recv(8)
-            except Exception:
-                msg = None
+            lh = StreamHandler(listener)
+            msg = lh.get_next_msg()
             while msg:
                 if name not in cfgstreams:
                     raise Exception("Unexpected loss of node in followers: " + name)
@@ -511,7 +513,7 @@ def relay_slaved_requests(name, listener):
                     if 'xid' in rpc:
                         _push_rpc(listener, cPickle.dumps({'xid': rpc['xid']}))
                 try:
-                    msg = listener.recv(8)
+                    msg = lh.get_next_msg()
                 except Exception:
                     msg = None
         finally:
@@ -527,6 +529,35 @@ def relay_slaved_requests(name, listener):
                 stop_following(True)
 
 
+class StreamHandler(object):
+    def __init__(self, sock):
+        self.sock = sock
+        self.keepalive = confluent.util.monotonic_time() + 20
+        self.expiry = self.keepalive + 40
+
+
+    def get_next_msg(self):
+        r = (False,)
+        try:
+            while not r[0]:
+                r = select.select(
+                    (self.sock,), (), (),
+                    self.keepalive - confluent.util.monotonic_time())
+                if confluent.util.monotonic_time() > self.expiry:
+                    return None
+                if confluent.util.monotonic_time() > self.keepalive:
+                    _push_rpc(self.sock, b'')  # nulls are a keepalive
+                    self.keepalive = confluent.util.monotonic_time() + 20
+            self.expiry = confluent.util.monotonic_time() + 60
+            msg = self.sock.recv(8)
+        except Exception:
+            msg = None
+        return msg
+
+    def close(self):
+        self.sock = None
+
+
 def stop_following(replacement=None):
     with _leaderlock:
         global cfgleader
@@ -598,10 +629,8 @@ def follow_channel(channel):
     global _txcount
     stop_leading()
     stop_following(channel)
-    try:
-        msg = channel.recv(8)
-    except Exception:
-        msg = None
+    lh = StreamHandler(channel)
+    msg = lh.get_next_msg()
     while msg:
         sz = struct.unpack('!Q', msg)[0]
         if sz != 0:
@@ -618,10 +647,8 @@ def follow_channel(channel):
                 globals()[rpc['function']](*rpc['args'])
             if 'xid' in rpc and rpc['xid']:
                 _pendingchangesets[rpc['xid']].send()
-        try:
-            msg = channel.recv(8)
-        except Exception:
-            msg = None
+            _push_rpc(channel, b'')  # use null as ACK
+        msg = lh.get_next_msg()
     # mark the connection as broken
     if cfgstreams:
         stop_following(None)