Files
pepperplus-cb/src/control_backend/core/agent_system.py
2025-11-21 13:28:37 +01:00

106 lines
3.0 KiB
Python

import asyncio
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
# Central directory to resolve agent names to instances.
# Module-level mapping of agent name -> agent instance. It is written by
# `AgentDirectory.register()` and read by `AgentDirectory.get()`.
_agent_directory: dict[str, "BaseAgent"] = {}
@dataclass
class InternalMessage:
"""
Represents a message to an agent.
"""
to: str
sender: str
body: str
thread: str | None = None
class AgentDirectory:
    """
    Helper class to keep track of which agents are registered.
    Used for handling message routing.

    Both methods are static and operate on the module-level
    `_agent_directory` mapping, so the class is never instantiated.
    """

    @staticmethod
    def register(name: str, agent: "BaseAgent") -> None:
        """
        Store `agent` under `name`.

        An existing entry with the same name is replaced without warning.
        """
        _agent_directory[name] = agent

    @staticmethod
    def get(name: str) -> "BaseAgent | None":
        """Return the agent registered under `name`, or `None` if there is none."""
        return _agent_directory.get(name)
class BaseAgent(ABC):
    """
    Abstract base class for all agents. To make a new agent, inherit from
    `control_backend.agents.BaseAgent`, not this class. That ensures that a
    logger is present with the correct name pattern.

    When subclassing, the `setup()` method needs to be overwritten. To handle
    messages from other agents, overwrite the `handle_message()` method. To
    send messages to other agents, use the `send()` method. To add custom
    behaviors/tasks to the agent, use the `add_background_task()` method.
    """

    # Only declared here, never assigned by this class: a subclass (or the
    # instance) must provide it before any method that logs is called.
    logger: logging.Logger

    def __init__(self, name: str):
        """
        Create the agent and register it in `AgentDirectory` under `name`.

        `name` is the key other agents put in `InternalMessage.to` to reach
        this agent.
        """
        self.name = name
        # Messages delivered by other agents' `send()` calls wait here.
        self.inbox: asyncio.Queue[InternalMessage] = asyncio.Queue()
        # Tasks created through `add_background_task()` that are not done yet.
        self._tasks: set[asyncio.Task] = set()
        self._running = False

        # Register immediately
        AgentDirectory.register(name, self)

    @abstractmethod
    async def setup(self):
        """Overwrite this to initialize resources."""
        pass

    async def start(self):
        """
        Starts the agent and its loops.

        Runs `setup()` first and then schedules the inbox-processing loop as
        a background task.
        """
        self.logger.info(f"Starting agent {self.name}")
        self._running = True

        await self.setup()

        # Start processing inbox
        await self.add_background_task(self._process_inbox())

    async def stop(self):
        """
        Stops the agent.

        Clears the running flag and requests cancellation of every tracked
        background task. The tasks are not awaited here.
        """
        self._running = False
        # Work on a snapshot: finished tasks remove themselves from
        # `_tasks` through their done-callback.
        for task in tuple(self._tasks):
            task.cancel()
        self.logger.info(f"Agent {self.name} stopped")

    async def send(self, message: "InternalMessage"):
        """
        Sends a message to another agent.

        The receiver is looked up by `message.to`. If no agent is registered
        under that name, a warning is logged and the message is dropped.
        """
        target = AgentDirectory.get(message.to)
        # Explicit None check: an agent object that happens to be falsy
        # (e.g. a subclass defining `__len__`) is still a valid receiver.
        if target is None:
            self.logger.warning(f"Attempted to send message to unknown agent: {message.to}")
            return
        await target.inbox.put(message)

    async def _process_inbox(self):
        """
        Default loop: equivalent to a CyclicBehaviour receiving messages.

        Takes messages from `inbox` one at a time and passes them to
        `handle_message()` for as long as the agent is running.
        """
        while self._running:
            msg = await self.inbox.get()
            try:
                await self.handle_message(msg)
            except Exception:
                # A failing handler must not end the loop, otherwise the
                # agent would silently stop receiving messages.
                # `asyncio.CancelledError` is a BaseException and is not
                # caught here, so `stop()` can still cancel this task.
                self.logger.exception(f"Agent {self.name} failed to handle message: {msg}")

    async def handle_message(self, msg: "InternalMessage"):
        """Override this to handle incoming messages."""
        raise NotImplementedError

    async def add_background_task(self, coro) -> asyncio.Task:
        """
        Helper to add a behavior to the agent.

        Schedules `coro` as an asyncio task, tracks it so that `stop()` can
        cancel it, and returns the task. The task is dropped from the
        tracking set once it is done.
        """
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task