From ad0a7de1e3ba3fee1c24dec934678cb5b2cc2921 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 15 Mar 2018 18:59:49 -0400 Subject: [PATCH 01/45] Add utility function to get certificate from file This can be used to get our own certificate, for use in the multimanager membership establishment. --- confluent_server/confluent/util.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index 6da69603..07bd25ed 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -24,6 +24,7 @@ import netifaces import os import re import socket +import ssl import struct @@ -99,6 +100,20 @@ def monotonic_time(): return os.times()[4] +def get_certificate_from_file(certfile): + cert = open(certfile, 'rb').read() + inpemcert = False + prunedcert = '' + for line in cert.split('\n'): + if '-----BEGIN CERTIFICATE-----' in line: + inpemcert = True + if inpemcert: + prunedcert += line + if '-----END CERTIFICATE-----' in line: + break + return ssl.PEM_cert_to_DER_cert(prunedcert) + + def get_fingerprint(certificate, algo='sha512'): if algo == 'sha256': return 'sha256$' + hashlib.sha256(certificate).hexdigest() From afd366f134872049950aadf84bb80d271f48cd39 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 15 Mar 2018 19:22:03 -0400 Subject: [PATCH 02/45] Create invitation management module This facilitates the generation of invitations and logistics of proving knowledge of the invitation and the integrity of the certificates. peercert is to be gotten through getpeercert(binary_form=True) and local cert through the util function to load from file, since we don't have another way of getting local certificate. --- .../confluent/multimanager/__init__.py | 0 .../confluent/multimanager/invites.py | 55 +++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 confluent_server/confluent/multimanager/__init__.py create mode 100644 confluent_server/confluent/multimanager/invites.py diff --git a/confluent_server/confluent/multimanager/__init__.py b/confluent_server/confluent/multimanager/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/confluent_server/confluent/multimanager/invites.py b/confluent_server/confluent/multimanager/invites.py new file mode 100644 index 00000000..72187ab4 --- /dev/null +++ b/confluent_server/confluent/multimanager/invites.py @@ -0,0 +1,55 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2018 Lenovo +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# This handles the process of generating and tracking/validating invites + +import base64 +import hashlib +import hmac +import os +pending_invites = {} + +def create_server_invitation(servername): + invitation = os.urandom(66) + pending_invites[servername] = invitation + return base64.b64encode(invitation) + +def create_client_proof(invitation, mycert, peercert): + return hmac.new(invitation, peercert + mycert, hashlib.sha256).digest() + +def check_server_proof(invitation, mycert, peercert, proof): + validproof = hmac.new(invitation, mycert + peercert, hashlib.sha256 + ).digest() + return proof == validproof + +def check_client_proof(servername, mycert, peercert, proof): + invitation = pending_invites[servername] + validproof = hmac.new(invitation, mycert + peercert, hashlib.sha256 + ).digest() + if proof == validproof: + # We know that the client knew the secret, and that it measured our + # certificate, and thus calling code can bless the certificate, and + # we can forget the invitation + del pending_invites[servername] + # We now want to prove to the client that we also know the secret, + # and that we measured their certificate well + # Now to generate an answer...., reverse the cert order so our answer + # is different, but still proving things + return hmac.new(invitation, peercert + mycert, hashlib.sha256 + ).digest() + # The given proof did not verify the invitation + return False + From 27c1355a4f0a055c179e61f82c4b8e46cca63e91 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 24 Apr 2018 11:00:23 -0400 Subject: [PATCH 03/45] Add ability to get client certificates Unfortunately, to pull off the target user experience, we must register a custom client certificate validation to allow us to not require a CA. --- confluent_server/confluent/sockapi.py | 65 ++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 239137a6..fd0ecc41 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -57,6 +57,19 @@ except AttributeError: else: SO_PEERCRED = 17 +try: + # Python core TLS despite improvements still has no support for custom + # verify functions.... try to use PyOpenSSL where available to support + # client certificates with custom verify + import eventlet.green.OpenSSL.SSL as libssl + # further, not even pyopenssl exposes SSL_CTX_set_cert_verify_callback + # so we need to ffi that in using a strategy compatible with PyOpenSSL + import OpenSSL.SSL as libssln + from OpenSSL._util import ffi + import OpenSSL.crypto as crypto +except ImportError: + libssl = None + plainsocket = None class ClientConsole(object): @@ -86,11 +99,14 @@ def send_data(connection, data): raise -def sessionhdl(connection, authname, skipauth=False): +def sessionhdl(connection, authname, skipauth=False, cert=None): # For now, trying to test the console stuff, so let's just do n4. 
authenticated = False authdata = None cfm = None + if cert: + print(repr(crypto.dump_certificate(crypto.FILETYPE_ASN1, + cert))) if skipauth: authenticated = True cfm = configmanager.ConfigManager(tenant=None, username=authname) @@ -275,14 +291,51 @@ def _tlshandler(bind_host, bind_port): cnn, addr = plainsocket.accept() eventlet.spawn_n(_tlsstartup, cnn) +@ffi.callback("int(*)( X509_STORE_CTX *, void*)") +def verify_stub(store, misc): + return 1 def _tlsstartup(cnn): authname = None - cnn = ssl.wrap_socket(cnn, keyfile="/etc/confluent/privkey.pem", - certfile="/etc/confluent/srvcert.pem", - ssl_version=ssl.PROTOCOL_TLSv1, - server_side=True) - sessionhdl(cnn, authname) + cert = None + if libssl: + # most fully featured SSL function + ctx = libssl.Context(libssl.SSLv23_METHOD) + ctx.set_options(libssl.OP_NO_SSLv2 | libssl.OP_NO_SSLv3 | + libssl.OP_NO_TLSv1 | libssl.OP_NO_TLSv1_1 | + libssl.OP_CIPHER_SERVER_PREFERENCE) + ctx.set_cipher_list( + 'ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:' + 'ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384') + ctx.use_certificate_file('/etc/confluent/srvcert.pem') + ctx.use_privatekey_file('/etc/confluent/privkey.pem') + ctx.set_verify(libssln.VERIFY_PEER, lambda *args: True) + libssln._lib.SSL_CTX_set_cert_verify_callback(ctx._context, + verify_stub, ffi.NULL) + cnn = libssl.Connection(ctx, cnn) + cnn.set_accept_state() + cnn.do_handshake() + cert = cnn.get_peer_certificate() + else: + try: + # Try relatively newer python TLS function + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 + ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 + ctx.options |= ssl.OP_CIPHER_SERVER_PREFERENCE + ctx.set_ciphers( + 'ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:' + 'ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384') + ctx.load_cert_chain('/etc/confluent/srvcert.pem', + '/etc/confluent/privkey.pem') + cnn = ctx.wrap_socket(cnn, server_side=True) + except AttributeError: + # Python 2.6 era, go with best effort + cnn = ssl.wrap_socket(cnn, keyfile="/etc/confluent/privkey.pem", + certfile="/etc/confluent/srvcert.pem", + ssl_version=ssl.PROTOCOL_TLSv1, + server_side=True) + sessionhdl(cnn, authname, cert=cert) def removesocket(): try: From f3a0ccbff88bb82ae0b50ee688fd762a16a9a64b Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 24 Apr 2018 12:59:24 -0400 Subject: [PATCH 04/45] Migrate 'multimanager' to 'swarm' It's easier to say 'swarm' and conveys the sense without confusion of 'cluster' mode. --- confluent_server/confluent/swarm/__init__.py | 0 confluent_server/confluent/swarm/invites.py | 55 ++++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 confluent_server/confluent/swarm/__init__.py create mode 100644 confluent_server/confluent/swarm/invites.py diff --git a/confluent_server/confluent/swarm/__init__.py b/confluent_server/confluent/swarm/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/confluent_server/confluent/swarm/invites.py b/confluent_server/confluent/swarm/invites.py new file mode 100644 index 00000000..72187ab4 --- /dev/null +++ b/confluent_server/confluent/swarm/invites.py @@ -0,0 +1,55 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2018 Lenovo +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This handles the process of generating and tracking/validating invites + +import base64 +import hashlib +import hmac +import os +pending_invites = {} + +def create_server_invitation(servername): + invitation = os.urandom(66) + pending_invites[servername] = invitation + return base64.b64encode(invitation) + +def create_client_proof(invitation, mycert, peercert): + return hmac.new(invitation, peercert + mycert, hashlib.sha256).digest() + +def check_server_proof(invitation, mycert, peercert, proof): + validproof = hmac.new(invitation, mycert + peercert, hashlib.sha256 + ).digest() + return proof == validproof + +def check_client_proof(servername, mycert, peercert, proof): + invitation = pending_invites[servername] + validproof = hmac.new(invitation, mycert + peercert, hashlib.sha256 + ).digest() + if proof == validproof: + # We know that the client knew the secret, and that it measured our + # certificate, and thus calling code can bless the certificate, and + # we can forget the invitation + del pending_invites[servername] + # We now want to prove to the client that we also know the secret, + # and that we measured their certificate well + # Now to generate an answer...., reverse the cert order so our answer + # is different, but still proving things + return hmac.new(invitation, peercert + mycert, hashlib.sha256 + ).digest() + # The given proof did not verify the invitation + return False + From c8e5808daf91f8232032d25ef9959e160b7c8ba0 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 24 Apr 2018 14:44:45 -0400 Subject: [PATCH 05/45] Further advance the swarm concept This marks the start of attempting to connect the invitation to sockets and using the invitation to measure the certificates as well as proving client knowledge of an invitation token. --- .../confluent/multimanager/__init__.py | 0 .../confluent/multimanager/invites.py | 55 ------------- confluent_server/confluent/sockapi.py | 13 +-- confluent_server/confluent/swarm/invites.py | 2 +- confluent_server/confluent/swarm/manager.py | 82 +++++++++++++++++++ 5 files changed, 90 insertions(+), 62 deletions(-) delete mode 100644 confluent_server/confluent/multimanager/__init__.py delete mode 100644 confluent_server/confluent/multimanager/invites.py create mode 100644 confluent_server/confluent/swarm/manager.py diff --git a/confluent_server/confluent/multimanager/__init__.py b/confluent_server/confluent/multimanager/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/confluent_server/confluent/multimanager/invites.py b/confluent_server/confluent/multimanager/invites.py deleted file mode 100644 index 72187ab4..00000000 --- a/confluent_server/confluent/multimanager/invites.py +++ /dev/null @@ -1,55 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2018 Lenovo -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This handles the process of generating and tracking/validating invites - -import base64 -import hashlib -import hmac -import os -pending_invites = {} - -def create_server_invitation(servername): - invitation = os.urandom(66) - pending_invites[servername] = invitation - return base64.b64encode(invitation) - -def create_client_proof(invitation, mycert, peercert): - return hmac.new(invitation, peercert + mycert, hashlib.sha256).digest() - -def check_server_proof(invitation, mycert, peercert, proof): - validproof = hmac.new(invitation, mycert + peercert, hashlib.sha256 - ).digest() - return proof == validproof - -def check_client_proof(servername, mycert, peercert, proof): - invitation = pending_invites[servername] - validproof = hmac.new(invitation, mycert + peercert, hashlib.sha256 - ).digest() - if proof == validproof: - # We know that the client knew the secret, and that it measured our - # certificate, and thus calling code can bless the certificate, and - # we can forget the invitation - del pending_invites[servername] - # We now want to prove to the client that we also know the secret, - # and that we measured their certificate well - # Now to generate an answer...., reverse the cert order so our answer - # is different, but still proving things - return hmac.new(invitation, peercert + mycert, hashlib.sha256 - ).digest() - # The given proof did not verify the invitation - return False - diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index fd0ecc41..ef5f11e1 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -44,7 +44,7 @@ import confluent.exceptions as exc import confluent.log as log import confluent.core as pluginapi import confluent.shellserver as shellserver - +import confluent.swarm.manager as swarm tracelog = None auditlog = None @@ -66,9 +66,9 @@ try: # so we need to ffi that in using a strategy compatible with PyOpenSSL import OpenSSL.SSL as libssln from OpenSSL._util import ffi - import OpenSSL.crypto as crypto except ImportError: libssl = None + ffi = None plainsocket = None @@ -104,9 +104,6 @@ def sessionhdl(connection, authname, skipauth=False, cert=None): authenticated = False authdata = None cfm = None - if cert: - print(repr(crypto.dump_certificate(crypto.FILETYPE_ASN1, - cert))) if skipauth: authenticated = True cfm = configmanager.ConfigManager(tenant=None, username=authname) @@ -119,6 +116,8 @@ def sessionhdl(connection, authname, skipauth=False, cert=None): while not authenticated: # prompt for name and passphrase send_data(connection, {'authpassed': 0}) response = tlvdata.recv(connection) + if 'swarm' in response: + return swarm.handle_connection(connection, cert, response['swarm']) authname = response['username'] passphrase = response['password'] # note(jbjohnso): here, we need to authenticate, but not @@ -134,6 +133,8 @@ def sessionhdl(connection, authname, skipauth=False, cert=None): cfm = authdata[1] send_data(connection, {'authpassed': 1}) request = tlvdata.recv(connection) + if 'swarm' in request and skipauth: + swarm.handle_connection(connection, None, 
request['swarm'], local=True) while request is not None: try: process_request( @@ -192,7 +193,7 @@ def process_request(connection, request, cfm, authdata, authname, skipauth): if operation == 'start': return start_term(authname, cfm, connection, params, path, authdata, skipauth) - elif operation == 'shutdown': + elif operation == 'shutdown' and skipauth: configmanager.ConfigManager.shutdown() else: hdlr = pluginapi.handle_path(path, operation, cfm, params) diff --git a/confluent_server/confluent/swarm/invites.py b/confluent_server/confluent/swarm/invites.py index 72187ab4..7945ee6d 100644 --- a/confluent_server/confluent/swarm/invites.py +++ b/confluent_server/confluent/swarm/invites.py @@ -25,7 +25,7 @@ pending_invites = {} def create_server_invitation(servername): invitation = os.urandom(66) pending_invites[servername] = invitation - return base64.b64encode(invitation) + return base64.b64encode(servername + '@' + invitation) def create_client_proof(invitation, mycert, peercert): return hmac.new(invitation, peercert + mycert, hashlib.sha256).digest() diff --git a/confluent_server/confluent/swarm/manager.py b/confluent_server/confluent/swarm/manager.py new file mode 100644 index 00000000..cbac994a --- /dev/null +++ b/confluent_server/confluent/swarm/manager.py @@ -0,0 +1,82 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2018 Lenovo +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import confluent.swarm.invites as invites +import confluent.tlvdata as tlvdata +import confluent.util as util +import eventlet.green.socket as socket +import eventlet.green.ssl as ssl +try: + import OpenSSL.crypto as crypto +except ImportError: + # while not always required, we use pyopenssl required for at least swarm + crypto = None + +swarmcerts = {} + + +def handle_connection(connection, cert, swarmrequest, local=False): + operation = swarmrequest['operation'] + if cert: + cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) + else: + if not local: + return + if 'invite' == operation: + name = swarmrequest['invite']['name'] + invitation = invites.create_server_invitation(name) + tlvdata.send(connection, {'swarm': {'invitation': invitation}}) + if 'join' == operation: + invitation = swarmrequest['invitation'] + invitation = base64.b64decode(invitation) + name, invitation = invitation.split('@') + host = swarmrequest['server'] + remote = socket.create_connection((host, 13001)) + # This isn't what it looks like. 
We do CERT_NONE to disable + # openssl verification, but then use the invitation as a + # shared secret to validate the certs as part of the join + # operation + remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, + keyfile='/etc/confluent/privkey.pem', + certfile='/etc/confluent/srvcert.pem') + mycert = util.get_certificate_from_file( + '/etc/confluent/srvcert.pem') + cert = remote.getpeercert(binary_form=True) + proof = base64.b64encode(invites.create_client_proof( + invitation, mycert, cert)) + tlvdata.recv(remote) # ignore banner + tlvdata.recv(remote) # ignore authpassed: 0 + tlvdata.send(remote, {'swarm': {'operation': 'joinchallenge', + 'name': name, 'hmac': proof}}) + rsp = tlvdata.recv(remote) + proof = rsp['swarm']['approval'] + j = invites.check_server_proof(invitation, mycert, cert, proof) + if not j: + return + if 'joinchallenge' == operation: + mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') + proof = base64.b64decode(swarmrequest['hmac']) + myrsp = invites.check_client_proof(swarmrequest['name'], mycert, + cert, proof) + if not myrsp: + connection.close() + return + myrsp = base64.b64encode(myrsp) + swarmcerts[swarmrequest['name']] = cert + tlvdata.send(connection, {'swarm': {'approval': myrsp}}) + clientready = tlvdata.recv(connection) + print(repr(clientready)) From a3de8b9374f9e0e0d5ea73114372e86661d8d02b Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 24 Apr 2018 15:58:40 -0400 Subject: [PATCH 06/45] Add swarm to setup.py Make sure the swarm content is actually installed. --- confluent_server/setup.py.tmpl | 1 + 1 file changed, 1 insertion(+) diff --git a/confluent_server/setup.py.tmpl b/confluent_server/setup.py.tmpl index 94b010f0..a9c056b3 100644 --- a/confluent_server/setup.py.tmpl +++ b/confluent_server/setup.py.tmpl @@ -15,6 +15,7 @@ setup( 'confluent/networking/', 'confluent/plugins/hardwaremanagement/', 'confluent/plugins/shell/', + 'confluent/swarm/', 'confluent/plugins/configuration/'], install_requires=['paramiko', 'pycrypto>=2.6', 'confluent_client>=0.1.0', 'eventlet', 'dnspython', 'netifaces', 'pyte', 'pysnmp', 'pyparsing', From 6b9aed37222089f539f675791eca5e5dfcab7360 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 25 Apr 2018 13:01:59 -0400 Subject: [PATCH 07/45] Afetr some feedback, rename it 'collective' --- .../confluent/{swarm => collective}/__init__.py | 0 .../confluent/{swarm => collective}/invites.py | 0 .../confluent/{swarm => collective}/manager.py | 10 +++++----- confluent_server/confluent/sockapi.py | 12 +++++++----- 4 files changed, 12 insertions(+), 10 deletions(-) rename confluent_server/confluent/{swarm => collective}/__init__.py (100%) rename confluent_server/confluent/{swarm => collective}/invites.py (100%) rename confluent_server/confluent/{swarm => collective}/manager.py (91%) diff --git a/confluent_server/confluent/swarm/__init__.py b/confluent_server/confluent/collective/__init__.py similarity index 100% rename from confluent_server/confluent/swarm/__init__.py rename to confluent_server/confluent/collective/__init__.py diff --git a/confluent_server/confluent/swarm/invites.py b/confluent_server/confluent/collective/invites.py similarity index 100% rename from confluent_server/confluent/swarm/invites.py rename to confluent_server/confluent/collective/invites.py diff --git a/confluent_server/confluent/swarm/manager.py b/confluent_server/confluent/collective/manager.py similarity index 91% rename from confluent_server/confluent/swarm/manager.py rename to 
confluent_server/confluent/collective/manager.py index cbac994a..1b64bc3f 100644 --- a/confluent_server/confluent/swarm/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -23,7 +23,7 @@ import eventlet.green.ssl as ssl try: import OpenSSL.crypto as crypto except ImportError: - # while not always required, we use pyopenssl required for at least swarm + # while not always required, we use pyopenssl required for at least collective crypto = None swarmcerts = {} @@ -39,7 +39,7 @@ def handle_connection(connection, cert, swarmrequest, local=False): if 'invite' == operation: name = swarmrequest['invite']['name'] invitation = invites.create_server_invitation(name) - tlvdata.send(connection, {'swarm': {'invitation': invitation}}) + tlvdata.send(connection, {'collective': {'invitation': invitation}}) if 'join' == operation: invitation = swarmrequest['invitation'] invitation = base64.b64decode(invitation) @@ -60,10 +60,10 @@ def handle_connection(connection, cert, swarmrequest, local=False): invitation, mycert, cert)) tlvdata.recv(remote) # ignore banner tlvdata.recv(remote) # ignore authpassed: 0 - tlvdata.send(remote, {'swarm': {'operation': 'joinchallenge', + tlvdata.send(remote, {'collective': {'operation': 'joinchallenge', 'name': name, 'hmac': proof}}) rsp = tlvdata.recv(remote) - proof = rsp['swarm']['approval'] + proof = rsp['collective']['approval'] j = invites.check_server_proof(invitation, mycert, cert, proof) if not j: return @@ -77,6 +77,6 @@ def handle_connection(connection, cert, swarmrequest, local=False): return myrsp = base64.b64encode(myrsp) swarmcerts[swarmrequest['name']] = cert - tlvdata.send(connection, {'swarm': {'approval': myrsp}}) + tlvdata.send(connection, {'collective': {'approval': myrsp}}) clientready = tlvdata.recv(connection) print(repr(clientready)) diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index ef5f11e1..044c371b 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -44,7 +44,7 @@ import confluent.exceptions as exc import confluent.log as log import confluent.core as pluginapi import confluent.shellserver as shellserver -import confluent.swarm.manager as swarm +import confluent.collective.manager as collective tracelog = None auditlog = None @@ -116,8 +116,9 @@ def sessionhdl(connection, authname, skipauth=False, cert=None): while not authenticated: # prompt for name and passphrase send_data(connection, {'authpassed': 0}) response = tlvdata.recv(connection) - if 'swarm' in response: - return swarm.handle_connection(connection, cert, response['swarm']) + if 'collective' in response: + return collective.handle_connection(connection, cert, + response['collective']) authname = response['username'] passphrase = response['password'] # note(jbjohnso): here, we need to authenticate, but not @@ -133,8 +134,9 @@ def sessionhdl(connection, authname, skipauth=False, cert=None): cfm = authdata[1] send_data(connection, {'authpassed': 1}) request = tlvdata.recv(connection) - if 'swarm' in request and skipauth: - swarm.handle_connection(connection, None, request['swarm'], local=True) + if 'collective' in request and skipauth: + collective.handle_connection(connection, None, request['collective'], + local=True) while request is not None: try: process_request( From a94a724fe03822a169830309d95032dbc9c6c3c8 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 25 Apr 2018 13:31:26 -0400 Subject: [PATCH 08/45] Rename swarm to collective in setup.py.tmpl --- 
confluent_server/setup.py.tmpl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/confluent_server/setup.py.tmpl b/confluent_server/setup.py.tmpl index a9c056b3..8e04f848 100644 --- a/confluent_server/setup.py.tmpl +++ b/confluent_server/setup.py.tmpl @@ -15,7 +15,7 @@ setup( 'confluent/networking/', 'confluent/plugins/hardwaremanagement/', 'confluent/plugins/shell/', - 'confluent/swarm/', + 'confluent/collective/', 'confluent/plugins/configuration/'], install_requires=['paramiko', 'pycrypto>=2.6', 'confluent_client>=0.1.0', 'eventlet', 'dnspython', 'netifaces', 'pyte', 'pysnmp', 'pyparsing', From 8246ebdd2b40e9988b0de450db4df95fe12d2390 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 25 Apr 2018 14:21:12 -0400 Subject: [PATCH 09/45] Fix invite process and unicode Unicode strings do not fit with our world view, make them bytes. --- .../confluent/collective/invites.py | 4 +++- .../confluent/collective/manager.py | 20 +++++++++---------- confluent_server/confluent/sockapi.py | 2 +- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/confluent_server/confluent/collective/invites.py b/confluent_server/confluent/collective/invites.py index 7945ee6d..ec1b1532 100644 --- a/confluent_server/confluent/collective/invites.py +++ b/confluent_server/confluent/collective/invites.py @@ -23,9 +23,10 @@ import os pending_invites = {} def create_server_invitation(servername): + servername = servername.encode('utf-8') invitation = os.urandom(66) pending_invites[servername] = invitation - return base64.b64encode(servername + '@' + invitation) + return base64.b64encode(servername + b'@' + invitation) def create_client_proof(invitation, mycert, peercert): return hmac.new(invitation, peercert + mycert, hashlib.sha256).digest() @@ -36,6 +37,7 @@ def check_server_proof(invitation, mycert, peercert, proof): return proof == validproof def check_client_proof(servername, mycert, peercert, proof): + servername = servername.encode('utf-8') invitation = pending_invites[servername] validproof = hmac.new(invitation, mycert + peercert, hashlib.sha256 ).digest() diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 1b64bc3f..b61b51d2 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -15,7 +15,7 @@ # limitations under the License. 
import base64 -import confluent.swarm.invites as invites +import confluent.collective.invites as invites import confluent.tlvdata as tlvdata import confluent.util as util import eventlet.green.socket as socket @@ -26,25 +26,25 @@ except ImportError: # while not always required, we use pyopenssl required for at least collective crypto = None -swarmcerts = {} +collcerts = {} -def handle_connection(connection, cert, swarmrequest, local=False): - operation = swarmrequest['operation'] +def handle_connection(connection, cert, request, local=False): + operation = request['operation'] if cert: cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) else: if not local: return if 'invite' == operation: - name = swarmrequest['invite']['name'] + name = request['name'] invitation = invites.create_server_invitation(name) tlvdata.send(connection, {'collective': {'invitation': invitation}}) if 'join' == operation: - invitation = swarmrequest['invitation'] + invitation = request['invitation'] invitation = base64.b64decode(invitation) name, invitation = invitation.split('@') - host = swarmrequest['server'] + host = request['server'] remote = socket.create_connection((host, 13001)) # This isn't what it looks like. We do CERT_NONE to disable # openssl verification, but then use the invitation as a @@ -69,14 +69,14 @@ def handle_connection(connection, cert, swarmrequest, local=False): return if 'joinchallenge' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') - proof = base64.b64decode(swarmrequest['hmac']) - myrsp = invites.check_client_proof(swarmrequest['name'], mycert, + proof = base64.b64decode(request['hmac']) + myrsp = invites.check_client_proof(request['name'], mycert, cert, proof) if not myrsp: connection.close() return myrsp = base64.b64encode(myrsp) - swarmcerts[swarmrequest['name']] = cert + collcerts[request['name']] = cert tlvdata.send(connection, {'collective': {'approval': myrsp}}) clientready = tlvdata.recv(connection) print(repr(clientready)) diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 044c371b..c76f50b2 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -135,7 +135,7 @@ def sessionhdl(connection, authname, skipauth=False, cert=None): send_data(connection, {'authpassed': 1}) request = tlvdata.recv(connection) if 'collective' in request and skipauth: - collective.handle_connection(connection, None, request['collective'], + return collective.handle_connection(connection, None, request['collective'], local=True) while request is not None: try: From 619bbbca965a37dfa0ebaa93fc75fd6c4c7830fc Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 25 Apr 2018 16:47:42 -0400 Subject: [PATCH 10/45] Provide more feedback and fix some flow issues --- confluent_server/confluent/collective/invites.py | 2 ++ confluent_server/confluent/collective/manager.py | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/confluent_server/confluent/collective/invites.py b/confluent_server/confluent/collective/invites.py index ec1b1532..94c0906f 100644 --- a/confluent_server/confluent/collective/invites.py +++ b/confluent_server/confluent/collective/invites.py @@ -38,6 +38,8 @@ def check_server_proof(invitation, mycert, peercert, proof): def check_client_proof(servername, mycert, peercert, proof): servername = servername.encode('utf-8') + if servername not in pending_invites: + return False invitation = pending_invites[servername] validproof = 
hmac.new(invitation, mycert + peercert, hashlib.sha256 ).digest() diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index b61b51d2..d1f35049 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -43,7 +43,7 @@ def handle_connection(connection, cert, request, local=False): if 'join' == operation: invitation = request['invitation'] invitation = base64.b64decode(invitation) - name, invitation = invitation.split('@') + name, invitation = invitation.split('@', 1) host = request['server'] remote = socket.create_connection((host, 13001)) # This isn't what it looks like. We do CERT_NONE to disable @@ -66,17 +66,22 @@ def handle_connection(connection, cert, request, local=False): proof = rsp['collective']['approval'] j = invites.check_server_proof(invitation, mycert, cert, proof) if not j: + tlvdata.send(connection, + {'errorcode': 500, + 'error': 'Response failed validation'}) return + tlvdata.send(remote, {'collective': 'success'}) if 'joinchallenge' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) myrsp = invites.check_client_proof(request['name'], mycert, cert, proof) if not myrsp: + tlvdata.send(connection, {'error': 'Invalid token'}) connection.close() return myrsp = base64.b64encode(myrsp) - collcerts[request['name']] = cert tlvdata.send(connection, {'collective': {'approval': myrsp}}) clientready = tlvdata.recv(connection) print(repr(clientready)) + collcerts[request['name']] = cert From 1deb44021f4d714a4d3797e6f3e8ca10b3623bdb Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 25 Apr 2018 20:49:08 -0400 Subject: [PATCH 11/45] Fix encoding of the response proof The response was not decoded, causing it to always fail. --- confluent_server/confluent/collective/manager.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index d1f35049..669b4ed8 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -64,13 +64,16 @@ def handle_connection(connection, cert, request, local=False): 'name': name, 'hmac': proof}}) rsp = tlvdata.recv(remote) proof = rsp['collective']['approval'] + proof = base64.b64decode(proof) j = invites.check_server_proof(invitation, mycert, cert, proof) if not j: tlvdata.send(connection, {'errorcode': 500, 'error': 'Response failed validation'}) return - tlvdata.send(remote, {'collective': 'success'}) + tlvdata.send(remote, {'collective': {'success': True}}) + tlvdata.send(connection, {'collective': {'status': 'Success'}}) + #Ok, here start getting assimilated, connect to get the database and register for changes... 
if 'joinchallenge' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) @@ -83,5 +86,6 @@ def handle_connection(connection, cert, request, local=False): myrsp = base64.b64encode(myrsp) tlvdata.send(connection, {'collective': {'approval': myrsp}}) clientready = tlvdata.recv(connection) - print(repr(clientready)) - collcerts[request['name']] = cert + if clientready.get('collective', {}).get('success', False): + collcerts[request['name']] = cert + # store certificate signature for the collective trust From d38d9204a73b93dc311721f7012f9f47abebb35f Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 26 Apr 2018 11:35:06 -0400 Subject: [PATCH 12/45] Ensure the invitation works out to even multiple of 3 bytes It's cosmetic, but a nice way to avoid '=' in the tokens. --- confluent_server/confluent/collective/invites.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/confluent_server/confluent/collective/invites.py b/confluent_server/confluent/collective/invites.py index 94c0906f..b635dbeb 100644 --- a/confluent_server/confluent/collective/invites.py +++ b/confluent_server/confluent/collective/invites.py @@ -24,7 +24,8 @@ pending_invites = {} def create_server_invitation(servername): servername = servername.encode('utf-8') - invitation = os.urandom(66) + randbytes = (3 - ((len(servername) + 2) % 3)) % 3 + 64 + invitation = os.urandom(randbytes) pending_invites[servername] = invitation return base64.b64encode(servername + b'@' + invitation) From de89803b9cbc58cddfde626fa69bb9cd916edc7b Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 26 Apr 2018 15:48:15 -0400 Subject: [PATCH 13/45] Persint collective info to disk Additionally, simplify the concluding steps of the join conversation. --- .../confluent/collective/manager.py | 27 +++++++------- .../confluent/config/configmanager.py | 36 +++++++++++++++++++ 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 669b4ed8..6cea50ea 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -16,6 +16,7 @@ import base64 import confluent.collective.invites as invites +import confluent.config.configmanager as cfm import confluent.tlvdata as tlvdata import confluent.util as util import eventlet.green.socket as socket @@ -26,8 +27,7 @@ except ImportError: # while not always required, we use pyopenssl required for at least collective crypto = None -collcerts = {} - +currentleader = None def handle_connection(connection, cert, request, local=False): operation = request['operation'] @@ -67,13 +67,8 @@ def handle_connection(connection, cert, request, local=False): proof = base64.b64decode(proof) j = invites.check_server_proof(invitation, mycert, cert, proof) if not j: - tlvdata.send(connection, - {'errorcode': 500, - 'error': 'Response failed validation'}) return - tlvdata.send(remote, {'collective': {'success': True}}) tlvdata.send(connection, {'collective': {'status': 'Success'}}) - #Ok, here start getting assimilated, connect to get the database and register for changes... 
if 'joinchallenge' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) @@ -84,8 +79,16 @@ def handle_connection(connection, cert, request, local=False): connection.close() return myrsp = base64.b64encode(myrsp) - tlvdata.send(connection, {'collective': {'approval': myrsp}}) - clientready = tlvdata.recv(connection) - if clientready.get('collective', {}).get('success', False): - collcerts[request['name']] = cert - # store certificate signature for the collective trust + fprint = util.get_fingerprint(cert) + cfm.add_collective_member(request['name'], + connection.getpeername()[0], fprint) + tlvdata.send(connection, + {'collective': {'approval': myrsp, + 'leader': get_leader(connection)}}) + + +def get_leader(connection): + global currentleader + if currentleader is None: + currentleader = connection.getsockname()[0] + return currentleader diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 9d2087af..61bfbea8 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -363,6 +363,24 @@ def set_global(globalname, value): ConfigManager._bg_sync_to_file() +def add_collective_member(name, address, fingerprint): + try: + name = name.encode('utf-8') + except AttributeError: + pass + if _cfgstore is None: + init() + if 'collective' not in _cfgstore: + _cfgstore['collective'] = {} + _cfgstore['collective'][name] = {'address': address, + 'fingerprint': fingerprint} + with _dirtylock: + if 'collectivedirty' not in _cfgstore: + _cfgstore['collectivedirty'] = set([]) + _cfgstore['collectivedirty'].add(name) + ConfigManager._bg_sync_to_file() + + def _mark_dirtykey(category, key, tenant=None): if type(key) in (str, unicode): key = key.encode('utf-8') @@ -1521,6 +1539,8 @@ class ConfigManager(object): global _cfgstore _cfgstore = {} rootpath = cls._cfgdir + _load_dict_from_dbm(['collective'], os.path.join(rootpath, + "collective")) _load_dict_from_dbm(['globals'], os.path.join(rootpath, "globals")) for confarea in _config_areas: _load_dict_from_dbm(['main', confarea], os.path.join(rootpath, confarea)) @@ -1579,6 +1599,22 @@ class ConfigManager(object): del globalf[globalkey] finally: globalf.close() + if 'collectivedirty' in _cfgstore: + collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"), + 'c', 384) + try: + with _dirtylock: + colls = copy.deepcopy(_cfgstore['collectivedirty']) + del _cfgstore['collectivedirty'] + for coll in colls: + if coll in _cfgstore['collective']: + collectivef[coll] = cPickle.dumps( + _cfgstore['collective'][coll]) + else: + if coll in collectivef: + del globalf[coll] + finally: + collectivef.close() if 'dirtykeys' in _cfgstore: with _dirtylock: currdirt = copy.deepcopy(_cfgstore['dirtykeys']) From 06fdc648b845f71cf943579acf0b2146a621fcfa Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 26 Apr 2018 16:03:03 -0400 Subject: [PATCH 14/45] Add collective info to DB backup Now persisted to disk *and* accessible to backup. 
--- confluent_server/confluent/config/configmanager.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 61bfbea8..05c2d520 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -1727,6 +1727,14 @@ def restore_db_from_directory(location, password): except IOError as e: if e.errno != 2: raise + try: + collective = json.load(open(os.path.join(location, 'collective.json'))) + for coll in collective: + add_collective_member(coll, collective[coll]['address'], + collective[coll]['fingerprint']) + except IOError as e: + if e.errno != 2: + raise with open(os.path.join(location, 'main.json'), 'r') as cfgfile: cfgdata = cfgfile.read() ConfigManager(tenant=None)._load_from_json(cfgdata) @@ -1737,6 +1745,10 @@ def dump_db_to_directory(location, password, redact=None, skipkeys=False): with open(os.path.join(location, 'keys.json'), 'w') as cfgfile: cfgfile.write(_dump_keys(password)) cfgfile.write('\n') + if 'collective' in _cfgstore: + with open(os.path.join(location, 'collective.json'), 'w') as cfgfile: + cfgfile.write(json.dumps(_cfgstore['collective'])) + cfgfile.write('\n') with open(os.path.join(location, 'main.json'), 'w') as cfgfile: cfgfile.write(ConfigManager(tenant=None)._dump_to_json(redact=redact)) cfgfile.write('\n') From c7b01e00b616672a7a1d55434942ce480c136191 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 26 Apr 2018 16:34:54 -0400 Subject: [PATCH 15/45] Begin the 'connect' collective operation First check if we are current leader, reject if not, then if cert is invalid, reject, then comes the TODO. --- .../confluent/collective/manager.py | 29 ++++++++++++++++--- .../confluent/config/configmanager.py | 2 ++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 6cea50ea..7b8f1a9b 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -24,12 +24,15 @@ import eventlet.green.ssl as ssl try: import OpenSSL.crypto as crypto except ImportError: - # while not always required, we use pyopenssl required for at least collective + # while not always required, we use pyopenssl required for at least + # collective crypto = None currentleader = None + def handle_connection(connection, cert, request, local=False): + global currentleader operation = request['operation'] if cert: cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) @@ -39,7 +42,8 @@ def handle_connection(connection, cert, request, local=False): if 'invite' == operation: name = request['name'] invitation = invites.create_server_invitation(name) - tlvdata.send(connection, {'collective': {'invitation': invitation}}) + tlvdata.send(connection, + {'collective': {'invitation': invitation}}) if 'join' == operation: invitation = request['invitation'] invitation = base64.b64decode(invitation) @@ -61,7 +65,7 @@ def handle_connection(connection, cert, request, local=False): tlvdata.recv(remote) # ignore banner tlvdata.recv(remote) # ignore authpassed: 0 tlvdata.send(remote, {'collective': {'operation': 'joinchallenge', - 'name': name, 'hmac': proof}}) + 'name': name, 'hmac': proof}}) rsp = tlvdata.recv(remote) proof = rsp['collective']['approval'] proof = base64.b64decode(proof) @@ -69,6 +73,7 @@ def handle_connection(connection, cert, request, local=False): if not 
j: return tlvdata.send(connection, {'collective': {'status': 'Success'}}) + currentleader = rsp['collective']['leader'] if 'joinchallenge' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) @@ -85,7 +90,23 @@ def handle_connection(connection, cert, request, local=False): tlvdata.send(connection, {'collective': {'approval': myrsp, 'leader': get_leader(connection)}}) - + if 'connect' == operation: + myself = connection.getsockname()[0] + if myself != get_leader(connection): + tlvdata.send( + connection, + {'error': 'Cannot assimilate, our leader is ' + 'in another castle', 'leader': currentleader}) + return + drone = request['name'] + droneinfo = cfm.get_collective_member(drone) + if not util.cert_matches(droneinfo['fingerprint'], cert): + tlvdata.send(connection, + {'error': 'Invalid certificate,' + 'redo invitation process'}) + return + # ok, we have a connecting member whose certificate checks out + # He needs to bootstrap his configuration and subscribe it to updates def get_leader(connection): global currentleader diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 05c2d520..2bb83740 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -380,6 +380,8 @@ def add_collective_member(name, address, fingerprint): _cfgstore['collectivedirty'].add(name) ConfigManager._bg_sync_to_file() +def get_collective_member(name): + return _cfgstore['collective'][name] def _mark_dirtykey(category, key, tenant=None): if type(key) in (str, unicode): From a2a0b5de2cfd977822d821dc908ddd8fa9e2104a Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 30 Apr 2018 11:37:23 -0400 Subject: [PATCH 16/45] Rename 'joinchallenge' to 'enroll' Seems like a better word to use. --- confluent_server/confluent/collective/manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 7b8f1a9b..cb68a9d3 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -64,7 +64,7 @@ def handle_connection(connection, cert, request, local=False): invitation, mycert, cert)) tlvdata.recv(remote) # ignore banner tlvdata.recv(remote) # ignore authpassed: 0 - tlvdata.send(remote, {'collective': {'operation': 'joinchallenge', + tlvdata.send(remote, {'collective': {'operation': 'enroll', 'name': name, 'hmac': proof}}) rsp = tlvdata.recv(remote) proof = rsp['collective']['approval'] @@ -74,7 +74,7 @@ def handle_connection(connection, cert, request, local=False): return tlvdata.send(connection, {'collective': {'status': 'Success'}}) currentleader = rsp['collective']['leader'] - if 'joinchallenge' == operation: + if 'enroll' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) myrsp = invites.check_client_proof(request['name'], mycert, From 1200f7b7a14ff2158f3d541daa5e5ab9bf2a7db7 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 30 Apr 2018 16:08:15 -0400 Subject: [PATCH 17/45] Add function to check address equivalence As we start needing to compare addresses, provide a central function to handle the various oddities associated with that. 
--- confluent_server/confluent/netutil.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/confluent_server/confluent/netutil.py b/confluent_server/confluent/netutil.py index d31a4a0d..28fc71c8 100644 --- a/confluent_server/confluent/netutil.py +++ b/confluent_server/confluent/netutil.py @@ -122,4 +122,28 @@ def get_prefix_len_for_ip(ip): nbits += 1 maskn = maskn << 1 & 0xffffffff return nbits - raise exc.NotImplementedException("Non local addresses not supported") \ No newline at end of file + raise exc.NotImplementedException("Non local addresses not supported") + +def addresses_match(addr1, addr2): + """Check two network addresses for similarity + + Is it zero padded in one place, not zero padded in another? Is one place by name and another by IP?? + Is one context getting a normal IPv4 address and another getting IPv4 in IPv6 notation? + This function examines the two given names, performing the required changes to compare them for equivalency + + :param addr1: + :param addr2: + :return: True if the given addresses refer to the same thing + """ + for addrinfo in socket.getaddrinfo(addr1, 0, 0, socket.SOCK_STREAM): + rootaddr1 = socket.inet_pton(addrinfo[0], addrinfo[4][0]) + if addrinfo[0] == socket.AF_INET6 and rootaddr1[:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff': + # normalize to standard IPv4 + rootaddr1 = rootaddr1[-4:] + for otherinfo in socket.getaddrinfo(addr2, 0, 0, socket.SOCK_STREAM): + otheraddr = socket.inet_pton(otherinfo[0], otherinfo[4][0]) + if otherinfo[0] == socket.AF_INET6 and otheraddr[:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff': + otheraddr = otheraddr[-4:] + if otheraddr == rootaddr1: + return True + return False From dbb50f08077de375b764672989f9bfda789c9168 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 30 Apr 2018 16:09:28 -0400 Subject: [PATCH 18/45] Add hooks for collective mode and refactor In support of config replication, need configmanager to do a few things --- .../confluent/config/configmanager.py | 38 ++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 2bb83740..b6bd2600 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -69,6 +69,7 @@ import confluent.config.conf as conf import confluent.log import confluent.noderange as noderange import confluent.util +import confluent.netutil as netutil import confluent.exceptions as exc import copy import cPickle @@ -362,6 +363,9 @@ def set_global(globalname, value): _cfgstore['globals'][globalname] = value ConfigManager._bg_sync_to_file() +cfgstreams = {} +def register_config_listener(name, listener): + cfgstreams[listener] = name def add_collective_member(name, address, fingerprint): try: @@ -372,7 +376,7 @@ def add_collective_member(name, address, fingerprint): init() if 'collective' not in _cfgstore: _cfgstore['collective'] = {} - _cfgstore['collective'][name] = {'address': address, + _cfgstore['collective'][name] = {'name': name, 'address': address, 'fingerprint': fingerprint} with _dirtylock: if 'collectivedirty' not in _cfgstore: @@ -383,6 +387,14 @@ def add_collective_member(name, address, fingerprint): def get_collective_member(name): return _cfgstore['collective'][name] + +def get_collective_member_by_address(address): + for name in _cfgstore.get('collective', {}): + currdrone = _cfgstore['collective'][name] + 
if netutil.addresses_match(address, currdrone['address']): + return currdrone + + def _mark_dirtykey(category, key, tenant=None): if type(key) in (str, unicode): key = key.encode('utf-8') @@ -1691,7 +1703,7 @@ def _restore_keys(jsond, password, newpassword=None): # At this point, we should have the key situation all sorted -def _dump_keys(password): +def _dump_keys(password, dojson=True): if _masterkey is None or _masterintegritykey is None: init_masterkey() cryptkey = _format_key(_masterkey, password=password) @@ -1708,8 +1720,10 @@ def _dump_keys(password): else: integritykey = '*unencrypted:{0}'.format(base64.b64encode( integritykey['unencryptedvalue'])) - return json.dumps({'cryptkey': cryptkey, 'integritykey': integritykey}, - sort_keys=True, indent=4, separators=(',', ': ')) + keydata = {'cryptkey': cryptkey, 'integritykey': integritykey} + if dojson: + return json.dumps(keydata, sort_keys=True, indent=4, separators=(',', ': ')) + return keydata def restore_db_from_directory(location, password): @@ -1754,11 +1768,7 @@ def dump_db_to_directory(location, password, redact=None, skipkeys=False): with open(os.path.join(location, 'main.json'), 'w') as cfgfile: cfgfile.write(ConfigManager(tenant=None)._dump_to_json(redact=redact)) cfgfile.write('\n') - bkupglobals = {} - for globvar in _cfgstore['globals']: - if globvar.endswith('_key'): - continue - bkupglobals[globvar] = _cfgstore['globals'][globvar] + bkupglobals = get_globals() if bkupglobals: json.dump(bkupglobals, open(os.path.join(location, 'globals.json'))) try: @@ -1772,6 +1782,16 @@ def dump_db_to_directory(location, password, redact=None, skipkeys=False): except OSError: pass + +def get_globals(): + bkupglobals = {} + for globvar in _cfgstore['globals']: + if globvar.endswith('_key'): + continue + bkupglobals[globvar] = _cfgstore['globals'][globvar] + return bkupglobals + + def init(stateless=False): global _cfgstore if stateless: From 196e8d0d588148691ee0dba0554f161f55ae9b94 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 30 Apr 2018 16:20:49 -0400 Subject: [PATCH 19/45] Draft for starting the databse replication Does not actually heed the data, or implement ongoing relay of data back and forth. 
--- .../confluent/collective/manager.py | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index cb68a9d3..fc143db6 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -31,6 +31,28 @@ except ImportError: currentleader = None +def connect_to_leader(): + remote = socket.create_connection((currentleader, 13001)) + # TLS cert validation is custom and will not pass normal CA vetting + # to override completely in the right place requires enormous effort, so just defer until after connect + remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', + certfile='/etc/confluent/srvcert.pem') + collent = cfm.get_collective_member_by_address(currentleader) + if not util.cert_matches(remote.getpeercert(binary_form=True), collent['fingerprint']): + raise Exception("Certificate mismatch in the collective") # probably Janeway up to something + tlvdata.send(remote, {'collective': {'operation': 'connect'}}) + keydata = tlvdata.recv(remote) + colldata = tlvdata.recv(remote) + globaldata = tlvdata.recv(remote) + dbsize = tlvdata.recv(remote)['dbsize'] + dbjson = '' + while (len(dbjson) < dbsize): + ndata = remote.recv(dbsize - len(dbjson)) + if not ndata: + raise Exception("Error doing initial DB transfer") + dbjson += ndata + + def handle_connection(connection, cert, request, local=False): global currentleader operation = request['operation'] @@ -74,6 +96,7 @@ def handle_connection(connection, cert, request, local=False): return tlvdata.send(connection, {'collective': {'status': 'Success'}}) currentleader = rsp['collective']['leader'] + eventlet.spawn_n(connect_to_leader()) if 'enroll' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) @@ -105,6 +128,14 @@ def handle_connection(connection, cert, request, local=False): {'error': 'Invalid certificate,' 'redo invitation process'}) return + tlvdata.send(connection, cfm._dump_keys(None, False)) + tlvdata.send(connection, cfm._cfgstore['collective]']) + tlvdata.send(connection, cfm.get_globals()) + cfgdata = cfm.ConfigManager(None)._dump_to_json() + tlvdata.send(connection, {'dbsize': len(cfgdata)}) + connection.write(cfgdata) + tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway + cfm.register_cfg_listener(drone, connection) # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates From 1b17c42cae3eba5ab9c7b5da05038c2afe95adbb Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 1 May 2018 16:18:31 -0400 Subject: [PATCH 20/45] Fix backup of globals Globals failed to open the backup file as writable, causing failure if a global had been set. 
--- confluent_server/confluent/config/configmanager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index b6bd2600..21061088 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -1770,7 +1770,8 @@ def dump_db_to_directory(location, password, redact=None, skipkeys=False): cfgfile.write('\n') bkupglobals = get_globals() if bkupglobals: - json.dump(bkupglobals, open(os.path.join(location, 'globals.json'))) + json.dump(bkupglobals, open(os.path.join(location, 'globals.json'), + 'w')) try: for tenant in os.listdir( os.path.join(ConfigManager._cfgdir, '/tenants/')): From c8961377ed68c39746d058b2cc69b5cc5300a7c1 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 1 May 2018 16:51:57 -0400 Subject: [PATCH 21/45] Fix missing import of eventlet Unable to spawn the connect thread due to missing import. --- confluent_server/confluent/collective/manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index fc143db6..6865dd88 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -19,6 +19,7 @@ import confluent.collective.invites as invites import confluent.config.configmanager as cfm import confluent.tlvdata as tlvdata import confluent.util as util +import eventlet import eventlet.green.socket as socket import eventlet.green.ssl as ssl try: From ce4c72eae274d35584c39030cbf151ef1ce3cb76 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 3 May 2018 11:18:02 -0400 Subject: [PATCH 22/45] For enroll, track the remote cert specially For reconnect, we will have collective objects. At enroll time, however, the just-exchanged certificate needs special handling.
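The substance of the change is where the expected fingerprint comes from: at enroll time the peer's certificate was just exchanged and is passed in directly, while on a later reconnect it is looked up from the stored collective membership. A rough sketch of that selection, with get_fingerprint and lookup_member standing in as assumed helpers rather than the module's real API:

    def expected_fingerprint(leader, cert, get_fingerprint, lookup_member):
        # Enroll path: fingerprint the certificate handed over during the
        # invitation exchange, since it is not in the collective store yet.
        if cert:
            return get_fingerprint(cert)
        # Reconnect path: trust only the fingerprint recorded for this member.
        member = lookup_member(leader)
        return member['fingerprint']
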
--- confluent_server/confluent/collective/manager.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 6865dd88..580c587e 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -32,15 +32,20 @@ except ImportError: currentleader = None -def connect_to_leader(): +def connect_to_leader(cert=None): remote = socket.create_connection((currentleader, 13001)) # TLS cert validation is custom and will not pass normal CA vetting # to override completely in the right place requires enormous effort, so just defer until after connect remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', certfile='/etc/confluent/srvcert.pem') - collent = cfm.get_collective_member_by_address(currentleader) - if not util.cert_matches(remote.getpeercert(binary_form=True), collent['fingerprint']): - raise Exception("Certificate mismatch in the collective") # probably Janeway up to something + if cert: + fprint = util.get_fingerprint(cert) + else: + collent = cfm.get_collective_member_by_address(currentleader) + fprint = collent['fingerprint'] + if not util.cert_matches(fprint, remote.getpeercert(binary_form=True)): + # probably Janeway up to something + raise Exception("Certificate mismatch in the collective") tlvdata.send(remote, {'collective': {'operation': 'connect'}}) keydata = tlvdata.recv(remote) colldata = tlvdata.recv(remote) globaldata = tlvdata.recv(remote) @@ -102,7 +104,7 @@ def handle_connection(connection, cert, request, local=False): return tlvdata.send(connection, {'collective': {'status': 'Success'}}) currentleader = rsp['collective']['leader'] - eventlet.spawn_n(connect_to_leader()) + eventlet.spawn_n(connect_to_leader, cert) if 'enroll' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) From 855241a043360afc6af56ddf447be7fdd1e0334a Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 3 May 2018 13:18:08 -0400 Subject: [PATCH 23/45] Fixes to the connect draft Needed to track its own name, and to skip the banner and auth message.
+ tlvdata.send(remote, {'collective': {'operation': 'connect', 'name': name}}) keydata = tlvdata.recv(remote) colldata = tlvdata.recv(remote) globaldata = tlvdata.recv(remote) @@ -102,7 +104,7 @@ def handle_connection(connection, cert, request, local=False): return tlvdata.send(connection, {'collective': {'status': 'Success'}}) currentleader = rsp['collective']['leader'] - eventlet.spawn_n(connect_to_leader, cert) + eventlet.spawn_n(connect_to_leader, cert, name) if 'enroll' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) @@ -135,13 +137,13 @@ def handle_connection(connection, cert, request, local=False): 'redo invitation process'}) return tlvdata.send(connection, cfm._dump_keys(None, False)) - tlvdata.send(connection, cfm._cfgstore['collective]']) + tlvdata.send(connection, cfm._cfgstore['collective']) tlvdata.send(connection, cfm.get_globals()) cfgdata = cfm.ConfigManager(None)._dump_to_json() tlvdata.send(connection, {'dbsize': len(cfgdata)}) - connection.write(cfgdata) + connection.sendall(cfgdata) tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway - cfm.register_cfg_listener(drone, connection) + cfm.register_config_listener(drone, connection) # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates From ecfb4d68c5613dcae5b38a626b049a4b96f48dd8 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 3 May 2018 13:27:48 -0400 Subject: [PATCH 24/45] Add self to collective database Database would omit initial leader otherwise. --- confluent_server/confluent/collective/manager.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 884abd1e..09b54a0f 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -116,6 +116,9 @@ def handle_connection(connection, cert, request, local=False): return myrsp = base64.b64encode(myrsp) fprint = util.get_fingerprint(cert) + myfprint = util.get_fingerprint(mycert) + cfm.add_collective_member(socket.gethostname(), + connection.getsockname()[0], myfprint) cfm.add_collective_member(request['name'], connection.getpeername()[0], fprint) tlvdata.send(connection, From ecfc56efde2344b940529539b49f43d03355bd4c Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 3 May 2018 14:03:56 -0400 Subject: [PATCH 25/45] Actually execute replicate-on-connect This creates a duplicate of the leader. 
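With the transfer wired up, the connecting member now applies what it received; the ordering appears deliberate, restoring key material before loading the JSON dump that may contain encrypted values. A condensed restatement of the sequence the patch adds, wrapped in a hypothetical apply_snapshot() helper for illustration only:

    def apply_snapshot(cfm, keydata, colldata, globaldata, dbjson):
        # Restore crypto keys first so encrypted attributes can be handled.
        cfm._restore_keys(keydata, None)
        # Adopt the leader's view of collective membership and globals.
        cfm._cfgstore['collective'] = colldata
        for globvar in globaldata:
            cfm.set_global(globvar, globaldata[globvar])
        # Finally load the configuration dump and let it sync to disk.
        cfm.ConfigManager(tenant=None)._load_from_json(dbjson)
        cfm.ConfigManager._bg_sync_to_file()
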
--- confluent_server/confluent/collective/manager.py | 6 ++++++ confluent_server/confluent/config/configmanager.py | 13 ++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 09b54a0f..422980fe 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -59,6 +59,12 @@ def connect_to_leader(cert=None, name=None): if not ndata: raise Exception("Error doing initial DB transfer") dbjson += ndata + cfm._restore_keys(keydata, None) + cfm._cfgstore['collective'] = colldata + for globvar in globaldata: + cfm.set_global(globvar, globaldata[globvar]) + cfm.ConfigManager(tenant=None)._load_from_json(dbjson) + cfm.ConfigManager._bg_sync_to_file() def handle_connection(connection, cert, request, local=False): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 21061088..b49140c6 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -353,6 +353,13 @@ def set_global(globalname, value): """ if _cfgstore is None: init() + try: + globalname = globalname.encode('utf-8') + except AttributeError: + # We have to remove the unicode-ness of the string, + # but if it is already bytes in python 3, then we will + # get an attributeerror, so pass + pass with _dirtylock: if 'dirtyglobals' not in _cfgstore: _cfgstore['dirtyglobals'] = set() @@ -1684,7 +1691,10 @@ def _restore_keys(jsond, password, newpassword=None): # the file, and newpassword to use, (also check the service.cfg file) global _masterkey global _masterintegritykey - keydata = json.loads(jsond) + if isinstance(jsond, dict): + keydata = jsond + else: + keydata = json.loads(jsond) cryptkey = _parse_key(keydata['cryptkey'], password) integritykey = _parse_key(keydata['integritykey'], password) conf.init_config() @@ -1745,6 +1755,7 @@ def restore_db_from_directory(location, password): raise try: collective = json.load(open(os.path.join(location, 'collective.json'))) + _cfgstore['collective'] = {} for coll in collective: add_collective_member(coll, collective[coll]['address'], collective[coll]['fingerprint']) From 5abaddfe638cf8c1d403d6a8788b782eeaf519d6 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 4 May 2018 12:19:45 -0400 Subject: [PATCH 26/45] Clear configuration prior to sync --- confluent_server/confluent/collective/manager.py | 4 ++++ confluent_server/confluent/config/configmanager.py | 11 +++++++++++ 2 files changed, 15 insertions(+) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 422980fe..1c66158a 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -59,7 +59,11 @@ def connect_to_leader(cert=None, name=None): if not ndata: raise Exception("Error doing initial DB transfer") dbjson += ndata + cfm.clear_configuration() cfm._restore_keys(keydata, None) + for c in colldata: + cfm.add_collective_member(c, colldata[c]['address'], + colldata[c]['fingerprint']) cfm._cfgstore['collective'] = colldata for globvar in globaldata: cfm.set_global(globvar, globaldata[globvar]) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index b49140c6..bd787e97 100644 --- a/confluent_server/confluent/config/configmanager.py +++ 
b/confluent_server/confluent/config/configmanager.py @@ -374,6 +374,17 @@ cfgstreams = {} def register_config_listener(name, listener): cfgstreams[listener] = name +def clear_configuration(): + global _cfgstore + _cfgstore = {} + todelete = _config_areas + ('globals', 'collective') + for cfg in todelete: + try: + os.remove(os.path.join(ConfigManager._cfgdir, cfg)) + except OSError as oe: + pass + + def add_collective_member(name, address, fingerprint): try: name = name.encode('utf-8') From a78aa6816c51a721d25fa1686b86731896eb3a20 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 4 May 2018 15:16:30 -0400 Subject: [PATCH 27/45] Rough draft for ongoing synchronization Putting thoughts down on how xmit will work; recv and relay will follow, along with some testing, before deciding how much can be applied neatly at the various points. --- .../confluent/collective/manager.py | 1 + .../confluent/config/configmanager.py | 49 ++++++++++++++++++- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 1c66158a..00a17d2c 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -69,6 +69,7 @@ def connect_to_leader(cert=None, name=None): cfm.set_global(globvar, globaldata[globvar]) cfm.ConfigManager(tenant=None)._load_from_json(dbjson) cfm.ConfigManager._bg_sync_to_file() + cfm.set_leader_channel(remote) def handle_connection(connection, cert, request, local=False): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index bd787e97..e2e24a9f 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -75,6 +75,7 @@ import copy import cPickle import errno import eventlet +import eventlet.event as event import fnmatch import json import operator @@ -82,6 +83,7 @@ import os import random import re import string +import struct import sys import threading import traceback @@ -94,6 +96,8 @@ _config_areas = ('nodegroups', 'nodes', 'usergroups', 'users') tracelog = None statelessmode = False _cfgstore = None +_pendingchangesets = {} +_txcount = 0 _attraliases = { 'bmc': 'hardwaremanagement.manager', @@ -372,7 +376,7 @@ def set_global(globalname, value): cfgstreams = {} def register_config_listener(name, listener): - cfgstreams[listener] = name + cfgstreams[name] = listener def clear_configuration(): global _cfgstore @@ -384,6 +388,10 @@ def clear_configuration(): except OSError as oe: pass +cfgleader = None +def set_leader_channel(channel): + global cfgleader + cfgleader = channel def add_collective_member(name, address, fingerprint): try: @@ -1338,7 +1346,46 @@ class ConfigManager(object): attribmap[node]['groups'] = [] self.set_node_attributes(attribmap, autocreate=True) + def set_leader_node_attributes(self, attribmap, autocreate): + xid = os.urandom(8) + while xid in _pendingchangesets: + xid = os.urandom(8) + _pendingchangesets[xid] = event.Event() + rpcpayload = cPickle.dumps({'function': 'set_node_attributes', + 'args': (attribmap, autocreate), + 'xid': xid}) + rpclen = len(rpcpayload) + cfgleader.sendall(struct.pack('!Q', rpclen)) + cfgleader.sendall(rpcpayload) + _pendingchangesets[xid].wait(0) + return + + def _push_rpc(self, stream, payload): + stream.sendall(struct.pack('!Q', len(payload))) + stream.sendall(payload) + + def set_follower_node_attributes(self, attribmap, autocreate): + global
_txcount + _txcount += 1 + if len(cfgstreams) < (len(_cfgstore['collective']) // 2) : + # the leader counts in addition to registered streams + raise Exception("collective does not have quorum") + pushes = eventlet.GreenPool() + payload = cPickle.dumps({'function': '_set_node_attributes', + 'args': (attribmap, autocreate), + 'txcount': _txcount}) + for res in pushes.imap(_push_rpc, + [(payload, s) for s in cfgstreams]): + print(repr(res)) + def set_node_attributes(self, attribmap, autocreate=False): + if cfgleader: # currently config slave to another + return self.set_leader_node_attributes(attribmap, autocreate) + if cfgstreams: + self.set_follower_node_attributes(attribmap, autocreate) + self._set_node_attributes(attribmap, autocreate) + + def _set_node_attributes(self, attribmap, autocreate, xid=None): # TODO(jbjohnso): multi mgr support, here if we have peers, # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result From aec4e746e9e7f4362c3a39a0b3445b998651f120 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 4 May 2018 16:12:53 -0400 Subject: [PATCH 28/45] Apply changes from leader subscription --- .../confluent/collective/manager.py | 4 ++-- .../confluent/config/configmanager.py | 23 ++++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 00a17d2c..807259d0 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -69,7 +69,7 @@ def connect_to_leader(cert=None, name=None): cfm.set_global(globvar, globaldata[globvar]) cfm.ConfigManager(tenant=None)._load_from_json(dbjson) cfm.ConfigManager._bg_sync_to_file() - cfm.set_leader_channel(remote) + cfm.follow_channel(remote) def handle_connection(connection, cert, request, local=False): @@ -156,7 +156,7 @@ def handle_connection(connection, cert, request, local=False): cfgdata = cfm.ConfigManager(None)._dump_to_json() tlvdata.send(connection, {'dbsize': len(cfgdata)}) connection.sendall(cfgdata) - tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway + #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway cfm.register_config_listener(drone, connection) # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index e2e24a9f..ccb5489d 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -389,9 +389,26 @@ def clear_configuration(): pass cfgleader = None -def set_leader_channel(channel): +def follow_channel(channel): global cfgleader cfgleader = channel + msg = channel.recv(8) + while msg: + sz = struct.unpack('!Q', msg) + if sz != 0: + rpc = '' + while len(rpc) < sz: + nrpc = channel.recv(sz - len(rpc)) + if not nrpc: + raise Exception('Truncated message error') + rpc += nrpc + rpc = cPickle.loads(rpc) + locals()[rpc['function']](*rpc['args']) + _txcount = rpc['txcount'] + if 'xid' in rpc: + _pendingchangesets[rpc['xid']].send() + + def add_collective_member(name, address, fingerprint): try: @@ -1375,7 +1392,7 @@ class ConfigManager(object): 'args': (attribmap, autocreate), 'txcount': _txcount}) for res in pushes.imap(_push_rpc, - [(payload, s) for s in cfgstreams]): + 
[(s, payload) for s in cfgstreams]): print(repr(res)) def set_node_attributes(self, attribmap, autocreate=False): @@ -1385,7 +1402,7 @@ class ConfigManager(object): self.set_follower_node_attributes(attribmap, autocreate) self._set_node_attributes(attribmap, autocreate) - def _set_node_attributes(self, attribmap, autocreate, xid=None): + def _set_node_attributes(self, attribmap, autocreate): # TODO(jbjohnso): multi mgr support, here if we have peers, # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result From d11c716b6a2205d478181559f13350d87772546e Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 8 May 2018 11:33:42 -0400 Subject: [PATCH 29/45] Succeed in pushing config to followers from leader Still more work to be done for multiple transactions. --- .../confluent/config/configmanager.py | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index ccb5489d..c8cf99ed 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -174,6 +174,10 @@ def _do_notifier(cfg, watcher, callback): logException() +def _remote_set_node_attributes(tenant, attribmap, autocreate): + ConfigManager(tenant)._set_node_attributes(attribmap, autocreate) + + def logException(): global tracelog if tracelog is None: @@ -213,6 +217,11 @@ def init_masterkey(password=None): password=password)) +def _push_rpc(stream, payload): + stream.sendall(struct.pack('!Q', len(payload))) + stream.sendall(payload) + + def decrypt_value(cryptvalue, key=None, integritykey=None): @@ -394,7 +403,7 @@ def follow_channel(channel): cfgleader = channel msg = channel.recv(8) while msg: - sz = struct.unpack('!Q', msg) + sz = struct.unpack('!Q', msg)[0] if sz != 0: rpc = '' while len(rpc) < sz: @@ -403,7 +412,7 @@ def follow_channel(channel): raise Exception('Truncated message error') rpc += nrpc rpc = cPickle.loads(rpc) - locals()[rpc['function']](*rpc['args']) + globals()[rpc['function']](*rpc['args']) _txcount = rpc['txcount'] if 'xid' in rpc: _pendingchangesets[rpc['xid']].send() @@ -1377,10 +1386,6 @@ class ConfigManager(object): _pendingchangesets[xid].wait(0) return - def _push_rpc(self, stream, payload): - stream.sendall(struct.pack('!Q', len(payload))) - stream.sendall(payload) - def set_follower_node_attributes(self, attribmap, autocreate): global _txcount _txcount += 1 @@ -1388,11 +1393,11 @@ class ConfigManager(object): # the leader counts in addition to registered streams raise Exception("collective does not have quorum") pushes = eventlet.GreenPool() - payload = cPickle.dumps({'function': '_set_node_attributes', - 'args': (attribmap, autocreate), + payload = cPickle.dumps({'function': '_remote_set_node_attributes', + 'args': (self.tenant, attribmap, autocreate), 'txcount': _txcount}) - for res in pushes.imap(_push_rpc, - [(s, payload) for s in cfgstreams]): + for res in pushes.starmap( + _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): print(repr(res)) def set_node_attributes(self, attribmap, autocreate=False): From e5f553801b1df47932c65e1372448d6f05701e0a Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 8 May 2018 13:35:30 -0400 Subject: [PATCH 30/45] Fix reuse of channel for receiving changes --- confluent_server/confluent/collective/manager.py | 3 ++- confluent_server/confluent/config/configmanager.py | 3 +-- 2 files changed, 3 insertions(+), 3 
deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 807259d0..15d69d63 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -48,7 +48,8 @@ def connect_to_leader(cert=None, name=None): raise Exception("Certificate mismatch in the collective") tlvdata.recv(remote) # the banner tlvdata.recv(remote) # authpassed... 0.. - tlvdata.send(remote, {'collective': {'operation': 'connect', 'name': name}}) + tlvdata.send(remote, {'collective': {'operation': 'connect', + 'name': name}}) keydata = tlvdata.recv(remote) colldata = tlvdata.recv(remote) globaldata = tlvdata.recv(remote) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index c8cf99ed..8f0db2b9 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -416,8 +416,7 @@ def follow_channel(channel): _txcount = rpc['txcount'] if 'xid' in rpc: _pendingchangesets[rpc['xid']].send() - - + msg = channel.recv(8) def add_collective_member(name, address, fingerprint): try: From c962d10222374bfb9d7f71ef2e789bd05ed62abf Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 8 May 2018 16:53:59 -0400 Subject: [PATCH 31/45] Allow slave collective drones to set It works (once), but needs xid fix. --- .../confluent/collective/manager.py | 5 +- .../confluent/config/configmanager.py | 54 +++++++++++++------ 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 15d69d63..6bfb532b 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -157,8 +157,9 @@ def handle_connection(connection, cert, request, local=False): cfgdata = cfm.ConfigManager(None)._dump_to_json() tlvdata.send(connection, {'dbsize': len(cfgdata)}) connection.sendall(cfgdata) - #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway - cfm.register_config_listener(drone, connection) + #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, + # so far unused anyway + cfm.relay_slaved_requests(drone, connection) # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 8f0db2b9..5f92c591 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -1,4 +1,4 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 +7# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2014 IBM Corporation # Copyright 2015-2018 Lenovo @@ -174,8 +174,15 @@ def _do_notifier(cfg, watcher, callback): logException() -def _remote_set_node_attributes(tenant, attribmap, autocreate): - ConfigManager(tenant)._set_node_attributes(attribmap, autocreate) +def _rpc_master_set_node_attributes(tenant, attribmap, autocreate, xid): + c = ConfigManager(tenant) + c.send_to_followers(xid, '_rpc_set_node_attributes', tenant, + attribmap, autocreate) + c._true_set_node_attributes(attribmap, autocreate) + + +def _rpc_set_node_attributes(tenant, attribmap, autocreate): + ConfigManager(tenant)._true_set_node_attributes(attribmap, autocreate) def logException(): @@ -384,8 +391,22 @@ def 
set_global(globalname, value): ConfigManager._bg_sync_to_file() cfgstreams = {} -def register_config_listener(name, listener): +def relay_slaved_requests(name, listener): cfgstreams[name] = listener + msg = listener.recv(8) + while msg: + sz = struct.unpack('!Q', msg)[0] + if sz != 0: + rpc = '' + while len(rpc) < sz: + nrpc = listener.recv(sz - len(rpc)) + if not nrpc: + raise Exception('Truncated client error') + rpc += nrpc + rpc = cPickle.loads(rpc) + globals()[rpc['function']](*rpc['args']) + msg = listener.recv(8) + def clear_configuration(): global _cfgstore @@ -414,7 +435,7 @@ def follow_channel(channel): rpc = cPickle.loads(rpc) globals()[rpc['function']](*rpc['args']) _txcount = rpc['txcount'] - if 'xid' in rpc: + if 'xid' in rpc and rpc['xid']: _pendingchangesets[rpc['xid']].send() msg = channel.recv(8) @@ -1376,37 +1397,38 @@ class ConfigManager(object): while xid in _pendingchangesets: xid = os.urandom(8) _pendingchangesets[xid] = event.Event() - rpcpayload = cPickle.dumps({'function': 'set_node_attributes', - 'args': (attribmap, autocreate), - 'xid': xid}) + rpcpayload = cPickle.dumps({'function': '_rpc_master_set_node_attributes', + 'args': (self.tenant, attribmap, + autocreate, xid)}) rpclen = len(rpcpayload) cfgleader.sendall(struct.pack('!Q', rpclen)) cfgleader.sendall(rpcpayload) _pendingchangesets[xid].wait(0) + del _pendingchangesets[xid] return - def set_follower_node_attributes(self, attribmap, autocreate): + def send_to_followers(self, xid, fnname, *args): global _txcount _txcount += 1 if len(cfgstreams) < (len(_cfgstore['collective']) // 2) : # the leader counts in addition to registered streams raise Exception("collective does not have quorum") pushes = eventlet.GreenPool() - payload = cPickle.dumps({'function': '_remote_set_node_attributes', - 'args': (self.tenant, attribmap, autocreate), - 'txcount': _txcount}) + payload = cPickle.dumps({'function': fnname, 'args': args, + 'txcount': _txcount, 'xid': xid}) for res in pushes.starmap( _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): - print(repr(res)) + pass def set_node_attributes(self, attribmap, autocreate=False): if cfgleader: # currently config slave to another return self.set_leader_node_attributes(attribmap, autocreate) if cfgstreams: - self.set_follower_node_attributes(attribmap, autocreate) - self._set_node_attributes(attribmap, autocreate) + self.send_to_followers(None, '_rpc_set_node_attributes', + self.tenant, attribmap, autocreate) + self._true_set_node_attributes(attribmap, autocreate) - def _set_node_attributes(self, attribmap, autocreate): + def _true_set_node_attributes(self, attribmap, autocreate): # TODO(jbjohnso): multi mgr support, here if we have peers, # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result From 267d83e6e48e4ca96b76c2bcba0566e003f374e4 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 9 May 2018 17:01:23 -0400 Subject: [PATCH 32/45] Have attrib set wait on all collective members This will mean that it is reliable that a nodeattrib ; in delegation scenarios is guaranteed to execute in order. 
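The ordering guarantee rests on a request/acknowledge round trip: the side forwarding a change parks on an eventlet Event keyed by a random transaction id and only returns once the peer echoes that id back. A standalone sketch of the pattern, outside the patch itself (send_rpc, on_ack and the module-level dict are illustrative names, not the module's own):

    import os
    import eventlet.event as event

    _pending = {}

    def send_and_wait(send_rpc, payload):
        # Allocate a unique transaction id and an Event to park the caller on.
        xid = os.urandom(8)
        while xid in _pending:
            xid = os.urandom(8)
        _pending[xid] = event.Event()
        send_rpc(xid, payload)   # ship the mutation to the peer
        _pending[xid].wait()     # block until the peer echoes this xid back
        del _pending[xid]

    def on_ack(xid):
        # Invoked by the receive loop when the acknowledgement arrives.
        _pending[xid].send()

Because the caller does not return until the acknowledgement comes back, a shell sequence of dependent commands observes its changes in order.
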
--- .../confluent/config/configmanager.py | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 5f92c591..cf95d21a 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -174,9 +174,9 @@ def _do_notifier(cfg, watcher, callback): logException() -def _rpc_master_set_node_attributes(tenant, attribmap, autocreate, xid): +def _rpc_master_set_node_attributes(tenant, attribmap, autocreate): c = ConfigManager(tenant) - c.send_to_followers(xid, '_rpc_set_node_attributes', tenant, + c.send_to_followers('_rpc_set_node_attributes', tenant, attribmap, autocreate) c._true_set_node_attributes(attribmap, autocreate) @@ -405,6 +405,8 @@ def relay_slaved_requests(name, listener): rpc += nrpc rpc = cPickle.loads(rpc) globals()[rpc['function']](*rpc['args']) + if 'xid' in rpc: + _push_rpc(listener, cPickle.dumps({'xid': rpc['xid']})) msg = listener.recv(8) @@ -433,8 +435,10 @@ def follow_channel(channel): raise Exception('Truncated message error') rpc += nrpc rpc = cPickle.loads(rpc) - globals()[rpc['function']](*rpc['args']) - _txcount = rpc['txcount'] + if 'function' in rpc: + globals()[rpc['function']](*rpc['args']) + if 'txcount' in rpc: + _txcount = rpc['txcount'] if 'xid' in rpc and rpc['xid']: _pendingchangesets[rpc['xid']].send() msg = channel.recv(8) @@ -1399,15 +1403,15 @@ class ConfigManager(object): _pendingchangesets[xid] = event.Event() rpcpayload = cPickle.dumps({'function': '_rpc_master_set_node_attributes', 'args': (self.tenant, attribmap, - autocreate, xid)}) + autocreate), 'xid': xid}) rpclen = len(rpcpayload) cfgleader.sendall(struct.pack('!Q', rpclen)) cfgleader.sendall(rpcpayload) - _pendingchangesets[xid].wait(0) + _pendingchangesets[xid].wait() del _pendingchangesets[xid] return - def send_to_followers(self, xid, fnname, *args): + def send_to_followers(self, fnname, *args): global _txcount _txcount += 1 if len(cfgstreams) < (len(_cfgstore['collective']) // 2) : @@ -1415,7 +1419,7 @@ class ConfigManager(object): raise Exception("collective does not have quorum") pushes = eventlet.GreenPool() payload = cPickle.dumps({'function': fnname, 'args': args, - 'txcount': _txcount, 'xid': xid}) + 'txcount': _txcount}) for res in pushes.starmap( _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): pass @@ -1424,7 +1428,7 @@ class ConfigManager(object): if cfgleader: # currently config slave to another return self.set_leader_node_attributes(attribmap, autocreate) if cfgstreams: - self.send_to_followers(None, '_rpc_set_node_attributes', + self.send_to_followers('_rpc_set_node_attributes', self.tenant, attribmap, autocreate) self._true_set_node_attributes(attribmap, autocreate) From 0477ab7d8531e1928383c42706626410d70e8810 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 10 May 2018 16:19:46 -0400 Subject: [PATCH 33/45] Check and try to start collective on startup Not yet good enough for a leader to rejoin, but enough for a follower to rejoin automatically. 
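Rejoining at startup hinges on a simple test: if the stored configuration lists fewer than two collective members, the server runs standalone; otherwise each other recorded member is tried as a potential leader. A compact sketch of that decision, with list_members, my_name and try_leader as assumed inputs rather than the module's real API:

    def startup_rejoin(list_members, my_name, try_leader):
        members = list(list_members())
        if len(members) < 2:
            # Not configured as a collective; nothing to rejoin.
            return
        for member in members:
            if member == my_name:
                continue
            # Any other recorded member may currently be, or know, the leader.
            if try_leader(member):
                break
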
--- .../confluent/collective/manager.py | 28 +++++++++++++++++-- .../confluent/config/configmanager.py | 18 ++---------- confluent_server/confluent/main.py | 2 ++ 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 6bfb532b..01c3e526 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -32,8 +32,10 @@ except ImportError: currentleader = None -def connect_to_leader(cert=None, name=None): - remote = socket.create_connection((currentleader, 13001)) +def connect_to_leader(cert=None, name=None, leader=None): + if leader is None: + leader = currentleader + remote = socket.create_connection((leader, 13001)) # TLS cert validation is custom and will not pass normal CA vetting # to override completely in the right place requires enormous effort, so just defer until after connect remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', @@ -41,7 +43,7 @@ def connect_to_leader(cert=None, name=None): if cert: fprint = util.get_fingerprint(cert) else: - collent = cfm.get_collective_member_by_address(currentleader) + collent = cfm.get_collective_member_by_address(leader) fprint = collent['fingerprint'] if not util.cert_matches(fprint, remote.getpeercert(binary_form=True)): # probably Janeway up to something @@ -51,6 +53,10 @@ def connect_to_leader(cert=None, name=None): tlvdata.send(remote, {'collective': {'operation': 'connect', 'name': name}}) keydata = tlvdata.recv(remote) + if 'error' in keydata: + if 'leader' in keydata: + return connect_to_leader(name=name, leader=keydata['leader']) + raise Exception(keydata['error']) colldata = tlvdata.recv(remote) globaldata = tlvdata.recv(remote) dbsize = tlvdata.recv(remote)['dbsize'] @@ -168,3 +174,19 @@ def get_leader(connection): if currentleader is None: currentleader = connection.getsockname()[0] return currentleader + +def startup(): + members = list(cfm.list_collective()) + if len(members) < 2: + # Not in collective mode, return + return + eventlet.spawn_n(start_collective) + +def start_collective(): + myname = socket.gethostname() + for member in members: + if member == myname: + continue + ldrcandidate = cfm.get_collective_member(member)['address'] + connect_to_leader(name=myname, leader=ldrcandidate) + diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index cf95d21a..7d5fea93 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -27,21 +27,6 @@ # encrypted fields do not support expressions, either as a source or # destination -#TODO: clustered mode -# In clustered case, only one instance is the 'master'. If some 'def set' -# is requested on a slave, it creates a transaction id and an event, firing it -# to master. It then waits on the event. When the master reflects the data -# back and that reflection data goes into memory, the wait will be satisfied -# this means that set on a slave will be much longer. -# the assumption is that only the calls to 'def set' need be pushed to/from -# master and all the implicit activity that ensues will pan out since -# the master is ensuring a strict ordering of transactions -# for missed transactions, transaction log will be used to track transactions -# transaction log can have a constrained size if we want, in which case full -# replication will trigger. 
-# uuid.uuid4() will be used for transaction ids - - # Note on the cryptography. Default behavior is mostly just to pave the # way to meaningful security. Root all potentially sensitive data in # one key. That key is in plain sight, so not meaningfully protected @@ -460,6 +445,9 @@ def add_collective_member(name, address, fingerprint): _cfgstore['collectivedirty'].add(name) ConfigManager._bg_sync_to_file() +def list_collective(): + return iter(_cfgstore['collective']) + def get_collective_member(name): return _cfgstore['collective'][name] diff --git a/confluent_server/confluent/main.py b/confluent_server/confluent/main.py index e84a8233..ea4601ae 100644 --- a/confluent_server/confluent/main.py +++ b/confluent_server/confluent/main.py @@ -33,6 +33,7 @@ import confluent.consoleserver as consoleserver import confluent.core as confluentcore import confluent.httpapi as httpapi import confluent.log as log +import confluent.collective.manager as collective try: import confluent.sockapi as sockapi except ImportError: @@ -228,6 +229,7 @@ def run(): _updatepidfile() signal.signal(signal.SIGINT, terminate) signal.signal(signal.SIGTERM, terminate) + collective.startup() if dbgif: oumask = os.umask(0077) try: From 5087c8bed5b99085be06fb77286035c82ef829c4 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 10 May 2018 16:40:13 -0400 Subject: [PATCH 34/45] Try to persist name as myname hostname may not agree with the name chosen by user, in such a case persist the name and use that, falling back to gethostname() as needed. --- .../confluent/collective/manager.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 01c3e526..30bb464a 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -78,6 +78,15 @@ def connect_to_leader(cert=None, name=None, leader=None): cfm.ConfigManager._bg_sync_to_file() cfm.follow_channel(remote) +def get_myname(): + try: + with open('/etc/confluent/cfg/myname', 'r') as f: + return f.read() + except IOError: + myname = socket.gethostname() + with open('/etc/confluenut/cfg/myname', 'w') as f: + f.write(myname) + return myname def handle_connection(connection, cert, request, local=False): global currentleader @@ -122,6 +131,9 @@ def handle_connection(connection, cert, request, local=False): return tlvdata.send(connection, {'collective': {'status': 'Success'}}) currentleader = rsp['collective']['leader'] + f = open('/etc/confluent/cfg/myname', 'w') + f.write(name) + f.close() eventlet.spawn_n(connect_to_leader, cert, name) if 'enroll' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') @@ -135,7 +147,7 @@ def handle_connection(connection, cert, request, local=False): myrsp = base64.b64encode(myrsp) fprint = util.get_fingerprint(cert) myfprint = util.get_fingerprint(mycert) - cfm.add_collective_member(socket.gethostname(), + cfm.add_collective_member(get_myname(), connection.getsockname()[0], myfprint) cfm.add_collective_member(request['name'], connection.getpeername()[0], fprint) @@ -183,8 +195,8 @@ def startup(): eventlet.spawn_n(start_collective) def start_collective(): - myname = socket.gethostname() - for member in members: + myname = get_myname() + for member in cfm.list_collective(): if member == myname: continue ldrcandidate = cfm.get_collective_member(member)['address'] From e98ecd9867680c0d038c0a5d328dab2fde96fc80 Mon Sep 17 00:00:00 
2001 From: Jarrod Johnson Date: Thu, 10 May 2018 16:46:29 -0400 Subject: [PATCH 35/45] Correct spelling error --- confluent_server/confluent/collective/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 30bb464a..90a5e55b 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -84,7 +84,7 @@ def get_myname(): return f.read() except IOError: myname = socket.gethostname() - with open('/etc/confluenut/cfg/myname', 'w') as f: + with open('/etc/confluent/cfg/myname', 'w') as f: f.write(myname) return myname From 7fa431dbc9d7c44d79fcd1fd5e9513313219ad7c Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 11 May 2018 11:53:45 -0400 Subject: [PATCH 36/45] Persist the transactioncount Needed for eventually ascertaining the viability in selecting leader. --- confluent_server/confluent/config/configmanager.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 7d5fea93..4b34151d 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -446,7 +446,7 @@ def add_collective_member(name, address, fingerprint): ConfigManager._bg_sync_to_file() def list_collective(): - return iter(_cfgstore['collective']) + return iter(_cfgstore.get('collective', ())) def get_collective_member(name): return _cfgstore['collective'][name] @@ -1651,8 +1651,14 @@ class ConfigManager(object): @classmethod def _read_from_path(cls): global _cfgstore + global _txcount _cfgstore = {} rootpath = cls._cfgdir + try: + with open(os.path.join(rootpath, 'transactioncount', 'r')) as f: + _txcount = struct.unpack('!Q', f.read())[0] + except IOError: + pass _load_dict_from_dbm(['collective'], os.path.join(rootpath, "collective")) _load_dict_from_dbm(['globals'], os.path.join(rootpath, "globals")) @@ -1697,6 +1703,8 @@ class ConfigManager(object): def _sync_to_file(cls): if statelessmode: return + with open(os.path.join(cls._cfgdir, 'transactioncount'), 'w') as f: + f.write(struct.pack('!Q', _txcount)) if 'dirtyglobals' in _cfgstore: with _dirtylock: dirtyglobals = copy.deepcopy(_cfgstore['dirtyglobals']) From 58155a47b58eadb787877259a97c5521683a2308 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 11 May 2018 13:49:54 -0400 Subject: [PATCH 37/45] Add --json to nodeinventory Have nodeinventory have an option to output in json. --- confluent_client/bin/nodeinventory | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/confluent_client/bin/nodeinventory b/confluent_client/bin/nodeinventory index 1cc99e72..d7a1acb7 100755 --- a/confluent_client/bin/nodeinventory +++ b/confluent_client/bin/nodeinventory @@ -16,6 +16,7 @@ # limitations under the License. 
import codecs +import json import optparse import os import re @@ -89,9 +90,10 @@ url = '/noderange/{0}/inventory/hardware/all/all' argparser = optparse.OptionParser( usage="Usage: %prog [serial|model|uuid|mac]") +argparser.add_option('-j', '--json', action='store_true', help='Output JSON') (options, args) = argparser.parse_args() try: - noderange = sys.argv[1] + noderange = args[0] except IndexError: argparser.print_help() sys.exit(1) @@ -114,6 +116,8 @@ if len(args) > 1: filters.append(re.compile('mac address')) url = '/noderange/{0}/inventory/hardware/all/all' try: + if options.json: + databynode = {} session = client.Command() for res in session.read(url.format(noderange)): printerror(res) @@ -121,6 +125,10 @@ try: continue for node in res['databynode']: printerror(res['databynode'][node], node) + if options.json: + databynode[node] = dict(databynode.get(node, {}), + **res['databynode'][node]) + continue if 'inventory' not in res['databynode'][node]: continue for inv in res['databynode'][node]['inventory']: @@ -150,6 +158,9 @@ try: print(u'{0}: {1} {2}: {3}'.format(node, prefix, pretty(datum), info[datum])) + if options.json: + print(json.dumps(databynode, sort_keys=True, indent=4, + separators=(',', ': '))) except KeyboardInterrupt: print('') -sys.exit(exitcode) \ No newline at end of file +sys.exit(exitcode) From 0d4b1a4213bb826c25c2884ca7d1e6f9c5428537 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 11 May 2018 14:18:04 -0400 Subject: [PATCH 38/45] Amend json output Have the nodeinventory json output in a bit more directly useful format, rather than regarding the API structured JSON... --- confluent_client/bin/nodeinventory | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/confluent_client/bin/nodeinventory b/confluent_client/bin/nodeinventory index d7a1acb7..9cef8549 100755 --- a/confluent_client/bin/nodeinventory +++ b/confluent_client/bin/nodeinventory @@ -125,16 +125,12 @@ try: continue for node in res['databynode']: printerror(res['databynode'][node], node) - if options.json: - databynode[node] = dict(databynode.get(node, {}), - **res['databynode'][node]) - continue if 'inventory' not in res['databynode'][node]: continue for inv in res['databynode'][node]['inventory']: prefix = inv['name'] if not inv['present']: - if not filters: + if not filters and not options.json: print '{0}: {1}: Not Present'.format(node, prefix) continue info = inv['information'] @@ -144,6 +140,11 @@ try: info.pop('product_extra', None) if 'memory_type' in info: if not filters: + if options.json: + if node not in databynode: + databynode[node] = {} + databynode[node][prefix] = inv + continue print_mem_info(node, prefix, info) continue for datum in info: @@ -155,6 +156,11 @@ try: continue if info[datum] is None: continue + if options.json: + if node not in databynode: + databynode[node] = {} + databynode[node][prefix] = inv + break print(u'{0}: {1} {2}: {3}'.format(node, prefix, pretty(datum), info[datum])) From 173f1eaf7ebc69e2faf41cd409106e4129880229 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 11 May 2018 14:24:47 -0400 Subject: [PATCH 39/45] Include absent devices in the json of nodeinventory --- confluent_client/bin/nodeinventory | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/confluent_client/bin/nodeinventory b/confluent_client/bin/nodeinventory index 9cef8549..9c8a6f2b 100755 --- a/confluent_client/bin/nodeinventory +++ b/confluent_client/bin/nodeinventory @@ -130,8 +130,13 @@ try: for inv in 
res['databynode'][node]['inventory']: prefix = inv['name'] if not inv['present']: - if not filters and not options.json: - print '{0}: {1}: Not Present'.format(node, prefix) + if not filters: + if options.json: + if node not in databynode: + databynode[node] = {} + databynode[node][prefix] = inv + else: + print '{0}: {1}: Not Present'.format(node, prefix) continue info = inv['information'] info.pop('board_extra', None) From 3cc9ee1a1793898e3d328e962b39a52686f2145d Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 11 May 2018 14:49:46 -0400 Subject: [PATCH 40/45] Block some early startup problems in collective --- confluent_server/confluent/collective/manager.py | 3 +++ confluent_server/confluent/config/configmanager.py | 8 +++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 90a5e55b..ece48b14 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -55,6 +55,9 @@ def connect_to_leader(cert=None, name=None, leader=None): keydata = tlvdata.recv(remote) if 'error' in keydata: if 'leader' in keydata: + ldrc = cfm.get_collective_member_by_address(keydata['leader']) + if ldrc and ldrc['name'] == name: + raise Exception("Redirected to self") return connect_to_leader(name=name, leader=keydata['leader']) raise Exception(keydata['error']) colldata = tlvdata.recv(remote) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 4b34151d..59c98d68 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -446,13 +446,19 @@ def add_collective_member(name, address, fingerprint): ConfigManager._bg_sync_to_file() def list_collective(): + if _cfgstore is None: + init() return iter(_cfgstore.get('collective', ())) def get_collective_member(name): - return _cfgstore['collective'][name] + if _cfgstore is None: + init() + return _cfgstore.get('collective', {}).get(name, None) def get_collective_member_by_address(address): + if _cfgstore is None: + init() for name in _cfgstore.get('collective', {}): currdrone = _cfgstore['collective'][name] if netutil.addresses_match(address, currdrone['address']): From aa2be98dc3cc76dc090df0de09c7463ec40313fe Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 11 May 2018 15:00:18 -0400 Subject: [PATCH 41/45] Fix not having currentleader set A slave node now recognizes itself as such. 
--- confluent_server/confluent/collective/manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index ece48b14..335dae9a 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -79,6 +79,7 @@ def connect_to_leader(cert=None, name=None, leader=None): cfm.set_global(globvar, globaldata[globvar]) cfm.ConfigManager(tenant=None)._load_from_json(dbjson) cfm.ConfigManager._bg_sync_to_file() + currentleader = leader cfm.follow_channel(remote) def get_myname(): From 297513bba7c06e689c70b392bfd07ad256c7cd90 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 11 May 2018 15:03:41 -0400 Subject: [PATCH 42/45] Correct scope of currentleader --- confluent_server/confluent/collective/manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 335dae9a..002eb94b 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -33,6 +33,7 @@ currentleader = None def connect_to_leader(cert=None, name=None, leader=None): + global currentleader if leader is None: leader = currentleader remote = socket.create_connection((leader, 13001)) From 7a912b31cb31847859927815eabcea48c48f1f7b Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 14 May 2018 15:35:44 -0400 Subject: [PATCH 43/45] Fix transaction count in collective Slave members were not persisting the value to disk --- confluent_server/confluent/config/configmanager.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 59c98d68..46e95100 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -408,6 +408,7 @@ def clear_configuration(): cfgleader = None def follow_channel(channel): global cfgleader + global _txcount cfgleader = channel msg = channel.recv(8) while msg: @@ -420,10 +421,10 @@ def follow_channel(channel): raise Exception('Truncated message error') rpc += nrpc rpc = cPickle.loads(rpc) - if 'function' in rpc: - globals()[rpc['function']](*rpc['args']) if 'txcount' in rpc: _txcount = rpc['txcount'] + if 'function' in rpc: + globals()[rpc['function']](*rpc['args']) if 'xid' in rpc and rpc['xid']: _pendingchangesets[rpc['xid']].send() msg = channel.recv(8) From 4f73ddc41e722a0b9b3562deab5cf0688e44dca2 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 14 May 2018 16:22:27 -0400 Subject: [PATCH 44/45] Start setting the stage for leader change on restart Have connect() have a way to recover if leader is dead. Also these will be involved in configmanager detected loss of leader --- .../confluent/collective/manager.py | 57 +++++++++++++++++-- .../confluent/config/configmanager.py | 6 ++ 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 002eb94b..eebfc332 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -51,8 +51,11 @@ def connect_to_leader(cert=None, name=None, leader=None): raise Exception("Certificate mismatch in the collective") tlvdata.recv(remote) # the banner tlvdata.recv(remote) # authpassed... 0.. 
+ if name is None: + name = get_myname() tlvdata.send(remote, {'collective': {'operation': 'connect', - 'name': name}}) + 'name': name, + 'txcount': cfm._txcount}}) keydata = tlvdata.recv(remote) if 'error' in keydata: if 'leader' in keydata: @@ -159,6 +162,23 @@ def handle_connection(connection, cert, request, local=False): tlvdata.send(connection, {'collective': {'approval': myrsp, 'leader': get_leader(connection)}}) + if 'assimilate' == operation: + drone = request['name'] + droneinfo = cfm.get_collective_member(drone) + if not util.cert_matches(droneinfo['fingerprint'], cert): + tlvdata.send(connection, + {'error': 'Invalid certificate, ' + 'redo invitation process'}) + return + if request['txcount'] < cfm._txcount: + tlvdata.send(connection, + {'error': 'Refusing to be assimilated by inferior' + 'transaction count', + 'txcount': cfm._txcount}) + return + eventlet.spawn_n(connect_to_leader, None, None, + leader=connection.getpeername()[0]) + tlvdata.send(connection, {'status': 0}) if 'connect' == operation: myself = connection.getsockname()[0] if myself != get_leader(connection): @@ -171,9 +191,18 @@ def handle_connection(connection, cert, request, local=False): droneinfo = cfm.get_collective_member(drone) if not util.cert_matches(droneinfo['fingerprint'], cert): tlvdata.send(connection, - {'error': 'Invalid certificate,' + {'error': 'Invalid certificate, ' 'redo invitation process'}) return + if request['txcount'] > cfm._txcount: + retire_as_leader() + tlvdata.send(connection, + {'error': 'Client has higher tranasaction count, ' + 'should assimilate me, connecting..', + 'txcount': cfm._txcount}) + eventlet.spawn_n(connect_to_leader, None, None, + connection.getpeername()[0]) + return tlvdata.send(connection, cfm._dump_keys(None, False)) tlvdata.send(connection, cfm._cfgstore['collective']) tlvdata.send(connection, cfm.get_globals()) @@ -186,12 +215,30 @@ def handle_connection(connection, cert, request, local=False): # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates +def try_assimilate(drone): + pass + def get_leader(connection): - global currentleader - if currentleader is None: - currentleader = connection.getsockname()[0] + if currentleader is None or connection.getpeername()[0] == currentleader: + become_leader(connection) return currentleader +def retire_as_leader(): + global currentleader + cfm.stop_leading() + currentleader = None + +def become_leader(connection): + global currentleader + currentleader = connection.getsockname()[0] + skipaddr = connection.getpeername()[0] + for member in cfm.list_collective(): + dronecandidate = cfm.get_collective_member(member)['address'] + if dronecandidate in (currentleader, skipaddr): + continue + eventlet.spawn_n(try_assimilate, dronecandidate) + + def startup(): members = list(cfm.list_collective()) if len(members) < 2: diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 46e95100..90a8d566 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -395,6 +395,12 @@ def relay_slaved_requests(name, listener): msg = listener.recv(8) +def stop_leading(): + for stream in list(cfgstreams): + cfgstreams[stream].close() + del cfgstreams[stream] + + def clear_configuration(): global _cfgstore _cfgstore = {} From 08dcab4c7286f6bb17fe7dbcc62d1c77da86d518 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 14 May 2018 16:25:27 
-0400 Subject: [PATCH 45/45] Fix load of txcount Mistake caused txcount not to restore from disk. --- confluent_server/confluent/config/configmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 90a8d566..1463ac26 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -1668,7 +1668,7 @@ class ConfigManager(object): _cfgstore = {} rootpath = cls._cfgdir try: - with open(os.path.join(rootpath, 'transactioncount', 'r')) as f: + with open(os.path.join(rootpath, 'transactioncount'), 'r') as f: _txcount = struct.unpack('!Q', f.read())[0] except IOError: pass
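The defect was purely argument placement: the 'r' had been passed to os.path.join() rather than open(), so the constructed path never existed, the IOError was swallowed, and the counter stayed at its default of zero. A minimal round-trip sketch of the same on-disk format, a big-endian 64-bit counter matching the struct '!Q' usage; the helper names are illustrative and not part of the patch:

    import os
    import struct

    def save_txcount(cfgdir, txcount):
        # Text mode mirrors the patch's Python 2 usage; use 'wb'/'rb' on Python 3.
        with open(os.path.join(cfgdir, 'transactioncount'), 'w') as f:
            f.write(struct.pack('!Q', txcount))

    def load_txcount(cfgdir):
        try:
            # The mode belongs to open(), not os.path.join().
            with open(os.path.join(cfgdir, 'transactioncount'), 'r') as f:
                return struct.unpack('!Q', f.read())[0]
        except IOError:
            return 0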