2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-26 10:41:29 +00:00

Fix issues associated with unix domain vs tls

This commit is contained in:
Jarrod Johnson
2026-04-17 10:42:40 -04:00
parent 67860dc7c3
commit f1f5f1b3b8
3 changed files with 43 additions and 49 deletions

View File

@@ -198,6 +198,12 @@ async def sendall(handle, data):
cloop = asyncio.get_event_loop()
return await cloop.sock_sendall(handle, data)
def get_socket(handle):
if isinstance(handle, tuple):
return handle[1].transport.get_extra_info('socket')
else:
return handle
async def close(handle):
if isinstance(handle, tuple):
handle[1].close()

View File

@@ -341,8 +341,7 @@ async def handle_connection(connection, cert, request, local=False):
await tlvdata.send(connection,
{'error': 'Invalid certificate, '
'redo invitation process'})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
return
await tlvdata.recv(remote) # ignore banner
await tlvdata.recv(remote) # ignore authpassed: 0
@@ -366,16 +365,17 @@ async def handle_connection(connection, cert, request, local=False):
todelete in collinfo['active']):
await tlvdata.send(connection, {'collective':
{'error': '{0} is still active, stop the confluent service to remove it'.format(todelete)}})
await tlvdata.close(connection)
return
if todelete not in collinfo['offline']:
await tlvdata.send(connection, {'collective':
{'error': '{0} is not a recognized collective member'.format(todelete)}})
await tlvdata.close(connection)
return
await cfm.del_collective_member(todelete)
await tlvdata.send(connection,
{'collective': {'status': 'Successfully deleted {0}'.format(todelete)}})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
return
if 'invite' == operation:
try:
@@ -392,8 +392,7 @@ async def handle_connection(connection, cert, request, local=False):
invitation = invites.create_server_invitation(name, role)
await tlvdata.send(connection,
{'collective': {'invitation': invitation}})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
if 'join' == operation:
invitation = request['invitation']
try:
@@ -459,6 +458,7 @@ async def handle_connection(connection, cert, request, local=False):
if 'error' in rsp:
await tlvdata.send(connection, {'collective':
{'status': rsp['error']}})
await tlvdata.close(connection)
return
proof = rsp['collective']['approval']
proof = base64.b64decode(proof)
@@ -498,11 +498,12 @@ async def handle_connection(connection, cert, request, local=False):
fprint = util.get_fingerprint(cert)
myfprint = util.get_fingerprint(mycert)
iam = cfm.get_collective_member(get_myname())
cnn = tlvdata.get_socket(connection)
if not iam:
await cfm.add_collective_member(get_myname(),
connection[1].transport.get_extra_info('socket').getsockname()[0], myfprint)
cnn.getsockname()[0], myfprint)
await cfm.add_collective_member(request['name'],
connection[1].transport.get_extra_info('socket').getpeername()[0], fprint, role)
cnn.getpeername()[0], fprint, role)
myleader = await get_leader(connection)
ldrfprint = (await cfm.get_collective_member_by_address(
myleader))['fingerprint']
@@ -564,23 +565,20 @@ async def handle_connection(connection, cert, request, local=False):
connection,
{'error': 'Already following, assimilate leader first',
'leader': currentleader})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
return
if connecting.active:
# don't try to connect while actively already trying to connect
await tlvdata.send(connection, {'status': 0})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
return
cnn = connection[1].get_extra_info('socket')
cnn = tlvdata.get_socket(connection)
if (currentleader == cnn.getpeername()[0] and
follower and not follower.dead):
# if we are happily following this leader already, don't stir
# the pot
await tlvdata.send(connection, {'status': 0})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
return
log.log({'info': 'Connecting in response to assimilation',
'subsystem': 'collective'})
@@ -588,8 +586,7 @@ async def handle_connection(connection, cert, request, local=False):
if cfm.cfgstreams:
await retire_as_leader(newleader)
await tlvdata.send(connection, {'status': 0})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
if not await connect_to_leader(None, None, leader=newleader):
if retrythread is None:
retrythread = tasks.spawn_task_after(random.random(),
@@ -602,8 +599,7 @@ async def handle_connection(connection, cert, request, local=False):
await tlvdata.send(connection,
{'error': 'Invalid certificate, '
'redo invitation process'})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
return
collinfo = {}
populate_collinfo(collinfo)
@@ -616,31 +612,27 @@ async def handle_connection(connection, cert, request, local=False):
await tlvdata.send(connection,
{'error': 'Invalid certificate, '
'redo invitation process'})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
return
cnn = connection[1].transport.get_extra_info('socket')
cnn = tlvdata.get_socket(connection)
myself = cnn.getsockname()[0]
if connecting.active or initting:
await tlvdata.send(connection, {'error': 'Connecting right now',
'backoff': True})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
return
if leader_init.active:
print("initting leader....")
await tlvdata.send(connection, {'error': 'Servicing a connection',
'waitinline': True})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
return
if myself != await get_leader(connection):
await tlvdata.send(
connection,
{'error': 'Cannot assimilate, our leader is '
'in another castle', 'leader': currentleader})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
return
if request['txcount'] > cfm._txcount:
await retire_as_leader()
@@ -650,10 +642,9 @@ async def handle_connection(connection, cert, request, local=False):
'txcount': cfm._txcount})
log.log({'info': 'Connecting to leader due to superior '
'transaction count', 'subsystem': 'collective'})
cnn = connection[1].transport.get_extra_info('socket')
cnn = tlvdata.get_socket(connection)
peername = cnn.getpeername()[0]
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
if not await connect_to_leader(
None, None, peername):
if retrythread is None:
@@ -664,7 +655,7 @@ async def handle_connection(connection, cert, request, local=False):
retrythread.cancel()
retrythread = None
async with leader_init:
cnn = connection[1].get_extra_info('socket')
cnn = tlvdata.get_socket(connection)
cfm.update_collective_address(request['name'],
cnn.getpeername()[0])
await tlvdata.send(connection, cfm._dump_keys(None, False))
@@ -674,16 +665,18 @@ async def handle_connection(connection, cert, request, local=False):
try:
await tlvdata.send(connection, {'txcount': cfm._txcount,
'dbsize': len(cfgdata)})
connection[1].write(cfgdata)
await connection[1].drain()
except Exception as e:
print(repr(e))
if isinstance(connection, tuple):
connection[1].write(cfgdata)
await connection[1].drain()
else:
connection.write(cfgdata)
await connection.drain()
finally:
try:
connection[1].close()
await connection[1].wait_closed()
finally:
raise
return None
await tlvdata.close(connection)
except Exception as e:
log.log({'info': 'Ignoring non-fatal error while closing collective connection: {0}'.format(e),
'subsystem': 'collective'})
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now,
# so far unused anyway
#connection.settimeout(90)
@@ -760,7 +753,7 @@ async def try_assimilate(drone, followcount, remote):
async def get_leader(connection):
cnn = connection[1].transport.get_extra_info('socket')
cnn = tlvdata.get_socket(connection)
if currentleader is None or cnn.getpeername()[0] == currentleader:
# cancel retry if a retry is pending
if currentleader is None:
@@ -799,7 +792,7 @@ async def become_leader(connection):
if retrythread is not None:
retrythread.cancel()
retrythread = None
cnn = connection[1].transport.get_extra_info('socket')
cnn = tlvdata.get_socket(connection)
currentleader = cnn.getsockname()[0]
skipaddr = cnn.getpeername()[0]
if reassimilate is not None:

View File

@@ -193,12 +193,7 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
if cfm:
cfm.close_client_files()
try:
if isinstance(connection, tuple):
connection[1].close()
await connection[1].wait_closed()
connection = connection[1].get_extra_info('socket')
else:
connection.close()
await tlvdata.close(connection)
except Exception as e:
print(repr(e))
pass