From 9bcca6bfadb9fb3e930f06f020591aed2a4896be Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 19 Jul 2018 17:08:20 -0400 Subject: [PATCH] Provide collective show on all members --- .../confluent/collective/manager.py | 61 +++++++++++++++---- 1 file changed, 49 insertions(+), 12 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index fd1b6f36..99245d19 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -191,20 +191,31 @@ def handle_connection(connection, cert, request, local=False): {'collective': {'error': 'Collective does not have quorum'}}) return - collinfo = {} if follower: - collinfo['leader'] = cfm.get_collective_member_by_address( - currentleader)['name'] + linfo = cfm.get_collective_member_by_address(currentleader) + remote = socket.create_connection((currentleader, 13001)) + remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, + keyfile='/etc/confluent/privkey.pem', + certfile='/etc/confluent/srvcert.pem') + cert = remote.getpeercert(binary_form=True) + if not (linfo and util.cert_matches( + linfo['fingerprint'], + cert)): + remote.close() + tlvdata.send(connection, + {'error': 'Invalid certificate, ' + 'redo invitation process'}) + connection.close() + return + tlvdata.recv(remote) # ignore banner + tlvdata.recv(remote) # ignore authpassed: 0 + tlvdata.send(remote, + {'collective': {'operation': 'getinfo', + 'name': get_myname()}}) + collinfo = tlvdata.recv(remote) else: - iam = get_myname() - collinfo['leader'] = iam - collinfo['active'] = list(cfm.cfgstreams) - activemembers = set(cfm.cfgstreams) - activemembers.add(iam) - collinfo['offline'] = [] - for member in cfm.list_collective(): - if member not in activemembers: - collinfo['offline'].append(member) + collinfo = {} + populate_collinfo(collinfo) tlvdata.send(connection, {'collective': collinfo}) return if 'invite' == operation: @@ -325,6 +336,19 @@ def handle_connection(connection, cert, request, local=False): leader=connection.getpeername()[0]) tlvdata.send(connection, {'status': 0}) connection.close() + if 'getinfo' == operation: + drone = request['name'] + droneinfo = cfm.get_collective_member(drone) + if not (droneinfo and util.cert_matches(droneinfo['fingerprint'], + cert)): + tlvdata.send(connection, + {'error': 'Invalid certificate, ' + 'redo invitation process'}) + connection.close() + return + collinfo = {} + populate_collinfo(collinfo) + tlvdata.send(connection, collinfo) if 'connect' == operation: myself = connection.getsockname()[0] if myself != get_leader(connection): @@ -376,6 +400,19 @@ def handle_connection(connection, cert, request, local=False): # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates + +def populate_collinfo(collinfo): + iam = get_myname() + collinfo['leader'] = iam + collinfo['active'] = list(cfm.cfgstreams) + activemembers = set(cfm.cfgstreams) + activemembers.add(iam) + collinfo['offline'] = [] + for member in cfm.list_collective(): + if member not in activemembers: + collinfo['offline'].append(member) + + def try_assimilate(drone): try: remote = connect_to_collective(None, drone)