munggoggo.messages

I decided against pickle, although its ability to handle arbitrary Python objects would have simplified serialization considerably.

However, to allow communication with the non-Python world, I chose JSON encoding and wrap objects in a common data structure based on Python dataclasses.

Message Definition and Serialization:

A self-describing data serialization format based on JSON and Python dataclasses.

Two-stage serialization:

  1. Serialize the payload using dataclasses and marshmallow (a dataclass with the dataclass_json decorator).

  2. Wrap the result in an RPC dataclass with two fields: {c_type: str, c_data: json_payload}.

Deserialization:

  1. Extract the class type of the payload from the SerializedObject (obj.c_type).

  2. Deserialize the json_payload, using the class type as the schema, into the corresponding Python object.
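The two stages and their inverse can be sketched with the standard library alone. This is a minimal illustration, not the module's actual implementation: it uses `json` and `dataclasses.asdict` in place of marshmallow/dataclass_json, and the `REGISTRY`, `register`, `Envelope`, `serialize` and `deserialize` names are hypothetical stand-ins for however the library resolves `c_type` to a class.

```python
import json
from dataclasses import asdict, dataclass

# Hypothetical registry mapping a type name to its class; the real module
# may look up the class for c_type in a different way.
REGISTRY = {}


def register(cls):
    """Record the class under its name so it can be found during decoding."""
    REGISTRY[cls.__name__] = cls
    return cls


@register
@dataclass
class Ping:
    ping: str = "ping"


@dataclass
class Envelope:
    c_type: str  # name of the payload class
    c_data: str  # JSON-encoded payload


def serialize(obj) -> str:
    # Stage 1: encode the payload dataclass as a JSON string.
    payload = json.dumps(asdict(obj))
    # Stage 2: wrap the payload together with its type name.
    return json.dumps(asdict(Envelope(type(obj).__name__, payload)))


def deserialize(msg: str):
    # Step 1: read the envelope and extract the payload type.
    envelope = Envelope(**json.loads(msg))
    cls = REGISTRY[envelope.c_type]
    # Step 2: decode the payload into an instance of that type.
    return cls(**json.loads(envelope.c_data))


wire = serialize(Ping())
restored = deserialize(wire)
print(wire)
print(restored)
```

Because the envelope carries the type name next to the data, a receiver needs no out-of-band knowledge of which class to instantiate, and a non-Python consumer can still read both fields as plain JSON.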

Caveat when serializing dataclasses:

As specified in the datetime docs, if your datetime object is naive, calling .timestamp() assumes your system local timezone. JSON numbers corresponding to a datetime field in your dataclass are decoded into a datetime-aware object, with tzinfo set to your system local timezone. Thus, if you encode a datetime-naive object, you will decode into a datetime-aware object. This is important because encoding and decoding won’t strictly be inverses.

When using tz-aware objects, everything should be fine.
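The asymmetry can be reproduced with the standard library. The sketch below assumes that datetimes travel as POSIX timestamps and are decoded into the system local timezone, as described above; `encode_dt` and `decode_dt` are hypothetical helpers, not functions of this module.

```python
from datetime import datetime, timezone


def encode_dt(value: datetime) -> float:
    # For a naive value, timestamp() interprets it in the system local timezone.
    return value.timestamp()


def decode_dt(ts: float) -> datetime:
    # Always yields an aware datetime expressed in the system local timezone.
    return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone()


naive = datetime(2020, 1, 1, 12, 0)
aware = datetime(2020, 1, 1, 12, 0, tzinfo=timezone.utc)

naive_back = decode_dt(encode_dt(naive))
aware_back = decode_dt(encode_dt(aware))

print(naive.tzinfo, naive_back.tzinfo)  # None versus a concrete tzinfo
print(naive == naive_back)  # False: a naive value never equals an aware one
print(aware == aware_back)  # True: both denote the same instant
```

The aware value may come back with a different tzinfo than it was sent with, but it still compares equal because both objects refer to the same point in time.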

RabbitMQ message attributes and their semantics here:

body:            payload
headers:         message headers
headers_raw:     message raw headers
content_type:    application/json
content_encoding:
delivery_mode:   delivery mode
priority:        priority (not used)
correlation_id:  correlation id
reply_to:        reply to
routing_key:     routing_key (e.g. topic)
expiration:      expiration in seconds (or datetime or timedelta)
message_id:      message id
timestamp:       timestamp
type:            type  -> (RmqMessageTypes: not rmq relevant, for application to communicate semantics)
user_id:         user id -> (rmq user, e.g. guest)
app_id:          app id -> (sender identity)
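The expiration attribute accepts seconds, a datetime or a timedelta. One way to reduce all three forms to a number of seconds is sketched below. `expiration_seconds` is a hypothetical helper for illustration only; how the underlying client library actually interprets these values is not stated here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

Expiration = Union[int, float, datetime, timedelta, None]


def expiration_seconds(value: Expiration, now: Optional[datetime] = None) -> Optional[float]:
    """Return the time to live in seconds, or None if no expiration is set."""
    if value is None:
        return None
    if isinstance(value, timedelta):
        return value.total_seconds()
    if isinstance(value, datetime):
        # An absolute deadline: measure the distance from "now", using the
        # same timezone awareness as the deadline itself.
        now = now or datetime.now(tz=value.tzinfo)
        return (value - now).total_seconds()
    return float(value)


reference = datetime(2020, 1, 1, 12, 0, tzinfo=timezone.utc)
print(expiration_seconds(30))
print(expiration_seconds(timedelta(minutes=2)))
print(expiration_seconds(reference + timedelta(seconds=60), now=reference))
```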
class messages.ControlMessage(command: str, args: List[str], kwargs: Dict)
class messages.CoreStatus(name: str, state: str, behaviours: List[messages.ServiceStatus])
class messages.DemoData(message: str, date: datetime.datetime = None)
class messages.DemoObj(message: str = '', date: datetime.datetime = None)
class messages.ListBehav(behavs: List[str] = <factory>)
class messages.ListTraceStore(limit: Union[int, NoneType] = None, app_id: Union[str, NoneType] = None, category: Union[str, NoneType] = None, traces: List[str] = <factory>)
class messages.ManageBehav(behav: str = None, command: str = None, result: str = '')
class messages.Ping(ping: str = 'ping')
class messages.PingControl
class messages.Pong(pong: str = 'pong')
class messages.PongControl(status: messages.CoreStatus)
class messages.RmqMessageTypes

Defines the RMQ message types that are handled by the system.

Users can define arbitrary message types by using the msg_type parameter of the fanout_send/direct_send methods.

class messages.RpcError(error: str = '')
class messages.RpcMessage(c_type: str, c_data: str, request_type: str = <RpcMessageTypes.RPC_REQUEST: 1>)
class messages.RpcMessageTypes

An enumeration.

class messages.RpcObject
class messages.SerializableObject
static deserialize(msg: str, msg_type: Type[SerializableObject] = None) → SerializableDataclass

Deserializes a SerializedObject into an instance of the correct type.

serialize(to_dict=False) → str

Serializes the dataclass, including its type, into a SerializedObject.

class messages.SerializedObject(c_type: str, c_data: str)
class messages.ServiceStatus(name: str, state: str)
class messages.Shutdown(result: str = '')
class messages.TraceStoreMessage(body: str = '', body_size: int = 0, headers: dict = None, content_type: str = 'application/json', content_encoding: str = '', delivery_mode: int = 0, priority: int = 0, correlation_id: str = '', reply_to: str = '', expiration: datetime.datetime = None, message_id: str = '', timestamp: <module 'time' (built-in)> = None, type: str = '', user_id: str = '', app_id: str = '', target: str = '', cluster_id: str = '', consumer_tag: str = '', delivery_tag: int = 0, exchange: str = '', redelivered: bool = False, routing_key: str = '')
exception messages.WrongMessageFormatException
messages.to_rpc(obj: SerializableDataclass) → str

Creates a self-describing serialized JSON RPC object.
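A rough idea of what such an RPC object could look like on the wire, based only on the RpcMessage signature listed above (c_type, c_data and a request_type defaulting to RPC_REQUEST = 1). `to_rpc_sketch` is a hypothetical stand-in written with the standard library; the real to_rpc may produce a different layout.

```python
import json
from dataclasses import asdict, dataclass
from enum import IntEnum


class RpcMessageTypes(IntEnum):
    # Only the member visible in the RpcMessage signature is reproduced here.
    RPC_REQUEST = 1


@dataclass
class Ping:
    ping: str = "ping"


def to_rpc_sketch(obj, request_type: RpcMessageTypes = RpcMessageTypes.RPC_REQUEST) -> str:
    """Wrap a dataclass instance into a self-describing JSON RPC envelope."""
    return json.dumps(
        {
            "c_type": type(obj).__name__,        # which class the payload belongs to
            "c_data": json.dumps(asdict(obj)),   # the payload itself, JSON-encoded
            "request_type": int(request_type),   # request marker from the enum
        }
    )


wire = to_rpc_sketch(Ping())
envelope = json.loads(wire)
print(envelope["c_type"], envelope["request_type"])
```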