munggoggo.agent
Async I/O agent; stop it with CTRL-C. Inherits from Core.
You can attach configuration parameters to an agent by passing a dict as the config parameter; they are then available on self.config.
class agent.Agent(*, identity=None, config=None, clock=None, channel_number: Optional[int] = None, beacon: Optional[mode.utils.types.trees.NodeT] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None)
Demo Agent
Use this agent as an example with simple behaviour. The agent identity must be unique within the agent network. Run it with the Worker class:

    # Agent is the class documented above and must already be imported.
    from mode import Worker

    worker = Worker(
        Agent(identity='agent'),
        loglevel="info",
        logfile=None,
        daemon=True,
    )
    worker.execute_from_commandline()

Stop the agent with CTRL-C.
munggoggo.core
Core functionality of an agent:
- Provides a configurable FIFO cache of all received and sent messages (TraceStore: munggoggo.trace).
- Holds a cache of all peers and their last update timestamp (keepalive message).
class core.Core(*, identity=None, config=None, clock=None, channel_number: Optional[int] = None, beacon: Optional[mode.utils.types.trees.NodeT] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None)
Docstring for Core.
Every queue is automatically bound to the default exchange with a routing key equal to the queue name. Async tasks must only be started in the on_start method, because the event loop is only configured at that point.
async direct_send(msg: str, msg_type: RmqMessageTypes.name, target: str = None, correlation_id: str = None, headers: dict = None) → None
Sends a message to the default exchange.
async fanout_send(msg: str, msg_type: RmqMessageTypes.name, correlation_id: str = None, headers: dict = None) → None
Sends a message to the fanout exchange.
async list_peers() → trace.TraceStore
Lists all peers that have responded to the latest PING.
async on_message(message: aio_pika.message.IncomingMessage)
Handles incoming messages.
Messages of a well-defined type (RmqMessageTypes) are passed to system handlers; all others are enqueued in the behaviour mailbox for user handling.
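The dispatch rule described for on_message can be sketched without a broker. This is a minimal illustration and not munggoggo's implementation: SystemType is a hypothetical stand-in for RmqMessageTypes (its real members may differ), and DispatchSketch, handle_system and system_log are names invented for the example.

```python
import asyncio
from enum import Enum


class SystemType(Enum):
    """Hypothetical stand-in for RmqMessageTypes; the real members may differ."""
    PING = "PING"
    PONG = "PONG"


class DispatchSketch:
    """Illustrative only: routes messages by type, as on_message is described to do."""

    def __init__(self):
        self.mailbox = asyncio.Queue()  # behaviour mailbox for user handling
        self.system_log = []            # records what the system handler received

    async def handle_system(self, msg_type, body):
        self.system_log.append((msg_type, body))

    async def on_message(self, msg_type, body):
        # Known system types go to the system handler; everything else is queued.
        if msg_type in SystemType.__members__:
            await self.handle_system(msg_type, body)
        else:
            await self.mailbox.put((msg_type, body))


async def demo():
    agent = DispatchSketch()
    await agent.on_message("PING", "are you there?")
    await agent.on_message("order", "buy 3 apples")
    return agent.system_log, agent.mailbox.qsize()


system_log, queued = asyncio.run(demo())
```

In this sketch the PING message ends up in system_log, while the "order" message waits in the mailbox until a behaviour reads it.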
async periodic_update_peers(interval)
Sends a periodic keepalive message to all peers (if UPDATE_PEER_INTERVAL is set) and publishes the latest peer responses as a peer list to the websocket.
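The bookkeeping behind a keepalive-based peer list can be sketched as follows. This is an assumption-laden illustration, not the library's code: PeerCache, max_age, update and alive are hypothetical names, and the age-based staleness rule is a simplification of "responded to the latest PING".

```python
import time


class PeerCache:
    """Hypothetical sketch: remembers when each peer last answered a keepalive."""

    def __init__(self, max_age):
        self.max_age = max_age  # seconds after which a peer counts as stale
        self.last_seen = {}     # peer identity -> timestamp of its last reply

    def update(self, identity, now=None):
        # Record the time of the most recent keepalive reply from this peer.
        self.last_seen[identity] = time.time() if now is None else now

    def alive(self, now=None):
        # Return the identities whose last reply is recent enough, sorted by name.
        now = time.time() if now is None else now
        return sorted(
            peer for peer, ts in self.last_seen.items()
            if now - ts <= self.max_age
        )


cache = PeerCache(max_age=10)
cache.update("agent1", now=100.0)
cache.update("agent2", now=108.0)
alive_peers = cache.alive(now=115.0)
```

Passing explicit timestamps keeps the example deterministic; a periodic task would call update() for each reply and publish alive() after each interval.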
munggoggo.trace
Configurable fixed-size queue for storing RabbitMQ messages (TraceStore). Very handy for debugging.
class trace.TraceStore(size)
Stores events and allows queries about them.
all(limit=None)
Returns all the events, up to a limit if one is defined.
Args:
    limit (int, optional): the maximum number of events to return (Default value = None)
Returns:
    list: a list of events
append(event, category=None)
Adds a new event to the trace store. The event may have a category.
Args:
    event (spade.message.Message): the event to be stored
    category (str, optional): a category to classify the event (Default value = None)
filter(limit=None, app_id=None, category=None)
Returns the events that match the filters.
Args:
    limit (int, optional): the maximum number of events to return (Default value = None)
    app_id (str, optional): only events that have been sent to or received from 'app_id' (Default value = None)
    category (str, optional): only events belonging to the category (Default value = None)
Returns:
    list: a list of filtered events
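The behaviour of a fixed-size store with all, append and filter can be approximated with collections.deque. The sketch below is not the TraceStore source: TraceSketch is a hypothetical class, events are plain dicts with an "app_id" key instead of message objects, and keeping the most recent events when a limit is given is an assumption.

```python
from collections import deque


def _tail(events, limit):
    # Keep at most `limit` of the newest events (assumption: newest win).
    if limit is None:
        return events
    return events[max(len(events) - limit, 0):]


class TraceSketch:
    """Hypothetical fixed-size FIFO store; the oldest entries are dropped first."""

    def __init__(self, size):
        self.store = deque(maxlen=size)

    def append(self, event, category=None):
        self.store.append((event, category))

    def all(self, limit=None):
        return _tail(list(self.store), limit)

    def filter(self, limit=None, app_id=None, category=None):
        events = [
            (event, cat) for event, cat in self.store
            if (app_id is None or event.get("app_id") == app_id)
            and (category is None or cat == category)
        ]
        return _tail(events, limit)


trace = TraceSketch(size=3)
for i in range(5):
    trace.append(
        {"app_id": "a" if i % 2 == 0 else "b", "n": i},
        category="even" if i % 2 == 0 else "odd",
    )

stored = [event["n"] for event, _ in trace.all()]
from_a = [event["n"] for event, _ in trace.filter(app_id="a")]
odd = [event["n"] for event, _ in trace.filter(category="odd")]
```

With size=3, only the last three of the five appended events survive, which is the property that keeps memory use bounded while debugging.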
-