Source code for behaviour

from __future__ import (
    annotations,
)  # make all type hints be strings and skip evaluating them

import asyncio
import inspect
import sys
import traceback
from asyncio import CancelledError
from enum import Enum
from typing import Any, Optional, List, Type, Dict, Tuple, AsyncIterable
from typing import TYPE_CHECKING

import sqlalchemy
from asgiref.sync import sync_to_async
from databases import Database
from sqlalchemy import MetaData, Table, Column, Integer, TIMESTAMP, String, Text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError
from twpy import utcnow

from core import MyService
from messages import SerializableObject, DemoData, WrongMessageFormatException
from mode import Service
from mode.utils.locks import Event
from mode.utils.types.trees import NodeT
from model import TsDb, metadata
from settings import DB_URL

if TYPE_CHECKING:
    pass

from aio_pika import Message, IncomingMessage

import subsystem
from handler import Registry, Handler
from subsystem import PubSub, RPC_SubSystem


class BehaviourNotFinishedException(Exception):
    """Signals that a behaviour's exit code was requested too early.

    Raised by the ``exit_code`` property of a behaviour when the behaviour
    has not been killed yet.
    """
class Behaviour(MyService):
    """
    Behaviour is a Service which defines a set of actions.

    All incoming messages are queued into the input mailbox.
    Outgoing messages can either be broadcast to all listening agents,
    sent point-to-point to one agent, or sent to a message topic
    (PubSub pattern).
    """

    def __init__(
        self,
        core,
        *,
        beacon: Optional[NodeT] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        binding_keys: Optional[list] = None,
        configure_rpc: bool = False,
    ) -> None:
        """
        Args:
            core: owning object; supplies ``identity``, ``handlers`` and the
                ``direct_send`` / ``publish`` / ``fanout_send`` coroutines
                this behaviour delegates to.
            beacon: forwarded unchanged to the parent service constructor.
            loop: forwarded unchanged to the parent service constructor.
            binding_keys: when not ``None``, a PubSub subsystem is created
                with these keys in :meth:`on_start`.
            configure_rpc: when true, an RPC subsystem is created in
                :meth:`on_start`.
        """
        super().__init__(identity=core.identity, beacon=beacon, loop=loop)
        self.core = core
        self.name = f"{core.identity}.{self.__class__.__name__}"
        self.handlers = core.handlers
        self._exit_code = 0

        # Optional subsystems; instantiated lazily in on_start().
        self.pubsub: Optional[PubSub] = None
        self.binding_keys = binding_keys
        self.rpc: Optional[RPC_SubSystem] = None
        self.configure_rpc = configure_rpc

        # Coroutine objects gathered in on_start() / on_shutdown().
        self.on_start_coros = list()
        self.on_end_coros = list()

        # The flag only becomes True once the mailbox exists.
        self.is_configured_asyncio = False
        self.queue = asyncio.Queue()  # TODO: unlimited queue size
        self.is_configured_asyncio = True

        self._force_kill = self._new_force_kill_event()

    def _new_force_kill_event(self) -> Event:
        """Create the event that marks this behaviour as killed."""
        return Event(loop=self._loop)

    def _new_shutdown_done_event(self) -> Event:
        """Create a fresh event bound to this service's loop."""
        return Event(loop=self._loop)

    @property
    def exit_code(self) -> Any:
        """
        Returns the exit_code of the behaviour.

        It only works when the behaviour is killed, otherwise it raises
        an exception.

        Returns:
            object: the exit code of the behaviour

        Raises:
            BehaviourNotFinishedException: if the behaviour is not killed.
        """
        if not self.is_killed():
            raise BehaviourNotFinishedException
        return self._exit_code

    @exit_code.setter
    def exit_code(self, value: Any):
        self._exit_code = value

    async def on_start(self):
        """
        Coroutine called before the behaviour is started.

        Builds the optional PubSub / RPC subsystems, awaits their
        ``on_start`` coroutines concurrently and then calls :meth:`setup`.
        """
        self.on_start_coros = list()
        self.on_end_coros = list()

        # NOTE(review): the subsystems' on_end() coroutine objects are created
        # here but only awaited in on_shutdown(); presumably on_shutdown() runs
        # exactly once per start -- confirm against the service lifecycle.
        if self.binding_keys is not None:
            self.pubsub = PubSub(self, binding_keys=self.binding_keys)
            self.on_start_coros.append(self.pubsub.on_start())
            self.on_end_coros.append(self.pubsub.on_end())

        if self.configure_rpc:
            self.rpc = subsystem.RPC_SubSystem(self)
            self.on_start_coros.append(self.rpc.on_start())
            self.on_end_coros.append(self.rpc.on_end())

        await asyncio.gather(*self.on_start_coros)
        await self.setup()
        self.log.debug(f"{self.name} done.")

    async def setup(self):
        """Hook awaited at the end of on_start(); to be overwritten by user."""
        pass

    async def on_stop(self) -> None:
        """Log the stop request and await :meth:`teardown`."""
        self.log.info(f"Stopping {self.name}.")
        await self.teardown()

    async def teardown(self):
        """Hook awaited by on_stop(); to be overwritten by user."""
        pass

    async def on_shutdown(self):
        """Mark the service as shut down and await the subsystems' on_end coroutines."""
        self.set_shutdown()
        await asyncio.gather(*self.on_end_coros)
        self.log.info(f"{self.name} shutdown: {self.state}")

    async def _on_end(self):
        """Internal wrapper: awaits the user-level on_end() and logs completion."""
        await self.on_end()
        self.log.debug(f"{self.name} done.")

    async def on_end(self):
        """
        Coroutine called after the run loop has finished.
        To be overwritten by user.
        """
        self.log.debug(f"{self.name} done.")

    def is_killed(self) -> bool:
        """Return True once the force-kill event has been set."""
        return self._force_kill.is_set()

    async def run(self):
        """
        Body of the behaviour, runs every step of the run loop.
        To be overwritten by user.
        """
        await asyncio.sleep(0.1)

    async def _run(self):
        """
        Function to be overloaded by more complex behaviours.
        Otherwise it just calls the run() coroutine.
        """
        await self.run()

    @Service.task
    async def _step(self):
        """
        Main loop of the behaviour.

        Repeats run() until the service should stop or the behaviour is
        killed, then awaits the on_end hook unless the loop was cancelled.
        """
        cancelled = False
        while not self.should_stop and not self.is_killed():
            try:
                await self._run()
                await asyncio.sleep(0)  # relinquish cpu
            except CancelledError:
                self.log.info(f"Behaviour {self} cancelled")
                cancelled = True
                # Leave the loop: a cancelled task must not keep calling run().
                break
            except Exception as e:
                # The kill call below is disabled, so after logging the loop
                # simply tries run() again on the next iteration.
                self.log.error(f"Exception running behaviour {self}: {e}")
                self.log.error(traceback.format_exc())
                # self.kill(exit_code=e)

        try:
            if not cancelled:
                await self._on_end()
        except Exception as e:
            # f-prefix is required so that {self} and {e} are interpolated.
            self.log.error(f"Exception running on_end in behaviour {self}: {e}")
            self.log.error(traceback.format_exc())
            # self.kill(exit_code=e)
        finally:
            self.log.info(f"---------- loop final ----------")

    async def enqueue(self, message: Message):
        """Enqueues a message in the behaviour's incoming mailbox."""
        self.log.debug(f"message enqueued: {message.body}")
        await self.queue.put(message)

    def mailbox_size(self) -> int:
        """Returns the number of messages currently in the mailbox."""
        return self.queue.qsize()

    async def direct_send(
        self,
        msg: str,
        msg_type: str,
        target: Optional[str] = None,
        correlation_id: Optional[str] = None,
    ):
        """Sends message to default exchange, 1:1 communication (delegates to core)."""
        await self.core.direct_send(msg, msg_type, target, correlation_id)

    async def publish(self, msg: str, routing_key: str):
        """Publishes message to topic, 1:n communication (delegates to core)."""
        await self.core.publish(msg, routing_key)

    async def fanout_send(self, msg: str, msg_type: str):
        """Sends message to fanout exchange, 1:n communication (delegates to core)."""
        await self.core.fanout_send(msg, msg_type)

    async def receive(
        self, timeout: Optional[float] = None
    ) -> Optional[IncomingMessage]:
        """
        Receives a message from the inbox mailbox.

        With a truthy ``timeout`` the call waits up to that many seconds for
        a message. With ``None`` or ``0`` it does not wait at all.

        Returns:
            The next message, or ``None`` if none was available (in time).
        """
        if timeout:
            try:
                msg = await asyncio.wait_for(self.queue.get(), timeout=timeout)
                self.queue.task_done()
            except asyncio.TimeoutError:
                msg = None
        else:
            try:
                msg = self.queue.get_nowait()
                self.queue.task_done()
            except asyncio.QueueEmpty:
                msg = None
        return msg

    async def receive_all(self) -> AsyncIterable[IncomingMessage]:
        """Yields messages from the inbox mailbox until it is empty."""
        while not self.queue.empty():
            yield await self.receive()

    async def get_and_dispatch(self, timeout: Optional[float] = None):
        """
        Reads a message from inbox and dispatches it to a known handler.

        Returns:
            The handler's result, or ``None`` when no message was received.
        """
        msg = await self.receive(timeout=timeout)
        if msg:
            self.log.info(f"{self.name}: Message: {msg.body.decode()}")
            return await self.dispatch(msg)

    async def dispatch(
        self, msg: IncomingMessage, handlers: Optional[Registry] = None
    ) -> None:
        """
        Dispatch message to the handler registered for ``msg.type``.

        Args:
            msg: the message to handle.
            handlers: registry to look the handler up in; defaults to
                ``self.handlers``.
        """
        if handlers is None:
            handlers = self.handlers
        handler = handlers.get(handler=msg.type)
        # Handler instances expose handle(); anything else is awaited as a
        # plain coroutine function.
        if isinstance(handler, Handler):
            return await handler.handle(self, msg)
        return await handler(self, msg)

    @subsystem.expose
    async def example_rpc_method(self, x, y, flag=None, **kwargs):
        """Sample RPC method: multiplies two numbers."""
        self.log.info(
            f"example_method: Signature: {inspect.signature(self.example_rpc_method)}"
        )
        self.log.info(f"exmpale_method called with: {x}, {y}, {flag}, {kwargs}")
        return x * y

    def __str__(self):
        """Return the behaviour name, or "<bases>/<class>" if the name is empty."""
        if self.name:
            return self.name
        bases = "/".join(base.__name__ for base in self.__class__.__bases__)
        return "{}/{}".format(bases, self.__class__.__name__)
class EmptyBehav(Behaviour):
    """Minimal behaviour: overrides the hooks with no-op style implementations."""

    async def on_start(self):
        """Announce the start on stdout."""
        banner = f"Starting {self.name} . . ."
        print(banner)

    async def run(self):
        """Yield control to the event loop once per step without doing any work."""
        # A delay >1 triggers log messages such as:
        #   syncio:poll 999.294 ms took 1000.570 ms: timeout
        await asyncio.sleep(0)

    async def on_end(self):
        """Announce the end, including the exit code, on stdout."""
        # NOTE(review): reading self.exit_code goes through the property
        # inherited from the base class -- verify it is readable at this point.
        farewell = f"Finished {self.name} with exit_code {self.exit_code}. . ."
        print(farewell)
class SqlBehav(Behaviour):
    """
    Stores all messages arriving in mailbox to SQL-DB (sqlite) as json blob.
    """

    def __init__(
        self,
        core,
        *,
        beacon: Optional[NodeT] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        binding_keys: Optional[list] = None,
        configure_rpc: bool = False,
    ) -> None:
        """Forward all arguments to the base behaviour and prepare DB state."""
        super().__init__(
            core,
            beacon=beacon,
            loop=loop,
            binding_keys=binding_keys,
            configure_rpc=configure_rpc,
        )
        # Connection is opened in setup(); stays None until then.
        self.db: Optional[Database] = None
        self.engine: Optional[Engine] = None
        self.metadata: Optional[MetaData] = None

        # TODO: Generalize example
        # Maps a message type name to the class used to deserialize it.
        self.msg_types: Dict[str, Type[SerializableObject]] = {
            DemoData.__name__: DemoData
        }

        # ATTENTION: keep in sync with model.json_data definition !!!
        # allow already serialized json to be inserted in JSON column as text
        # as class variable overwrites tests while loading (startup-time vs. runtime)
        self.json_data = Table(
            "json_data",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("ts", TIMESTAMP(timezone=True)),
            Column("sender", String(length=256)),
            Column("rmq_type", String(length=100)),
            Column("content_type", String(length=100)),
            Column("routing_key", String(length=256)),
            Column("data", Text),
            extend_existing=True,  # allow redefinition to JSON column to send json string
        )

    def add_msg_type(self, msg_type: Type[SerializableObject]):
        """Register an additional message class under its class name."""
        self.msg_types[msg_type.__name__] = msg_type

    async def setup(self):
        """Initialise the database, then open the async connection to DB_URL."""
        await self.init_db()
        self.db = Database(DB_URL)
        await self.db.connect()

    @sync_to_async
    def init_db(self) -> None:
        """Create and initialise the database at DB_URL (wrapped by sync_to_async)."""
        self.log.info(f"Initializating db: {DB_URL}")
        TsDb.create_new_db(DB_URL)
        db = TsDb(url=DB_URL, meta=metadata)
        db.init_db()

    async def run(self):
        """
        Take at most one message from the mailbox and persist it.

        Messages whose type key is not registered in ``msg_types`` are
        logged and skipped. A WrongMessageFormatException is logged and
        swallowed so the run loop continues.
        """
        try:
            msg = await self.receive()
            if not msg:
                return

            # Decode once and reuse the text for logging, type lookup and parsing.
            body = msg.body.decode()
            print(f"{self.name}: Message received: {body}")

            msg_type_key = SerializableObject.extract_type(body)
            msg_type = self.msg_types.get(msg_type_key)
            if not msg_type:
                self.log.error(
                    f"Unknown message type {msg_type_key} read from topics {self.binding_keys}."
                )
                self.log.error(f"Expected types: {list(self.msg_types)}.")
                self.log.error(f"Not saving to DB")
                return

            obj = SerializableObject.deserialize(body, msg_type=msg_type)
            data = {
                "sender": msg.app_id,
                "rmq_type": msg.type,
                "content_type": obj.__class__.__name__,
                "ts": utcnow(),
                "routing_key": msg.routing_key,
                "data": obj.to_json(),
            }
            await self.save_to_db(data)
        except WrongMessageFormatException as e:
            self.log.exception(f"{e}")

    async def save_to_db(self, data: dict) -> None:
        """
        Insert one row into the ``json_data`` table.

        Raises:
            SQLAlchemyError: logged and re-raised if the insert fails.
        """
        query = self.json_data.insert()
        try:
            await self.db.execute(query=query, values=data)
        except SQLAlchemyError as e:
            self.log.exception(e)
            raise

    async def on_end(self):
        """Close the database connection if one was opened in setup()."""
        # self.db is None until setup() has created the Database object.
        if self.db is not None:
            await self.db.disconnect()
class SystemBehaviour(Enum):
    """Enumeration whose member values are behaviour classes."""

    EMPTY = EmptyBehav