2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-26 20:23:29 +00:00

Numerous async improvements

Restore 'as available' behavior to noderange over socket

Bring the httpapi to the point where the webui is able to start working,
notably bringing the asynchttp online with the websocket.

Fix a flaw in the async ipmi that would cause hangups.
This commit is contained in:
Jarrod Johnson
2024-04-04 17:13:37 -04:00
parent 587ccd13cc
commit 7b2e32009f
5 changed files with 39 additions and 27 deletions

View File

@@ -39,6 +39,7 @@
# Much like console sessions, these will be reaped if a client spends too
# far away.
import asyncio
import collections
import confluent.exceptions as exc
import confluent.messages as messages
@@ -91,6 +92,8 @@ class AsyncSession(object):
return
self.responses.append((requestid, rsp))
if self._evt:
self._evt.send()
self._evt = None
@@ -118,11 +121,13 @@ class AsyncSession(object):
async def run_handler(self, handler, requestid):
try:
for rsp in handler:
handler = await handler
async for rsp in handler:
await self.add(requestid, rsp)
self.add(requestid, messages.AsyncCompletion())
await self.add(requestid, messages.AsyncCompletion())
except Exception as e:
self.add(requestid, e)
print(repr(e))
await self.add(requestid, e)
def get_responses(self, timeout=25):
self.reaper.cancel()
@@ -146,15 +151,17 @@ class AsyncSession(object):
yield self.responses.popleft()
async def run_handler(hdlr, env):
asyncsessid = env['HTTP_CONFLUENTASYNCID']
async def run_handler(hdlr, req):
asyncsessid = req.headers['ConfluentAsyncId']
try:
asyncsession = _asyncsessions[asyncsessid]['asyncsession']
requestid = env['HTTP_CONFLUENTREQUESTID']
requestid = req.headers['ConfluentRequestId']
except KeyError:
raise exc.InvalidArgumentException(
'Invalid Session ID or missing request id')
eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid)
cloop = asyncio.get_event_loop()
cloop.create_task(asyncsession.run_handler(hdlr, requestid))
#eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid)
return requestid

View File

@@ -642,14 +642,14 @@ def stripnode(iterablersp, node):
yield i
def iterate_collections(iterable, forcecollection=True):
async def iterate_collections(iterable, forcecollection=True):
for coll in iterable:
if forcecollection and coll[-1] != '/':
coll += '/'
yield msg.ChildCollection(coll, candelete=True)
def iterate_resources(fancydict):
async def iterate_resources(fancydict):
for resource in fancydict:
if resource.startswith("_"):
continue
@@ -768,7 +768,7 @@ def create_noderange(inputdata, configmanager):
def enumerate_collections(collections):
async def enumerate_collections(collections):
for collection in collections:
yield msg.ChildCollection(collection)
@@ -1107,10 +1107,10 @@ async def handle_node_request(configmanager, inputdata, operation,
'element': pathcomponents, 'configmanager': configmanager,
'inputdata': inputdata, 'operation': operation, 'isnoderange': isnoderange}))
if isnoderange or not autostrip:
return [x async for x in iterate_queue(numworkers, passvalues)]
return iterate_queue(numworkers, passvalues) # [x async for x in iterate_queue(numworkers, passvalues)]
else:
if numworkers > 0:
return [x async for x in iterate_queue(numworkers, passvalues, nodes[0])]
return iterate_queue(numworkers, passvalues, nodes[0]) # [x async for x in iterate_queue(numworkers, passvalues, nodes[0])]
else:
raise exc.NotImplementedException()

View File

@@ -26,7 +26,7 @@ try:
except ImportError:
webauthn = None
import asyncio
from aiohttp import web, web_urldispatcher, connector, ClientSession
from aiohttp import web, web_urldispatcher, connector, ClientSession, WSMsgType
import confluent.auth as auth
import confluent.config.attributes as attribs
import confluent.config.configmanager as configmanager
@@ -243,7 +243,7 @@ def _csrf_valid(req, session):
if 'csrftoken' not in session:
# The client has not (yet) requested CSRF protection
# so we return true
if 'ConfluentAuthToken' in req.headers:
if 'Confluentauthtoken' in req.headers:
# The client has requested CSRF countermeasures,
# oblige the request and apply a new token to the
# session
@@ -264,7 +264,7 @@ def _csrf_valid(req, session):
# The session has CSRF protection enabled, only mark valid if
# the client has provided an auth token and that token matches the
# value protecting the session
return ('ConfluentAuthToken' in req.headers and
return ('Confluentauthtoken' in req.headers and
req.headers['ConfluentAuthToken'] == session['csrftoken'])
@@ -459,7 +459,6 @@ async def wsock_handler(req):
return
httpsessions[sessid]['inflight'].add(rsp)
name = httpsessions[sessid]['name']
print(req.rel_url.path)
authdata = auth.authorize(name, req.rel_url.path, operation='start')
if not authdata:
return
@@ -467,9 +466,9 @@ async def wsock_handler(req):
username = httpsessions[sessid]['name']
if req.rel_url.path == '/sessions/current/async':
myconsoles = {}
async def asyncwscallback(rsp):
rsp = json.dumps(rsp.raw())
await rsp.send_str(u'!' + rsp)
async def asyncwscallback(rspm):
rspm = json.dumps(rspm.raw())
await rsp.send_str(u'!' + rspm)
currsess['inflight'].add(rsp)
asess = None
try:
@@ -479,10 +478,14 @@ async def wsock_handler(req):
clientmsg = True
while clientmsg:
clientmsg = await rsp.receive()
if clientmsg.type == WSMsgType.CLOSE:
break
elif clientmsg.type != WSMsgType.TEXT:
continue
clientmsg = clientmsg.data
if clientmsg:
if clientmsg[0] == '?':
ws.send('?')
await rsp.send_str('?')
elif clientmsg[0] == '$':
targid, data = clientmsg[1:].split('$', 1)
if data[0] == ' ':
@@ -631,7 +634,6 @@ async def resourcehandler(request):
try:
if 'Sec-WebSocket-Version' in request.headers:
print('WebSocket....')
return await wsock_handler(request)
else:
return await resourcehandler_backend(request, make_response)
@@ -953,9 +955,9 @@ async def resourcehandler_backend(req, make_response):
hdlr = pluginapi.handle_path(url, operation,
cfgmgr, querydict)
if 'ConfluentAsyncId' in req.headers:
confluent.asynchttp.run_handler(hdlr, env)
await make_response('text/plain', 202, cookies=cookies)
rsp.write(b'Request queued')
await confluent.asynchttp.run_handler(hdlr, req)
rsp = await make_response('text/plain', 202, cookies=cookies)
await rsp.write(b'Request queued')
return rsp
pagecontent = ""
if mimetype == 'text/html':
@@ -1051,7 +1053,7 @@ async def _assemble_json(responses, resource=None, url=None, extension=None):
else:
links['collection'] = {"href": "./" + extension}
rspdata = {}
for rsp in await responses:
async for rsp in await responses:
if isinstance(rsp, confluent.messages.LinkRelation):
haldata = rsp.raw()
for hk in haldata:

View File

@@ -411,6 +411,7 @@ async def perform_requests(operator, nodes, element, cfg, inputdata, realop):
except asyncio.QueueEmpty:
pass
except asyncio.TimeoutError:
print("whoopsie?")
pass
finally:
for datum in sorted(
@@ -525,7 +526,7 @@ class IpmiHandler:
self.ipmicmd = persistent_ipmicmds[(node, tenant)]
giveup = util.monotonic_time() + 60
while not self.ipmicmd.ipmi_session.broken and not self.ipmicmd.ipmi_session.logged and self.ipmicmd.ipmi_session.logging:
self.ipmicmd.ipmi_session.wait_for_rsp(3)
await self.ipmicmd.ipmi_session.wait_for_rsp(3)
if util.monotonic_time() > giveup:
self.ipmicmd.ipmi_session.broken = True
return self

View File

@@ -201,7 +201,9 @@ async def send_response(responses, connection):
if responses is None:
return
responses = await responses
for rsp in responses:
if responses is None:
return
async for rsp in responses:
await send_data(connection, rsp.raw())
await send_data(connection, {'_requestdone': 1})