2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-01-12 02:52:30 +00:00

Close stray filehandles

Proxied terminals and dispsatched
requests would leak filehandles.
This commit is contained in:
Jarrod Johnson
2021-08-17 17:18:10 -04:00
parent d3b6326673
commit b07ca72a8b
3 changed files with 92 additions and 83 deletions

View File

@@ -720,6 +720,7 @@ class ProxyConsole(object):
if self.remote:
try:
tlvdata.send(self.remote, {'operation': 'stop'})
self.remote.close()
except Exception:
pass
self.clisession = None

View File

@@ -825,6 +825,7 @@ def handle_dispatch(connection, cert, dispatch, peername):
_forward_rsp(connection, res)
keepalive.kill()
connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00')
connection.close()
def _forward_rsp(connection, res):

View File

@@ -115,92 +115,98 @@ def send_data(connection, data):
def sessionhdl(connection, authname, skipauth=False, cert=None):
# For now, trying to test the console stuff, so let's just do n4.
authenticated = False
authdata = None
cfm = None
if skipauth:
authenticated = True
cfm = configmanager.ConfigManager(tenant=None, username=authname)
elif authname:
authdata = auth.authorize(authname, element=None)
if authdata:
cfm = authdata[1]
authenticated = True
# version 0 == original, version 1 == pickle3 allowed, 2 = pickle forbidden, msgpack allowed
# v3 - filehandle allowed
send_data(connection, "Confluent -- v3 --")
while not authenticated: # prompt for name and passphrase
send_data(connection, {'authpassed': 0})
response = tlvdata.recv(connection)
if not response:
return
if 'collective' in response:
return collective.handle_connection(connection, cert,
response['collective'])
if 'dispatch' in response:
dreq = tlvdata.recvall(connection, response['dispatch']['length'])
return pluginapi.handle_dispatch(connection, cert, dreq,
response['dispatch']['name'])
if 'proxyconsole' in response:
return start_proxy_term(connection, cert, response['proxyconsole'])
authname = response['username']
passphrase = response['password']
# note(jbjohnso): here, we need to authenticate, but not
# authorize a user. When authorization starts understanding
# element path, that authorization will need to be called
# per request the user makes
authdata = auth.check_user_passphrase(authname, passphrase)
if not authdata:
auditlog.log(
{'operation': 'connect', 'user': authname, 'allowed': False})
else:
authenticated = True
cfm = authdata[1]
send_data(connection, {'authpassed': 1})
request = tlvdata.recv(connection)
if request and isinstance(request, dict) and 'collective' in request:
try:
# For now, trying to test the console stuff, so let's just do n4.
authenticated = False
authdata = None
cfm = None
if skipauth:
if not libssl:
tlvdata.send(
connection,
{'collective': {'error': 'Server either does not have '
'python-pyopenssl installed or has an '
'incorrect version installed '
'(e.g. pyOpenSSL would need to be '
'replaced with python-pyopenssl). '
'Restart confluent after updating '
'the dependency.'}})
authenticated = True
cfm = configmanager.ConfigManager(tenant=None, username=authname)
elif authname:
authdata = auth.authorize(authname, element=None)
if authdata:
cfm = authdata[1]
authenticated = True
# version 0 == original, version 1 == pickle3 allowed, 2 = pickle forbidden, msgpack allowed
# v3 - filehandle allowed
send_data(connection, "Confluent -- v3 --")
while not authenticated: # prompt for name and passphrase
send_data(connection, {'authpassed': 0})
response = tlvdata.recv(connection)
if not response:
return
return collective.handle_connection(connection, None, request['collective'],
local=True)
else:
tlvdata.send(
connection,
{'collective': {'error': 'collective management commands may only be used by root'}})
while request is not None:
try:
process_request(
connection, request, cfm, authdata, authname, skipauth)
except exc.ConfluentException as e:
if ((not isinstance(e, exc.LockedCredentials)) and
e.apierrorcode == 500):
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
send_data(connection, {'errorcode': e.apierrorcode,
'error': e.apierrorstr,
'detail': e.get_error_body()})
send_data(connection, {'_requestdone': 1})
except SystemExit:
sys.exit(0)
except Exception as e:
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
send_data(connection, {'errorcode': 500,
'error': 'Unexpected error - ' + str(e)})
send_data(connection, {'_requestdone': 1})
if 'collective' in response:
return collective.handle_connection(connection, cert,
response['collective'])
if 'dispatch' in response:
dreq = tlvdata.recvall(connection, response['dispatch']['length'])
return pluginapi.handle_dispatch(connection, cert, dreq,
response['dispatch']['name'])
if 'proxyconsole' in response:
return start_proxy_term(connection, cert, response['proxyconsole'])
authname = response['username']
passphrase = response['password']
# note(jbjohnso): here, we need to authenticate, but not
# authorize a user. When authorization starts understanding
# element path, that authorization will need to be called
# per request the user makes
authdata = auth.check_user_passphrase(authname, passphrase)
if not authdata:
auditlog.log(
{'operation': 'connect', 'user': authname, 'allowed': False})
else:
authenticated = True
cfm = authdata[1]
send_data(connection, {'authpassed': 1})
request = tlvdata.recv(connection)
cfm.close_client_files()
if request and isinstance(request, dict) and 'collective' in request:
if skipauth:
if not libssl:
tlvdata.send(
connection,
{'collective': {'error': 'Server either does not have '
'python-pyopenssl installed or has an '
'incorrect version installed '
'(e.g. pyOpenSSL would need to be '
'replaced with python-pyopenssl). '
'Restart confluent after updating '
'the dependency.'}})
return
return collective.handle_connection(connection, None, request['collective'],
local=True)
else:
tlvdata.send(
connection,
{'collective': {'error': 'collective management commands may only be used by root'}})
while request is not None:
try:
process_request(
connection, request, cfm, authdata, authname, skipauth)
except exc.ConfluentException as e:
if ((not isinstance(e, exc.LockedCredentials)) and
e.apierrorcode == 500):
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
send_data(connection, {'errorcode': e.apierrorcode,
'error': e.apierrorstr,
'detail': e.get_error_body()})
send_data(connection, {'_requestdone': 1})
except SystemExit:
sys.exit(0)
except Exception as e:
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
send_data(connection, {'errorcode': 500,
'error': 'Unexpected error - ' + str(e)})
send_data(connection, {'_requestdone': 1})
request = tlvdata.recv(connection)
finally:
cfm.close_client_files()
try:
connection.close()
except Exception:
pass
def send_response(responses, connection):
if responses is None:
@@ -346,6 +352,7 @@ def term_interact(authdata, authname, ccons, cfm, connection, consession,
consession.destroy()
break
consession.write(data)
connection.close()
def _tlshandler(bind_host, bind_port):