diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 9303d5b6..501ad21d 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -309,7 +309,7 @@ class ConsoleHandler(object): self.log( logdata='console disconnected', ltype=log.DataTypes.event, event=log.Events.consoledisconnect) - self._console.close() + await self._console.close() self._console = None self.connectstate = 'unconnected' await self._send_rcpts({'connectstate': self.connectstate}) @@ -455,10 +455,10 @@ class ConsoleHandler(object): self.cfgmgr.remove_watcher(self._attribwatcher) self._attribwatcher = None - def get_console_output(self, data): + async def get_console_output(self, data): # Spawn as a greenthread, return control as soon as possible # to the console object - util.spawn(self._handle_console_output(data)) + await self._handle_console_output(data) async def attachsession(self, session): edata = 1 diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 1de34a79..d7215157 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -648,10 +648,13 @@ def show_user(name, configmanager): -def stripnode(iterablersp, node): - for i in iterablersp: +async def stripnode(iterablersp, node): + async for i in iterablersp: if i is None: raise exc.NotImplementedException("Not Implemented") + if isinstance(i, console.Console): + yield i + continue i.strip_node(node) yield i @@ -1071,8 +1074,6 @@ async def handle_node_request(configmanager, inputdata, operation, inputdata=msginputdata) if isnoderange: return passvalue - elif isinstance(passvalue, console.Console): - return [passvalue] else: return stripnode(passvalue, nodes[0]) elif 'pluginattrs' in plugroute: diff --git a/confluent_server/confluent/plugins/console/openbmc.py b/confluent_server/confluent/plugins/console/openbmc.py index db1ca309..f3cf1c07 100644 --- a/confluent_server/confluent/plugins/console/openbmc.py +++ b/confluent_server/confluent/plugins/console/openbmc.py @@ -90,10 +90,10 @@ class OpenBmcConsole(conapi.Console): while self.connected: pendingdata = await self.ws.receive() if pendingdata.type == aiohttp.WSMsgType.BINARY: - self.datacallback(pendingdata.data) + await self.datacallback(pendingdata.data) continue elif pendingdata.type == aiohttp.WSMsgType.CLOSE: - self.datacallback(conapi.ConsoleEvent.Disconnect) + await self.datacallback(conapi.ConsoleEvent.Disconnect) return else: print("Unknown response in WSConsoleHandler") diff --git a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py index 99552ae0..74c73362 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py @@ -311,7 +311,7 @@ class IpmiConsole(conapi.Console): except KeyError: pass - def handle_data(self, data): + async def handle_data(self, data): if type(data) == dict: if 'error' in data: self.solconnection = None @@ -319,9 +319,9 @@ class IpmiConsole(conapi.Console): self.error = data['error'] if self.connected: self.connected = False - self.datacallback(conapi.ConsoleEvent.Disconnect) + await self.datacallback(conapi.ConsoleEvent.Disconnect) else: - self.datacallback(data) + await self.datacallback(data) async def connect(self, callback): self.datacallback = callback diff --git a/confluent_server/confluent/plugins/shell/ssh.py b/confluent_server/confluent/plugins/shell/ssh.py index 17804778..b07e12a9 100644 --- a/confluent_server/confluent/plugins/shell/ssh.py +++ b/confluent_server/confluent/plugins/shell/ssh.py @@ -27,49 +27,11 @@ import confluent.util as util import hashlib import sys sys.modules['gssapi'] = None -#paramiko = eventlet.import_patched('paramiko') import asyncio import asyncssh - -class HostKeyHandler: - - def __init__(self, configmanager, node): - self.cfm = configmanager - self.node = node - - def missing_host_key(self, client, hostname, key): - # have to catch the valueerror and use ssh-keyscan to trigger this, asyncssh host key handling - # is a bit more limited compared to paramiko - - #but... leverage /etc/ssh/ssh_known_hosts, we can try that way, and if it fails, fallback to our - #confluent db based handler - fingerprint = 'sha512$' + hashlib.sha512(key.asbytes()).hexdigest() - cfg = self.cfm.get_node_attributes( - self.node, ('pubkeys.ssh', 'pubkeys.addpolicy')) - if 'pubkeys.ssh' not in cfg[self.node]: - if ('pubkeys.addpolicy' in cfg[self.node] and - cfg[self.node]['pubkeys.addpolicy'] and - cfg[self.node]['pubkeys.addpolicy']['value'] == 'manual'): - raise cexc.PubkeyInvalid('New ssh key detected', - key.asbytes(), fingerprint, - 'pubkeys.ssh', 'newkey') - auditlog = log.Logger('audit') - auditlog.log({'node': self.node, 'event': 'sshautoadd', - 'fingerprint': fingerprint}) - self.cfm.set_node_attributes( - {self.node: {'pubkeys.ssh': fingerprint}}) - return True - elif cfg[self.node]['pubkeys.ssh']['value'] == fingerprint: - return True - raise cexc.PubkeyInvalid( - 'Mismatched SSH host key detected', key.asbytes(), fingerprint, - 'pubkeys.ssh', 'mismatch' - ) - - class SshShell(conapi.Console): def __init__(self, node, config, username=b'', password=b''): @@ -89,7 +51,6 @@ class SshShell(conapi.Console): self.height = height if not self.connected: return - # asyncssh channel has change_terminal_size, hurray self.shell[0].channel.change_terminal_size(width=width, height=height) async def recvdata(self): @@ -98,9 +59,9 @@ class SshShell(conapi.Console): if not pendingdata: self.ssh.close() if self.datacallback: - self.datacallback(conapi.ConsoleEvent.Disconnect) + await self.datacallback(conapi.ConsoleEvent.Disconnect) return - self.datacallback(pendingdata) + await self.datacallback(pendingdata) async def connect(self, callback): # for now, we just use the nodename as the presumptive ssh destination @@ -109,10 +70,10 @@ class SshShell(conapi.Console): # identifier self.datacallback = callback if self.username != b'': - await self.logon() + self.logon() else: self.inputmode = 0 - callback('\r\nlogin as: ') + await callback('\r\nlogin as: ') return def logon(self): @@ -122,10 +83,14 @@ class SshShell(conapi.Console): async def do_logon(self): sco = asyncssh.SSHClientConnectionOptions() #The below would be to support the confluent db, and only fallback if the SSH CA do not work - #sco.client_fatory = SSHKnownHostsLookup + # have to catch the valueerror and use ssh-keyscan to trigger this, asyncssh host key handling + # is a bit more limited compared to paramiko + #but... leverage /etc/ssh/ssh_known_hosts, we can try that way, and if it fails, fallback to our + #confluent db based handler + #sco.client_fatory = SSHKnownHostsLookup try: - self.datacallback('\r\nConnecting to {}...'.format(self.node)) + await self.datacallback('\r\nConnecting to {}...'.format(self.node)) try: self.ssh = await asyncssh.connect(self.node, username=self.username.decode(), password=self.password.decode(), known_hosts='/etc/ssh/ssh_known_hosts') except ValueError: @@ -135,11 +100,11 @@ class SshShell(conapi.Console): self.ssh.close() self.keyaction = b'' self.candidatefprint = pi.fingerprint - self.datacallback(pi.message) + await self.datacallback(pi.message) self.keyattrname = pi.attrname - self.datacallback('\r\nNew fingerprint: ' + pi.fingerprint) + await self.datacallback('\r\nNew fingerprint: ' + pi.fingerprint) self.inputmode = -1 - self.datacallback('\r\nEnter "disconnect" or "accept": ') + await self.datacallback('\r\nEnter "disconnect" or "accept": ') return except Exception as e: if self.ssh: @@ -148,25 +113,24 @@ class SshShell(conapi.Console): self.username = b'' self.password = b'' warn = 'Error connecting to {0}:\r\n {1}\r\n'.format(self.node, str(e)) - self.datacallback('\r\n' + warn) - self.datacallback('\r\nlogin as: ') + await self.datacallback('\r\n' + warn) + await self.datacallback('\r\nlogin as: ') return self.inputmode = 2 self.connected = True - self.datacallback('Connected\r\n') - self.shell = await self.ssh.open_session(term_type='vt100', term_size=(self.width, self.height)) # self.ssh.invoke_shell(width=self.width, - # height=self.height) + await self.datacallback('Connected\r\n') + self.shell = await self.ssh.open_session(term_type='vt100', term_size=(self.width, self.height)) self.rxthread = util.spawn(self.recvdata()) async def write(self, data): if self.inputmode == -2: - self.datacallback(conapi.ConsoleEvent.Disconnect) + await self.datacallback(conapi.ConsoleEvent.Disconnect) return elif self.inputmode == -3: return elif self.inputmode == -1: while len(data) and data[0:1] == b'\x7f' and len(self.keyaction): - self.datacallback('\b \b') # erase previously echoed value + await self.datacallback('\b \b') # erase previously echoed value self.keyaction = self.keyaction[:-1] data = data[1:] while len(data) and data[0:1] == b'\x7f': @@ -181,13 +145,13 @@ class SshShell(conapi.Console): self.nodeconfig.set_node_attributes( {self.node: {self.keyattrname: self.candidatefprint}}) - self.datacallback('\r\n') + await self.datacallback('\r\n') self.logon() elif action.lower() == b'disconnect': - self.datacallback(conapi.ConsoleEvent.Disconnect) + await self.datacallback(conapi.ConsoleEvent.Disconnect) else: self.keyaction = b'' - self.datacallback('\r\nEnter "disconnect" or "accept": ') + await self.datacallback('\r\nEnter "disconnect" or "accept": ') elif len(data) > 0: self.datacallback(data) elif self.inputmode == 0: @@ -205,12 +169,12 @@ class SshShell(conapi.Console): self.username, self.password = self.username.split(b'\r')[:2] lastdata = data.split(b'\r')[0] if lastdata != '': - self.datacallback(lastdata) - self.datacallback('\r\nEnter password: ') + await self.datacallback(lastdata) + await self.datacallback('\r\nEnter password: ') self.inputmode = 1 elif len(data) > 0: # echo back typed data - self.datacallback(data) + await self.datacallback(data) elif self.inputmode == 1: while len(data) > 0 and data[0:1] == b'\x7f': self.password = self.password[:-1] @@ -221,125 +185,16 @@ class SshShell(conapi.Console): self.password += data if b'\r' in self.password: self.password = self.password.split(b'\r')[0] - self.datacallback(b'\r\n') + await self.datacallback(b'\r\n') self.logon() else: self.shell[0].write(data.decode()) - def close(self): + async def close(self): if self.ssh is not None: self.ssh.close() self.datacallback = None - -def create(nodes, element, configmanager, inputdata): +async def create(nodes, element, configmanager, inputdata): if len(nodes) == 1: - return SshShell(nodes[0], configmanager) - - -class SshConn(): - - def __init__(self, node, config, username=b'', password=b''): - self.node = node - self.ssh = None - self.datacallback = None - self.nodeconfig = config - self.username = username - self.password = password - self.connected = False - self.inputmode = 0 # 0 = username, 1 = password... - - def __del__(self): - if self.connected: - self.close() - - def do_logon(self): - self.ssh = paramiko.SSHClient() - self.ssh.set_missing_host_key_policy( - HostKeyHandler(self.nodeconfig, self.node)) - log.log({'info': f"Connecting to {self.node} by ssh"}) - try: - if self.password: - self.ssh.connect(self.node, username=self.username, - password=self.password, allow_agent=False, - look_for_keys=False) - else: - self.ssh.connect(self.node, username=self.username) - except paramiko.AuthenticationException as e: - self.ssh.close() - self.inputmode = 0 - self.username = b'' - self.password = b'' - log.log({'warn': f"Error connecting to {self.node}: {str(e)}"}) - return - except paramiko.ssh_exception.NoValidConnectionsError as e: - self.ssh.close() - self.inputmode = 0 - self.username = b'' - self.password = b'' - log.log({'warn': f"Error connecting to {self.node}: {str(e)}"}) - return - except cexc.PubkeyInvalid as pi: - self.ssh.close() - self.keyaction = b'' - self.candidatefprint = pi.fingerprint - log.log({'warn': pi.message}) - self.keyattrname = pi.attrname - log.log({'info': f"New fingerprint: {pi.fingerprint}"}) - self.inputmode = -1 - return - except paramiko.SSHException as pi: - self.ssh.close() - self.inputmode = -2 - warn = str(pi) - if warnhostkey: - warn += ' (Older cryptography package on this host only ' \ - 'works with ed25519, check ssh startup on target ' \ - 'and permissions on /etc/ssh/*key)\r\n' - log.log({'warn': warn}) - return - except Exception as e: - self.ssh.close() - self.ssh.close() - self.inputmode = 0 - self.username = b'' - self.password = b'' - log.log({'warn': f"Error connecting to {self.node}: {str(e)}"}) - return - self.inputmode = 2 - self.connected = True - log.log({'info': f"Connected by ssh to {self.node}"}) - - def exec_command(self, cmd, cmdargs): - safecmd = cmd.translate(str.maketrans({"[": r"\]", - "]": r"\]", - "?": r"\?", - "!": r"\!", - "\\": r"\\", - "^": r"\^", - "$": r"\$", - " ": r"\ ", - "*": r"\*"})) - cmds = [safecmd] - for arg in cmdargs: - arg = arg.translate(str.maketrans({"[": r"\]", - "]": r"\]", - "?": r"\?", - "!": r"\!", - "\\": r"\\", - "^": r"\^", - "$": r"\$", - " ": r"\ ", - "*": r"\*"})) - arg = "%s" % (str(arg).replace(r"'", r"'\''"),) - cmds.append(arg) - - runcmd = " ".join(cmds) - stdin, stdout, stderr = self.ssh.exec_command(runcmd) - rcode = stdout.channel.recv_exit_status() - return stdout.readlines(), stderr.readlines() - - def close(self): - if self.ssh is not None: - self.ssh.close() - log.log({'info': f"Disconnected from {self.node}"}) + yield SshShell(nodes[0], configmanager) diff --git a/confluent_server/confluent/shellserver.py b/confluent_server/confluent/shellserver.py index 26ad4682..5f95bc2d 100644 --- a/confluent_server/confluent/shellserver.py +++ b/confluent_server/confluent/shellserver.py @@ -130,12 +130,14 @@ class ShellSession(consoleserver.ConsoleSession): pass return await super(ShellSession, self).destroy() -def create(nodes, element, configmanager, inputdata): + +async def create(nodes, element, configmanager, inputdata): # For creating a resource, it really has to be handled # in httpapi/sockapi specially, like a console. raise exc.InvalidArgumentException('Special client code required') -def retrieve(nodes, element, configmanager, inputdata): + +async def retrieve(nodes, element, configmanager, inputdata): tenant = configmanager.tenant user = configmanager.current_user if (tenant, nodes[0], user) in activesessions: