munggoggo.agent

Async I/O agent, stop with CTRL-C. Inherits from Core.

You can attach configuration parameters to an agent on self.config via config dict parameter.

class agent.Agent(*, identity=None, config=None, clock=None, channel_number: Optional[int] = None, beacon: Optional[mode.utils.types.trees.NodeT] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None)[source]

Demo Agent

Use this agent as an example with simple behaviour. Agent identity must be unique within agent network. Run it with the Worker class:

from mode import Worker

worker = Worker(
    Agent(identity='agent'),
    loglevel="info",
    logfile=None,
    daemon=True,
)

worker.execute_from_commandline()

Stop the agent wih CTRL-C

async setup() → None[source]

setup is run during agent initialization Put in your all setup and behaviours you need.

munggoggo.core

Core functionality of an agent.

  • provides a configurable FIFO cache of all received and sent messages (TraceStore: munggoggo.trace)

  • holds a cache of all peers and their last update timestamp (keepalive message)

class core.Core(*, identity=None, config=None, clock=None, channel_number: Optional[int] = None, beacon: Optional[mode.utils.types.trees.NodeT] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None)[source]

Docstring for Core.

Every queue is automatically bound to the default exchange with a routing key which is the same as the queue name. All async tasks must only be started in on_start method because only there the eventloop is configured.

async call(msg: str, target: Optional[str] = None) → str[source]

Sends PRC call

async configure_exchanges()[source]

Configures the exchanges: TOPIC, FANOUT

async direct_send(msg: str, msg_type: RmqMessageTypes.name, target: str = None, correlation_id: str = None, headers: dict = None) → None[source]

Sends message to default exchange

async fanout_send(msg: str, msg_type: RmqMessageTypes.name, correlation_id: str = None, headers: dict = None) → None[source]

Sends message to fanout exchange

get_behaviour(name: str) → Optional[mode.types.services.ServiceT][source]

Returns the behaviour

has_behaviour(behaviour)[source]

Tests for behaviour

list_behaviour()[source]

Lists all behaviours

async list_peers() → trace.TraceStore[source]

list all peers which have responded to the latest PING

async on_first_start()[source]

Service started for the first time in this process.

async on_message(message: aio_pika.message.IncomingMessage)[source]

Handle incoming messages

Well defined types (RmqMessageTypes) are sent to system handlers, all others are enqueued to behaviour mailbox for user handling.

async on_shutdown()[source]

Service is being stopped/restarted.

async on_start()[source]

Service is starting.

async on_started()[source]

Service has started.

async on_stop()[source]

Stops an agent and kills all its behaviours.

async periodic_update_peers(interval)[source]

Sends periodic keepalive message to all peers (if UPDATE_PEER_INTERVAL is set) and publishes the latest peer responses as peer list to websocket.

async publish(msg: str, routing_key: str, headers: Optional[dict] = None) → None[source]

Publishes message to topic

async setup()[source]

to be overwritten by user

async stop() → None[source]

Stop the service.

async teardown()[source]

” To be overwritten by user

class core.MyService(identity, *, beacon: Optional[mode.utils.types.trees.NodeT] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None)[source]

Base class for agent and behaviours Defines async service framework.

munggoggo.trace

Configurable fixed-sized queue to store RabbitMQ messages (TraceStore). Very handy for debugging.

class trace.TraceStore(size)[source]

Stores and allows queries about events.

all(limit=None)[source]

Returns all the events, until a limit if defined

Args:

limit (int, optional): the max length of the events to return (Default value = None)

Returns:

list: a list of events

append(event, category=None)[source]

Adds a new event to the trace store. The event may hava a category

Args:

event (spade.message.Message): the event to be stored category (str, optional): a category to classify the event (Default value = None)

filter(limit=None, app_id=None, category=None)[source]

Returns the events that match the filters

Args:

limit (int, optional): the max length of the events to return (Default value = None) app_id (str, optional): only events that have been sent or received from ‘app_id’ (Default value = None) category (str, optional): only events belonging to the category (Default value = None)

Returns:

list: a list of filtered events

len()[source]

Length of the store

Returns:

int: the size of the trace store

received(limit=None)[source]

Returns all the events that have been received (excluding sent events), until a limit if defined

Args:

limit (int, optional): the max length of the events to return (Default value = None)

Returns:

list: a list of received events

reset()[source]

Resets the trace store