mirror of
https://github.com/xcat2/confluent.git
synced 2026-01-12 02:52:30 +00:00
Fix a number of issues with async rework
Have util retain tasks that are 'fire and forget', to avoid garbage collection trying to delete the background tasks. Move some utilities explicitly over to asynclient/asynctlvdata that had previously been reworked. Implement terminal resize in new asyncssh backend.
This commit is contained in:
@@ -130,6 +130,15 @@ def print_help():
|
||||
#common with the api document
|
||||
|
||||
|
||||
def writeout(data):
|
||||
while True:
|
||||
try:
|
||||
select.select((), (sys.stdout,), ())
|
||||
sys.stdout.write(data)
|
||||
break
|
||||
except BlockingIOError:
|
||||
continue
|
||||
|
||||
def updatestatus(stateinfo={}):
|
||||
global powerstate, powertime, clearpowermessage
|
||||
status = consolename
|
||||
@@ -149,10 +158,10 @@ def updatestatus(stateinfo={}):
|
||||
if 'state' in stateinfo: # currently only read power means anything
|
||||
newpowerstate = stateinfo['state']['value']
|
||||
if newpowerstate != powerstate and newpowerstate == 'off':
|
||||
sys.stdout.write("\x1b[2J\x1b[;H[powered off]\r\n")
|
||||
writeout("\x1b[2J\x1b[;H[powered off]\r\n")
|
||||
clearpowermessage = True
|
||||
if newpowerstate == 'on' and clearpowermessage:
|
||||
sys.stdout.write("\x1b[2J\x1b[;H")
|
||||
writeout("\x1b[2J\x1b[;H")
|
||||
clearpowermessage = False
|
||||
powerstate = newpowerstate
|
||||
if 'clientcount' in laststate and laststate['clientcount'] != 1:
|
||||
@@ -170,7 +179,7 @@ def updatestatus(stateinfo={}):
|
||||
if info:
|
||||
status += ' [' + ','.join(info) + ']'
|
||||
if os.environ.get('TERM', '') not in ('linux'):
|
||||
sys.stdout.write('\x1b]0;console: %s\x07' % status)
|
||||
writeout('\x1b]0;console: %s\x07' % status)
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
@@ -450,7 +459,7 @@ def do_command(command, server):
|
||||
print_result(res)
|
||||
elif argv[0] == 'start':
|
||||
targpath = fullpath_target(argv[1])
|
||||
nodename = targpath.split('/')[-3]
|
||||
nodename = targpath.split('/')[2]
|
||||
currconsole = targpath
|
||||
startrequest = {'operation': 'start', 'path': targpath,
|
||||
'parameters': {}}
|
||||
@@ -657,9 +666,13 @@ def get_session_node(shellargs):
|
||||
return shellargs[0]
|
||||
if len(shellargs) == 2 and shellargs[0] == 'start':
|
||||
args = [s for s in shellargs[1].split('/') if s]
|
||||
if len(args) == 4 and args[0] == 'nodes' and args[2] == 'console' and \
|
||||
if len(args) == 4 and args[0] == 'nodes':
|
||||
if args[2] == 'console' and \
|
||||
args[3] == 'session':
|
||||
return args[1]
|
||||
return args[1]
|
||||
if args[2] == 'shell' and \
|
||||
args[3] == 'sessions':
|
||||
return args[1]
|
||||
return None
|
||||
|
||||
|
||||
@@ -920,7 +933,7 @@ def main():
|
||||
session_node = get_session_node(shellargs)
|
||||
if session_node is not None:
|
||||
consoleonly = True
|
||||
do_command("start /nodes/%s/console/session" % session_node, netserver)
|
||||
do_command(shellargs, netserver)
|
||||
doexit = True
|
||||
elif shellargs:
|
||||
do_command(shellargs, netserver)
|
||||
@@ -980,14 +993,21 @@ def consume_termdata(fh, bufferonly=False):
|
||||
clearpowermessage = False
|
||||
if bufferonly:
|
||||
return data
|
||||
try:
|
||||
sys.stdout.write(data)
|
||||
except UnicodeEncodeError:
|
||||
sys.stdout.buffer.write(data.encode('utf8'))
|
||||
except IOError: # Some times circumstances are bad
|
||||
# resort to byte at a time...
|
||||
for d in data:
|
||||
sys.stdout.write(d)
|
||||
data = data.encode('utf8')
|
||||
written = False
|
||||
while not written:
|
||||
select.select((), (sys.stdout,), ())
|
||||
try:
|
||||
sys.stdout.buffer.write(data)
|
||||
written = True
|
||||
except BlockingIOError:
|
||||
continue
|
||||
except IOError: # Some times circumstances are bad
|
||||
# resort to byte at a time...
|
||||
raise
|
||||
for d in data:
|
||||
sys.stdout.write(d)
|
||||
written = True
|
||||
now = time.time()
|
||||
if ('showtime' not in laststate or
|
||||
(now // 60) != laststate['showtime'] // 60):
|
||||
|
||||
@@ -35,7 +35,7 @@ path = os.path.realpath(os.path.join(path, '..', 'lib', 'python'))
|
||||
if path.startswith('/opt'):
|
||||
sys.path.append(path)
|
||||
|
||||
import confluent.client as client
|
||||
import confluent.asynclient as client
|
||||
|
||||
async def main():
|
||||
argparser = optparse.OptionParser(
|
||||
|
||||
@@ -34,7 +34,7 @@ path = os.path.realpath(os.path.join(path, '..', 'lib', 'python'))
|
||||
if path.startswith('/opt'):
|
||||
sys.path.append(path)
|
||||
|
||||
import confluent.client as client
|
||||
import confluent.asynclient as client
|
||||
|
||||
async def main():
|
||||
argparser = optparse.OptionParser(
|
||||
|
||||
@@ -30,7 +30,7 @@ import confluent.exceptions as exc
|
||||
import confluent.interface.console as conapi
|
||||
import confluent.log as log
|
||||
import confluent.core as plugin
|
||||
import confluent.tlvdata as tlvdata
|
||||
import confluent.asynctlvdata as tlvdata
|
||||
import confluent.util as util
|
||||
import eventlet
|
||||
import eventlet.event
|
||||
@@ -554,12 +554,12 @@ class ConsoleHandler(object):
|
||||
retdata = await get_buffer_output(nodeid)
|
||||
return retdata, connstate
|
||||
|
||||
def write(self, data):
|
||||
async def write(self, data):
|
||||
if self.connectstate == 'connected':
|
||||
try:
|
||||
if isinstance(data, str) and not isinstance(data, bytes):
|
||||
data = data.encode('utf-8')
|
||||
self._console.write(data)
|
||||
await self._console.write(data)
|
||||
except Exception:
|
||||
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
|
||||
event=log.Events.stacktrace)
|
||||
@@ -661,7 +661,7 @@ class ProxyConsole(object):
|
||||
while data:
|
||||
self.data_handler(data)
|
||||
data = await tlvdata.recv(self.remote)
|
||||
self.remote.close()
|
||||
self.remote[1].close()
|
||||
|
||||
def get_buffer_age(self):
|
||||
# the server sends a buffer age if appropriate, no need to handle
|
||||
@@ -673,17 +673,18 @@ class ProxyConsole(object):
|
||||
self.skipreplay = False
|
||||
return b''
|
||||
|
||||
def write(self, data):
|
||||
async def write(self, data):
|
||||
# Relay data to the collective manager
|
||||
try:
|
||||
tlvdata.send(self.remote, data)
|
||||
except Exception:
|
||||
await tlvdata.send(self.remote, data)
|
||||
except Exception as e:
|
||||
print(repr(e))
|
||||
if self.clisession:
|
||||
self.clisession.detach()
|
||||
await self.clisession.detach()
|
||||
self.clisession = None
|
||||
|
||||
|
||||
def attachsession(self, session):
|
||||
async def attachsession(self, session):
|
||||
self.clisession = session
|
||||
self.data_handler = session.data_handler
|
||||
termreq = {
|
||||
@@ -700,30 +701,25 @@ class ProxyConsole(object):
|
||||
},
|
||||
}
|
||||
try:
|
||||
remote = socket.create_connection((self.managerinfo['address'], 13001))
|
||||
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
|
||||
keyfile='/etc/confluent/privkey.pem',
|
||||
certfile='/etc/confluent/srvcert.pem')
|
||||
if not util.cert_matches(self.managerinfo['fingerprint'],
|
||||
remote.getpeercert(binary_form=True)):
|
||||
raise Exception('Invalid peer certificate')
|
||||
except Exception:
|
||||
#await asyncio.sleep(3)
|
||||
remote = await collective.connect_to_collective(None, self.managerinfo['address'])
|
||||
except Exception as e:
|
||||
print(repr(e))
|
||||
await asyncio.sleep(3)
|
||||
if self.clisession:
|
||||
self.clisession.detach()
|
||||
self.detachsession(None)
|
||||
await self.detachsession(None)
|
||||
return
|
||||
tlvdata.recv(remote)
|
||||
tlvdata.recv(remote)
|
||||
tlvdata.send(remote, termreq)
|
||||
await tlvdata.recv(remote)
|
||||
await tlvdata.recv(remote)
|
||||
await tlvdata.send(remote, termreq)
|
||||
self.remote = remote
|
||||
util.spawn(self.relay_data())
|
||||
|
||||
def detachsession(self, session):
|
||||
async def detachsession(self, session):
|
||||
# we will disappear, so just let that happen...
|
||||
if self.remote:
|
||||
try:
|
||||
tlvdata.send(self.remote, {'operation': 'stop'})
|
||||
await tlvdata.send(self.remote, {'operation': 'stop'})
|
||||
except Exception:
|
||||
pass
|
||||
self.clisession = None
|
||||
@@ -840,16 +836,16 @@ class ConsoleSession(object):
|
||||
self._evt = None
|
||||
self.reghdl = None
|
||||
|
||||
def detach(self):
|
||||
async def detach(self):
|
||||
"""Handler for the console handler to detach so it can reattach,
|
||||
currently to facilitate changing from one collective.manager to
|
||||
another
|
||||
|
||||
:return:
|
||||
"""
|
||||
self.conshdl.detachsession(self)
|
||||
await self.conshdl.detachsession(self)
|
||||
self.connect_session()
|
||||
self.conshdl.attachsession(self)
|
||||
await self.conshdl.attachsession(self)
|
||||
self.write = self.conshdl.write
|
||||
|
||||
def got_data(self, data):
|
||||
|
||||
@@ -991,7 +991,7 @@ async def handle_node_request(configmanager, inputdata, operation,
|
||||
try:
|
||||
nodeorrange = pathcomponents[1]
|
||||
if not isnoderange and not configmanager.is_node(nodeorrange):
|
||||
raise exc.NotFoundException("Invalid Node")
|
||||
raise exc.NotFoundException(f'Invalid Node: {repr(pathcomponents)}')
|
||||
if isnoderange and not (len(pathcomponents) == 3 and
|
||||
pathcomponents[2] == 'abbreviate'):
|
||||
try:
|
||||
@@ -1116,7 +1116,7 @@ async def handle_node_request(configmanager, inputdata, operation,
|
||||
numworkers = 0
|
||||
for hfunc in nodesbyhandler:
|
||||
numworkers += 1
|
||||
asyncio.create_task(addtoqueue(passvalues, hfunc, {'nodes': nodesbyhandler[hfunc],
|
||||
util.spawn(addtoqueue(passvalues, hfunc, {'nodes': nodesbyhandler[hfunc],
|
||||
'element': pathcomponents,
|
||||
'configmanager': configmanager,
|
||||
'inputdata': _get_input_data(_plugin, pathcomponents,
|
||||
@@ -1124,7 +1124,7 @@ async def handle_node_request(configmanager, inputdata, operation,
|
||||
isnoderange, configmanager)}))
|
||||
for manager in nodesbymanager:
|
||||
numworkers += 1
|
||||
asyncio.create_task(addtoqueue(passvalues, dispatch_request, {
|
||||
util.spawn(addtoqueue(passvalues, dispatch_request, {
|
||||
'nodes': nodesbymanager[manager], 'manager': manager,
|
||||
'element': pathcomponents, 'configmanager': configmanager,
|
||||
'inputdata': inputdata, 'operation': operation, 'isnoderange': isnoderange}))
|
||||
|
||||
@@ -90,7 +90,7 @@ class SshShell(conapi.Console):
|
||||
if not self.connected:
|
||||
return
|
||||
# asyncssh channel has change_terminal_size, hurray
|
||||
self.shell.resize_pty(width=width, height=height)
|
||||
self.shell[0].channel.change_terminal_size(width=width, height=height)
|
||||
|
||||
async def recvdata(self):
|
||||
while self.connected:
|
||||
@@ -142,8 +142,8 @@ class SshShell(conapi.Console):
|
||||
self.datacallback('\r\nEnter "disconnect" or "accept": ')
|
||||
return
|
||||
except Exception as e:
|
||||
self.ssh.close()
|
||||
self.ssh.close()
|
||||
if self.ssh:
|
||||
self.ssh.close()
|
||||
self.inputmode = 0
|
||||
self.username = b''
|
||||
self.password = b''
|
||||
@@ -158,7 +158,7 @@ class SshShell(conapi.Console):
|
||||
# height=self.height)
|
||||
self.rxthread = util.spawn(self.recvdata())
|
||||
|
||||
def write(self, data):
|
||||
async def write(self, data):
|
||||
if self.inputmode == -2:
|
||||
self.datacallback(conapi.ConsoleEvent.Disconnect)
|
||||
return
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
import confluent.consoleserver as consoleserver
|
||||
import confluent.exceptions as exc
|
||||
import confluent.messages as msg
|
||||
import confluent.util as util
|
||||
activesessions = {}
|
||||
|
||||
|
||||
@@ -47,9 +48,12 @@ class _ShellHandler(consoleserver.ConsoleHandler):
|
||||
|
||||
def _got_disconnected(self):
|
||||
self.connectstate = 'closed'
|
||||
self._send_rcpts({'connectstate': self.connectstate})
|
||||
util.spawn(self._bgdisconnect())
|
||||
|
||||
async def _bgdisconnect(self):
|
||||
await self._send_rcpts({'connectstate': self.connectstate})
|
||||
for session in list(self.livesessions):
|
||||
session.destroy()
|
||||
await session.destroy()
|
||||
|
||||
|
||||
|
||||
@@ -119,7 +123,7 @@ class ShellSession(consoleserver.ConsoleSession):
|
||||
|
||||
async def destroy(self):
|
||||
try:
|
||||
activesessions[(self.configmanager.tenant, self.node,
|
||||
await activesessions[(self.configmanager.tenant, self.node,
|
||||
self.username)][self.sessionid].close()
|
||||
del activesessions[(self.configmanager.tenant, self.node,
|
||||
self.username)][self.sessionid]
|
||||
|
||||
@@ -316,10 +316,13 @@ async def term_interact(authdata, authname, ccons, cfm, connection, consession,
|
||||
if bufferage is not False:
|
||||
send_data(connection, {'bufferage': bufferage})
|
||||
while consession is not None:
|
||||
data = await tlvdata.recv(connection)
|
||||
try:
|
||||
data = await tlvdata.recv(connection)
|
||||
except Exception:
|
||||
data = None
|
||||
if type(data) == dict:
|
||||
if data['operation'] == 'stop':
|
||||
consession.destroy()
|
||||
await consession.destroy()
|
||||
break
|
||||
elif data['operation'] == 'break':
|
||||
consession.send_break()
|
||||
@@ -350,10 +353,12 @@ async def term_interact(authdata, authname, ccons, cfm, connection, consession,
|
||||
'error': 'Unexpected error - ' + str(e)})
|
||||
await send_data(connection, {'_requestdone': 1})
|
||||
continue
|
||||
if not data:
|
||||
consession.destroy()
|
||||
if not consession:
|
||||
break
|
||||
consession.write(data)
|
||||
if not data:
|
||||
await consession.destroy()
|
||||
break
|
||||
await consession.write(data)
|
||||
connection.close()
|
||||
|
||||
|
||||
@@ -484,7 +489,8 @@ async def _unixdomainhandler():
|
||||
except KeyError:
|
||||
cnn.close()
|
||||
return
|
||||
asyncio.create_task(sessionhdl(cnn, authname, skipauth))
|
||||
util.spawn(sessionhdl(cnn, authname, skipauth))
|
||||
#asyncio.create_task(sessionhdl(cnn, authname, skipauth))
|
||||
|
||||
|
||||
class SockApi(object):
|
||||
|
||||
@@ -28,6 +28,7 @@ import ssl
|
||||
import struct
|
||||
import eventlet.green.subprocess as subprocess
|
||||
import asyncio
|
||||
import random
|
||||
|
||||
|
||||
def mkdirp(path, mode=0o777):
|
||||
@@ -48,12 +49,24 @@ def spawn_after(sleeptime, func, *args):
|
||||
raise Exception('tf')
|
||||
return spawn(_sleep_and_run(sleeptime, func, args))
|
||||
|
||||
tsks = {}
|
||||
|
||||
def spawn(coro):
|
||||
tskid = random.random()
|
||||
while tskid in tsks:
|
||||
tskid = random.random()
|
||||
tsks[tskid] = 1
|
||||
try:
|
||||
return asyncio.create_task(coro)
|
||||
tsks[tskid] = asyncio.create_task(_run(coro, tskid))
|
||||
except AttributeError:
|
||||
return asyncio.get_event_loop().create_task(coro)
|
||||
tsks[tskid] = asyncio.get_event_loop().create_task(_run(coro, tskid))
|
||||
return tsks[tskid]
|
||||
|
||||
async def _run(coro, taskid):
|
||||
ret = await coro
|
||||
del tsks[taskid]
|
||||
return ret
|
||||
|
||||
|
||||
def run(cmd):
|
||||
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
||||
Reference in New Issue
Block a user