2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-13 12:21:30 +00:00

Refine asyncsupport

Asyncsupport progress continues.  Renaming from 'multiplex'
as 'async' seems to describe the pattern better.
This commit is contained in:
Jarrod Johnson
2016-03-18 17:04:05 -04:00
parent bcb9c2660f
commit 7d67ea0685
4 changed files with 159 additions and 94 deletions

View File

@@ -0,0 +1,128 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2016 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Overall, the result of this shall be:
# - Web clients can create the same out-of-order responsiveness as socket
# clients (but with more complexity on their end)
# - Web clients can share single request among console sessions
# - Web clients can get async notify of things like node add/remove, events
# This provides an async strategy to http clients. The design is that a http
# session may have an 'async' resource. In such a case, any requests are
# queued and immediately the response is given accepting the queued request.
# A request flags itself as queue-compatible through an HTTP header indicating
# the identifier of the async thread. As responses happen to the queued
# request, data is dispatched to the first registered poller for data on
# the session. This way, a client may elect to provide multiple pollers
# to mitigate general choppiness of http network pattern. It may not be
# worth it, but it's possible.
# Additionally support console session multiplexing, to mitigate needed
# connection count.
# Also, this should allow a client to register for notifications of things
# like node add/delete or an event firing, ultimately.
# Much like console sessions, these will be reaped if a client spends too
# far away.
import collections
import confluent.exceptions as exc
import confluent.messages as messages
import confluent.util as util
import eventlet
_asyncsessions = {}
_cleanthread = None
def _assign_asyncid(asyncsession):
sessid = util.randomstring(32)
while sessid in _asyncsessions:
sessid = util.randomstring(32)
_asyncsessions[sessid] = {'asyncsession': asyncsession}
return sessid
class AsyncSession(object):
def __init__(self):
self.asyncid = _assign_asyncid(self)
self.responses = collections.deque()
self._evt = None
self.reaper = eventlet.spawn_after(15, self.destroy)
def add(self, rsp, requestid):
self.responses.append(rsp, requestid)
if self._evt:
self._evt.send()
self._evt = None
def destroy(self):
if self._evt:
self._evt.send()
self._evt = None
del _asyncsessions[self.asyncid]
def run_handler(self, handler, requestid):
for rsp in handler:
self.add(rsp, requestid)
self.add({'_requestdone': True}, requestid)
def get_responses(self, timeout=25):
self.reaper.cancel()
self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
if self._evt():
# TODO(jjohnson2): This precludes the goal of 'double barreled'
# access.... revisit if this could matter
raise Exception('get_responses is not re-entrant')
if not self.responses: # wait to accumulate some
self._evt = eventlet.event.Event()
with eventlet.Timout(timeout, False):
self._evt.wait()
self._evt = None
while self.responses:
yield self.responses.popleft()
def run_handler(hdlr, env):
asyncsessid = env['HTTP_CONFLUENTASYNCID']
try:
asyncsession = _asyncsessions[asyncsessid]
requestid = env['HTTP_CONFLUENTREQUESTID']
except KeyError:
raise exc.InvalidArgumentException(
'Invalid Session ID or missing request id')
eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid)
return requestid
def handle_async(env, querydict):
global _cleanthread
# This may be one of two things, a request for a new async stream
# or a request for next data from async stream
# httpapi otherwise handles requests an injecting them to queue
if 'asyncid' not in querydict or not querydict['asyncid']:
# This is a new request, create a new multiplexer
currsess = AsyncSession()
yield messages.AsyncSession(currsess.asyncid)
return
currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
for rsp in currsess.get_responses():
yield rsp

View File

@@ -25,7 +25,7 @@ import confluent.exceptions as exc
import confluent.log as log
import confluent.messages
import confluent.core as pluginapi
import confluent.requestmultiplexer
import confluent.asynchttp
import confluent.shellserver as shellserver
import confluent.tlvdata
import confluent.util as util
@@ -353,8 +353,14 @@ def resourcehandler_backend(env, start_response):
("Set-Cookie", m.OutputString())
for m in authorized['cookie'].values())
cfgmgr = authorized['cfgmgr']
if (operation == 'create') and env['PATH_INFO'] == '/multiplexer':
confluent.multiplexer.handle_http(env, querydict)
if (operation == 'create') and env['PATH_INFO'] == '/sessions/current/async':
pagecontent = ""
for rsp in _assemble_json(
confluent.asynchttp.handle_async(env, querydict)):
pagecontent += rsp
start_response("200 OK", headers)
yield pagecontent
return
elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
'/shell/sessions/' in env['PATH_INFO'])):
#hard bake JSON into this path, do not support other incarnations
@@ -480,7 +486,11 @@ def resourcehandler_backend(env, start_response):
try:
hdlr = pluginapi.handle_path(url, operation,
cfgmgr, querydict)
if 'HTTP_CONFLUENTASYNCID' in env:
asynchttp.run_handler(hdlr, env)
start_response('202 Accepted', headers)
yield 'Request queued'
return
pagecontent = ""
if mimetype == 'text/html':
for datum in _assemble_html(hdlr, resource, lquerydict, url,
@@ -572,21 +582,21 @@ def _assemble_html(responses, resource, querydict, url, extension):
'</form></body></html>')
def _assemble_json(responses, resource, url, extension):
def _assemble_json(responses, resource=None, url=None, extension=None):
#NOTE(jbjohnso) I'm considering giving up on yielding bit by bit
#in json case over http. Notably, duplicate key values from plugin
#overwrite, but we'd want to preserve them into an array instead.
#the downside is that http would just always blurt it ll out at
#once and hold on to all the data in memory
links = {
'self': {"href": resource + extension},
}
if url == '/':
pass
elif resource[-1] == '/':
links['collection'] = {"href": "../" + extension}
else:
links['collection'] = {"href": "./" + extension}
links = {}
if resource is not None:
links['self'] = {"href": resource + extension}
if url == '/':
pass
elif resource[-1] == '/':
links['collection'] = {"href": "../" + extension}
else:
links['collection'] = {"href": "./" + extension}
rspdata = {}
for rsp in responses:
if isinstance(rsp, confluent.messages.LinkRelation):

View File

@@ -835,6 +835,13 @@ class EventCollection(ConfluentMessage):
self.kvpairs = {name: {'events': eventdata}}
class AsyncSession(ConfluentMessage):
def __init__(self, id):
self.desc = 'foo'
self.notnode = True
self.stripped = True
self.kvpairs = {'asyncid': id}
class User(ConfluentMessage):
def __init__(self, uid, username, privilege_level, name=None):
self.desc = 'foo'

View File

@@ -1,80 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2016 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This module handles the task of multplexing console and any watchers.
# For example, 3 console windows may share a single http long poller
# It can additionally add watchers for certain messages
# messages.py will then check in for any watchers for the relevant resource
# and trigger notifications on watchers.
# This will allow a request to watch each individual nodes power state ond/or
# health results will async come
# over the watcher. A client may only request to monitor a resource
# if it would normally be allowed to actually request it. Tho monitoring
# continues, meaning any request, related or not, will send a notification
# to a watching client
# This enables, for example, for a web page to react on the fly to anyone
# noticing the health, power state, add, or delete of a node (any message
# suitably instrumented in messages.py).
# This is broken out so that messages and httpapi can both import it.
# This could be added to the socket api as well, but for now the focus shall
# be on httpapi to enable dynamic web behavior.
import confluent.util as util
import eventlet
import time
_multiplexers = {}
_cleanthread = None
def _assaign_multiplexid(multiplexer):
sessid = util.randomstring(32)
while sessid in _multiplexers:
sessid = util.randomstring(32)
_multiplexers[sessid] = {'multiplexer': multiplexer,
'expiry': time.time() + 60}
return sessid
def _expire_multiplexers():
global _cleanthread
while multiplexers:
currtime = time.time()
for session in _multiplexers:
if _multiplexers[session]['expiry'] < currtime:
del _multiplexers[session]
if multiplexers:
_cleanthread = eventlet.spawn_after(15, _expire_multiplexers)
else:
_cleanthread = None
class Multiplexer(object):
def __init__(self):
_assign_multiplexid(self)
def handle_http(env, querydict):
global _cleanthread
if _cleanthread is None:
_cleanthread = eventlet.spawn_after(60, _expire_multiplexers)
if 'multiplexid' not in querydict or not querydict['multiplexid']:
# This is a new request, create a new multiplexer
multiplexer = Multiplexer()
else:
multiplexer = _multiplexers['multiplexid']['multiplexer']