diff --git a/src/robot_interface/endpoints/actuation_receiver.py b/src/robot_interface/endpoints/actuation_receiver.py index 927efbd..13e0768 100644 --- a/src/robot_interface/endpoints/actuation_receiver.py +++ b/src/robot_interface/endpoints/actuation_receiver.py @@ -1,6 +1,9 @@ from __future__ import unicode_literals # So that we can log texts with Unicode characters import logging +import time +from threading import Thread +import Queue import zmq from robot_interface.endpoints.receiver_base import ReceiverBase @@ -27,6 +30,9 @@ class ActuationReceiver(ReceiverBase): self.create_socket(zmq_context, zmq.SUB, port) self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options self._tts_service = None + self._message_queue = Queue.Queue() + self.message_thread = Thread(target=self._handle_messages) + self.message_thread.start() def _handle_speech(self, message): """ @@ -53,8 +59,25 @@ class ActuationReceiver(ReceiverBase): if not self._tts_service: self._tts_service = state.qi_session.service("ALTextToSpeech") - # Returns instantly. Messages received while speaking will be queued. - qi.async(self._tts_service.say, text) + if (message.get("is_priority")): + # Bypass queue and speak immediately + self.clear_queue() + self._message_queue.put(text) + logging.debug("Force speaking immediately: {}".format(text)) + else: + self._message_queue.put(text) + + def clear_queue(self): + """ + Safely drains all pending messages from the queue. + """ + try: + while True: + # Remove items one by one without waiting + self._message_queue.get_nowait() + except Queue.Empty: + pass + logging.info("Message queue cleared.") def handle_message(self, message): """ @@ -65,3 +88,18 @@ class ActuationReceiver(ReceiverBase): """ if message["endpoint"] == "actuate/speech": self._handle_speech(message) + + def _handle_messages(self): + while not state.exit_event.is_set(): + try: + text = self._message_queue.get(timeout=0.1) + if not state.is_speaking: print("Started speaking.") + state.is_speaking = True + self._tts_service.say(text) + except Queue.Empty: + if state.is_speaking: print("Finished speaking.") + state.is_speaking = False + except RuntimeError: + logging.warn("Lost connection to Pepper. Please check if you're connected to the local WiFi and restart this application.") + state.exit_event.set() + diff --git a/src/robot_interface/state.py b/src/robot_interface/state.py index b625867..8d69cca 100644 --- a/src/robot_interface/state.py +++ b/src/robot_interface/state.py @@ -30,6 +30,7 @@ class State(object): self.exit_event = None self.sockets = [] self.qi_session = None + self.is_speaking = False def initialize(self): """ diff --git a/test/unit/test_actuation_receiver.py b/test/unit/test_actuation_receiver.py index 74620ab..d5918a2 100644 --- a/test/unit/test_actuation_receiver.py +++ b/test/unit/test_actuation_receiver.py @@ -3,7 +3,7 @@ import sys import mock import pytest import zmq - +import Queue from robot_interface.endpoints.actuation_receiver import ActuationReceiver @@ -18,47 +18,107 @@ def zmq_context(): context = zmq.Context() yield context +def test_force_speech_clears_queue(mocker): + """ + Tests that a force speech message clears the existing queue + and places the high-priority message at the front. + """ + mocker.patch("threading.Thread") + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + mock_qi = mock.Mock() + sys.modules["qi"] = mock_qi -def test_handle_unimplemented_endpoint(zmq_context): + mock_tts_service = mock.Mock() + mock_state.qi_session.service.return_value = mock_tts_service + + # Use Mock Context + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + receiver._message_queue.put("old_message_1") + receiver._message_queue.put("old_message_2") + + assert receiver._message_queue.qsize() == 2 + + force_msg = { + "endpoint": "actuate/speech", + "data": "Emergency Notification", + "is_priority": True, + } + receiver.handle_message(force_msg) + + assert receiver._message_queue.qsize() == 1 + queued_item = receiver._message_queue.get() + assert queued_item == "Emergency Notification" + +def test_handle_unimplemented_endpoint(mocker): """ - Tests that the ``ActuationReceiver.handle_message`` method can - handle an unknown or unimplemented endpoint without raising an error. + Tests handling of unknown endpoints. """ - receiver = ActuationReceiver(zmq_context) - # Should not error + mocker.patch("threading.Thread") + + # Use Mock Context + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + receiver.handle_message({ "endpoint": "some_endpoint_that_definitely_does_not_exist", "data": None, }) - -def test_speech_message_no_data(zmq_context, mocker): +def test_speech_message_no_data(mocker): """ - Tests that the message handler logs a warning when a speech actuation - request (`actuate/speech`) is received but contains empty string data. + Tests that if the message data is empty, the receiver returns immediately + WITHOUT attempting to access the global robot state or session. """ - mock_warn = mocker.patch("logging.warn") + # 1. Prevent background threads from running + mocker.patch("threading.Thread") + + # 2. Mock the global state object + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") - receiver = ActuationReceiver(zmq_context) + # 3. Create a PropertyMock to track whenever 'qi_session' is accessed + # We attach it to the class type of the mock so it acts like a real property + mock_session_prop = mock.PropertyMock(return_value=None) + type(mock_state).qi_session = mock_session_prop + + # 4. Initialize Receiver (Mocking the context to avoid ZMQ errors) + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + # 5. Send empty data receiver.handle_message({"endpoint": "actuate/speech", "data": ""}) - mock_warn.assert_called_with(mock.ANY) + # 6. Assertion: + # Because the code does `if not text: return` BEFORE `if not state.qi_session`, + # the state property should NEVER be read. + mock_session_prop.assert_not_called() -def test_speech_message_invalid_data(zmq_context, mocker): +def test_speech_message_invalid_data(mocker): """ - Tests that the message handler logs a warning when a speech actuation - request (`actuate/speech`) is received with data that is not a string (e.g., a boolean). + Tests that if the message data is not a string, the function returns. + :param mocker: Description """ - mock_warn = mocker.patch("logging.warn") + mocker.patch("threading.Thread") - receiver = ActuationReceiver(zmq_context) + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + mock_session_prop = mock.PropertyMock(return_value=None) + type(mock_state).qi_session = mock_session_prop + + # Use Mock Context + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + receiver.handle_message({"endpoint": "actuate/speech", "data": True}) - mock_warn.assert_called_with(mock.ANY) + # Because the code does `if not text: return` BEFORE `if not state.qi_session`, + # the state property should NEVER be read. + mock_session_prop.assert_not_called() - -def test_speech_no_qi(zmq_context, mocker): +def test_speech_no_qi(mocker): """ Tests the actuation receiver's behavior when processing a speech request but the global state does not have an active QI session. @@ -68,16 +128,21 @@ def test_speech_no_qi(zmq_context, mocker): mock_qi_session = mock.PropertyMock(return_value=None) type(mock_state).qi_session = mock_qi_session - receiver = ActuationReceiver(zmq_context) + mock_tts_service = mock.Mock() + + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + receiver._tts_service = mock_tts_service + receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) - mock_qi_session.assert_called() + receiver._tts_service.assert_not_called() -def test_speech(zmq_context, mocker): +def test_speech(mocker): """ Tests the core speech actuation functionality by mocking the QI TextToSpeech - service and verifying that it is called correctly. + service and verifying that the received message is put into the queue. """ mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") @@ -88,13 +153,179 @@ def test_speech(zmq_context, mocker): mock_state.qi_session = mock.Mock() mock_state.qi_session.service.return_value = mock_tts_service - receiver = ActuationReceiver(zmq_context) + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + receiver._tts_service = None receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) - mock_state.qi_session.service.assert_called_once_with("ALTextToSpeech") + assert receiver._message_queue.qsize() == 1 - mock_qi.async.assert_called_once() - call_args = mock_qi.async.call_args[0] - assert call_args[0] == mock_tts_service.say - assert call_args[1] == "Some message to speak." + queued_item = receiver._message_queue.get() + assert queued_item == "Some message to speak." + +def test_speech_priority(mocker): + """ + Tests that a priority speech message is handled correctly by clearing the queue + and placing the priority message at the front. + """ + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + mock_qi = mock.Mock() + sys.modules["qi"] = mock_qi + + mock_tts_service = mock.Mock() + mock_state.qi_session = mock.Mock() + mock_state.qi_session.service.return_value = mock_tts_service + + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + receiver._message_queue.put("old_message_1") + receiver._message_queue.put("old_message_2") + + assert receiver._message_queue.qsize() == 2 + + priority_msg = { + "endpoint": "actuate/speech", + "data": "Urgent Message", + "is_priority": True, + } + receiver._handle_speech(priority_msg) + + assert receiver._message_queue.qsize() == 1 + queued_item = receiver._message_queue.get() + assert queued_item == "Urgent Message" + +def test_handle_messages_loop(mocker): + """ + Tests the background consumer loop (_handle_messages) processing an item. + Runs SYNCHRONOUSLY to ensure coverage tools pick up the lines. + """ + # Patch Thread so the real background thread NEVER starts automatically + mocker.patch("threading.Thread") + + # Mock state + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + # Setup initial speaking state to False (covers "Started speaking" print) + mock_state.is_speaking = False + + # Mock the TextToSpeech service + mock_tts_service = mock.Mock() + mock_state.qi_session.service.return_value = mock_tts_service + + # Initialize receiver (Thread is patched, so no thread starts) + # Use Mock Context to avoid ZMQ errors + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + # Manually inject service (since lazy loading might handle it, but this is safer) + receiver._tts_service = mock_tts_service + + # This ensures the while loop iterates exactly once + mock_state.exit_event.is_set.side_effect = [False, True] + + # Put an item in the queue + receiver._message_queue.put("Hello World") + + # RUN MANUALLY in the main thread + # This executes the code: while -> try -> get -> if print -> speaking=True -> say + receiver._handle_messages() + + # Assertions + assert receiver._message_queue.empty() + mock_tts_service.say.assert_called_with("Hello World") + assert mock_state.is_speaking is True + + +def test_handle_messages_queue_empty(mocker): + """ + Tests the Queue.Empty exception handler in the consumer loop. + This covers the logic that resets 'state.is_speaking' to False. + """ + # Prevent the real background thread from starting + mocker.patch("threading.Thread") + + # Mock the state object + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + # Setup 'is_speaking' property mock + # We set return_value=True so the code enters the 'if state.is_speaking:' block. + # We use PropertyMock to track when this attribute is set. + type(mock_state).is_speaking = True + + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + # This ensures the while loop body runs exactly once for our test + mock_state.exit_event.is_set.side_effect = [False, True] + + # Force get() to raise Queue.Empty immediately (simulate timeout) + # We patch the 'get' method on the specific queue instance of our receiver + #mocker.patch.object(receiver._message_queue, 'get', side_effect=Queue.Empty) + + # Run the loop logic manually (synchronously) + receiver._handle_messages() + + # Final Assertion: Verify is_speaking was set to False + # The code execution order is: read (returns True) -> print -> set (to False) + # assert_called_with checks the arguments of the LAST call, which is the setter. + assert mock_state.is_speaking is False + + +def test_handle_messages_runtime_error(mocker): + """ + Tests the RuntimeError exception handler (e.g. lost WiFi connection). + Uses a Mock ZMQ context to avoid 'Address already in use' errors. + """ + # Patch Thread so we don't accidentally spawn real threads + mocker.patch("threading.Thread") + + # Mock the state and logging + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + # Use a MOCK ZMQ context. + # This prevents the receiver from trying to bind to a real TCP port. + mock_zmq_ctx = mock.Mock() + + # Initialize receiver with the mock context + receiver = ActuationReceiver(mock_zmq_ctx) + + mock_state.exit_event.is_set.side_effect = [False, True] + + receiver._message_queue.put("Test Message") + + # Setup: ...BUT the service raises RuntimeError when asked to speak + mock_tts = mock.Mock() + mock_tts.say.side_effect = RuntimeError("Connection lost") + receiver._tts_service = mock_tts + + # Run the loop logic manually + receiver._handle_messages() + + # Assertions + assert mock_state.exit_event.is_set.called + +def test_clear_queue(mocker): + """ + Tests that the clear_queue method properly drains all items from the message queue. + """ + mocker.patch("threading.Thread") + + # Use Mock Context + mock_zmq_ctx = mock.Mock() + receiver = ActuationReceiver(mock_zmq_ctx) + + # Populate the queue with multiple items + receiver._message_queue.put("msg1") + receiver._message_queue.put("msg2") + receiver._message_queue.put("msg3") + + assert receiver._message_queue.qsize() == 3 + + # Clear the queue + receiver.clear_queue() + + # Assert the queue is empty + assert receiver._message_queue.qsize() == 0 \ No newline at end of file