2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-06 17:01:30 +00:00

Merge branch 'master' into async

This commit is contained in:
Jarrod Johnson
2024-08-21 09:56:43 -04:00
4 changed files with 52 additions and 22 deletions

View File

@@ -90,6 +90,7 @@ class AsyncSession(object):
async def run_handler(self, handler, requestid):
try:
# iterate_responses from core maybe? handler might return other stuff
handler = await handler
async for rsp in handler:
await self.add(requestid, rsp)

View File

@@ -468,7 +468,7 @@ async def _find_service(service, target):
# pooltargs.append(('/redfish/v1/', peerdata[nid]))
tsks = []
for targ in pooltargs:
tsks.append(util.spawn(check_fish(targ)))
tsks.append(util.spawn_task(check_fish(targ)))
while tsks:
done, tsks = await asyncio.wait(tsks, return_when=asyncio.FIRST_COMPLETED)
for dt in done:

View File

@@ -19,6 +19,7 @@
# specification. consoleserver or shellserver would be equally likely
# to use this.
import asyncio
import confluent.exceptions as cexc
import confluent.interface.console as conapi
import confluent.log as log
@@ -84,19 +85,23 @@ class OpenBmcConsole(conapi.Console):
self.datacallback = None
self.nodeconfig = config
self.connected = False
self.recvr = None
async def recvdata(self):
while self.connected:
pendingdata = await self.ws.receive()
if pendingdata.type == aiohttp.WSMsgType.BINARY:
await self.datacallback(pendingdata.data)
continue
elif pendingdata.type == aiohttp.WSMsgType.CLOSE:
await self.datacallback(conapi.ConsoleEvent.Disconnect)
return
else:
print("Unknown response in WSConsoleHandler")
try:
while self.connected:
pendingdata = await self.ws.receive()
if pendingdata.type == aiohttp.WSMsgType.BINARY:
await self.datacallback(pendingdata.data)
continue
elif pendingdata.type == aiohttp.WSMsgType.CLOSE:
await self.datacallback(conapi.ConsoleEvent.Disconnect)
return
else:
print("Unknown response in WSConsoleHandler")
except asyncio.CancelledError:
pass
async def connect(self, callback):
@@ -124,13 +129,16 @@ class OpenBmcConsole(conapi.Console):
self.ws = await self.clisess.ws_connect('wss://{0}/console0'.format(self.bmc), protocols=protos, ssl=self.ssl)
#self.ws.connect('wss://{0}/console0'.format(self.bmc), host=bmc, cookie='XSRF-TOKEN={0}; SESSION={1}'.format(wc.cookies['XSRF-TOKEN'], wc.cookies['SESSION']), subprotocols=[wc.cookies['XSRF-TOKEN']])
self.connected = True
util.spawn(self.recvdata())
self.recvr = util.spawn_task(self.recvdata())
return
async def write(self, data):
await self.ws.send_str(data.decode())
async def close(self):
if self.recvr:
self.recvr.cancel()
self.recvr = None
if self.ws:
await self.ws.close()
self.connected = False

View File

@@ -27,7 +27,6 @@ import re
import socket
import ssl
import struct
import asyncio
import random
import subprocess
@@ -58,21 +57,43 @@ def spawn_after(sleeptime, func, *args):
tsks = {}
tasksitter = None
async def _sit_tasks():
while True:
while not tsks:
await asyncio.sleep(15)
tsk_list = [tsks[x] for x in tsks]
cmpl, pnding = await asyncio.wait(tsk_list, return_when=asyncio.FIRST_COMPLETED, timeout=15)
for tskid in list(tsks):
if tsks[tskid].done():
try:
tsk = tsks[tskid]
del tsks[tskid]
await tsk
except Exception as e:
print(repr(e))
def spawn_task(coro):
try:
return asyncio.create_task(coro)
except AttributeError:
return asyncio.get_event_loop().create_task(coro)
def spawn(coro):
global tasksitter
if not tasksitter:
tasksitter = spawn_task(_sit_tasks())
tskid = random.random()
while tskid in tsks:
tskid = random.random()
tsks[tskid] = 1
try:
tsks[tskid] = asyncio.create_task(_run(coro, tskid), name=repr(coro))
except AttributeError:
tsks[tskid] = asyncio.get_event_loop().create_task(_run(coro, tskid), name=repr(coro))
tsks[tskid] = spawn_task(coro)
return tsks[tskid]
async def _run(coro, taskid):
ret = await coro
del tsks[taskid]
return ret
async def check_output(*cmd):