From b766e7b0eec830c1d736953bc851150ecc0aa8dc Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 24 Jan 2020 11:39:44 -0500 Subject: [PATCH] Opt into the msgpack 1.0 behavior This fixes the dispatch to actually work. --- confluent_server/confluent/core.py | 18 ++++++++++-------- confluent_server/confluent/exceptions.py | 10 ++++++---- confluent_server/confluent/messages.py | 7 ++++--- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index f9d6d904..884ed809 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -690,7 +690,7 @@ def handle_dispatch(connection, cert, dispatch, peername): # under 0x20 or so. connection.close() return - dispatch = msgpack.unpackb(dispatch[2:]) + dispatch = msgpack.unpackb(dispatch[2:], raw=False) configmanager = cfm.ConfigManager(dispatch['tenant']) nodes = dispatch['nodes'] inputdata = dispatch['inputdata'] @@ -726,24 +726,26 @@ def handle_dispatch(connection, cert, dispatch, peername): configmanager=configmanager, inputdata=inputdata)) for res in itertools.chain(*passvalues): - _forward_rsp(connection, res, pversion) + _forward_rsp(connection, res) except Exception as res: - _forward_rsp(connection, res, pversion) + _forward_rsp(connection, res) connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00') -def _forward_rsp(connection, res, pversion): +def _forward_rsp(connection, res): try: r = res.serialize() except AttributeError: if isinstance(res, Exception): - r = msgpack.packb(['Exception', str(res)]) + r = msgpack.packb(['Exception', str(res)], use_bin_type=True) else: r = msgpack.packb( - ['Exception', 'Unable to serialize response ' + repr(res)]) + ['Exception', 'Unable to serialize response ' + repr(res)], + use_bin_type=True) except Exception: r = msgpack.packb( - ['Exception', 'Unable to serialize response ' + repr(res)]) + ['Exception', 'Unable to serialize response ' + repr(res)], + use_bin_type=True) rlen = len(r) if not rlen: return @@ -985,7 +987,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, dreq = b'\x01\x03' + msgpack.packb( {'name': myname, 'nodes': list(nodes), 'path': element,'tenant': configmanager.tenant, - 'operation': operation, 'inputdata': inputdata}) + 'operation': operation, 'inputdata': inputdata}, use_bin_type=True) tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}}) remote.sendall(dreq) while True: diff --git a/confluent_server/confluent/exceptions.py b/confluent_server/confluent/exceptions.py index 619d24b5..e184a02f 100644 --- a/confluent_server/confluent/exceptions.py +++ b/confluent_server/confluent/exceptions.py @@ -19,8 +19,8 @@ import base64 import json import msgpack -def deserilaize_exc(msg): - excd = msgpack.unpackb(msg) +def deserialize_exc(msg): + excd = msgpack.unpackb(msg, raw=False) if excd[0] not in globals(): return False if not issubclass(excd[0], ConfluentException): @@ -36,7 +36,8 @@ class ConfluentException(Exception): return json.dumps({'error': errstr }) def serialize(self): - return msgpack.packb([self.__class__.__name__, [str(self)]]) + return msgpack.packb([self.__class__.__name__, [str(self)]], + use_bin_type=True) @property def apierrorstr(self): @@ -130,7 +131,8 @@ class PubkeyInvalid(ConfluentException): self.errorbody = json.dumps(bodydata) def serialize(self): - return msgpack.packb([self.__class__.__name__, self.myargs]) + return msgpack.packb([self.__class__.__name__, self.myargs], + use_bin_type=True) def get_error_body(self): return self.errorbody diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py index 104ced21..07f84a14 100644 --- a/confluent_server/confluent/messages.py +++ b/confluent_server/confluent/messages.py @@ -86,7 +86,7 @@ def _htmlify_structure(indict): def msg_deserialize(packed): - m = msgpack.unpackb(packed) + m = msgpack.unpackb(packed, raw=False) cls = globals()[m[0]] if issubclass(cls, ConfluentMessage) or issubclass(cls, ConfluentNodeError): return cls(*m[1:]) @@ -116,7 +116,7 @@ class ConfluentMessage(object): def serialize(self): msg = [self.__class__.__name__] msg.extend(self.myargs) - return msgpack.packb(msg) + return msgpack.packb(msg, use_bin_type=True) @classmethod def deserialize(cls, data): @@ -230,7 +230,8 @@ class ConfluentNodeError(object): def serialize(self): return msgpack.packb( - [self.__class__.__name__, self.node, self.error]) + [self.__class__.__name__, self.node, self.error], + use_bin_type=True) @classmethod def deserialize(cls, data):