import sys import mock import pytest import zmq import Queue from robot_interface.endpoints.actuation_receiver import ActuationReceiver from robot_interface.endpoints.gesture_settings import GestureTags @pytest.fixture def zmq_context(): """ A pytest fixture that creates and yields a ZMQ context. :return: An initialized ZeroMQ context. :rtype: zmq.Context """ context = zmq.Context() yield context def test_force_speech_clears_queue(mocker): """ Tests that a force speech message clears the existing queue and places the high-priority message at the front. """ mocker.patch("threading.Thread") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_qi = mock.Mock() sys.modules["qi"] = mock_qi mock_tts_service = mock.Mock() mock_state.qi_session.service.return_value = mock_tts_service # Use Mock Context mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver._message_queue.put("old_message_1") receiver._message_queue.put("old_message_2") assert receiver._message_queue.qsize() == 2 force_msg = { "endpoint": "actuate/speech", "data": "Emergency Notification", "is_priority": True, } receiver.handle_message(force_msg) assert receiver._message_queue.qsize() == 1 queued_item = receiver._message_queue.get() assert queued_item == "Emergency Notification" def test_handle_unimplemented_endpoint(mocker): """ Tests handling of unknown endpoints. """ mocker.patch("threading.Thread") # Use Mock Context mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver.handle_message({ "endpoint": "some_endpoint_that_definitely_does_not_exist", "data": None, }) def test_speech_message_no_data(mocker): """ Tests that if the message data is empty, the receiver returns immediately WITHOUT attempting to access the global robot state or session. """ # 1. Prevent background threads from running mocker.patch("threading.Thread") # 2. Mock the global state object mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") # 3. Create a PropertyMock to track whenever 'qi_session' is accessed # We attach it to the class type of the mock so it acts like a real property mock_session_prop = mock.PropertyMock(return_value=None) type(mock_state).qi_session = mock_session_prop # 4. Initialize Receiver (Mocking the context to avoid ZMQ errors) mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) # 5. Send empty data receiver.handle_message({"endpoint": "actuate/speech", "data": ""}) # 6. Assertion: # Because the code does `if not text: return` BEFORE `if not state.qi_session`, # the state property should NEVER be read. mock_session_prop.assert_not_called() def test_speech_message_invalid_data(mocker): """ Tests that if the message data is not a string, the function returns. :param mocker: Description """ mocker.patch("threading.Thread") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_session_prop = mock.PropertyMock(return_value=None) type(mock_state).qi_session = mock_session_prop # Use Mock Context mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver.handle_message({"endpoint": "actuate/speech", "data": True}) # Because the code does `if not text: return` BEFORE `if not state.qi_session`, # the state property should NEVER be read. mock_session_prop.assert_not_called() def test_speech_no_qi(mocker): """ Tests the actuation receiver's behavior when processing a speech request but the global state does not have an active QI session. """ mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_qi_session = mock.PropertyMock(return_value=None) type(mock_state).qi_session = mock_qi_session mock_tts_service = mock.Mock() mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver._tts_service = mock_tts_service receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) receiver._tts_service.assert_not_called() def test_speech(mocker): """ Tests the core speech actuation functionality by mocking the QI TextToSpeech service and verifying that the received message is put into the queue. """ mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_qi = mock.Mock() sys.modules["qi"] = mock_qi mock_tts_service = mock.Mock() mock_state.qi_session = mock.Mock() mock_state.qi_session.service.return_value = mock_tts_service mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver._tts_service = None receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) assert receiver._message_queue.qsize() == 1 queued_item = receiver._message_queue.get() assert queued_item == "Some message to speak." def test_speech_priority(mocker): """ Tests that a priority speech message is handled correctly by clearing the queue and placing the priority message at the front. """ mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_qi = mock.Mock() sys.modules["qi"] = mock_qi mock_tts_service = mock.Mock() mock_state.qi_session = mock.Mock() mock_state.qi_session.service.return_value = mock_tts_service mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver._message_queue.put("old_message_1") receiver._message_queue.put("old_message_2") assert receiver._message_queue.qsize() == 2 priority_msg = { "endpoint": "actuate/speech", "data": "Urgent Message", "is_priority": True, } receiver._handle_speech(priority_msg) assert receiver._message_queue.qsize() == 1 queued_item = receiver._message_queue.get() assert queued_item == "Urgent Message" def test_handle_messages_loop(mocker): """ Tests the background consumer loop (_handle_messages) processing an item. Runs SYNCHRONOUSLY to ensure coverage tools pick up the lines. """ # Patch Thread so the real background thread NEVER starts automatically mocker.patch("threading.Thread") # Mock state mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") # Setup initial speaking state to False (covers "Started speaking" print) mock_state.is_speaking = False # Mock the TextToSpeech service mock_tts_service = mock.Mock() mock_state.qi_session.service.return_value = mock_tts_service # Initialize receiver (Thread is patched, so no thread starts) # Use Mock Context to avoid ZMQ errors mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) # Manually inject service (since lazy loading might handle it, but this is safer) receiver._tts_service = mock_tts_service # This ensures the while loop iterates exactly once mock_state.exit_event.is_set.side_effect = [False, True] # Put an item in the queue receiver._message_queue.put("Hello World") # RUN MANUALLY in the main thread # This executes the code: while -> try -> get -> if print -> speaking=True -> say receiver._handle_messages() # Assertions assert receiver._message_queue.empty() mock_tts_service.say.assert_called_with("Hello World") assert mock_state.is_speaking is True def test_handle_messages_queue_empty(mocker): """ Tests the Queue.Empty exception handler in the consumer loop. This covers the logic that resets 'state.is_speaking' to False. """ # Prevent the real background thread from starting mocker.patch("threading.Thread") # Mock the state object mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") # Setup 'is_speaking' property mock # We set return_value=True so the code enters the 'if state.is_speaking:' block. # We use PropertyMock to track when this attribute is set. type(mock_state).is_speaking = True mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) # This ensures the while loop body runs exactly once for our test mock_state.exit_event.is_set.side_effect = [False, True] # Force get() to raise Queue.Empty immediately (simulate timeout) # We patch the 'get' method on the specific queue instance of our receiver #mocker.patch.object(receiver._message_queue, 'get', side_effect=Queue.Empty) # Run the loop logic manually (synchronously) receiver._handle_messages() # Final Assertion: Verify is_speaking was set to False # The code execution order is: read (returns True) -> print -> set (to False) # assert_called_with checks the arguments of the LAST call, which is the setter. assert mock_state.is_speaking is False def test_handle_messages_runtime_error(mocker): """ Tests the RuntimeError exception handler (e.g. lost WiFi connection). Uses a Mock ZMQ context to avoid 'Address already in use' errors. """ # Patch Thread so we don't accidentally spawn real threads mocker.patch("threading.Thread") # Mock the state and logging mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") # Use a MOCK ZMQ context. # This prevents the receiver from trying to bind to a real TCP port. mock_zmq_ctx = mock.Mock() # Initialize receiver with the mock context receiver = ActuationReceiver(mock_zmq_ctx) mock_state.exit_event.is_set.side_effect = [False, True] receiver._message_queue.put("Test Message") # Setup: ...BUT the service raises RuntimeError when asked to speak mock_tts = mock.Mock() mock_tts.say.side_effect = RuntimeError("Connection lost") receiver._tts_service = mock_tts # Run the loop logic manually receiver._handle_messages() # Assertions assert mock_state.exit_event.is_set.called def test_clear_queue(mocker): """ Tests that the clear_queue method properly drains all items from the message queue. """ mocker.patch("threading.Thread") # Use Mock Context mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) # Populate the queue with multiple items receiver._message_queue.put("msg1") receiver._message_queue.put("msg2") receiver._message_queue.put("msg3") assert receiver._message_queue.qsize() == 3 # Clear the queue receiver.clear_queue() # Assert the queue is empty assert receiver._message_queue.qsize() == 0 def test_gesture_no_data(zmq_context, mocker): receiver = ActuationReceiver(zmq_context) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": ""}, True) # Just ensuring no crash def test_gesture_invalid_data(zmq_context, mocker): receiver = ActuationReceiver(zmq_context) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": 123}, True) # No crash expected def test_gesture_single_not_found(zmq_context, mocker): mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags.single_gestures = ["wave", "bow"] # allowed single gestures receiver = ActuationReceiver(zmq_context) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "unknown_gesture"}, True) # No crash expected def test_gesture_tag_not_found(zmq_context, mocker): mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags.tags = ["happy", "sad"] receiver = ActuationReceiver(zmq_context) receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "not_a_tag"}, False) # No crash expected def test_gesture_no_qi_session(zmq_context, mocker): mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state.qi_session = None mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags.single_gestures = ["hello"] receiver = ActuationReceiver(zmq_context) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "hello"}, True) # No crash, path returns early def test_gesture_single_success(zmq_context, mocker): mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_qi = mock.Mock() sys.modules["qi"] = mock_qi # Setup gesture settings mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags.single_gestures = ["wave"] mock_animation_service = mock.Mock() mock_state.qi_session = mock.Mock() mock_state.qi_session.service.return_value = mock_animation_service receiver = ActuationReceiver(zmq_context) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "wave"}, True) mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer") getattr(mock_qi, "async").assert_called_once() assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run assert getattr(mock_qi, "async").call_args[0][1] == "wave" def test_gesture_tag_success(zmq_context, mocker): mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_qi = mock.Mock() sys.modules["qi"] = mock_qi mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags.tags = ["greeting"] mock_animation_service = mock.Mock() mock_state.qi_session = mock.Mock() mock_state.qi_session.service.return_value = mock_animation_service receiver = ActuationReceiver(zmq_context) receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "greeting"}, False) mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer") getattr(mock_qi, "async").assert_called_once() assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.runTag assert getattr(mock_qi, "async").call_args[0][1] == "greeting" def test_handle_message_all_routes(zmq_context, mocker): """ Ensures all handle_message endpoint branches route correctly. """ receiver = ActuationReceiver(zmq_context) mock_speech = mocker.patch.object(receiver, "_handle_speech") mock_gesture = mocker.patch.object(receiver, "_handle_gesture") receiver.handle_message({"endpoint": "actuate/speech", "data": "hi"}) receiver.handle_message({"endpoint": "actuate/gesture/tag", "data": "greeting"}) receiver.handle_message({"endpoint": "actuate/gesture/single", "data": "wave"}) mock_speech.assert_called_once() assert mock_gesture.call_count == 2 def test_endpoint_description(zmq_context, mocker): mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags.tags = ["happy"] mock_tags.single_gestures = ["wave"] receiver = ActuationReceiver(zmq_context) desc = receiver.endpoint_description() assert "gestures" in desc assert desc["gestures"] == ["happy"] assert "single_gestures" in desc assert desc["single_gestures"] == ["wave"] def test_gesture_single_real_gesturetags(zmq_context, mocker): """ Uses the real GestureTags (no mocking) to ensure the receiver references GestureTags.single_gestures correctly. """ # Ensure qi session exists so we pass the early return mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state.qi_session = mock.Mock() # Mock qi.async to avoid real async calls mock_qi = mock.Mock() sys.modules["qi"] = mock_qi # Mock animation service mock_animation_service = mock.Mock() mock_state.qi_session.service.return_value = mock_animation_service receiver = ActuationReceiver(zmq_context) # Pick a real gesture from GestureTags.single_gestures assert len(GestureTags.single_gestures) > 0, "GestureTags.single_gestures must not be empty" gesture = GestureTags.single_gestures[0] receiver._handle_gesture( {"endpoint": "actuate/gesture/single", "data": gesture}, is_single=True, ) mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer") getattr(mock_qi, "async").assert_called_once() assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run assert getattr(mock_qi, "async").call_args[0][1] == gesture