From fab5127cace233d50f9f5903d6afa4f838e75043 Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Sun, 2 Nov 2025 16:12:56 +0100 Subject: [PATCH] feat: add application parameter to choose a custom microphone ref: N25B-119 --- src/robot_interface/endpoints/audio_sender.py | 12 +++- src/robot_interface/utils/microphone.py | 51 ++++++++++++++++ test/common/microphone_utils.py | 59 ++++++++++++++++++- test/unit/test_audio_sender.py | 8 +-- 4 files changed, 122 insertions(+), 8 deletions(-) diff --git a/src/robot_interface/endpoints/audio_sender.py b/src/robot_interface/endpoints/audio_sender.py index 5cd5a6b..7365816 100644 --- a/src/robot_interface/endpoints/audio_sender.py +++ b/src/robot_interface/endpoints/audio_sender.py @@ -7,7 +7,7 @@ import zmq from robot_interface.endpoints.socket_base import SocketBase from robot_interface.state import state -from robot_interface.utils.microphone import choose_mic_default +from robot_interface.utils.microphone import choose_mic logger = logging.getLogger(__name__) @@ -17,10 +17,16 @@ class AudioSender(SocketBase): def __init__(self, zmq_context, port=5558): super(AudioSender, self).__init__(str("audio")) # Convert future's unicode_literal to str self.create_socket(zmq_context, zmq.PUB, port) - self.audio = pyaudio.PyAudio() - self.microphone = choose_mic_default(self.audio) self.thread = None + try: + self.audio = pyaudio.PyAudio() + self.microphone = choose_mic(self.audio) + except IOError as e: + logger.warning("PyAudio is not available.", exc_info=e) + self.audio = None + self.microphone = None + def start(self): """ Start sending audio in a different thread. diff --git a/src/robot_interface/utils/microphone.py b/src/robot_interface/utils/microphone.py index c37ed0b..877ca3f 100644 --- a/src/robot_interface/utils/microphone.py +++ b/src/robot_interface/utils/microphone.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals # So that `print` can print Unicode characters in names import logging +import sys logger = logging.getLogger(__name__) @@ -67,3 +68,53 @@ def choose_mic_default(audio): return audio.get_default_input_device_info() except IOError: return None + + +def choose_mic_arguments(audio): + """ + Get a microphone to use from command line arguments. + + :param audio: An instance of PyAudio to use. + :type audio: pyaudio.PyAudio + + :return: A dictionary from PyAudio containing information about the microphone to use, or None + if there is no microphone satisfied by the arguments. + :rtype: dict | None + """ + microphone_name = None + for i, arg in enumerate(sys.argv): + if arg == "--microphone" and len(sys.argv) > i+1: + microphone_name = sys.argv[i+1].strip() + if arg.startswith("--microphone="): + microphone_name = arg[13:].strip() + + if not microphone_name: return None + + available_mics = list(get_microphones(audio)) + for mic in available_mics: + if mic["name"] == microphone_name: + return mic + + available_mic_names = [mic["name"] for mic in available_mics] + logger.warning("Microphone \"{}\" not found. Choose one of {}" + .format(microphone_name, available_mic_names)) + + return None + + +def choose_mic(audio): + """ + Get a microphone to use. Firstly, tries to see if there's an application argument specifying the + microphone to use. If not, get the default microphone. + + :param audio: An instance of PyAudio to use. + :type audio: pyaudio.PyAudio + + :return: A dictionary from PyAudio containing information about the microphone to use, or None + if there is no microphone. + :rtype: dict | None + """ + chosen_mic = choose_mic_arguments(audio) + if chosen_mic: return chosen_mic + + return choose_mic_default(audio) diff --git a/test/common/microphone_utils.py b/test/common/microphone_utils.py index 7ecbf27..c82de37 100644 --- a/test/common/microphone_utils.py +++ b/test/common/microphone_utils.py @@ -1,8 +1,15 @@ +from __future__ import unicode_literals # So that we can format strings with Unicode characters import random import sys from StringIO import StringIO -from robot_interface.utils.microphone import choose_mic_default, choose_mic_interactive, get_microphones +from robot_interface.utils.microphone import ( + choose_mic_default, + choose_mic_interactive, + choose_mic_arguments, + choose_mic, + get_microphones, +) class MicrophoneUtils(object): @@ -93,3 +100,53 @@ class MicrophoneUtils(object): assert "index" in result assert isinstance(result["index"], (int, long)) assert result["index"] == microphones[random_index]["index"] + + def test_choose_mic_no_arguments(self, pyaudio_instance, mocker): + mocker.patch.object(sys, "argv", []) + + result = choose_mic_arguments(pyaudio_instance) + + assert result is None + + def test_choose_mic_arguments(self, pyaudio_instance, mocker): + for mic in get_microphones(pyaudio_instance): + mocker.patch.object(sys, "argv", ["--microphone", mic["name"]]) + + result = choose_mic_arguments(pyaudio_instance) + + assert result is not None + assert result == mic + + def test_choose_mic_arguments_eq(self, pyaudio_instance, mocker): + for mic in get_microphones(pyaudio_instance): + mocker.patch.object(sys, "argv", ["--microphone={}".format(mic["name"])]) + + result = choose_mic_arguments(pyaudio_instance) + + assert result is not None + assert result == mic + + def test_choose_mic_arguments_not_exits(self, pyaudio_instance, mocker): + mocker.patch.object(sys, "argv", ["--microphone", "Surely this microphone doesn't exist"]) + + result = choose_mic_arguments(pyaudio_instance) + + assert result is None + + def test_choose_mic_with_argument(self, pyaudio_instance, mocker): + mic = next(get_microphones(pyaudio_instance)) + mocker.patch.object(sys, "argv", ["--microphone", mic["name"]]) + + result = choose_mic(pyaudio_instance) + + assert result is not None + assert result == mic + + def test_choose_mic_no_argument(self, pyaudio_instance, mocker): + default_mic = choose_mic_default(pyaudio_instance) + mocker.patch.object(sys, "argv", []) + + result = choose_mic(pyaudio_instance) + + assert result is not None + assert result == default_mic diff --git a/test/unit/test_audio_sender.py b/test/unit/test_audio_sender.py index 4324cdb..fc21805 100644 --- a/test/unit/test_audio_sender.py +++ b/test/unit/test_audio_sender.py @@ -17,7 +17,7 @@ def zmq_context(): def test_no_microphone(zmq_context, mocker): mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info") - mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default") + mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic.return_value = None sender = AudioSender(zmq_context) @@ -32,7 +32,7 @@ def test_no_microphone(zmq_context, mocker): def test_unicode_mic_name(zmq_context, mocker): mocker.patch("robot_interface.endpoints.audio_sender.threading") - mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default") + mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic.return_value = {"name": u"• Some Unicode name"} sender = AudioSender(zmq_context) @@ -51,7 +51,7 @@ def _fake_read(num_frames): def test_sending_audio(mocker): - mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default") + mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") @@ -80,7 +80,7 @@ def _fake_read_error(num_frames): def test_break_microphone(mocker): - mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default") + mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")