mirror of
https://github.com/xcat2/confluent.git
synced 2026-04-29 11:57:49 +00:00
Draft for proxyconsole object for remote use of consoles
This would be the stub stand in for the console object to connect to remote console object rather than local.
This commit is contained in:
@@ -29,9 +29,11 @@ import confluent.exceptions as exc
|
||||
import confluent.interface.console as conapi
|
||||
import confluent.log as log
|
||||
import confluent.core as plugin
|
||||
import confluent.tlvdata as tlvdata
|
||||
import confluent.util as util
|
||||
import eventlet
|
||||
import eventlet.event
|
||||
import eventlet.green.ssl as ssl
|
||||
import pyte
|
||||
import random
|
||||
import time
|
||||
@@ -615,6 +617,67 @@ def connect_node(node, configmanager, username=None):
|
||||
_handled_consoles[consk] = ConsoleHandler(node, configmanager)
|
||||
return _handled_consoles[consk]
|
||||
|
||||
# A stub console handler that just passes through to a remote confluent
|
||||
# collective member. It can skip the multi-session sharing as that is handled
|
||||
# remotely
|
||||
class ProxyConsole(object):
|
||||
def __init__(self, node, managerinfo, myname, configmanager, skipreplay):
|
||||
termreq = {
|
||||
'proxyconsole': {
|
||||
'name': myname,
|
||||
'tenant': configmanager.tenant,
|
||||
'node': node,
|
||||
'skiprelay': skipreplay,
|
||||
},
|
||||
}
|
||||
remote = socket.create_connection((managerinfo['address'], 13001))
|
||||
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
|
||||
keyfile='/etc/confluent/privkey.pem',
|
||||
certfile='/etc/confluent/srvcert.pem')
|
||||
if not util.cert_matches(managerinfo['fingerprint'],
|
||||
remote.getpeercert(binary_form=True)):
|
||||
raise Exception('Invalid peer certificate')
|
||||
tlvdata.recv(remote)
|
||||
tlvdata.recv(remote)
|
||||
tlvdata.send(remote, termreq)
|
||||
self.remote = remote
|
||||
eventlet.spawn(self.relay_data)
|
||||
|
||||
def relay_data(self):
|
||||
data = tlvdata.recv(self.remote)
|
||||
while data:
|
||||
self.data_handler(data)
|
||||
data = tlvdata.recv(self.remote)
|
||||
|
||||
def get_buffer_age(self):
|
||||
# the server sends a buffer age if appropriate, no need to handle
|
||||
# it explicitly in the proxy instance
|
||||
return False
|
||||
|
||||
def get_recent(self):
|
||||
# Again, delegate this to the remote collective member
|
||||
return b''
|
||||
|
||||
def write(self, data):
|
||||
# Relay data to the collective manager
|
||||
tlvdata.send(remote, data)
|
||||
|
||||
def attachsession(self, session):
|
||||
# a do nothing stub, since this relationship is easier, the real
|
||||
# complexity is handled remote
|
||||
self.data_handler = session.data_handler
|
||||
|
||||
def detachsession(self, session):
|
||||
# we will disappear, so just let that happen...
|
||||
tlvdata.send(self.remote, {'operation': 'stop'})
|
||||
|
||||
def send_break(self):
|
||||
tlvdata.send(self.remote, {'operation:': 'break'})
|
||||
|
||||
def reopen(self):
|
||||
tlvdata.send(self.remote, {'operation:': 'reopen'})
|
||||
|
||||
|
||||
# this represents some api view of a console handler. This handles things like
|
||||
# holding the caller specific queue data, for example, when http api should be
|
||||
# sending data, but there is no outstanding POST request to hold it,
|
||||
@@ -709,6 +772,15 @@ class ConsoleSession(object):
|
||||
self._evt = None
|
||||
self.reghdl = None
|
||||
|
||||
def detach(self):
|
||||
"""Handler for the console handler to detach so it can reattach,
|
||||
currently to facilitate changing from one collective.manager to
|
||||
another
|
||||
|
||||
:return:
|
||||
"""
|
||||
pass
|
||||
|
||||
def got_data(self, data):
|
||||
"""Receive data from console and buffer
|
||||
|
||||
|
||||
@@ -226,11 +226,9 @@ def start_proxy_term(connection, cert, request):
|
||||
return
|
||||
cfm = configmanager.ConfigManager(request['tenant'])
|
||||
ccons = ClientConsole(connection)
|
||||
if params and 'skipreplay' in params and params['skipreplay']:
|
||||
skipreplay = True
|
||||
consession = consoleserver.ConsoleSession(
|
||||
node=request['node'], configmanager=cfm, username=request['user'],
|
||||
datacallback=ccons.sendall, skipreplay=skipreplay)
|
||||
datacallback=ccons.sendall, skipreplay=request['skipreplay'])
|
||||
term_interact(None, None, ccons, None, connection, consession, None)
|
||||
|
||||
def start_term(authname, cfm, connection, params, path, authdata, skipauth):
|
||||
|
||||
Reference in New Issue
Block a user