import asyncio import pytest from unittest.mock import AsyncMock, MagicMock, patch, ANY from control_backend.agents.ri_communication_agent import RICommunicationAgent def fake_json_correct_negototiate_1(): return AsyncMock( return_value={ "endpoint": "negotiate/ports", "data": [ {"id": "main", "port": 5555, "bind": False}, {"id": "actuation", "port": 5556, "bind": True}, ], } ) def fake_json_correct_negototiate_2(): return AsyncMock( return_value={ "endpoint": "negotiate/ports", "data": [ {"id": "main", "port": 5555, "bind": False}, {"id": "actuation", "port": 5557, "bind": True}, ], } ) def fake_json_correct_negototiate_3(): return AsyncMock( return_value={ "endpoint": "negotiate/ports", "data": [ {"id": "main", "port": 5555, "bind": True}, {"id": "actuation", "port": 5557, "bind": True}, ], } ) def fake_json_correct_negototiate_4(): # Different port, do bind return AsyncMock( return_value={ "endpoint": "negotiate/ports", "data": [ {"id": "main", "port": 4555, "bind": True}, {"id": "actuation", "port": 5557, "bind": True}, ], } ) def fake_json_correct_negototiate_5(): # Different port, dont bind. return AsyncMock( return_value={ "endpoint": "negotiate/ports", "data": [ {"id": "main", "port": 4555, "bind": False}, {"id": "actuation", "port": 5557, "bind": True}, ], } ) def fake_json_wrong_negototiate_1(): return AsyncMock(return_value={"endpoint": "ping", "data": ""}) def fake_json_invalid_id_negototiate(): return AsyncMock( return_value={ "endpoint": "negotiate/ports", "data": [ {"id": "banana", "port": 4555, "bind": False}, {"id": "tomato", "port": 5557, "bind": True}, ], } ) @pytest.mark.asyncio async def test_setup_creates_socket_and_negotiate_1(monkeypatch): """ Test the setup of the communication agent """ # --- Arrange --- fake_socket = MagicMock() fake_socket.send_json = AsyncMock() fake_socket.recv_json = fake_json_correct_negototiate_1() fake_pub_socket = AsyncMock() # Mock context.socket to return our fake socket monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket ) # Mock RICommandAgent agent startup with patch( "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup() # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}}) fake_socket.recv_json.assert_awaited() fake_agent_instance.start.assert_awaited() MockCommandAgent.assert_called_once_with( ANY, # Server Name ANY, # Server Password address="tcp://*:5556", # derived from the 'port' value in negotiation bind=True, ) # Ensure the agent attached a ListenBehaviour assert any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours) @pytest.mark.asyncio async def test_setup_creates_socket_and_negotiate_2(monkeypatch): """ Test the setup of the communication agent """ # --- Arrange --- fake_socket = MagicMock() fake_socket.send_json = AsyncMock() fake_socket.recv_json = fake_json_correct_negototiate_2() # Mock context.socket to return our fake socket monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket ) # Mock RICommandAgent agent startup with patch( "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup() # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}}) fake_socket.recv_json.assert_awaited() fake_agent_instance.start.assert_awaited() MockCommandAgent.assert_called_once_with( ANY, # Server Name ANY, # Server Password address="tcp://*:5557", # derived from the 'port' value in negotiation bind=True, ) # Ensure the agent attached a ListenBehaviour assert any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours) @pytest.mark.asyncio async def test_setup_creates_socket_and_negotiate_3(monkeypatch, caplog): """ Test the functionality of setup with incorrect negotiation message """ # --- Arrange --- fake_socket = MagicMock() fake_socket.send_json = AsyncMock() fake_socket.recv_json = fake_json_wrong_negototiate_1() # Mock context.socket to return our fake socket monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket ) # Mock RICommandAgent agent startup # We are sending wrong negotiation info to the communication agent, so we should retry and expect a # better response, within a limited time. with patch( "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() fake_pub_socket = AsyncMock() # --- Act --- with caplog.at_level("ERROR"): agent = RICommunicationAgent( "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup(max_retries=1) # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") fake_socket.recv_json.assert_awaited() # Since it failed, there should not be any command agent. fake_agent_instance.start.assert_not_awaited() assert "Failed to set up RICommunicationAgent" in caplog.text # Ensure the agent did not attach a ListenBehaviour assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours) @pytest.mark.asyncio async def test_setup_creates_socket_and_negotiate_4(monkeypatch): """ Test the setup of the communication agent with different bind value """ # --- Arrange --- fake_socket = MagicMock() fake_socket.send_json = AsyncMock() fake_socket.recv_json = fake_json_correct_negototiate_3() # Mock context.socket to return our fake socket monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket ) # Mock RICommandAgent agent startup with patch( "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=True ) await agent.setup() # --- Assert --- fake_socket.bind.assert_any_call("tcp://localhost:5555") fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}}) fake_socket.recv_json.assert_awaited() fake_agent_instance.start.assert_awaited() MockCommandAgent.assert_called_once_with( ANY, # Server Name ANY, # Server Password address="tcp://*:5557", # derived from the 'port' value in negotiation bind=True, ) # Ensure the agent attached a ListenBehaviour assert any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours) @pytest.mark.asyncio async def test_setup_creates_socket_and_negotiate_5(monkeypatch): """ Test the setup of the communication agent """ # --- Arrange --- fake_socket = MagicMock() fake_socket.send_json = AsyncMock() fake_socket.recv_json = fake_json_correct_negototiate_4() # Mock context.socket to return our fake socket monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket ) # Mock RICommandAgent agent startup with patch( "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup() # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}}) fake_socket.recv_json.assert_awaited() fake_agent_instance.start.assert_awaited() MockCommandAgent.assert_called_once_with( ANY, # Server Name ANY, # Server Password address="tcp://*:5557", # derived from the 'port' value in negotiation bind=True, ) # Ensure the agent attached a ListenBehaviour assert any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours) @pytest.mark.asyncio async def test_setup_creates_socket_and_negotiate_6(monkeypatch): """ Test the setup of the communication agent """ # --- Arrange --- fake_socket = MagicMock() fake_socket.send_json = AsyncMock() fake_socket.recv_json = fake_json_correct_negototiate_5() # Mock context.socket to return our fake socket monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket ) # Mock RICommandAgent agent startup with patch( "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup() # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}}) fake_socket.recv_json.assert_awaited() fake_agent_instance.start.assert_awaited() MockCommandAgent.assert_called_once_with( ANY, # Server Name ANY, # Server Password address="tcp://*:5557", # derived from the 'port' value in negotiation bind=True, ) # Ensure the agent attached a ListenBehaviour assert any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours) @pytest.mark.asyncio async def test_setup_creates_socket_and_negotiate_7(monkeypatch, caplog): """ Test the functionality of setup with incorrect id """ # --- Arrange --- fake_socket = MagicMock() fake_socket.send_json = AsyncMock() fake_socket.recv_json = fake_json_invalid_id_negototiate() # Mock context.socket to return our fake socket monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket ) # Mock RICommandAgent agent startup # We are sending wrong negotiation info to the communication agent, so we should retry and expect a # better response, within a limited time. with patch( "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() fake_pub_socket = AsyncMock() # --- Act --- with caplog.at_level("WARNING"): agent = RICommunicationAgent( "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup(max_retries=1) # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") fake_socket.recv_json.assert_awaited() # Since it failed, there should not be any command agent. fake_agent_instance.start.assert_not_awaited() assert "Unhandled negotiation id:" in caplog.text @pytest.mark.asyncio async def test_setup_creates_socket_and_negotiate_timeout(monkeypatch, caplog): """ Test the functionality of setup with incorrect negotiation message """ # --- Arrange --- fake_socket = MagicMock() fake_socket.send_json = AsyncMock() fake_socket.recv_json = AsyncMock(side_effect=asyncio.TimeoutError) # Mock context.socket to return our fake socket monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket ) with patch( "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() fake_pub_socket = AsyncMock() # --- Act --- with caplog.at_level("WARNING"): agent = RICommunicationAgent( "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup(max_retries=1) # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") # Since it failed, there should not be any command agent. fake_agent_instance.start.assert_not_awaited() assert "No connection established in 20 seconds" in caplog.text # Ensure the agent did not attach a ListenBehaviour assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours) @pytest.mark.asyncio async def test_listen_behaviour_ping_correct(caplog): fake_socket = AsyncMock() fake_socket.send_json = AsyncMock() fake_socket.recv_json = AsyncMock(return_value={"endpoint": "ping", "data": {}}) fake_pub_socket = AsyncMock() # TODO: Integration test between actual server and password needed for spade agents agent = RICommunicationAgent("test@server", "password", fake_pub_socket) agent.req_socket = fake_socket behaviour = agent.ListenBehaviour() agent.add_behaviour(behaviour) # Run once (CyclicBehaviour normally loops) with caplog.at_level("DEBUG"): await behaviour.run() fake_socket.send_json.assert_awaited() fake_socket.recv_json.assert_awaited() assert "Received message" in caplog.text @pytest.mark.asyncio async def test_listen_behaviour_ping_wrong_endpoint(caplog): """ Test if our listen behaviour can work with wrong messages (wrong endpoint) """ fake_socket = AsyncMock() fake_socket.send_json = AsyncMock() # This is a message for the wrong endpoint >:( fake_socket.recv_json = AsyncMock( return_value={ "endpoint": "negotiate/ports", "data": [ {"id": "main", "port": 5555, "bind": False}, {"id": "actuation", "port": 5556, "bind": True}, ], } ) fake_pub_socket = AsyncMock() agent = RICommunicationAgent("test@server", "password", fake_pub_socket) agent.req_socket = fake_socket behaviour = agent.ListenBehaviour() agent.add_behaviour(behaviour) # Run once (CyclicBehaviour normally loops) with caplog.at_level("INFO"): await behaviour.run() assert "Received message with topic different than ping, while ping expected." in caplog.text fake_socket.send_json.assert_awaited() fake_socket.recv_json.assert_awaited() @pytest.mark.asyncio async def test_listen_behaviour_timeout(caplog): fake_socket = AsyncMock() fake_socket.send_json = AsyncMock() # recv_json will never resolve, simulate timeout fake_socket.recv_json = AsyncMock(side_effect=asyncio.TimeoutError) fake_pub_socket = AsyncMock() agent = RICommunicationAgent("test@server", "password", fake_pub_socket) agent.req_socket = fake_socket behaviour = agent.ListenBehaviour() agent.add_behaviour(behaviour) with caplog.at_level("INFO"): await behaviour.run() assert "No ping" in caplog.text @pytest.mark.asyncio async def test_listen_behaviour_ping_no_endpoint(caplog): """ Test if our listen behaviour can work with wrong messages (wrong endpoint) """ fake_socket = AsyncMock() fake_socket.send_json = AsyncMock() # This is a message without endpoint >:( fake_socket.recv_json = AsyncMock( return_value={ "data": "I dont have an endpoint >:)", } ) fake_pub_socket = AsyncMock() agent = RICommunicationAgent("test@server", "password", fake_pub_socket) agent.req_socket = fake_socket behaviour = agent.ListenBehaviour() agent.add_behaviour(behaviour) # Run once (CyclicBehaviour normally loops) with caplog.at_level("ERROR"): await behaviour.run() assert "No received endpoint in message, excepted ping endpoint." in caplog.text fake_socket.send_json.assert_awaited() fake_socket.recv_json.assert_awaited() @pytest.mark.asyncio async def test_setup_unexpected_exception(monkeypatch, caplog): fake_socket = MagicMock() fake_socket.send_json = AsyncMock() fake_pub_socket = AsyncMock() # Simulate unexpected exception during recv_json() fake_socket.recv_json = AsyncMock(side_effect=Exception("boom!")) monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket ) agent = RICommunicationAgent( "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) with caplog.at_level("ERROR"): await agent.setup(max_retries=1) # Ensure that the error was logged assert "Unexpected error during negotiation: boom!" in caplog.text @pytest.mark.asyncio async def test_setup_unpacking_exception(monkeypatch, caplog): # --- Arrange --- fake_socket = MagicMock() fake_socket.send_json = AsyncMock() # Make recv_json return malformed negotiation data to trigger unpacking exception malformed_data = { "endpoint": "negotiate/ports", "data": [{"id": "main"}], } # missing 'port' and 'bind' fake_socket.recv_json = AsyncMock(return_value=malformed_data) # Patch context.socket monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket ) # Patch RICommandAgent so it won't actually start with patch( "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() fake_pub_socket = AsyncMock() agent = RICommunicationAgent( "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) # --- Act & Assert --- with caplog.at_level("ERROR"): await agent.setup(max_retries=1) # Ensure the unpacking exception was logged assert "Error unpacking negotiation data" in caplog.text # Ensure no command agent was started fake_agent_instance.start.assert_not_awaited() # Ensure no behaviour was attached assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours)