24 Commits

Author SHA1 Message Date
Twirre Meulenbelt
ecf9d14a4e test: make audio sender tests pass 2026-02-09 15:51:35 +01:00
Twirre Meulenbelt
0fe5fcf8f8 feat: choose between Qi microphone and local microphone 2026-02-09 15:45:17 +01:00
abd6988d1e feat: multi-channel qi audio possible 2026-02-09 13:40:47 +01:00
Twirre Meulenbelt
31c76ecf84 fix: make QI audio sender working 2026-02-09 09:39:48 +01:00
6e2bedcd32 feat: (almost) qi audio sender 2026-02-04 18:38:40 +01:00
06e3dad25d Merge branch 'fix/send-video' into 'main'
fix: send video

See merge request ics/sp/2025/n25b/pepperplus-ri!28
2026-01-30 19:19:03 +00:00
Storm
fe8bad1f8c Merge branch 'main' into fix/send-video 2026-01-30 17:28:13 +01:00
5bb5d8a0cc Merge branch 'chore/copyright-all-files' into 'main'
chore: add copyright to all source files

See merge request ics/sp/2025/n25b/pepperplus-ri!29
2026-01-30 11:47:30 +00:00
Pim Hutting
ea208175de chore: add copyright to all source files 2026-01-29 15:57:22 +01:00
Storm
8333f2fc2a chore: removed numpy import 2026-01-29 13:09:25 +01:00
Storm
24c7fa216f test: 100% coverage
ref: N25B-393
2026-01-29 12:28:34 +01:00
Storm
56becd84ac test: fixed video_sender tests
ref: N25B-393
2026-01-29 12:16:48 +01:00
Storm
4a2cace1cf chore: changed socket option to set HWM to 3 (max 3 packets in queue 2026-01-29 12:02:28 +01:00
ad58b16559 Merge branch 'dev' into 'main'
Merge dev with main

See merge request ics/sp/2025/n25b/pepperplus-ri!27
2026-01-28 10:54:22 +00:00
fb0d7850cc Merge branch 'main' into dev 2026-01-28 11:53:23 +01:00
Storm
891ebf5e3f chore: changed video sending to work without cv2 2026-01-27 17:58:06 +01:00
Pim Hutting
da97eb8a1a Merge branch 'feat/robot-speech-agent-force-speech' into 'dev'
feat: implemented forced speech and speech queue

See merge request ics/sp/2025/n25b/pepperplus-ri!23
2026-01-14 14:26:39 +00:00
Luijkx,S.O.H. (Storm)
e51cf8fe65 feat: implemented forced speech and speech queue 2026-01-14 14:26:38 +00:00
Twirre
1e77548622 Merge branch 'feat/ri-gestures' into 'dev'
feat: gestures to ri

See merge request ics/sp/2025/n25b/pepperplus-ri!21
2025-12-16 08:35:26 +00:00
JobvAlewijk
a8fe887c48 feat: gestures to ri 2025-12-16 08:35:26 +00:00
JobvAlewijk
df702f1e44 Merge branch 'feat/environment-variables' into 'dev'
Add environment variables

See merge request ics/sp/2025/n25b/pepperplus-ri!22
2025-12-13 13:43:52 +00:00
JobvAlewijk
a2cb2ae90a Merge branch 'dev' of ssh://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-ri into feat/environment-variables 2025-12-13 14:43:02 +01:00
Twirre Meulenbelt
3a259c1170 feat: add environment variables and docs
ref: N25B-352
2025-12-10 13:28:13 +01:00
aad2044b6e chore: add .gitignore 2025-09-27 17:58:12 +02:00
40 changed files with 1501 additions and 150 deletions

28
.env.example Normal file
View File

@@ -0,0 +1,28 @@
# Example .env file. To use, make a copy, call it ".env" (i.e. removing the ".example" suffix), then you edit values.
# To make a variable apply, uncomment it (remove the "#" in front of the line).
# First, some variables that are likely to be configured:
# The hostname or IP address of the Control Backend.
AGENT__CONTROL_BACKEND_HOST=localhost
# Whether to use Pepper's microphone when Pepper is connected.
AUDIO__USE_PEPPER_MICROPHONE=true
# Variables that are unlikely to be configured, you can probably ignore these:
#AGENT__ACTUATION_RECEIVER_PORT=
#AGENT__MAIN_RECEIVER_PORT=
#AGENT__VIDEO_SENDER_PORT=
#AGENT__AUDIO_SENDER_PORT=
#VIDEO__CAMERA_INDEX=
#VIDEO__RESOLUTION=
#VIDEO__COLOR_SPACE=
#VIDEO__FPS=
#VIDEO__STREAM_NAME=
#VIDEO__IMAGE_BUFFER=
#AUDIO__SAMPLE_RATE=
#AUDIO__CHUNK_SIZE=
#AUDIO__CHANNELS=

View File

@@ -39,10 +39,15 @@ On Windows:
$env:PYTHONPATH="src"; python -m robot_interface.main
```
With both, if you want to connect to the actual robot (or simulator), pass the `--qi-url` argument.
### Program Arguments
If you want to connect to the actual robot (or simulator), pass the `--qi-url` argument.
There's also a `--microphone` argument that can be used to choose a microphone to use. If not given, the program will try the default microphone. If you don't know the name of the microphone, pass the argument with any value, and it will list the names of available microphones.
### Environment Variables
You may use environment variables to change settings. Make a copy of the [`.env.example`](.env.example) file, name it `.env` and put it in the root directory. The file itself describes how to do the configuration.
## Testing

View File

@@ -6,3 +6,6 @@ pytest-cov<3.0.0
sphinx
sphinx_rtd_theme
pre-commit
python-dotenv
numpy<=1.16.6
enum34

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,95 +1,117 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals
from robot_interface.utils.get_config import get_config
class AgentSettings(object):
"""
Agent port configuration.
:ivar actuating_receiver_port: Port for receiving actuation commands.
:vartype actuating_receiver_port: int
:ivar main_receiver_port: Port for receiving main messages.
:ivar control_backend_host: Hostname of the control backend, defaults to "localhost".
:vartype control_backend_host: string
:ivar actuation_receiver_port: Port for receiving actuation commands, defaults to 5557.
:vartype actuation_receiver_port: int
:ivar main_receiver_port: Port for receiving main messages, defaults to 5555.
:vartype main_receiver_port: int
:ivar video_sender_port: Port used for sending video frames.
:ivar video_sender_port: Port used for sending video frames, defaults to 5556.
:vartype video_sender_port: int
:ivar audio_sender_port: Port used for sending audio data.
:ivar audio_sender_port: Port used for sending audio data, defaults to 5558.
:vartype audio_sender_port: int
"""
def __init__(
self,
actuating_receiver_port=5557,
main_receiver_port=5555,
video_sender_port=5556,
audio_sender_port=5558,
self,
control_backend_host=None,
actuation_receiver_port=None,
main_receiver_port=None,
video_sender_port=None,
audio_sender_port=None,
):
self.actuating_receiver_port = actuating_receiver_port
self.main_receiver_port = main_receiver_port
self.video_sender_port = video_sender_port
self.audio_sender_port = audio_sender_port
self.control_backend_host = get_config(control_backend_host, "AGENT__CONTROL_BACKEND_HOST", "localhost")
self.actuation_receiver_port = get_config(actuation_receiver_port, "AGENT__ACTUATION_RECEIVER_PORT", 5557, int)
self.main_receiver_port = get_config(main_receiver_port, "AGENT__MAIN_RECEIVER_PORT", 5555, int)
self.video_sender_port = get_config(video_sender_port, "AGENT__VIDEO_SENDER_PORT", 5556, int)
self.audio_sender_port = get_config(audio_sender_port, "AGENT__AUDIO_SENDER_PORT", 5558, int)
class VideoConfig(object):
"""
Video configuration constants.
:ivar camera_index: Index of the camera used.
:ivar camera_index: Index of the camera used, defaults to 0.
:vartype camera_index: int
:ivar resolution: Video resolution mode.
:ivar resolution: Video resolution mode, defaults to 2.
:vartype resolution: int
:ivar color_space: Color space identifier.
:ivar color_space: Color space identifier, defaults to 11.
:vartype color_space: int
:ivar fps: Frames per second of the video stream.
:ivar fps: Frames per second of the video stream, defaults to 15.
:vartype fps: int
:ivar stream_name: Name of the video stream.
:ivar stream_name: Name of the video stream, defaults to "Pepper Video".
:vartype stream_name: str
:ivar image_buffer: Internal buffer size for video frames.
:ivar image_buffer: Internal buffer size for video frames, defaults to 6.
:vartype image_buffer: int
"""
def __init__(
self,
camera_index=0,
resolution=2,
color_space=11,
fps=15,
stream_name="Pepper Video",
image_buffer=6,
camera_index=None,
resolution=None,
color_space=None,
fps=None,
stream_name=None,
image_buffer=None,
):
self.camera_index = camera_index
self.resolution = resolution
self.color_space = color_space
self.fps = fps
self.stream_name = stream_name
self.image_buffer = image_buffer
self.camera_index = get_config(camera_index, "VIDEO__CAMERA_INDEX", 0, int)
self.resolution = get_config(resolution, "VIDEO__RESOLUTION", 2, int)
self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 13, int)
self.fps = get_config(fps, "VIDEO__FPS", 15, int)
self.stream_name = get_config(stream_name, "VIDEO__STREAM_NAME", "Pepper Video")
self.image_buffer = get_config(image_buffer, "VIDEO__IMAGE_BUFFER", 6, int)
class AudioConfig(object):
"""
Audio configuration constants.
:ivar sample_rate: Audio sampling rate in Hz.
:ivar use_pepper_microphone: Whether to use Pepper's microphone or not, defaults to True.
:vartype use_pepper_microphone: bool
:ivar sample_rate: Audio sampling rate in Hz, defaults to 16000.
:vartype sample_rate: int
:ivar chunk_size: Size of audio chunks to capture/process.
:ivar chunk_size: Size of audio chunks to capture/process, defaults to 512.
:vartype chunk_size: int
:ivar channels: Number of audio channels.
:ivar channels: Number of audio channels, defaults to 1.
:vartype channels: int
"""
def __init__(self, sample_rate=16000, chunk_size=512, channels=1):
self.sample_rate = sample_rate
self.chunk_size = chunk_size
self.channels = channels
def __init__(
self,
use_pepper_microphone=None,
sample_rate=None,
chunk_size=None,
channels=None,
):
self.use_pepper_microphone = get_config(use_pepper_microphone, "AUDIO__USE_PEPPER_MICROPHONE", True, bool)
self.sample_rate = get_config(sample_rate, "AUDIO__SAMPLE_RATE", 16000, int)
self.chunk_size = get_config(chunk_size, "AUDIO__CHUNK_SIZE", 512, int)
self.channels = get_config(channels, "AUDIO__CHANNELS", 1, int)
class MainConfig(object):
"""
Main system configuration.
:ivar poll_timeout_ms: Timeout for polling events, in milliseconds.
:ivar poll_timeout_ms: Timeout for polling events, in milliseconds, defaults to 100.
:vartype poll_timeout_ms: int
:ivar max_handler_time_ms: Maximum allowed handler time, in milliseconds.
:ivar max_handler_time_ms: Maximum allowed handler time, in milliseconds, defaults to 50.
:vartype max_handler_time_ms: int
"""
def __init__(self, poll_timeout_ms=100, max_handler_time_ms=50):
self.poll_timeout_ms = poll_timeout_ms
self.max_handler_time_ms = max_handler_time_ms
def __init__(self, poll_timeout_ms=None, max_handler_time_ms=None):
self.poll_timeout_ms = get_config(poll_timeout_ms, "MAIN__POLL_TIMEOUT_MS", 100, int)
self.max_handler_time_ms = get_config(max_handler_time_ms, "MAIN__MAX_HANDLER_TIME_MS", 50, int)
class Settings(object):

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,6 +1,12 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can log texts with Unicode characters
import logging
import time
from threading import Thread
import Queue
@@ -8,8 +14,8 @@ import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
from robot_interface.endpoints.gesture_settings import GestureTags
class ActuationReceiver(ReceiverBase):
@@ -24,12 +30,16 @@ class ActuationReceiver(ReceiverBase):
:ivar _tts_service: The text-to-speech service object from the Qi session.
:vartype _tts_service: qi.Session | None
:ivar _animation_service: The animation/gesture service object from the Qi session.
:vartype _animation_service: qi.Session | None
"""
def __init__(self, zmq_context, port=settings.agent_settings.actuating_receiver_port):
def __init__(self, zmq_context, port=settings.agent_settings.actuation_receiver_port):
super(ActuationReceiver, self).__init__("actuation")
self.create_socket(zmq_context, zmq.SUB, port)
self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options
self._tts_service = None
self._animation_service = None
self._message_queue = Queue.Queue()
self.message_thread = Thread(target=self._handle_messages)
self.message_thread.start()
@@ -59,7 +69,7 @@ class ActuationReceiver(ReceiverBase):
if not self._tts_service:
self._tts_service = state.qi_session.service("ALTextToSpeech")
if (message.get("is_priority")):
if message.get("is_priority"):
# Bypass queue and speak immediately
self.clear_queue()
self._message_queue.put(text)
@@ -71,6 +81,7 @@ class ActuationReceiver(ReceiverBase):
"""
Safely drains all pending messages from the queue.
"""
logging.info("Message queue size: {}".format(self._message_queue.qsize()))
try:
while True:
# Remove items one by one without waiting
@@ -79,6 +90,53 @@ class ActuationReceiver(ReceiverBase):
pass
logging.info("Message queue cleared.")
def _handle_gesture(self, message, is_single):
"""
Handle a gesture actuation request.
:param message: The gesture to do, must contain properties "endpoint" and "data".
:type message: dict
:param is_single: Whether it's a specific single gesture or a gesture tag.
:type is_single: bool
"""
gesture = message.get("data")
if not gesture:
logging.warn("Received gesture to do, but it lacks data.")
return
if not isinstance(gesture, (str, unicode)):
logging.warn("Received gesture to do but it is not a string.")
return
logging.debug("Received gesture to do: {}".format(gesture))
if is_single:
if gesture not in GestureTags.single_gestures:
logging.warn("Received single gesture to do, but it does not exist in settings")
return
else:
if gesture not in GestureTags.tags:
logging.warn("Received single tag to do, but it does not exist in settings")
return
if not state.qi_session: return
# If state has a qi_session, we know that we can import qi
import qi # Takes a while only the first time it's imported
if not self._animation_service:
self._animation_service = state.qi_session.service("ALAnimationPlayer")
# Play the gesture. Pepper comes with predefined animations like "Wave", "Greet", "Clap"
# You can also create custom animations using Choregraphe and upload them to the robot.
if is_single:
logging.debug("Playing single gesture: {}".format(gesture))
getattr(qi, "async")(self._animation_service.run, gesture)
else:
logging.debug("Playing tag gesture: {}".format(gesture))
getattr(qi, "async")(self._animation_service.runTag, gesture)
def handle_message(self, message):
"""
Handle an actuation/speech message with the receiver.
@@ -88,18 +146,30 @@ class ActuationReceiver(ReceiverBase):
"""
if message["endpoint"] == "actuate/speech":
self._handle_speech(message)
if message["endpoint"] == "actuate/gesture/tag":
self._handle_gesture(message, False)
if message["endpoint"] == "actuate/gesture/single":
self._handle_gesture(message, True)
def _handle_messages(self):
while not state.exit_event.is_set():
try:
text = self._message_queue.get(timeout=0.1)
if not state.is_speaking: print("Started speaking.")
state.is_speaking = True
self._tts_service.say(text)
except Queue.Empty:
if state.is_speaking: print("Finished speaking.")
state.is_speaking = False
except RuntimeError:
logging.warn("Lost connection to Pepper. Please check if you're connected to the local WiFi and restart this application.")
state.exit_event.set()
logging.error("Lost connection to Pepper. Please check if you're connected to the "
"local WiFi and restart this application.")
state.exit_event.set()
def endpoint_description(self):
"""
Extend the default endpoint description with gesture tags.
Returned during negotiate/ports so the CB knows available gestures.
"""
desc = super(ActuationReceiver, self).endpoint_description()
desc["gestures"] = GestureTags.tags
desc["single_gestures"] = GestureTags.single_gestures
return desc

View File

@@ -1,7 +1,21 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `logging` can use Unicode characters in names
import audioop
import enum
from abc import ABCMeta, abstractmethod
import threading
import logging
import Queue
import numpy as np
import pyaudio
import zmq
@@ -13,85 +27,219 @@ from robot_interface.core.config import settings
logger = logging.getLogger(__name__)
class AudioSender(SocketBase):
class AudioCapturer(object):
"""
Audio sender endpoint, responsible for sending microphone audio data.
Interface for audio capturers.
"""
__metaclass__ = ABCMeta
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
@abstractmethod
def setup(self):
raise NotImplementedError()
:param port: The port to use.
:type port: int
@abstractmethod
def stop(self):
raise NotImplementedError()
:ivar thread: Thread used for sending audio.
:vartype thread: threading.Thread | None
@abstractmethod
def generate_chunk(self):
raise NotImplementedError()
class SampleRate(enum.Enum):
"""
Sample rate to use in Hz.
"""
LOW = 16000
HIGH = 48000
class PepperMicrophone(enum.Enum):
"""
Which of Pepper's microphones to use. In our case, some of the mics were damages/didn't work
well, so we choose to only use the fron right. If you have a Pepper robot with all working mics,
you might wish to use all microphones, to improve overall audio quality.
"""
ALL = 0
LEFT = 1
RIGHT = 2
FRONT_LEFT = 3
FRONT_RIGHT = 4
class QiAudioCapturer(AudioCapturer):
# Some of this class' methods have docstrings as binary strings. Keep them that way, otherwise
# ``qi.Session.registerService`` will give RuntimeErrors.
def __init__(self, sample_rate=SampleRate.LOW, mic=PepperMicrophone.FRONT_RIGHT,
deinterleaved=0):
"""
:raises RuntimeError: If there is no Qi session available.
:raises ValueError: If the given arguments are not compatible.
"""
self.session = state.qi_session
if not self.session:
raise RuntimeError("Cannot capture from qi device, no qi session available.")
if sample_rate == SampleRate.HIGH and mic != PepperMicrophone.ALL:
raise RuntimeError("For 48000 Hz, you must select all microphones.")
if mic == PepperMicrophone.ALL and sample_rate != SampleRate.HIGH:
raise RuntimeError("For using all microphones, 48000 Hz is required.")
self.audio = self.session.service("ALAudioDevice")
self.service_name = "ZmqAudioStreamer"
self.sample_rate = sample_rate
self.mic = mic
self.deinterleaved = deinterleaved
self.overflow = np.empty(0, dtype=np.float32)
self.q = Queue.Queue()
self._rate_state = None
def setup(self):
b"""
:raises RuntimeError: If no Qi session is available or if the session is not compatible with audio streaming.
"""
assert self.session is not None
logger.info("Listening with Pepper's microphone.")
self.session.registerService(self.service_name, self)
self.audio.setClientPreferences(self.service_name, self.sample_rate.value, self.mic.value,
self.deinterleaved)
self.audio.subscribe(self.service_name)
def stop(self):
b"""
Stop the audio capturer.
"""
try:
self.audio.unsubscribe(self.service_name)
except:
pass
def generate_chunk(self):
try:
chunk = self.q.get(True, 0.1)
return chunk
except Queue.Empty:
return None
# Callback invoked by NAOqi
def processRemote(self, nbOfChannels, nbOfSamplesByChannel, timeStamp, inputBuffer):
raw_pcm = bytes(inputBuffer)
pcm_i16 = np.frombuffer(raw_pcm, dtype=np.int16)
# Make mono channel (if it was 4 channels)
pcm_i32_mono = self._make_mono(pcm_i16.astype(np.int32), nbOfChannels)
# Resample (if it was 48k)
pcm_i32_mono_16k, self._rate_state = audioop.ratecv(pcm_i32_mono.tobytes(), 4, 1,
self.sample_rate.value,
SampleRate.LOW.value, self._rate_state)
pcm_f32_mono_16k = (np.frombuffer(pcm_i32_mono_16k, dtype=np.int32).astype(np.float32) /
32768.0)
# Attach overflow
pcm_f32_mono_16k = np.append(self.overflow, pcm_f32_mono_16k)
for i in range(len(pcm_f32_mono_16k) // 512):
self.q.put_nowait(pcm_f32_mono_16k[i * 512 : (i + 1) * 512].tobytes())
self.overflow = pcm_f32_mono_16k[len(pcm_f32_mono_16k) // 512 * 512 :]
def _make_mono(self, frag, channels):
return frag.reshape(-1, channels).mean(axis=1, dtype=np.int32)
class StandaloneAudioCapturer(AudioCapturer):
"""
Audio capturer that uses a microphone from the local device, can be chosen with the
``--microphone`` program argument.
:ivar audio: PyAudio instance.
:vartype audio: pyaudio.PyAudio | None
:ivar microphone: Selected microphone information.
:vartype microphone: dict | None
:ivar stream: PyAudio stream instance. None until ``setup()`` is called, remaining None if setup
fails for any reason.
:vartype stream: pyaudio.Stream | None
"""
def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
super(AudioSender, self).__init__(str("audio")) # Convert future's unicode_literal to str
self.create_socket(zmq_context, zmq.PUB, port)
self.thread = None
def __init__(self):
self.stream = None
try:
self.audio = pyaudio.PyAudio()
self.microphone = choose_mic(self.audio)
except IOError as e:
logger.warning("PyAudio is not available.", exc_info=e)
logger.warning("PyAudio is not available. Won't be able to send audio.", exc_info=True)
self.audio = None
self.microphone = None
def start(self):
def setup(self):
"""
Start sending audio in a different thread.
Will not start if no microphone is available.
Setup audio stream. Will not if no microphone is available.
"""
if not self.microphone:
logger.info("Not listening: no microphone available.")
return
logger.info("Listening with microphone \"{}\".".format(self.microphone["name"]))
self.thread = threading.Thread(target=self._stream)
self.thread.start()
def wait_until_done(self):
"""
Wait until the audio thread is done.
Will block until `state.exit_event` is set. If the thread is not running, does nothing.
"""
if not self.thread: return
self.thread.join()
self.thread = None
def _stream(self):
"""
Internal method to continuously read audio from the microphone and send it over the socket.
"""
audio_settings = settings.audio_config
chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
# Docs say this only raises an error if neither `input` nor `output` is True
stream = self.audio.open(
self.stream = self.audio.open(
format=pyaudio.paFloat32,
channels=audio_settings.channels,
rate=audio_settings.sample_rate,
channels=settings.audio_config.channels,
rate=settings.audio_config.sample_rate,
input=True,
input_device_index=self.microphone["index"],
frames_per_buffer=chunk,
frames_per_buffer=settings.audio_config.chunk_size,
)
try:
while not state.exit_event.is_set():
data = stream.read(chunk)
self.socket.send(data)
except IOError as e:
logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e)
finally:
stream.stop_stream()
stream.close()
def stop(self):
"""
Close the audio stream.
"""
if not self.stream: return
self.stream.stop_stream()
self.stream.close()
def generate_chunk(self):
"""
:return: Audio frames from the microphone of size ``settings.audio_config.chunk_size``.
:rtype: bytes.
:raises IOError: If reading from the audio stream fails.
"""
return self.stream.read(settings.audio_config.chunk_size)
class AudioSender(SocketBase):
def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
super(AudioSender, self).__init__(str("audio"))
self.create_socket(zmq_context, zmq.PUB, port)
self.thread = threading.Thread(target=self.stream)
self.capturer = self.choose_capturer()
def start(self):
self.capturer.setup()
self.thread.start()
def close(self):
self.capturer.stop()
super(AudioSender, self).close()
def stream(self):
while not state.exit_event.is_set():
chunk = self.capturer.generate_chunk()
if chunk is None or state.is_speaking:
continue
self.socket.send(chunk)
def choose_capturer(self):
if state.qi_session and settings.audio_config.use_pepper_microphone:
return QiAudioCapturer()
return StandaloneAudioCapturer()

View File

@@ -0,0 +1,419 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
class GestureTags:
tags = ["above", "affirmative", "afford", "agitated", "all", "allright", "alright", "any",
"assuage", "assuage", "attemper", "back", "bashful", "beg", "beseech", "blank",
"body language", "bored", "bow", "but", "call", "calm", "choose", "choice", "cloud",
"cogitate", "cool", "crazy", "disappointed", "down", "earth", "empty", "embarrassed",
"enthusiastic", "entire", "estimate", "except", "exalted", "excited", "explain", "far",
"field", "floor", "forlorn", "friendly", "front", "frustrated", "gentle", "gift",
"give", "ground", "happy", "hello", "her", "here", "hey", "hi", "him", "hopeless",
"hysterical", "I", "implore", "indicate", "joyful", "me", "meditate", "modest",
"negative", "nervous", "no", "not know", "nothing", "offer", "ok", "once upon a time",
"oppose", "or", "pacify", "pick", "placate", "please", "present", "proffer", "quiet",
"reason", "refute", "reject", "rousing", "sad", "select", "shamefaced", "show",
"show sky", "sky", "soothe", "sun", "supplicate", "tablet", "tall", "them", "there",
"think", "timid", "top", "unless", "up", "upstairs", "void", "warm", "winner", "yeah",
"yes", "yoo-hoo", "you", "your", "zero", "zestful"]
single_gestures = [
"animations/Stand/BodyTalk/Listening/Listening_1",
"animations/Stand/BodyTalk/Listening/Listening_2",
"animations/Stand/BodyTalk/Listening/Listening_3",
"animations/Stand/BodyTalk/Listening/Listening_4",
"animations/Stand/BodyTalk/Listening/Listening_5",
"animations/Stand/BodyTalk/Listening/Listening_6",
"animations/Stand/BodyTalk/Listening/Listening_7",
"animations/Stand/BodyTalk/Speaking/BodyTalk_1",
"animations/Stand/BodyTalk/Speaking/BodyTalk_10",
"animations/Stand/BodyTalk/Speaking/BodyTalk_11",
"animations/Stand/BodyTalk/Speaking/BodyTalk_12",
"animations/Stand/BodyTalk/Speaking/BodyTalk_13",
"animations/Stand/BodyTalk/Speaking/BodyTalk_14",
"animations/Stand/BodyTalk/Speaking/BodyTalk_15",
"animations/Stand/BodyTalk/Speaking/BodyTalk_16",
"animations/Stand/BodyTalk/Speaking/BodyTalk_2",
"animations/Stand/BodyTalk/Speaking/BodyTalk_3",
"animations/Stand/BodyTalk/Speaking/BodyTalk_4",
"animations/Stand/BodyTalk/Speaking/BodyTalk_5",
"animations/Stand/BodyTalk/Speaking/BodyTalk_6",
"animations/Stand/BodyTalk/Speaking/BodyTalk_7",
"animations/Stand/BodyTalk/Speaking/BodyTalk_8",
"animations/Stand/BodyTalk/Speaking/BodyTalk_9",
"animations/Stand/BodyTalk/Thinking/Remember_1",
"animations/Stand/BodyTalk/Thinking/Remember_2",
"animations/Stand/BodyTalk/Thinking/Remember_3",
"animations/Stand/BodyTalk/Thinking/ThinkingLoop_1",
"animations/Stand/BodyTalk/Thinking/ThinkingLoop_2",
"animations/Stand/Emotions/Negative/Angry_1",
"animations/Stand/Emotions/Negative/Angry_2",
"animations/Stand/Emotions/Negative/Angry_3",
"animations/Stand/Emotions/Negative/Angry_4",
"animations/Stand/Emotions/Negative/Anxious_1",
"animations/Stand/Emotions/Negative/Bored_1",
"animations/Stand/Emotions/Negative/Bored_2",
"animations/Stand/Emotions/Negative/Disappointed_1",
"animations/Stand/Emotions/Negative/Exhausted_1",
"animations/Stand/Emotions/Negative/Exhausted_2",
"animations/Stand/Emotions/Negative/Fear_1",
"animations/Stand/Emotions/Negative/Fear_2",
"animations/Stand/Emotions/Negative/Fearful_1",
"animations/Stand/Emotions/Negative/Frustrated_1",
"animations/Stand/Emotions/Negative/Humiliated_1",
"animations/Stand/Emotions/Negative/Hurt_1",
"animations/Stand/Emotions/Negative/Hurt_2",
"animations/Stand/Emotions/Negative/Late_1",
"animations/Stand/Emotions/Negative/Sad_1",
"animations/Stand/Emotions/Negative/Sad_2",
"animations/Stand/Emotions/Negative/Shocked_1",
"animations/Stand/Emotions/Negative/Sorry_1",
"animations/Stand/Emotions/Negative/Surprise_1",
"animations/Stand/Emotions/Negative/Surprise_2",
"animations/Stand/Emotions/Negative/Surprise_3",
"animations/Stand/Emotions/Neutral/Alienated_1",
"animations/Stand/Emotions/Neutral/AskForAttention_1",
"animations/Stand/Emotions/Neutral/AskForAttention_2",
"animations/Stand/Emotions/Neutral/AskForAttention_3",
"animations/Stand/Emotions/Neutral/Cautious_1",
"animations/Stand/Emotions/Neutral/Confused_1",
"animations/Stand/Emotions/Neutral/Determined_1",
"animations/Stand/Emotions/Neutral/Embarrassed_1",
"animations/Stand/Emotions/Neutral/Hesitation_1",
"animations/Stand/Emotions/Neutral/Innocent_1",
"animations/Stand/Emotions/Neutral/Lonely_1",
"animations/Stand/Emotions/Neutral/Mischievous_1",
"animations/Stand/Emotions/Neutral/Puzzled_1",
"animations/Stand/Emotions/Neutral/Sneeze",
"animations/Stand/Emotions/Neutral/Stubborn_1",
"animations/Stand/Emotions/Neutral/Suspicious_1",
"animations/Stand/Emotions/Positive/Amused_1",
"animations/Stand/Emotions/Positive/Confident_1",
"animations/Stand/Emotions/Positive/Ecstatic_1",
"animations/Stand/Emotions/Positive/Enthusiastic_1",
"animations/Stand/Emotions/Positive/Excited_1",
"animations/Stand/Emotions/Positive/Excited_2",
"animations/Stand/Emotions/Positive/Excited_3",
"animations/Stand/Emotions/Positive/Happy_1",
"animations/Stand/Emotions/Positive/Happy_2",
"animations/Stand/Emotions/Positive/Happy_3",
"animations/Stand/Emotions/Positive/Happy_4",
"animations/Stand/Emotions/Positive/Hungry_1",
"animations/Stand/Emotions/Positive/Hysterical_1",
"animations/Stand/Emotions/Positive/Interested_1",
"animations/Stand/Emotions/Positive/Interested_2",
"animations/Stand/Emotions/Positive/Laugh_1",
"animations/Stand/Emotions/Positive/Laugh_2",
"animations/Stand/Emotions/Positive/Laugh_3",
"animations/Stand/Emotions/Positive/Mocker_1",
"animations/Stand/Emotions/Positive/Optimistic_1",
"animations/Stand/Emotions/Positive/Peaceful_1",
"animations/Stand/Emotions/Positive/Proud_1",
"animations/Stand/Emotions/Positive/Proud_2",
"animations/Stand/Emotions/Positive/Proud_3",
"animations/Stand/Emotions/Positive/Relieved_1",
"animations/Stand/Emotions/Positive/Shy_1",
"animations/Stand/Emotions/Positive/Shy_2",
"animations/Stand/Emotions/Positive/Sure_1",
"animations/Stand/Emotions/Positive/Winner_1",
"animations/Stand/Emotions/Positive/Winner_2",
"animations/Stand/Gestures/Angry_1",
"animations/Stand/Gestures/Angry_2",
"animations/Stand/Gestures/Angry_3",
"animations/Stand/Gestures/BowShort_1",
"animations/Stand/Gestures/BowShort_2",
"animations/Stand/Gestures/BowShort_3",
"animations/Stand/Gestures/But_1",
"animations/Stand/Gestures/CalmDown_1",
"animations/Stand/Gestures/CalmDown_2",
"animations/Stand/Gestures/CalmDown_3",
"animations/Stand/Gestures/CalmDown_4",
"animations/Stand/Gestures/CalmDown_5",
"animations/Stand/Gestures/CalmDown_6",
"animations/Stand/Gestures/Choice_1",
"animations/Stand/Gestures/ComeOn_1",
"animations/Stand/Gestures/Confused_1",
"animations/Stand/Gestures/Confused_2",
"animations/Stand/Gestures/CountFive_1",
"animations/Stand/Gestures/CountFour_1",
"animations/Stand/Gestures/CountMore_1",
"animations/Stand/Gestures/CountOne_1",
"animations/Stand/Gestures/CountThree_1",
"animations/Stand/Gestures/CountTwo_1",
"animations/Stand/Gestures/Desperate_1",
"animations/Stand/Gestures/Desperate_2",
"animations/Stand/Gestures/Desperate_3",
"animations/Stand/Gestures/Desperate_4",
"animations/Stand/Gestures/Desperate_5",
"animations/Stand/Gestures/DontUnderstand_1",
"animations/Stand/Gestures/Enthusiastic_3",
"animations/Stand/Gestures/Enthusiastic_4",
"animations/Stand/Gestures/Enthusiastic_5",
"animations/Stand/Gestures/Everything_1",
"animations/Stand/Gestures/Everything_2",
"animations/Stand/Gestures/Everything_3",
"animations/Stand/Gestures/Everything_4",
"animations/Stand/Gestures/Everything_6",
"animations/Stand/Gestures/Excited_1",
"animations/Stand/Gestures/Explain_1",
"animations/Stand/Gestures/Explain_10",
"animations/Stand/Gestures/Explain_11",
"animations/Stand/Gestures/Explain_2",
"animations/Stand/Gestures/Explain_3",
"animations/Stand/Gestures/Explain_4",
"animations/Stand/Gestures/Explain_5",
"animations/Stand/Gestures/Explain_6",
"animations/Stand/Gestures/Explain_7",
"animations/Stand/Gestures/Explain_8",
"animations/Stand/Gestures/Far_1",
"animations/Stand/Gestures/Far_2",
"animations/Stand/Gestures/Far_3",
"animations/Stand/Gestures/Follow_1",
"animations/Stand/Gestures/Give_1",
"animations/Stand/Gestures/Give_2",
"animations/Stand/Gestures/Give_3",
"animations/Stand/Gestures/Give_4",
"animations/Stand/Gestures/Give_5",
"animations/Stand/Gestures/Give_6",
"animations/Stand/Gestures/Great_1",
"animations/Stand/Gestures/HeSays_1",
"animations/Stand/Gestures/HeSays_2",
"animations/Stand/Gestures/HeSays_3",
"animations/Stand/Gestures/Hey_1",
"animations/Stand/Gestures/Hey_10",
"animations/Stand/Gestures/Hey_2",
"animations/Stand/Gestures/Hey_3",
"animations/Stand/Gestures/Hey_4",
"animations/Stand/Gestures/Hey_6",
"animations/Stand/Gestures/Hey_7",
"animations/Stand/Gestures/Hey_8",
"animations/Stand/Gestures/Hey_9",
"animations/Stand/Gestures/Hide_1",
"animations/Stand/Gestures/Hot_1",
"animations/Stand/Gestures/Hot_2",
"animations/Stand/Gestures/IDontKnow_1",
"animations/Stand/Gestures/IDontKnow_2",
"animations/Stand/Gestures/IDontKnow_3",
"animations/Stand/Gestures/IDontKnow_4",
"animations/Stand/Gestures/IDontKnow_5",
"animations/Stand/Gestures/IDontKnow_6",
"animations/Stand/Gestures/Joy_1",
"animations/Stand/Gestures/Kisses_1",
"animations/Stand/Gestures/Look_1",
"animations/Stand/Gestures/Look_2",
"animations/Stand/Gestures/Maybe_1",
"animations/Stand/Gestures/Me_1",
"animations/Stand/Gestures/Me_2",
"animations/Stand/Gestures/Me_4",
"animations/Stand/Gestures/Me_7",
"animations/Stand/Gestures/Me_8",
"animations/Stand/Gestures/Mime_1",
"animations/Stand/Gestures/Mime_2",
"animations/Stand/Gestures/Next_1",
"animations/Stand/Gestures/No_1",
"animations/Stand/Gestures/No_2",
"animations/Stand/Gestures/No_3",
"animations/Stand/Gestures/No_4",
"animations/Stand/Gestures/No_5",
"animations/Stand/Gestures/No_6",
"animations/Stand/Gestures/No_7",
"animations/Stand/Gestures/No_8",
"animations/Stand/Gestures/No_9",
"animations/Stand/Gestures/Nothing_1",
"animations/Stand/Gestures/Nothing_2",
"animations/Stand/Gestures/OnTheEvening_1",
"animations/Stand/Gestures/OnTheEvening_2",
"animations/Stand/Gestures/OnTheEvening_3",
"animations/Stand/Gestures/OnTheEvening_4",
"animations/Stand/Gestures/OnTheEvening_5",
"animations/Stand/Gestures/Please_1",
"animations/Stand/Gestures/Please_2",
"animations/Stand/Gestures/Please_3",
"animations/Stand/Gestures/Reject_1",
"animations/Stand/Gestures/Reject_2",
"animations/Stand/Gestures/Reject_3",
"animations/Stand/Gestures/Reject_4",
"animations/Stand/Gestures/Reject_5",
"animations/Stand/Gestures/Reject_6",
"animations/Stand/Gestures/Salute_1",
"animations/Stand/Gestures/Salute_2",
"animations/Stand/Gestures/Salute_3",
"animations/Stand/Gestures/ShowFloor_1",
"animations/Stand/Gestures/ShowFloor_2",
"animations/Stand/Gestures/ShowFloor_3",
"animations/Stand/Gestures/ShowFloor_4",
"animations/Stand/Gestures/ShowFloor_5",
"animations/Stand/Gestures/ShowSky_1",
"animations/Stand/Gestures/ShowSky_10",
"animations/Stand/Gestures/ShowSky_11",
"animations/Stand/Gestures/ShowSky_12",
"animations/Stand/Gestures/ShowSky_2",
"animations/Stand/Gestures/ShowSky_3",
"animations/Stand/Gestures/ShowSky_4",
"animations/Stand/Gestures/ShowSky_5",
"animations/Stand/Gestures/ShowSky_6",
"animations/Stand/Gestures/ShowSky_7",
"animations/Stand/Gestures/ShowSky_8",
"animations/Stand/Gestures/ShowSky_9",
"animations/Stand/Gestures/ShowTablet_1",
"animations/Stand/Gestures/ShowTablet_2",
"animations/Stand/Gestures/ShowTablet_3",
"animations/Stand/Gestures/Shy_1",
"animations/Stand/Gestures/Stretch_1",
"animations/Stand/Gestures/Stretch_2",
"animations/Stand/Gestures/Surprised_1",
"animations/Stand/Gestures/TakePlace_1",
"animations/Stand/Gestures/TakePlace_2",
"animations/Stand/Gestures/Take_1",
"animations/Stand/Gestures/Thinking_1",
"animations/Stand/Gestures/Thinking_2",
"animations/Stand/Gestures/Thinking_3",
"animations/Stand/Gestures/Thinking_4",
"animations/Stand/Gestures/Thinking_5",
"animations/Stand/Gestures/Thinking_6",
"animations/Stand/Gestures/Thinking_7",
"animations/Stand/Gestures/Thinking_8",
"animations/Stand/Gestures/This_1",
"animations/Stand/Gestures/This_10",
"animations/Stand/Gestures/This_11",
"animations/Stand/Gestures/This_12",
"animations/Stand/Gestures/This_13",
"animations/Stand/Gestures/This_14",
"animations/Stand/Gestures/This_15",
"animations/Stand/Gestures/This_2",
"animations/Stand/Gestures/This_3",
"animations/Stand/Gestures/This_4",
"animations/Stand/Gestures/This_5",
"animations/Stand/Gestures/This_6",
"animations/Stand/Gestures/This_7",
"animations/Stand/Gestures/This_8",
"animations/Stand/Gestures/This_9",
"animations/Stand/Gestures/WhatSThis_1",
"animations/Stand/Gestures/WhatSThis_10",
"animations/Stand/Gestures/WhatSThis_11",
"animations/Stand/Gestures/WhatSThis_12",
"animations/Stand/Gestures/WhatSThis_13",
"animations/Stand/Gestures/WhatSThis_14",
"animations/Stand/Gestures/WhatSThis_15",
"animations/Stand/Gestures/WhatSThis_16",
"animations/Stand/Gestures/WhatSThis_2",
"animations/Stand/Gestures/WhatSThis_3",
"animations/Stand/Gestures/WhatSThis_4",
"animations/Stand/Gestures/WhatSThis_5",
"animations/Stand/Gestures/WhatSThis_6",
"animations/Stand/Gestures/WhatSThis_7",
"animations/Stand/Gestures/WhatSThis_8",
"animations/Stand/Gestures/WhatSThis_9",
"animations/Stand/Gestures/Whisper_1",
"animations/Stand/Gestures/Wings_1",
"animations/Stand/Gestures/Wings_2",
"animations/Stand/Gestures/Wings_3",
"animations/Stand/Gestures/Wings_4",
"animations/Stand/Gestures/Wings_5",
"animations/Stand/Gestures/Yes_1",
"animations/Stand/Gestures/Yes_2",
"animations/Stand/Gestures/Yes_3",
"animations/Stand/Gestures/YouKnowWhat_1",
"animations/Stand/Gestures/YouKnowWhat_2",
"animations/Stand/Gestures/YouKnowWhat_3",
"animations/Stand/Gestures/YouKnowWhat_4",
"animations/Stand/Gestures/YouKnowWhat_5",
"animations/Stand/Gestures/YouKnowWhat_6",
"animations/Stand/Gestures/You_1",
"animations/Stand/Gestures/You_2",
"animations/Stand/Gestures/You_3",
"animations/Stand/Gestures/You_4",
"animations/Stand/Gestures/You_5",
"animations/Stand/Gestures/Yum_1",
"animations/Stand/Reactions/EthernetOff_1",
"animations/Stand/Reactions/EthernetOn_1",
"animations/Stand/Reactions/Heat_1",
"animations/Stand/Reactions/Heat_2",
"animations/Stand/Reactions/LightShine_1",
"animations/Stand/Reactions/LightShine_2",
"animations/Stand/Reactions/LightShine_3",
"animations/Stand/Reactions/LightShine_4",
"animations/Stand/Reactions/SeeColor_1",
"animations/Stand/Reactions/SeeColor_2",
"animations/Stand/Reactions/SeeColor_3",
"animations/Stand/Reactions/SeeSomething_1",
"animations/Stand/Reactions/SeeSomething_3",
"animations/Stand/Reactions/SeeSomething_4",
"animations/Stand/Reactions/SeeSomething_5",
"animations/Stand/Reactions/SeeSomething_6",
"animations/Stand/Reactions/SeeSomething_7",
"animations/Stand/Reactions/SeeSomething_8",
"animations/Stand/Reactions/ShakeBody_1",
"animations/Stand/Reactions/ShakeBody_2",
"animations/Stand/Reactions/ShakeBody_3",
"animations/Stand/Reactions/TouchHead_1",
"animations/Stand/Reactions/TouchHead_2",
"animations/Stand/Reactions/TouchHead_3",
"animations/Stand/Reactions/TouchHead_4",
"animations/Stand/Waiting/AirGuitar_1",
"animations/Stand/Waiting/BackRubs_1",
"animations/Stand/Waiting/Bandmaster_1",
"animations/Stand/Waiting/Binoculars_1",
"animations/Stand/Waiting/BreathLoop_1",
"animations/Stand/Waiting/BreathLoop_2",
"animations/Stand/Waiting/BreathLoop_3",
"animations/Stand/Waiting/CallSomeone_1",
"animations/Stand/Waiting/Drink_1",
"animations/Stand/Waiting/DriveCar_1",
"animations/Stand/Waiting/Fitness_1",
"animations/Stand/Waiting/Fitness_2",
"animations/Stand/Waiting/Fitness_3",
"animations/Stand/Waiting/FunnyDancer_1",
"animations/Stand/Waiting/HappyBirthday_1",
"animations/Stand/Waiting/Helicopter_1",
"animations/Stand/Waiting/HideEyes_1",
"animations/Stand/Waiting/HideHands_1",
"animations/Stand/Waiting/Innocent_1",
"animations/Stand/Waiting/Knight_1",
"animations/Stand/Waiting/KnockEye_1",
"animations/Stand/Waiting/KungFu_1",
"animations/Stand/Waiting/LookHand_1",
"animations/Stand/Waiting/LookHand_2",
"animations/Stand/Waiting/LoveYou_1",
"animations/Stand/Waiting/Monster_1",
"animations/Stand/Waiting/MysticalPower_1",
"animations/Stand/Waiting/PlayHands_1",
"animations/Stand/Waiting/PlayHands_2",
"animations/Stand/Waiting/PlayHands_3",
"animations/Stand/Waiting/Relaxation_1",
"animations/Stand/Waiting/Relaxation_2",
"animations/Stand/Waiting/Relaxation_3",
"animations/Stand/Waiting/Relaxation_4",
"animations/Stand/Waiting/Rest_1",
"animations/Stand/Waiting/Robot_1",
"animations/Stand/Waiting/ScratchBack_1",
"animations/Stand/Waiting/ScratchBottom_1",
"animations/Stand/Waiting/ScratchEye_1",
"animations/Stand/Waiting/ScratchHand_1",
"animations/Stand/Waiting/ScratchHead_1",
"animations/Stand/Waiting/ScratchLeg_1",
"animations/Stand/Waiting/ScratchTorso_1",
"animations/Stand/Waiting/ShowMuscles_1",
"animations/Stand/Waiting/ShowMuscles_2",
"animations/Stand/Waiting/ShowMuscles_3",
"animations/Stand/Waiting/ShowMuscles_4",
"animations/Stand/Waiting/ShowMuscles_5",
"animations/Stand/Waiting/ShowSky_1",
"animations/Stand/Waiting/ShowSky_2",
"animations/Stand/Waiting/SpaceShuttle_1",
"animations/Stand/Waiting/Stretch_1",
"animations/Stand/Waiting/Stretch_2",
"animations/Stand/Waiting/TakePicture_1",
"animations/Stand/Waiting/Taxi_1",
"animations/Stand/Waiting/Think_1",
"animations/Stand/Waiting/Think_2",
"animations/Stand/Waiting/Think_3",
"animations/Stand/Waiting/Think_4",
"animations/Stand/Waiting/Waddle_1",
"animations/Stand/Waiting/Waddle_2",
"animations/Stand/Waiting/WakeUp_1",
"animations/Stand/Waiting/Zombie_1"]

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
@@ -5,6 +12,7 @@ from robot_interface.state import state
from robot_interface.core.config import settings
class MainReceiver(ReceiverBase):
"""
The main receiver endpoint, responsible for handling ping and negotiation requests.
@@ -12,10 +20,12 @@ class MainReceiver(ReceiverBase):
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use.
:param port: The port to use, defaults to value in `settings.agent_settings.main_receiver_port`.
:type port: int
"""
def __init__(self, zmq_context, port=settings.agent_settings.main_receiver_port):
def __init__(self, zmq_context, port=None):
if port is None:
port = settings.agent_settings.main_receiver_port
super(MainReceiver, self).__init__("main")
self.create_socket(zmq_context, zmq.REP, port, bind=False)

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta, abstractmethod
from robot_interface.endpoints.socket_base import SocketBase

View File

@@ -1,7 +1,16 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta
import zmq
from robot_interface.core.config import settings
class SocketBase(object):
"""
@@ -59,7 +68,7 @@ class SocketBase(object):
if bind:
self.socket.bind("tcp://*:{}".format(port))
else:
self.socket.connect("tcp://localhost:{}".format(port))
self.socket.connect("tcp://{}:{}".format(settings.agent_settings.control_backend_host, port))
def close(self):
"""Close the ZeroMQ socket."""

View File

@@ -1,11 +1,21 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import zmq
import threading
import logging
import struct
from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.core.config import settings
class VideoSender(SocketBase):
"""
Video sender endpoint, responsible for sending video frames.
@@ -18,7 +28,7 @@ class VideoSender(SocketBase):
"""
def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port):
super(VideoSender, self).__init__("video")
self.create_socket(zmq_context, zmq.PUB, port, [(zmq.CONFLATE,1)])
self.create_socket(zmq_context, zmq.PUB, port, [(zmq.SNDHWM,3)])
def start_video_rcv(self):
"""
@@ -51,10 +61,23 @@ class VideoSender(SocketBase):
:param vid_stream_name: The name of a camera subscription on the video service object vid_service
:type vid_stream_name: str
"""
while not state.exit_event.is_set():
try:
img = vid_service.getImageRemote(vid_stream_name)
#Possibly limit images sent if queuing issues arise
self.socket.send(img[settings.video_config.image_buffer])
except:
logging.warn("Failed to retrieve video image from robot.")
try:
while not state.exit_event.is_set():
try:
img = vid_service.getImageRemote(vid_stream_name)
if img is not None:
raw_data = img[6]
width = img[0]
height = img[1]
width_bytes = struct.pack('<I', width)
height_bytes = struct.pack('<I', height)
self.socket.send_multipart([width_bytes, height_bytes, raw_data])
except KeyboardInterrupt:
logging.info("Video receiving loop interrupted by user.")
except:
logging.warn("Failed to retrieve video image from robot.")
finally:
vid_service.unsubscribe(vid_stream_name)
logging.info("Unsubscribed from video stream.")

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
from robot_interface.endpoints.audio_sender import AudioSender

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import signal
import threading

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os
from dotenv import load_dotenv
load_dotenv()
def get_config(value, env, default, cast=None):
"""
Small utility to get a configuration value, returns `value` if it is not None, else it will try to get the
environment variable cast with `cast`. If the environment variable is not set, it will return `default`.
Special handling for booleans, which are only true if the value of the variable is "true" or "yes", ignoring capitalization.
:param value: The value to check.
:type value: Any
:param env: The environment variable to check.
:type env: string
:param default: The default value to return if the environment variable is not set.
:type default: Any
:param cast: A function to use to cast the environment variable. Must support string input.
:type cast: Callable[[Any], Any], optional
:return: The value, the environment variable value, or the default.
:rtype: Any
"""
if value is not None:
return value
env = os.environ.get(env, default)
if cast is None or env is None:
return env
if cast == bool:
if isinstance(env, bool):
return env
if not isinstance(default, bool):
raise ValueError("Default value must be a boolean if the cast type is a boolean.")
return env.lower() == "true" or env.lower() == "yes"
return cast(env)

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `print` can print Unicode characters in names
import logging
import sys

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import sys
@@ -7,6 +14,20 @@ except ImportError:
qi = None
def _get_qi_url():
"""
Get the Qi URL from the command line arguments, or None if not given.
"""
if "--qi-url" in sys.argv:
return sys.argv[sys.argv.index("--qi-url") + 1]
for arg in sys.argv:
if arg.startswith("--qi-url="):
return arg[len("--qi-url="):]
return None
def get_qi_session():
"""
Create and return a Qi session if available.
@@ -18,12 +39,13 @@ def get_qi_session():
logging.info("Unable to import qi. Running in stand-alone mode.")
return None
if "--qi-url" not in sys.argv:
qi_url = _get_qi_url()
if qi_url is None:
logging.info("No Qi URL argument given. Running in stand-alone mode.")
return None
try:
app = qi.Application()
app = qi.Application(["--qi-url", qi_url, "--qi-listen-url", "tcp://0.0.0.0:0"])
app.start()
return app.session
except RuntimeError:

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can format strings with Unicode characters
import random
import sys

17
test/conftest.py Normal file
View File

@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, MagicMock
import pytest
@pytest.fixture(autouse=True)
def mock_zmq_context():
with patch("zmq.Context") as mock:
mock.instance.return_value = MagicMock()
yield mock

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, mock
from robot_interface.core.config import Settings
from robot_interface.endpoints.main_receiver import MainReceiver
def test_environment_variables(monkeypatch):
"""
When environment variables are set, creating settings should use these.
"""
monkeypatch.setenv("AGENT__CONTROL_BACKEND_HOST", "some_value_that_should_be_different")
settings = Settings()
assert settings.agent_settings.control_backend_host == "some_value_that_should_be_different"
@patch("robot_interface.endpoints.main_receiver.settings")
@patch("robot_interface.endpoints.socket_base.settings")
def test_create_endpoint_custom_host(base_settings, main_settings):
"""
When a custom host is given in the settings, check that an endpoint's socket connects to it.
"""
fake_context = mock.Mock()
fake_socket = mock.Mock()
fake_context.socket.return_value = fake_socket
base_settings.agent_settings.control_backend_host = "not_localhost"
main_settings.agent_settings.main_receiver_port = 9999
_ = MainReceiver(fake_context)
fake_socket.connect.assert_called_once_with("tcp://not_localhost:9999")

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pyaudio
import pytest

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,18 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys
import mock
import pytest
import zmq
import Queue
from robot_interface.endpoints.actuation_receiver import ActuationReceiver
from robot_interface.endpoints.gesture_settings import GestureTags
@pytest.fixture
@@ -18,6 +26,7 @@ def zmq_context():
context = zmq.Context()
yield context
def test_force_speech_clears_queue(mocker):
"""
Tests that a force speech message clears the existing queue
@@ -52,6 +61,7 @@ def test_force_speech_clears_queue(mocker):
queued_item = receiver._message_queue.get()
assert queued_item == "Emergency Notification"
def test_handle_unimplemented_endpoint(mocker):
"""
Tests handling of unknown endpoints.
@@ -67,6 +77,7 @@ def test_handle_unimplemented_endpoint(mocker):
"data": None,
})
def test_speech_message_no_data(mocker):
"""
Tests that if the message data is empty, the receiver returns immediately
@@ -328,4 +339,155 @@ def test_clear_queue(mocker):
receiver.clear_queue()
# Assert the queue is empty
assert receiver._message_queue.qsize() == 0
assert receiver._message_queue.qsize() == 0
def test_gesture_no_data(zmq_context, mocker):
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": ""}, True)
# Just ensuring no crash
def test_gesture_invalid_data(zmq_context, mocker):
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": 123}, True)
# No crash expected
def test_gesture_single_not_found(zmq_context, mocker):
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave", "bow"] # allowed single gestures
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "unknown_gesture"}, True)
# No crash expected
def test_gesture_tag_not_found(zmq_context, mocker):
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["happy", "sad"]
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "not_a_tag"}, False)
# No crash expected
def test_gesture_no_qi_session(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = None
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["hello"]
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "hello"}, True)
# No crash, path returns early
def test_gesture_single_success(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
# Setup gesture settings
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave"]
mock_animation_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "wave"}, True)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run
assert getattr(mock_qi, "async").call_args[0][1] == "wave"
def test_gesture_tag_success(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["greeting"]
mock_animation_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "greeting"}, False)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.runTag
assert getattr(mock_qi, "async").call_args[0][1] == "greeting"
def test_handle_message_all_routes(zmq_context, mocker):
"""
Ensures all handle_message endpoint branches route correctly.
"""
receiver = ActuationReceiver(zmq_context)
mock_speech = mocker.patch.object(receiver, "_handle_speech")
mock_gesture = mocker.patch.object(receiver, "_handle_gesture")
receiver.handle_message({"endpoint": "actuate/speech", "data": "hi"})
receiver.handle_message({"endpoint": "actuate/gesture/tag", "data": "greeting"})
receiver.handle_message({"endpoint": "actuate/gesture/single", "data": "wave"})
mock_speech.assert_called_once()
assert mock_gesture.call_count == 2
def test_endpoint_description(zmq_context, mocker):
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["happy"]
mock_tags.single_gestures = ["wave"]
receiver = ActuationReceiver(zmq_context)
desc = receiver.endpoint_description()
assert "gestures" in desc
assert desc["gestures"] == ["happy"]
assert "single_gestures" in desc
assert desc["single_gestures"] == ["wave"]
def test_gesture_single_real_gesturetags(zmq_context, mocker):
"""
Uses the real GestureTags (no mocking) to ensure the receiver
references GestureTags.single_gestures correctly.
"""
# Ensure qi session exists so we pass the early return
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = mock.Mock()
# Mock qi.async to avoid real async calls
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
# Mock animation service
mock_animation_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context)
# Pick a real gesture from GestureTags.single_gestures
assert len(GestureTags.single_gestures) > 0, "GestureTags.single_gestures must not be empty"
gesture = GestureTags.single_gestures[0]
receiver._handle_gesture(
{"endpoint": "actuate/gesture/single", "data": gesture},
is_single=True,
)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run
assert getattr(mock_qi, "async").call_args[0][1] == gesture

View File

@@ -1,4 +1,10 @@
# coding=utf-8
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os
import mock
@@ -27,35 +33,33 @@ def test_no_microphone(zmq_context, mocker):
mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = None
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
sender = AudioSender(zmq_context)
assert sender.microphone is None
assert sender.capturer.microphone is None
sender.start()
assert sender.thread is None
sender.capturer.setup()
mock_info_logger.assert_called()
sender.wait_until_done() # Should return early because we didn't start a thread
def test_unicode_mic_name(zmq_context, mocker):
"""
Tests the robustness of the `AudioSender` when handling microphone names
that contain Unicode characters.
"""
mocker.patch("robot_interface.endpoints.audio_sender.threading")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"• Some Unicode name"}
mock_choose_mic.return_value = {"name": u"• Some Unicode name", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
sender = AudioSender(zmq_context)
assert sender.microphone is not None
assert sender.capturer.microphone is not None
sender.capturer.audio.open = mock.Mock(return_value=mock.Mock())
# `.start()` logs the name of the microphone. It should not give an error if it contains Unicode
# `.setup()` logs the name of the microphone. It should not give an error if it contains Unicode
# symbols.
sender.start()
assert sender.thread is not None
sender.wait_until_done() # Should return instantly because we didn't start a real thread
sender.capturer.setup()
def _fake_read(num_frames):
@@ -73,10 +77,43 @@ def test_sending_audio(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
mock_state.is_speaking = False
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
stream.read = _fake_read
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.capturer.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream
sender.start()
sender.thread.join()
send_socket.assert_called()
def test_no_sending_if_speaking(mocker):
"""
Tests the successful sending of audio data over a ZeroMQ socket.
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
mock_state.is_speaking = True
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
@@ -84,13 +121,13 @@ def test_sending_audio(mocker):
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.capturer.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream
sender.start()
sender.wait_until_done()
sender.thread.join()
send_socket.assert_called()
send_socket.assert_not_called()
def _fake_read_error(num_frames):
@@ -108,6 +145,7 @@ def test_break_microphone(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
@@ -119,11 +157,11 @@ def test_break_microphone(mocker):
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.capturer.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream
sender.start()
sender.wait_until_done()
sender.thread.join()
send_socket.assert_not_called()
@@ -134,6 +172,8 @@ def test_pyaudio_init_failure(mocker, zmq_context):
"""
# Prevent binding the ZMQ socket
mocker.patch("robot_interface.endpoints.audio_sender.AudioSender.create_socket")
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
# Simulate PyAudio() failing
mocker.patch(
@@ -143,5 +183,5 @@ def test_pyaudio_init_failure(mocker, zmq_context):
sender = AudioSender(zmq_context)
assert sender.audio is None
assert sender.microphone is None
assert sender.capturer.audio is None
assert sender.capturer.microphone is None

View File

@@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from robot_interface.utils.get_config import get_config
def test_get_config_prefers_explicit_value(monkeypatch):
"""
When a direct value is provided it should be returned without reading the environment.
"""
monkeypatch.setenv("GET_CONFIG_TEST", "from-env")
result = get_config("explicit", "GET_CONFIG_TEST", "default")
assert result == "explicit"
def test_get_config_returns_env_value(monkeypatch):
"""
If value is None the environment variable should be used.
"""
monkeypatch.setenv("GET_CONFIG_TEST", "from-env")
result = get_config(None, "GET_CONFIG_TEST", "default")
assert result == "from-env"
def test_get_config_casts_env_value(monkeypatch):
"""
The env value should be cast when a cast function is provided.
"""
monkeypatch.setenv("GET_CONFIG_PORT", "1234")
result = get_config(None, "GET_CONFIG_PORT", 0, int)
assert result == 1234
def test_get_config_casts_default_when_env_missing(monkeypatch):
"""
When the env var is missing it should fall back to the default and still apply the cast.
"""
monkeypatch.delenv("GET_CONFIG_MISSING", raising=False)
result = get_config(None, "GET_CONFIG_MISSING", "42", int)
assert result == 42
def test_get_config_unset_boolean_default(monkeypatch):
"""
When the env var is a boolean, and it's not set, ensure it uses the default value.
"""
monkeypatch.delenv("SOME_BOOLEAN_VARIABLE", raising=False)
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == False
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == True
def test_get_config_true_boolean(monkeypatch):
"""
When the env var is a boolean, and its value is "true", "TRUE", "yes", etc., it should return true.
"""
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "TRUE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "true")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "yes")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "YES")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "TrUE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
def test_get_config_false_boolean(monkeypatch):
"""
When the env var is a boolean, and its value is not "true", "TRUE", "yes", etc., it should return False.
"""
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "FALSE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "false")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "anything, tbh")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest
import threading
import zmq

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock
import pytest
import zmq

View File

@@ -1,4 +1,10 @@
# coding=utf-8
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock
import pytest

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys
# Import module under test
@@ -55,7 +62,7 @@ def test_get_qi_session_runtime_error(monkeypatch):
raise RuntimeError("boom")
class FakeQi:
Application = lambda self=None: FakeApp()
Application = lambda *args, **kwargs: FakeApp()
reload_qi_utils_with(FakeQi())
@@ -80,7 +87,7 @@ def test_get_qi_session_success(monkeypatch):
return True
class FakeQi:
Application = lambda self=None: FakeApp()
Application = lambda *args, **kwargs: FakeApp()
reload_qi_utils_with(FakeQi())

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest
from robot_interface.endpoints.receiver_base import ReceiverBase

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock
import zmq
from robot_interface.endpoints.socket_base import SocketBase

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import threading
import signal
import pytest

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time
import mock

View File

@@ -1,5 +1,11 @@
# coding=utf-8
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import struct
import mock
import pytest
import zmq
@@ -48,8 +54,10 @@ def test_video_streaming(zmq_context, mocker):
# Pepper's image buffer lives at index 6
mocker.patch.object(settings.video_config, "image_buffer", 6)
test_width = 320
test_height = 240
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.return_value = [None]*6 + ["fake_img"]
mock_video_service.getImageRemote.return_value = [test_width, test_height, None, None, None, None, b"fake_img"]
fake_session = mock.Mock()
fake_session.service.return_value = mock_video_service
@@ -63,12 +71,16 @@ def test_video_streaming(zmq_context, mocker):
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send = send_socket
sender.socket.send_multipart = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_called_with("fake_img")
send_socket.assert_called_with([
struct.pack('<I', 320),
struct.pack('<I', 240),
b"fake_img"
])
def test_video_receive_error(zmq_context, mocker):
@@ -91,9 +103,30 @@ def test_video_receive_error(zmq_context, mocker):
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send = send_socket
sender.socket.send_multipart = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_not_called()
def test_video_loop_keyboard_interrupt(zmq_context, mocker):
"""Video loop should handle KeyboardInterrupt gracefully and unsubscribe."""
_patch_basics(mocker)
_patch_exit_event(mocker)
# We mock the video service to raise KeyboardInterrupt when accessed
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.side_effect = KeyboardInterrupt
# Mock logging to verify the specific interrupt message is logged
mock_logger = mocker.patch("robot_interface.endpoints.video_sender.logging")
sender = VideoSender(zmq_context)
# Execute the loop
sender.video_rcv_loop(mock_video_service, "stream_name")
# Verify the 'finally' block executed (unsubscribe)
mock_video_service.unsubscribe.assert_called_with("stream_name")
mock_logger.info.assert_any_call("Unsubscribed from video stream.")