diff --git a/confluent_server/confluent/config/conf.py b/confluent_server/confluent/config/conf.py new file mode 100644 index 00000000..58f33bda --- /dev/null +++ b/confluent_server/confluent/config/conf.py @@ -0,0 +1,50 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2014 IBM Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +#This defines config variable to store the global configuration for confluent +import ConfigParser + +_config = None + +def init_config(): + global _config + configfile = "/etc/confluent/service.cfg" + _config = ConfigParser.ConfigParser() + _config.read(configfile) + +def get_config(): + return _config + +def get_int_option(section, option): + try: + return _config.getint(section, option) + except ( + ConfigParser.NoSectionError, ConfigParser.NoOptionError, ValueError): + return None + +def get_boolean_option(section, option): + try: + return _config.getboolean(section, option) + except ( + ConfigParser.NoSectionError, ConfigParser.NoOptionError, ValueError): + return None + +def get_option(section, option): + try: + return _config.get(section, option) + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + return None diff --git a/confluent_server/confluent/exceptions.py b/confluent_server/confluent/exceptions.py index 822b1713..15d26dbe 100644 --- a/confluent_server/confluent/exceptions.py +++ b/confluent_server/confluent/exceptions.py @@ -56,4 +56,8 @@ class ForbiddenRequest(ConfluentException): class NotImplementedException(ConfluentException): # The current configuration/plugin is unable to perform # the requested task. http code 501 - pass \ No newline at end of file + pass + +class GlobalConfigError(ConfluentException): + # The configuration in the global config file is not right + pass diff --git a/confluent_server/confluent/log.py b/confluent_server/confluent/log.py index 5e15dd76..29b291cd 100644 --- a/confluent_server/confluent/log.py +++ b/confluent_server/confluent/log.py @@ -61,10 +61,15 @@ import collections import confluent.config.configmanager +import confluent.config.conf as conf +import confluent.exceptions as exc import eventlet import fcntl +import glob import json import os +import re +import stat import struct import time import traceback @@ -80,14 +85,14 @@ import traceback # if that happens, warn to have user increase ulimit for optimal # performance +MIDNIGHT = 24 * 60 * 60 _loggers = {} - class Events(object): ( undefined, clearscreen, clientconnect, clientdisconnect, - consoledisconnect, consoleconnect, stacktrace - ) = range(7) + consoledisconnect, consoleconnect, stacktrace, logrollover + ) = range(8) logstr = { 2: 'connection by ', 3: 'disconnection by ', @@ -98,6 +103,366 @@ class DataTypes(object): text, dictionary, console, event = range(4) +class RollingTypes(object): + no_rolling, size_rolling, time_rolling = range(3) + + +class BaseRotatingHandler(object): + + def __init__(self, filepath, logname): + """ + Use the specified filename for streamed logging + """ + self.filepath = filepath + self.textpath = self.filepath +logname + self.binpath = self.filepath + logname + ".cbl" + self.textfile = None + self.binfile = None + + def open(self): + if self.textfile is None: + self.textfile = open(self.textpath, mode='ab') + if self.binfile is None: + self.binfile = open(self.binpath, mode='ab') + return self.textfile, self.binfile + + def try_emit(self, binrecord, textrecord): + """ + Emit a record. + + Output the record to the file, catering for rollover as described + in doRollover(). + """ + rolling_type = self.shouldRollover(binrecord, textrecord) + if rolling_type: + return self.doRollover(rolling_type) + return None + + def emit(self, binrecord, textrecord): + if self.textfile is None: + self.textfile = open(self.textpath, mode='ab') + if self.binfile is None: + self.binfile = open(self.binpath, mode='ab') + self.textfile.write(textrecord) + self.binfile.write(binrecord) + self.textfile.flush() + self.binfile.flush() + + def get_textfile_offset(self, data_len): + if self.textfile is None: + self.textfile = open(self.textpath, mode='ab') + return self.textfile.tell() + data_len + + def close(self): + if self.textfile: + self.textfile.close + self.textfile = None + if self.binfile: + self.binfile.close + self.binfile = None + + +class TimedAndSizeRotatingFileHandler(BaseRotatingHandler): + """ + Handler for logging to a file, rotating the log file at certain timed + intervals. + + If backupCount is > 0, when rollover is done, no more than backupCount + files are kept - the oldest ones are deleted. + """ + + def __init__(self, filepath, logname, interval=1): + BaseRotatingHandler.__init__(self, filepath, logname) + try: + self.when = conf.get_option('log', 'when').upper() + except (AttributeError): + self.when = 'D' + self.backupCount = conf.get_int_option('log', 'backup_count') or 3 + self.maxBytes = conf.get_int_option( + 'log','max_bytes') or 4 * 1024 * 1024 * 1024 + if self.maxBytes < 8192: + raise exc.GlobalConfigError("The minimum value of max_bytes " + "of log rolling size in the log " + "section should larger than 8192.") + self.utc = conf.get_boolean_option('log', 'utc') or False + + # Calculate the real rollover interval, which is just the number of + # seconds between rollovers. Also set the filename suffix used when + # a rollover occurs. Current 'when' events supported: + # S - Seconds + # M - Minutes + # H - Hours + # D - Days + # midnight - roll over at midnight + # W{0-6} - roll over on a certain day; 0 - Monday + # + # Case of the 'when' specifier is not important; lower or upper case + # will work. + if self.when == 'S': + self.interval = 1 # one second + self.suffix = "%Y-%m-%d_%H-%M-%S" + self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}\.{0,1}\d*$" + elif self.when == 'M': + self.interval = 60 # one minute + self.suffix = "%Y-%m-%d_%H-%M" + self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}_\d{2}-\d{2}\.{0,1}\d*$" + elif self.when == 'H': + self.interval = 60 * 60 # one hour + self.suffix = "%Y-%m-%d_%H" + self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}_\d{2}$" + elif self.when == 'D' or self.when == 'MIDNIGHT': + self.interval = 60 * 60 * 24 # one day + self.suffix = "%Y-%m-%d" + self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}\.{0,1}\d*$" + elif self.when.startswith('W'): + self.interval = 60 * 60 * 24 * 7 # one week + if len(self.when) != 2: + raise ValueError("You must specify a day for weekly rollover from 0 to 6 (0 is Monday): %s" % self.when) + if self.when[1] < '0' or self.when[1] > '6': + raise ValueError("Invalid day specified for weekly rollover: %s" % self.when) + self.dayOfWeek = int(self.when[1]) + self.suffix = "%Y-%m-%d" + self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}\.{0,1}\d*$" + else: + raise ValueError("Invalid rollover interval specified: %s" % self.when) + + self.extMatch = re.compile(self.extMatch) + self.interval = self.interval * interval # multiply by units requested + # Note: Get the modify time of text log file to calculate the + # rollover time + if os.path.exists(self.textpath): + t = os.stat(self.textpath)[stat.ST_MTIME] + else: + t = int(time.time()) + self.rolloverAt = self.computeRollover(t) + self.sizeRollingCount = 0 + self.initSizeRollingCount() + + def computeRollover(self, currentTime): + """ + Work out the rollover time based on the specified time. + """ + result = currentTime + self.interval + # If we are rolling over at midnight or weekly, then the interval is already known. + # What we need to figure out is WHEN the next interval is. In other words, + # if you are rolling over at midnight, then your base interval is 1 day, + # but you want to start that one day clock at midnight, not now. So, we + # have to fudge the rolloverAt value in order to trigger the first rollover + # at the right time. After that, the regular interval will take care of + # the rest. Note that this code doesn't care about leap seconds. :) + if self.when == 'MIDNIGHT' or self.when.startswith('W'): + # This could be done with less code, but I wanted it to be clear + if self.utc: + t = time.gmtime(currentTime) + else: + t = time.localtime(currentTime) + currentHour = t[3] + currentMinute = t[4] + currentSecond = t[5] + # r is the number of seconds left between now and midnight + r = MIDNIGHT - ((currentHour * 60 + currentMinute) * 60 + + currentSecond) + result = currentTime + r + # If we are rolling over on a certain day, add in the number of days until + # the next rollover, but offset by 1 since we just calculated the time + # until the next day starts. There are three cases: + # Case 1) The day to rollover is today; in this case, do nothing + # Case 2) The day to rollover is further in the interval (i.e., today is + # day 2 (Wednesday) and rollover is on day 6 (Sunday). Days to + # next rollover is simply 6 - 2 - 1, or 3. + # Case 3) The day to rollover is behind us in the interval (i.e., today + # is day 5 (Saturday) and rollover is on day 3 (Thursday). + # Days to rollover is 6 - 5 + 3, or 4. In this case, it's the + # number of days left in the current week (1) plus the number + # of days in the next week until the rollover day (3). + # The calculations described in 2) and 3) above need to have a day added. + # This is because the above time calculation takes us to midnight on this + # day, i.e. the start of the next day. + if self.when.startswith('W'): + day = t[6] # 0 is Monday + if day != self.dayOfWeek: + if day < self.dayOfWeek: + daysToWait = self.dayOfWeek - day + else: + daysToWait = 6 - day + self.dayOfWeek + 1 + newRolloverAt = result + (daysToWait * (60 * 60 * 24)) + if not self.utc: + dstNow = t[-1] + dstAtRollover = time.localtime(newRolloverAt)[-1] + if dstNow != dstAtRollover: + if not dstNow: # DST kicks in before next rollover, so we need to deduct an hour + addend = -3600 + else: # DST bows out before next rollover, so we need to add an hour + addend = 3600 + newRolloverAt += addend + result = newRolloverAt + return result + + def shouldRollover(self, binrecord, textrecord): + """ + Determine if rollover should occur. + Just compare times. + """ + # time rolling first + t = int(time.time()) + if t >= self.rolloverAt: + return RollingTypes.time_rolling + self.open() + if self.maxBytes > 0: # are we rolling over? + if self.textfile.tell() + len(textrecord) >= self.maxBytes: + return RollingTypes.size_rolling + if self.binfile.tell() + len(binrecord) >= self.maxBytes: + return RollingTypes.size_rolling + return RollingTypes.no_rolling + + def getFilesToDelete(self): + """ + Determine the files to delete when rolling over. + """ + dirName, baseName = os.path.split(self.textpath) + files = [] + prefix = baseName + "." + filePaths = glob.glob(os.path.join(dirName, "%s*" % prefix)) + fileNames = [os.path.split(f)[1] for f in filePaths] + plen = len(prefix) + t_set = set() + for fileName in fileNames: + suffix = fileName[plen:] + if self.extMatch.match(suffix): + s = suffix.split(".") + t = s[1] if suffix.startswith("cbl") else s[0] + t_set.add(t) + files.append({'time': t, 'file': os.path.join(dirName, + fileName)}) + + t_list = list(t_set) + t_list.sort() + result = [f['file'] for f in files if + f['time'] in t_list[:-(self.backupCount - 1)]] + return result + + def initSizeRollingCount(self): + """ + Init the max number of log files for current time. + """ + dirName, baseName = os.path.split(self.textpath) + prefix = baseName + "." + filePaths = glob.glob(os.path.join(dirName, "%s*" % prefix)) + fileNames = [os.path.split(f)[1] for f in filePaths] + plen = len(prefix) + for fileName in fileNames: + suffix = fileName[plen:] + try: + self.sizeRollingCount = max(self.sizeRollingCount, int(suffix)) + except ValueError: + pass + + def _sizeRoll(self): + self.close() + for i in range(self.sizeRollingCount, 0, -1): + sbfn = "%s.%d" % (self.binpath, i) + dbfn = "%s.%d" % (self.binpath, i + 1) + stfn = "%s.%d" % (self.textpath, i) + dtfn = "%s.%d" % (self.textpath, i + 1) + if os.path.exists(sbfn): + if os.path.exists(dbfn): + os.remove(dbfn) + os.rename(sbfn, dbfn) + if os.path.exists(stfn): + if os.path.exists(dtfn): + os.remove(dtfn) + os.rename(stfn, dtfn) + # size rolling happens, add statistics count + self.sizeRollingCount += 1 + dbfn = self.binpath + ".1" + dtfn = self.textpath + ".1" + if os.path.exists(dbfn): + os.remove(dbfn) + if os.path.exists(dtfn): + os.remove(dtfn) + if os.path.exists(self.binpath): + os.rename(self.binpath, dbfn) + if os.path.exists(self.textpath): + os.rename(self.textpath, dtfn) + return dbfn, dtfn + + def _timeRoll(self): + self.close() + # get the time that this sequence started at and make it a TimeTuple + currentTime = int(time.time()) + dstNow = time.localtime(currentTime)[-1] + t = self.rolloverAt - self.interval + if self.utc: + timeTuple = time.gmtime(t) + else: + timeTuple = time.localtime(t) + dstThen = timeTuple[-1] + if dstNow != dstThen: + if dstNow: + addend = 3600 + else: + addend = -3600 + timeTuple = time.localtime(t + addend) + + # if size rolling files exist + for i in range(self.sizeRollingCount, 0, -1): + sbfn = "%s.%d" % ( self.binpath, i) + dbfn = "%s.%s.%d" % ( + self.binpath, time.strftime(self.suffix, timeTuple),i) + stfn = "%s.%d" % (self.textpath, i) + dtfn = "%s.%s.%d" % ( + self.textpath, time.strftime(self.suffix, timeTuple), i) + if os.path.exists(sbfn): + if os.path.exists(dbfn): + os.remove(dbfn) + os.rename(sbfn, dbfn) + if os.path.exists(stfn): + if os.path.exists(dtfn): + os.remove(dtfn) + os.rename(stfn, dtfn) + + # As time rolling happens, reset statistics count + self.sizeRollingCount = 0 + dbfn = self.binpath + "." + time.strftime(self.suffix, timeTuple) + dtfn = self.textpath + "." + time.strftime(self.suffix, timeTuple) + + if os.path.exists(dbfn): + os.remove(dbfn) + if os.path.exists(dtfn): + os.remove(dtfn) + if os.path.exists(self.binpath): + os.rename(self.binpath, dbfn) + if os.path.exists(self.textpath): + os.rename(self.textpath, dtfn) + if self.backupCount > 0: + for s in self.getFilesToDelete(): + os.remove(s) + + newRolloverAt = self.computeRollover(currentTime) + while newRolloverAt <= currentTime: + newRolloverAt = newRolloverAt + self.interval + #If DST changes and midnight or weekly rollover, adjust for this. + if (self.when == 'MIDNIGHT' or self.when.startswith('W')) and not self.utc: + dstAtRollover = time.localtime(newRolloverAt)[-1] + if dstNow != dstAtRollover: + if not dstNow: # DST kicks in before next rollover, so we need to deduct an hour + addend = -3600 + else: # DST bows out before next rollover, so we need to add an hour + addend = 3600 + newRolloverAt += addend + self.rolloverAt = newRolloverAt + return dbfn, dtfn + + def doRollover(self, rolling_type): + """ + do a rollover based on the rolling type. + """ + if rolling_type == RollingTypes.size_rolling: + return self._sizeRoll() + if rolling_type == RollingTypes.time_rolling: + return self._timeRoll() + + class Logger(object): """ :param console: If true, [] will be used to denote non-text events. If @@ -128,20 +493,23 @@ class Logger(object): self.filepath += "consoles/" if not os.path.isdir(self.filepath): os.makedirs(self.filepath, 448) - self.textpath = self.filepath + logname - self.binpath = self.filepath + logname + ".cbl" self.writer = None self.closer = None - self.textfile = None - self.binfile = None + self.handler = TimedAndSizeRotatingFileHandler(self.filepath, logname, + interval=1) + self.lockfile = None + self.logname = logname self.logentries = collections.deque() + def _lock(self, arrribute): + if self.lockfile is None or self.lockfile.closed: + lockpath = os.path.join(self.filepath, "%s-lock" % self.logname) + self.lockfile = open(lockpath, 'a') + fcntl.flock(self.lockfile, arrribute) + def writedata(self): - if self.textfile is None: - self.textfile = open(self.textpath, mode='ab') - if self.binfile is None: - self.binfile = open(self.binpath, mode='ab') while self.logentries: + textfile, binfile = self.handler.open() entry = self.logentries.popleft() ltype = entry[0] tstamp = entry[1] @@ -156,8 +524,8 @@ class Logger(object): elif not self.isconsole: textdate = time.strftime( '%b %d %H:%M:%S ', time.localtime(tstamp)) - fcntl.flock(self.textfile, fcntl.LOCK_EX) - offset = self.textfile.tell() + len(textdate) + self._lock(fcntl.LOCK_EX) + offset = textfile.tell() + len(textdate) datalen = len(data) eventaux = entry[4] if eventaux is None: @@ -175,53 +543,95 @@ class Logger(object): textrecord = textdate + data if not textrecord.endswith('\n'): textrecord += '\n' - self.textfile.write(textrecord) - fcntl.flock(self.textfile, fcntl.LOCK_UN) - fcntl.flock(self.binfile, fcntl.LOCK_EX) - self.binfile.write(binrecord) - fcntl.flock(self.binfile, fcntl.LOCK_UN) - self.textfile.flush() - self.binfile.flush() + files = self.handler.try_emit(binrecord, textrecord) + if not files: + self.handler.emit(binrecord, textrecord) + else: + # Log the rolling event at first, then log the last data + # which cause the rolling event. + to_bfile, to_tfile = files + self.logentries.appendleft(entry) + roll_data = "rename:%s>%s" % (self.handler.textpath, to_tfile) + self.logentries.appendleft([DataTypes.event, tstamp, roll_data, + Events.logrollover, None]) + self._lock(fcntl.LOCK_UN) if self.closer is None: self.closer = eventlet.spawn_after(15, self.closelog) self.writer = None def read_recent_text(self, size): + + def parse_last_rolling_files(textfile, offset, datalen): + textfile.seek(offset, 0) + textpath = textfile.read(datalen).split('>')[1] + dir_name, base_name = os.path.split(textpath) + temp = base_name.split('.') + temp.insert(1,'cbl') + # find the recent bin file + binpath = os.path.join(dir_name, ".".join(temp)) + return textpath, binpath + + textpath = self.handler.textpath + binpath = self.handler.binpath try: - textfile = open(self.textpath, mode='r') - binfile = open(self.binpath, mode='r') + textfile = open(textpath, mode='r') + binfile = open(binpath, mode='r') except IOError: return '', 0, 0 - fcntl.flock(binfile, fcntl.LOCK_SH) + self._lock(fcntl.LOCK_SH) binfile.seek(0, 2) - binidx = binfile.tell() - 16 + binidx = binfile.tell() currsize = 0 offsets = [] termstate = None recenttimestamp = 0 + access_last_rename = False while binidx > 0 and currsize < size: - binfile.seek(binidx, 0) binidx -= 16 + binfile.seek(binidx, 0) recbytes = binfile.read(16) (_, ltype, offset, datalen, tstamp, evtdata, eventaux, _) = \ struct.unpack(">BBIHIBBH", recbytes) - if ltype != 2: + # rolling events found. + if ltype == DataTypes.event and evtdata == Events.logrollover: + # Now, we can only find the last renamed file which logging + # the data. + if access_last_rename == False: + access_last_rename = True + else: + break + textpath, binpath = parse_last_rolling_files(textfile, offset, + datalen) + # Rolling event detected, close the current bin file, then open + # the renamed bin file. + binfile.close() + try: + binfile = open(binpath, mode='r') + except IOError: + return '', 0, 0 + binfile.seek(0, 2) + binidx = binfile.tell() + elif ltype != 2: continue if tstamp > recenttimestamp: recenttimestamp = tstamp currsize += datalen - offsets.append((offset, datalen)) + offsets.append((offset, datalen, textpath)) if termstate is None: termstate = eventaux - fcntl.flock(binfile, fcntl.LOCK_UN) binfile.close() textdata = '' - fcntl.flock(textfile, fcntl.LOCK_SH) while offsets: - (offset, length) = offsets.pop() + (offset, length, textpath) = offsets.pop() + if textfile.name != textpath: + textfile.close() + try: + textfile = open(textpath) + except IOError: + return '', 0, 0 textfile.seek(offset, 0) textdata += textfile.read(length) - fcntl.flock(textfile, fcntl.LOCK_UN) + self._lock(fcntl.LOCK_UN) textfile.close() if termstate is None: termstate = 0 @@ -270,8 +680,5 @@ class Logger(object): self.writer = eventlet.spawn_after(2, self.writedata) def closelog(self): - self.textfile.close() - self.binfile.close() - self.textfile = None - self.binfile = None + self.handler.close() self.closer = None diff --git a/confluent_server/confluent/main.py b/confluent_server/confluent/main.py index 8533d551..ec3cd501 100644 --- a/confluent_server/confluent/main.py +++ b/confluent_server/confluent/main.py @@ -27,6 +27,7 @@ import atexit import confluent.auth as auth +import confluent.config.conf as conf import confluent.config.configmanager as configmanager import confluent.consoleserver as consoleserver import confluent.core as confluentcore @@ -40,7 +41,6 @@ import fcntl import sys import os import signal -import ConfigParser def _daemonize(): @@ -126,10 +126,9 @@ def _initsecurity(config): def run(): _checkpidfile() - configfile = "/etc/confluent/service.cfg" - config = ConfigParser.ConfigParser() - config.read(configfile) + conf.init_config() try: + config = conf.get_config() _initsecurity(config) except: sys.stderr.write("Error unlocking credential store\n") @@ -150,8 +149,8 @@ def run(): #dbgsock = eventlet.listen("/var/run/confluent/dbg.sock", # family=socket.AF_UNIX) #eventlet.spawn_n(backdoor.backdoor_server, dbgsock) - http_bind_host, http_bind_port = _get_connector_config(config, 'http') - sock_bind_host, sock_bind_port = _get_connector_config(config, 'socket') + http_bind_host, http_bind_port = _get_connector_config('http') + sock_bind_host, sock_bind_port = _get_connector_config('socket') consoleserver.start_console_sessions() webservice = httpapi.HttpApi(http_bind_host, http_bind_port) webservice.start() @@ -161,12 +160,7 @@ def run(): while 1: eventlet.sleep(100) - -def _get_connector_config(config, session): - try: - host = config.get(session, 'bindhost') - port = config.getint(session, 'bindport') - except (ConfigParser.NoSectionError, ConfigParser.NoOptionError) as e: - host = None - port = None +def _get_connector_config(session): + host = conf.get_option(session, 'bindhost') + port = conf.get_int_option(session, 'bindport') return (host, port)