diff --git a/confluent_server/confluent/log.py b/confluent_server/confluent/log.py index ee2444c1..7e34553a 100644 --- a/confluent_server/confluent/log.py +++ b/confluent_server/confluent/log.py @@ -63,11 +63,11 @@ # (a future extended version might include suport for Forward Secure Sealing # or other fields) +import asyncio import collections import confluent.config.configmanager import confluent.config.conf as conf import confluent.exceptions as exc -import eventlet import glob import json import os @@ -76,6 +76,7 @@ import stat import struct import time import traceback +import random try: unicode except NameError: @@ -113,6 +114,39 @@ except ImportError: MIDNIGHT = 24 * 60 * 60 _loggers = {} + +async def _sleep_and_run(sleeptime, func, args): + await asyncio.sleep(sleeptime) + await func(*args) + + +def spawn_after(sleeptime, func, *args): + if func is None: + raise Exception('tf') + return spawn(_sleep_and_run(sleeptime, func, args)) + + +tsks = {} + + +def spawn(coro): + tskid = random.random() + while tskid in tsks: + tskid = random.random() + tsks[tskid] = 1 + try: + tsks[tskid] = asyncio.create_task(_run(coro, tskid), name=repr(coro)) + except AttributeError: + tsks[tskid] = asyncio.get_event_loop().create_task(_run(coro, tskid), name=repr(coro)) + return tsks[tskid] + + +async def _run(coro, taskid): + ret = await coro + del tsks[taskid] + return ret + + class Events(object): ( undefined, clearscreen, clientconnect, clientdisconnect, @@ -628,7 +662,7 @@ class Logger(object): self.logentries.appendleft([DataTypes.event, tstamp, roll_data, Events.logrollover, None]) if self.closer is None: - self.closer = eventlet.spawn_after(15, self.closelog) + self.closer = spawn_after(15, self.closelog) self.writer = None def read_recent_text(self, size): @@ -777,7 +811,7 @@ class Logger(object): [ltype, timestamp, logdata, event, eventdata]) if self.buffered: if self.writer is None: - self.writer = eventlet.spawn_after(2, self.writedata) + self.writer = spawn_after(2, self.writedata) else: self.writedata()