feat: add application parameter to choose a custom microphone

ref: N25B-119
This commit is contained in:
Twirre Meulenbelt
2025-11-02 16:12:56 +01:00
parent 5912ac606a
commit fab5127cac
4 changed files with 122 additions and 8 deletions

View File

@@ -7,7 +7,7 @@ import zmq
from robot_interface.endpoints.socket_base import SocketBase from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state from robot_interface.state import state
from robot_interface.utils.microphone import choose_mic_default from robot_interface.utils.microphone import choose_mic
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -17,10 +17,16 @@ class AudioSender(SocketBase):
def __init__(self, zmq_context, port=5558): def __init__(self, zmq_context, port=5558):
super(AudioSender, self).__init__(str("audio")) # Convert future's unicode_literal to str super(AudioSender, self).__init__(str("audio")) # Convert future's unicode_literal to str
self.create_socket(zmq_context, zmq.PUB, port) self.create_socket(zmq_context, zmq.PUB, port)
self.audio = pyaudio.PyAudio()
self.microphone = choose_mic_default(self.audio)
self.thread = None self.thread = None
try:
self.audio = pyaudio.PyAudio()
self.microphone = choose_mic(self.audio)
except IOError as e:
logger.warning("PyAudio is not available.", exc_info=e)
self.audio = None
self.microphone = None
def start(self): def start(self):
""" """
Start sending audio in a different thread. Start sending audio in a different thread.

View File

@@ -1,5 +1,6 @@
from __future__ import unicode_literals # So that `print` can print Unicode characters in names from __future__ import unicode_literals # So that `print` can print Unicode characters in names
import logging import logging
import sys
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -67,3 +68,53 @@ def choose_mic_default(audio):
return audio.get_default_input_device_info() return audio.get_default_input_device_info()
except IOError: except IOError:
return None return None
def choose_mic_arguments(audio):
"""
Get a microphone to use from command line arguments.
:param audio: An instance of PyAudio to use.
:type audio: pyaudio.PyAudio
:return: A dictionary from PyAudio containing information about the microphone to use, or None
if there is no microphone satisfied by the arguments.
:rtype: dict | None
"""
microphone_name = None
for i, arg in enumerate(sys.argv):
if arg == "--microphone" and len(sys.argv) > i+1:
microphone_name = sys.argv[i+1].strip()
if arg.startswith("--microphone="):
microphone_name = arg[13:].strip()
if not microphone_name: return None
available_mics = list(get_microphones(audio))
for mic in available_mics:
if mic["name"] == microphone_name:
return mic
available_mic_names = [mic["name"] for mic in available_mics]
logger.warning("Microphone \"{}\" not found. Choose one of {}"
.format(microphone_name, available_mic_names))
return None
def choose_mic(audio):
"""
Get a microphone to use. Firstly, tries to see if there's an application argument specifying the
microphone to use. If not, get the default microphone.
:param audio: An instance of PyAudio to use.
:type audio: pyaudio.PyAudio
:return: A dictionary from PyAudio containing information about the microphone to use, or None
if there is no microphone.
:rtype: dict | None
"""
chosen_mic = choose_mic_arguments(audio)
if chosen_mic: return chosen_mic
return choose_mic_default(audio)

View File

@@ -1,8 +1,15 @@
from __future__ import unicode_literals # So that we can format strings with Unicode characters
import random import random
import sys import sys
from StringIO import StringIO from StringIO import StringIO
from robot_interface.utils.microphone import choose_mic_default, choose_mic_interactive, get_microphones from robot_interface.utils.microphone import (
choose_mic_default,
choose_mic_interactive,
choose_mic_arguments,
choose_mic,
get_microphones,
)
class MicrophoneUtils(object): class MicrophoneUtils(object):
@@ -93,3 +100,53 @@ class MicrophoneUtils(object):
assert "index" in result assert "index" in result
assert isinstance(result["index"], (int, long)) assert isinstance(result["index"], (int, long))
assert result["index"] == microphones[random_index]["index"] assert result["index"] == microphones[random_index]["index"]
def test_choose_mic_no_arguments(self, pyaudio_instance, mocker):
mocker.patch.object(sys, "argv", [])
result = choose_mic_arguments(pyaudio_instance)
assert result is None
def test_choose_mic_arguments(self, pyaudio_instance, mocker):
for mic in get_microphones(pyaudio_instance):
mocker.patch.object(sys, "argv", ["--microphone", mic["name"]])
result = choose_mic_arguments(pyaudio_instance)
assert result is not None
assert result == mic
def test_choose_mic_arguments_eq(self, pyaudio_instance, mocker):
for mic in get_microphones(pyaudio_instance):
mocker.patch.object(sys, "argv", ["--microphone={}".format(mic["name"])])
result = choose_mic_arguments(pyaudio_instance)
assert result is not None
assert result == mic
def test_choose_mic_arguments_not_exits(self, pyaudio_instance, mocker):
mocker.patch.object(sys, "argv", ["--microphone", "Surely this microphone doesn't exist"])
result = choose_mic_arguments(pyaudio_instance)
assert result is None
def test_choose_mic_with_argument(self, pyaudio_instance, mocker):
mic = next(get_microphones(pyaudio_instance))
mocker.patch.object(sys, "argv", ["--microphone", mic["name"]])
result = choose_mic(pyaudio_instance)
assert result is not None
assert result == mic
def test_choose_mic_no_argument(self, pyaudio_instance, mocker):
default_mic = choose_mic_default(pyaudio_instance)
mocker.patch.object(sys, "argv", [])
result = choose_mic(pyaudio_instance)
assert result is not None
assert result == default_mic

View File

@@ -17,7 +17,7 @@ def zmq_context():
def test_no_microphone(zmq_context, mocker): def test_no_microphone(zmq_context, mocker):
mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info") mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default") mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = None mock_choose_mic.return_value = None
sender = AudioSender(zmq_context) sender = AudioSender(zmq_context)
@@ -32,7 +32,7 @@ def test_no_microphone(zmq_context, mocker):
def test_unicode_mic_name(zmq_context, mocker): def test_unicode_mic_name(zmq_context, mocker):
mocker.patch("robot_interface.endpoints.audio_sender.threading") mocker.patch("robot_interface.endpoints.audio_sender.threading")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default") mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"• Some Unicode name"} mock_choose_mic.return_value = {"name": u"• Some Unicode name"}
sender = AudioSender(zmq_context) sender = AudioSender(zmq_context)
@@ -51,7 +51,7 @@ def _fake_read(num_frames):
def test_sending_audio(mocker): def test_sending_audio(mocker):
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default") mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
@@ -80,7 +80,7 @@ def _fake_read_error(num_frames):
def test_break_microphone(mocker): def test_break_microphone(mocker):
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default") mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")