2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-28 19:37:45 +00:00

Bugfix and rework consoleserver a bit

Fix incorrect syntax in ssh.py, and correct direct asyncio
call of sock_recv when it must be called on the loop.
This commit is contained in:
Jarrod Johnson
2024-03-25 15:18:05 -04:00
parent b1cd7bcd98
commit 668c5af261
4 changed files with 29 additions and 13 deletions

View File

@@ -289,7 +289,7 @@ async def recv(handle):
else:
data = await cloop.sock_recv(handle, dlen)
while len(data) < dlen:
ndata = await asyncio.sock_recv(handle, dlen - len(data))
ndata = await cloop.sock_recv(handle, dlen - len(data))
if not ndata:
raise Exception("Error reading data")
data += ndata

View File

@@ -171,7 +171,7 @@ class ConsoleHandler(object):
self.check_isondemand()
if not self._isondemand:
self.connectstate = 'connecting'
eventlet.spawn(self._connect)
self._connect()
def resize(self, width, height):
return None
@@ -275,7 +275,7 @@ class ConsoleHandler(object):
if logvalue in ('none', 'memory'):
self._dologging = False
if not self._isondemand or self.livesessions:
eventlet.spawn(self._connect)
self._connect()
def log(self, *args, **kwargs):
if not self._dologging:
@@ -330,9 +330,9 @@ class ConsoleHandler(object):
if self.connectionthread:
self.connectionthread.kill()
self.connectionthread = None
self.connectionthread = eventlet.spawn(self._connect_backend)
self.connectionthread = util.spawn(self._connect_backend())
def _connect_backend(self):
async def _connect_backend(self):
if self._console:
self._console.close()
self._console = None
@@ -345,7 +345,7 @@ class ConsoleHandler(object):
'not configured,\r\nset it to a valid value for console '
'function')
try:
self._console = list(plugin.handle_path(
self._console = list(await plugin.handle_path(
self._plugin_path.format(self.node),
"create", self.cfgmgr))[0]
except (exc.NotImplementedException, exc.NotFoundException):
@@ -396,7 +396,7 @@ class ConsoleHandler(object):
'error': self.error})
retrytime = self._get_retry_time()
if not self.reconnect:
self.reconnect = eventlet.spawn_after(retrytime, self._connect)
self.reconnect = util.spawn_after(retrytime, self._connect)
return
except (exc.TargetEndpointUnreachable, socket.gaierror) as se:
self.clearbuffer()
@@ -406,7 +406,7 @@ class ConsoleHandler(object):
'error': self.error})
retrytime = self._get_retry_time()
if not self.reconnect:
self.reconnect = eventlet.spawn_after(retrytime, self._connect)
self.reconnect = util.spawn_after(retrytime, self._connect)
return
except Exception:
self.clearbuffer()
@@ -418,7 +418,7 @@ class ConsoleHandler(object):
'error': self.error})
retrytime = self._get_retry_time()
if not self.reconnect:
self.reconnect = eventlet.spawn_after(retrytime, self._connect)
self.reconnect = util.spawn_after(retrytime, self._connect)
return
self._got_connected()
@@ -482,7 +482,7 @@ class ConsoleHandler(object):
self.reconnect.cancel()
self.reconnect = None
self.connectstate = 'connecting'
eventlet.spawn(self._connect)
self._connect()
@@ -778,7 +778,7 @@ class ConsoleSession(object):
self.node = node
self.write = self.conshdl.write
if datacallback is None:
self.reaper = eventlet.spawn_after(15, self.destroy)
self.reaper = util.spawn_after(15, self.destroy)
self.databuffer = collections.deque([])
self.data_handler = self.got_data
if not skipreplay:
@@ -870,7 +870,7 @@ class ConsoleSession(object):
"""
self.reaper.cancel()
# postpone death to be 15 seconds after this would timeout
self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
self.reaper = util.spawn_after(timeout + 15, self.destroy)
if self._evt:
raise Exception('get_next_output is not re-entrant')
if not self.databuffer:

View File

@@ -107,7 +107,7 @@ class SshShell(conapi.Console):
# that would rather not use the nodename as anything but an opaque
# identifier
self.datacallback = callback
if self.username is not b'':
if self.username != b'':
self.logon()
else:
self.inputmode = 0

View File

@@ -27,6 +27,7 @@ import socket
import ssl
import struct
import eventlet.green.subprocess as subprocess
import asyncio
def mkdirp(path, mode=0o777):
@@ -37,6 +38,21 @@ def mkdirp(path, mode=0o777):
raise
async def _sleep_and_run(sleeptime, func, args):
await asyncio.sleep(sleeptime)
func(*args)
def spawn_after(sleeptime, func, *args):
return spawn(_sleep_and_run(sleeptime, func, args))
def spawn(coro):
try:
return asyncio.create_task(coro)
except AttributeError:
return asyncio.get_event_loop().create_task(coro)
def run(cmd):
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()