From 3bf083deb3a1f572ff6ccf9d9a3caf4840f731b2 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 21 Jan 2020 14:15:14 -0500 Subject: [PATCH] Stage 3 of msgpack for dispatch This may complete the dispatch portion of the msgpack migration. --- confluent_server/confluent/core.py | 48 ++++++++++++++++-------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 575d2e19..f1fba399 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -60,19 +60,14 @@ import eventlet.greenpool as greenpool import eventlet.green.ssl as ssl import eventlet.queue as queue import itertools +import msgpack import os -try: - import cPickle as pickle - pargs = {} -except ImportError: - import pickle - pargs = {'encoding': 'utf-8'} import socket import struct import sys pluginmap = {} -dispatch_plugins = (b'ipmi', u'ipmi') +dispatch_plugins = (b'ipmi', u'ipmi', b'redfish', u'redfish') def seek_element(currplace, currkey): @@ -689,10 +684,13 @@ def handle_dispatch(connection, cert, dispatch, peername): cfm.get_collective_member(peername)['fingerprint'], cert): connection.close() return - pversion = 0 - if bytearray(dispatch)[0] == 0x80: - pversion = bytearray(dispatch)[1] - dispatch = pickle.loads(dispatch, **pargs) + if dispatch[0:2] != b'\x01\x03': # magic value to indicate msgpack + # We only support msgpack now + # The magic should preclude any pickle, as the first byte can never be + # under 0x20 or so. + connection.close() + return + dispatch = msgpack.unpackb(dispatch[2:]) configmanager = cfm.ConfigManager(dispatch['tenant']) nodes = dispatch['nodes'] inputdata = dispatch['inputdata'] @@ -736,10 +734,16 @@ def handle_dispatch(connection, cert, dispatch, peername): def _forward_rsp(connection, res, pversion): try: - r = pickle.dumps(res, protocol=pversion) - except TypeError: - r = pickle.dumps(Exception( - 'Cannot serialize error, check collective.manager error logs for details' + str(res)), protocol=pversion) + r = res.serialize() + except AttributeError: + if isinstance(res, Exception): + r = msgpack.packb(['Exception', str(res)]) + else: + r = msgpack.packb( + ['Exception', 'Unable to serialize response ' + repr(res)]) + except Exception: + r = msgpack.packb( + ['Exception', 'Unable to serialize response ' + repr(res)]) rlen = len(r) if not rlen: return @@ -978,10 +982,10 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, pvers = 2 tlvdata.recv(remote) myname = collective.get_myname() - dreq = pickle.dumps({'name': myname, 'nodes': list(nodes), - 'path': element,'tenant': configmanager.tenant, - 'operation': operation, 'inputdata': inputdata}, - protocol=pvers) + dreq = b'\x01\x03' + msgpack.packb( + {'name': myname, 'nodes': list(nodes), + 'path': element,'tenant': configmanager.tenant, + 'operation': operation, 'inputdata': inputdata}) tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}}) remote.sendall(dreq) while True: @@ -1029,9 +1033,9 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, return rsp += nrsp try: - rsp = pickle.loads(rsp, **pargs) - except UnicodeDecodeError: - rsp = pickle.loads(rsp, encoding='latin1') + rsp = msg.msg_deserialize(rsp) + except Exception: + rsp = exc.deserialize_exc(rsp) if isinstance(rsp, Exception): raise rsp yield rsp