From 1aee19997a4a2ea4872e47cd2068a9950a331267 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 4 Feb 2020 10:16:48 -0500 Subject: [PATCH] Carry errors across msgpack Messages that were formerly carried as pickled exceptions are now sent as generic strings over msgpack. --- confluent_server/confluent/config/configmanager.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index bc282660..c2138354 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -613,8 +613,10 @@ def relay_slaved_requests(name, listener): raise Exception('Unsupported function {0} called'.format(rpc['function'])) try: globals()[rpc['function']](*rpc['args']) + except ValueError as ve: + exc = ['ValueError', str(ve)] except Exception as e: - exc = e + exc = ['Exception', str(e)] if 'xid' in rpc: res = _push_rpc(listener, msgpack.packb({'xid': rpc['xid'], 'exc': exc}, use_bin_type=False)) @@ -775,7 +777,12 @@ def follow_channel(channel): print(repr(e)) if 'xid' in rpc and rpc['xid']: if rpc.get('exc', None): - _pendingchangesets[rpc['xid']].send_exception(rpc['exc']) + exctype, excstr = rpc['exc'] + if exctype == 'ValueError': + exc = ValueError(excstr) + else: + exc = Exception(excstr) + _pendingchangesets[rpc['xid']].send_exception(exc) else: _pendingchangesets[rpc['xid']].send() if 'quorum' in rpc: