diff --git a/src/robot_interface/endpoints/actuation_receiver.py b/src/robot_interface/endpoints/actuation_receiver.py index fdc77d9..dd40214 100644 --- a/src/robot_interface/endpoints/actuation_receiver.py +++ b/src/robot_interface/endpoints/actuation_receiver.py @@ -1,11 +1,12 @@ from __future__ import unicode_literals # So that we can log texts with Unicode characters import logging +from threading import Thread +import Queue import zmq from robot_interface.endpoints.receiver_base import ReceiverBase from robot_interface.state import state - from robot_interface.core.config import settings from robot_interface.endpoints.gesture_settings import GestureTags @@ -32,6 +33,9 @@ class ActuationReceiver(ReceiverBase): self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options self._tts_service = None self._animation_service = None + self._message_queue = Queue.Queue() + self.message_thread = Thread(target=self._handle_messages) + self.message_thread.start() def _handle_speech(self, message): """ @@ -58,8 +62,26 @@ class ActuationReceiver(ReceiverBase): if not self._tts_service: self._tts_service = state.qi_session.service("ALTextToSpeech") - # Returns instantly. Messages received while speaking will be queued. - getattr(qi, "async")(self._tts_service.say, text) + if message.get("is_priority"): + # Bypass queue and speak immediately + self.clear_queue() + self._message_queue.put(text) + logging.debug("Force speaking immediately: {}".format(text)) + else: + self._message_queue.put(text) + + def clear_queue(self): + """ + Safely drains all pending messages from the queue. + """ + logging.info("Message queue size: {}".format(self._message_queue.qsize())) + try: + while True: + # Remove items one by one without waiting + self._message_queue.get_nowait() + except Queue.Empty: + pass + logging.info("Message queue cleared.") def _handle_gesture(self, message, is_single): """ @@ -122,6 +144,19 @@ class ActuationReceiver(ReceiverBase): if message["endpoint"] == "actuate/gesture/single": self._handle_gesture(message, True) + def _handle_messages(self): + while not state.exit_event.is_set(): + try: + text = self._message_queue.get(timeout=0.1) + state.is_speaking = True + self._tts_service.say(text) + except Queue.Empty: + state.is_speaking = False + except RuntimeError: + logging.error("Lost connection to Pepper. Please check if you're connected to the " + "local WiFi and restart this application.") + state.exit_event.set() + def endpoint_description(self): """ Extend the default endpoint description with gesture tags. diff --git a/src/robot_interface/endpoints/audio_sender.py b/src/robot_interface/endpoints/audio_sender.py index 448d6f3..dc37967 100644 --- a/src/robot_interface/endpoints/audio_sender.py +++ b/src/robot_interface/endpoints/audio_sender.py @@ -89,6 +89,7 @@ class AudioSender(SocketBase): try: while not state.exit_event.is_set(): data = stream.read(chunk) + if (state.is_speaking): continue # Do not send audio while the robot is speaking self.socket.send(data) except IOError as e: logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e) diff --git a/src/robot_interface/endpoints/video_sender.py b/src/robot_interface/endpoints/video_sender.py index 9fa1132..d822352 100644 --- a/src/robot_interface/endpoints/video_sender.py +++ b/src/robot_interface/endpoints/video_sender.py @@ -6,6 +6,7 @@ from robot_interface.endpoints.socket_base import SocketBase from robot_interface.state import state from robot_interface.core.config import settings + class VideoSender(SocketBase): """ Video sender endpoint, responsible for sending video frames. diff --git a/src/robot_interface/state.py b/src/robot_interface/state.py index b625867..8d69cca 100644 --- a/src/robot_interface/state.py +++ b/src/robot_interface/state.py @@ -30,6 +30,7 @@ class State(object): self.exit_event = None self.sockets = [] self.qi_session = None + self.is_speaking = False def initialize(self): """ diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 0000000..d387f28 --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,10 @@ +from mock import patch, MagicMock + +import pytest + + +@pytest.fixture(autouse=True) +def mock_zmq_context(): + with patch("zmq.Context") as mock: + mock.instance.return_value = MagicMock() + yield mock \ No newline at end of file diff --git a/test/unit/test_actuation_receiver.py b/test/unit/test_actuation_receiver.py index 1197196..e6bacd2 100644 --- a/test/unit/test_actuation_receiver.py +++ b/test/unit/test_actuation_receiver.py @@ -20,46 +20,109 @@ def zmq_context(): yield context -def test_handle_unimplemented_endpoint(zmq_context): +def test_force_speech_clears_queue(mocker): """ - Tests that the ``ActuationReceiver.handle_message`` method can - handle an unknown or unimplemented endpoint without raising an error. + Tests that a force speech message clears the existing queue + and places the high-priority message at the front. """ - receiver = ActuationReceiver(zmq_context) - # Should not error + mocker.patch("threading.Thread") + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + mock_qi = mock.Mock() + sys.modules["qi"] = mock_qi + + mock_tts_service = mock.Mock() + mock_state.qi_session.service.return_value = mock_tts_service + + # Use Mock Context + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + receiver._message_queue.put("old_message_1") + receiver._message_queue.put("old_message_2") + + assert receiver._message_queue.qsize() == 2 + + force_msg = { + "endpoint": "actuate/speech", + "data": "Emergency Notification", + "is_priority": True, + } + receiver.handle_message(force_msg) + + assert receiver._message_queue.qsize() == 1 + queued_item = receiver._message_queue.get() + assert queued_item == "Emergency Notification" + + +def test_handle_unimplemented_endpoint(mocker): + """ + Tests handling of unknown endpoints. + """ + mocker.patch("threading.Thread") + + # Use Mock Context + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + receiver.handle_message({ "endpoint": "some_endpoint_that_definitely_does_not_exist", "data": None, }) -def test_speech_message_no_data(zmq_context, mocker): +def test_speech_message_no_data(mocker): """ - Tests that the message handler logs a warning when a speech actuation - request (`actuate/speech`) is received but contains empty string data. + Tests that if the message data is empty, the receiver returns immediately + WITHOUT attempting to access the global robot state or session. """ - mock_warn = mocker.patch("logging.warn") + # 1. Prevent background threads from running + mocker.patch("threading.Thread") + + # 2. Mock the global state object + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") - receiver = ActuationReceiver(zmq_context) + # 3. Create a PropertyMock to track whenever 'qi_session' is accessed + # We attach it to the class type of the mock so it acts like a real property + mock_session_prop = mock.PropertyMock(return_value=None) + type(mock_state).qi_session = mock_session_prop + + # 4. Initialize Receiver (Mocking the context to avoid ZMQ errors) + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + # 5. Send empty data receiver.handle_message({"endpoint": "actuate/speech", "data": ""}) - mock_warn.assert_called_with(mock.ANY) + # 6. Assertion: + # Because the code does `if not text: return` BEFORE `if not state.qi_session`, + # the state property should NEVER be read. + mock_session_prop.assert_not_called() -def test_speech_message_invalid_data(zmq_context, mocker): +def test_speech_message_invalid_data(mocker): """ - Tests that the message handler logs a warning when a speech actuation - request (`actuate/speech`) is received with data that is not a string (e.g., a boolean). + Tests that if the message data is not a string, the function returns. + :param mocker: Description """ - mock_warn = mocker.patch("logging.warn") + mocker.patch("threading.Thread") - receiver = ActuationReceiver(zmq_context) + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + mock_session_prop = mock.PropertyMock(return_value=None) + type(mock_state).qi_session = mock_session_prop + + # Use Mock Context + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + receiver.handle_message({"endpoint": "actuate/speech", "data": True}) - mock_warn.assert_called_with(mock.ANY) + # Because the code does `if not text: return` BEFORE `if not state.qi_session`, + # the state property should NEVER be read. + mock_session_prop.assert_not_called() - -def test_speech_no_qi(zmq_context, mocker): +def test_speech_no_qi(mocker): """ Tests the actuation receiver's behavior when processing a speech request but the global state does not have an active QI session. @@ -69,16 +132,21 @@ def test_speech_no_qi(zmq_context, mocker): mock_qi_session = mock.PropertyMock(return_value=None) type(mock_state).qi_session = mock_qi_session - receiver = ActuationReceiver(zmq_context) + mock_tts_service = mock.Mock() + + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + receiver._tts_service = mock_tts_service + receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) - mock_qi_session.assert_called() + receiver._tts_service.assert_not_called() -def test_speech(zmq_context, mocker): +def test_speech(mocker): """ Tests the core speech actuation functionality by mocking the QI TextToSpeech - service and verifying that it is called correctly. + service and verifying that the received message is put into the queue. """ mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") @@ -89,17 +157,182 @@ def test_speech(zmq_context, mocker): mock_state.qi_session = mock.Mock() mock_state.qi_session.service.return_value = mock_tts_service - receiver = ActuationReceiver(zmq_context) + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + receiver._tts_service = None receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) - mock_state.qi_session.service.assert_called_once_with("ALTextToSpeech") + assert receiver._message_queue.qsize() == 1 - getattr(mock_qi, "async").assert_called_once() - call_args = getattr(mock_qi, "async").call_args[0] - assert call_args[0] == mock_tts_service.say - assert call_args[1] == "Some message to speak." + queued_item = receiver._message_queue.get() + assert queued_item == "Some message to speak." +def test_speech_priority(mocker): + """ + Tests that a priority speech message is handled correctly by clearing the queue + and placing the priority message at the front. + """ + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + mock_qi = mock.Mock() + sys.modules["qi"] = mock_qi + + mock_tts_service = mock.Mock() + mock_state.qi_session = mock.Mock() + mock_state.qi_session.service.return_value = mock_tts_service + + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + receiver._message_queue.put("old_message_1") + receiver._message_queue.put("old_message_2") + + assert receiver._message_queue.qsize() == 2 + + priority_msg = { + "endpoint": "actuate/speech", + "data": "Urgent Message", + "is_priority": True, + } + receiver._handle_speech(priority_msg) + + assert receiver._message_queue.qsize() == 1 + queued_item = receiver._message_queue.get() + assert queued_item == "Urgent Message" + +def test_handle_messages_loop(mocker): + """ + Tests the background consumer loop (_handle_messages) processing an item. + Runs SYNCHRONOUSLY to ensure coverage tools pick up the lines. + """ + # Patch Thread so the real background thread NEVER starts automatically + mocker.patch("threading.Thread") + + # Mock state + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + # Setup initial speaking state to False (covers "Started speaking" print) + mock_state.is_speaking = False + + # Mock the TextToSpeech service + mock_tts_service = mock.Mock() + mock_state.qi_session.service.return_value = mock_tts_service + + # Initialize receiver (Thread is patched, so no thread starts) + # Use Mock Context to avoid ZMQ errors + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + # Manually inject service (since lazy loading might handle it, but this is safer) + receiver._tts_service = mock_tts_service + + # This ensures the while loop iterates exactly once + mock_state.exit_event.is_set.side_effect = [False, True] + + # Put an item in the queue + receiver._message_queue.put("Hello World") + + # RUN MANUALLY in the main thread + # This executes the code: while -> try -> get -> if print -> speaking=True -> say + receiver._handle_messages() + + # Assertions + assert receiver._message_queue.empty() + mock_tts_service.say.assert_called_with("Hello World") + assert mock_state.is_speaking is True + + +def test_handle_messages_queue_empty(mocker): + """ + Tests the Queue.Empty exception handler in the consumer loop. + This covers the logic that resets 'state.is_speaking' to False. + """ + # Prevent the real background thread from starting + mocker.patch("threading.Thread") + + # Mock the state object + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + # Setup 'is_speaking' property mock + # We set return_value=True so the code enters the 'if state.is_speaking:' block. + # We use PropertyMock to track when this attribute is set. + type(mock_state).is_speaking = True + + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + # This ensures the while loop body runs exactly once for our test + mock_state.exit_event.is_set.side_effect = [False, True] + + # Force get() to raise Queue.Empty immediately (simulate timeout) + # We patch the 'get' method on the specific queue instance of our receiver + #mocker.patch.object(receiver._message_queue, 'get', side_effect=Queue.Empty) + + # Run the loop logic manually (synchronously) + receiver._handle_messages() + + # Final Assertion: Verify is_speaking was set to False + # The code execution order is: read (returns True) -> print -> set (to False) + # assert_called_with checks the arguments of the LAST call, which is the setter. + assert mock_state.is_speaking is False + + +def test_handle_messages_runtime_error(mocker): + """ + Tests the RuntimeError exception handler (e.g. lost WiFi connection). + Uses a Mock ZMQ context to avoid 'Address already in use' errors. + """ + # Patch Thread so we don't accidentally spawn real threads + mocker.patch("threading.Thread") + + # Mock the state and logging + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + # Use a MOCK ZMQ context. + # This prevents the receiver from trying to bind to a real TCP port. + mock_zmq_ctx = mock.Mock() + + # Initialize receiver with the mock context + receiver = ActuationReceiver(mock_zmq_ctx) + + mock_state.exit_event.is_set.side_effect = [False, True] + + receiver._message_queue.put("Test Message") + + # Setup: ...BUT the service raises RuntimeError when asked to speak + mock_tts = mock.Mock() + mock_tts.say.side_effect = RuntimeError("Connection lost") + receiver._tts_service = mock_tts + + # Run the loop logic manually + receiver._handle_messages() + + # Assertions + assert mock_state.exit_event.is_set.called + +def test_clear_queue(mocker): + """ + Tests that the clear_queue method properly drains all items from the message queue. + """ + mocker.patch("threading.Thread") + + # Use Mock Context + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + # Populate the queue with multiple items + receiver._message_queue.put("msg1") + receiver._message_queue.put("msg2") + receiver._message_queue.put("msg3") + + assert receiver._message_queue.qsize() == 3 + + # Clear the queue + receiver.clear_queue() + + # Assert the queue is empty + assert receiver._message_queue.qsize() == 0 def test_gesture_no_data(zmq_context, mocker): receiver = ActuationReceiver(zmq_context) diff --git a/test/unit/test_audio_sender.py b/test/unit/test_audio_sender.py index 4e337c2..c6ca87f 100644 --- a/test/unit/test_audio_sender.py +++ b/test/unit/test_audio_sender.py @@ -77,7 +77,8 @@ def test_sending_audio(mocker): mock_zmq_context = mock.Mock() send_socket = mock.Mock() - + + mock_state.is_speaking = False # If there's something wrong with the microphone, it will raise an IOError when `read`ing. stream = mock.Mock() stream.read = _fake_read @@ -93,6 +94,36 @@ def test_sending_audio(mocker): send_socket.assert_called() +def test_no_sending_if_speaking(mocker): + """ + Tests the successful sending of audio data over a ZeroMQ socket. + """ + mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") + mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} + + mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") + mock_state.exit_event.is_set.side_effect = [False, True] + + mock_zmq_context = mock.Mock() + send_socket = mock.Mock() + + mock_state.is_speaking = True + + # If there's something wrong with the microphone, it will raise an IOError when `read`ing. + stream = mock.Mock() + stream.read = _fake_read + + sender = AudioSender(mock_zmq_context) + sender.socket.send = send_socket + sender.audio.open = mock.Mock() + sender.audio.open.return_value = stream + + sender.start() + sender.wait_until_done() + + send_socket.assert_not_called() + + def _fake_read_error(num_frames): """ Helper function to simulate an I/O error during microphone stream reading.