From ef00c03ec596294a245d575e510142db8f5eb5ad Mon Sep 17 00:00:00 2001 From: Kasper Date: Mon, 24 Nov 2025 14:03:34 +0100 Subject: [PATCH] feat: pydantic models and inter-process messaging Moved `InternalMessage` into schemas and created a `BeliefMessage` model. Also added the ability for agents to communicate via ZMQ to agents on another process. ref: N25B-316 --- .../bdi/bdi_core_agent/bdi_core_agent.py | 12 ++-- .../agents/bdi/bdi_core_agent/rules.asl | 6 +- .../agents/bdi/belief_collector_agent.py | 3 +- src/control_backend/core/agent_system.py | 59 ++++++++++++++----- src/control_backend/schemas/belief_message.py | 5 ++ .../schemas/internal_message.py | 12 ++++ test/unit/agents/bdi/test_bdi_core_agent.py | 37 +++++++++--- test/unit/agents/bdi/test_belief_collector.py | 2 +- test/unit/core/test_agent_system.py | 14 +++-- 9 files changed, 113 insertions(+), 37 deletions(-) create mode 100644 src/control_backend/schemas/belief_message.py create mode 100644 src/control_backend/schemas/internal_message.py diff --git a/src/control_backend/agents/bdi/bdi_core_agent/bdi_core_agent.py b/src/control_backend/agents/bdi/bdi_core_agent/bdi_core_agent.py index 0a64ee7..72d3341 100644 --- a/src/control_backend/agents/bdi/bdi_core_agent/bdi_core_agent.py +++ b/src/control_backend/agents/bdi/bdi_core_agent/bdi_core_agent.py @@ -1,16 +1,17 @@ import asyncio import copy -import json import time from collections.abc import Iterable import agentspeak import agentspeak.runtime import agentspeak.stdlib +from pydantic import ValidationError from control_backend.agents.base import BaseAgent from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings +from control_backend.schemas.belief_message import BeliefMessage from control_backend.schemas.ri_message import SpeechCommand @@ -58,16 +59,19 @@ class BDICoreAgent(BaseAgent): maybe_more_work = True while maybe_more_work: maybe_more_work = False + self.logger.debug("Stepping BDI.") if self.bdi_agent.step(): maybe_more_work = True if not maybe_more_work: deadline = self.bdi_agent.shortest_deadline() if deadline: + self.logger.debug("Sleeping until %s", deadline) await asyncio.sleep(deadline - time.time()) maybe_more_work = True else: self._wake_bdi_loop.clear() + self.logger.debug("No more deadlines. Halting BDI loop.") async def handle_message(self, msg: InternalMessage): """ @@ -80,10 +84,10 @@ class BDICoreAgent(BaseAgent): self.logger.debug("Processing message from belief collector.") try: if msg.thread == "beliefs": - beliefs = json.loads(msg.body) + beliefs = BeliefMessage.model_validate_json(msg.body).beliefs self._add_beliefs(beliefs) - except Exception as e: - self.logger.error(f"Error processing belief: {e}") + except ValidationError: + self.logger.exception("Error processing belief.") case settings.agent_settings.llm_name: content = msg.body self.logger.info("Received LLM response: %s", content) diff --git a/src/control_backend/agents/bdi/bdi_core_agent/rules.asl b/src/control_backend/agents/bdi/bdi_core_agent/rules.asl index d88858d..a685f93 100644 --- a/src/control_backend/agents/bdi/bdi_core_agent/rules.asl +++ b/src/control_backend/agents/bdi/bdi_core_agent/rules.asl @@ -1,3 +1,3 @@ -+user_said(NewMessage) <- - -user_said(NewMessage); - .reply(NewMessage). ++user_said(Message) <- + -user_said(Message); + .reply(Message). diff --git a/src/control_backend/agents/bdi/belief_collector_agent.py b/src/control_backend/agents/bdi/belief_collector_agent.py index 85d8e6e..5d25204 100644 --- a/src/control_backend/agents/bdi/belief_collector_agent.py +++ b/src/control_backend/agents/bdi/belief_collector_agent.py @@ -3,6 +3,7 @@ import json from control_backend.agents.base import BaseAgent from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings +from control_backend.schemas.belief_message import BeliefMessage class BDIBeliefCollectorAgent(BaseAgent): @@ -80,7 +81,7 @@ class BDIBeliefCollectorAgent(BaseAgent): msg = InternalMessage( to=settings.agent_settings.bdi_core_name, sender=self.name, - body=json.dumps(beliefs), + body=BeliefMessage(beliefs=beliefs).model_dump_json(), thread="beliefs", ) diff --git a/src/control_backend/core/agent_system.py b/src/control_backend/core/agent_system.py index b7130ba..ccdfe78 100644 --- a/src/control_backend/core/agent_system.py +++ b/src/control_backend/core/agent_system.py @@ -2,24 +2,17 @@ import asyncio import logging from abc import ABC, abstractmethod from collections.abc import Coroutine -from dataclasses import dataclass + +import zmq +import zmq.asyncio as azmq + +from control_backend.core.config import settings +from control_backend.schemas.internal_message import InternalMessage # Central directory to resolve agent names to instances _agent_directory: dict[str, "BaseAgent"] = {} -@dataclass -class InternalMessage: - """ - Represents a message to an agent. - """ - - to: str - sender: str - body: str - thread: str | None = None - - class AgentDirectory: """ Helper class to keep track of which agents are registered. @@ -67,10 +60,23 @@ class BaseAgent(ABC): """Starts the agent and its loops.""" self.logger.info(f"Starting agent {self.name}") self._running = True + + context = azmq.Context.instance() + + # Setup the internal publishing socket + self._internal_pub_socket = context.socket(zmq.PUB) + self._internal_pub_socket.connect(settings.zmq_settings.internal_pub_address) + + # Setup the internal receiving socket + self._internal_sub_socket = context.socket(zmq.SUB) + self._internal_sub_socket.connect(settings.zmq_settings.internal_sub_address) + self._internal_sub_socket.subscribe(f"internal/{self.name}") + await self.setup() - # Start processing inbox + # Start processing inbox and ZMQ messages await self.add_behavior(self._process_inbox()) + await self.add_behavior(self._receive_internal_zmq_loop()) async def stop(self): """Stops the agent.""" @@ -86,15 +92,38 @@ class BaseAgent(ABC): target = AgentDirectory.get(message.to) if target: await target.inbox.put(message) + self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.") else: - self.logger.warning(f"Attempted to send message to unknown agent: {message.to}") + # Apparently target agent is on a different process, send via ZMQ + topic = f"internal/{message.to}".encode() + body = message.model_dump_json().encode() + await self._internal_pub_socket.send_multipart([topic, body]) + self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.") async def _process_inbox(self): """Default loop: equivalent to a CyclicBehaviour receiving messages.""" while self._running: msg = await self.inbox.get() + self.logger.debug(f"Received message from {msg.sender}.") await self.handle_message(msg) + async def _receive_internal_zmq_loop(self): + """ + Listens for internal messages sent from agents on another process via ZMQ + and puts them into the normal inbox. + """ + while self._running: + try: + _, body = await self._internal_sub_socket.recv_multipart() + + msg = InternalMessage.model_validate_json(body) + + await self.inbox.put(msg) + except asyncio.CancelledError: + break + except Exception: + self.logger.exception("Could not process ZMQ message.") + async def handle_message(self, msg: InternalMessage): """Override this to handle incoming messages.""" raise NotImplementedError diff --git a/src/control_backend/schemas/belief_message.py b/src/control_backend/schemas/belief_message.py new file mode 100644 index 0000000..a5f7507 --- /dev/null +++ b/src/control_backend/schemas/belief_message.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel + + +class BeliefMessage(BaseModel): + beliefs: dict[str, list[str]] diff --git a/src/control_backend/schemas/internal_message.py b/src/control_backend/schemas/internal_message.py new file mode 100644 index 0000000..0240d52 --- /dev/null +++ b/src/control_backend/schemas/internal_message.py @@ -0,0 +1,12 @@ +from pydantic import BaseModel + + +class InternalMessage(BaseModel): + """ + Represents a message to an agent. + """ + + to: str + sender: str + body: str + thread: str | None = None diff --git a/test/unit/agents/bdi/test_bdi_core_agent.py b/test/unit/agents/bdi/test_bdi_core_agent.py index 84d11e4..43ee033 100644 --- a/test/unit/agents/bdi/test_bdi_core_agent.py +++ b/test/unit/agents/bdi/test_bdi_core_agent.py @@ -1,10 +1,13 @@ +import json from unittest.mock import AsyncMock, MagicMock, mock_open, patch +import agentspeak import pytest from control_backend.agents.bdi.bdi_core_agent.bdi_core_agent import BDICoreAgent from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings +from control_backend.schemas.belief_message import BeliefMessage @pytest.fixture @@ -40,23 +43,43 @@ async def test_setup_no_asl(mock_agentspeak_env, agent): @pytest.mark.asyncio -async def test_handle_belief_collector_message(agent): +async def test_handle_belief_collector_message(agent, mock_settings): """Test that incoming beliefs are added to the BDI agent""" - # Simulate message from belief collector - import json - beliefs = {"user_said": ["Hello"]} msg = InternalMessage( to="bdi_agent", - sender=settings.agent_settings.bdi_belief_collector_name, - body=json.dumps(beliefs), + sender=mock_settings.agent_settings.bdi_belief_collector_name, + body=BeliefMessage(beliefs=beliefs).model_dump_json(), thread="beliefs", ) await agent.handle_message(msg) # Expect bdi_agent.call to be triggered to add belief - assert agent.bdi_agent.call.called + args = agent.bdi_agent.call.call_args.args + assert args[0] == agentspeak.Trigger.addition + assert args[1] == agentspeak.GoalType.belief + assert args[2] == agentspeak.Literal("user_said", (agentspeak.Literal("Hello"),)) + + +@pytest.mark.asyncio +async def test_incorrect_belief_collector_message(agent, mock_settings): + """Test that incorrect message format triggers an exception.""" + msg = InternalMessage( + to="bdi_agent", + sender=mock_settings.agent_settings.bdi_belief_collector_name, + body=json.dumps({"bad_format": "bad_format"}), + thread="beliefs", + ) + + await agent.handle_message(msg) + + agent.bdi_agent.call.assert_not_called() # did not set belief + + +@pytest.mark.asyncio +async def test(): + pass @pytest.mark.asyncio diff --git a/test/unit/agents/bdi/test_belief_collector.py b/test/unit/agents/bdi/test_belief_collector.py index 250aa3f..ca89a9d 100644 --- a/test/unit/agents/bdi/test_belief_collector.py +++ b/test/unit/agents/bdi/test_belief_collector.py @@ -84,4 +84,4 @@ async def test_send_beliefs_to_bdi(agent): sent: InternalMessage = agent.send.call_args.args[0] assert sent.to == settings.agent_settings.bdi_core_name assert sent.thread == "beliefs" - assert json.loads(sent.body) == beliefs + assert json.loads(sent.body)["beliefs"] == beliefs diff --git a/test/unit/core/test_agent_system.py b/test/unit/core/test_agent_system.py index 001ead3..5e954c8 100644 --- a/test/unit/core/test_agent_system.py +++ b/test/unit/core/test_agent_system.py @@ -2,6 +2,7 @@ import asyncio import logging +from unittest.mock import AsyncMock import pytest @@ -39,7 +40,7 @@ async def test_agent_lifecycle(): # Wait for task to finish await asyncio.sleep(0.02) - assert len(agent._tasks) == 1 # _process_inbox is still running + assert len(agent._tasks) == 2 # message handling tasks are running await agent.stop() assert agent._running is False @@ -51,14 +52,15 @@ async def test_agent_lifecycle(): @pytest.mark.asyncio -async def test_send_unknown_agent(caplog): +async def test_send_unknown_agent(): agent = ConcreteTestAgent("sender") - msg = InternalMessage(to="unknown_sender", sender="sender", body="boo") + msg = InternalMessage(to="unknown_receiver", sender="sender", body="boo") - with caplog.at_level(logging.WARNING): - await agent.send(msg) + agent._internal_pub_socket = AsyncMock() - assert "Attempted to send message to unknown agent: unknown_sender" in caplog.text + await agent.send(msg) + + agent._internal_pub_socket.send_multipart.assert_called() @pytest.mark.asyncio