# coding=utf-8 import os import mock import pytest import zmq from robot_interface.endpoints.audio_sender import AudioSender @pytest.fixture def zmq_context(): """ A pytest fixture that creates and yields a ZMQ context. :return: An initialized ZeroMQ context. :rtype: zmq.Context """ context = zmq.Context() yield context def test_no_microphone(zmq_context, mocker): """ Tests the scenario where no valid microphone can be chosen for recording. """ mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info") mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic.return_value = None sender = AudioSender(zmq_context) assert sender.microphone is None sender.start() assert sender.thread is None sender.wait_until_done() # Should return early because we didn't start a thread def test_unicode_mic_name(zmq_context, mocker): """ Tests the robustness of the `AudioSender` when handling microphone names that contain Unicode characters. """ mocker.patch("robot_interface.endpoints.audio_sender.threading") mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic.return_value = {"name": u"• Some Unicode name"} sender = AudioSender(zmq_context) assert sender.microphone is not None # `.start()` logs the name of the microphone. It should not give an error if it contains Unicode # symbols. sender.start() assert sender.thread is not None sender.wait_until_done() # Should return instantly because we didn't start a real thread def _fake_read(num_frames): """ Helper function to simulate reading raw audio data from a microphone stream. """ return os.urandom(num_frames * 4) def test_sending_audio(mocker): """ Tests the successful sending of audio data over a ZeroMQ socket. """ mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state.exit_event.is_set.side_effect = [False, False, True] mock_zmq_context = mock.Mock() send_socket = mock.Mock() mock_state.is_speaking = False # If there's something wrong with the microphone, it will raise an IOError when `read`ing. stream = mock.Mock() stream.read = _fake_read sender = AudioSender(mock_zmq_context) sender.socket.send = send_socket sender.audio.open = mock.Mock() sender.audio.open.return_value = stream sender.start() sender.wait_until_done() send_socket.assert_called() # SENDING PAUSE RESUME? def test_stream_initial_wait_exit(mocker): mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") mock_pyaudio_instance = mock_pyaudio_cls.return_value mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state.exit_event.is_set.return_value = True mock_state.active_event.is_set.return_value = False mock_zmq_context = mock.Mock() sender = AudioSender(mock_zmq_context) sender._stream() mock_pyaudio_instance.open.assert_not_called() def test_stream_pause_and_resume(mocker): mock_stream = mock.Mock() mock_stream.read.return_value = b"data" mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") mock_pyaudio_instance = mock_pyaudio_cls.return_value mock_pyaudio_instance.open.return_value = mock_stream mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state.active_event.is_set.return_value = False mock_state.exit_event.is_set.side_effect = [False, False, False, True] mock_zmq_context = mock.Mock() sender = AudioSender(mock_zmq_context) sender.socket = mock.Mock() sender._stream() assert mock_pyaudio_instance.open.call_count == 2 assert mock_stream.close.call_count == 2 assert mock_stream.stop_stream.call_count == 2 assert mock_state.active_event.wait.called def test_stream_exit_during_pause(mocker): mock_stream = mock.Mock() mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") mock_pyaudio_instance = mock_pyaudio_cls.return_value mock_pyaudio_instance.open.return_value = mock_stream mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state.active_event.is_set.return_value = False mock_state.exit_event.is_set.side_effect = [False, False, True] mock_zmq_context = mock.Mock() sender = AudioSender(mock_zmq_context) sender._stream() assert mock_pyaudio_instance.open.call_count == 1 assert mock_stream.close.call_count == 1 def test_stream_read_error_recovery(mocker): stream_fail = mock.Mock() stream_fail.read.side_effect = IOError("Overflow") stream_ok = mock.Mock() stream_ok.read.return_value = b"data" mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") mock_pyaudio_instance = mock_pyaudio_cls.return_value mock_pyaudio_instance.open.side_effect = [stream_fail, stream_ok] mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state.active_event.is_set.return_value = True mock_state.exit_event.is_set.side_effect = [False, False, False, True] mock_zmq_context = mock.Mock() sender = AudioSender(mock_zmq_context) sender.socket = mock.Mock() sender._stream() stream_fail.close.assert_called() assert mock_pyaudio_instance.open.call_count == 2 sender.socket.send.assert_called_with(b"data") def test_stream_fatal_error(mocker): mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") mock_pyaudio_instance = mock_pyaudio_cls.return_value mock_pyaudio_instance.open.side_effect = IOError("Fatal error") mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state.active_event.is_set.return_value = True mock_state.exit_event.is_set.return_value = False mock_zmq_context = mock.Mock() sender = AudioSender(mock_zmq_context) sender._stream() def test_wait_until_done(mocker): mock_zmq_context = mock.Mock() sender = AudioSender(mock_zmq_context) sender.wait_until_done() mock_thread = mocker.Mock() sender.thread = mock_thread sender.wait_until_done() mock_thread.join.assert_called_once() assert sender.thread is None def test_stream_pause_close_error(mocker): """ Tests that an IOError during stream closure (when pausing) is ignored, covering the 'pass' statement in the pause logic. """ mock_stream = mock.Mock() mock_stream.read.return_value = b"data" # Raise IOError when stopping the stream during pause mock_stream.stop_stream.side_effect = IOError("Failed to stop") mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") mock_pyaudio_instance = mock_pyaudio_cls.return_value mock_pyaudio_instance.open.return_value = mock_stream mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") # 1. First False triggers the pause block # 2. Second True resumes the loop mock_state.active_event.is_set.side_effect = [False, True] mock_state.exit_event.is_set.side_effect = [False, False, False, True] mock_zmq_context = mock.Mock() sender = AudioSender(mock_zmq_context) sender.socket = mock.Mock() sender._stream() # Verification: The error should be swallowed, and the stream should re-open assert mock_stream.stop_stream.called assert mock_pyaudio_instance.open.call_count == 2 def test_stream_finally_close_error(mocker): """ Tests that an IOError during stream closure in the finally block is ignored, covering the 'pass' statement in the finally logic. """ mock_stream = mock.Mock() mock_stream.read.return_value = b"data" # Raise IOError when stopping the stream at exit mock_stream.stop_stream.side_effect = IOError("Cleanup failed") mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") mock_pyaudio_instance = mock_pyaudio_cls.return_value mock_pyaudio_instance.open.return_value = mock_stream mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state.active_event.is_set.return_value = True mock_state.exit_event.is_set.side_effect = [False, False, True] mock_zmq_context = mock.Mock() sender = AudioSender(mock_zmq_context) sender.socket = mock.Mock() # Run sender._stream() # Assert: Should finish without raising exception despite the IOError in finally assert mock_stream.stop_stream.called def test_stream_recovery_failure(mocker): """ Tests the case where recovering from a read error (re-opening stream) also fails. This ensures the outer try-except catches exceptions from the inner except block. """ mock_stream_initial = mock.Mock() # Trigger the read error logic mock_stream_initial.read.side_effect = IOError("Read failed") mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio") mock_pyaudio_instance = mock_pyaudio_cls.return_value # First open works, Second open (recovery) fails fatally mock_pyaudio_instance.open.side_effect = [ mock_stream_initial, IOError("Recovery failed") ] mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state.active_event.is_set.return_value = True mock_state.exit_event.is_set.return_value = False mock_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger") mock_zmq_context = mock.Mock() sender = AudioSender(mock_zmq_context) sender.socket = mock.Mock() sender._stream() # Assert we hit the outer error log mock_logger.error.assert_called() def test_no_sending_if_speaking(mocker): """ Tests the successful sending of audio data over a ZeroMQ socket. """ mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state.exit_event.is_set.side_effect = [False, True] mock_zmq_context = mock.Mock() send_socket = mock.Mock() mock_state.is_speaking = True # If there's something wrong with the microphone, it will raise an IOError when `read`ing. stream = mock.Mock() stream.read = _fake_read sender = AudioSender(mock_zmq_context) sender.socket.send = send_socket sender.audio.open = mock.Mock() sender.audio.open.return_value = stream sender.start() sender.wait_until_done() send_socket.assert_not_called() def _fake_read_error(num_frames): """ Helper function to simulate an I/O error during microphone stream reading. """ raise IOError() def test_break_microphone(mocker): """ Tests the error handling when the microphone stream breaks (raises an IOError). """ mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state.exit_event.is_set.side_effect = [False, True] mock_zmq_context = mock.Mock() send_socket = mock.Mock() # If there's something wrong with the microphone, it will raise an IOError when `read`ing. stream = mock.Mock() stream.read = _fake_read_error sender = AudioSender(mock_zmq_context) sender.socket.send = send_socket sender.audio.open = mock.Mock() sender.audio.open.return_value = stream sender.start() sender.wait_until_done() send_socket.assert_not_called() def test_pyaudio_init_failure(mocker, zmq_context): """ Tests the behavior when PyAudio initialization fails (raises an IOError). """ # Prevent binding the ZMQ socket mocker.patch("robot_interface.endpoints.audio_sender.AudioSender.create_socket") # Simulate PyAudio() failing mocker.patch( "robot_interface.endpoints.audio_sender.pyaudio.PyAudio", side_effect=IOError("boom") ) sender = AudioSender(zmq_context) assert sender.audio is None assert sender.microphone is None