Files
pepperplus-cb/test/unit/core/test_agent_system.py
Kasper ef00c03ec5 feat: pydantic models and inter-process messaging
Moved `InternalMessage` into schemas and created a `BeliefMessage`
model. Also added the ability for agents to communicate via ZMQ with
agents in another process.

ref: N25B-316
2025-11-24 14:03:34 +01:00

71 lines
1.7 KiB
Python

"""Test the base class logic, message passing and background task handling."""
import asyncio
import logging
from unittest.mock import AsyncMock
import pytest
from control_backend.core.agent_system import AgentDirectory, BaseAgent, InternalMessage
class ConcreteTestAgent(BaseAgent):
    """Minimal concrete ``BaseAgent`` used by the tests in this file.

    Every message given to ``handle_message`` is appended to ``received``;
    a message whose body equals ``"stop"`` additionally stops the agent.
    """

    logger = logging.getLogger("test")

    def __init__(self, name: str):
        super().__init__(name)
        # Messages passed to handle_message, in the order they were handled.
        self.received = []

    async def setup(self):
        """Do nothing; this test agent needs no setup work."""

    async def handle_message(self, msg: InternalMessage):
        """Record ``msg`` and stop the agent when its body is ``"stop"``."""
        self.received.append(msg)
        if msg.body != "stop":
            return
        await self.stop()
@pytest.mark.asyncio
async def test_agent_lifecycle():
    """Start an agent, attach a short background task, then stop the agent.

    Checks the ``_running`` flag and the size of ``_tasks`` at each stage.
    """
    agent = ConcreteTestAgent("lifecycle_agent")

    await agent.start()
    assert agent._running is True

    # Background task that completes on its own after 10 ms.
    async def short_lived():
        await asyncio.sleep(0.01)

    await agent.add_behavior(short_lived())
    assert len(agent._tasks) > 0

    # Sleep longer than the background task so it has finished.
    await asyncio.sleep(0.02)
    assert len(agent._tasks) == 2  # message handling tasks are running

    await agent.stop()
    assert agent._running is False

    # Yield to the event loop; afterwards the tasks should be cancelled.
    await asyncio.sleep(0.01)
    assert len(agent._tasks) == 0
@pytest.mark.asyncio
async def test_send_unknown_agent():
    """Check that sending to ``unknown_receiver`` uses the internal pub socket.

    The socket is replaced by an ``AsyncMock`` and the test asserts that
    its ``send_multipart`` was called.
    """
    agent = ConcreteTestAgent("sender")
    # Swap in a mock so the call made by send() can be observed.
    agent._internal_pub_socket = AsyncMock()

    await agent.send(
        InternalMessage(to="unknown_receiver", sender="sender", body="boo")
    )

    agent._internal_pub_socket.send_multipart.assert_called()
@pytest.mark.asyncio
async def test_get_agent():
    """Check ``AgentDirectory.get`` for a constructed agent's name and an unused name."""
    registrant = ConcreteTestAgent("registrant")

    found = AgentDirectory.get("registrant")
    missing = AgentDirectory.get("non_existent")

    # The lookup by name must compare equal to the agent constructed above.
    assert found == registrant
    assert missing is None