From c0b8fb861213ace81fdf53060009ca89b960557a Mon Sep 17 00:00:00 2001 From: Kasper Marinus Date: Tue, 13 Jan 2026 11:06:42 +0100 Subject: [PATCH 1/4] feat: able to send to multiple receivers ref: N25B-441 --- src/control_backend/core/agent_system.py | 25 +++++++++++-------- .../schemas/internal_message.py | 4 ++- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/control_backend/core/agent_system.py b/src/control_backend/core/agent_system.py index 9d7a47f..1411c0d 100644 --- a/src/control_backend/core/agent_system.py +++ b/src/control_backend/core/agent_system.py @@ -130,16 +130,21 @@ class BaseAgent(ABC): :param message: The message to send. """ - target = AgentDirectory.get(message.to) - if target: - await target.inbox.put(message) - self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.") - else: - # Apparently target agent is on a different process, send via ZMQ - topic = f"internal/{message.to}".encode() - body = message.model_dump_json().encode() - await self._internal_pub_socket.send_multipart([topic, body]) - self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.") + to = message.to + receivers = [to] if isinstance(to, str) else to + + for receiver in receivers: + target = AgentDirectory.get(receiver) + + if target: + await target.inbox.put(message) + self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.") + else: + # Apparently target agent is on a different process, send via ZMQ + topic = f"internal/{message.to}".encode() + body = message.model_dump_json().encode() + await self._internal_pub_socket.send_multipart([topic, body]) + self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.") async def _process_inbox(self): """ diff --git a/src/control_backend/schemas/internal_message.py b/src/control_backend/schemas/internal_message.py index 071d884..d70f80a 100644 --- a/src/control_backend/schemas/internal_message.py +++ b/src/control_backend/schemas/internal_message.py @@ -1,3 +1,5 @@ +from collections.abc import Iterable + from pydantic import BaseModel @@ -11,7 +13,7 @@ class InternalMessage(BaseModel): :ivar thread: An optional thread identifier/topic to categorize the message (e.g., 'beliefs'). """ - to: str + to: str | Iterable[str] sender: str body: str thread: str | None = None From 70e05b6c9261da323d39ce3cec9d691ff9e4ed95 Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Tue, 13 Jan 2026 11:10:35 +0100 Subject: [PATCH 2/4] test: sending to multiple agents, including remote ref: N25B-441 --- src/control_backend/core/agent_system.py | 5 +- test/unit/core/test_agent_system.py | 67 +++++++++++++++++++++++- 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/src/control_backend/core/agent_system.py b/src/control_backend/core/agent_system.py index 1411c0d..fffa282 100644 --- a/src/control_backend/core/agent_system.py +++ b/src/control_backend/core/agent_system.py @@ -60,6 +60,9 @@ class BaseAgent(ABC): self._tasks: set[asyncio.Task] = set() self._running = False + self._internal_pub_socket: None | azmq.Socket = None + self._internal_sub_socket: None | azmq.Socket = None + # Register immediately AgentDirectory.register(name, self) @@ -141,7 +144,7 @@ class BaseAgent(ABC): self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.") else: # Apparently target agent is on a different process, send via ZMQ - topic = f"internal/{message.to}".encode() + topic = f"internal/{receiver}".encode() body = message.model_dump_json().encode() await self._internal_pub_socket.send_multipart([topic, body]) self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.") diff --git a/test/unit/core/test_agent_system.py b/test/unit/core/test_agent_system.py index 234de4e..252cca1 100644 --- a/test/unit/core/test_agent_system.py +++ b/test/unit/core/test_agent_system.py @@ -99,12 +99,75 @@ async def test_send_to_local_agent(monkeypatch): # Patch inbox.put target.inbox.put = AsyncMock() - message = InternalMessage(to="receiver", sender="sender", body="hello") + message = InternalMessage(to=target.name, sender=sender.name, body="hello") await sender.send(message) target.inbox.put.assert_awaited_once_with(message) - sender.logger.debug.assert_called_once() + + +@pytest.mark.asyncio +async def test_send_to_zmq_agent(monkeypatch): + sender = DummyAgent("sender") + target = "remote_receiver" + + # Fake logger + sender.logger = MagicMock() + + # Fake zmq + sender._internal_pub_socket = AsyncMock() + + message = InternalMessage(to=target, sender=sender.name, body="hello") + + await sender.send(message) + + zmq_calls = sender._internal_pub_socket.send_multipart.call_args[0][0] + assert zmq_calls[0] == f"internal/{target}".encode() + + +@pytest.mark.asyncio +async def test_send_to_multiple_local_agents(monkeypatch): + sender = DummyAgent("sender") + target1 = DummyAgent("receiver1") + target2 = DummyAgent("receiver2") + + # Fake logger + sender.logger = MagicMock() + + # Patch inbox.put + target1.inbox.put = AsyncMock() + target2.inbox.put = AsyncMock() + + message = InternalMessage(to=[target1.name, target2.name], sender=sender.name, body="hello") + + await sender.send(message) + + target1.inbox.put.assert_awaited_once_with(message) + target2.inbox.put.assert_awaited_once_with(message) + + +@pytest.mark.asyncio +async def test_send_to_multiple_agents(monkeypatch): + sender = DummyAgent("sender") + target1 = DummyAgent("receiver1") + target2 = "remote_receiver" + + # Fake logger + sender.logger = MagicMock() + + # Fake zmq + sender._internal_pub_socket = AsyncMock() + + # Patch inbox.put + target1.inbox.put = AsyncMock() + + message = InternalMessage(to=[target1.name, target2], sender=sender.name, body="hello") + + await sender.send(message) + + target1.inbox.put.assert_awaited_once_with(message) + zmq_calls = sender._internal_pub_socket.send_multipart.call_args[0][0] + assert zmq_calls[0] == f"internal/{target2}".encode() @pytest.mark.asyncio From 9a55067a1371d325708ef7ac80c191b0ae3db9fe Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Tue, 13 Jan 2026 17:07:17 +0100 Subject: [PATCH 3/4] fix: set sender for internal messages ref: N25B-441 --- src/control_backend/core/agent_system.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/control_backend/core/agent_system.py b/src/control_backend/core/agent_system.py index fffa282..5b2ea7e 100644 --- a/src/control_backend/core/agent_system.py +++ b/src/control_backend/core/agent_system.py @@ -133,6 +133,7 @@ class BaseAgent(ABC): :param message: The message to send. """ + message.sender = self.name to = message.to receivers = [to] if isinstance(to, str) else to From d7d697b29363c49e0edadb3c92cf881077070954 Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Tue, 13 Jan 2026 17:09:26 +0100 Subject: [PATCH 4/4] docs: update `to` docstring ref: N25B-441 --- src/control_backend/schemas/internal_message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/control_backend/schemas/internal_message.py b/src/control_backend/schemas/internal_message.py index d70f80a..758c085 100644 --- a/src/control_backend/schemas/internal_message.py +++ b/src/control_backend/schemas/internal_message.py @@ -7,7 +7,7 @@ class InternalMessage(BaseModel): """ Standard message envelope for communication between agents within the Control Backend. - :ivar to: The name of the destination agent. + :ivar to: The name(s) of the destination agent(s). :ivar sender: The name of the sending agent. :ivar body: The string payload (often a JSON-serialized model). :ivar thread: An optional thread identifier/topic to categorize the message (e.g., 'beliefs').