"""Unit tests for the ContinuousBeliefCollector behaviour.

The collector is exercised in isolation: its agent, its ``receive`` and
``send`` coroutines, and the module-level logger are all replaced by mocks.
"""

import json
import logging
from unittest.mock import AsyncMock, MagicMock, call

import pytest

from control_backend.agents.belief_collector.behaviours.continuous_collect import (
    ContinuousBeliefCollector,
)

# Dotted path of the logger used by the module under test; patched wherever a
# test needs to assert on log output.
LOGGER_TARGET = (
    "control_backend.agents.belief_collector.behaviours.continuous_collect.logger"
)


@pytest.fixture
def mock_agent():
    """Fixture to create a mock Agent."""
    agent = MagicMock()
    agent.jid = "belief_collector_agent@test"
    return agent


@pytest.fixture
def continuous_collector(mock_agent, mocker):
    """Fixture to create an instance of ContinuousBeliefCollector with a mocked agent."""
    # Patch asyncio.sleep to prevent tests from actually waiting
    mocker.patch("asyncio.sleep", return_value=None)
    collector = ContinuousBeliefCollector()
    collector.agent = mock_agent
    # Mock the receive method, we will control its return value in each test
    collector.receive = AsyncMock()
    return collector


@pytest.fixture
def logger_mock(mocker):
    """Fixture that replaces the behaviour module's logger with a mock and returns it."""
    return mocker.patch(LOGGER_TARGET)


@pytest.mark.asyncio
async def test_run_no_message_received(continuous_collector, mocker):
    """
    Test that when no message is received, _process_message is not called.
    """
    # Arrange
    continuous_collector.receive.return_value = None
    mocker.patch.object(continuous_collector, "_process_message")

    # Act
    await continuous_collector.run()

    # Assert
    continuous_collector._process_message.assert_not_called()


@pytest.mark.asyncio
async def test_run_message_received(continuous_collector, mocker):
    """
    Test that when a message is received, _process_message is called with that message.
    """
    # Arrange
    mock_msg = MagicMock()
    continuous_collector.receive.return_value = mock_msg
    mocker.patch.object(continuous_collector, "_process_message")

    # Act
    await continuous_collector.run()

    # Assert
    continuous_collector._process_message.assert_awaited_once_with(mock_msg)


@pytest.mark.asyncio
async def test_process_message_invalid(continuous_collector, logger_mock):
    """
    Test that when an invalid JSON message is received, a warning is logged and processing stops.
    """
    # Arrange
    msg = MagicMock()
    msg.body = "this is not json"
    msg.sender = "belief_text_agent_mock@test"

    # Act
    await continuous_collector._process_message(msg)

    # Assert
    logger_mock.warning.assert_called_once()


def test_get_sender_from_message(continuous_collector):
    """
    Test that _sender_node correctly extracts the sender node from the message JID.
    """
    # Arrange
    msg = MagicMock()
    msg.sender = "agent_node@host/resource"

    # Act
    sender_node = continuous_collector._sender_node(msg)

    # Assert
    assert sender_node == "agent_node"


@pytest.mark.asyncio
async def test_routes_to_handle_belief_text_by_type(continuous_collector, mocker):
    """A payload typed 'belief_extraction_text' is dispatched to _handle_belief_text."""
    msg = MagicMock()
    msg.body = json.dumps(
        {"type": "belief_extraction_text", "beliefs": {"user_said": [["hi"]]}}
    )
    msg.sender = "anyone@test"
    spy = mocker.patch.object(continuous_collector, "_handle_belief_text", new=AsyncMock())

    await continuous_collector._process_message(msg)

    spy.assert_awaited_once()


@pytest.mark.asyncio
async def test_routes_to_handle_belief_text_by_sender(continuous_collector, mocker):
    """A payload without 'type' from belief_text_agent_mock is dispatched to _handle_belief_text."""
    msg = MagicMock()
    msg.body = json.dumps({"beliefs": {"user_said": [["hi"]]}})  # no type
    msg.sender = "belief_text_agent_mock@test"
    spy = mocker.patch.object(continuous_collector, "_handle_belief_text", new=AsyncMock())

    await continuous_collector._process_message(msg)

    spy.assert_awaited_once()


@pytest.mark.asyncio
async def test_routes_to_handle_emo_text(continuous_collector, mocker):
    """A payload typed 'emotion_extraction_text' is dispatched to _handle_emo_text."""
    msg = MagicMock()
    msg.body = json.dumps({"type": "emotion_extraction_text"})
    msg.sender = "anyone@test"
    spy = mocker.patch.object(continuous_collector, "_handle_emo_text", new=AsyncMock())

    await continuous_collector._process_message(msg)

    spy.assert_awaited_once()


@pytest.mark.asyncio
async def test_unrecognized_message_logs_info(continuous_collector, logger_mock):
    """An unknown payload type is reported through logger.info with sender node and type."""
    msg = MagicMock()
    msg.body = json.dumps({"type": "something_else"})
    msg.sender = "x@test"

    await continuous_collector._process_message(msg)

    logger_mock.info.assert_any_call(
        "BeliefCollector: unrecognized message (sender=%s, type=%r). Ignoring.",
        "x",
        "something_else",
    )


@pytest.mark.asyncio
async def test_belief_text_no_beliefs(continuous_collector, logger_mock):
    """A belief-text payload lacking 'beliefs' logs that there is nothing to process."""
    msg_payload = {"type": "belief_extraction_text"}  # no 'beliefs'

    await continuous_collector._handle_belief_text(msg_payload, "origin_node")

    logger_mock.info.assert_any_call("BeliefCollector: no beliefs to process.")


@pytest.mark.asyncio
async def test_belief_text_beliefs_not_dict(continuous_collector, logger_mock):
    """A 'beliefs' value that is not a dict triggers the corresponding warning."""
    payload = {"type": "belief_extraction_text", "beliefs": ["not", "a", "dict"]}

    await continuous_collector._handle_belief_text(payload, "origin")

    logger_mock.warning.assert_any_call(
        "BeliefCollector: 'beliefs' is not a dict: %r", ["not", "a", "dict"]
    )


@pytest.mark.asyncio
async def test_belief_text_values_not_lists(continuous_collector, logger_mock):
    """A 'beliefs' dict whose values are not lists triggers the corresponding warning."""
    payload = {"type": "belief_extraction_text", "beliefs": {"user_said": "not-a-list"}}

    await continuous_collector._handle_belief_text(payload, "origin")

    logger_mock.warning.assert_any_call(
        "BeliefCollector: 'beliefs' values are not all lists: %r",
        {"user_said": "not-a-list"},
    )


@pytest.mark.asyncio
async def test_belief_text_happy_path_logs_items_and_sends(continuous_collector, logger_mock):
    """Well-formed beliefs are logged item by item and a single send is attempted."""
    payload = {
        "type": "belief_extraction_text",
        "beliefs": {"user_said": [["hello", "test"], ["No"]]},
    }
    # The handler is expected to send through self.send, so that coroutine is
    # replaced here (patch self.agent.send instead if the implementation changes).
    continuous_collector.send = AsyncMock()

    await continuous_collector._handle_belief_text(payload, "belief_text_agent_mock")

    logger_mock.info.assert_any_call("BeliefCollector: forwarding %d beliefs.", 1)
    # and the item logs:
    logger_mock.info.assert_any_call("  - %s %s", "user_said", "hello test")
    logger_mock.info.assert_any_call("  - %s %s", "user_said", "No")
    # make sure we attempted a send
    continuous_collector.send.assert_awaited_once()


@pytest.mark.asyncio
async def test_send_beliefs_noop_on_empty(continuous_collector):
    """An empty belief list results in no send at all."""
    continuous_collector.send = AsyncMock()

    await continuous_collector._send_beliefs_to_bdi([], origin="o")

    continuous_collector.send.assert_not_awaited()


@pytest.mark.asyncio
async def test_send_beliefs_sends_json_packet(continuous_collector):
    """The message handed to send carries a JSON body with the packet type and the beliefs."""
    # Patch .send and capture the message body sent
    sent = {}

    async def _fake_send(msg):
        sent["body"] = msg.body
        sent["to"] = str(msg.to)

    continuous_collector.send = AsyncMock(side_effect=_fake_send)
    beliefs = ["user_said hello", "user_said No"]

    await continuous_collector._send_beliefs_to_bdi(beliefs, origin="origin_node")

    # Decode once and check both fields against the same parsed packet.
    packet = json.loads(sent["body"])
    assert "belief_packet" in packet["type"]
    assert packet["beliefs"] == beliefs


def test_sender_node_no_sender_returns_literal(continuous_collector):
    """A message without a sender yields the literal 'no_sender'."""
    msg = MagicMock()
    msg.sender = None

    assert continuous_collector._sender_node(msg) == "no_sender"


def test_sender_node_without_at(continuous_collector):
    """A sender string with no '@' is returned unchanged."""
    msg = MagicMock()
    msg.sender = "localpartonly"

    assert continuous_collector._sender_node(msg) == "localpartonly"


@pytest.mark.asyncio
async def test_belief_text_coerces_non_strings(continuous_collector):
    """Belief arguments that are not strings still lead to exactly one send."""
    payload = {"type": "belief_extraction_text", "beliefs": {"user_said": [["hi", 123]]}}
    continuous_collector.send = AsyncMock()

    await continuous_collector._handle_belief_text(payload, "origin")

    continuous_collector.send.assert_awaited_once()