From 28a9c9b900ef1ae7a396b5aa165619dc6a693ade Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 12 Mar 2014 17:12:01 -0400 Subject: [PATCH] Attempt to read in recent state/data from log The attempt seems to not work right at the moment, but it seems to be in the right direction and also does grab terminal state. --- confluent/consoleserver.py | 18 ++++++++++++---- confluent/log.py | 42 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/confluent/consoleserver.py b/confluent/consoleserver.py index 4eb92579..b4140219 100644 --- a/confluent/consoleserver.py +++ b/confluent/consoleserver.py @@ -24,10 +24,17 @@ class _ConsoleHandler(object): self.node = node self.logger = log.Logger(node, tenant=configmanager.tenant) self.buffer = bytearray() - self._connect() - self.users = {} + (text, termstate) = self.logger.read_recent_text(8192) + self.buffer += text self.appmodedetected = False self.shiftin = None + if termstate & 1: + self.appmodedetected = True + if termstate & 2: + self.shiftin = '0' + self._connect() + self.users = {} + def _connect(self): self._console = plugin.handle_path( @@ -159,6 +166,10 @@ class _ConsoleHandler(object): self._console.write(data) +def connect_node(node, configmanager): + consk = (node, configmanager.tenant) + if consk not in _handled_consoles: + _handled_consoles[consk] = _ConsoleHandler(node, configmanager) #this represents some api view of a console handler. This handles things like #holding the caller specific queue data, for example, when http api should be #sending data, but there is no outstanding POST request to hold it, @@ -178,8 +189,7 @@ class ConsoleSession(object): consk = (node, self.tenant) self.ckey = consk self.username = username - if consk not in _handled_consoles: - _handled_consoles[consk] = _ConsoleHandler(node, configmanager) + connect_node(node, configmanager) _handled_consoles[consk].attachuser(username) self._evt = threading.Event() self.node = node diff --git a/confluent/log.py b/confluent/log.py index a2c7a6f9..1601f683 100644 --- a/confluent/log.py +++ b/confluent/log.py @@ -48,6 +48,7 @@ import collections import confluent.config.configmanager as configuration import eventlet +import fcntl import os import struct import time @@ -116,6 +117,7 @@ class Logger(object): elif not self.isconsole: textdate = time.strftime( '%b %d %H:%M:%S ', time.localtime(tstamp)) + fcntl.flock(self.textfile, fcntl.LOCK_EX) offset = self.textfile.tell() + len(textdate) datalen = len(data) eventaux = entry[4] @@ -132,13 +134,53 @@ class Logger(object): else: textrecord = textdate + data + '\n' self.textfile.write(textrecord) + fcntl.flock(self.textfile, fcntl.LOCK_UN) + fcntl.flock(self.binfile, fcntl.LOCK_EX) self.binfile.write(binrecord) + fcntl.flock(self.binfile, fcntl.LOCK_UN) self.textfile.flush() self.binfile.flush() if self.closer is None: self.closer = eventlet.spawn_after(15, self.closelog) self.writer = None + def read_recent_text(self, size): + try: + textfile = open(self.textpath, mode='r') + binfile = open(self.binpath, mode='r') + except IOError: + return ('', 0) + fcntl.flock(binfile, fcntl.LOCK_SH) + binfile.seek(0, 2) + binidx = binfile.tell() - 16 + currsize = 0 + offsets = collections.deque() + termstate = 0 + while binidx > 0 and currsize < size: + binfile.seek(binidx, 0) + binidx -= 16 + recbytes = binfile.read(16) + (_, ltype, offset, datalen, tstamp, evtdata, eventaux, _) = \ + struct.unpack(">BBIHIBBH", recbytes) + binrecord = struct.pack(">BBIHIBBH", + 16, ltype, offset, datalen, tstamp, evtdata, eventaux, 0) + if ltype != 2: + continue + currsize += datalen + offsets.append((offset, datalen)) + termstate = termstate | eventaux + fcntl.flock(binfile, fcntl.LOCK_UN) + binfile.close() + textdata = '' + fcntl.flock(textfile, fcntl.LOCK_SH) + while offsets: + (offset, len) = offsets.popleft() + textfile.seek(offset, 0) + textdata += textfile.read(len) + fcntl.flock(textfile, fcntl.LOCK_UN) + textfile.close() + return (textdata, termstate) + def log(self, logdata=None, ltype=None, event=0, eventdata=None): if type(logdata) not in (str, unicode, dict): raise Exception("Unsupported logdata")