# Source code for messages

import importlib
import json
from json import JSONDecodeError

import sys

import time
import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import TypeVar, List, Optional, Tuple, Type, Dict

import pytz
from aio_pika import Message, IncomingMessage
from dataclasses_json import dataclass_json

# Module-level logger; used below to report malformed or unknown messages.
_log = logging.getLogger(__name__)

""" Message Definition and Serialization:

    Self describing data serialization format based on json and python dataclasses.

    Two stage serialization:
    1. serialize payload by using dataclasses and marshmallow (dataclass with dataclass_json decorator)
    2. wrap result into RPC dataclass with two fields: {c_type: str, c_data: json_payload}

    This allows during deserialization to
    1. extract the class_type of payload from the SerializedObject (obj.c_type)
    2. deserialize the json_payload with the class_type into the usable python object

    Caveat when serializing dataclasses:
    As specified in the datetime docs, if your datetime object is naive, it will assume your system local timezone
    when calling .timestamp(). JSON numbers corresponding to a datetime field in your dataclass are decoded into
    a datetime-aware object, with tzinfo set to your system local timezone.
    Thus, if you encode a datetime-naive object, you will decode into a datetime-aware object.
    This is important, because encoding and decoding won't strictly be inverses.

    When using tz-aware objects everything should be fine

    Rabbit MQ Message attributes and their semantic:
        body: payload
        headers: message headers
        headers_raw: message raw headers
        content_type: application/json
        content_encoding: string
        delivery_mode: delivery mode
        priority: priority (not used)
        correlation_id: correlation id
        reply_to: reply to
        routing_key: routing_key (e.g. topic)
        expiration: expiration in seconds (or datetime or timedelta)
        message_id: message id
        timestamp: timestamp
        type: type  -> (RmqMessageTypes: not rmq relevant, for application to communicate semantics)
        user_id: user id -> (rmq user, e.g. guest)
        app_id: app id -> (sender identity)
"""

# Type variable standing for "a dataclass decorated with dataclass_json", i.e.
# an object offering to_json()/from_json(). It is declared without a bound, so
# it documents intent for readers rather than enforcing a constraint.
SerializableDataclass = TypeVar("SerializableDataclass")


def convert_to_utc(obj):
    """Convert the ``datetime`` fields of a dataclass instance to UTC, in place.

    Only fields whose declared type is exactly ``datetime`` are considered.
    Fields that currently hold ``None`` (an unset optional timestamp such as
    ``date: datetime = None``) are left untouched instead of raising
    ``AttributeError``.

    Note that ``datetime.astimezone`` treats a naive value as system local
    time, so naive inputs are interpreted as local time before conversion.

    :param obj: dataclass instance whose datetime fields should be normalized
    :return: the same ``obj`` instance, to allow call chaining
    """
    # ``dc_field`` instead of ``field`` so the ``dataclasses.field`` import
    # is not shadowed inside this function.
    for dc_field in obj.__class__.__dataclass_fields__.values():
        if dc_field.type is not datetime:
            continue
        value = getattr(obj, dc_field.name)
        if value is None:
            # nothing to convert for an unset timestamp
            continue
        setattr(obj, dc_field.name, value.astimezone(pytz.UTC))
    return obj


class RmqMessageTypes(Enum):
    """RMQ message types that are handled by the system.

    Users can define arbitrary additional message types through the
    ``msg_type`` parameter of the ``fanout_send`` / ``direct_send`` methods.
    """

    CONTROL = 0
    RPC = 1
    PUBSUB = 2
class RpcMessageTypes(Enum):
    """Direction of an RPC envelope: a request or the response to one."""

    RPC_REQUEST = 1
    RPC_RESPONSE = 2
@dataclass_json
@dataclass
class RpcMessage:
    """Self-describing RPC envelope around a JSON-serialized payload.

    ``c_type`` holds the class name of the payload, ``c_data`` the payload's
    JSON text and ``request_type`` tells whether the envelope is a request or
    a response.
    """

    c_type: str
    c_data: str
    # Annotated with the enum (it used to say ``str``) so that the declared
    # type matches the default and the JSON decoder can rebuild the enum
    # member from its value.
    request_type: RpcMessageTypes = RpcMessageTypes.RPC_REQUEST

    def _has_type(self, expected: RpcMessageTypes) -> bool:
        """Return True if ``request_type`` is ``expected`` or its raw value."""
        # After a JSON round trip the attribute may hold the raw enum value
        # instead of the member; accept both so the check stays correct.
        return self.request_type in (expected, expected.value)

    def is_request(self) -> bool:
        """Return True when this envelope carries an RPC request."""
        return self._has_type(RpcMessageTypes.RPC_REQUEST)

    def is_response(self) -> bool:
        """Return True when this envelope carries an RPC response."""
        return self._has_type(RpcMessageTypes.RPC_RESPONSE)
@dataclass_json
@dataclass
class RpcObject:
    """Base class for payloads that travel inside an ``RpcMessage`` envelope."""

    def to_rpc(self, rt: RpcMessageTypes = None) -> str:
        """Wrap this object into an ``RpcMessage`` and return the envelope JSON.

        :param rt: request type to store in the envelope; when ``None`` the
            envelope keeps its default request type
        :return: JSON text of the envelope
        """
        envelope = RpcMessage(c_type=self.__class__.__name__, c_data=self.to_json())
        if rt is None:
            return envelope.to_json()
        envelope.request_type = rt
        return envelope.to_json()

    @staticmethod
    def from_rpc(msg: str) -> Tuple[RpcMessageTypes, SerializableDataclass]:
        """Rebuild the request type and payload object from envelope JSON.

        The payload class is looked up by name (``c_type``) in the
        ``messages`` module and its datetime fields are converted to UTC.

        :param msg: JSON text of an ``RpcMessage``
        :return: tuple ``(request type, payload object)``
        """
        envelope = RpcMessage.from_json(msg)
        payload_cls = getattr(importlib.import_module("messages"), envelope.c_type)
        payload = convert_to_utc(payload_cls.from_json(envelope.c_data))
        return RpcMessageTypes(envelope.request_type), payload
def to_rpc(obj: SerializableDataclass) -> str:
    """Create a self-describing, JSON-serialized RPC envelope for ``obj``.

    :param obj: object providing ``to_json()``
    :return: JSON text of an ``RpcMessage`` whose ``c_type`` is the class name
        of ``obj`` and whose ``c_data`` is the JSON of ``obj``
    """
    type_name = obj.__class__.__name__
    payload_json = obj.to_json()
    envelope = RpcMessage(c_type=type_name, c_data=payload_json)
    return envelope.to_json()
def from_rpc(msg: str) -> SerializableDataclass:
    """Rebuild the payload object from the JSON text of an RPC envelope.

    The payload class is resolved by name (``c_type``) in the ``messages``
    module. Unlike ``RpcObject.from_rpc`` this returns only the payload and
    does not convert datetime fields to UTC.

    :param msg: JSON text of an ``RpcMessage``
    :return: the deserialized payload object
    """
    envelope = RpcMessage.from_json(msg)
    payload_cls = getattr(importlib.import_module("messages"), envelope.c_type)
    return payload_cls.from_json(envelope.c_data)
@dataclass_json
@dataclass
class Ping(RpcObject):
    """RPC payload with a single ``ping`` string, defaulting to ``"ping"``."""

    ping: str = "ping"
@dataclass_json
@dataclass
class Pong(RpcObject):
    """RPC payload with a single ``pong`` string, defaulting to ``"pong"``."""

    pong: str = "pong"
@dataclass_json
@dataclass
class RpcError(RpcObject):
    """RPC payload carrying an error description (empty string by default)."""

    error: str = ""
@dataclass_json
@dataclass
class DemoObj(RpcObject):
    """Demo RPC payload with a text message and an optional timestamp."""

    message: str = ""
    # Declared as plain ``datetime`` on purpose: convert_to_utc() selects
    # fields by testing ``field.type is datetime``.
    date: datetime = None
@dataclass_json
@dataclass
class ListBehav(RpcObject):
    """RPC payload holding a list of strings in ``behavs``.

    NOTE(review): presumably the entries are behaviour names - confirm with
    the code that handles this message.
    """

    # a fresh list per instance, never a shared mutable default
    behavs: List[str] = field(default_factory=list)
@dataclass_json
@dataclass
class ManageBehav(RpcObject):
    """RPC payload with a ``behav`` name, a ``command`` and a ``result`` text.

    NOTE(review): which commands are valid is not visible in this module.
    """

    behav: str = None
    command: str = None
    result: str = ""
@dataclass_json
@dataclass
class ListTraceStore(RpcObject):
    """RPC payload with optional ``limit``/``app_id``/``category`` and a list
    of ``traces``.

    NOTE(review): presumably the optional fields act as query filters and
    ``traces`` carries the result - confirm with the handling code.
    """

    limit: Optional[int] = None
    app_id: Optional[str] = None
    category: Optional[str] = None
    # a fresh list per instance, never a shared mutable default
    traces: List[str] = field(default_factory=list)
@dataclass_json
@dataclass
class Shutdown(RpcObject):
    """RPC payload with a single ``result`` text (empty string by default)."""

    result: str = ""
# @api.schema("ExampleMethodParameter")
# class ExampleMethodParameterSchema(Schema):
#     x = fields.Float()
#     y = fields.Float()
@dataclass_json
@dataclass()
class TraceStoreMessage:
    """Flat, JSON-serializable copy of the attributes of an incoming message.

    Instances are normally created with :meth:`from_msg`. The ``target``
    field is not filled in by :meth:`from_msg` and keeps its default.
    """

    body: str = ""
    body_size: int = 0
    headers: dict = None  # rmq: headers_raw
    content_type: str = "application/json"
    content_encoding: str = ""
    delivery_mode: int = 0
    priority: int = 0
    correlation_id: str = ""
    reply_to: str = ""
    expiration: datetime = None
    message_id: str = ""
    # Seconds since the epoch, or None when the message has no timestamp.
    # (Previously annotated with the ``time`` module, which is not a type.)
    timestamp: Optional[float] = None
    type: str = ""
    user_id: str = ""
    app_id: str = ""
    target: str = ""
    cluster_id: str = ""
    consumer_tag: str = ""
    delivery_tag: int = 0
    exchange: str = ""
    redelivered: bool = False
    routing_key: str = ""

    @staticmethod
    def _epoch_seconds(value) -> Optional[float]:
        """Convert a message timestamp into seconds since the epoch.

        :param value: ``None``, a ``datetime`` or a ``time.struct_time``
        :return: epoch seconds as float, or ``None`` if ``value`` is ``None``
        """
        if value is None:
            # messages without a timestamp used to crash time.mktime()
            return None
        if isinstance(value, datetime):
            # NOTE(review): some aio_pika versions appear to expose the
            # timestamp as datetime rather than struct_time - confirm for
            # the version in use.
            return value.timestamp()
        return time.mktime(value)

    @staticmethod
    def from_msg(msg: IncomingMessage):
        """Build a ``TraceStoreMessage`` from an incoming message.

        The body is decoded to text with the default codec; all other
        attributes are copied as they are, except ``timestamp`` which is
        converted to epoch seconds (``None`` if the message has none).

        :param msg: the received message
        :return: new ``TraceStoreMessage`` instance
        """
        return TraceStoreMessage(
            body=msg.body.decode(),
            body_size=msg.body_size,
            headers=msg.headers_raw,
            content_type=msg.content_type,
            content_encoding=msg.content_encoding,
            delivery_mode=msg.delivery_mode,
            priority=msg.priority,
            correlation_id=msg.correlation_id,
            reply_to=msg.reply_to,
            expiration=msg.expiration,
            message_id=msg.message_id,
            timestamp=TraceStoreMessage._epoch_seconds(msg.timestamp),
            type=msg.type,
            user_id=msg.user_id,
            app_id=msg.app_id,
            cluster_id=msg.cluster_id,
            consumer_tag=msg.consumer_tag,
            delivery_tag=msg.delivery_tag,
            exchange=msg.exchange,
            redelivered=msg.redelivered,
            routing_key=msg.routing_key,
        )
class WrongMessageFormatException(Exception):
    """Error type for messages that do not have the expected format."""
@dataclass_json
@dataclass()
class SerializedObject:
    """Envelope of a serialized object: payload class name plus payload JSON."""

    c_type: str  # class name of the payload
    c_data: str  # JSON text of the payload
@dataclass_json
@dataclass()
class SerializableObject:
    """Base class for dataclasses that serialize into a ``SerializedObject``.

    The envelope stores the class name next to the payload JSON, so the
    receiver can pick the right class for deserialization.
    """

    def serialize(self, to_dict=False) -> str:
        """Serialize this dataclass, including its type, into envelope JSON.

        :param to_dict: accepted for interface compatibility; not used
        :return: JSON text of a ``SerializedObject``
        """
        envelope = SerializedObject(
            c_type=self.__class__.__name__,
            c_data=self.to_json(),  # method from dataclass_json
        )
        return envelope.to_json()

    @staticmethod
    def deserialize(
        msg: str, msg_type: Type["SerializableObject"] = None
    ) -> SerializableDataclass:
        """Deserialize envelope JSON into an object of the correct type.

        :param msg: JSON text of a ``SerializedObject``
        :param msg_type: class to deserialize into; when omitted, the class is
            looked up by its ``c_type`` name in the ``messages`` module
        :return: the deserialized object with its datetime fields converted to
            UTC, or ``None`` if the envelope is malformed or the type is
            unknown (both cases are logged, not raised)
        """
        serialized_obj = SerializableObject.extract_serialized_obj(msg)
        if serialized_obj is None:
            return None

        if not msg_type:
            # load msg_type from module messages.py for deserialization
            module = importlib.import_module("messages")
            try:
                msg_type = getattr(module, serialized_obj.c_type)
            except AttributeError:
                _log.error(
                    f"Object type unknown: {serialized_obj.c_type}.",
                    exc_info=sys.exc_info(),
                )
                return None

        obj = msg_type.from_json(serialized_obj.c_data)
        return convert_to_utc(obj)

    @staticmethod
    def extract_serialized_obj(msg):
        """Parse ``msg`` into a ``SerializedObject`` envelope.

        :param msg: JSON text expected to look like
            ``{c_type: str, c_data: str}``
        :return: the parsed envelope, or ``None`` if ``msg`` does not have the
            expected format (the problem is logged)
        """
        try:
            return SerializedObject.from_json(msg)
        except (KeyError, TypeError, AttributeError, JSONDecodeError):
            # TypeError covers a msg that is not text at all (e.g. None);
            # AttributeError covers valid JSON that is not an object. Both
            # are format errors like the two cases handled originally, and
            # extract_type() relies on getting None back for them.
            _log.error(
                f"Wrong message format: {msg}. Expected {{c_type: str, c_data: str}}.",
                exc_info=sys.exc_info(),
            )
            return None

    @classmethod
    def extract_type(cls, msg: str) -> str:
        """Return the payload class name stored in the envelope ``msg``.

        :param msg: JSON text of a ``SerializedObject``
        :return: the ``c_type`` of the envelope, or ``"NoneType"`` if the
            envelope could not be parsed
        """
        serialized_obj = cls.extract_serialized_obj(msg)
        return "NoneType" if serialized_obj is None else serialized_obj.c_type
@dataclass_json
@dataclass()
class DemoData(SerializableObject):
    """Demo object with a required text message and an optional timestamp."""

    message: str
    # Declared as plain ``datetime`` on purpose: convert_to_utc() selects
    # fields by testing ``field.type is datetime``.
    date: datetime = None
@dataclass_json
@dataclass()
class ControlMessage(SerializableObject):
    """Serializable message made of a command name and its arguments."""

    command: str
    args: List[str]  # positional arguments of the command
    kwargs: Dict  # keyword arguments of the command
@dataclass_json
@dataclass()
class PingControl(SerializableObject):
    """Serializable message without any fields of its own."""
@dataclass_json
@dataclass()
class ServiceStatus:
    """Name and state of a single service, both given as strings."""

    name: str
    state: str
@dataclass_json
@dataclass()
class CoreStatus:
    """Name and state of a core plus the status of each of its behaviours."""

    name: str
    state: str
    behaviours: List[ServiceStatus]  # one ServiceStatus entry per behaviour
@dataclass_json
@dataclass()
class PongControl(SerializableObject):
    """Serializable message that carries a ``CoreStatus`` in ``status``."""

    status: CoreStatus