2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-28 19:37:45 +00:00

Fix proxy console through collective in async

This commit is contained in:
Jarrod Johnson
2024-05-30 16:14:39 -04:00
parent 00eff4a002
commit 85c8268ad8
5 changed files with 15 additions and 62 deletions

View File

@@ -19,12 +19,8 @@ import array
import ctypes
import ctypes.util
import confluent.tlv as tlv
try:
import eventlet.green.socket as socket
import eventlet.green.select as select
except ImportError:
import socket
import select
import socket
import select
from datetime import datetime
import json
import os

View File

@@ -32,14 +32,7 @@ import confluent.log as log
import confluent.core as plugin
import confluent.asynctlvdata as tlvdata
import confluent.util as util
import eventlet
import eventlet.event
import eventlet.green.os as os
import socket
import eventlet.green.subprocess as subprocess
import eventlet.green.ssl as ssl
import eventlet.semaphore as semaphore
import fcntl
import random
import struct
import time
@@ -351,7 +344,6 @@ class ConsoleHandler(object):
self._plugin_path.format(self.node),
"create", self.cfgmgr)
async for cns in consoles:
print(repr(cns))
self._console = cns
except (exc.NotImplementedException, exc.NotFoundException):
self._console = None
@@ -666,7 +658,7 @@ class ProxyConsole(object):
async def relay_data(self):
data = await tlvdata.recv(self.remote)
while data:
self.data_handler(data)
await self.data_handler(data)
data = await tlvdata.recv(self.remote)
self.remote[1].close()
@@ -675,7 +667,7 @@ class ProxyConsole(object):
# it explicitly in the proxy instance
return False
def get_recent(self):
async def get_recent(self):
# Again, delegate this to the remote collective member
self.skipreplay = False
return b''
@@ -686,6 +678,7 @@ class ProxyConsole(object):
await tlvdata.send(self.remote, data)
except Exception as e:
print(repr(e))
raise
if self.clisession:
await self.clisession.detach()
self.clisession = None
@@ -713,7 +706,7 @@ class ProxyConsole(object):
print(repr(e))
await asyncio.sleep(3)
if self.clisession:
self.clisession.detach()
await self.clisession.detach()
await self.detachsession(None)
return
await tlvdata.recv(remote)
@@ -759,8 +752,7 @@ class ConsoleSession(object):
:param configmanager: A configuration manager object for current context
:param username: Username for which this session object will operate
:param datacallback: An asynchronous data handler, to be called when data
is available. Note that if passed, it makes
'get_next_output' non-functional
is available.
:param skipreplay: If true, will skip the attempt to redraw the screen
"""
@@ -790,7 +782,7 @@ class ConsoleSession(object):
self.databuffer = collections.deque([])
self.data_handler = self.got_data
if not skipreplay:
self.databuffer.extend(self.conshdl.get_recent())
self.databuffer.extend(await self.conshdl.get_recent())
else:
self.data_handler = datacallback
if not skipreplay:
@@ -867,32 +859,3 @@ class ConsoleSession(object):
if self._evt:
self._evt.send()
self._evt = None
def get_next_output(self, timeout=45):
"""Poll for next available output on this console.
Ideally purely event driven scheme is perfect. AJAX over HTTP is
at least one case where we don't have that luxury. This function
will not work if the session was initialized with a data callback
instead of polling mode.
"""
self.reaper.cancel()
# postpone death to be 15 seconds after this would timeout
self.reaper = util.spawn_after(timeout + 15, self.destroy)
if self._evt:
raise Exception('get_next_output is not re-entrant')
if not self.databuffer:
self._evt = eventlet.event.Event()
with eventlet.Timeout(timeout, False):
self._evt.wait()
self._evt = None
if not self.databuffer:
return ""
currdata = self.databuffer.popleft()
if isinstance(currdata, dict):
return currdata
retval = currdata
while self.databuffer and not isinstance(self.databuffer[0], dict):
retval += self.databuffer.popleft()
return retval

View File

@@ -22,19 +22,12 @@ import confluent.messages as msg
import confluent.util as util
import copy
import errno
#import eventlet
#import eventlet.event
#import eventlet.green.threading as threading
#import eventlet.greenpool as greenpool
#import eventlet.queue as queue
#import eventlet.support.greendns
from fnmatch import fnmatch
import os
import pwd
import aiohmi.constants as pygconstants
import aiohmi.exceptions as pygexc
import aiohmi.storage as storage
#console = eventlet.import_patched('pyghmi.ipmi.console')
import aiohmi.ipmi.console as console
import aiohmi.ipmi.command as ipmicommand
import socket
@@ -368,10 +361,12 @@ class IpmiConsole(conapi.Console):
self.error = "closed"
async def write(self, data):
await self.solconnection.send_data(data)
if self.solconnection:
await self.solconnection.send_data(data)
async def send_break(self):
await self.solconnection.send_break()
if self.solconnection:
await self.solconnection.send_break()
async def perform_requests(operator, nodes, element, cfg, inputdata, realop):
@@ -407,7 +402,7 @@ async def perform_requests(operator, nodes, element, cfg, inputdata, realop):
except asyncio.QueueEmpty:
pass
except asyncio.TimeoutError:
print("odd timeout?")
print("odd timeout?" + repr(element) + repr(nodes))
pass
finally:
for datum in sorted(

View File

@@ -91,8 +91,7 @@ class ShellSession(consoleserver.ConsoleSession):
:param configmanager: A configuration manager object for current context
:param username: Username for which this session object will operate
:param datacallback: An asynchronous data handler, to be called when data
is available. Note that if passed, it makes
'get_next_output' non-functional
is available.
:param skipreplay: If true, will skip the attempt to redraw the screen
:param sessionid: An optional identifier to match a running session or
customize the name of a new session.

View File

@@ -314,7 +314,7 @@ async def term_interact(authdata, authname, ccons, cfm, connection, consession,
await ccons.startsending()
bufferage = consession.get_buffer_age()
if bufferage is not False:
send_data(connection, {'bufferage': bufferage})
await send_data(connection, {'bufferage': bufferage})
while consession is not None:
try:
data = await tlvdata.recv(connection)