"""Unit tests for ``RobotGestureAgent``.

Covers socket setup (bind/connect), internal message handling, the ZMQ
command loop, the fetch-gestures request/reply loop, and shutdown.

The repetitive arrange/act steps are factored into private ``_``-prefixed
helpers so each test body shows only the input and the expectation.
"""

import json
from unittest.mock import AsyncMock, MagicMock

import pytest
import zmq

from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.schemas.ri_message import RIEndpoint

# Dotted path of the module under test; used to build patch targets.
_AGENT_MODULE = "control_backend.agents.actuation.robot_gesture_agent"

# Gesture tags most tests configure the agent with. Kept as a tuple so every
# agent receives its own fresh list and tests cannot leak state to each other.
_DEFAULT_GESTURES = ("hello", "yes", "no")


@pytest.fixture
def zmq_context(mocker):
    """Mock the ZMQ context."""
    mock_context = mocker.patch(f"{_AGENT_MODULE}.azmq.Context.instance")
    mock_context.return_value = MagicMock()
    return mock_context


async def _run_setup(zmq_context, mocker, *, bind):
    """Create an agent with the given ``bind`` flag, run ``setup()``, and
    return ``(agent, fake_socket)``.

    ``fake_socket`` is the single mock returned by every ``socket()`` call on
    the patched context, so it records the calls made on all sockets.
    """
    fake_socket = zmq_context.return_value.socket.return_value
    agent = RobotGestureAgent("robot_gesture", address="tcp://localhost:5556", bind=bind)

    settings = mocker.patch(f"{_AGENT_MODULE}.settings")
    settings.zmq_settings.internal_sub_address = "tcp://internal:1234"

    agent.add_behavior = MagicMock()

    await agent.setup()
    return agent, fake_socket


async def _handle_payload(payload):
    """Send ``payload`` to ``handle_message`` as an internal message and
    return the mocked pub socket for assertions."""
    pubsocket = AsyncMock()
    agent = RobotGestureAgent("robot_gesture", gesture_data=list(_DEFAULT_GESTURES), address="")
    agent.pubsocket = pubsocket

    msg = InternalMessage(to="robot", sender="tester", body=json.dumps(payload))
    await agent.handle_message(msg)
    return pubsocket


async def _run_command_loop_once(topic, body):
    """Run ``_zmq_command_loop`` for a single received ``(topic, body)`` frame
    pair and return the socket mock (used as both SUB and PUB socket)."""
    fake_socket = AsyncMock()
    agent = RobotGestureAgent("robot_gesture", gesture_data=list(_DEFAULT_GESTURES), address="")

    async def recv_once():
        # Stop the loop after the first iteration.
        agent._running = False
        return topic, body

    fake_socket.recv_multipart = recv_once
    fake_socket.send_json = AsyncMock()

    agent.subsocket = fake_socket
    agent.pubsocket = fake_socket
    agent._running = True

    await agent._zmq_command_loop()
    return fake_socket


async def _fetch_gestures_once(request, gesture_data):
    """Run ``_fetch_gestures_loop`` for a single raw ``request`` and return the
    decoded JSON reply.

    Also asserts that exactly one reply was sent.
    """
    fake_repsocket = AsyncMock()
    agent = RobotGestureAgent("robot_gesture", gesture_data=gesture_data, address="")

    async def recv_once():
        # Stop the loop after the first iteration.
        agent._running = False
        return request

    fake_repsocket.recv = recv_once
    fake_repsocket.send = AsyncMock()

    agent.repsocket = fake_repsocket
    agent._running = True

    await agent._fetch_gestures_loop()

    fake_repsocket.send.assert_awaited_once()
    return json.loads(fake_repsocket.send.call_args.args[0])


@pytest.mark.asyncio
async def test_setup_bind(zmq_context, mocker):
    """Setup binds and subscribes to internal commands."""
    agent, fake_socket = await _run_setup(zmq_context, mocker, bind=True)

    # Check PUB socket binding
    fake_socket.bind.assert_any_call("tcp://localhost:5556")
    # Check REP socket binding.
    # NOTE: all sockets share one mock here, so this cannot distinguish the REP
    # bind from the PUB bind asserted above.
    fake_socket.bind.assert_called()

    # Check SUB socket connection and subscriptions
    fake_socket.connect.assert_any_call("tcp://internal:1234")
    fake_socket.setsockopt.assert_any_call(zmq.SUBSCRIBE, b"command")
    fake_socket.setsockopt.assert_any_call(zmq.SUBSCRIBE, b"send_gestures")

    # Check behavior was added (twice: once for command loop, once for fetch gestures loop)
    assert agent.add_behavior.call_count == 2


@pytest.mark.asyncio
async def test_setup_connect(zmq_context, mocker):
    """Setup connects when bind=False."""
    agent, fake_socket = await _run_setup(zmq_context, mocker, bind=False)

    # Check PUB socket connection (not binding)
    fake_socket.connect.assert_any_call("tcp://localhost:5556")
    fake_socket.connect.assert_any_call("tcp://internal:1234")

    # Check REP socket binding (always binds)
    fake_socket.bind.assert_called()

    # Check behavior was added (twice)
    assert agent.add_behavior.call_count == 2


@pytest.mark.asyncio
async def test_handle_message_sends_valid_gesture_command():
    """Internal message with valid gesture tag is forwarded to robot pub socket."""
    payload = {
        "endpoint": RIEndpoint.GESTURE_TAG,
        "data": "hello",  # "hello" is in gesture_data
    }

    pubsocket = await _handle_payload(payload)

    pubsocket.send_json.assert_awaited_once()


@pytest.mark.asyncio
async def test_handle_message_sends_non_gesture_command():
    """Internal message with non-gesture endpoint is not forwarded by this agent."""
    payload = {"endpoint": "some_other_endpoint", "data": "invalid_tag_not_in_list"}

    pubsocket = await _handle_payload(payload)

    # Non-gesture endpoints should not be forwarded by this agent
    pubsocket.send_json.assert_not_awaited()


@pytest.mark.asyncio
async def test_handle_message_rejects_invalid_gesture_tag():
    """Internal message with invalid gesture tag is not forwarded."""
    # Use a tag that's not in gesture_data
    payload = {"endpoint": RIEndpoint.GESTURE_TAG, "data": "invalid_tag_not_in_list"}

    pubsocket = await _handle_payload(payload)

    pubsocket.send_json.assert_not_awaited()


@pytest.mark.asyncio
async def test_handle_message_invalid_payload():
    """Invalid payload is caught and does not send."""
    pubsocket = await _handle_payload({"bad": "data"})

    pubsocket.send_json.assert_not_awaited()


@pytest.mark.asyncio
async def test_zmq_command_loop_valid_gesture_payload():
    """UI command with valid gesture tag is read from SUB and published."""
    command = {"endpoint": RIEndpoint.GESTURE_TAG, "data": "hello"}

    fake_socket = await _run_command_loop_once(b"command", json.dumps(command).encode("utf-8"))

    fake_socket.send_json.assert_awaited_once()


@pytest.mark.asyncio
async def test_zmq_command_loop_valid_non_gesture_payload():
    """UI command with non-gesture endpoint is not forwarded by this agent."""
    command = {"endpoint": "some_other_endpoint", "data": "anything"}

    fake_socket = await _run_command_loop_once(b"command", json.dumps(command).encode("utf-8"))

    fake_socket.send_json.assert_not_awaited()


@pytest.mark.asyncio
async def test_zmq_command_loop_invalid_gesture_tag():
    """UI command with invalid gesture tag is not forwarded."""
    command = {"endpoint": RIEndpoint.GESTURE_TAG, "data": "invalid_tag_not_in_list"}

    fake_socket = await _run_command_loop_once(b"command", json.dumps(command).encode("utf-8"))

    fake_socket.send_json.assert_not_awaited()


@pytest.mark.asyncio
async def test_zmq_command_loop_invalid_json():
    """Invalid JSON is ignored without sending."""
    fake_socket = await _run_command_loop_once(b"command", b"{not_json}")

    fake_socket.send_json.assert_not_awaited()


@pytest.mark.asyncio
async def test_zmq_command_loop_ignores_send_gestures_topic():
    """send_gestures topic is ignored in command loop."""
    fake_socket = await _run_command_loop_once(b"send_gestures", b"{}")

    fake_socket.send_json.assert_not_awaited()


@pytest.mark.asyncio
async def test_fetch_gestures_loop_without_amount():
    """Fetch gestures request without amount returns all tags."""
    all_tags = ["hello", "yes", "no", "wave", "point"]

    # Empty JSON request
    response = await _fetch_gestures_once(b"{}", list(all_tags))

    # Check the response contains all tags
    assert "tags" in response
    assert response["tags"] == all_tags


@pytest.mark.asyncio
async def test_fetch_gestures_loop_with_amount():
    """Fetch gestures request with amount returns limited tags."""
    amount = 3

    response = await _fetch_gestures_once(
        json.dumps(amount).encode(),
        ["hello", "yes", "no", "wave", "point"],
    )

    assert "tags" in response
    assert len(response["tags"]) == amount
    assert response["tags"] == ["hello", "yes", "no"]


@pytest.mark.asyncio
async def test_fetch_gestures_loop_with_integer_request():
    """Fetch gestures request with integer amount."""
    amount = 2

    response = await _fetch_gestures_once(json.dumps(amount).encode(), list(_DEFAULT_GESTURES))

    assert response["tags"] == ["hello", "yes"]


@pytest.mark.asyncio
async def test_fetch_gestures_loop_with_invalid_json():
    """Invalid JSON request returns all tags."""
    response = await _fetch_gestures_once(b"not_json", list(_DEFAULT_GESTURES))

    assert response["tags"] == ["hello", "yes", "no"]


@pytest.mark.asyncio
async def test_fetch_gestures_loop_with_non_integer_json():
    """Non-integer JSON request returns all tags."""
    request = json.dumps({"not": "an_integer"}).encode()

    response = await _fetch_gestures_once(request, list(_DEFAULT_GESTURES))

    assert response["tags"] == ["hello", "yes", "no"]


def test_gesture_data_attribute():
    """Test that gesture_data returns the expected list."""
    gesture_data = ["hello", "yes", "no", "wave"]
    agent = RobotGestureAgent("robot_gesture", gesture_data=gesture_data, address="")

    assert agent.gesture_data == gesture_data
    assert isinstance(agent.gesture_data, list)
    assert len(agent.gesture_data) == 4
    assert "hello" in agent.gesture_data
    assert "yes" in agent.gesture_data
    assert "no" in agent.gesture_data
    assert "invalid_tag_not_in_list" not in agent.gesture_data


@pytest.mark.asyncio
async def test_stop_closes_sockets():
    """Stop method closes the pub and sub sockets."""
    pubsocket = MagicMock()
    subsocket = MagicMock()
    repsocket = MagicMock()
    agent = RobotGestureAgent("robot_gesture", address="")
    agent.pubsocket = pubsocket
    agent.subsocket = subsocket
    agent.repsocket = repsocket

    await agent.stop()

    pubsocket.close.assert_called_once()
    subsocket.close.assert_called_once()
    # TODO: per the original author's note, stop() does not close repsocket.
    # Once it does, also assert repsocket.close was called exactly once.


@pytest.mark.asyncio
async def test_initialization_with_custom_gesture_data():
    """Agent can be initialized with custom gesture data."""
    custom_gestures = ["custom1", "custom2", "custom3"]
    agent = RobotGestureAgent("robot_gesture", gesture_data=custom_gestures, address="")

    assert agent.gesture_data == custom_gestures


@pytest.mark.asyncio
async def test_fetch_gestures_loop_handles_exception():
    """Exception in fetch gestures loop is caught and logged."""
    fake_repsocket = AsyncMock()

    async def recv_once():
        # Stop the loop after the first iteration, then fail the receive.
        agent._running = False
        raise Exception("Test exception")

    fake_repsocket.recv = recv_once
    fake_repsocket.send = AsyncMock()

    agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
    agent.repsocket = fake_repsocket
    agent.logger = MagicMock()
    agent._running = True

    # Should not raise exception
    await agent._fetch_gestures_loop()

    # Exception should be logged
    agent.logger.exception.assert_called_once()