munggoggo.behaviour

Async I/O behaviour that can be started/stopped/shutdown. Think of it as service.

The following behaviours are provided as implementation examples: EmptyBehav, SqlBehav

class behaviour.Behaviour(core, *, beacon: Optional[mode.utils.types.trees.NodeT] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None, binding_keys: Optional[list] = None, configure_rpc: bool = False)[source]

Behaviour is a Service which defines set of actions

All incoming messages are queued into input mailbox.

Outgoing messages can either be broadcast to all listening agents or point-to-point sent to one agent or sent to a message topic (PubSub pattern)

async direct_send(msg: str, msg_type: str, target: Optional[str] = None, correlation_id: Optional[str] = None)[source]

Sends message to default exchange, 1:1 communication

async dispatch(msg: aio_pika.message.IncomingMessage, handlers: Optional[handler.Registry] = None) → None[source]

Dispatch message to handler.

async enqueue(message: aio_pika.message.Message)[source]

Enqueues a message in the behaviour’s incoming mailbox

async example_rpc_method(x, y, flag=None, **kwargs)[source]

Sample RPC method: multiplies to numbers

property exit_code

Returns the exit_code of the behaviour. It only works when the behaviour is done or killed, otherwise it raises an exception.

Returns:

object: the exit code of the behaviour

async fanout_send(msg: str, msg_type: str)[source]

Sends message to fanout exchange, 1:n communication

async get_and_dispatch(timeout: Optional[float] = None)[source]

Reads a message from inbox and dispatches it to known handler.

mailbox_size() → int[source]

returns mailbox size

async on_end()[source]

Coroutine called after the behaviour is done or killed. To be overwritten by user.

async on_shutdown()[source]

Service is being stopped/restarted.

async on_start()[source]

Coroutine called before the behaviour is started.

async on_stop() → None[source]

Service is being stopped/restarted.

async publish(msg: str, routing_key: str)[source]

Publishes message to topic, 1:n communication

async receive(timeout: Optional[float] = None) → aio_pika.message.IncomingMessage[source]

Receives a message from inbox mailbox.

If timeout is not None it returns the message or “None” after timeout is done.

receive_all() → AsyncIterable[aio_pika.message.IncomingMessage][source]

Receives all messages from inbox mailbox.

async run()[source]

Body of the behaviour, runs every step of runloop To be overwritten by user.

async setup()[source]

to be overwritten by user

async teardown()[source]

to be overwritten by user

exception behaviour.BehaviourNotFinishedException[source]
class behaviour.EmptyBehav(core, *, beacon: Optional[mode.utils.types.trees.NodeT] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None, binding_keys: Optional[list] = None, configure_rpc: bool = False)[source]

Do nothing, just overwrite methods.

async on_end()[source]

Coroutine called after the behaviour is done or killed. To be overwritten by user.

async on_start()[source]

Coroutine called before the behaviour is started.

async run()[source]

Body of the behaviour, runs every step of runloop To be overwritten by user.

class behaviour.SqlBehav(core, *, beacon: Optional[mode.utils.types.trees.NodeT] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None, binding_keys: Optional[list] = None, configure_rpc: bool = False)[source]

Stores all messages arriving in mailbox to SQL-DB (sqlite) as json blob

async on_end()[source]

Coroutine called after the behaviour is done or killed. To be overwritten by user.

async run()[source]

Body of the behaviour, runs every step of runloop To be overwritten by user.

async setup()[source]

to be overwritten by user

class behaviour.SystemBehaviour[source]

An enumeration.