import asyncio import logging from abc import ABC, abstractmethod from asyncio import Task from collections.abc import Coroutine import zmq import zmq.asyncio as azmq from control_backend.core.config import settings from control_backend.schemas.internal_message import InternalMessage # Central directory to resolve agent names to instances _agent_directory: dict[str, "BaseAgent"] = {} class AgentDirectory: """ Helper class to keep track of which agents are registered. Used for handling message routing. """ @staticmethod def register(name: str, agent: "BaseAgent"): _agent_directory[name] = agent @staticmethod def get(name: str) -> "BaseAgent | None": return _agent_directory.get(name) class BaseAgent(ABC): """ Abstract base class for all agents in the system. This class provides the foundational infrastructure for agent lifecycle management, messaging (both intra-process and inter-process via ZMQ), and asynchronous behavior execution. .. warning:: Do not inherit from this class directly for creating new agents. Instead, inherit from :class:`control_backend.agents.base.BaseAgent`, which ensures proper logger configuration. :ivar name: The unique name of the agent. :ivar inbox: The queue for receiving internal messages. :ivar _tasks: A set of currently running asynchronous tasks/behaviors. :ivar _running: A boolean flag indicating if the agent is currently running. :ivar logger: The logger instance for the agent. """ logger: logging.Logger def __init__(self, name: str): """ Initialize the BaseAgent. :param name: The unique identifier for this agent. """ self.name = name self.inbox: asyncio.Queue[InternalMessage] = asyncio.Queue() self._tasks: set[asyncio.Task] = set() self._running = False self._internal_pub_socket: None | azmq.Socket = None self._internal_sub_socket: None | azmq.Socket = None # Register immediately AgentDirectory.register(name, self) @abstractmethod async def setup(self): """ Initialize agent-specific resources. This method must be overridden by subclasses. It is called after the agent has started and the ZMQ sockets have been initialized. Use this method to: * Initialize connections (databases, APIs, etc.) * Add initial behaviors using :meth:`add_behavior` """ pass async def start(self): """ Start the agent and its internal loops. This method: 1. Sets the running state to True. 2. Initializes ZeroMQ PUB/SUB sockets for inter-process communication. 3. Calls the user-defined :meth:`setup` method. 4. Starts the inbox processing loop and the ZMQ receiver loop. """ self.logger.info(f"Starting agent {self.name}") self._running = True context = azmq.Context.instance() # Setup the internal publishing socket self._internal_pub_socket = context.socket(zmq.PUB) self._internal_pub_socket.connect(settings.zmq_settings.internal_pub_address) # Setup the internal receiving socket self._internal_sub_socket = context.socket(zmq.SUB) self._internal_sub_socket.connect(settings.zmq_settings.internal_sub_address) self._internal_sub_socket.subscribe(f"internal/{self.name}") await self.setup() # Start processing inbox and ZMQ messages self.add_behavior(self._process_inbox()) self.add_behavior(self._receive_internal_zmq_loop()) async def stop(self): """ Stop the agent. Sets the running state to False and cancels all running background tasks. """ self._running = False for task in self._tasks: task.cancel() self.logger.info(f"Agent {self.name} stopped") async def send(self, message: InternalMessage): """ Send a message to another agent. This method intelligently routes the message: * If the target agent is in the same process (found in :class:`AgentDirectory`), the message is put directly into its inbox. * If the target agent is not found locally, the message is serialized and sent via ZeroMQ to the internal publication address. :param message: The message to send. """ message.sender = self.name to = message.to receivers = [to] if isinstance(to, str) else to for receiver in receivers: target = AgentDirectory.get(receiver) if target: await target.inbox.put(message) self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.") else: # Apparently target agent is on a different process, send via ZMQ topic = f"internal/{receiver}".encode() body = message.model_dump_json().encode() await self._internal_pub_socket.send_multipart([topic, body]) self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.") async def _process_inbox(self): """ Internal loop that processes messages from the inbox. Reads messages from ``self.inbox`` and passes them to :meth:`handle_message`. """ while self._running: msg = await self.inbox.get() self.logger.debug(f"Received message from {msg.sender}.") await self.handle_message(msg) async def _receive_internal_zmq_loop(self): """ Internal loop that listens for ZMQ messages. Subscribes to ``internal/`` topics. When a message is received, it is deserialized into an :class:`InternalMessage` and put into the local inbox. This bridges the gap between inter-process ZMQ communication and the intra-process inbox. """ while self._running: try: _, body = await self._internal_sub_socket.recv_multipart() msg = InternalMessage.model_validate_json(body) await self.inbox.put(msg) except asyncio.CancelledError: break except Exception: self.logger.exception("Could not process ZMQ message.") async def handle_message(self, msg: InternalMessage): """ Handle an incoming message. This method must be overridden by subclasses to define how the agent reacts to messages. :param msg: The received message. :raises NotImplementedError: If not overridden by the subclass. """ raise NotImplementedError def add_behavior(self, coro: Coroutine) -> Task: """ Add a background behavior (task) to the agent. This is the preferred way to run continuous loops or long-running tasks within an agent. The task is tracked and will be automatically cancelled when :meth:`stop` is called. :param coro: The coroutine to execute as a task. """ task = asyncio.create_task(coro) self._tasks.add(task) task.add_done_callback(self._tasks.discard) return task