diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index 7294112b..279a7fcb 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -90,6 +90,7 @@ class AsyncSession(object): async def run_handler(self, handler, requestid): try: + # iterate_responses from core maybe? handler might return other stuff handler = await handler async for rsp in handler: await self.add(requestid, rsp) diff --git a/confluent_server/confluent/discovery/protocols/ssdp.py b/confluent_server/confluent/discovery/protocols/ssdp.py index 26903df4..1205726d 100644 --- a/confluent_server/confluent/discovery/protocols/ssdp.py +++ b/confluent_server/confluent/discovery/protocols/ssdp.py @@ -468,7 +468,7 @@ async def _find_service(service, target): # pooltargs.append(('/redfish/v1/', peerdata[nid])) tsks = [] for targ in pooltargs: - tsks.append(util.spawn(check_fish(targ))) + tsks.append(util.spawn_task(check_fish(targ))) while tsks: done, tsks = await asyncio.wait(tsks, return_when=asyncio.FIRST_COMPLETED) for dt in done: diff --git a/confluent_server/confluent/plugins/console/openbmc.py b/confluent_server/confluent/plugins/console/openbmc.py index f3cf1c07..ad6c0fbd 100644 --- a/confluent_server/confluent/plugins/console/openbmc.py +++ b/confluent_server/confluent/plugins/console/openbmc.py @@ -19,6 +19,7 @@ # specification. consoleserver or shellserver would be equally likely # to use this. +import asyncio import confluent.exceptions as cexc import confluent.interface.console as conapi import confluent.log as log @@ -84,19 +85,23 @@ class OpenBmcConsole(conapi.Console): self.datacallback = None self.nodeconfig = config self.connected = False + self.recvr = None async def recvdata(self): - while self.connected: - pendingdata = await self.ws.receive() - if pendingdata.type == aiohttp.WSMsgType.BINARY: - await self.datacallback(pendingdata.data) - continue - elif pendingdata.type == aiohttp.WSMsgType.CLOSE: - await self.datacallback(conapi.ConsoleEvent.Disconnect) - return - else: - print("Unknown response in WSConsoleHandler") + try: + while self.connected: + pendingdata = await self.ws.receive() + if pendingdata.type == aiohttp.WSMsgType.BINARY: + await self.datacallback(pendingdata.data) + continue + elif pendingdata.type == aiohttp.WSMsgType.CLOSE: + await self.datacallback(conapi.ConsoleEvent.Disconnect) + return + else: + print("Unknown response in WSConsoleHandler") + except asyncio.CancelledError: + pass async def connect(self, callback): @@ -124,13 +129,16 @@ class OpenBmcConsole(conapi.Console): self.ws = await self.clisess.ws_connect('wss://{0}/console0'.format(self.bmc), protocols=protos, ssl=self.ssl) #self.ws.connect('wss://{0}/console0'.format(self.bmc), host=bmc, cookie='XSRF-TOKEN={0}; SESSION={1}'.format(wc.cookies['XSRF-TOKEN'], wc.cookies['SESSION']), subprotocols=[wc.cookies['XSRF-TOKEN']]) self.connected = True - util.spawn(self.recvdata()) + self.recvr = util.spawn_task(self.recvdata()) return async def write(self, data): await self.ws.send_str(data.decode()) async def close(self): + if self.recvr: + self.recvr.cancel() + self.recvr = None if self.ws: await self.ws.close() self.connected = False diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index 5db7c424..4c566ea1 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -27,7 +27,6 @@ import re import socket import ssl import struct -import asyncio import random import subprocess @@ -58,21 +57,43 @@ def spawn_after(sleeptime, func, *args): tsks = {} + +tasksitter = None + + +async def _sit_tasks(): + while True: + while not tsks: + await asyncio.sleep(15) + tsk_list = [tsks[x] for x in tsks] + cmpl, pnding = await asyncio.wait(tsk_list, return_when=asyncio.FIRST_COMPLETED, timeout=15) + for tskid in list(tsks): + if tsks[tskid].done(): + try: + tsk = tsks[tskid] + del tsks[tskid] + await tsk + except Exception as e: + print(repr(e)) + + +def spawn_task(coro): + try: + return asyncio.create_task(coro) + except AttributeError: + return asyncio.get_event_loop().create_task(coro) + + def spawn(coro): + global tasksitter + if not tasksitter: + tasksitter = spawn_task(_sit_tasks()) tskid = random.random() while tskid in tsks: tskid = random.random() - tsks[tskid] = 1 - try: - tsks[tskid] = asyncio.create_task(_run(coro, tskid), name=repr(coro)) - except AttributeError: - tsks[tskid] = asyncio.get_event_loop().create_task(_run(coro, tskid), name=repr(coro)) + tsks[tskid] = spawn_task(coro) return tsks[tskid] -async def _run(coro, taskid): - ret = await coro - del tsks[taskid] - return ret async def check_output(*cmd):