"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

import asyncio
import logging
from abc import ABC, abstractmethod
from asyncio import Task
from collections.abc import Coroutine

import zmq
import zmq.asyncio as azmq

from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage

# Central directory to resolve agent names to instances
_agent_directory: dict[str, "BaseAgent"] = {}


class AgentDirectory:
    """
    Helper class to keep track of which agents are registered.
    Used for handling message routing.
    """

    @staticmethod
    def register(name: str, agent: "BaseAgent") -> None:
        """
        Registers an agent instance with a unique name.

        Registering a second agent under an existing name replaces the earlier entry.

        :param name: The name of the agent.
        :param agent: The :class:`BaseAgent` instance.
        """
        _agent_directory[name] = agent

    @staticmethod
    def get(name: str) -> "BaseAgent | None":
        """
        Retrieves a registered agent instance by name.

        :param name: The name of the agent to retrieve.
        :return: The :class:`BaseAgent` instance, or None if not found.
        """
        return _agent_directory.get(name)


class BaseAgent(ABC):
    """
    Abstract base class for all agents in the system.

    This class provides the foundational infrastructure for agent lifecycle management, messaging
    (both intra-process and inter-process via ZMQ), and asynchronous behavior execution.

    .. warning::
        Do not inherit from this class directly for creating new agents. Instead, inherit from
        :class:`control_backend.agents.base.BaseAgent`, which ensures proper logger configuration.

    :ivar name: The unique name of the agent.
    :ivar inbox: The queue for receiving internal messages.
    :ivar _tasks: A set of currently running asynchronous tasks/behaviors.
    :ivar _running: A boolean flag indicating if the agent is currently running.
    :ivar logger: The logger instance for the agent.
    """

    # Only declared here; this class never assigns it, so it must be provided elsewhere
    # before any method that logs is called.
    logger: logging.Logger

    def __init__(self, name: str):
        """
        Initialize the BaseAgent.

        :param name: The unique identifier for this agent.
        """
        self.name = name
        self.inbox: asyncio.Queue[InternalMessage] = asyncio.Queue()
        self._tasks: set[asyncio.Task] = set()
        self._running = False

        # Created in :meth:`start` and released again in :meth:`stop`.
        self._internal_pub_socket: None | azmq.Socket = None
        self._internal_sub_socket: None | azmq.Socket = None

        # Register immediately
        AgentDirectory.register(name, self)

    @abstractmethod
    async def setup(self):
        """
        Initialize agent-specific resources.

        This method must be overridden by subclasses. It is called after the agent has started
        and the ZMQ sockets have been initialized. Use this method to:

        * Initialize connections (databases, APIs, etc.)
        * Add initial behaviors using :meth:`add_behavior`
        """
        pass

    async def start(self):
        """
        Start the agent and its internal loops.

        This method:

        1. Sets the running state to True.
        2. Initializes ZeroMQ PUB/SUB sockets for inter-process communication.
        3. Calls the user-defined :meth:`setup` method.
        4. Starts the inbox processing loop and the ZMQ receiver loop.
        """
        self.logger.info(f"Starting agent {self.name}")
        self._running = True

        context = azmq.Context.instance()

        # Setup the internal publishing socket
        self._internal_pub_socket = context.socket(zmq.PUB)
        self._internal_pub_socket.connect(settings.zmq_settings.internal_pub_address)

        # Setup the internal receiving socket
        self._internal_sub_socket = context.socket(zmq.SUB)
        self._internal_sub_socket.connect(settings.zmq_settings.internal_sub_address)
        self._internal_sub_socket.subscribe(f"internal/{self.name}")

        await self.setup()

        # Start processing inbox and ZMQ messages
        self.add_behavior(self._process_inbox())
        self.add_behavior(self._receive_internal_zmq_loop())

    async def stop(self):
        """
        Stop the agent.

        Sets the running state to False, cancels all running background tasks and closes the
        internal ZMQ sockets that were opened by :meth:`start`.
        """
        self._running = False

        # Iterate over a snapshot: finished tasks remove themselves from ``self._tasks``
        # through their done-callback.
        for task in tuple(self._tasks):
            task.cancel()

        self._close_internal_sockets()
        self.logger.info(f"Agent {self.name} stopped")

    def _close_internal_sockets(self) -> None:
        """
        Close the internal PUB/SUB sockets (if any) and reset the attributes to None.

        Safe to call when the agent was never started or was already stopped.
        """
        open_sockets = (self._internal_pub_socket, self._internal_sub_socket)
        self._internal_pub_socket = None
        self._internal_sub_socket = None
        for sock in open_sockets:
            if sock is not None:
                sock.close()

    async def send(self, message: InternalMessage, should_log: bool = True):
        """
        Send a message to another agent.

        This method intelligently routes the message:

        * If the target agent is in the same process (found in :class:`AgentDirectory`),
          the message is put directly into its inbox.
        * If the target agent is not found locally, the message is serialized and sent via ZeroMQ
          to the internal publication address.

        The ``sender`` field of the message is overwritten with this agent's name. If a receiver
        is not registered locally and this agent has no open publishing socket (not started, or
        already stopped), a warning is logged and that receiver is skipped.

        :param message: The message to send.
        :param should_log: Whether to write a debug log entry for every delivered message.
        """
        message.sender = self.name
        to = message.to
        receivers = [to] if isinstance(to, str) else to

        for receiver in receivers:
            target = AgentDirectory.get(receiver)

            if target:
                await target.inbox.put(message)
                if should_log:
                    self.logger.debug(
                        f"Sent message {message.body} to {message.to} via regular inbox."
                    )
                continue

            # Apparently target agent is on a different process, send via ZMQ
            pub_socket = self._internal_pub_socket
            if pub_socket is None:
                self.logger.warning(
                    "Cannot send message to %s via ZMQ: agent %s has no open publishing socket.",
                    receiver,
                    self.name,
                )
                continue

            topic = f"internal/{receiver}".encode()
            body = message.model_dump_json().encode()
            await pub_socket.send_multipart([topic, body])
            if should_log:
                self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.")

    async def _process_inbox(self):
        """
        Internal loop that processes messages from the inbox.

        Reads messages from ``self.inbox`` and passes them to :meth:`handle_message`. An exception
        raised by the handler is logged and the loop continues with the next message, so a single
        faulty message cannot stop message processing.
        """
        while self._running:
            msg = await self.inbox.get()
            try:
                await self.handle_message(msg)
            except Exception:
                # ``asyncio.CancelledError`` derives from ``BaseException`` and therefore still
                # propagates, which keeps :meth:`stop` working.
                self.logger.exception("Could not process message.")

    async def _receive_internal_zmq_loop(self):
        """
        Internal loop that listens for ZMQ messages.

        Receives multipart messages from the internal SUB socket, which :meth:`start` subscribes
        to the ``internal/<name>`` topic. When a message is received, it is deserialized into an
        :class:`InternalMessage` and put into the local inbox. This bridges the gap between
        inter-process ZMQ communication and the intra-process inbox.
        """
        while self._running:
            try:
                # The first frame is the topic; only the body is needed here.
                _, body = await self._internal_sub_socket.recv_multipart()

                msg = InternalMessage.model_validate_json(body)

                await self.inbox.put(msg)
            except asyncio.CancelledError:
                break
            except Exception:
                self.logger.exception("Could not process ZMQ message.")

    async def handle_message(self, msg: InternalMessage):
        """
        Handle an incoming message.

        This method must be overridden by subclasses to define how the agent reacts to messages.

        :param msg: The received message.
        :raises NotImplementedError: If not overridden by the subclass.
        """
        raise NotImplementedError

    def add_behavior(self, coro: Coroutine) -> Task:
        """
        Add a background behavior (task) to the agent.

        This is the preferred way to run continuous loops or long-running tasks within an agent.
        The task is tracked and will be automatically cancelled when :meth:`stop` is called.

        :param coro: The coroutine to execute as a task.
        :return: The created task wrapping the coroutine.
        """

        async def try_coro(coro_: Coroutine):
            # Wrap the behavior so that cancellation and failures are logged instead of
            # surfacing as unretrieved task exceptions.
            try:
                await coro_
            except asyncio.CancelledError:
                self.logger.debug("A behavior was canceled successfully: %s", coro_)
            except Exception:
                self.logger.warning("An exception occurred in a behavior.", exc_info=True)

        task = asyncio.create_task(try_coro(coro))
        self._tasks.add(task)
        # Drop the reference once the task finishes so the set only holds live tasks.
        task.add_done_callback(self._tasks.discard)
        return task