diff --git a/confluent_client/confluent/tlvdata.py b/confluent_client/confluent/tlvdata.py index f865aa22..158028b5 100644 --- a/confluent_client/confluent/tlvdata.py +++ b/confluent_client/confluent/tlvdata.py @@ -289,7 +289,7 @@ async def recv(handle): else: data = await cloop.sock_recv(handle, dlen) while len(data) < dlen: - ndata = await asyncio.sock_recv(handle, dlen - len(data)) + ndata = await cloop.sock_recv(handle, dlen - len(data)) if not ndata: raise Exception("Error reading data") data += ndata diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index aa05b9b7..4ccfb8fd 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -171,7 +171,7 @@ class ConsoleHandler(object): self.check_isondemand() if not self._isondemand: self.connectstate = 'connecting' - eventlet.spawn(self._connect) + self._connect() def resize(self, width, height): return None @@ -275,7 +275,7 @@ class ConsoleHandler(object): if logvalue in ('none', 'memory'): self._dologging = False if not self._isondemand or self.livesessions: - eventlet.spawn(self._connect) + self._connect() def log(self, *args, **kwargs): if not self._dologging: @@ -330,9 +330,9 @@ class ConsoleHandler(object): if self.connectionthread: self.connectionthread.kill() self.connectionthread = None - self.connectionthread = eventlet.spawn(self._connect_backend) + self.connectionthread = util.spawn(self._connect_backend()) - def _connect_backend(self): + async def _connect_backend(self): if self._console: self._console.close() self._console = None @@ -345,7 +345,7 @@ class ConsoleHandler(object): 'not configured,\r\nset it to a valid value for console ' 'function') try: - self._console = list(plugin.handle_path( + self._console = list(await plugin.handle_path( self._plugin_path.format(self.node), "create", self.cfgmgr))[0] except (exc.NotImplementedException, exc.NotFoundException): @@ -396,7 +396,7 @@ class ConsoleHandler(object): 'error': self.error}) retrytime = self._get_retry_time() if not self.reconnect: - self.reconnect = eventlet.spawn_after(retrytime, self._connect) + self.reconnect = util.spawn_after(retrytime, self._connect) return except (exc.TargetEndpointUnreachable, socket.gaierror) as se: self.clearbuffer() @@ -406,7 +406,7 @@ class ConsoleHandler(object): 'error': self.error}) retrytime = self._get_retry_time() if not self.reconnect: - self.reconnect = eventlet.spawn_after(retrytime, self._connect) + self.reconnect = util.spawn_after(retrytime, self._connect) return except Exception: self.clearbuffer() @@ -418,7 +418,7 @@ class ConsoleHandler(object): 'error': self.error}) retrytime = self._get_retry_time() if not self.reconnect: - self.reconnect = eventlet.spawn_after(retrytime, self._connect) + self.reconnect = util.spawn_after(retrytime, self._connect) return self._got_connected() @@ -482,7 +482,7 @@ class ConsoleHandler(object): self.reconnect.cancel() self.reconnect = None self.connectstate = 'connecting' - eventlet.spawn(self._connect) + self._connect() @@ -778,7 +778,7 @@ class ConsoleSession(object): self.node = node self.write = self.conshdl.write if datacallback is None: - self.reaper = eventlet.spawn_after(15, self.destroy) + self.reaper = util.spawn_after(15, self.destroy) self.databuffer = collections.deque([]) self.data_handler = self.got_data if not skipreplay: @@ -870,7 +870,7 @@ class ConsoleSession(object): """ self.reaper.cancel() # postpone death to be 15 seconds after this would timeout - self.reaper = eventlet.spawn_after(timeout + 15, self.destroy) + self.reaper = util.spawn_after(timeout + 15, self.destroy) if self._evt: raise Exception('get_next_output is not re-entrant') if not self.databuffer: diff --git a/confluent_server/confluent/plugins/shell/ssh.py b/confluent_server/confluent/plugins/shell/ssh.py index 98f98605..08199a12 100644 --- a/confluent_server/confluent/plugins/shell/ssh.py +++ b/confluent_server/confluent/plugins/shell/ssh.py @@ -107,7 +107,7 @@ class SshShell(conapi.Console): # that would rather not use the nodename as anything but an opaque # identifier self.datacallback = callback - if self.username is not b'': + if self.username != b'': self.logon() else: self.inputmode = 0 diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index 96d2291b..fe7b2638 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -27,6 +27,7 @@ import socket import ssl import struct import eventlet.green.subprocess as subprocess +import asyncio def mkdirp(path, mode=0o777): @@ -37,6 +38,21 @@ def mkdirp(path, mode=0o777): raise +async def _sleep_and_run(sleeptime, func, args): + await asyncio.sleep(sleeptime) + func(*args) + + +def spawn_after(sleeptime, func, *args): + return spawn(_sleep_and_run(sleeptime, func, args)) + + +def spawn(coro): + try: + return asyncio.create_task(coro) + except AttributeError: + return asyncio.get_event_loop().create_task(coro) + def run(cmd): process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate()