"""Tests for ``RICommunicationAgent``: port negotiation in ``setup`` and ``ListenBehaviour``."""

import asyncio
from unittest.mock import ANY, AsyncMock, MagicMock, patch

import pytest

from control_backend.agents.ri_communication_agent import RICommunicationAgent

# NOTE: the "negototiate" spelling in the helper names below is kept on purpose,
# because the tests in this module reference the helpers by these exact names.


def fake_json_correct_negototiate_1():
    """Return a ``recv_json`` mock: main on 5555 (no bind), actuation on 5556 (bind)."""
    return AsyncMock(
        return_value={
            "endpoint": "negotiate/ports",
            "data": [
                {"id": "main", "port": 5555, "bind": False},
                {"id": "actuation", "port": 5556, "bind": True},
            ],
        }
    )


def fake_json_correct_negototiate_2():
    """Return a ``recv_json`` mock: main on 5555 (no bind), actuation on 5557 (bind)."""
    return AsyncMock(
        return_value={
            "endpoint": "negotiate/ports",
            "data": [
                {"id": "main", "port": 5555, "bind": False},
                {"id": "actuation", "port": 5557, "bind": True},
            ],
        }
    )


def fake_json_correct_negototiate_3():
    """Return a ``recv_json`` mock: main on 5555 (bind), actuation on 5557 (bind)."""
    return AsyncMock(
        return_value={
            "endpoint": "negotiate/ports",
            "data": [
                {"id": "main", "port": 5555, "bind": True},
                {"id": "actuation", "port": 5557, "bind": True},
            ],
        }
    )


def fake_json_correct_negototiate_4():
    """Return a ``recv_json`` mock: main on a different port 4555 (bind), actuation on 5557."""
    # Different port, do bind
    return AsyncMock(
        return_value={
            "endpoint": "negotiate/ports",
            "data": [
                {"id": "main", "port": 4555, "bind": True},
                {"id": "actuation", "port": 5557, "bind": True},
            ],
        }
    )


def fake_json_correct_negototiate_5():
    """Return a ``recv_json`` mock: main on a different port 4555 (no bind), actuation on 5557."""
    # Different port, don't bind.
    return AsyncMock(
        return_value={
            "endpoint": "negotiate/ports",
            "data": [
                {"id": "main", "port": 4555, "bind": False},
                {"id": "actuation", "port": 5557, "bind": True},
            ],
        }
    )


def fake_json_wrong_negototiate_1():
    """Return a ``recv_json`` mock that answers with a ``ping`` reply instead of a negotiation."""
    return AsyncMock(return_value={"endpoint": "ping", "data": ""})


def fake_json_invalid_id_negototiate():
    """Return a ``recv_json`` mock whose negotiation entries use the ids "banana" and "tomato"."""
    return AsyncMock(
        return_value={
            "endpoint": "negotiate/ports",
            "data": [
                {"id": "banana", "port": 4555, "bind": False},
                {"id": "tomato", "port": 5557, "bind": True},
            ],
        }
    )


@pytest.fixture
def zmq_context(mocker):
    """Patch ``azmq.Context.instance`` and return the patch mock.

    Tests obtain the fake socket via ``zmq_context.return_value.socket.return_value``.
    """
    # NOTE(review): the patch target is reached through the ``vad_agent`` module although
    # the agent under test lives in ``ri_communication_agent``. Presumably both modules
    # share the same ``azmq`` module object, so the patch applies to both -- confirm.
    mock_context = mocker.patch("control_backend.agents.vad_agent.azmq.Context.instance")
    mock_context.return_value = MagicMock()
    return mock_context


@pytest.mark.asyncio
async def test_setup_creates_socket_and_negotiate_1(zmq_context):
    """
    Setup with a valid negotiation reply (actuation on port 5556) connects the socket,
    starts a command agent on ``tcp://*:5556`` and attaches a ListenBehaviour.
    """
    # --- Arrange ---
    fake_socket = zmq_context.return_value.socket.return_value
    fake_socket.send_json = AsyncMock()
    fake_socket.recv_json = fake_json_correct_negototiate_1()
    fake_socket.send_multipart = AsyncMock()

    # Mock RICommandAgent agent startup
    with patch(
        "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True
    ) as MockCommandAgent:
        fake_agent_instance = MockCommandAgent.return_value
        fake_agent_instance.start = AsyncMock()

        # --- Act ---
        agent = RICommunicationAgent(
            "test@server",
            "password",
            address="tcp://localhost:5555",
            bind=False,
        )
        await agent.setup()

        # --- Assert ---
        fake_socket.connect.assert_any_call("tcp://localhost:5555")
        fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}})
        fake_socket.recv_json.assert_awaited()
        fake_agent_instance.start.assert_awaited()
        MockCommandAgent.assert_called_once_with(
            ANY,  # Server Name
            ANY,  # Server Password
            address="tcp://*:5556",  # derived from the 'port' value in negotiation
            bind=True,
        )
        # Ensure the agent attached a ListenBehaviour
        assert any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours)


@pytest.mark.asyncio
async def test_setup_creates_socket_and_negotiate_2(zmq_context):
    """
    Setup with a valid negotiation reply (actuation on port 5557) connects the socket,
    starts a command agent on ``tcp://*:5557`` and attaches a ListenBehaviour.
    """
    # --- Arrange ---
    fake_socket = zmq_context.return_value.socket.return_value
    fake_socket.send_json = AsyncMock()
    fake_socket.recv_json = fake_json_correct_negototiate_2()
    fake_socket.send_multipart = AsyncMock()

    # Mock RICommandAgent agent startup
    with patch(
        "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True
    ) as MockCommandAgent:
        fake_agent_instance = MockCommandAgent.return_value
        fake_agent_instance.start = AsyncMock()

        # --- Act ---
        agent = RICommunicationAgent(
            "test@server",
            "password",
            address="tcp://localhost:5555",
            bind=False,
        )
        await agent.setup()

        # --- Assert ---
        fake_socket.connect.assert_any_call("tcp://localhost:5555")
        fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}})
        fake_socket.recv_json.assert_awaited()
        fake_agent_instance.start.assert_awaited()
        MockCommandAgent.assert_called_once_with(
            ANY,  # Server Name
            ANY,  # Server Password
            address="tcp://*:5557",  # derived from the 'port' value in negotiation
            bind=True,
        )
        # Ensure the agent attached a ListenBehaviour
        assert any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours)


@pytest.mark.asyncio
async def test_setup_creates_socket_and_negotiate_3(zmq_context):
    """
    Setup that receives a reply for the wrong endpoint ("ping") must not start a
    command agent and must not attach a ListenBehaviour.
    """
    # --- Arrange ---
    fake_socket = zmq_context.return_value.socket.return_value
    fake_socket.send_json = AsyncMock()
    fake_socket.recv_json = fake_json_wrong_negototiate_1()
    fake_socket.send_multipart = AsyncMock()

    # Mock RICommandAgent agent startup
    # The socket keeps answering with the wrong negotiation info, so setup is limited
    # to a single retry (max_retries=1) to keep the test bounded.
    with patch(
        "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True
    ) as MockCommandAgent:
        fake_agent_instance = MockCommandAgent.return_value
        fake_agent_instance.start = AsyncMock()

        # --- Act ---
        agent = RICommunicationAgent(
            "test@server",
            "password",
            address="tcp://localhost:5555",
            bind=False,
        )
        await agent.setup(max_retries=1)

        # --- Assert ---
        fake_socket.connect.assert_any_call("tcp://localhost:5555")
        fake_socket.recv_json.assert_awaited()

        # Since it failed, there should not be any command agent.
        fake_agent_instance.start.assert_not_awaited()

        # Ensure the agent did not attach a ListenBehaviour
        assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours)


@pytest.mark.asyncio
async def test_setup_creates_socket_and_negotiate_4(zmq_context):
    """
    Setup with ``bind=True`` binds (instead of connects) the socket to the given
    address, then negotiates and starts the command agent as usual.
    """
    # --- Arrange ---
    fake_socket = zmq_context.return_value.socket.return_value
    fake_socket.send_json = AsyncMock()
    fake_socket.recv_json = fake_json_correct_negototiate_3()
    fake_socket.send_multipart = AsyncMock()

    # Mock RICommandAgent agent startup
    with patch(
        "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True
    ) as MockCommandAgent:
        fake_agent_instance = MockCommandAgent.return_value
        fake_agent_instance.start = AsyncMock()

        # --- Act ---
        agent = RICommunicationAgent(
            "test@server",
            "password",
            address="tcp://localhost:5555",
            bind=True,
        )
        await agent.setup()

        # --- Assert ---
        fake_socket.bind.assert_any_call("tcp://localhost:5555")
        fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}})
        fake_socket.recv_json.assert_awaited()
        fake_agent_instance.start.assert_awaited()
        MockCommandAgent.assert_called_once_with(
            ANY,  # Server Name
            ANY,  # Server Password
            address="tcp://*:5557",  # derived from the 'port' value in negotiation
            bind=True,
        )
        # Ensure the agent attached a ListenBehaviour
        assert any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours)


@pytest.mark.asyncio
async def test_setup_creates_socket_and_negotiate_5(zmq_context):
    """
    Setup where the negotiation reply moves "main" to another port (4555, bind=True):
    the initial connect still happens and the command agent is started on ``tcp://*:5557``.
    """
    # --- Arrange ---
    fake_socket = zmq_context.return_value.socket.return_value
    fake_socket.send_json = AsyncMock()
    fake_socket.recv_json = fake_json_correct_negototiate_4()
    fake_socket.send_multipart = AsyncMock()

    # Mock RICommandAgent agent startup
    with patch(
        "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True
    ) as MockCommandAgent:
        fake_agent_instance = MockCommandAgent.return_value
        fake_agent_instance.start = AsyncMock()

        # --- Act ---
        agent = RICommunicationAgent(
            "test@server",
            "password",
            address="tcp://localhost:5555",
            bind=False,
        )
        await agent.setup()

        # --- Assert ---
        fake_socket.connect.assert_any_call("tcp://localhost:5555")
        fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}})
        fake_socket.recv_json.assert_awaited()
        fake_agent_instance.start.assert_awaited()
        MockCommandAgent.assert_called_once_with(
            ANY,  # Server Name
            ANY,  # Server Password
            address="tcp://*:5557",  # derived from the 'port' value in negotiation
            bind=True,
        )
        # Ensure the agent attached a ListenBehaviour
        assert any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours)


@pytest.mark.asyncio
async def test_setup_creates_socket_and_negotiate_6(zmq_context):
    """
    Setup where the negotiation reply moves "main" to another port (4555, bind=False):
    the initial connect still happens and the command agent is started on ``tcp://*:5557``.
    """
    # --- Arrange ---
    fake_socket = zmq_context.return_value.socket.return_value
    fake_socket.send_json = AsyncMock()
    fake_socket.recv_json = fake_json_correct_negototiate_5()
    fake_socket.send_multipart = AsyncMock()

    # Mock RICommandAgent agent startup
    with patch(
        "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True
    ) as MockCommandAgent:
        fake_agent_instance = MockCommandAgent.return_value
        fake_agent_instance.start = AsyncMock()

        # --- Act ---
        agent = RICommunicationAgent(
            "test@server",
            "password",
            address="tcp://localhost:5555",
            bind=False,
        )
        await agent.setup()

        # --- Assert ---
        fake_socket.connect.assert_any_call("tcp://localhost:5555")
        fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}})
        fake_socket.recv_json.assert_awaited()
        fake_agent_instance.start.assert_awaited()
        MockCommandAgent.assert_called_once_with(
            ANY,  # Server Name
            ANY,  # Server Password
            address="tcp://*:5557",  # derived from the 'port' value in negotiation
            bind=True,
        )
        # Ensure the agent attached a ListenBehaviour
        assert any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours)


@pytest.mark.asyncio
async def test_setup_creates_socket_and_negotiate_7(zmq_context):
    """
    Setup that receives negotiation entries with unknown ids ("banana", "tomato")
    must not start a command agent.
    """
    # --- Arrange ---
    fake_socket = zmq_context.return_value.socket.return_value
    fake_socket.send_json = AsyncMock()
    fake_socket.recv_json = fake_json_invalid_id_negototiate()
    fake_socket.send_multipart = AsyncMock()

    # Mock RICommandAgent agent startup
    # The socket keeps answering with unusable negotiation info, so setup is limited
    # to a single retry (max_retries=1) to keep the test bounded.
    with patch(
        "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True
    ) as MockCommandAgent:
        fake_agent_instance = MockCommandAgent.return_value
        fake_agent_instance.start = AsyncMock()

        # --- Act ---
        agent = RICommunicationAgent(
            "test@server",
            "password",
            address="tcp://localhost:5555",
            bind=False,
        )
        await agent.setup(max_retries=1)

        # --- Assert ---
        fake_socket.connect.assert_any_call("tcp://localhost:5555")
        fake_socket.recv_json.assert_awaited()

        # Since it failed, there should not be any command agent.
        fake_agent_instance.start.assert_not_awaited()


@pytest.mark.asyncio
async def test_setup_creates_socket_and_negotiate_timeout(zmq_context):
    """
    Setup whose negotiation request times out (``recv_json`` raises
    ``asyncio.TimeoutError``) must not start a command agent or attach a ListenBehaviour.
    """
    # --- Arrange ---
    fake_socket = zmq_context.return_value.socket.return_value
    fake_socket.send_json = AsyncMock()
    fake_socket.recv_json = AsyncMock(side_effect=asyncio.TimeoutError)
    fake_socket.send_multipart = AsyncMock()

    with patch(
        "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True
    ) as MockCommandAgent:
        fake_agent_instance = MockCommandAgent.return_value
        fake_agent_instance.start = AsyncMock()

        # --- Act ---
        agent = RICommunicationAgent(
            "test@server",
            "password",
            address="tcp://localhost:5555",
            bind=False,
        )
        await agent.setup(max_retries=1)

        # --- Assert ---
        fake_socket.connect.assert_any_call("tcp://localhost:5555")

        # Since it failed, there should not be any command agent.
        fake_agent_instance.start.assert_not_awaited()

        # Ensure the agent did not attach a ListenBehaviour
        assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours)


@pytest.mark.asyncio
async def test_listen_behaviour_ping_correct():
    """
    One run of the listen behaviour with a well-formed "ping" reply sends a request
    and awaits the reply.
    """
    fake_socket = AsyncMock()
    fake_socket.send_json = AsyncMock()
    fake_socket.recv_json = AsyncMock(return_value={"endpoint": "ping", "data": {}})
    fake_socket.send_multipart = AsyncMock()

    agent = RICommunicationAgent("test@server", "password")
    agent._req_socket = fake_socket
    agent.connected = True

    behaviour = agent.ListenBehaviour()
    agent.add_behaviour(behaviour)

    await behaviour.run()

    fake_socket.send_json.assert_awaited()
    fake_socket.recv_json.assert_awaited()


@pytest.mark.asyncio
async def test_listen_behaviour_ping_wrong_endpoint():
    """
    Test if our listen behaviour can work with wrong messages (wrong endpoint)
    """
    fake_socket = AsyncMock()
    fake_socket.send_json = AsyncMock()

    # This is a message for the wrong endpoint >:(
    fake_socket.recv_json = AsyncMock(
        return_value={
            "endpoint": "negotiate/ports",
            "data": [
                {"id": "main", "port": 5555, "bind": False},
                {"id": "actuation", "port": 5556, "bind": True},
            ],
        }
    )

    fake_pub_socket = AsyncMock()

    # NOTE(review): fake_pub_socket is passed as the third positional argument, while the
    # other tests in this module pass ``address=`` by keyword -- confirm which parameter
    # this mock actually binds to.
    agent = RICommunicationAgent("test@server", "password", fake_pub_socket)
    agent._req_socket = fake_socket
    agent.connected = True

    behaviour = agent.ListenBehaviour()
    agent.add_behaviour(behaviour)

    # Run once (CyclicBehaviour normally loops)
    await behaviour.run()

    fake_socket.send_json.assert_awaited()
    fake_socket.recv_json.assert_awaited()


@pytest.mark.asyncio
async def test_listen_behaviour_timeout(zmq_context):
    """
    A timed-out ping removes the ListenBehaviour and marks the agent as disconnected.
    """
    fake_socket = zmq_context.return_value.socket.return_value
    fake_socket.send_json = AsyncMock()
    # recv_json raises TimeoutError right away to simulate a reply that never arrives
    fake_socket.recv_json = AsyncMock(side_effect=asyncio.TimeoutError)
    fake_socket.send_multipart = AsyncMock()

    agent = RICommunicationAgent("test@server", "password")
    agent._req_socket = fake_socket
    agent.connected = True

    behaviour = agent.ListenBehaviour()
    agent.add_behaviour(behaviour)

    await behaviour.run()

    assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours)
    assert not agent.connected


@pytest.mark.asyncio
async def test_listen_behaviour_ping_no_endpoint():
    """
    Test if our listen behaviour can work with wrong messages (missing endpoint key)
    """
    fake_socket = AsyncMock()
    fake_socket.send_json = AsyncMock()
    fake_socket.send_multipart = AsyncMock()

    # This is a message without endpoint >:(
    fake_socket.recv_json = AsyncMock(
        return_value={
            "data": "I dont have an endpoint >:)",
        }
    )

    agent = RICommunicationAgent("test@server", "password")
    agent._req_socket = fake_socket
    agent.connected = True

    behaviour = agent.ListenBehaviour()
    agent.add_behaviour(behaviour)

    await behaviour.run()

    fake_socket.send_json.assert_awaited()
    fake_socket.recv_json.assert_awaited()


@pytest.mark.asyncio
async def test_setup_unexpected_exception(zmq_context):
    """
    An unexpected exception from ``recv_json`` during setup leaves the agent without a
    ListenBehaviour and not connected.
    """
    fake_socket = zmq_context.return_value.socket.return_value
    fake_socket.send_json = AsyncMock()
    # Simulate unexpected exception during recv_json()
    fake_socket.recv_json = AsyncMock(side_effect=Exception("boom!"))
    fake_socket.send_multipart = AsyncMock()

    agent = RICommunicationAgent(
        "test@server",
        "password",
        address="tcp://localhost:5555",
        bind=False,
    )

    await agent.setup(max_retries=1)

    assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours)
    assert not agent.connected


@pytest.mark.asyncio
async def test_setup_unpacking_exception(zmq_context):
    """
    A negotiation entry that lacks 'port' and 'bind' must not start a command agent
    or attach a ListenBehaviour.
    """
    # --- Arrange ---
    fake_socket = zmq_context.return_value.socket.return_value
    fake_socket.send_json = AsyncMock()
    fake_socket.send_multipart = AsyncMock()

    # Make recv_json return malformed negotiation data to trigger unpacking exception
    malformed_data = {
        "endpoint": "negotiate/ports",
        "data": [{"id": "main"}],
    }  # missing 'port' and 'bind'
    fake_socket.recv_json = AsyncMock(return_value=malformed_data)

    # Patch RICommandAgent so it won't actually start
    with patch(
        "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True
    ) as MockCommandAgent:
        fake_agent_instance = MockCommandAgent.return_value
        fake_agent_instance.start = AsyncMock()

        agent = RICommunicationAgent(
            "test@server",
            "password",
            address="tcp://localhost:5555",
            bind=False,
        )

        # --- Act & Assert ---
        await agent.setup(max_retries=1)

        # Ensure no command agent was started
        fake_agent_instance.start.assert_not_awaited()

        # Ensure no behaviour was attached
        assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours)