mirror of
https://github.com/xcat2/confluent.git
synced 2026-01-12 02:52:30 +00:00
First pass at implementing console auto start
This commit is contained in:
@@ -379,6 +379,23 @@ def _addchange(changeset, node, attrname):
|
||||
changeset[node][attrname] = 1
|
||||
|
||||
|
||||
def hook_new_configmanagers(callback):
|
||||
'''Register callback for new tenants
|
||||
|
||||
From the point when this function is called until the end,
|
||||
callback may be invoked to indicate a new tenant and
|
||||
callback is notified to perform whatever tasks appropriate for
|
||||
a new tenant
|
||||
|
||||
:param callback: Function to call for each possible config manager
|
||||
:returns: identifier that can be used to cancel this registration
|
||||
'''
|
||||
#TODO(jbjohnso): actually live up to the promise of ongoing callbacks
|
||||
callback(ConfigManager(None))
|
||||
for tenant in _cfgstore['tenant'].iterkeys():
|
||||
callback(ConfigManager(tenant))
|
||||
|
||||
|
||||
class ConfigManager(object):
|
||||
_cfgdir = "/etc/confluent/cfg/"
|
||||
_cfgwriter = None
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#we track nodes that are actively being logged, watched, or have attached
|
||||
#there should be no more than one handler per node
|
||||
import collections
|
||||
import confluent.config.configmanager as configmodule
|
||||
import confluent.exceptions as exc
|
||||
import confluent.interface.console as conapi
|
||||
import confluent.log as log
|
||||
@@ -118,6 +119,15 @@ class _ConsoleHandler(object):
|
||||
self._send_rcpts({'connectstate': self.connectstate})
|
||||
self._connect()
|
||||
|
||||
def close(self):
|
||||
self._send_rcpts({'deleting': True})
|
||||
if self._console:
|
||||
self._console.close()
|
||||
self._console = None
|
||||
if self.connectionthread:
|
||||
self.connectionthread.kill()
|
||||
self.connectionthread = None
|
||||
|
||||
def unregister_rcpt(self, handle):
|
||||
self.clientcount -= 1
|
||||
if handle in self.rcpts:
|
||||
@@ -254,6 +264,30 @@ class _ConsoleHandler(object):
|
||||
self._console.write(data)
|
||||
|
||||
|
||||
def disconnect_node(node, configmanager):
|
||||
consk = (node, configmanager.tenant)
|
||||
if consk in _handled_consoles:
|
||||
_handled_consoles[consk].close()
|
||||
del _handled_consoles[consk]
|
||||
|
||||
|
||||
def _nodechange(added, deleting, configmanager):
|
||||
for node in added:
|
||||
connect_node(node, configmanager)
|
||||
for node in deleting:
|
||||
disconnect_node(node, configmanager)
|
||||
|
||||
|
||||
def _start_tenant_sessions(cfm):
|
||||
for node in cfm.list_nodes():
|
||||
connect_node(node, cfm)
|
||||
cfm.watch_nodecollection(_nodechange)
|
||||
|
||||
|
||||
def start_console_sessions():
|
||||
configmodule.hook_new_configmanagers(_start_tenant_sessions)
|
||||
|
||||
|
||||
def connect_node(node, configmanager):
|
||||
consk = (node, configmanager.tenant)
|
||||
if consk not in _handled_consoles:
|
||||
|
||||
Reference in New Issue
Block a user