diff --git a/confluent_client/confluent/tlvdata.py b/confluent_client/confluent/tlvdata.py index 7fcb663b..26e12ca0 100644 --- a/confluent_client/confluent/tlvdata.py +++ b/confluent_client/confluent/tlvdata.py @@ -19,12 +19,8 @@ import array import ctypes import ctypes.util import confluent.tlv as tlv -try: - import eventlet.green.socket as socket - import eventlet.green.select as select -except ImportError: - import socket - import select +import socket +import select from datetime import datetime import json import os diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 2442d4fa..114ea25c 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -32,14 +32,7 @@ import confluent.log as log import confluent.core as plugin import confluent.asynctlvdata as tlvdata import confluent.util as util -import eventlet -import eventlet.event -import eventlet.green.os as os import socket -import eventlet.green.subprocess as subprocess -import eventlet.green.ssl as ssl -import eventlet.semaphore as semaphore -import fcntl import random import struct import time @@ -351,7 +344,6 @@ class ConsoleHandler(object): self._plugin_path.format(self.node), "create", self.cfgmgr) async for cns in consoles: - print(repr(cns)) self._console = cns except (exc.NotImplementedException, exc.NotFoundException): self._console = None @@ -666,7 +658,7 @@ class ProxyConsole(object): async def relay_data(self): data = await tlvdata.recv(self.remote) while data: - self.data_handler(data) + await self.data_handler(data) data = await tlvdata.recv(self.remote) self.remote[1].close() @@ -675,7 +667,7 @@ class ProxyConsole(object): # it explicitly in the proxy instance return False - def get_recent(self): + async def get_recent(self): # Again, delegate this to the remote collective member self.skipreplay = False return b'' @@ -686,6 +678,7 @@ class ProxyConsole(object): await tlvdata.send(self.remote, data) except Exception as e: print(repr(e)) + raise if self.clisession: await self.clisession.detach() self.clisession = None @@ -713,7 +706,7 @@ class ProxyConsole(object): print(repr(e)) await asyncio.sleep(3) if self.clisession: - self.clisession.detach() + await self.clisession.detach() await self.detachsession(None) return await tlvdata.recv(remote) @@ -759,8 +752,7 @@ class ConsoleSession(object): :param configmanager: A configuration manager object for current context :param username: Username for which this session object will operate :param datacallback: An asynchronous data handler, to be called when data - is available. Note that if passed, it makes - 'get_next_output' non-functional + is available. :param skipreplay: If true, will skip the attempt to redraw the screen """ @@ -790,7 +782,7 @@ class ConsoleSession(object): self.databuffer = collections.deque([]) self.data_handler = self.got_data if not skipreplay: - self.databuffer.extend(self.conshdl.get_recent()) + self.databuffer.extend(await self.conshdl.get_recent()) else: self.data_handler = datacallback if not skipreplay: @@ -867,32 +859,3 @@ class ConsoleSession(object): if self._evt: self._evt.send() self._evt = None - - def get_next_output(self, timeout=45): - """Poll for next available output on this console. - - Ideally purely event driven scheme is perfect. AJAX over HTTP is - at least one case where we don't have that luxury. This function - will not work if the session was initialized with a data callback - instead of polling mode. - """ - self.reaper.cancel() - # postpone death to be 15 seconds after this would timeout - self.reaper = util.spawn_after(timeout + 15, self.destroy) - if self._evt: - raise Exception('get_next_output is not re-entrant') - if not self.databuffer: - self._evt = eventlet.event.Event() - with eventlet.Timeout(timeout, False): - self._evt.wait() - self._evt = None - if not self.databuffer: - return "" - currdata = self.databuffer.popleft() - if isinstance(currdata, dict): - return currdata - retval = currdata - while self.databuffer and not isinstance(self.databuffer[0], dict): - retval += self.databuffer.popleft() - - return retval diff --git a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py index 1a854554..db6ab48c 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py @@ -22,19 +22,12 @@ import confluent.messages as msg import confluent.util as util import copy import errno -#import eventlet -#import eventlet.event -#import eventlet.green.threading as threading -#import eventlet.greenpool as greenpool -#import eventlet.queue as queue -#import eventlet.support.greendns from fnmatch import fnmatch import os import pwd import aiohmi.constants as pygconstants import aiohmi.exceptions as pygexc import aiohmi.storage as storage -#console = eventlet.import_patched('pyghmi.ipmi.console') import aiohmi.ipmi.console as console import aiohmi.ipmi.command as ipmicommand import socket @@ -368,10 +361,12 @@ class IpmiConsole(conapi.Console): self.error = "closed" async def write(self, data): - await self.solconnection.send_data(data) + if self.solconnection: + await self.solconnection.send_data(data) async def send_break(self): - await self.solconnection.send_break() + if self.solconnection: + await self.solconnection.send_break() async def perform_requests(operator, nodes, element, cfg, inputdata, realop): @@ -407,7 +402,7 @@ async def perform_requests(operator, nodes, element, cfg, inputdata, realop): except asyncio.QueueEmpty: pass except asyncio.TimeoutError: - print("odd timeout?") + print("odd timeout?" + repr(element) + repr(nodes)) pass finally: for datum in sorted( diff --git a/confluent_server/confluent/shellserver.py b/confluent_server/confluent/shellserver.py index 6e66c7b2..26ad4682 100644 --- a/confluent_server/confluent/shellserver.py +++ b/confluent_server/confluent/shellserver.py @@ -91,8 +91,7 @@ class ShellSession(consoleserver.ConsoleSession): :param configmanager: A configuration manager object for current context :param username: Username for which this session object will operate :param datacallback: An asynchronous data handler, to be called when data - is available. Note that if passed, it makes - 'get_next_output' non-functional + is available. :param skipreplay: If true, will skip the attempt to redraw the screen :param sessionid: An optional identifier to match a running session or customize the name of a new session. diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 9bce9d42..5522fc58 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -314,7 +314,7 @@ async def term_interact(authdata, authname, ccons, cfm, connection, consession, await ccons.startsending() bufferage = consession.get_buffer_age() if bufferage is not False: - send_data(connection, {'bufferage': bufferage}) + await send_data(connection, {'bufferage': bufferage}) while consession is not None: try: data = await tlvdata.recv(connection)