From c3cafd9bf809082f4f1b4a7f21f8aead441538ec Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 11 Apr 2024 09:09:02 -0400 Subject: [PATCH] Purge eventlet and greenlet and long-polling support Rather than try to support long deprecated http api behavior, purge it for simpler code and remove eventlet/greenlet from the http stack. --- confluent_server/confluent/asynchttp.py | 88 ++----------------------- confluent_server/confluent/httpapi.py | 73 +++----------------- 2 files changed, 17 insertions(+), 144 deletions(-) diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index 9051aebc..7294112b 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -14,21 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Overall, the result of this shall be: -# - Web clients can create the same out-of-order responsiveness as socket -# clients (but with more complexity on their end) -# - Web clients can share single request among console sessions -# - Web clients can get async notify of things like node add/remove, events - -# This provides an async strategy to http clients. The design is that a http -# session may have an 'async' resource. In such a case, any requests are -# queued and immediately the response is given accepting the queued request. -# A request flags itself as queue-compatible through an HTTP header indicating -# the identifier of the async thread. As responses happen to the queued -# request, data is dispatched to the first registered poller for data on -# the session. This way, a client may elect to provide multiple pollers -# to mitigate general choppiness of http network pattern. It may not be -# worth it, but it's possible. +# This handles ownership of asynchronous behavior driving sessions +# with websockets. 
There was a long-polling HTTP mechanism but that is removed. +# Now it's possible to have asynchronous requests multiplexed over a single websocket +# with none of the "choppiness" inherent to multiple long-polling requests # Additionally support console session multiplexing, to mitigate needed # connection count. @@ -44,12 +33,9 @@ import collections import confluent.exceptions as exc import confluent.messages as messages import confluent.util as util -import eventlet -import greenlet import time _asyncsessions = {} -_cleanthread = None _consolesessions = None @@ -75,27 +61,14 @@ class AsyncTermRelation(object): class AsyncSession(object): - def __init__(self, wshandler=None): + def __init__(self, wshandler): self.asyncid = _assign_asyncid(self) - self.responses = collections.deque() self.wshandler = wshandler - self._evt = None self.termrelations = [] self.consoles = set([]) - if not wshandler: - self.reaper = eventlet.spawn_after(15, self.destroy) async def add(self, requestid, rsp): - if self.wshandler: - await self.wshandler(messages.AsyncMessage((requestid, rsp))) - if self.responses is None: - return - self.responses.append((requestid, rsp)) - if self._evt: - - - self._evt.send() - self._evt = None + await self.wshandler(messages.AsyncMessage((requestid, rsp))) def set_term_relation(self, env): # need a term relation to keep track of what data belongs @@ -110,13 +83,9 @@ class AsyncSession(object): self.consoles.add(sessionid) def destroy(self): - if self._evt: - self._evt.send() - self._evt = None for console in self.consoles: _consolesessions[console]['session'].destroy() self.consoles = set([]) - self.responses = None del _asyncsessions[self.asyncid] async def run_handler(self, handler, requestid): @@ -129,28 +98,6 @@ class AsyncSession(object): print(repr(e)) await self.add(requestid, e) - def get_responses(self, timeout=25): - self.reaper.cancel() - self.reaper = eventlet.spawn_after(timeout + 15, self.destroy) - nextexpiry = time.time() + 90 - for 
csess in list(self.consoles): - try: - _consolesessions[csess]['expiry'] = nextexpiry - except KeyError: # session has been closed elsewhere - self.consoles.discard(csess) - if self._evt: - # TODO(jjohnson2): This precludes the goal of 'double barreled' - # access.... revisit if this could matter - raise Exception('get_responses is not re-entrant') - if not self.responses: # wait to accumulate some - self._evt = eventlet.event.Event() - with eventlet.Timeout(timeout, False): - self._evt.wait() - self._evt = None - while self.responses: - yield self.responses.popleft() - - async def run_handler(hdlr, req): asyncsessid = req.headers['ConfluentAsyncId'] try: @@ -161,17 +108,14 @@ async def run_handler(hdlr, req): 'Invalid Session ID or missing request id') cloop = asyncio.get_event_loop() cloop.create_task(asyncsession.run_handler(hdlr, requestid)) - #eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid) return requestid def get_async(env, querydict): - global _cleanthread return _asyncsessions[env['HTTP_CONFLUENTASYNCID']]['asyncsession'] def handle_async(env, querydict, threadset, wshandler=None): - global _cleanthread # This may be one of two things, a request for a new async stream # or a request for next data from async stream # httpapi otherwise handles requests an injecting them to queue @@ -181,25 +125,7 @@ def handle_async(env, querydict, threadset, wshandler=None): if wshandler: yield currsess return - yield messages.AsyncSession(currsess.asyncid) - return - if querydict['asyncid'] not in _asyncsessions: - raise exc.InvalidArgumentException( - 'Invalid or expired async id') - mythreadid = greenlet.getcurrent() - threadset.add(mythreadid) - loggedout = None - currsess = None - try: - currsess = _asyncsessions[querydict['asyncid']]['asyncsession'] - for rsp in currsess.get_responses(): - yield messages.AsyncMessage(rsp) - except greenlet.GreenletExit as ge: - loggedout = ge - threadset.discard(mythreadid) - if loggedout is not None: - currsess.destroy() 
- raise exc.LoggedOut() + raise Exception("Long polling asynchttp is discontinued") def set_console_sessions(consolesessions): diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index cd114c40..5701ebf9 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -43,9 +43,6 @@ import confluent.shellserver as shellserver import confluent.tlvdata import confluent.util as util import copy -import eventlet -import eventlet.greenthread -import greenlet import json import socket import sys @@ -55,11 +52,11 @@ try: import urlparse except ModuleNotFoundError: import urllib.parse as urlparse -import eventlet.websocket -import eventlet.wsgi tlvdata = confluent.tlvdata +_cleaner = None + auditlog = None tracelog = None consolesessions = {} @@ -149,7 +146,7 @@ create_resource_functions = { } -def _sessioncleaner(): +async def _sessioncleaner(): while True: currtime = time.time() targsessions = [] @@ -165,7 +162,7 @@ def _sessioncleaner(): targsessions.append(session) for session in targsessions: del consolesessions[session] - eventlet.sleep(10) + await asyncio.sleep(10) def _get_query_dict(req, reqbody, reqtype): @@ -300,7 +297,7 @@ async def _authorize_request(req, operation, reqbody): for mythread in httpsessions[sessionid]['inflight']: targets.append(mythread) for mythread in targets: - eventlet.greenthread.kill(mythread) + print(repr(mythread)) forwarder.close_session(sessionid) del httpsessions[sessionid] return ('logout',) @@ -791,7 +788,6 @@ async def resourcehandler_backend(req, make_response): return rsp elif (operation == 'create' and ('/console/session' in reqpath or '/shell/sessions/' in reqpath)): - #hard bake JSON into this path, do not support other incarnations if '/console/session' in reqpath: prefix, _, _ = reqpath.partition('/console/session') shellsession = False @@ -879,54 +875,7 @@ async def resourcehandler_backend(req, make_response): rsp.write(json.dumps({'session': 
querydict['session']})) return rsp else: # no keys, but a session, means it's hooking to receive data - sessid = querydict['session'] - if sessid not in consolesessions: - start_response('400 Expired Session', headers) - return rsp - consolesessions[sessid]['expiry'] = time.time() + 90 - # add our thread to the 'inflight' to have a hook to terminate - # a long polling request - loggedout = None - mythreadid = greenlet.getcurrent() - httpsessions[authorized['sessionid']]['inflight'].add(mythreadid) - try: - outdata = consolesessions[sessid]['session'].get_next_output( - timeout=25) - except greenlet.GreenletExit as ge: - loggedout = ge - httpsessions[authorized['sessionid']]['inflight'].discard( - mythreadid) - if sessid not in consolesessions: - start_response('400 Expired Session', headers) - return rsp - if loggedout is not None: - consolesessions[sessid]['session'].destroy() - start_response('401 Logged out', headers) - rsp.write(b'{"loggedout": 1}') - return rsp - bufferage = False - if 'stampsent' not in consolesessions[sessid]: - consolesessions[sessid]['stampsent'] = True - bufferage = consolesessions[sessid]['session'].get_buffer_age() - if isinstance(outdata, dict): - rspdata = outdata - rspdata['session'] = querydict['session'] - else: - rspdata = {'session': querydict['session'], - 'data': outdata} - if bufferage is not False: - rspdata['bufferage'] = bufferage - try: - rspj = json.dumps(rspdata) - except UnicodeDecodeError: - try: - rspj = json.dumps(rspdata, encoding='cp437') - except UnicodeDecodeError: - rspj = json.dumps({'session': querydict['session'], - 'data': 'DECODEERROR'}) - start_response('200 OK', headers) - rsp.write(rspj) - return rsp + raise Exception("long polling console sessions are discontinued") else: # normal request url = reqpath @@ -1128,8 +1077,6 @@ async def serve(bind_host, bind_port): await runner.setup() site = web.SockSite(runner, sock) await site.start() - # eventlet.wsgi.server(sock, resourcehandler, log=False, 
log_output=False, - # debug=False, socket_timeout=60, keepalive=False) @@ -1142,13 +1089,13 @@ class HttpApi(object): self.bind_port = bind_port or 4005 def start(self): + global _cleaner global auditlog global tracelog + if _cleaner is None: + _cleaner = asyncio.get_event_loop().create_task( + _sessioncleaner()) tracelog = log.Logger('trace') auditlog = log.Logger('audit') self.server = asyncio.get_event_loop().create_task( serve(self.bind_host, self.bind_port)) - # self.server = eventlet.spawn(serve, self.bind_host, self.bind_port) - - -_cleaner = eventlet.spawn(_sessioncleaner)