mirror of
https://github.com/xcat2/confluent.git
synced 2026-04-25 02:01:27 +00:00
Rework async session handling
This commit is contained in:
@@ -117,7 +117,7 @@ def get_async(env, querydict):
|
||||
return _asyncsessions[env['HTTP_CONFLUENTASYNCID']]['asyncsession']
|
||||
|
||||
|
||||
def handle_async(env, querydict, threadset, wshandler=None):
|
||||
def handle_async(querydict, wshandler=None):
|
||||
# This may be one of two things, a request for a new async stream
|
||||
# or a request for next data from async stream
|
||||
# httpapi otherwise handles requests an injecting them to queue
|
||||
|
||||
@@ -487,7 +487,7 @@ async def wsock_handler(req):
|
||||
asess = None
|
||||
try:
|
||||
for asess in confluent.asynchttp.handle_async(
|
||||
{}, {}, currsess['inflight'], asyncwscallback):
|
||||
{}, asyncwscallback):
|
||||
await rsp.send_str(u' ASYNCID: {0}'.format(asess.asyncid))
|
||||
clientmsg = True
|
||||
while clientmsg:
|
||||
@@ -569,7 +569,6 @@ async def wsock_handler(req):
|
||||
myconsoles[cons].destroy()
|
||||
if asess:
|
||||
asess.destroy()
|
||||
currsess['inflight'].discard(mythreadid)
|
||||
return
|
||||
if '/console/session' in ws.path or '/shell/sessions/' in ws.path:
|
||||
def datacallback(data):
|
||||
@@ -604,7 +603,6 @@ async def wsock_handler(req):
|
||||
)
|
||||
except exc.NotFoundException:
|
||||
return
|
||||
currsess['inflight'].add(mythreadid)
|
||||
clientmsg = ws.wait()
|
||||
try:
|
||||
while clientmsg is not None:
|
||||
@@ -622,7 +620,6 @@ async def wsock_handler(req):
|
||||
ws.send(u'?')
|
||||
clientmsg = ws.wait()
|
||||
finally:
|
||||
currsess['inflight'].discard(mythreadid)
|
||||
consession.destroy()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user