diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py new file mode 100644 index 00000000..65d70818 --- /dev/null +++ b/confluent_server/confluent/asynchttp.py @@ -0,0 +1,128 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2016 Lenovo +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Overall, the result of this shall be: +# - Web clients can create the same out-of-order responsiveness as socket +# clients (but with more complexity on their end) +# - Web clients can share single request among console sessions +# - Web clients can get async notify of things like node add/remove, events + +# This provides an async strategy to http clients. The design is that a http +# session may have an 'async' resource. In such a case, any requests are +# queued and immediately the response is given accepting the queued request. +# A request flags itself as queue-compatible through an HTTP header indicating +# the identifier of the async thread. As responses happen to the queued +# request, data is dispatched to the first registered poller for data on +# the session. This way, a client may elect to provide multiple pollers +# to mitigate general choppiness of http network pattern. It may not be +# worth it, but it's possible. + +# Additionally support console session multiplexing, to mitigate needed +# connection count. + +# Also, this should allow a client to register for notifications of things +# like node add/delete or an event firing, ultimately. + +# Much like console sessions, these will be reaped if a client spends too +# far away. + +import collections +import confluent.exceptions as exc +import confluent.messages as messages +import confluent.util as util +import eventlet + +_asyncsessions = {} +_cleanthread = None + + +def _assign_asyncid(asyncsession): + sessid = util.randomstring(32) + while sessid in _asyncsessions: + sessid = util.randomstring(32) + _asyncsessions[sessid] = {'asyncsession': asyncsession} + return sessid + + +class AsyncSession(object): + + def __init__(self): + self.asyncid = _assign_asyncid(self) + self.responses = collections.deque() + self._evt = None + self.reaper = eventlet.spawn_after(15, self.destroy) + + def add(self, rsp, requestid): + self.responses.append(rsp, requestid) + if self._evt: + self._evt.send() + self._evt = None + + def destroy(self): + if self._evt: + self._evt.send() + self._evt = None + del _asyncsessions[self.asyncid] + + def run_handler(self, handler, requestid): + for rsp in handler: + self.add(rsp, requestid) + self.add({'_requestdone': True}, requestid) + + def get_responses(self, timeout=25): + self.reaper.cancel() + self.reaper = eventlet.spawn_after(timeout + 15, self.destroy) + if self._evt(): + # TODO(jjohnson2): This precludes the goal of 'double barreled' + # access.... revisit if this could matter + raise Exception('get_responses is not re-entrant') + if not self.responses: # wait to accumulate some + self._evt = eventlet.event.Event() + with eventlet.Timout(timeout, False): + self._evt.wait() + self._evt = None + while self.responses: + yield self.responses.popleft() + + +def run_handler(hdlr, env): + asyncsessid = env['HTTP_CONFLUENTASYNCID'] + try: + asyncsession = _asyncsessions[asyncsessid] + requestid = env['HTTP_CONFLUENTREQUESTID'] + except KeyError: + raise exc.InvalidArgumentException( + 'Invalid Session ID or missing request id') + eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid) + return requestid + + +def handle_async(env, querydict): + global _cleanthread + # This may be one of two things, a request for a new async stream + # or a request for next data from async stream + # httpapi otherwise handles requests an injecting them to queue + if 'asyncid' not in querydict or not querydict['asyncid']: + # This is a new request, create a new multiplexer + currsess = AsyncSession() + yield messages.AsyncSession(currsess.asyncid) + return + currsess = _asyncsessions[querydict['asyncid']]['asyncsession'] + for rsp in currsess.get_responses(): + yield rsp + + + diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index 8a18ea77..4769bad8 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -25,7 +25,7 @@ import confluent.exceptions as exc import confluent.log as log import confluent.messages import confluent.core as pluginapi -import confluent.requestmultiplexer +import confluent.asynchttp import confluent.shellserver as shellserver import confluent.tlvdata import confluent.util as util @@ -353,8 +353,14 @@ def resourcehandler_backend(env, start_response): ("Set-Cookie", m.OutputString()) for m in authorized['cookie'].values()) cfgmgr = authorized['cfgmgr'] - if (operation == 'create') and env['PATH_INFO'] == '/multiplexer': - confluent.multiplexer.handle_http(env, querydict) + if (operation == 'create') and env['PATH_INFO'] == '/sessions/current/async': + pagecontent = "" + for rsp in _assemble_json( + confluent.asynchttp.handle_async(env, querydict)): + pagecontent += rsp + start_response("200 OK", headers) + yield pagecontent + return elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or '/shell/sessions/' in env['PATH_INFO'])): #hard bake JSON into this path, do not support other incarnations @@ -480,7 +486,11 @@ def resourcehandler_backend(env, start_response): try: hdlr = pluginapi.handle_path(url, operation, cfgmgr, querydict) - + if 'HTTP_CONFLUENTASYNCID' in env: + asynchttp.run_handler(hdlr, env) + start_response('202 Accepted', headers) + yield 'Request queued' + return pagecontent = "" if mimetype == 'text/html': for datum in _assemble_html(hdlr, resource, lquerydict, url, @@ -572,21 +582,21 @@ def _assemble_html(responses, resource, querydict, url, extension): '') -def _assemble_json(responses, resource, url, extension): +def _assemble_json(responses, resource=None, url=None, extension=None): #NOTE(jbjohnso) I'm considering giving up on yielding bit by bit #in json case over http. Notably, duplicate key values from plugin #overwrite, but we'd want to preserve them into an array instead. #the downside is that http would just always blurt it ll out at #once and hold on to all the data in memory - links = { - 'self': {"href": resource + extension}, - } - if url == '/': - pass - elif resource[-1] == '/': - links['collection'] = {"href": "../" + extension} - else: - links['collection'] = {"href": "./" + extension} + links = {} + if resource is not None: + links['self'] = {"href": resource + extension} + if url == '/': + pass + elif resource[-1] == '/': + links['collection'] = {"href": "../" + extension} + else: + links['collection'] = {"href": "./" + extension} rspdata = {} for rsp in responses: if isinstance(rsp, confluent.messages.LinkRelation): diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py index 3ef4128c..78845ee4 100644 --- a/confluent_server/confluent/messages.py +++ b/confluent_server/confluent/messages.py @@ -835,6 +835,13 @@ class EventCollection(ConfluentMessage): self.kvpairs = {name: {'events': eventdata}} +class AsyncSession(ConfluentMessage): + def __init__(self, id): + self.desc = 'foo' + self.notnode = True + self.stripped = True + self.kvpairs = {'asyncid': id} + class User(ConfluentMessage): def __init__(self, uid, username, privilege_level, name=None): self.desc = 'foo' diff --git a/confluent_server/confluent/multiplexer.py b/confluent_server/confluent/multiplexer.py deleted file mode 100644 index a1e6ce07..00000000 --- a/confluent_server/confluent/multiplexer.py +++ /dev/null @@ -1,80 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2016 Lenovo -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This module handles the task of multplexing console and any watchers. -# For example, 3 console windows may share a single http long poller -# It can additionally add watchers for certain messages -# messages.py will then check in for any watchers for the relevant resource -# and trigger notifications on watchers. -# This will allow a request to watch each individual nodes power state ond/or -# health results will async come -# over the watcher. A client may only request to monitor a resource -# if it would normally be allowed to actually request it. Tho monitoring -# continues, meaning any request, related or not, will send a notification -# to a watching client -# This enables, for example, for a web page to react on the fly to anyone -# noticing the health, power state, add, or delete of a node (any message -# suitably instrumented in messages.py). - -# This is broken out so that messages and httpapi can both import it. -# This could be added to the socket api as well, but for now the focus shall -# be on httpapi to enable dynamic web behavior. - -import confluent.util as util -import eventlet -import time - -_multiplexers = {} -_cleanthread = None - - -def _assaign_multiplexid(multiplexer): - sessid = util.randomstring(32) - while sessid in _multiplexers: - sessid = util.randomstring(32) - _multiplexers[sessid] = {'multiplexer': multiplexer, - 'expiry': time.time() + 60} - return sessid - - -def _expire_multiplexers(): - global _cleanthread - while multiplexers: - currtime = time.time() - for session in _multiplexers: - if _multiplexers[session]['expiry'] < currtime: - del _multiplexers[session] - if multiplexers: - _cleanthread = eventlet.spawn_after(15, _expire_multiplexers) - else: - _cleanthread = None - - -class Multiplexer(object): - def __init__(self): - _assign_multiplexid(self) - - -def handle_http(env, querydict): - global _cleanthread - if _cleanthread is None: - _cleanthread = eventlet.spawn_after(60, _expire_multiplexers) - if 'multiplexid' not in querydict or not querydict['multiplexid']: - # This is a new request, create a new multiplexer - multiplexer = Multiplexer() - else: - multiplexer = _multiplexers['multiplexid']['multiplexer'] -