diff --git a/confluent/consoleserver.py b/confluent/consoleserver.py index 8c36d7e5..f6c76637 100644 --- a/confluent/consoleserver.py +++ b/confluent/consoleserver.py @@ -8,6 +8,7 @@ #we track nodes that are actively being logged, watched, or have attached #there should be no more than one handler per node import confluent.interface.console as conapi +import confluent.log as log import confluent.pluginapi as plugin import eventlet import eventlet.green.threading as threading @@ -21,6 +22,7 @@ class _ConsoleHandler(object): self.rcpts = {} self.cfgmgr = configmanager self.node = node + self.logger = log.Logger(node, tenant=configmanager.tenant) self.buffer = bytearray() self._connect() @@ -55,11 +57,22 @@ class _ConsoleHandler(object): # to the console object eventlet.spawn(self._handle_console_output, data) + def attachuser(self, username): + self.logger.log( + logdata=username, ltype=log.DataTypes.event, + event=log.Events.clientconnect) + + def detachuser(self, username): + self.logger.log( + logdata=username, ltype=log.DataTypes.event, + event=log.Events.clientdisconnect) + def _handle_console_output(self, data): if type(data) == int: if data == conapi.ConsoleEvent.Disconnect: self._connect() return + self.logger.log(data) self.buffer += data #TODO: analyze buffer for registered events, examples: # panics @@ -121,12 +134,14 @@ class ConsoleSession(object): :param node: Name of the node for which this session will be created """ - def __init__(self, node, configmanager, datacallback=None): + def __init__(self, node, configmanager, username, datacallback=None): self.tenant = configmanager.tenant consk = (node, self.tenant) self.ckey = consk + self.username = username if consk not in _handled_consoles: _handled_consoles[consk] = _ConsoleHandler(node, configmanager) + _handled_consoles[consk].attachuser(username) self._evt = threading.Event() self.node = node self.conshdl = _handled_consoles[consk] @@ -142,6 +157,7 @@ class ConsoleSession(object): datacallback(recdata) def destroy(self): + _handled_consoles[self.ckey].detachuser(self.username) _handled_consoles[self.ckey].unregister_rcpt(self.reghdl) self.databuffer = None self._evt = None diff --git a/confluent/httpapi.py b/confluent/httpapi.py index 1fcf5732..15f106d7 100644 --- a/confluent/httpapi.py +++ b/confluent/httpapi.py @@ -155,6 +155,7 @@ def _authorize_request(env): return {'code': 200, 'cookie': cookie, 'cfgmgr': authdata[1], + 'username': name, 'userdata': authdata[0]} else: return {'code': 401} @@ -237,8 +238,9 @@ def resourcehandler(env, start_response): _, _, nodename = prefix.rpartition('/') if 'session' not in querydict.keys() or not querydict['session']: # Request for new session - consession = consoleserver.ConsoleSession(node=nodename, - configmanager=cfgmgr) + consession = consoleserver.ConsoleSession( + node=nodename, configmanager=cfgmgr, + username=authorized['username']) if not consession: start_response("500 Internal Server Error", headers) return diff --git a/confluent/log.py b/confluent/log.py index eff02eb0..ef25a99c 100644 --- a/confluent/log.py +++ b/confluent/log.py @@ -36,7 +36,8 @@ # The information to store: # - leading bit reserved, 0 for now # - length of metadata record 7 bits -# - type of data referenced by this entry (one byte) +# - type of data referenced by this entry (one byte), currently: +# 0=text event, 1=json, 2=console data # - offset into the text log to begin (4 bytes) # - length of data referenced by this entry (2 bytes) # - UTC timestamp of this entry in seconds since epoch (unsigned 32 bit?) @@ -44,7 +45,11 @@ # (a future extended version might include suport for Forward Secure Sealing # or other fields) +import confluent.config.configmanager as configuration +import eventlet import os +import struct +import time # on conserving filehandles: # upon write, if file not open, open it for append @@ -57,7 +62,103 @@ import os # if that happens, warn to have user increase ulimit for optimal # performance + +class Events(object): + undefined, clearscreen, clientconnect, clientdisconnect = range(4) + logstr = { + 2: 'connection by ', + 3: 'disconnection by ', + } + + +class DataTypes(object): + text, dictionary, console, event = range(4) + class Logger(object): - def __init__(self, location, console=True, configmanager): - self.location = location - os.path.isdir(location) + """ + :param console: If true, [] will be used to denote non-text events. If + False, events will be formatted like syslog: + date: message + """ + def __init__(self, logname, console=True, tenant=None): + self.filepath = configuration.get_global("logdirectory") + if self.filepath is None: + self.filepath = "/var/log/confluent/" + self.isconsole = console + if console: + self.filepath += "consoles/" + self.textpath = self.filepath + logname + self.binpath = self.filepath + logname + ".cbl" + self.writer = None + self.closer = None + self.textfile = None + self.binfile = None + self.logentries = [] + + def writedata(self): + if self.textfile is None: + self.textfile = open(self.textpath, mode='ab') + if self.binfile is None: + self.binfile = open(self.binpath, mode='ab') + for entry in self.logentries: + ltype = entry[0] + tstamp = entry[1] + data = entry[2] + evtdata = entry[3] + textdate = '' + if self.isconsole and ltype != 2: + textdate = time.strftime( + '[%m/%d %H:%M:%S ', time.localtime(tstamp)) + if ltype == DataTypes.event and evtdata in Events.logstr: + textdate += Events.logstr[evtdata] + elif not self.isconsole: + textdate = time.strftime( + '%b %d %H:%M:%S ', time.localtime(tstamp)) + offset = self.textfile.tell() + len(textdate) + datalen = len(data) + # metadata length is always 16 for this code at the moment + binrecord = struct.pack(">BBIHII", + 16, ltype, offset, datalen, tstamp, evtdata) + if self.isconsole: + if ltype == 2: + textrecord = data + else: + textrecord = textdate + data + ']' + else: + textrecord = textdate + data + '\n' + self.textfile.write(textrecord) + self.binfile.write(binrecord) + self.logentries = [] + if self.closer is None: + self.closer = eventlet.spawn_after(15, self.closelog) + self.writer = None + + def log(self, logdata=None, ltype=None, event=0): + if type(logdata) not in (str, unicode, dict): + raise Exception("Unsupported logdata") + if ltype is None: + if type(logdata) == dict: + ltype = 1 + elif self.isconsole: + ltype = 2 + else: + ltype = 0 + if self.closer is not None: + self.closer.cancel() + self.closer = None + timestamp = int(time.time()) + if (len(self.logentries) > 0 and ltype == 2 and + event == 0 and self.logentries[-1][0] == 2 and + self.logentries[-1][1] == timestamp): + self.logentries[-1][2] += logdata + else: + self.logentries.append([ltype, timestamp, logdata, event]) + if self.writer is None: + self.writer = eventlet.spawn_after(2, self.writedata) + + def closelog(self): + self.textfile.close() + self.binfile.close() + self.textfile = None + self.binfile = None + self.closer = None diff --git a/confluent/sockapi.py b/confluent/sockapi.py index 1da87cd4..31e53df7 100644 --- a/confluent/sockapi.py +++ b/confluent/sockapi.py @@ -48,6 +48,7 @@ def sessionhdl(connection, authname): authdata = None if authname and isinstance(authname, bool): authenticated = True + authname = "superuser" cfm = configmanager.ConfigManager(tenant=None) elif authname: authdata = auth.authorize(authname, element=None) @@ -58,13 +59,13 @@ def sessionhdl(connection, authname): while not authenticated: # prompt for name and passphrase tlvdata.send(connection, {'authpassed': 0}) response = tlvdata.recv(connection) - username = response['username'] + authname = response['username'] passphrase = response['passphrase'] # note(jbjohnso): here, we need to authenticate, but not # authorize a user. When authorization starts understanding # element path, that authorization will need to be called # per request the user makes - authdata = auth.check_user_passphrase(username, passphrase) + authdata = auth.check_user_passphrase(authname, passphrase) if authdata is not None: authenticated = True cfm = authdata[1] @@ -72,7 +73,7 @@ def sessionhdl(connection, authname): request = tlvdata.recv(connection) while request is not None: try: - process_request(connection, request, cfm, authdata) + process_request(connection, request, cfm, authdata, authname) except: import traceback traceback.print_exc() @@ -90,7 +91,7 @@ def send_response(responses, connection): tlvdata.send(connection, {'_requestdone': 1}) -def process_request(connection, request, cfm, authdata): +def process_request(connection, request, cfm, authdata, authname): #TODO(jbjohnso): authorize each request if type(request) == dict: operation = request['operation'] @@ -105,7 +106,8 @@ def process_request(connection, request, cfm, authdata): node = elems[2] ccons = ClientConsole(connection) consession = consoleserver.ConsoleSession( - node=node, configmanager=cfm, datacallback=ccons.sendall) + node=node, configmanager=cfm, username=authname, + datacallback=ccons.sendall) if consession is None: raise Exception("TODO") tlvdata.send(connection, {'started': 1})