diff --git a/confluent_client/confluent/asynctlvdata.py b/confluent_client/confluent/asynctlvdata.py index 714f2fea..8a2656c8 100644 --- a/confluent_client/confluent/asynctlvdata.py +++ b/confluent_client/confluent/asynctlvdata.py @@ -198,6 +198,12 @@ async def sendall(handle, data): cloop = asyncio.get_event_loop() return await cloop.sock_sendall(handle, data) + def get_socket(handle): + if isinstance(handle, tuple): + return handle[1].transport.get_extra_info('socket') + else: + return handle + async def close(handle): if isinstance(handle, tuple): handle[1].close() diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 38290e9f..290922a1 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -341,8 +341,7 @@ async def handle_connection(connection, cert, request, local=False): await tlvdata.send(connection, {'error': 'Invalid certificate, ' 'redo invitation process'}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) return await tlvdata.recv(remote) # ignore banner await tlvdata.recv(remote) # ignore authpassed: 0 @@ -366,16 +365,17 @@ async def handle_connection(connection, cert, request, local=False): todelete in collinfo['active']): await tlvdata.send(connection, {'collective': {'error': '{0} is still active, stop the confluent service to remove it'.format(todelete)}}) + await tlvdata.close(connection) return if todelete not in collinfo['offline']: await tlvdata.send(connection, {'collective': {'error': '{0} is not a recognized collective member'.format(todelete)}}) + await tlvdata.close(connection) return await cfm.del_collective_member(todelete) await tlvdata.send(connection, {'collective': {'status': 'Successfully deleted {0}'.format(todelete)}}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) return if 'invite' == operation: try: @@ -392,8 +392,7 @@ async def handle_connection(connection, cert, request, local=False): invitation = invites.create_server_invitation(name, role) await tlvdata.send(connection, {'collective': {'invitation': invitation}}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) if 'join' == operation: invitation = request['invitation'] try: @@ -459,6 +458,7 @@ async def handle_connection(connection, cert, request, local=False): if 'error' in rsp: await tlvdata.send(connection, {'collective': {'status': rsp['error']}}) + await tlvdata.close(connection) return proof = rsp['collective']['approval'] proof = base64.b64decode(proof) @@ -498,11 +498,12 @@ async def handle_connection(connection, cert, request, local=False): fprint = util.get_fingerprint(cert) myfprint = util.get_fingerprint(mycert) iam = cfm.get_collective_member(get_myname()) + cnn = tlvdata.get_socket(connection) if not iam: await cfm.add_collective_member(get_myname(), - connection[1].transport.get_extra_info('socket').getsockname()[0], myfprint) + cnn.getsockname()[0], myfprint) await cfm.add_collective_member(request['name'], - connection[1].transport.get_extra_info('socket').getpeername()[0], fprint, role) + cnn.getpeername()[0], fprint, role) myleader = await get_leader(connection) ldrfprint = (await cfm.get_collective_member_by_address( myleader))['fingerprint'] @@ -564,23 +565,20 @@ async def handle_connection(connection, cert, request, local=False): connection, {'error': 'Already following, assimilate leader first', 'leader': currentleader}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) return if connecting.active: # don't try to connect while actively already trying to connect await tlvdata.send(connection, {'status': 0}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) return - cnn = connection[1].get_extra_info('socket') + cnn = tlvdata.get_socket(connection) if (currentleader == cnn.getpeername()[0] and follower and not follower.dead): # if we are happily following this leader already, don't stir # the pot await tlvdata.send(connection, {'status': 0}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) return log.log({'info': 'Connecting in response to assimilation', 'subsystem': 'collective'}) @@ -588,8 +586,7 @@ async def handle_connection(connection, cert, request, local=False): if cfm.cfgstreams: await retire_as_leader(newleader) await tlvdata.send(connection, {'status': 0}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) if not await connect_to_leader(None, None, leader=newleader): if retrythread is None: retrythread = tasks.spawn_task_after(random.random(), @@ -602,8 +599,7 @@ async def handle_connection(connection, cert, request, local=False): await tlvdata.send(connection, {'error': 'Invalid certificate, ' 'redo invitation process'}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) return collinfo = {} populate_collinfo(collinfo) @@ -616,31 +612,27 @@ async def handle_connection(connection, cert, request, local=False): await tlvdata.send(connection, {'error': 'Invalid certificate, ' 'redo invitation process'}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) return - cnn = connection[1].transport.get_extra_info('socket') + cnn = tlvdata.get_socket(connection) myself = cnn.getsockname()[0] if connecting.active or initting: await tlvdata.send(connection, {'error': 'Connecting right now', 'backoff': True}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) return if leader_init.active: print("initting leader....") await tlvdata.send(connection, {'error': 'Servicing a connection', 'waitinline': True}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) return if myself != await get_leader(connection): await tlvdata.send( connection, {'error': 'Cannot assimilate, our leader is ' 'in another castle', 'leader': currentleader}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) return if request['txcount'] > cfm._txcount: await retire_as_leader() @@ -650,10 +642,9 @@ async def handle_connection(connection, cert, request, local=False): 'txcount': cfm._txcount}) log.log({'info': 'Connecting to leader due to superior ' 'transaction count', 'subsystem': 'collective'}) - cnn = connection[1].transport.get_extra_info('socket') + cnn = tlvdata.get_socket(connection) peername = cnn.getpeername()[0] - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) if not await connect_to_leader( None, None, peername): if retrythread is None: @@ -664,7 +655,7 @@ async def handle_connection(connection, cert, request, local=False): retrythread.cancel() retrythread = None async with leader_init: - cnn = connection[1].get_extra_info('socket') + cnn = tlvdata.get_socket(connection) cfm.update_collective_address(request['name'], cnn.getpeername()[0]) await tlvdata.send(connection, cfm._dump_keys(None, False)) @@ -674,16 +665,18 @@ async def handle_connection(connection, cert, request, local=False): try: await tlvdata.send(connection, {'txcount': cfm._txcount, 'dbsize': len(cfgdata)}) - connection[1].write(cfgdata) - await connection[1].drain() - except Exception as e: - print(repr(e)) + if isinstance(connection, tuple): + connection[1].write(cfgdata) + await connection[1].drain() + else: + connection.write(cfgdata) + await connection.drain() + finally: try: - connection[1].close() - await connection[1].wait_closed() - finally: - raise - return None + await tlvdata.close(connection) + except Exception as e: + log.log({'info': 'Ignoring non-fatal error while closing collective connection: {0}'.format(e), + 'subsystem': 'collective'}) #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, # so far unused anyway #connection.settimeout(90) @@ -760,7 +753,7 @@ async def try_assimilate(drone, followcount, remote): async def get_leader(connection): - cnn = connection[1].transport.get_extra_info('socket') + cnn = tlvdata.get_socket(connection) if currentleader is None or cnn.getpeername()[0] == currentleader: # cancel retry if a retry is pending if currentleader is None: @@ -799,7 +792,7 @@ async def become_leader(connection): if retrythread is not None: retrythread.cancel() retrythread = None - cnn = connection[1].transport.get_extra_info('socket') + cnn = tlvdata.get_socket(connection) currentleader = cnn.getsockname()[0] skipaddr = cnn.getpeername()[0] if reassimilate is not None: diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 984d866d..9c655335 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -193,12 +193,7 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): if cfm: cfm.close_client_files() try: - if isinstance(connection, tuple): - connection[1].close() - await connection[1].wait_closed() - connection = connection[1].get_extra_info('socket') - else: - connection.close() + await tlvdata.close(connection) except Exception as e: print(repr(e)) pass