2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-01-13 03:22:30 +00:00

Opt into the msgpack 1.0 behavior

This fixes the dispatch to actually work.
This commit is contained in:
Jarrod Johnson
2020-01-24 11:39:44 -05:00
parent 92699e47f2
commit b766e7b0ee
3 changed files with 20 additions and 15 deletions

View File

@@ -690,7 +690,7 @@ def handle_dispatch(connection, cert, dispatch, peername):
# under 0x20 or so.
connection.close()
return
dispatch = msgpack.unpackb(dispatch[2:])
dispatch = msgpack.unpackb(dispatch[2:], raw=False)
configmanager = cfm.ConfigManager(dispatch['tenant'])
nodes = dispatch['nodes']
inputdata = dispatch['inputdata']
@@ -726,24 +726,26 @@ def handle_dispatch(connection, cert, dispatch, peername):
configmanager=configmanager,
inputdata=inputdata))
for res in itertools.chain(*passvalues):
_forward_rsp(connection, res, pversion)
_forward_rsp(connection, res)
except Exception as res:
_forward_rsp(connection, res, pversion)
_forward_rsp(connection, res)
connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00')
def _forward_rsp(connection, res, pversion):
def _forward_rsp(connection, res):
try:
r = res.serialize()
except AttributeError:
if isinstance(res, Exception):
r = msgpack.packb(['Exception', str(res)])
r = msgpack.packb(['Exception', str(res)], use_bin_type=True)
else:
r = msgpack.packb(
['Exception', 'Unable to serialize response ' + repr(res)])
['Exception', 'Unable to serialize response ' + repr(res)],
use_bin_type=True)
except Exception:
r = msgpack.packb(
['Exception', 'Unable to serialize response ' + repr(res)])
['Exception', 'Unable to serialize response ' + repr(res)],
use_bin_type=True)
rlen = len(r)
if not rlen:
return
@@ -985,7 +987,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
dreq = b'\x01\x03' + msgpack.packb(
{'name': myname, 'nodes': list(nodes),
'path': element,'tenant': configmanager.tenant,
'operation': operation, 'inputdata': inputdata})
'operation': operation, 'inputdata': inputdata}, use_bin_type=True)
tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}})
remote.sendall(dreq)
while True:

View File

@@ -19,8 +19,8 @@ import base64
import json
import msgpack
def deserilaize_exc(msg):
excd = msgpack.unpackb(msg)
def deserialize_exc(msg):
excd = msgpack.unpackb(msg, raw=False)
if excd[0] not in globals():
return False
if not issubclass(excd[0], ConfluentException):
@@ -36,7 +36,8 @@ class ConfluentException(Exception):
return json.dumps({'error': errstr })
def serialize(self):
return msgpack.packb([self.__class__.__name__, [str(self)]])
return msgpack.packb([self.__class__.__name__, [str(self)]],
use_bin_type=True)
@property
def apierrorstr(self):
@@ -130,7 +131,8 @@ class PubkeyInvalid(ConfluentException):
self.errorbody = json.dumps(bodydata)
def serialize(self):
return msgpack.packb([self.__class__.__name__, self.myargs])
return msgpack.packb([self.__class__.__name__, self.myargs],
use_bin_type=True)
def get_error_body(self):
return self.errorbody

View File

@@ -86,7 +86,7 @@ def _htmlify_structure(indict):
def msg_deserialize(packed):
m = msgpack.unpackb(packed)
m = msgpack.unpackb(packed, raw=False)
cls = globals()[m[0]]
if issubclass(cls, ConfluentMessage) or issubclass(cls, ConfluentNodeError):
return cls(*m[1:])
@@ -116,7 +116,7 @@ class ConfluentMessage(object):
def serialize(self):
msg = [self.__class__.__name__]
msg.extend(self.myargs)
return msgpack.packb(msg)
return msgpack.packb(msg, use_bin_type=True)
@classmethod
def deserialize(cls, data):
@@ -230,7 +230,8 @@ class ConfluentNodeError(object):
def serialize(self):
return msgpack.packb(
[self.__class__.__name__, self.node, self.error])
[self.__class__.__name__, self.node, self.error],
use_bin_type=True)
@classmethod
def deserialize(cls, data):