import sys import mock import pytest import zmq import Queue from robot_interface.endpoints.actuation_receiver import ActuationReceiver @pytest.fixture def zmq_context(): """ A pytest fixture that creates and yields a ZMQ context. :return: An initialized ZeroMQ context. :rtype: zmq.Context """ context = zmq.Context() yield context def test_force_speech_clears_queue(mocker): """ Tests that a force speech message clears the existing queue and places the high-priority message at the front. """ mocker.patch("threading.Thread") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_qi = mock.Mock() sys.modules["qi"] = mock_qi mock_tts_service = mock.Mock() mock_state.qi_session.service.return_value = mock_tts_service # Use Mock Context mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver._message_queue.put("old_message_1") receiver._message_queue.put("old_message_2") assert receiver._message_queue.qsize() == 2 force_msg = { "endpoint": "actuate/speech", "data": "Emergency Notification", "is_priority": True, } receiver.handle_message(force_msg) assert receiver._message_queue.qsize() == 1 queued_item = receiver._message_queue.get() assert queued_item == "Emergency Notification" def test_handle_unimplemented_endpoint(mocker): """ Tests handling of unknown endpoints. """ mocker.patch("threading.Thread") # Use Mock Context mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver.handle_message({ "endpoint": "some_endpoint_that_definitely_does_not_exist", "data": None, }) def test_speech_message_no_data(mocker): """ Tests that if the message data is empty, the receiver returns immediately WITHOUT attempting to access the global robot state or session. """ # 1. Prevent background threads from running mocker.patch("threading.Thread") # 2. Mock the global state object mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") # 3. Create a PropertyMock to track whenever 'qi_session' is accessed # We attach it to the class type of the mock so it acts like a real property mock_session_prop = mock.PropertyMock(return_value=None) type(mock_state).qi_session = mock_session_prop # 4. Initialize Receiver (Mocking the context to avoid ZMQ errors) mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) # 5. Send empty data receiver.handle_message({"endpoint": "actuate/speech", "data": ""}) # 6. Assertion: # Because the code does `if not text: return` BEFORE `if not state.qi_session`, # the state property should NEVER be read. mock_session_prop.assert_not_called() def test_speech_message_invalid_data(mocker): """ Tests that if the message data is not a string, the function returns. :param mocker: Description """ mocker.patch("threading.Thread") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_session_prop = mock.PropertyMock(return_value=None) type(mock_state).qi_session = mock_session_prop # Use Mock Context mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver.handle_message({"endpoint": "actuate/speech", "data": True}) # Because the code does `if not text: return` BEFORE `if not state.qi_session`, # the state property should NEVER be read. mock_session_prop.assert_not_called() def test_speech_no_qi(mocker): """ Tests the actuation receiver's behavior when processing a speech request but the global state does not have an active QI session. """ mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_qi_session = mock.PropertyMock(return_value=None) type(mock_state).qi_session = mock_qi_session mock_tts_service = mock.Mock() mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver._tts_service = mock_tts_service receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) receiver._tts_service.assert_not_called() def test_speech(mocker): """ Tests the core speech actuation functionality by mocking the QI TextToSpeech service and verifying that the received message is put into the queue. """ mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_qi = mock.Mock() sys.modules["qi"] = mock_qi mock_tts_service = mock.Mock() mock_state.qi_session = mock.Mock() mock_state.qi_session.service.return_value = mock_tts_service mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver._tts_service = None receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) assert receiver._message_queue.qsize() == 1 queued_item = receiver._message_queue.get() assert queued_item == "Some message to speak." def test_speech_priority(mocker): """ Tests that a priority speech message is handled correctly by clearing the queue and placing the priority message at the front. """ mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_qi = mock.Mock() sys.modules["qi"] = mock_qi mock_tts_service = mock.Mock() mock_state.qi_session = mock.Mock() mock_state.qi_session.service.return_value = mock_tts_service mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) receiver._message_queue.put("old_message_1") receiver._message_queue.put("old_message_2") assert receiver._message_queue.qsize() == 2 priority_msg = { "endpoint": "actuate/speech", "data": "Urgent Message", "is_priority": True, } receiver._handle_speech(priority_msg) assert receiver._message_queue.qsize() == 1 queued_item = receiver._message_queue.get() assert queued_item == "Urgent Message" def test_handle_messages_loop(mocker): """ Tests the background consumer loop (_handle_messages) processing an item. Runs SYNCHRONOUSLY to ensure coverage tools pick up the lines. """ # Patch Thread so the real background thread NEVER starts automatically mocker.patch("threading.Thread") # Mock state mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") # Setup initial speaking state to False (covers "Started speaking" print) mock_state.is_speaking = False # Mock the TextToSpeech service mock_tts_service = mock.Mock() mock_state.qi_session.service.return_value = mock_tts_service # Initialize receiver (Thread is patched, so no thread starts) # Use Mock Context to avoid ZMQ errors mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) # Manually inject service (since lazy loading might handle it, but this is safer) receiver._tts_service = mock_tts_service # This ensures the while loop iterates exactly once mock_state.exit_event.is_set.side_effect = [False, True] # Put an item in the queue receiver._message_queue.put("Hello World") # RUN MANUALLY in the main thread # This executes the code: while -> try -> get -> if print -> speaking=True -> say receiver._handle_messages() # Assertions assert receiver._message_queue.empty() mock_tts_service.say.assert_called_with("Hello World") assert mock_state.is_speaking is True def test_handle_messages_queue_empty(mocker): """ Tests the Queue.Empty exception handler in the consumer loop. This covers the logic that resets 'state.is_speaking' to False. """ # Prevent the real background thread from starting mocker.patch("threading.Thread") # Mock the state object mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") # Setup 'is_speaking' property mock # We set return_value=True so the code enters the 'if state.is_speaking:' block. # We use PropertyMock to track when this attribute is set. type(mock_state).is_speaking = True mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) # This ensures the while loop body runs exactly once for our test mock_state.exit_event.is_set.side_effect = [False, True] # Force get() to raise Queue.Empty immediately (simulate timeout) # We patch the 'get' method on the specific queue instance of our receiver #mocker.patch.object(receiver._message_queue, 'get', side_effect=Queue.Empty) # Run the loop logic manually (synchronously) receiver._handle_messages() # Final Assertion: Verify is_speaking was set to False # The code execution order is: read (returns True) -> print -> set (to False) # assert_called_with checks the arguments of the LAST call, which is the setter. assert mock_state.is_speaking is False def test_handle_messages_runtime_error(mocker): """ Tests the RuntimeError exception handler (e.g. lost WiFi connection). Uses a Mock ZMQ context to avoid 'Address already in use' errors. """ # Patch Thread so we don't accidentally spawn real threads mocker.patch("threading.Thread") # Mock the state and logging mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") # Use a MOCK ZMQ context. # This prevents the receiver from trying to bind to a real TCP port. mock_zmq_ctx = mock.Mock() # Initialize receiver with the mock context receiver = ActuationReceiver(mock_zmq_ctx) mock_state.exit_event.is_set.side_effect = [False, True] receiver._message_queue.put("Test Message") # Setup: ...BUT the service raises RuntimeError when asked to speak mock_tts = mock.Mock() mock_tts.say.side_effect = RuntimeError("Connection lost") receiver._tts_service = mock_tts # Run the loop logic manually receiver._handle_messages() # Assertions assert mock_state.exit_event.is_set.called def test_clear_queue(mocker): """ Tests that the clear_queue method properly drains all items from the message queue. """ mocker.patch("threading.Thread") # Use Mock Context mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) # Populate the queue with multiple items receiver._message_queue.put("msg1") receiver._message_queue.put("msg2") receiver._message_queue.put("msg3") assert receiver._message_queue.qsize() == 3 # Clear the queue receiver.clear_queue() # Assert the queue is empty assert receiver._message_queue.qsize() == 0