From 13a0bf4fbe3e57b5262155bc6b2b91cdf0c08794 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 16 Jan 2020 16:42:32 -0500 Subject: [PATCH 1/3] Draft for message serialization This adds msgpack based serialization to messages. This would be used to superesed pickle in core. --- confluent_server/confluent/messages.py | 79 +++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 2 deletions(-) diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py index 4be79516..104ced21 100644 --- a/confluent_server/confluent/messages.py +++ b/confluent_server/confluent/messages.py @@ -24,6 +24,7 @@ import confluent.config.conf as cfgfile from copy import deepcopy from datetime import datetime import confluent.util as util +import msgpack import json try: @@ -84,6 +85,13 @@ def _htmlify_structure(indict): return ret + '' +def msg_deserialize(packed): + m = msgpack.unpackb(packed) + cls = globals()[m[0]] + if issubclass(cls, ConfluentMessage) or issubclass(cls, ConfluentNodeError): + return cls(*m[1:]) + raise Exception("Unknown shenanigans") + class ConfluentMessage(object): apicode = 200 readonly = False @@ -105,6 +113,15 @@ class ConfluentMessage(object): jsonsnippet = json.dumps(datasource, sort_keys=True, separators=(',', ':'))[1:-1] return jsonsnippet + def serialize(self): + msg = [self.__class__.__name__] + msg.extend(self.myargs) + return msgpack.packb(msg) + + @classmethod + def deserialize(cls, data): + return cls(*data) + def raw(self): """Return pythonic representation of the response. @@ -211,6 +228,14 @@ class ConfluentNodeError(object): self.node = node self.error = errorstr + def serialize(self): + return msgpack.packb( + [self.__class__.__name__, self.node, self.error]) + + @classmethod + def deserialize(cls, data): + return cls(*data) + def raw(self): return {'databynode': {self.node: {'errorcode': self.apicode, 'error': self.error}}} @@ -271,6 +296,7 @@ class ConfluentTargetInvalidCredentials(ConfluentNodeError): class DeletedResource(ConfluentMessage): notnode = True def __init__(self, resource): + self.myargs = [resource] self.kvpairs = {'deleted': resource} def strip_node(self, node): @@ -282,6 +308,7 @@ class CreatedResource(ConfluentMessage): readonly = True def __init__(self, resource): + self.myargs = [resource] self.kvpairs = {'created': resource} def strip_node(self, node): @@ -293,6 +320,7 @@ class RenamedResource(ConfluentMessage): readonly = True def __init__(self, oldname, newname): + self.myargs = (oldname, newname) self.kvpairs = {'oldname': oldname, 'newname': newname} def strip_node(self, node): @@ -301,6 +329,7 @@ class RenamedResource(ConfluentMessage): class RenamedNode(ConfluentMessage): def __init__(self, name, rename): + self.myargs = (name, rename) self.desc = 'New Name' kv = {'rename': {'value': rename}} self.kvpairs = {name: kv} @@ -311,13 +340,16 @@ class AssignedResource(ConfluentMessage): readonly = True def __init__(self, resource): + self.myargs = [resource] self.kvpairs = {'assigned': resource} + class ConfluentChoiceMessage(ConfluentMessage): valid_values = set() valid_paramset = {} def __init__(self, node, state): + self.myargs = (node, state) self.stripped = False self.kvpairs = { node: { @@ -391,6 +423,7 @@ class LinkRelation(ConfluentMessage): class ChildCollection(LinkRelation): def __init__(self, collname, candelete=False): + self.myargs = (collname, candelete) self.rel = 'item' self.href = collname self.candelete = candelete @@ -533,10 +566,12 @@ class DetachMedia(ConfluentMessage): class Media(ConfluentMessage): def __init__(self, node, media): + self.myargs = (node, media) self.kvpairs = {node: {'name': media.name, 'url': media.url}} class SavedFile(ConfluentMessage): def __init__(self, node, file): + self.myargs = (node, file) self.kvpairs = {node: {'filename': file}} class InputAlertData(ConfluentMessage): @@ -1108,6 +1143,7 @@ class BootDevice(ConfluentChoiceMessage): } def __init__(self, node, device, bootmode='unspecified', persistent=False): + self.myargs = (node, device, bootmode, persistent) if device not in self.valid_values: raise Exception("Invalid boot device argument passed in:" + repr(device)) @@ -1206,10 +1242,10 @@ class PowerState(ConfluentChoiceMessage): def __init__(self, node, state, oldstate=None): super(PowerState, self).__init__(node, state) + self.myargs = (node, state, oldstate) if oldstate is not None: self.kvpairs[node]['oldstate'] = {'value': oldstate} - class BMCReset(ConfluentChoiceMessage): valid_values = set([ 'reset', @@ -1225,13 +1261,13 @@ class NTPEnabled(ConfluentChoiceMessage): def __init__(self, node, enabled): self.stripped = False + self.myargs = (node, enabled) self.kvpairs = { node: { 'state': {'value': str(enabled)}, } } - class EventCollection(ConfluentMessage): """A collection of events @@ -1251,6 +1287,8 @@ class EventCollection(ConfluentMessage): def __init__(self, events=(), name=None): eventdata = [] self.notnode = name is None + self.myname = name + self.myargs = (eventdata, name) for event in events: entry = { 'id': event.get('id', None), @@ -1278,6 +1316,10 @@ class AsyncCompletion(ConfluentMessage): self.stripped = True self.notnode = True + @classmethod + def deserialize(cls): + raise Exception("Not supported") + def raw(self): return {'_requestdone': True} @@ -1288,6 +1330,10 @@ class AsyncMessage(ConfluentMessage): self.notnode = True self.msgpair = pair + @classmethod + def deserialize(cls): + raise Exception("Not supported") + def raw(self): rsp = self.msgpair[1] rspdict = None @@ -1319,6 +1365,7 @@ class User(ConfluentMessage): self.desc = 'foo' self.stripped = False self.notnode = name is None + self.myargs = (uid, username, privilege_level, name, expiration) kvpairs = {'username': {'value': username}, 'password': {'value': '', 'type': 'password'}, 'privilege_level': {'value': privilege_level}, @@ -1338,7 +1385,11 @@ class UserCollection(ConfluentMessage): self.notnode = name is None self.desc = 'list of users' userlist = [] + self.myargs = (userlist, name) for user in users: + if 'username' in user: # processing an already translated dict + userlist.append(user) + continue entry = { 'uid': user['uid'], 'username': user['name'], @@ -1352,8 +1403,10 @@ class UserCollection(ConfluentMessage): self.kvpairs = {name: {'users': userlist}} + class AlertDestination(ConfluentMessage): def __init__(self, ip, acknowledge=False, acknowledge_timeout=None, retries=0, name=None): + self.myargs = (ip, acknowledge, acknowledge_timeout, retries, name) self.desc = 'foo' self.stripped = False self.notnode = name is None @@ -1418,7 +1471,11 @@ class SensorReadings(ConfluentMessage): def __init__(self, sensors=(), name=None): readings = [] self.notnode = name is None + self.myargs = (readings, name) for sensor in sensors: + if isinstance(sensor, dict): + readings.append(sensor) + continue sensordict = {'name': sensor.name} if hasattr(sensor, 'value'): sensordict['value'] = sensor.value @@ -1443,6 +1500,7 @@ class Firmware(ConfluentMessage): readonly = True def __init__(self, data, name): + self.myargs = (data, name) self.notnode = name is None self.desc = 'Firmware information' if self.notnode: @@ -1455,6 +1513,7 @@ class KeyValueData(ConfluentMessage): readonly = True def __init__(self, kvdata, name=None): + self.myargs = (kvdata, name) self.notnode = name is None if self.notnode: self.kvpairs = kvdata @@ -1464,6 +1523,7 @@ class KeyValueData(ConfluentMessage): class Array(ConfluentMessage): def __init__(self, name, disks=None, raid=None, volumes=None, id=None, capacity=None, available=None): + self.myargs = (name, disks, raid, volumes, id, capacity, available) self.kvpairs = { name: { 'type': 'array', @@ -1478,6 +1538,7 @@ class Array(ConfluentMessage): class Volume(ConfluentMessage): def __init__(self, name, volname, size, state, array, stripsize=None): + self.myargs = (name, volname, size, state, array, stripsize) self.kvpairs = { name: { 'type': 'volume', @@ -1518,6 +1579,8 @@ class Disk(ConfluentMessage): def __init__(self, name, label=None, description=None, diskid=None, state=None, serial=None, fru=None, array=None): + self.myargs = (name, label, description, diskid, state, + serial, fru, array) state = self._normalize_state(state) self.kvpairs = { name: { @@ -1539,6 +1602,7 @@ class LEDStatus(ConfluentMessage): readonly = True def __init__(self, data, name): + self.myargs = (data, name) self.notnode = name is None self.desc = 'led status' @@ -1553,6 +1617,7 @@ class NetworkConfiguration(ConfluentMessage): def __init__(self, name=None, ipv4addr=None, ipv4gateway=None, ipv4cfgmethod=None, hwaddr=None): + self.myargs = (name, ipv4addr, ipv4gateway, ipv4cfgmethod, hwaddr) self.notnode = name is None self.stripped = False @@ -1573,6 +1638,7 @@ class HealthSummary(ConfluentMessage): valid_values = valid_health_values def __init__(self, health, name=None): + self.myargs = (health, name) self.stripped = False self.notnode = name is None if health not in self.valid_values: @@ -1585,6 +1651,7 @@ class HealthSummary(ConfluentMessage): class Attributes(ConfluentMessage): def __init__(self, name=None, kv=None, desc=''): + self.myargs = (name, kv, desc) self.desc = desc nkv = {} self.notnode = name is None @@ -1605,6 +1672,7 @@ class ConfigSet(Attributes): class ListAttributes(ConfluentMessage): def __init__(self, name=None, kv=None, desc=''): + self.myargs = (name, kv, desc) self.desc = desc self.notnode = name is None if self.notnode: @@ -1615,6 +1683,7 @@ class ListAttributes(ConfluentMessage): class MCI(ConfluentMessage): def __init__(self, name=None, mci=None): + self.myargs = (name, mci) self.notnode = name is None self.desc = 'BMC identifier' @@ -1627,6 +1696,7 @@ class MCI(ConfluentMessage): class Hostname(ConfluentMessage): def __init__(self, name=None, hostname=None): + self.myargs = (name, hostname) self.notnode = name is None self.desc = 'BMC hostname' @@ -1638,6 +1708,7 @@ class Hostname(ConfluentMessage): class DomainName(ConfluentMessage): def __init__(self, name=None, dn=None): + self.myargs = (name, dn) self.notnode = name is None self.desc = 'BMC domain name' @@ -1652,6 +1723,7 @@ class NTPServers(ConfluentMessage): readonly = True def __init__(self, name=None, servers=None): + self.myargs = (name, servers) self.notnode = name is None self.desc = 'NTP Server' @@ -1666,6 +1738,7 @@ class NTPServers(ConfluentMessage): class NTPServer(ConfluentMessage): def __init__(self, name=None, server=None): + self.myargs = (name, server) self.notnode = name is None self.desc = 'NTP Server' @@ -1682,6 +1755,7 @@ class License(ConfluentMessage): readonly = True def __init__(self, name=None, kvm=None, feature=None, state=None): + self.myargs = (name, kvm, feature, state) self.notnode = name is None self.desc = 'License' @@ -1697,6 +1771,7 @@ class CryptedAttributes(Attributes): defaulttype = 'password' def __init__(self, name=None, kv=None, desc=''): + self.myargs = (name, kv, desc) # for now, just keep the dictionary keys and discard crypt value self.desc = desc nkv = {} From 79afd174c93b25a8ccf6800d1ebff2c05345488a Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 21 Jan 2020 11:28:08 -0500 Subject: [PATCH 2/3] Add serialization to ConfluentExceptions In the same manner that messages are handled, handle non-messages content. --- confluent_server/confluent/exceptions.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/confluent_server/confluent/exceptions.py b/confluent_server/confluent/exceptions.py index d9a7f5ab..619d24b5 100644 --- a/confluent_server/confluent/exceptions.py +++ b/confluent_server/confluent/exceptions.py @@ -17,7 +17,15 @@ import base64 import json +import msgpack +def deserilaize_exc(msg): + excd = msgpack.unpackb(msg) + if excd[0] not in globals(): + return False + if not issubclass(excd[0], ConfluentException): + return False + return globals(excd[0])(*excd[1]) class ConfluentException(Exception): apierrorcode = 500 @@ -27,6 +35,9 @@ class ConfluentException(Exception): errstr = ' - '.join((self._apierrorstr, str(self))) return json.dumps({'error': errstr }) + def serialize(self): + return msgpack.packb([self.__class__.__name__, [str(self)]]) + @property def apierrorstr(self): if str(self): @@ -104,6 +115,7 @@ class PubkeyInvalid(ConfluentException): def __init__(self, text, certificate, fingerprint, attribname, event): super(PubkeyInvalid, self).__init__(self, text) + self.myargs = (text, certificate, fingerprint, attribname, event) self.fingerprint = fingerprint self.attrname = attribname self.message = text @@ -117,6 +129,9 @@ class PubkeyInvalid(ConfluentException): 'certificate': certtxt} self.errorbody = json.dumps(bodydata) + def serialize(self): + return msgpack.packb([self.__class__.__name__, self.myargs]) + def get_error_body(self): return self.errorbody From 3bf083deb3a1f572ff6ccf9d9a3caf4840f731b2 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 21 Jan 2020 14:15:14 -0500 Subject: [PATCH 3/3] Stage 3 of msgpack for dispatch This may complete the dispatch portion of the msgpack migration. --- confluent_server/confluent/core.py | 48 ++++++++++++++++-------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 575d2e19..f1fba399 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -60,19 +60,14 @@ import eventlet.greenpool as greenpool import eventlet.green.ssl as ssl import eventlet.queue as queue import itertools +import msgpack import os -try: - import cPickle as pickle - pargs = {} -except ImportError: - import pickle - pargs = {'encoding': 'utf-8'} import socket import struct import sys pluginmap = {} -dispatch_plugins = (b'ipmi', u'ipmi') +dispatch_plugins = (b'ipmi', u'ipmi', b'redfish', u'redfish') def seek_element(currplace, currkey): @@ -689,10 +684,13 @@ def handle_dispatch(connection, cert, dispatch, peername): cfm.get_collective_member(peername)['fingerprint'], cert): connection.close() return - pversion = 0 - if bytearray(dispatch)[0] == 0x80: - pversion = bytearray(dispatch)[1] - dispatch = pickle.loads(dispatch, **pargs) + if dispatch[0:2] != b'\x01\x03': # magic value to indicate msgpack + # We only support msgpack now + # The magic should preclude any pickle, as the first byte can never be + # under 0x20 or so. + connection.close() + return + dispatch = msgpack.unpackb(dispatch[2:]) configmanager = cfm.ConfigManager(dispatch['tenant']) nodes = dispatch['nodes'] inputdata = dispatch['inputdata'] @@ -736,10 +734,16 @@ def handle_dispatch(connection, cert, dispatch, peername): def _forward_rsp(connection, res, pversion): try: - r = pickle.dumps(res, protocol=pversion) - except TypeError: - r = pickle.dumps(Exception( - 'Cannot serialize error, check collective.manager error logs for details' + str(res)), protocol=pversion) + r = res.serialize() + except AttributeError: + if isinstance(res, Exception): + r = msgpack.packb(['Exception', str(res)]) + else: + r = msgpack.packb( + ['Exception', 'Unable to serialize response ' + repr(res)]) + except Exception: + r = msgpack.packb( + ['Exception', 'Unable to serialize response ' + repr(res)]) rlen = len(r) if not rlen: return @@ -978,10 +982,10 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, pvers = 2 tlvdata.recv(remote) myname = collective.get_myname() - dreq = pickle.dumps({'name': myname, 'nodes': list(nodes), - 'path': element,'tenant': configmanager.tenant, - 'operation': operation, 'inputdata': inputdata}, - protocol=pvers) + dreq = b'\x01\x03' + msgpack.packb( + {'name': myname, 'nodes': list(nodes), + 'path': element,'tenant': configmanager.tenant, + 'operation': operation, 'inputdata': inputdata}) tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}}) remote.sendall(dreq) while True: @@ -1029,9 +1033,9 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, return rsp += nrsp try: - rsp = pickle.loads(rsp, **pargs) - except UnicodeDecodeError: - rsp = pickle.loads(rsp, encoding='latin1') + rsp = msg.msg_deserialize(rsp) + except Exception: + rsp = exc.deserialize_exc(rsp) if isinstance(rsp, Exception): raise rsp yield rsp