diff --git a/.gitignore b/.gitignore index 4d2fe1b..f6ad342 100644 --- a/.gitignore +++ b/.gitignore @@ -215,7 +215,8 @@ __marimo__/ # Streamlit .streamlit/secrets.toml - +# MacOS +.DS_Store diff --git a/pyproject.toml b/pyproject.toml index 00652a9..6776668 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "pyzmq>=27.1.0", "silero-vad>=6.0.0", "spade>=4.1.0", + "spade-bdi>=0.3.2", "torch>=2.8.0", "uvicorn>=0.37.0", ] diff --git a/src/control_backend/agents/bdi/__init__.py b/src/control_backend/agents/bdi/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/control_backend/agents/bdi/bdi_core.py b/src/control_backend/agents/bdi/bdi_core.py new file mode 100644 index 0000000..7311061 --- /dev/null +++ b/src/control_backend/agents/bdi/bdi_core.py @@ -0,0 +1,35 @@ +import logging + +import agentspeak +from spade_bdi.bdi import BDIAgent + +from control_backend.agents.bdi.behaviours.belief_setter import BeliefSetter + +class BDICoreAgent(BDIAgent): + """ + This is the Brain agent that does the belief inference with AgentSpeak. + This is a continous process that happens automatically in the background. + This class contains all the actions that can be called from AgentSpeak plans. + It has the BeliefSetter behaviour. + """ + logger = logging.getLogger("BDI Core") + + async def setup(self): + belief_setter = BeliefSetter() + self.add_behaviour(belief_setter) + + def add_custom_actions(self, actions): + @actions.add(".reply", 1) + def _reply(agent, term, intention): + message = agentspeak.grounded(term.args[0], intention.scope) + self.logger.info(f"Replying to message: {message}") + reply = self._send_to_llm(message) + self.logger.info(f"Received reply: {reply}") + + yield + + def _send_to_llm(self, message) -> str: + """TODO: implement""" + return f"This is a reply to {message}" + + diff --git a/src/control_backend/agents/bdi/behaviours/__init__.py b/src/control_backend/agents/bdi/behaviours/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/control_backend/agents/bdi/behaviours/belief_setter.py b/src/control_backend/agents/bdi/behaviours/belief_setter.py new file mode 100644 index 0000000..777dda3 --- /dev/null +++ b/src/control_backend/agents/bdi/behaviours/belief_setter.py @@ -0,0 +1,60 @@ +import asyncio +import json +import logging + +from spade.agent import Message +from spade.behaviour import CyclicBehaviour +from spade_bdi.bdi import BDIAgent + +from control_backend.core.config import settings + +class BeliefSetter(CyclicBehaviour): + """ + This is the behaviour that the BDI agent runs. + This behaviour waits for incoming message and processes it based on sender. + Currently, t only waits for messages containing beliefs from Belief Collector and adds these to its KB. + """ + agent: BDIAgent + logger = logging.getLogger("BDI/Belief Setter") + + async def run(self): + msg = await self.receive(timeout=0.1) + if msg: + self.logger.info(f"Received message {msg.body}") + self._process_message(msg) + await asyncio.sleep(1) + + def _process_message(self, message: Message): + sender = message.sender.node # removes host from jid and converts to str + self.logger.debug("Sender: %s", sender) + + match sender: + case settings.agent_settings.belief_collector_agent_name: + self.logger.debug("Processing message from belief collector.") + self._process_belief_message(message) + case _: + pass + + def _process_belief_message(self, message: Message): + if not message.body: return + + match message.thread: + case "beliefs": + try: + beliefs: dict[str, list[list[str]]] = json.loads(message.body) + self._set_beliefs(beliefs) + except json.JSONDecodeError as e: + self.logger.error("Could not decode beliefs into JSON format: %s", e) + case _: + pass + + + def _set_beliefs(self, beliefs: dict[str, list[list[str]]]): + if self.agent.bdi is None: + self.logger.warning("Cannot set beliefs, since agent's BDI is not yet initialized.") + return + + for belief, arguments_list in beliefs.items(): + for arguments in arguments_list: + self.agent.bdi.set_belief(belief, *arguments) + self.logger.info("Set belief %s with arguments %s", belief, arguments) diff --git a/src/control_backend/agents/bdi/rules.asl b/src/control_backend/agents/bdi/rules.asl new file mode 100644 index 0000000..41660a4 --- /dev/null +++ b/src/control_backend/agents/bdi/rules.asl @@ -0,0 +1,3 @@ ++user_said(Message) : not responded <- + +responded; + .reply(Message). diff --git a/src/control_backend/api/v1/endpoints/message.py b/src/control_backend/api/v1/endpoints/message.py index 1ad0b65..fef07b8 100644 --- a/src/control_backend/api/v1/endpoints/message.py +++ b/src/control_backend/api/v1/endpoints/message.py @@ -1,13 +1,23 @@ from fastapi import APIRouter, Request import logging +from zmq import Socket + from control_backend.schemas.message import Message logger = logging.getLogger(__name__) router = APIRouter() -# TODO: implement -@router.post("/message") +@router.post("/message", status_code=202) async def receive_message(message: Message, request: Request): logger.info("Received message: %s", message.message) + + topic = b"message" + body = message.model_dump_json().encode("utf-8") + + pub_socket: Socket = request.app.state.internal_comm_socket + + pub_socket.send_multipart([topic, body]) + + return {"status": "Message received"} diff --git a/src/control_backend/core/config.py b/src/control_backend/core/config.py index 8d91af5..07a828d 100644 --- a/src/control_backend/core/config.py +++ b/src/control_backend/core/config.py @@ -1,11 +1,25 @@ -from pydantic import HttpUrl +from re import L +from pydantic import BaseModel from pydantic_settings import BaseSettings, SettingsConfigDict +class ZMQSettings(BaseModel): + internal_comm_address: str = "tcp://localhost:5560" + +class AgentSettings(BaseModel): + host: str = "localhost" + bdi_core_agent_name: str = "bdi_core" + belief_collector_agent_name: str = "belief_collector" + test_agent_name: str = "test_agent" + class Settings(BaseSettings): app_title: str = "PepperPlus" - + ui_url: str = "http://localhost:5173" + + zmq_settings: ZMQSettings = ZMQSettings() + + agent_settings: AgentSettings = AgentSettings() model_config = SettingsConfigDict(env_file=".env") -settings = Settings() \ No newline at end of file +settings = Settings() diff --git a/src/control_backend/core/zmq_context.py b/src/control_backend/core/zmq_context.py new file mode 100644 index 0000000..a74544f --- /dev/null +++ b/src/control_backend/core/zmq_context.py @@ -0,0 +1,3 @@ +from zmq.asyncio import Context + +context = Context() diff --git a/src/control_backend/main.py b/src/control_backend/main.py index 8fa0428..1f377c4 100644 --- a/src/control_backend/main.py +++ b/src/control_backend/main.py @@ -1,24 +1,43 @@ +# Standard library imports +import asyncio +import json + # External imports import contextlib from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware import logging +from spade.agent import Agent, Message +from spade.behaviour import OneShotBehaviour +import zmq # Internal imports +from control_backend.agents.bdi.bdi_core import BDICoreAgent from control_backend.api.v1.router import api_router -from control_backend.core.config import settings +from control_backend.core.config import AgentSettings, settings +from control_backend.core.zmq_context import context logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.DEBUG) @contextlib.asynccontextmanager async def lifespan(app: FastAPI): logger.info("%s starting up.", app.title) + + # Initiate sockets + internal_comm_socket = context.socket(zmq.PUB) + internal_comm_address = settings.zmq_settings.internal_comm_address + internal_comm_socket.bind(internal_comm_address) + app.state.internal_comm_socket = internal_comm_socket + logger.info("Internal publishing socket bound to %s", internal_comm_socket) + + # Initiate agents + bdi_core = BDICoreAgent(settings.agent_settings.bdi_core_agent_name + '@' + settings.agent_settings.host, settings.agent_settings.bdi_core_agent_name, "src/control_backend/agents/bdi/rules.asl") + await bdi_core.start() yield logger.info("%s shutting down.", app.title) - # if __name__ == "__main__": app = FastAPI(title=settings.app_title, lifespan=lifespan) @@ -34,4 +53,4 @@ app.include_router(api_router, prefix="") # TODO: make prefix /api/v1 @app.get("/") async def root(): - return {"status": "ok"} \ No newline at end of file + return {"status": "ok"} diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 0000000..1e51aca --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,37 @@ +import sys +from unittest.mock import MagicMock + +import sys +from unittest.mock import MagicMock + +def pytest_configure(config): + """ + This hook runs at the start of the pytest session, before any tests are + collected. It mocks heavy or unavailable modules to prevent ImportErrors. + """ + # --- Mock spade and spade-bdi --- + mock_spade = MagicMock() + mock_spade.agent = MagicMock() + mock_spade.behaviour = MagicMock() + mock_spade_bdi = MagicMock() + mock_spade_bdi.bdi = MagicMock() + + mock_spade.agent.Message = MagicMock() + mock_spade.behaviour.CyclicBehaviour = type('CyclicBehaviour', (object,), {}) + mock_spade_bdi.bdi.BDIAgent = type('BDIAgent', (object,), {}) + + sys.modules['spade'] = mock_spade + sys.modules['spade.agent'] = mock_spade.agent + sys.modules['spade.behaviour'] = mock_spade.behaviour + sys.modules['spade_bdi'] = mock_spade_bdi + sys.modules['spade_bdi.bdi'] = mock_spade_bdi.bdi + + # --- Mock the config module to prevent Pydantic ImportError --- + mock_config_module = MagicMock() + + # The code under test does `from ... import settings`, so our mock module + # must have a `settings` attribute. We'll make it a MagicMock so we can + # configure it later in our tests using mocker.patch. + mock_config_module.settings = MagicMock() + + sys.modules['control_backend.core.config'] = mock_config_module diff --git a/test/unit/agents/bdi/behaviours/test_belief_setter.py b/test/unit/agents/bdi/behaviours/test_belief_setter.py new file mode 100644 index 0000000..8932834 --- /dev/null +++ b/test/unit/agents/bdi/behaviours/test_belief_setter.py @@ -0,0 +1,236 @@ +import json +import logging +from unittest.mock import MagicMock, AsyncMock, call + +import pytest + +from control_backend.agents.bdi.behaviours.belief_setter import BeliefSetter + +# Define a constant for the collector agent name to use in tests +COLLECTOR_AGENT_NAME = "belief_collector" +COLLECTOR_AGENT_JID = f"{COLLECTOR_AGENT_NAME}@test" + + +@pytest.fixture +def mock_agent(mocker): + """Fixture to create a mock BDIAgent.""" + agent = MagicMock() + agent.bdi = MagicMock() + agent.jid = "bdi_agent@test" + return agent + + +@pytest.fixture +def belief_setter(mock_agent, mocker): + """Fixture to create an instance of BeliefSetter with a mocked agent.""" + # Patch the settings to use a predictable agent name + mocker.patch( + "control_backend.agents.bdi.behaviours.belief_setter.settings.agent_settings.belief_collector_agent_name", + COLLECTOR_AGENT_NAME + ) + # Patch asyncio.sleep to prevent tests from actually waiting + mocker.patch("asyncio.sleep", return_value=None) + + setter = BeliefSetter() + setter.agent = mock_agent + # Mock the receive method, we will control its return value in each test + setter.receive = AsyncMock() + return setter + + +def create_mock_message(sender_node: str, body: str, thread: str) -> MagicMock: + """Helper function to create a configured mock message.""" + msg = MagicMock() + msg.sender.node = sender_node # MagicMock automatically creates nested mocks + msg.body = body + msg.thread = thread + return msg + + +@pytest.mark.asyncio +async def test_run_no_message_received(belief_setter, mocker): + """ + Test that when no message is received, _process_message is not called. + """ + # Arrange + belief_setter.receive.return_value = None + mocker.patch.object(belief_setter, "_process_message") + + # Act + await belief_setter.run() + + # Assert + belief_setter._process_message.assert_not_called() + + +@pytest.mark.asyncio +async def test_run_message_received(belief_setter, mocker): + """ + Test that when a message is received, _process_message is called. + """ + # Arrange + msg = MagicMock(); + belief_setter.receive.return_value = msg + mocker.patch.object(belief_setter, "_process_message") + + # Act + await belief_setter.run() + + # Assert + belief_setter._process_message.assert_called_once_with(msg) + + +def test_process_message_from_belief_collector(belief_setter, mocker): + """ + Test processing a message from the correct belief collector agent. + """ + # Arrange + msg = create_mock_message(sender_node=COLLECTOR_AGENT_NAME, body="", thread="") + mock_process_belief = mocker.patch.object(belief_setter, "_process_belief_message") + + # Act + belief_setter._process_message(msg) + + # Assert + mock_process_belief.assert_called_once_with(msg) + + +def test_process_message_from_other_agent(belief_setter, mocker): + """ + Test that messages from other agents are ignored. + """ + # Arrange + msg = create_mock_message(sender_node="other_agent", body="", thread="") + mock_process_belief = mocker.patch.object(belief_setter, "_process_belief_message") + + # Act + belief_setter._process_message(msg) + + # Assert + mock_process_belief.assert_not_called() + + +def test_process_belief_message_valid_json(belief_setter, mocker): + """ + Test processing a valid belief message with correct thread and JSON body. + """ + # Arrange + beliefs_payload = { + "is_hot": [["kitchen"]], + "is_clean": [["kitchen"], ["bathroom"]] + } + msg = create_mock_message( + sender_node=COLLECTOR_AGENT_JID, + body=json.dumps(beliefs_payload), + thread="beliefs" + ) + mock_set_beliefs = mocker.patch.object(belief_setter, "_set_beliefs") + + # Act + belief_setter._process_belief_message(msg) + + # Assert + mock_set_beliefs.assert_called_once_with(beliefs_payload) + + +def test_process_belief_message_invalid_json(belief_setter, mocker, caplog): + """ + Test that a message with invalid JSON is handled gracefully and an error is logged. + """ + # Arrange + msg = create_mock_message( + sender_node=COLLECTOR_AGENT_JID, + body="this is not a json string", + thread="beliefs" + ) + mock_set_beliefs = mocker.patch.object(belief_setter, "_set_beliefs") + + # Act + with caplog.at_level(logging.ERROR): + belief_setter._process_belief_message(msg) + + # Assert + mock_set_beliefs.assert_not_called() + assert "Could not decode beliefs into JSON format" in caplog.text + + +def test_process_belief_message_wrong_thread(belief_setter, mocker): + """ + Test that a message with an incorrect thread is ignored. + """ + # Arrange + msg = create_mock_message( + sender_node=COLLECTOR_AGENT_JID, + body='{"some": "data"}', + thread="not_beliefs" + ) + mock_set_beliefs = mocker.patch.object(belief_setter, "_set_beliefs") + + # Act + belief_setter._process_belief_message(msg) + + # Assert + mock_set_beliefs.assert_not_called() + +def test_process_belief_message_empty_body(belief_setter, mocker): + """ + Test that a message with an empty body is ignored. + """ + # Arrange + msg = create_mock_message( + sender_node=COLLECTOR_AGENT_JID, + body="", + thread="beliefs" + ) + mock_set_beliefs = mocker.patch.object(belief_setter, "_set_beliefs") + + # Act + belief_setter._process_belief_message(msg) + + # Assert + mock_set_beliefs.assert_not_called() + + +def test_set_beliefs_success(belief_setter, mock_agent, caplog): + """ + Test that beliefs are correctly set on the agent's BDI. + """ + # Arrange + beliefs_to_set = { + "is_hot": [["kitchen"], ["living_room"]], + "door_is": [["front_door", "closed"]] + } + + # Act + with caplog.at_level(logging.INFO): + belief_setter._set_beliefs(beliefs_to_set) + + # Assert + expected_calls = [ + call("is_hot", "kitchen"), + call("is_hot", "living_room"), + call("door_is", "front_door", "closed") + ] + mock_agent.bdi.set_belief.assert_has_calls(expected_calls, any_order=True) + assert mock_agent.bdi.set_belief.call_count == 3 + + # Check logs + assert "Set belief is_hot with arguments ['kitchen']" in caplog.text + assert "Set belief is_hot with arguments ['living_room']" in caplog.text + assert "Set belief door_is with arguments ['front_door', 'closed']" in caplog.text + + +def test_set_beliefs_bdi_not_initialized(belief_setter, mock_agent, caplog): + """ + Test that a warning is logged if the agent's BDI is not initialized. + """ + # Arrange + mock_agent.bdi = None # Simulate BDI not being ready + beliefs_to_set = {"is_hot": [["kitchen"]]} + + # Act + with caplog.at_level(logging.WARNING): + belief_setter._set_beliefs(beliefs_to_set) + + # Assert + assert "Cannot set beliefs, since agent's BDI is not yet initialized." in caplog.text diff --git a/test/unit/test_temp.py b/test/unit/test_temp.py deleted file mode 100644 index ac449ca..0000000 --- a/test/unit/test_temp.py +++ /dev/null @@ -1,6 +0,0 @@ -""" -Temporary file to demonstrate unit testing. -""" - -def test_temp(): - assert True diff --git a/uv.lock b/uv.lock index 4b39c29..050aa28 100644 --- a/uv.lock +++ b/uv.lock @@ -6,6 +6,18 @@ resolution-markers = [ "python_full_version < '3.14'", ] +[[package]] +name = "agentspeak" +version = "0.2.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "colorama" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/d0/a3/f8e9292cfd47aa5558f4578c498ca12c068a3a1d60ddfd0af13a87c1e47a/agentspeak-0.2.2.tar.gz", hash = "sha256:7c7fcf689fd54460597be1798ce11535f42a60c3d79af59381af3e13ef7a41bb", size = 59628, upload-time = "2024-03-21T11:55:39.026Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/03/b5/e95cbd9d9e999ac8dc4e0bb7a940112a2751cf98880b4ff0626e53d14249/agentspeak-0.2.2-py3-none-any.whl", hash = "sha256:9b454bc0adf63cb0d73fb4a3a9a489e7d892d5fbf17f750de532670736c0c4dd", size = 61628, upload-time = "2024-03-21T11:55:36.741Z" }, +] + [[package]] name = "aiodns" version = "3.5.0" @@ -1291,6 +1303,7 @@ dependencies = [ { name = "pyzmq" }, { name = "silero-vad" }, { name = "spade" }, + { name = "spade-bdi" }, { name = "torch" }, { name = "uvicorn" }, ] @@ -1314,6 +1327,7 @@ requires-dist = [ { name = "pyzmq", specifier = ">=27.1.0" }, { name = "silero-vad", specifier = ">=6.0.0" }, { name = "spade", specifier = ">=4.1.0" }, + { name = "spade-bdi", specifier = ">=0.3.2" }, { name = "torch", specifier = ">=2.8.0" }, { name = "uvicorn", specifier = ">=0.37.0" }, ] @@ -2090,6 +2104,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e9/06/21d0e937f4daa905a9a007700f59b06de644a44e5f594c3428c3ff93ca39/spade-4.1.0-py2.py3-none-any.whl", hash = "sha256:8b20e7fcb12f836cb0504e9da31f7bd867c7276440e19ebca864aecabc71b114", size = 37033, upload-time = "2025-05-22T17:19:06.524Z" }, ] +[[package]] +name = "spade-bdi" +version = "0.3.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "agentspeak" }, + { name = "loguru" }, + { name = "spade" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/16/b4/d52d9d06ad17d4b3a90ca11b64a14194f3f944f561f4da1395ce3fe3994d/spade_bdi-0.3.2.tar.gz", hash = "sha256:5d03661425f78771e39f3592f8a602ff8240465682b79d333926d3e562657d81", size = 21208, upload-time = "2025-01-03T14:16:43.755Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/90/c2/986de9abaad805d92a33912ab06b08bb81bd404bcef9ad0f2fd7a09f274b/spade_bdi-0.3.2-py2.py3-none-any.whl", hash = "sha256:2039271f586b108660a0a6a951d9ec815197caf14915317c6eec19ff496c2cff", size = 7416, upload-time = "2025-01-03T14:16:42.226Z" }, +] + [[package]] name = "sqlalchemy" version = "2.0.40"