[
Previously it was a list of a dict of lists of lists of strings;
now it is a dict of lists of lists of strings, as prescribed by the architecture (knowledge base).
* Also added some tests, but more will have to be added.
]
[ref]: N25B-206
90 lines
2.7 KiB
Python
90 lines
2.7 KiB
Python
import json
|
|
import logging
|
|
from unittest.mock import MagicMock, AsyncMock, call
|
|
|
|
import pytest
|
|
|
|
from control_backend.agents.belief_collector.behaviours.continuous_collect import ContinuousBeliefCollector
|
|
|
|
@pytest.fixture
def mock_agent(mocker):
    """Provide a MagicMock standing in for the agent, carrying a fixed test JID."""
    return MagicMock(jid="belief_collector_agent@test")
|
|
|
|
@pytest.fixture
def continuous_collector(mock_agent, mocker):
    """Build a ContinuousBeliefCollector attached to the mocked agent.

    ``asyncio.sleep`` is patched so the tests never actually wait, and
    ``receive`` is swapped for an AsyncMock whose return value is set by
    each individual test.
    """
    # Keep tests fast: nothing should really sleep.
    mocker.patch("asyncio.sleep", return_value=None)

    behaviour = ContinuousBeliefCollector()
    behaviour.agent = mock_agent
    # Each test decides what (if anything) the behaviour "receives".
    behaviour.receive = AsyncMock()
    return behaviour
|
|
|
|
@pytest.mark.asyncio
async def test_run_no_message_received(continuous_collector, mocker):
    """
    Test that when no message is received, _process_message is not called.
    """
    # Arrange
    continuous_collector.receive.return_value = None
    # Patch explicitly with an AsyncMock rather than relying on patch()'s
    # implicit coroutine detection: if run() ever did await the method, the
    # test then fails on the assertion below instead of on a TypeError.
    process_message = mocker.patch.object(
        continuous_collector, "_process_message", new_callable=AsyncMock
    )

    # Act
    await continuous_collector.run()

    # Assert
    process_message.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
async def test_run_message_received(continuous_collector, mocker):
    """
    Test that when a message is received, _process_message is awaited exactly
    once with that message.
    """
    # Arrange
    mock_msg = MagicMock()
    continuous_collector.receive.return_value = mock_msg
    # assert_awaited_once_with only exists on AsyncMock, so request one
    # explicitly instead of depending on patch() auto-detecting that the
    # target is a coroutine function.
    process_message = mocker.patch.object(
        continuous_collector, "_process_message", new_callable=AsyncMock
    )

    # Act
    await continuous_collector.run()

    # Assert
    process_message.assert_awaited_once_with(mock_msg)
|
|
|
|
@pytest.mark.asyncio
async def test_process_message_invalid(continuous_collector, mocker):
    """
    A message whose body is not valid JSON must result in exactly one logged warning.
    """
    # Arrange: a message carrying a body that cannot be parsed as JSON
    bad_message = MagicMock()
    bad_message.sender = "belief_text_agent_mock@test"
    bad_message.body = "this is not json"

    # Replace the module-level logger so the warning call can be observed.
    patched_logger = mocker.patch(
        "control_backend.agents.belief_collector.behaviours.continuous_collect.logger"
    )

    # Act
    await continuous_collector._process_message(bad_message)

    # Assert
    patched_logger.warning.assert_called_once()
|
|
|
|
def test_get_sender_from_message(continuous_collector):
    """
    _sender_node should yield "agent_node" for the sender "agent_node@host/resource".
    """
    # Arrange
    message = MagicMock(sender="agent_node@host/resource")

    # Act
    extracted_node = continuous_collector._sender_node(message)

    # Assert
    assert extracted_node == "agent_node"