From d753ac28331748d70a5f559f730322b7184da8bd Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 25 Mar 2016 14:50:47 -0400 Subject: [PATCH] Add terminal sessions to async http This functionality enables a browser to hold more terminals open than their max connection rating would normally allow. --- confluent_server/confluent/asynchttp.py | 43 ++++++++++++++++++++++++- confluent_server/confluent/httpapi.py | 17 ++++++++-- confluent_server/confluent/messages.py | 10 +++++- 3 files changed, 66 insertions(+), 4 deletions(-) diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index 43fe5362..9e53e3bd 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -45,9 +45,11 @@ import confluent.messages as messages import confluent.util as util import eventlet import greenlet +import time _asyncsessions = {} _cleanthread = None +_consolesessions = None def _assign_asyncid(asyncsession): @@ -58,12 +60,26 @@ def _assign_asyncid(asyncsession): return sessid +class AsyncTermRelation(object): + # Need to keep an association of term object to async + # This allows the async handler to know the context of + # outgoing data to provide to calling code + def __init__(self, termid, async): + self.async = async + self.termid = termid + + def got_data(self, data): + self.async.add(data, self.termid) + + class AsyncSession(object): def __init__(self): self.asyncid = _assign_asyncid(self) self.responses = collections.deque() self._evt = None + self.termrelations = [] + self.consoles = set([]) self.reaper = eventlet.spawn_after(15, self.destroy) def add(self, rsp, requestid): @@ -72,6 +88,18 @@ class AsyncSession(object): self._evt.send() self._evt = None + def set_term_relation(self, env): + # need a term relation to keep track of what data belongs + # to what object (since the callback does not provide context + # for data, and here ultimately the client is responsible + # for sorting out which is which. + termrel = AsyncTermRelation(['HTTP_CONFLUENTREQUESTID'], self) + self.termrelations.append(termrel) + return termrel + + def add_console_session(self, sessionid): + self.consoles.add(sessionid) + def destroy(self): if self._evt: self._evt.send() @@ -85,6 +113,9 @@ class AsyncSession(object): def get_responses(self, timeout=25): self.reaper.cancel() + nextexpiry = time.time() + 90 + for csess in self.consoles: + _consolesessions[csess]['expiry'] = nextexpiry self.reaper = eventlet.spawn_after(timeout + 15, self.destroy) if self._evt: # TODO(jjohnson2): This precludes the goal of 'double barreled' @@ -111,6 +142,11 @@ def run_handler(hdlr, env): return requestid +def get_async(env, querydict): + global _cleanthread + return _asyncsessions[querydict['asyncid']]['asyncsession'] + + def handle_async(env, querydict, threadset): global _cleanthread # This may be one of two things, a request for a new async stream @@ -134,4 +170,9 @@ def handle_async(env, querydict, threadset): threadset.discard(mythreadid) if loggedout is not None: currsess.destroy() - raise exc.LoggedOut() \ No newline at end of file + raise exc.LoggedOut() + + +def set_console_sessions(consolesessions): + global _consolesessions + _consolesessions = consolesessions \ No newline at end of file diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index 7a9f0c32..a5ce66e3 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -46,6 +46,7 @@ tlvdata = confluent.tlvdata auditlog = None tracelog = None consolesessions = {} +confluent.asynchttp.set_console_sessions(consolesessions) httpsessions = {} opmap = { 'POST': 'create', @@ -394,15 +395,25 @@ def resourcehandler_backend(env, start_response): skipreplay = False if 'skipreplay' in querydict and querydict['skipreplay']: skipreplay = True + datacallback = None + async = None + if 'HTTP_CONFLUENTASYNCID' in env: + async = confluent.asynchttp.get_async(env, querydict) + termrel = async.set_term_relation(env) + datacallback = termrel.got_data try: if shellsession: consession = shellserver.ShellSession( node=nodename, configmanager=cfgmgr, - username=authorized['username'], skipreplay=skipreplay) + username=authorized['username'], skipreplay=skipreplay, + datacallback=datacallback + ) else: consession = consoleserver.ConsoleSession( node=nodename, configmanager=cfgmgr, - username=authorized['username'], skipreplay=skipreplay) + username=authorized['username'], skipreplay=skipreplay, + datacallback=datacallback + ) except exc.NotFoundException: start_response("404 Not found", headers) yield "404 - Request Path not recognized" @@ -411,6 +422,8 @@ def resourcehandler_backend(env, start_response): start_response("500 Internal Server Error", headers) return sessid = _assign_consessionid(consession) + if async: + async.add_consolesession(sessid) start_response('200 OK', headers) yield '{"session":"%s","data":""}' % sessid return diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py index 8c13c9ac..573ce738 100644 --- a/confluent_server/confluent/messages.py +++ b/confluent_server/confluent/messages.py @@ -851,9 +851,17 @@ class AsyncMessage(ConfluentMessage): self.msgpair = pair def raw(self): + rsp = self.msgpair[1] + rspdict = None + if isinstance(rsp, ConfluentMessage): + rspdict = rsp.raw() + elif isinstance(rsp, dict): # console metadata + rspdict = rsp + else: # terminal text + rspdict = {'data': rsp} return {'asyncresponse': {'requestid': self.msgpair[0], - 'response': self.msgpair[1].raw()}} + 'response': rspdict}} class AsyncSession(ConfluentMessage): def __init__(self, id):