diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py
index 6bfb532b..01c3e526 100644
--- a/confluent_server/confluent/collective/manager.py
+++ b/confluent_server/confluent/collective/manager.py
@@ -32,8 +32,10 @@ except ImportError:
 currentleader = None
 
 
-def connect_to_leader(cert=None, name=None):
-    remote = socket.create_connection((currentleader, 13001))
+def connect_to_leader(cert=None, name=None, leader=None):
+    if leader is None:
+        leader = currentleader
+    remote = socket.create_connection((leader, 13001))
     # TLS cert validation is custom and will not pass normal CA vetting
     # to override completely in the right place requires enormous effort, so just defer until after connect
     remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem',
@@ -41,7 +43,7 @@ def connect_to_leader(cert=None, name=None):
     if cert:
         fprint = util.get_fingerprint(cert)
     else:
-        collent = cfm.get_collective_member_by_address(currentleader)
+        collent = cfm.get_collective_member_by_address(leader)
         fprint = collent['fingerprint']
     if not util.cert_matches(fprint, remote.getpeercert(binary_form=True)):
         # probably Janeway up to something
@@ -51,6 +53,10 @@ def connect_to_leader(cert=None, name=None):
     tlvdata.send(remote, {'collective': {'operation': 'connect',
                                          'name': name}})
     keydata = tlvdata.recv(remote)
+    if 'error' in keydata:
+        if 'leader' in keydata:
+            return connect_to_leader(name=name, leader=keydata['leader'])
+        raise Exception(keydata['error'])
     colldata = tlvdata.recv(remote)
     globaldata = tlvdata.recv(remote)
     dbsize = tlvdata.recv(remote)['dbsize']
@@ -168,3 +174,26 @@ def get_leader(connection):
     if currentleader is None:
         currentleader = connection.getsockname()[0]
     return currentleader
+
+def startup():
+    """Spawn collective startup if this instance is part of a collective.
+
+    Returns without doing anything when fewer than two members are known.
+    """
+    members = list(cfm.list_collective())
+    if len(members) < 2:
+        # Not in collective mode, return
+        return
+    eventlet.spawn_n(start_collective)
+
+def start_collective():
+    """Attempt a connection to every collective member other than this host."""
+    myname = socket.gethostname()
+    # 'members' in startup() is local to that function, so fetch the member
+    # names again here rather than hitting a NameError on the first iteration
+    for member in list(cfm.list_collective()):
+        if member == myname:
+            continue
+        ldrcandidate = cfm.get_collective_member(member)['address']
+        connect_to_leader(name=myname, leader=ldrcandidate)
+
diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py
index cf95d21a..7d5fea93 100644
--- a/confluent_server/confluent/config/configmanager.py
+++ b/confluent_server/confluent/config/configmanager.py
@@ -27,21 +27,6 @@
 # encrypted fields do not support expressions, either as a source or
 # destination
 
-#TODO: clustered mode
-# In clustered case, only one instance is the 'master'. If some 'def set'
-# is requested on a slave, it creates a transaction id and an event, firing it
-# to master. It then waits on the event. When the master reflects the data
-# back and that reflection data goes into memory, the wait will be satisfied
-# this means that set on a slave will be much longer.
-# the assumption is that only the calls to 'def set' need be pushed to/from
-# master and all the implicit activity that ensues will pan out since
-# the master is ensuring a strict ordering of transactions
-# for missed transactions, transaction log will be used to track transactions
-# transaction log can have a constrained size if we want, in which case full
-# replication will trigger.
-# uuid.uuid4() will be used for transaction ids
-
-
 # Note on the cryptography. Default behavior is mostly just to pave the
 # way to meaningful security. Root all potentially sensitive data in
 # one key. That key is in plain sight, so not meaningfully protected
@@ -460,6 +445,10 @@ def add_collective_member(name, address, fingerprint):
     _cfgstore['collectivedirty'].add(name)
     ConfigManager._bg_sync_to_file()
 
+def list_collective():
+    """Iterate over the names of the configured collective members."""
+    return iter(_cfgstore['collective'])
+
 def get_collective_member(name):
     return _cfgstore['collective'][name]
 
diff --git a/confluent_server/confluent/main.py b/confluent_server/confluent/main.py
index e84a8233..ea4601ae 100644
--- a/confluent_server/confluent/main.py
+++ b/confluent_server/confluent/main.py
@@ -33,6 +33,7 @@ import confluent.consoleserver as consoleserver
 import confluent.core as confluentcore
 import confluent.httpapi as httpapi
 import confluent.log as log
+import confluent.collective.manager as collective
 try:
     import confluent.sockapi as sockapi
 except ImportError:
@@ -228,6 +229,7 @@ def run():
     _updatepidfile()
     signal.signal(signal.SIGINT, terminate)
     signal.signal(signal.SIGTERM, terminate)
+    collective.startup()
     if dbgif:
         oumask = os.umask(0077)
         try: