diff --git a/src/control_backend/agents/bdi/behaviours/text_belief_extractor.py b/src/control_backend/agents/bdi/behaviours/text_belief_extractor.py new file mode 100644 index 0000000..ea1b04f --- /dev/null +++ b/src/control_backend/agents/bdi/behaviours/text_belief_extractor.py @@ -0,0 +1,96 @@ +import asyncio +import json +import logging + +from spade.behaviour import CyclicBehaviour +from spade.message import Message + +from control_backend.core.config import settings + + +class BeliefFromText(CyclicBehaviour): + logger = logging.getLogger("Belief From Text") + + # TODO: LLM prompt nog hardcoded + llm_instruction_prompt = """ + You are an information extraction assistent for a BDI agent. Your task is to extract values from a user's text to bind a list of ungrounded beliefs. Rules: + You will receive a JSON object with "beliefs" (a list of ungrounded AgentSpeak beliefs) and "text" (user's transcript). + Analyze the text to find values that sematically match the variables (X,Y,Z) in the beliefs. + A single piece of text might contain multiple instances that match a belief. + Respond ONLY with a single JSON object. + The JSON object's keys should be the belief functors (e.g., "weather"). + The value for each key must be a list of lists. + Each inner list must contain the extracted arguments (as strings) for one instance of that belief. + CRITICAL: If no information in the text matches a belief, DO NOT include that key in your response. + """ + + # on_start agent receives message containing the beliefs to look out for and sets up the LLM with instruction prompt + #async def on_start(self): + # msg = await self.receive(timeout=0.1) + # self.beliefs = dict uit message + # send instruction prompt to LLM + + beliefs: dict[str, list[str]] + beliefs = { + "mood": ["X"], + "car": ["Y"] + } + + async def run(self): + msg = await self.receive(timeout=0.1) + if msg: + sender = msg.sender.node + match sender: + # TODO: Change to Transcriber agent name once implemented + case settings.agent_settings.test_agent_name: + self.logger.info("Received text from transcriber.") + await self._process_transcription_demo(msg.body) + case _: + self.logger.info("Received message from other agent.") + pass + await asyncio.sleep(1) + + async def _process_transcription(self, text: str): + text_prompt = f"Text: {text}" + + beliefs_prompt = "These are the beliefs to be bound:\n" + for belief, values in self.beliefs.items(): + beliefs_prompt += f"{belief}({', '.join(values)})\n" + + prompt = text_prompt + beliefs_prompt + self.logger.info(prompt) + #prompt_msg = Message(to="LLMAgent@whatever") + #response = self.send(prompt_msg) + + # Mock response; response is beliefs in JSON format, it parses do dict[str,list[list[str]]] + response = '{"mood": [["happy"]]}' + # Verify by trying to parse + try: + json.loads(response) + belief_message = Message( + to=settings.agent_settings.bdi_core_agent_name + '@' + settings.agent_settings.host, + body=response) + belief_message.thread = "beliefs" + + await self.send(belief_message) + self.logger.info("Sent beliefs to BDI.") + except json.JSONDecodeError: + # Parsing failed, so the response is in the wrong format, log warning + self.logger.warning("Received LLM response in incorrect format.") + + async def _process_transcription_demo(self, txt: str): + """ + Demo version to process the transcription input to beliefs. For the demo only the belief + 'user_said' is relevant, so this function simply makes a dict with key: "user_said", + value: txt and passes this to the Belief Collector agent. + """ + belief = {"user_said": [txt]} + payload = json.dumps(belief) + # TODO: Change to belief collector + belief_msg = Message(to=settings.agent_settings.bdi_core_agent_name + + '@' + settings.agent_settings.host, + body=payload) + belief_msg.thread = "beliefs" + + await self.send(belief_msg) + self.logger.info("Sent beliefs to Belief Collector.") diff --git a/src/control_backend/agents/bdi/text_extractor.py b/src/control_backend/agents/bdi/text_extractor.py new file mode 100644 index 0000000..596a3fe --- /dev/null +++ b/src/control_backend/agents/bdi/text_extractor.py @@ -0,0 +1,9 @@ +from spade.agent import Agent + +from control_backend.agents.bdi.behaviours.text_belief_extractor import BeliefFromText + + +class TBeliefExtractor(Agent): + async def setup(self): + self.b = BeliefFromText() + self.add_behaviour(self.b) \ No newline at end of file diff --git a/src/control_backend/agents/belief_collector/__init__.py b/src/control_backend/agents/belief_collector/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/control_backend/agents/belief_collector/behaviours/continuous_collect.py b/src/control_backend/agents/belief_collector/behaviours/continuous_collect.py new file mode 100644 index 0000000..50986cd --- /dev/null +++ b/src/control_backend/agents/belief_collector/behaviours/continuous_collect.py @@ -0,0 +1,118 @@ +import json +import logging +from spade.behaviour import CyclicBehaviour +from spade.agent import Message +from control_backend.core.config import settings + +logger = logging.getLogger(__name__) + +class ContinuousBeliefCollector(CyclicBehaviour): + """ + Continuously collects beliefs/emotions from extractor agents: + Then we send a unified belief packet to the BDI agent. + """ + + async def run(self): + msg = await self.receive(timeout=0.1) # Wait for 0.1s + if msg: + await self._process_message(msg) + + + async def _process_message(self, msg: Message): + sender_node = self._sender_node(msg) + + # Parse JSON payload + try: + payload = json.loads(msg.body) + except Exception as e: + logger.warning( + "BeliefCollector: failed to parse JSON from %s. Body=%r Error=%s", + sender_node, msg.body, e + ) + return + + msg_type = payload.get("type") + + # Prefer explicit 'type' field + if msg_type == "belief_extraction_text" or sender_node == "belief_text_agent_mock": + logger.info("BeliefCollector: message routed to _handle_belief_text (sender=%s)", sender_node) + await self._handle_belief_text(payload, sender_node) + #This is not implemented yet, but we keep the structure for future use + elif msg_type == "emotion_extraction_text" or sender_node == "emo_text_agent_mock": + logger.info("BeliefCollector: message routed to _handle_emo_text (sender=%s)", sender_node) + await self._handle_emo_text(payload, sender_node) + else: + logger.info( + "BeliefCollector: unrecognized message (sender=%s, type=%r). Ignoring.", + sender_node, msg_type + ) + + @staticmethod + def _sender_node(msg: Message) -> str: + """ + Extracts the 'node' (localpart) of the sender JID. + E.g., 'agent@host/resource' -> 'agent' + """ + s = str(msg.sender) if msg.sender is not None else "no_sender" + return s.split("@", 1)[0] if "@" in s else s + + + async def _handle_belief_text(self, payload: dict, origin: str): + """ + Expected payload: + { + "type": "belief_extraction_text", + "beliefs": {"user_said": ["hello"","Can you help me?","stop talking to me","No","Pepper do a dance"]} + + } + + """ + beliefs = payload.get("beliefs", {}) + + if not beliefs: + logger.info("BeliefCollector: no beliefs to process.") + return + + if not isinstance(beliefs, dict): + logger.warning("BeliefCollector: 'beliefs' is not a dict: %r", beliefs) + return + + if not all(isinstance(v, list) for v in beliefs.values()): + logger.warning("BeliefCollector: 'beliefs' values are not all lists: %r", beliefs) + return + + logger.info("BeliefCollector: forwarding %d beliefs.", len(beliefs)) + for belief_name, belief_list in beliefs.items(): + for belief in belief_list: + logger.info(" - %s %s", belief_name,str(belief)) + + await self._send_beliefs_to_bdi(beliefs, origin=origin) + + + + async def _handle_emo_text(self, payload: dict, origin: str): + """TODO: implement (after we have emotional recogntion)""" + pass + + + async def _send_beliefs_to_bdi(self, beliefs: list[str], origin: str | None = None): + """ + Sends a unified belief packet to the BDI agent. + """ + if not beliefs: + return + + to_jid = f"{settings.agent_settings.bdi_core_agent_name}@{settings.agent_settings.host}" + + packet = { + "type": "belief_packet", + "origin": origin, + "beliefs": beliefs, + } + + msg = Message(to=to_jid) + msg.body = json.dumps(packet) + + + await self.send(msg) + logger.info("BeliefCollector: sent %d belief(s) to BDI at %s", len(beliefs), to_jid) diff --git a/src/control_backend/agents/belief_collector/belief_collector.py b/src/control_backend/agents/belief_collector/belief_collector.py new file mode 100644 index 0000000..dbb6095 --- /dev/null +++ b/src/control_backend/agents/belief_collector/belief_collector.py @@ -0,0 +1,13 @@ +import logging +from spade.agent import Agent + +from .behaviours.continuous_collect import ContinuousBeliefCollector + +logger = logging.getLogger(__name__) + +class BeliefCollectorAgent(Agent): + async def setup(self): + logger.info("BeliefCollectorAgent starting (%s)", self.jid) + # Attach the continuous collector behaviour (listens and forwards to BDI) + self.add_behaviour(ContinuousBeliefCollector()) + logger.info("BeliefCollectorAgent ready.") \ No newline at end of file diff --git a/src/control_backend/agents/mock_agents/__init__.py b/src/control_backend/agents/mock_agents/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/control_backend/agents/mock_agents/belief_text_mock.py b/src/control_backend/agents/mock_agents/belief_text_mock.py new file mode 100644 index 0000000..607c2f5 --- /dev/null +++ b/src/control_backend/agents/mock_agents/belief_text_mock.py @@ -0,0 +1,29 @@ +import json +from spade.agent import Agent +from spade.behaviour import OneShotBehaviour +from spade.message import Message +from control_backend.core.config import settings + +class BeliefTextAgent(Agent): + class SendOnceBehaviourBlfText(OneShotBehaviour): + async def run(self): + to_jid = f"{settings.agent_settings.belief_collector_agent_name}@{settings.agent_settings.host}" + + # Send multiple beliefs in one JSON payload + payload = { + "type": "belief_extraction_text", + "beliefs": {"user_said": ["hello test","Can you help me?","stop talking to me","No","Pepper do a dance"]} + } + + msg = Message(to=to_jid) + msg.body = json.dumps(payload) + await self.send(msg) + print(f"Beliefs sent to {to_jid}!") + + self.exit_code = "Job Finished!" + await self.agent.stop() + + async def setup(self): + print("BeliefTextAgent started") + self.b = self.SendOnceBehaviourBlfText() + self.add_behaviour(self.b) diff --git a/src/control_backend/core/config.py b/src/control_backend/core/config.py index ea45b8d..5e4b764 100644 --- a/src/control_backend/core/config.py +++ b/src/control_backend/core/config.py @@ -10,6 +10,7 @@ class AgentSettings(BaseModel): host: str = "localhost" bdi_core_agent_name: str = "bdi_core" belief_collector_agent_name: str = "belief_collector" + text_belief_extractor_agent_name: str = "text_belief_extractor" vad_agent_name: str = "vad_agent" llm_agent_name: str = "llm_agent" test_agent_name: str = "test_agent" @@ -20,8 +21,8 @@ class AgentSettings(BaseModel): class LLMSettings(BaseModel): - local_llm_url: str = "http://145.107.82.68:1234/v1/chat/completions" - local_llm_model: str = "openai/gpt-oss-120b" + local_llm_url: str = "http://localhost:1234/v1/chat/completions" + local_llm_model: str = "openai/gpt-oss-20b" class Settings(BaseSettings): app_title: str = "PepperPlus" diff --git a/src/control_backend/main.py b/src/control_backend/main.py index 0050efa..d3588ea 100644 --- a/src/control_backend/main.py +++ b/src/control_backend/main.py @@ -13,6 +13,8 @@ from control_backend.agents.ri_communication_agent import RICommunicationAgent from control_backend.agents.bdi.bdi_core import BDICoreAgent from control_backend.agents.vad_agent import VADAgent from control_backend.agents.llm.llm import LLMAgent +from control_backend.agents.bdi.text_extractor import TBeliefExtractor +from control_backend.agents.belief_collector.belief_collector import BeliefCollectorAgent from control_backend.api.v1.router import api_router from control_backend.core.config import settings from control_backend.core.zmq_context import context @@ -41,15 +43,32 @@ async def lifespan(app: FastAPI): bind=True, ) await ri_communication_agent.start() - - - llm_agent = LLMAgent(settings.agent_settings.llm_agent_name + '@' + settings.agent_settings.host, - settings.agent_settings.llm_agent_name) + + llm_agent = LLMAgent( + settings.agent_settings.llm_agent_name + '@' + settings.agent_settings.host, + settings.agent_settings.llm_agent_name, + ) await llm_agent.start() - bdi_core = BDICoreAgent(settings.agent_settings.bdi_core_agent_name + '@' + settings.agent_settings.host, - settings.agent_settings.bdi_core_agent_name, "src/control_backend/agents/bdi/rules.asl") + + bdi_core = BDICoreAgent( + settings.agent_settings.bdi_core_agent_name + '@' + settings.agent_settings.host, + settings.agent_settings.bdi_core_agent_name, + "src/control_backend/agents/bdi/rules.asl", + ) await bdi_core.start() + belief_collector = BeliefCollectorAgent( + settings.agent_settings.belief_collector_agent_name + '@' + settings.agent_settings.host, + settings.agent_settings.belief_collector_agent_name, + ) + await belief_collector.start() + + text_belief_extractor = TBeliefExtractor( + settings.agent_settings.text_belief_extractor_agent_name + '@' + settings.agent_settings.host, + settings.agent_settings.text_belief_extractor_agent_name, + ) + await text_belief_extractor.start() + _temp_vad_agent = VADAgent("tcp://localhost:5558", False) await _temp_vad_agent.start() diff --git a/test/unit/agents/belief_collector/behaviours/test_continuous_collect.py b/test/unit/agents/belief_collector/behaviours/test_continuous_collect.py new file mode 100644 index 0000000..7629fe5 --- /dev/null +++ b/test/unit/agents/belief_collector/behaviours/test_continuous_collect.py @@ -0,0 +1,209 @@ +import json +import logging +from unittest.mock import MagicMock, AsyncMock, call + +import pytest + +from control_backend.agents.belief_collector.behaviours.continuous_collect import ContinuousBeliefCollector + +@pytest.fixture +def mock_agent(mocker): + """Fixture to create a mock Agent.""" + agent = MagicMock() + agent.jid = "belief_collector_agent@test" + return agent + +@pytest.fixture +def continuous_collector(mock_agent, mocker): + """Fixture to create an instance of ContinuousBeliefCollector with a mocked agent.""" + # Patch asyncio.sleep to prevent tests from actually waiting + mocker.patch("asyncio.sleep", return_value=None) + + collector = ContinuousBeliefCollector() + collector.agent = mock_agent + # Mock the receive method, we will control its return value in each test + collector.receive = AsyncMock() + return collector + +@pytest.mark.asyncio +async def test_run_no_message_received(continuous_collector, mocker): + """ + Test that when no message is received, _process_message is not called. + """ + # Arrange + continuous_collector.receive.return_value = None + mocker.patch.object(continuous_collector, "_process_message") + + # Act + await continuous_collector.run() + + # Assert + continuous_collector._process_message.assert_not_called() + +@pytest.mark.asyncio +async def test_run_message_received(continuous_collector, mocker): + """ + Test that when a message is received, _process_message is called with that message. + """ + # Arrange + mock_msg = MagicMock() + continuous_collector.receive.return_value = mock_msg + mocker.patch.object(continuous_collector, "_process_message") + + # Act + await continuous_collector.run() + + # Assert + continuous_collector._process_message.assert_awaited_once_with(mock_msg) + +@pytest.mark.asyncio +async def test_process_message_invalid(continuous_collector, mocker): + """ + Test that when an invalid JSON message is received, a warning is logged and processing stops. + """ + # Arrange + invalid_json = "this is not json" + msg = MagicMock() + msg.body = invalid_json + msg.sender = "belief_text_agent_mock@test" + + logger_mock = mocker.patch("control_backend.agents.belief_collector.behaviours.continuous_collect.logger") + + # Act + await continuous_collector._process_message(msg) + + # Assert + logger_mock.warning.assert_called_once() + +def test_get_sender_from_message(continuous_collector): + """ + Test that _sender_node correctly extracts the sender node from the message JID. + """ + # Arrange + msg = MagicMock() + msg.sender = "agent_node@host/resource" + + # Act + sender_node = continuous_collector._sender_node(msg) + + # Assert + assert sender_node == "agent_node" + +@pytest.mark.asyncio +async def test_routes_to_handle_belief_text_by_type(continuous_collector, mocker): + msg = MagicMock() + msg.body = json.dumps({"type": "belief_extraction_text", "beliefs": {"user_said": [["hi"]]}}) + msg.sender = "anyone@test" + spy = mocker.patch.object(continuous_collector, "_handle_belief_text", new=AsyncMock()) + await continuous_collector._process_message(msg) + spy.assert_awaited_once() + +@pytest.mark.asyncio +async def test_routes_to_handle_belief_text_by_sender(continuous_collector, mocker): + msg = MagicMock() + msg.body = json.dumps({"beliefs": {"user_said": [["hi"]]}}) # no type + msg.sender = "belief_text_agent_mock@test" + spy = mocker.patch.object(continuous_collector, "_handle_belief_text", new=AsyncMock()) + await continuous_collector._process_message(msg) + spy.assert_awaited_once() + +@pytest.mark.asyncio +async def test_routes_to_handle_emo_text(continuous_collector, mocker): + msg = MagicMock() + msg.body = json.dumps({"type": "emotion_extraction_text"}) + msg.sender = "anyone@test" + spy = mocker.patch.object(continuous_collector, "_handle_emo_text", new=AsyncMock()) + await continuous_collector._process_message(msg) + spy.assert_awaited_once() + +@pytest.mark.asyncio +async def test_unrecognized_message_logs_info(continuous_collector, mocker): + msg = MagicMock() + msg.body = json.dumps({"type": "something_else"}) + msg.sender = "x@test" + logger_mock = mocker.patch("control_backend.agents.belief_collector.behaviours.continuous_collect.logger") + await continuous_collector._process_message(msg) + logger_mock.info.assert_any_call( + "BeliefCollector: unrecognized message (sender=%s, type=%r). Ignoring.", "x", "something_else" + ) + + +@pytest.mark.asyncio +async def test_belief_text_no_beliefs(continuous_collector, mocker): + msg_payload = {"type": "belief_extraction_text"} # no 'beliefs' + logger_mock = mocker.patch("control_backend.agents.belief_collector.behaviours.continuous_collect.logger") + await continuous_collector._handle_belief_text(msg_payload, "origin_node") + logger_mock.info.assert_any_call("BeliefCollector: no beliefs to process.") + +@pytest.mark.asyncio +async def test_belief_text_beliefs_not_dict(continuous_collector, mocker): + payload = {"type": "belief_extraction_text", "beliefs": ["not", "a", "dict"]} + logger_mock = mocker.patch("control_backend.agents.belief_collector.behaviours.continuous_collect.logger") + await continuous_collector._handle_belief_text(payload, "origin") + logger_mock.warning.assert_any_call("BeliefCollector: 'beliefs' is not a dict: %r", ["not", "a", "dict"]) + +@pytest.mark.asyncio +async def test_belief_text_values_not_lists(continuous_collector, mocker): + payload = {"type": "belief_extraction_text", "beliefs": {"user_said": "not-a-list"}} + logger_mock = mocker.patch("control_backend.agents.belief_collector.behaviours.continuous_collect.logger") + await continuous_collector._handle_belief_text(payload, "origin") + logger_mock.warning.assert_any_call( + "BeliefCollector: 'beliefs' values are not all lists: %r", {"user_said": "not-a-list"} + ) + +@pytest.mark.asyncio +async def test_belief_text_happy_path_logs_items_and_sends(continuous_collector, mocker): + payload = { + "type": "belief_extraction_text", + "beliefs": {"user_said": ["hello test", "No"]} + } + # Your code calls self.send(..); patch it (or switch implementation to self.agent.send and patch that) + continuous_collector.send = AsyncMock() + logger_mock = mocker.patch("control_backend.agents.belief_collector.behaviours.continuous_collect.logger") + await continuous_collector._handle_belief_text(payload, "belief_text_agent_mock") + + logger_mock.info.assert_any_call("BeliefCollector: forwarding %d beliefs.", 1) + # and the item logs: + logger_mock.info.assert_any_call(" - %s %s", "user_said", "hello test") + logger_mock.info.assert_any_call(" - %s %s", "user_said", "No") + # make sure we attempted a send + continuous_collector.send.assert_awaited_once() + +@pytest.mark.asyncio +async def test_send_beliefs_noop_on_empty(continuous_collector): + continuous_collector.send = AsyncMock() + await continuous_collector._send_beliefs_to_bdi([], origin="o") + continuous_collector.send.assert_not_awaited() + +@pytest.mark.asyncio +async def test_send_beliefs_sends_json_packet(continuous_collector): + # Patch .send and capture the message body + sent = {} + + async def _fake_send(msg): + sent["body"] = msg.body + sent["to"] = str(msg.to) + + continuous_collector.send = AsyncMock(side_effect=_fake_send) + beliefs = ["user_said hello", "user_said No"] + await continuous_collector._send_beliefs_to_bdi(beliefs, origin="origin_node") + + assert "belief_packet" in json.loads(sent["body"])["type"] + assert json.loads(sent["body"])["beliefs"] == beliefs + +def test_sender_node_no_sender_returns_literal(continuous_collector): + msg = MagicMock() + msg.sender = None + assert continuous_collector._sender_node(msg) == "no_sender" + +def test_sender_node_without_at(continuous_collector): + msg = MagicMock() + msg.sender = "localpartonly" + assert continuous_collector._sender_node(msg) == "localpartonly" + +@pytest.mark.asyncio +async def test_belief_text_coerces_non_strings(continuous_collector, mocker): + payload = {"type": "belief_extraction_text", "beliefs": {"user_said": [["hi", 123]]}} + continuous_collector.send = AsyncMock() + await continuous_collector._handle_belief_text(payload, "origin") + continuous_collector.send.assert_awaited_once()