From 358c4f687243fb65c9df5033dc8b9c7f47aaeed2 Mon Sep 17 00:00:00 2001 From: Storm Date: Mon, 22 Dec 2025 13:36:18 +0100 Subject: [PATCH] feat: pause RI Pause functionality in RI implemented. The audio_sender and video_sender stop sending when paused. ref: N25B-350 --- src/robot_interface/endpoints/audio_sender.py | 63 +++++- .../endpoints/main_receiver.py | 26 +++ src/robot_interface/endpoints/video_sender.py | 4 + src/robot_interface/main.py | 1 + src/robot_interface/state.py | 3 + test/unit/test_audio_sender.py | 214 +++++++++++++++++- test/unit/test_video_sender.py | 208 +++++++++++------ 7 files changed, 435 insertions(+), 84 deletions(-) diff --git a/src/robot_interface/endpoints/audio_sender.py b/src/robot_interface/endpoints/audio_sender.py index 448d6f3..8399cb9 100644 --- a/src/robot_interface/endpoints/audio_sender.py +++ b/src/robot_interface/endpoints/audio_sender.py @@ -77,21 +77,60 @@ class AudioSender(SocketBase): chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD # Docs say this only raises an error if neither `input` nor `output` is True - stream = self.audio.open( - format=pyaudio.paFloat32, - channels=audio_settings.channels, - rate=audio_settings.sample_rate, - input=True, - input_device_index=self.microphone["index"], - frames_per_buffer=chunk, - ) + def open_stream(): + return self.audio.open( + format=pyaudio.paFloat32, + channels=audio_settings.channels, + rate=audio_settings.sample_rate, + input=True, + input_device_index=self.microphone["index"], + frames_per_buffer=chunk, + ) + + stream = None try: + state.active_event.wait() # Wait until the system is not paused + + # Test in case exit_event was set while waiting + if not state.exit_event.is_set(): + stream = open_stream() + while not state.exit_event.is_set(): - data = stream.read(chunk) - self.socket.send(data) + if not state.active_event.is_set(): # when paused + # Stop and close the stream if it is open to prevent buffer overflow + if stream: + try: + stream.stop_stream() + stream.close() + except IOError: + pass # Ignore errors on closing + stream = None + + state.active_event.wait() # Wait until unpaused + + # Check if exit_event was set while waiting + if state.exit_event.is_set(): + break + + stream = open_stream() + + if stream: + try: + data = stream.read(chunk) + self.socket.send(data) + except IOError as e: + logger.warn("Audio read error occurred.", exc_info=e) + if stream: + stream.close() + stream = open_stream() + except IOError as e: logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e) finally: - stream.stop_stream() - stream.close() + if stream: + try: + stream.stop_stream() + stream.close() + except IOError: + pass # Ignore errors on closing \ No newline at end of file diff --git a/src/robot_interface/endpoints/main_receiver.py b/src/robot_interface/endpoints/main_receiver.py index 2882970..1c4b648 100644 --- a/src/robot_interface/endpoints/main_receiver.py +++ b/src/robot_interface/endpoints/main_receiver.py @@ -71,6 +71,30 @@ class MainReceiver(ReceiverBase): return MainReceiver._handle_port_negotiation(message) return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."} + + @staticmethod + def _handle_pause(message): + """ + Handle a pause request. Pauses or resumes the video and audio streams. + + :param message: The pause request message. + :type message: dict + + :return: A response dictionary indicating success. + :rtype: dict[str, str] + """ + if message.get("data"): + if state.active_event.is_set(): + state.active_event.clear() + return {"endpoint": "pause", "data": "Streams paused."} + else: + return {"endpoint": "pause", "data": "Streams are already paused."} + else: + if not state.active_event.is_set(): + state.active_event.set() + return {"endpoint": "pause", "data": "Streams resumed."} + else: + return {"endpoint": "pause", "data": "Streams are already running."} def handle_message(self, message): """ @@ -88,5 +112,7 @@ class MainReceiver(ReceiverBase): return self._handle_ping(message) elif message["endpoint"].startswith("negotiate"): return self._handle_negotiation(message) + elif message["endpoint"] == "pause": + return self._handle_pause(message) return {"endpoint": "error", "data": "The requested endpoint is not supported."} diff --git a/src/robot_interface/endpoints/video_sender.py b/src/robot_interface/endpoints/video_sender.py index 9fa1132..3428882 100644 --- a/src/robot_interface/endpoints/video_sender.py +++ b/src/robot_interface/endpoints/video_sender.py @@ -52,6 +52,10 @@ class VideoSender(SocketBase): :type vid_stream_name: str """ while not state.exit_event.is_set(): + state.active_event.wait() # Wait until the system is not paused + if state.exit_event.is_set(): + break + try: img = vid_service.getImageRemote(vid_stream_name) #Possibly limit images sent if queuing issues arise diff --git a/src/robot_interface/main.py b/src/robot_interface/main.py index 816e53b..89dae79 100644 --- a/src/robot_interface/main.py +++ b/src/robot_interface/main.py @@ -84,6 +84,7 @@ def main(): context = zmq.Context() state.initialize() + state.active_event.set() try: main_loop(context) diff --git a/src/robot_interface/state.py b/src/robot_interface/state.py index b625867..0e8dc32 100644 --- a/src/robot_interface/state.py +++ b/src/robot_interface/state.py @@ -30,6 +30,7 @@ class State(object): self.exit_event = None self.sockets = [] self.qi_session = None + self.active_event = None def initialize(self): """ @@ -48,6 +49,8 @@ class State(object): signal.signal(signal.SIGINT, handle_exit) signal.signal(signal.SIGTERM, handle_exit) + self.active_event = threading.Event() + self.qi_session = get_qi_session() self.is_initialized = True diff --git a/test/unit/test_audio_sender.py b/test/unit/test_audio_sender.py index 4e337c2..00ff740 100644 --- a/test/unit/test_audio_sender.py +++ b/test/unit/test_audio_sender.py @@ -33,7 +33,6 @@ def test_no_microphone(zmq_context, mocker): sender.start() assert sender.thread is None - mock_info_logger.assert_called() sender.wait_until_done() # Should return early because we didn't start a thread @@ -73,7 +72,7 @@ def test_sending_audio(mocker): mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") - mock_state.exit_event.is_set.side_effect = [False, True] + mock_state.exit_event.is_set.side_effect = [False, False, True] mock_zmq_context = mock.Mock() send_socket = mock.Mock() @@ -92,6 +91,217 @@ def test_sending_audio(mocker): send_socket.assert_called() +# SENDING PAUSE RESUME? + +def test_stream_initial_wait_exit(mocker): + mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") + mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} + + mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") + mock_pyaudio_instance = mock_pyaudio_cls.return_value + + mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") + mock_state.exit_event.is_set.return_value = True + mock_state.active_event.is_set.return_value = False + + mock_zmq_context = mock.Mock() + + sender = AudioSender(mock_zmq_context) + sender._stream() + + mock_pyaudio_instance.open.assert_not_called() + +def test_stream_pause_and_resume(mocker): + mock_stream = mock.Mock() + mock_stream.read.return_value = b"data" + + mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") + mock_pyaudio_instance = mock_pyaudio_cls.return_value + mock_pyaudio_instance.open.return_value = mock_stream + + mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") + mock_state.active_event.is_set.return_value = False + + mock_state.exit_event.is_set.side_effect = [False, False, False, True] + + mock_zmq_context = mock.Mock() + sender = AudioSender(mock_zmq_context) + sender.socket = mock.Mock() + + sender._stream() + + assert mock_pyaudio_instance.open.call_count == 2 + + assert mock_stream.close.call_count == 2 + assert mock_stream.stop_stream.call_count == 2 + + assert mock_state.active_event.wait.called + +def test_stream_exit_during_pause(mocker): + mock_stream = mock.Mock() + mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") + mock_pyaudio_instance = mock_pyaudio_cls.return_value + mock_pyaudio_instance.open.return_value = mock_stream + + mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") + mock_state.active_event.is_set.return_value = False + + mock_state.exit_event.is_set.side_effect = [False, False, True] + + mock_zmq_context = mock.Mock() + sender = AudioSender(mock_zmq_context) + sender._stream() + + assert mock_pyaudio_instance.open.call_count == 1 + assert mock_stream.close.call_count == 1 + +def test_stream_read_error_recovery(mocker): + stream_fail = mock.Mock() + stream_fail.read.side_effect = IOError("Overflow") + + stream_ok = mock.Mock() + stream_ok.read.return_value = b"data" + + mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") + mock_pyaudio_instance = mock_pyaudio_cls.return_value + mock_pyaudio_instance.open.side_effect = [stream_fail, stream_ok] + + mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") + mock_state.active_event.is_set.return_value = True + mock_state.exit_event.is_set.side_effect = [False, False, False, True] + + mock_zmq_context = mock.Mock() + sender = AudioSender(mock_zmq_context) + sender.socket = mock.Mock() + sender._stream() + stream_fail.close.assert_called() + assert mock_pyaudio_instance.open.call_count == 2 + sender.socket.send.assert_called_with(b"data") + + +def test_stream_fatal_error(mocker): + mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") + mock_pyaudio_instance = mock_pyaudio_cls.return_value + mock_pyaudio_instance.open.side_effect = IOError("Fatal error") + + mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") + mock_state.active_event.is_set.return_value = True + mock_state.exit_event.is_set.return_value = False + + mock_zmq_context = mock.Mock() + sender = AudioSender(mock_zmq_context) + sender._stream() + +def test_wait_until_done(mocker): + mock_zmq_context = mock.Mock() + + sender = AudioSender(mock_zmq_context) + + sender.wait_until_done() + + mock_thread = mocker.Mock() + sender.thread = mock_thread + sender.wait_until_done() + + mock_thread.join.assert_called_once() + assert sender.thread is None + +def test_stream_pause_close_error(mocker): + """ + Tests that an IOError during stream closure (when pausing) is ignored, + covering the 'pass' statement in the pause logic. + """ + mock_stream = mock.Mock() + mock_stream.read.return_value = b"data" + # Raise IOError when stopping the stream during pause + mock_stream.stop_stream.side_effect = IOError("Failed to stop") + + mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") + mock_pyaudio_instance = mock_pyaudio_cls.return_value + mock_pyaudio_instance.open.return_value = mock_stream + + mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") + + # 1. First False triggers the pause block + # 2. Second True resumes the loop + mock_state.active_event.is_set.side_effect = [False, True] + + mock_state.exit_event.is_set.side_effect = [False, False, False, True] + + mock_zmq_context = mock.Mock() + sender = AudioSender(mock_zmq_context) + sender.socket = mock.Mock() + + sender._stream() + + # Verification: The error should be swallowed, and the stream should re-open + assert mock_stream.stop_stream.called + assert mock_pyaudio_instance.open.call_count == 2 + + +def test_stream_finally_close_error(mocker): + """ + Tests that an IOError during stream closure in the finally block is ignored, + covering the 'pass' statement in the finally logic. + """ + mock_stream = mock.Mock() + mock_stream.read.return_value = b"data" + # Raise IOError when stopping the stream at exit + mock_stream.stop_stream.side_effect = IOError("Cleanup failed") + + mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") + mock_pyaudio_instance = mock_pyaudio_cls.return_value + mock_pyaudio_instance.open.return_value = mock_stream + + mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") + mock_state.active_event.is_set.return_value = True + mock_state.exit_event.is_set.side_effect = [False, False, True] + + mock_zmq_context = mock.Mock() + sender = AudioSender(mock_zmq_context) + sender.socket = mock.Mock() + + # Run + sender._stream() + + # Assert: Should finish without raising exception despite the IOError in finally + assert mock_stream.stop_stream.called + + +def test_stream_recovery_failure(mocker): + """ + Tests the case where recovering from a read error (re-opening stream) also fails. + This ensures the outer try-except catches exceptions from the inner except block. + """ + mock_stream_initial = mock.Mock() + # Trigger the read error logic + mock_stream_initial.read.side_effect = IOError("Read failed") + + mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") + mock_pyaudio_instance = mock_pyaudio_cls.return_value + + # First open works, Second open (recovery) fails fatally + mock_pyaudio_instance.open.side_effect = [ + mock_stream_initial, + IOError("Recovery failed") + ] + + mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") + mock_state.active_event.is_set.return_value = True + mock_state.exit_event.is_set.return_value = False + + mock_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger") + + mock_zmq_context = mock.Mock() + sender = AudioSender(mock_zmq_context) + sender.socket = mock.Mock() + + sender._stream() + + # Assert we hit the outer error log + mock_logger.error.assert_called() + + def _fake_read_error(num_frames): """ diff --git a/test/unit/test_video_sender.py b/test/unit/test_video_sender.py index 430f658..55d739a 100644 --- a/test/unit/test_video_sender.py +++ b/test/unit/test_video_sender.py @@ -1,99 +1,167 @@ # coding=utf-8 - import mock import pytest import zmq from robot_interface.endpoints.video_sender import VideoSender -from robot_interface.state import state -from robot_interface.core.config import settings - @pytest.fixture def zmq_context(): - """Provide a ZMQ context.""" - yield zmq.Context() + """ + Yields a real ZMQ context for socket creation. + """ + context = zmq.Context() + yield context + context.term() +def test_init_defaults(zmq_context, mocker): + """ + Test initialization of the VideoSender. + """ + # We patch settings to ensure valid port access inside the class logic, + # although the default arg is evaluated at import time. + mocker.patch("robot_interface.endpoints.video_sender.settings") + + mock_zmq = mock.Mock() + sender = VideoSender(mock_zmq) + + # Verify socket type is PUB + assert sender.identifier == "video" -def _patch_basics(mocker): - """Common patches: prevent real threads, port binds, and state errors.""" - mocker.patch("robot_interface.endpoints.socket_base.zmq.Socket.bind") - mocker.patch("robot_interface.endpoints.video_sender.threading.Thread") - mocker.patch.object(state, "is_initialized", True) +def test_start_no_qi_session(mocker): + """ + Test that the loop does not start if no Qi session is available. + """ + # Mock state to return None for qi_session + mock_state = mocker.patch("robot_interface.endpoints.video_sender.state") + mock_state.qi_session = None + + mock_threading = mocker.patch("robot_interface.endpoints.video_sender.threading") - -def _patch_exit_event(mocker): - """Make exit_event stop the loop after one iteration.""" - fake_event = mock.Mock() - fake_event.is_set.side_effect = [False, True] - mocker.patch.object(state, "exit_event", fake_event) - - -def test_no_qi_session(zmq_context, mocker): - """Video loop should not start without a qi_session.""" - _patch_basics(mocker) - mocker.patch.object(state, "qi_session", None) - - sender = VideoSender(zmq_context) + mock_zmq = mock.Mock() + sender = VideoSender(mock_zmq) sender.start_video_rcv() - assert not hasattr(sender, "thread") - - -def test_video_streaming(zmq_context, mocker): - """VideoSender should send retrieved image data.""" - _patch_basics(mocker) - _patch_exit_event(mocker) - - # Pepper's image buffer lives at index 6 - mocker.patch.object(settings.video_config, "image_buffer", 6) + # Assertions + mock_threading.Thread.assert_not_called() +def test_start_success(mocker): + """ + Test successful startup of the video receiver thread. + """ + # Mock the Qi Session and Service + mock_state = mocker.patch("robot_interface.endpoints.video_sender.state") + mock_session = mock.Mock() + mock_state.qi_session = mock_session + mock_video_service = mock.Mock() - mock_video_service.getImageRemote.return_value = [None]*6 + ["fake_img"] + mock_session.service.return_value = mock_video_service + mock_video_service.subscribeCamera.return_value = "test_subscriber_id" - fake_session = mock.Mock() - fake_session.service.return_value = mock_video_service - mocker.patch.object(state, "qi_session", fake_session) + # Mock Settings + mock_settings = mocker.patch("robot_interface.endpoints.video_sender.settings") + mock_settings.video_config.camera_index = 0 + mock_settings.video_config.resolution = 2 + mock_settings.video_config.color_space = 11 + mock_settings.video_config.fps = 30 + mock_settings.video_config.stream_name = "test_stream" - mocker.patch.object( - fake_session.service("ALVideoDevice"), - "subscribeCamera", - return_value="stream_name" - ) - - sender = VideoSender(zmq_context) - send_socket = mock.Mock() - sender.socket.send = send_socket + mock_threading = mocker.patch("robot_interface.endpoints.video_sender.threading") + # Run + mock_zmq = mock.Mock() + sender = VideoSender(mock_zmq) sender.start_video_rcv() - sender.video_rcv_loop(mock_video_service, "stream_name") - send_socket.assert_called_with("fake_img") + # Assertions + mock_session.service.assert_called_with("ALVideoDevice") + mock_video_service.subscribeCamera.assert_called_with("test_stream", 0, 2, 11, 30) + + mock_threading.Thread.assert_called_once() + # Verify arguments passed to the thread target + call_args = mock_threading.Thread.call_args[1] + assert call_args["target"] == sender.video_rcv_loop + assert call_args["args"] == (mock_video_service, "test_subscriber_id") + + # Ensure thread was started + mock_threading.Thread.return_value.start.assert_called_once() +def test_video_loop_happy_path(mocker): + """ + Test the main loop: Wait -> Get Image -> Send -> Repeat/Exit. + """ + # Mock settings for image buffer index + mock_settings = mocker.patch("robot_interface.endpoints.video_sender.settings") + mock_settings.video_config.image_buffer = 6 -def test_video_receive_error(zmq_context, mocker): - """Errors retrieving images should not call send().""" - _patch_basics(mocker) - _patch_exit_event(mocker) + # Mock Video Service to return a fake image structure + # Standard NaoQi image is a list, binary data is usually at index 6 + fake_image_data = b"binary_jpeg_data" + fake_image_list = [0] * 7 + fake_image_list[6] = fake_image_data + + mock_service = mock.Mock() + mock_service.getImageRemote.return_value = fake_image_list - mock_video_service = mock.Mock() - mock_video_service.getImageRemote.side_effect = Exception("boom") + # Mock Events: + # exit_event: False (start), False (loop once), True (break) + mock_state = mocker.patch("robot_interface.endpoints.video_sender.state") + mock_state.exit_event.is_set.side_effect = [False, False, True] - fake_session = mock.Mock() - fake_session.service.return_value = mock_video_service - mocker.patch.object(state, "qi_session", fake_session) + # Run + mock_zmq = mock.Mock() + sender = VideoSender(mock_zmq) + sender.socket = mock.Mock() # Mock the socket to verify send + + sender.video_rcv_loop(mock_service, "sub_id") - mocker.patch.object( - fake_session.service("ALVideoDevice"), - "subscribeCamera", - return_value="stream_name" - ) + # Assertions + mock_state.active_event.wait.assert_called() + mock_service.getImageRemote.assert_called_with("sub_id") + sender.socket.send.assert_called_with(fake_image_data) - sender = VideoSender(zmq_context) - send_socket = mock.Mock() - sender.socket.send = send_socket +def test_video_loop_exit_during_wait(zmq_context, mocker): + """ + Test that the loop breaks immediately if exit_event is set while waiting. + """ + mock_service = mock.Mock() + mock_state = mocker.patch("robot_interface.endpoints.video_sender.state") + + # 1. Loop check: False (enter loop) + # 2. Wait happens (mock returns instantly) + # 3. Post-wait check: True (break) + mock_state.exit_event.is_set.side_effect = [False, True] - sender.start_video_rcv() - sender.video_rcv_loop(mock_video_service, "stream_name") + mock_zqm = mock.Mock() + sender = VideoSender(mock_zqm) + sender.video_rcv_loop(mock_service, "sub_id") - send_socket.assert_not_called() + # Assert we never tried to get an image + mock_service.getImageRemote.assert_not_called() + +def test_video_loop_exception_handling(zmq_context, mocker): + """ + Test that exceptions during image retrieval are caught and logged, + and do not crash the thread. + """ + mock_settings = mocker.patch("robot_interface.endpoints.video_sender.settings") + mock_logger = mocker.patch("robot_interface.endpoints.video_sender.logging") + + mock_service = mock.Mock() + # First call raises Exception, Second call works (if we allowed it, but we exit) + mock_service.getImageRemote.side_effect = Exception("Camera disconnected") + + mock_state = mocker.patch("robot_interface.endpoints.video_sender.state") + # Loop runs once then exits + mock_state.exit_event.is_set.side_effect = [False, False, True] + + mock_zmq = mock.Mock() + sender = VideoSender(mock_zmq) + sender.socket = mock.Mock() + + sender.video_rcv_loop(mock_service, "sub_id") + + # Assertions + mock_logger.warn.assert_called_with("Failed to retrieve video image from robot.") + # Ensure loop didn't crash; it should have completed the iteration and checked exit_event + assert mock_state.exit_event.is_set.call_count >= 2 \ No newline at end of file