17 Commits

Author SHA1 Message Date
Twirre Meulenbelt
ecf9d14a4e test: make audio sender tests pass 2026-02-09 15:51:35 +01:00
Twirre Meulenbelt
0fe5fcf8f8 feat: choose between Qi microphone and local microphone 2026-02-09 15:45:17 +01:00
abd6988d1e feat: multi-channel qi audio possible 2026-02-09 13:40:47 +01:00
Twirre Meulenbelt
31c76ecf84 fix: make QI audio sender working 2026-02-09 09:39:48 +01:00
6e2bedcd32 feat: (almost) qi audio sender 2026-02-04 18:38:40 +01:00
06e3dad25d Merge branch 'fix/send-video' into 'main'
fix: send video

See merge request ics/sp/2025/n25b/pepperplus-ri!28
2026-01-30 19:19:03 +00:00
Storm
fe8bad1f8c Merge branch 'main' into fix/send-video 2026-01-30 17:28:13 +01:00
5bb5d8a0cc Merge branch 'chore/copyright-all-files' into 'main'
chore: add copyright to all source files

See merge request ics/sp/2025/n25b/pepperplus-ri!29
2026-01-30 11:47:30 +00:00
Pim Hutting
ea208175de chore: add copyright to all source files 2026-01-29 15:57:22 +01:00
Storm
8333f2fc2a chore: removed numpy import 2026-01-29 13:09:25 +01:00
Storm
24c7fa216f test: 100% coverage
ref: N25B-393
2026-01-29 12:28:34 +01:00
Storm
56becd84ac test: fixed video_sender tests
ref: N25B-393
2026-01-29 12:16:48 +01:00
Storm
4a2cace1cf chore: changed socket option to set HWM to 3 (max 3 packets in queue 2026-01-29 12:02:28 +01:00
ad58b16559 Merge branch 'dev' into 'main'
Merge dev with main

See merge request ics/sp/2025/n25b/pepperplus-ri!27
2026-01-28 10:54:22 +00:00
fb0d7850cc Merge branch 'main' into dev 2026-01-28 11:53:23 +01:00
Storm
891ebf5e3f chore: changed video sending to work without cv2 2026-01-27 17:58:06 +01:00
aad2044b6e chore: add .gitignore 2025-09-27 17:58:12 +02:00
39 changed files with 625 additions and 98 deletions

View File

@@ -6,6 +6,9 @@
# The hostname or IP address of the Control Backend.
AGENT__CONTROL_BACKEND_HOST=localhost
# Whether to use Pepper's microphone when Pepper is connected.
AUDIO__USE_PEPPER_MICROPHONE=true
# Variables that are unlikely to be configured, you can probably ignore these:

View File

@@ -7,3 +7,5 @@ sphinx
sphinx_rtd_theme
pre-commit
python-dotenv
numpy<=1.16.6
enum34

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals
from robot_interface.utils.get_config import get_config
@@ -61,7 +68,7 @@ class VideoConfig(object):
):
self.camera_index = get_config(camera_index, "VIDEO__CAMERA_INDEX", 0, int)
self.resolution = get_config(resolution, "VIDEO__RESOLUTION", 2, int)
self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 11, int)
self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 13, int)
self.fps = get_config(fps, "VIDEO__FPS", 15, int)
self.stream_name = get_config(stream_name, "VIDEO__STREAM_NAME", "Pepper Video")
self.image_buffer = get_config(image_buffer, "VIDEO__IMAGE_BUFFER", 6, int)
@@ -71,6 +78,8 @@ class AudioConfig(object):
"""
Audio configuration constants.
:ivar use_pepper_microphone: Whether to use Pepper's microphone or not, defaults to True.
:vartype use_pepper_microphone: bool
:ivar sample_rate: Audio sampling rate in Hz, defaults to 16000.
:vartype sample_rate: int
:ivar chunk_size: Size of audio chunks to capture/process, defaults to 512.
@@ -78,7 +87,14 @@ class AudioConfig(object):
:ivar channels: Number of audio channels, defaults to 1.
:vartype channels: int
"""
def __init__(self, sample_rate=None, chunk_size=None, channels=None):
def __init__(
self,
use_pepper_microphone=None,
sample_rate=None,
chunk_size=None,
channels=None,
):
self.use_pepper_microphone = get_config(use_pepper_microphone, "AUDIO__USE_PEPPER_MICROPHONE", True, bool)
self.sample_rate = get_config(sample_rate, "AUDIO__SAMPLE_RATE", 16000, int)
self.chunk_size = get_config(chunk_size, "AUDIO__CHUNK_SIZE", 512, int)
self.channels = get_config(channels, "AUDIO__CHANNELS", 1, int)

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can log texts with Unicode characters
import logging
from threading import Thread

View File

@@ -1,7 +1,21 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `logging` can use Unicode characters in names
import audioop
import enum
from abc import ABCMeta, abstractmethod
import threading
import logging
import Queue
import numpy as np
import pyaudio
import zmq
@@ -13,86 +27,219 @@ from robot_interface.core.config import settings
logger = logging.getLogger(__name__)
class AudioSender(SocketBase):
class AudioCapturer(object):
"""
Audio sender endpoint, responsible for sending microphone audio data.
Interface for audio capturers.
"""
__metaclass__ = ABCMeta
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
@abstractmethod
def setup(self):
raise NotImplementedError()
:param port: The port to use.
:type port: int
@abstractmethod
def stop(self):
raise NotImplementedError()
:ivar thread: Thread used for sending audio.
:vartype thread: threading.Thread | None
@abstractmethod
def generate_chunk(self):
raise NotImplementedError()
class SampleRate(enum.Enum):
"""
Sample rate to use in Hz.
"""
LOW = 16000
HIGH = 48000
class PepperMicrophone(enum.Enum):
"""
Which of Pepper's microphones to use. In our case, some of the mics were damages/didn't work
well, so we choose to only use the fron right. If you have a Pepper robot with all working mics,
you might wish to use all microphones, to improve overall audio quality.
"""
ALL = 0
LEFT = 1
RIGHT = 2
FRONT_LEFT = 3
FRONT_RIGHT = 4
class QiAudioCapturer(AudioCapturer):
# Some of this class' methods have docstrings as binary strings. Keep them that way, otherwise
# ``qi.Session.registerService`` will give RuntimeErrors.
def __init__(self, sample_rate=SampleRate.LOW, mic=PepperMicrophone.FRONT_RIGHT,
deinterleaved=0):
"""
:raises RuntimeError: If there is no Qi session available.
:raises ValueError: If the given arguments are not compatible.
"""
self.session = state.qi_session
if not self.session:
raise RuntimeError("Cannot capture from qi device, no qi session available.")
if sample_rate == SampleRate.HIGH and mic != PepperMicrophone.ALL:
raise RuntimeError("For 48000 Hz, you must select all microphones.")
if mic == PepperMicrophone.ALL and sample_rate != SampleRate.HIGH:
raise RuntimeError("For using all microphones, 48000 Hz is required.")
self.audio = self.session.service("ALAudioDevice")
self.service_name = "ZmqAudioStreamer"
self.sample_rate = sample_rate
self.mic = mic
self.deinterleaved = deinterleaved
self.overflow = np.empty(0, dtype=np.float32)
self.q = Queue.Queue()
self._rate_state = None
def setup(self):
b"""
:raises RuntimeError: If no Qi session is available or if the session is not compatible with audio streaming.
"""
assert self.session is not None
logger.info("Listening with Pepper's microphone.")
self.session.registerService(self.service_name, self)
self.audio.setClientPreferences(self.service_name, self.sample_rate.value, self.mic.value,
self.deinterleaved)
self.audio.subscribe(self.service_name)
def stop(self):
b"""
Stop the audio capturer.
"""
try:
self.audio.unsubscribe(self.service_name)
except:
pass
def generate_chunk(self):
try:
chunk = self.q.get(True, 0.1)
return chunk
except Queue.Empty:
return None
# Callback invoked by NAOqi
def processRemote(self, nbOfChannels, nbOfSamplesByChannel, timeStamp, inputBuffer):
raw_pcm = bytes(inputBuffer)
pcm_i16 = np.frombuffer(raw_pcm, dtype=np.int16)
# Make mono channel (if it was 4 channels)
pcm_i32_mono = self._make_mono(pcm_i16.astype(np.int32), nbOfChannels)
# Resample (if it was 48k)
pcm_i32_mono_16k, self._rate_state = audioop.ratecv(pcm_i32_mono.tobytes(), 4, 1,
self.sample_rate.value,
SampleRate.LOW.value, self._rate_state)
pcm_f32_mono_16k = (np.frombuffer(pcm_i32_mono_16k, dtype=np.int32).astype(np.float32) /
32768.0)
# Attach overflow
pcm_f32_mono_16k = np.append(self.overflow, pcm_f32_mono_16k)
for i in range(len(pcm_f32_mono_16k) // 512):
self.q.put_nowait(pcm_f32_mono_16k[i * 512 : (i + 1) * 512].tobytes())
self.overflow = pcm_f32_mono_16k[len(pcm_f32_mono_16k) // 512 * 512 :]
def _make_mono(self, frag, channels):
return frag.reshape(-1, channels).mean(axis=1, dtype=np.int32)
class StandaloneAudioCapturer(AudioCapturer):
"""
Audio capturer that uses a microphone from the local device, can be chosen with the
``--microphone`` program argument.
:ivar audio: PyAudio instance.
:vartype audio: pyaudio.PyAudio | None
:ivar microphone: Selected microphone information.
:vartype microphone: dict | None
:ivar stream: PyAudio stream instance. None until ``setup()`` is called, remaining None if setup
fails for any reason.
:vartype stream: pyaudio.Stream | None
"""
def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
super(AudioSender, self).__init__(str("audio")) # Convert future's unicode_literal to str
self.create_socket(zmq_context, zmq.PUB, port)
self.thread = None
def __init__(self):
self.stream = None
try:
self.audio = pyaudio.PyAudio()
self.microphone = choose_mic(self.audio)
except IOError as e:
logger.warning("PyAudio is not available.", exc_info=e)
logger.warning("PyAudio is not available. Won't be able to send audio.", exc_info=True)
self.audio = None
self.microphone = None
def start(self):
def setup(self):
"""
Start sending audio in a different thread.
Will not start if no microphone is available.
Setup audio stream. Will not if no microphone is available.
"""
if not self.microphone:
logger.info("Not listening: no microphone available.")
return
logger.info("Listening with microphone \"{}\".".format(self.microphone["name"]))
self.thread = threading.Thread(target=self._stream)
self.thread.start()
def wait_until_done(self):
"""
Wait until the audio thread is done.
Will block until `state.exit_event` is set. If the thread is not running, does nothing.
"""
if not self.thread: return
self.thread.join()
self.thread = None
def _stream(self):
"""
Internal method to continuously read audio from the microphone and send it over the socket.
"""
audio_settings = settings.audio_config
chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
# Docs say this only raises an error if neither `input` nor `output` is True
stream = self.audio.open(
self.stream = self.audio.open(
format=pyaudio.paFloat32,
channels=audio_settings.channels,
rate=audio_settings.sample_rate,
channels=settings.audio_config.channels,
rate=settings.audio_config.sample_rate,
input=True,
input_device_index=self.microphone["index"],
frames_per_buffer=chunk,
frames_per_buffer=settings.audio_config.chunk_size,
)
try:
while not state.exit_event.is_set():
data = stream.read(chunk)
if (state.is_speaking): continue # Do not send audio while the robot is speaking
self.socket.send(data)
except IOError as e:
logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e)
finally:
stream.stop_stream()
stream.close()
def stop(self):
"""
Close the audio stream.
"""
if not self.stream: return
self.stream.stop_stream()
self.stream.close()
def generate_chunk(self):
"""
:return: Audio frames from the microphone of size ``settings.audio_config.chunk_size``.
:rtype: bytes.
:raises IOError: If reading from the audio stream fails.
"""
return self.stream.read(settings.audio_config.chunk_size)
class AudioSender(SocketBase):
def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
super(AudioSender, self).__init__(str("audio"))
self.create_socket(zmq_context, zmq.PUB, port)
self.thread = threading.Thread(target=self.stream)
self.capturer = self.choose_capturer()
def start(self):
self.capturer.setup()
self.thread.start()
def close(self):
self.capturer.stop()
super(AudioSender, self).close()
def stream(self):
while not state.exit_event.is_set():
chunk = self.capturer.generate_chunk()
if chunk is None or state.is_speaking:
continue
self.socket.send(chunk)
def choose_capturer(self):
if state.qi_session and settings.audio_config.use_pepper_microphone:
return QiAudioCapturer()
return StandaloneAudioCapturer()

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
class GestureTags:
tags = ["above", "affirmative", "afford", "agitated", "all", "allright", "alright", "any",
"assuage", "assuage", "attemper", "back", "bashful", "beg", "beseech", "blank",

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta, abstractmethod
from robot_interface.endpoints.socket_base import SocketBase

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta
import zmq

View File

@@ -1,7 +1,16 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import zmq
import threading
import logging
import struct
from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.core.config import settings
@@ -19,7 +28,7 @@ class VideoSender(SocketBase):
"""
def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port):
super(VideoSender, self).__init__("video")
self.create_socket(zmq_context, zmq.PUB, port, [(zmq.CONFLATE,1)])
self.create_socket(zmq_context, zmq.PUB, port, [(zmq.SNDHWM,3)])
def start_video_rcv(self):
"""
@@ -52,10 +61,23 @@ class VideoSender(SocketBase):
:param vid_stream_name: The name of a camera subscription on the video service object vid_service
:type vid_stream_name: str
"""
while not state.exit_event.is_set():
try:
img = vid_service.getImageRemote(vid_stream_name)
#Possibly limit images sent if queuing issues arise
self.socket.send(img[settings.video_config.image_buffer])
except:
logging.warn("Failed to retrieve video image from robot.")
try:
while not state.exit_event.is_set():
try:
img = vid_service.getImageRemote(vid_stream_name)
if img is not None:
raw_data = img[6]
width = img[0]
height = img[1]
width_bytes = struct.pack('<I', width)
height_bytes = struct.pack('<I', height)
self.socket.send_multipart([width_bytes, height_bytes, raw_data])
except KeyboardInterrupt:
logging.info("Video receiving loop interrupted by user.")
except:
logging.warn("Failed to retrieve video image from robot.")
finally:
vid_service.unsubscribe(vid_stream_name)
logging.info("Unsubscribed from video stream.")

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
from robot_interface.endpoints.audio_sender import AudioSender

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import signal
import threading

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os
from dotenv import load_dotenv
@@ -9,6 +16,8 @@ def get_config(value, env, default, cast=None):
Small utility to get a configuration value, returns `value` if it is not None, else it will try to get the
environment variable cast with `cast`. If the environment variable is not set, it will return `default`.
Special handling for booleans, which are only true if the value of the variable is "true" or "yes", ignoring capitalization.
:param value: The value to check.
:type value: Any
:param env: The environment variable to check.
@@ -26,7 +35,14 @@ def get_config(value, env, default, cast=None):
env = os.environ.get(env, default)
if cast is None:
if cast is None or env is None:
return env
if cast == bool:
if isinstance(env, bool):
return env
if not isinstance(default, bool):
raise ValueError("Default value must be a boolean if the cast type is a boolean.")
return env.lower() == "true" or env.lower() == "yes"
return cast(env)

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `print` can print Unicode characters in names
import logging
import sys

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import sys
@@ -7,6 +14,20 @@ except ImportError:
qi = None
def _get_qi_url():
"""
Get the Qi URL from the command line arguments, or None if not given.
"""
if "--qi-url" in sys.argv:
return sys.argv[sys.argv.index("--qi-url") + 1]
for arg in sys.argv:
if arg.startswith("--qi-url="):
return arg[len("--qi-url="):]
return None
def get_qi_session():
"""
Create and return a Qi session if available.
@@ -18,12 +39,13 @@ def get_qi_session():
logging.info("Unable to import qi. Running in stand-alone mode.")
return None
if "--qi-url" not in sys.argv:
qi_url = _get_qi_url()
if qi_url is None:
logging.info("No Qi URL argument given. Running in stand-alone mode.")
return None
try:
app = qi.Application()
app = qi.Application(["--qi-url", qi_url, "--qi-listen-url", "tcp://0.0.0.0:0"])
app.start()
return app.session
except RuntimeError:

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can format strings with Unicode characters
import random
import sys

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, MagicMock
import pytest

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, mock
from robot_interface.core.config import Settings

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pyaudio
import pytest

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys
import mock

View File

@@ -1,4 +1,10 @@
# coding=utf-8
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os
import mock
@@ -27,35 +33,33 @@ def test_no_microphone(zmq_context, mocker):
mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = None
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
sender = AudioSender(zmq_context)
assert sender.microphone is None
assert sender.capturer.microphone is None
sender.start()
assert sender.thread is None
sender.capturer.setup()
mock_info_logger.assert_called()
sender.wait_until_done() # Should return early because we didn't start a thread
def test_unicode_mic_name(zmq_context, mocker):
"""
Tests the robustness of the `AudioSender` when handling microphone names
that contain Unicode characters.
"""
mocker.patch("robot_interface.endpoints.audio_sender.threading")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"• Some Unicode name"}
mock_choose_mic.return_value = {"name": u"• Some Unicode name", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
sender = AudioSender(zmq_context)
assert sender.microphone is not None
assert sender.capturer.microphone is not None
sender.capturer.audio.open = mock.Mock(return_value=mock.Mock())
# `.start()` logs the name of the microphone. It should not give an error if it contains Unicode
# `.setup()` logs the name of the microphone. It should not give an error if it contains Unicode
# symbols.
sender.start()
assert sender.thread is not None
sender.wait_until_done() # Should return instantly because we didn't start a real thread
sender.capturer.setup()
def _fake_read(num_frames):
@@ -73,6 +77,7 @@ def test_sending_audio(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
@@ -85,11 +90,11 @@ def test_sending_audio(mocker):
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.capturer.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream
sender.start()
sender.wait_until_done()
sender.thread.join()
send_socket.assert_called()
@@ -102,6 +107,7 @@ def test_no_sending_if_speaking(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
@@ -115,11 +121,11 @@ def test_no_sending_if_speaking(mocker):
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.capturer.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream
sender.start()
sender.wait_until_done()
sender.thread.join()
send_socket.assert_not_called()
@@ -139,6 +145,7 @@ def test_break_microphone(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
@@ -150,11 +157,11 @@ def test_break_microphone(mocker):
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.capturer.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream
sender.start()
sender.wait_until_done()
sender.thread.join()
send_socket.assert_not_called()
@@ -165,6 +172,8 @@ def test_pyaudio_init_failure(mocker, zmq_context):
"""
# Prevent binding the ZMQ socket
mocker.patch("robot_interface.endpoints.audio_sender.AudioSender.create_socket")
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
# Simulate PyAudio() failing
mocker.patch(
@@ -174,5 +183,5 @@ def test_pyaudio_init_failure(mocker, zmq_context):
sender = AudioSender(zmq_context)
assert sender.audio is None
assert sender.microphone is None
assert sender.capturer.audio is None
assert sender.capturer.microphone is None

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from robot_interface.utils.get_config import get_config
@@ -43,3 +50,58 @@ def test_get_config_casts_default_when_env_missing(monkeypatch):
result = get_config(None, "GET_CONFIG_MISSING", "42", int)
assert result == 42
def test_get_config_unset_boolean_default(monkeypatch):
"""
When the env var is a boolean, and it's not set, ensure it uses the default value.
"""
monkeypatch.delenv("SOME_BOOLEAN_VARIABLE", raising=False)
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == False
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == True
def test_get_config_true_boolean(monkeypatch):
"""
When the env var is a boolean, and its value is "true", "TRUE", "yes", etc., it should return true.
"""
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "TRUE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "true")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "yes")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "YES")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "TrUE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
def test_get_config_false_boolean(monkeypatch):
"""
When the env var is a boolean, and its value is not "true", "TRUE", "yes", etc., it should return False.
"""
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "FALSE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "false")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "anything, tbh")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest
import threading
import zmq

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock
import pytest
import zmq

View File

@@ -1,4 +1,10 @@
# coding=utf-8
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock
import pytest

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys
# Import module under test
@@ -55,7 +62,7 @@ def test_get_qi_session_runtime_error(monkeypatch):
raise RuntimeError("boom")
class FakeQi:
Application = lambda self=None: FakeApp()
Application = lambda *args, **kwargs: FakeApp()
reload_qi_utils_with(FakeQi())
@@ -80,7 +87,7 @@ def test_get_qi_session_success(monkeypatch):
return True
class FakeQi:
Application = lambda self=None: FakeApp()
Application = lambda *args, **kwargs: FakeApp()
reload_qi_utils_with(FakeQi())

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest
from robot_interface.endpoints.receiver_base import ReceiverBase

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock
import zmq
from robot_interface.endpoints.socket_base import SocketBase

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import threading
import signal
import pytest

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time
import mock

View File

@@ -1,5 +1,11 @@
# coding=utf-8
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import struct
import mock
import pytest
import zmq
@@ -48,8 +54,10 @@ def test_video_streaming(zmq_context, mocker):
# Pepper's image buffer lives at index 6
mocker.patch.object(settings.video_config, "image_buffer", 6)
test_width = 320
test_height = 240
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.return_value = [None]*6 + ["fake_img"]
mock_video_service.getImageRemote.return_value = [test_width, test_height, None, None, None, None, b"fake_img"]
fake_session = mock.Mock()
fake_session.service.return_value = mock_video_service
@@ -63,12 +71,16 @@ def test_video_streaming(zmq_context, mocker):
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send = send_socket
sender.socket.send_multipart = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_called_with("fake_img")
send_socket.assert_called_with([
struct.pack('<I', 320),
struct.pack('<I', 240),
b"fake_img"
])
def test_video_receive_error(zmq_context, mocker):
@@ -91,9 +103,30 @@ def test_video_receive_error(zmq_context, mocker):
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send = send_socket
sender.socket.send_multipart = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_not_called()
def test_video_loop_keyboard_interrupt(zmq_context, mocker):
"""Video loop should handle KeyboardInterrupt gracefully and unsubscribe."""
_patch_basics(mocker)
_patch_exit_event(mocker)
# We mock the video service to raise KeyboardInterrupt when accessed
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.side_effect = KeyboardInterrupt
# Mock logging to verify the specific interrupt message is logged
mock_logger = mocker.patch("robot_interface.endpoints.video_sender.logging")
sender = VideoSender(zmq_context)
# Execute the loop
sender.video_rcv_loop(mock_video_service, "stream_name")
# Verify the 'finally' block executed (unsubscribe)
mock_video_service.unsubscribe.assert_called_with("stream_name")
mock_logger.info.assert_any_call("Unsubscribed from video stream.")