From 810be7172074fba6d8aafe63107a96cb5a38efb8 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 14 Jun 2018 17:01:31 -0400 Subject: [PATCH] Initial support for non-console dispatch For non-exceptional cases, it is now functional. --- .../confluent/config/configmanager.py | 3 + confluent_server/confluent/core.py | 125 +++++++++++++++++- confluent_server/confluent/httpapi.py | 2 +- confluent_server/confluent/sockapi.py | 3 + 4 files changed, 126 insertions(+), 7 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 897a3206..114d6feb 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -759,6 +759,9 @@ class ConfigManager(object): self._cfgstore['nodes'] = {} self._bg_sync_to_file() + def get_collective_member(self, name): + return get_collective_member(name) + def filter_node_attributes(self, expression, nodes=None): """Filtered nodelist according to expression diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index dee42e07..593016a1 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -35,7 +35,9 @@ import confluent import confluent.alerts as alerts +import confluent.tlvdata as tlvdata import confluent.config.attributes as attrscheme +import confluent.config.configmanager as cfm import confluent.collective.manager as collective import confluent.discovery.core as disco import confluent.interface.console as console @@ -47,8 +49,21 @@ try: import confluent.shellmodule as shellmodule except ImportError: pass +try: + import OpenSSL.crypto as crypto +except ImportError: + # Only required for collective mode + crypto = None +import confluent.util as util +import eventlet.green.ssl as ssl import itertools import os +try: + import cPickle as pickle +except ImportError: + import pickle +import socket +import struct import sys pluginmap = {} @@ -560,6 +575,61 @@ def abbreviate_noderange(configmanager, inputdata, operation): return (msg.KeyValueData({'noderange': noderange.ReverseNodeRange(inputdata['nodes'], configmanager).noderange}),) +def handle_dispatch(connection, cert, dispatch): + cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) + if not util.cert_matches( + cfm.get_collective_member(dispatch['name'])['fingerprint'], cert): + connection.close() + configmanager = cfm.ConfigManager(dispatch['tenant']) + nodes = dispatch['nodes'] + inputdata = dispatch['inputdata'] + operation = dispatch['operation'] + pathcomponents = dispatch['path'] + routespec = nested_lookup(noderesources, pathcomponents) + plugroute = routespec.routeinfo + plugpath = None + nodesbyhandler = {} + passvalues = [] + nodeattr = configmanager.get_node_attributes( + nodes, plugroute['pluginattrs']) + for node in nodes: + for attrname in plugroute['pluginattrs']: + if attrname in nodeattr[node]: + plugpath = nodeattr[node][attrname]['value'] + elif 'default' in plugroute: + plugpath = plugroute['default'] + if plugpath is not None: + try: + hfunc = getattr(pluginmap[plugpath], operation) + except KeyError: + nodesbyhandler[BadPlugin(node, plugpath).error] = [node] + continue + if hfunc in nodesbyhandler: + nodesbyhandler[hfunc].append(node) + else: + nodesbyhandler[hfunc] = [node] + try: + for hfunc in nodesbyhandler: + passvalues.append(hfunc( + nodes=nodesbyhandler[hfunc], element=pathcomponents, + configmanager=configmanager, + inputdata=inputdata)) + for res in itertools.chain(*passvalues): + _forward_rsp(connection, res) + except Exception as res: + _forward_rsp(connection, res) + connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00') + + +def _forward_rsp(connection, res): + r = pickle.dumps(res) + rlen = len(r) + if not rlen: + return + connection.sendall(struct.pack('!Q', rlen)) + connection.sendall(r) + + def handle_node_request(configmanager, inputdata, operation, pathcomponents, autostrip=True): iscollection = False @@ -663,26 +733,25 @@ def handle_node_request(configmanager, inputdata, operation, nodeattr = configmanager.get_node_attributes( nodes, plugroute['pluginattrs'] + ['collective.manager']) plugpath = None - if 'default' in plugroute: - plugpath = plugroute['default'] - mynodes = set(nodes) nodesbymanager = {} nodesbyhandler = {} for node in nodes: for attrname in plugroute['pluginattrs']: if attrname in nodeattr[node]: plugpath = nodeattr[node][attrname]['value'] + elif 'default' in plugroute: + plugpath = plugroute['default'] if plugpath in dispatch_plugins: manager = nodeattr[node].get('collective.manager', {}).get( 'value', None) if manager: if collective.get_myname() != manager: - mynodes.discard(manager) if manager not in nodesbymanager: nodesbymanager[manager] = set([node]) else: nodesbymanager[manager].add(node) - if plugpath is not None and node in mynodes: + continue + if plugpath is not None: try: hfunc = getattr(pluginmap[plugpath], operation) except KeyError: @@ -698,7 +767,10 @@ def handle_node_request(configmanager, inputdata, operation, configmanager=configmanager, inputdata=inputdata)) for manager in nodesbymanager: - raise Exception('TODO: dispatch requests') + passvalues.append(dispatch_request( + nodes=nodesbymanager[manager], manager=manager, + element=pathcomponents, configmanager=configmanager, + inputdata=inputdata, operation=operation)) if isnoderange or not autostrip: return itertools.chain(*passvalues) else: @@ -716,6 +788,47 @@ def handle_node_request(configmanager, inputdata, operation, # return stripnode(passvalues[0], nodes[0]) +def dispatch_request(nodes, manager, element, configmanager, inputdata, + operation): + a = configmanager.get_collective_member(manager) + remote = socket.create_connection((a['address'], 13001)) + remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, + keyfile='/etc/confluent/privkey.pem', + certfile='/etc/confluent/srvcert.pem') + if not util.cert_matches(a['fingerprint'], remote.getpeercert( + binary_form=True)): + raise Exception("Invalid certificate on peer") + tlvdata.recv(remote) + tlvdata.recv(remote) + myname = collective.get_myname() + tlvdata.send(remote, + {'dispatch': {'name': myname, 'nodes': list(nodes), + 'path': element, + 'tenant': configmanager.tenant, + 'operation': operation, + 'inputdata': inputdata}}) + while True: + rlen = remote.recv(8) + while len(rlen) < 8: + nlen = remote.recv(8 - len(rlen)) + if not nlen: + raise Exception('Error receiving data') + rlen += nlen + rlen = struct.unpack('!Q', rlen)[0] + if rlen == 0: + break + rsp = remote.recv(rlen) + while len(rsp) < rlen: + nrsp = remote.recv(rlen - len(rsp)) + if not nrsp: + raise Exception('Error receving data') + rsp += nrsp + rsp = pickle.loads(rsp) + if isinstance(rsp, Exception): + raise rsp + yield rsp + + def handle_discovery(pathcomponents, operation, configmanager, inputdata): if pathcomponents[0] == 'detected': pass diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index 2a4d2fe6..27418009 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -785,7 +785,7 @@ def serve(bind_host, bind_port): ' a second\n') eventlet.sleep(1) eventlet.wsgi.server(sock, resourcehandler, log=False, log_output=False, - debug=False, socket_timeout=60) + debug=False, socket_timeout=60, keepalive=False) class HttpApi(object): diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index c76f50b2..87ec4a2e 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -119,6 +119,9 @@ def sessionhdl(connection, authname, skipauth=False, cert=None): if 'collective' in response: return collective.handle_connection(connection, cert, response['collective']) + if 'dispatch' in response: + return pluginapi.handle_dispatch(connection, cert, + response['dispatch']) authname = response['username'] passphrase = response['password'] # note(jbjohnso): here, we need to authenticate, but not