diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index e952ced6..792ef4b2 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -23,6 +23,7 @@ # there should be no more than one handler per node import codecs import collections +import confluent.collective.manager as collective import confluent.config.configmanager as configmodule import confluent.exceptions as exc import confluent.interface.console as conapi @@ -138,11 +139,13 @@ def pytechars2line(chars, maxlen=None): class ConsoleHandler(object): _plugin_path = '/nodes/{0}/_console/session' _logtobuffer = True - _genwatchattribs = frozenset(('console.method', 'console.logging')) + _genwatchattribs = frozenset(('console.method', 'console.logging', + 'collective.manager')) def __init__(self, node, configmanager): self.clearpending = False self._dologging = True + self._is_local = True self._isondemand = False self.error = None self._retrytime = 0 @@ -214,7 +217,7 @@ class ConsoleHandler(object): def check_isondemand(self): self._dologging = True attrvalue = self.cfgmgr.get_node_attributes( - (self.node,), ('console.logging',)) + (self.node,), ('console.logging', 'collective.manager')) if self.node not in attrvalue: self._isondemand = False elif 'console.logging' not in attrvalue[self.node]: @@ -225,6 +228,15 @@ class ConsoleHandler(object): self._isondemand = True if (attrvalue[self.node]['console.logging']['value']) in ('none', 'memory'): self._dologging = False + self.check_collective(attrvalue) + + def check_collective(self, attrvalue): + myc = attrvalue.get(self.node, {}).get('collective.manager', {}).get( + 'value', None) + if myc and myc != collective.get_myname(): + # Do not do console connect for nodes managed by another + # confluent collective member + self._is_local = False def get_buffer_age(self): """Return age of buffered data @@ -236,6 +248,10 @@ class ConsoleHandler(object): return False def _attribschanged(self, nodeattribs, configmanager, **kwargs): + if 'collective.manager' in nodeattribs[self.node]: + attrval = configmanager.get_node_attributes(self.node, + 'collective.manager') + self.check_collective(attrval) if 'console.logging' in nodeattribs[self.node]: # decide whether logging changes how we react or not self._dologging = True @@ -305,6 +321,8 @@ class ConsoleHandler(object): self._disconnect() def _connect(self): + if not self._is_local: + return if self.connectionthread: self.connectionthread.kill() self.connectionthread = None @@ -568,7 +586,12 @@ def _nodechange(added, deleting, configmanager): def _start_tenant_sessions(cfm): - for node in cfm.list_nodes(): + nodeattrs = cfm.get_node_attributes(cfm.list_nodes(), 'collective.manager') + for node in nodeattrs: + manager = nodeattrs[node].get('collective.manager', {}).get('value', + None) + if manager and collective.get_myname() != manager: + continue try: connect_node(node, cfm) except: