Source code for agent

from __future__ import (
    annotations,
)  # make all type hints be strings and skip evaluating them

from behaviour import Behaviour
from core import Core


class Agent(Core):
    """Demo Agent

    Example agent that wires up a single, simple behaviour.
    The agent identity must be unique within the agent network.

    Run it with the Worker class::

        from mode import Worker

        worker = Worker(
            Agent(identity='agent'),
            loglevel="info",
            logfile=None,
            daemon=True,
        )
        worker.execute_from_commandline()

    Stop the agent with CTRL-C.
    """

    async def setup(self) -> None:
        """Register the behaviour this agent depends on.

        Run during agent initialization; put all setup and the behaviours
        the agent needs here.
        """
        demo_behaviour = self.behaviour
        await self.add_runtime_dependency(demo_behaviour)

    @property
    def behaviour(self) -> Behaviour:
        """Build a ``Behaviour`` bound to this agent.

        ``binding_keys`` and ``configure_rpc`` are read from ``self.config``
        with ``.get``, so a missing key is passed through as ``None``.
        A new ``Behaviour`` instance is created on every access.
        """
        binding_keys = self.config.get("binding_keys")
        configure_rpc = self.config.get("configure_rpc")
        return Behaviour(
            self,
            binding_keys=binding_keys,
            configure_rpc=configure_rpc,
        )

    def create_graph(self) -> None:
        """Render the beacon graph to ``image.png`` in the working directory.

        The graph is taken from the root beacon when there is one, otherwise
        from this agent's own beacon. It is dumped as DOT text into an
        in-memory buffer, parsed with ``pydot`` and written out as PNG bytes.
        """
        print('APP STARTING')

        # Imported lazily so pydot is only required when a graph is requested.
        import pydot
        import io

        dot_buffer = io.StringIO()
        start_node = self.beacon.root or self.beacon
        start_node.as_graph().to_dot(dot_buffer)

        # Exactly one graph is expected; unpacking fails loudly otherwise.
        (parsed_graph,) = pydot.graph_from_dot_data(dot_buffer.getvalue())

        print('WRITING GRAPH TO image.png')
        with open('image.png', 'wb') as png_file:
            png_file.write(parsed_graph.create_png())
if __name__ == '__main__':
    # Script entry point: start a single demo agent under a mode Worker.
    from mode import Worker

    # Settings read by Agent.behaviour via self.config.get(...).
    agent_config = {
        "binding_keys": ['x.y'],
        "configure_rpc": True,
    }
    demo_agent = Agent(identity='agent', config=agent_config)

    Worker(
        demo_agent,
        loglevel="info",
        logfile=None,
        daemon=True,
        redirect_stdouts=False,
    ).execute_from_commandline()

    # Minimal alternative without extra configuration:
    # Worker(Agent(identity='agent'), loglevel="info").execute_from_commandline()