diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 1bb35bf3..a5f8e743 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -349,9 +349,9 @@ class ConsoleHandler(object): self.reconnect.cancel() self.reconnect = None try: - self._console = plugin.handle_path( + self._console = list(plugin.handle_path( self._plugin_path.format(self.node), - "create", self.cfgmgr) + "create", self.cfgmgr))[0] except (exc.NotImplementedException, exc.NotFoundException): self._console = None except: diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index b76c3aed..5f046358 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -55,7 +55,9 @@ except ImportError: # Only required for collective mode crypto = None import confluent.util as util +import eventlet.greenpool as greenpool import eventlet.green.ssl as ssl +import eventlet.queue as queue import itertools import os try: @@ -711,7 +713,7 @@ def handle_node_request(configmanager, inputdata, operation, else: raise Exception("TODO here") del pathcomponents[0:2] - passvalues = [] + passvalues = queue.Queue() plugroute = routespec.routeinfo inputdata = msg.get_input_message( pathcomponents, operation, inputdata, nodes, isnoderange, @@ -764,24 +766,25 @@ def handle_node_request(configmanager, inputdata, operation, nodesbyhandler[hfunc].append(node) else: nodesbyhandler[hfunc] = [node] + workers = greenpool.GreenPool() + numworkers = 0 for hfunc in nodesbyhandler: - passvalues.append(hfunc( - nodes=nodesbyhandler[hfunc], element=pathcomponents, - configmanager=configmanager, - inputdata=inputdata)) + numworkers += 1 + workers.spawn(addtoqueue, passvalues, hfunc, {'nodes': nodesbyhandler[hfunc], + 'element': pathcomponents, + 'configmanager': configmanager, + 'inputdata': inputdata}) for manager in nodesbymanager: - passvalues.append(dispatch_request( - nodes=nodesbymanager[manager], manager=manager, - element=pathcomponents, configmanager=configmanager, - inputdata=inputdata, operation=operation)) + numworkers += 1 + workers.spawn(addtoqueue, passvalues, dispatch_request, { + 'nodes': nodesbymanager[manager], 'manager': manager, + 'element': pathcomponents, 'configmanager': configmanager, + 'inputdata': inputdata, 'operation': operation}) if isnoderange or not autostrip: - return itertools.chain(*passvalues) + return iterate_queue(numworkers, passvalues) else: - if len(passvalues) > 0: - if isinstance(passvalues[0], console.Console): - return passvalues[0] - else: - return stripnode(passvalues[0], nodes[0]) + if numworkers > 0: + return iterate_queue(numworkers, passvalues, nodes[0]) else: raise exc.NotImplementedException() @@ -791,6 +794,30 @@ def handle_node_request(configmanager, inputdata, operation, # return stripnode(passvalues[0], nodes[0]) +def iterate_queue(numworkers, passvalues, strip=False): + completions = 0 + while completions < numworkers: + nv = passvalues.get() + if nv == 'theend': + completions += 1 + else: + if strip and not isinstance(nv, console.Console): + nv.strip_node(strip) + yield nv + + +def addtoqueue(theq, fun, kwargs): + try: + result = fun(**kwargs) + if isinstance(result, console.Console): + theq.put(result) + else: + for pv in result: + theq.put(pv) + finally: + theq.put('theend') + + def dispatch_request(nodes, manager, element, configmanager, inputdata, operation): a = configmanager.get_collective_member(manager)