1 Commits

Author SHA1 Message Date
JobvAlewijk
8695f5204f feat: added basic list of 10 gestures
ref: N25B-402
2025-12-18 18:17:06 +01:00
39 changed files with 140 additions and 965 deletions

View File

@@ -6,9 +6,6 @@
# The hostname or IP address of the Control Backend.
AGENT__CONTROL_BACKEND_HOST=localhost
# Whether to use Pepper's microphone when Pepper is connected.
AUDIO__USE_PEPPER_MICROPHONE=true
# Variables that are unlikely to be configured, you can probably ignore these:

View File

@@ -7,5 +7,3 @@ sphinx
sphinx_rtd_theme
pre-commit
python-dotenv
numpy<=1.16.6
enum34

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals
from robot_interface.utils.get_config import get_config
@@ -68,7 +61,7 @@ class VideoConfig(object):
):
self.camera_index = get_config(camera_index, "VIDEO__CAMERA_INDEX", 0, int)
self.resolution = get_config(resolution, "VIDEO__RESOLUTION", 2, int)
self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 13, int)
self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 11, int)
self.fps = get_config(fps, "VIDEO__FPS", 15, int)
self.stream_name = get_config(stream_name, "VIDEO__STREAM_NAME", "Pepper Video")
self.image_buffer = get_config(image_buffer, "VIDEO__IMAGE_BUFFER", 6, int)
@@ -78,8 +71,6 @@ class AudioConfig(object):
"""
Audio configuration constants.
:ivar use_pepper_microphone: Whether to use Pepper's microphone or not, defaults to True.
:vartype use_pepper_microphone: bool
:ivar sample_rate: Audio sampling rate in Hz, defaults to 16000.
:vartype sample_rate: int
:ivar chunk_size: Size of audio chunks to capture/process, defaults to 512.
@@ -87,14 +78,7 @@ class AudioConfig(object):
:ivar channels: Number of audio channels, defaults to 1.
:vartype channels: int
"""
def __init__(
self,
use_pepper_microphone=None,
sample_rate=None,
chunk_size=None,
channels=None,
):
self.use_pepper_microphone = get_config(use_pepper_microphone, "AUDIO__USE_PEPPER_MICROPHONE", True, bool)
def __init__(self, sample_rate=None, chunk_size=None, channels=None):
self.sample_rate = get_config(sample_rate, "AUDIO__SAMPLE_RATE", 16000, int)
self.chunk_size = get_config(chunk_size, "AUDIO__CHUNK_SIZE", 512, int)
self.channels = get_config(channels, "AUDIO__CHANNELS", 1, int)

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,19 +1,11 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can log texts with Unicode characters
import logging
from threading import Thread
import Queue
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
from robot_interface.endpoints.gesture_settings import GestureTags
@@ -40,9 +32,6 @@ class ActuationReceiver(ReceiverBase):
self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options
self._tts_service = None
self._animation_service = None
self._message_queue = Queue.Queue()
self.message_thread = Thread(target=self._handle_messages)
self.message_thread.start()
def _handle_speech(self, message):
"""
@@ -69,26 +58,8 @@ class ActuationReceiver(ReceiverBase):
if not self._tts_service:
self._tts_service = state.qi_session.service("ALTextToSpeech")
if message.get("is_priority"):
# Bypass queue and speak immediately
self.clear_queue()
self._message_queue.put(text)
logging.debug("Force speaking immediately: {}".format(text))
else:
self._message_queue.put(text)
def clear_queue(self):
"""
Safely drains all pending messages from the queue.
"""
logging.info("Message queue size: {}".format(self._message_queue.qsize()))
try:
while True:
# Remove items one by one without waiting
self._message_queue.get_nowait()
except Queue.Empty:
pass
logging.info("Message queue cleared.")
# Returns instantly. Messages received while speaking will be queued.
getattr(qi, "async")(self._tts_service.say, text)
def _handle_gesture(self, message, is_single):
"""
@@ -151,19 +122,6 @@ class ActuationReceiver(ReceiverBase):
if message["endpoint"] == "actuate/gesture/single":
self._handle_gesture(message, True)
def _handle_messages(self):
while not state.exit_event.is_set():
try:
text = self._message_queue.get(timeout=0.1)
state.is_speaking = True
self._tts_service.say(text)
except Queue.Empty:
state.is_speaking = False
except RuntimeError:
logging.error("Lost connection to Pepper. Please check if you're connected to the "
"local WiFi and restart this application.")
state.exit_event.set()
def endpoint_description(self):
"""
Extend the default endpoint description with gesture tags.
@@ -171,5 +129,6 @@ class ActuationReceiver(ReceiverBase):
"""
desc = super(ActuationReceiver, self).endpoint_description()
desc["gestures"] = GestureTags.tags
desc["basic_gestures"] = GestureTags.basic_gestures
desc["single_gestures"] = GestureTags.single_gestures
return desc

View File

@@ -1,21 +1,7 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `logging` can use Unicode characters in names
import audioop
import enum
from abc import ABCMeta, abstractmethod
import threading
import logging
import Queue
import numpy as np
import pyaudio
import zmq
@@ -27,219 +13,85 @@ from robot_interface.core.config import settings
logger = logging.getLogger(__name__)
class AudioCapturer(object):
class AudioSender(SocketBase):
"""
Interface for audio capturers.
"""
__metaclass__ = ABCMeta
Audio sender endpoint, responsible for sending microphone audio data.
@abstractmethod
def setup(self):
raise NotImplementedError()
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
@abstractmethod
def stop(self):
raise NotImplementedError()
:param port: The port to use.
:type port: int
@abstractmethod
def generate_chunk(self):
raise NotImplementedError()
class SampleRate(enum.Enum):
"""
Sample rate to use in Hz.
"""
LOW = 16000
HIGH = 48000
class PepperMicrophone(enum.Enum):
"""
Which of Pepper's microphones to use. In our case, some of the mics were damages/didn't work
well, so we choose to only use the fron right. If you have a Pepper robot with all working mics,
you might wish to use all microphones, to improve overall audio quality.
"""
ALL = 0
LEFT = 1
RIGHT = 2
FRONT_LEFT = 3
FRONT_RIGHT = 4
class QiAudioCapturer(AudioCapturer):
# Some of this class' methods have docstrings as binary strings. Keep them that way, otherwise
# ``qi.Session.registerService`` will give RuntimeErrors.
def __init__(self, sample_rate=SampleRate.LOW, mic=PepperMicrophone.FRONT_RIGHT,
deinterleaved=0):
"""
:raises RuntimeError: If there is no Qi session available.
:raises ValueError: If the given arguments are not compatible.
"""
self.session = state.qi_session
if not self.session:
raise RuntimeError("Cannot capture from qi device, no qi session available.")
if sample_rate == SampleRate.HIGH and mic != PepperMicrophone.ALL:
raise RuntimeError("For 48000 Hz, you must select all microphones.")
if mic == PepperMicrophone.ALL and sample_rate != SampleRate.HIGH:
raise RuntimeError("For using all microphones, 48000 Hz is required.")
self.audio = self.session.service("ALAudioDevice")
self.service_name = "ZmqAudioStreamer"
self.sample_rate = sample_rate
self.mic = mic
self.deinterleaved = deinterleaved
self.overflow = np.empty(0, dtype=np.float32)
self.q = Queue.Queue()
self._rate_state = None
def setup(self):
b"""
:raises RuntimeError: If no Qi session is available or if the session is not compatible with audio streaming.
"""
assert self.session is not None
logger.info("Listening with Pepper's microphone.")
self.session.registerService(self.service_name, self)
self.audio.setClientPreferences(self.service_name, self.sample_rate.value, self.mic.value,
self.deinterleaved)
self.audio.subscribe(self.service_name)
def stop(self):
b"""
Stop the audio capturer.
"""
try:
self.audio.unsubscribe(self.service_name)
except:
pass
def generate_chunk(self):
try:
chunk = self.q.get(True, 0.1)
return chunk
except Queue.Empty:
return None
# Callback invoked by NAOqi
def processRemote(self, nbOfChannels, nbOfSamplesByChannel, timeStamp, inputBuffer):
raw_pcm = bytes(inputBuffer)
pcm_i16 = np.frombuffer(raw_pcm, dtype=np.int16)
# Make mono channel (if it was 4 channels)
pcm_i32_mono = self._make_mono(pcm_i16.astype(np.int32), nbOfChannels)
# Resample (if it was 48k)
pcm_i32_mono_16k, self._rate_state = audioop.ratecv(pcm_i32_mono.tobytes(), 4, 1,
self.sample_rate.value,
SampleRate.LOW.value, self._rate_state)
pcm_f32_mono_16k = (np.frombuffer(pcm_i32_mono_16k, dtype=np.int32).astype(np.float32) /
32768.0)
# Attach overflow
pcm_f32_mono_16k = np.append(self.overflow, pcm_f32_mono_16k)
for i in range(len(pcm_f32_mono_16k) // 512):
self.q.put_nowait(pcm_f32_mono_16k[i * 512 : (i + 1) * 512].tobytes())
self.overflow = pcm_f32_mono_16k[len(pcm_f32_mono_16k) // 512 * 512 :]
def _make_mono(self, frag, channels):
return frag.reshape(-1, channels).mean(axis=1, dtype=np.int32)
class StandaloneAudioCapturer(AudioCapturer):
"""
Audio capturer that uses a microphone from the local device, can be chosen with the
``--microphone`` program argument.
:ivar thread: Thread used for sending audio.
:vartype thread: threading.Thread | None
:ivar audio: PyAudio instance.
:vartype audio: pyaudio.PyAudio | None
:ivar microphone: Selected microphone information.
:vartype microphone: dict | None
:ivar stream: PyAudio stream instance. None until ``setup()`` is called, remaining None if setup
fails for any reason.
:vartype stream: pyaudio.Stream | None
"""
def __init__(self):
self.stream = None
def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
super(AudioSender, self).__init__(str("audio")) # Convert future's unicode_literal to str
self.create_socket(zmq_context, zmq.PUB, port)
self.thread = None
try:
self.audio = pyaudio.PyAudio()
self.microphone = choose_mic(self.audio)
except IOError as e:
logger.warning("PyAudio is not available. Won't be able to send audio.", exc_info=True)
logger.warning("PyAudio is not available.", exc_info=e)
self.audio = None
self.microphone = None
def setup(self):
def start(self):
"""
Setup audio stream. Will not if no microphone is available.
Start sending audio in a different thread.
Will not start if no microphone is available.
"""
if not self.microphone:
logger.info("Not listening: no microphone available.")
return
logger.info("Listening with microphone \"{}\".".format(self.microphone["name"]))
self.stream = self.audio.open(
format=pyaudio.paFloat32,
channels=settings.audio_config.channels,
rate=settings.audio_config.sample_rate,
input=True,
input_device_index=self.microphone["index"],
frames_per_buffer=settings.audio_config.chunk_size,
)
def stop(self):
"""
Close the audio stream.
"""
if not self.stream: return
self.stream.stop_stream()
self.stream.close()
def generate_chunk(self):
"""
:return: Audio frames from the microphone of size ``settings.audio_config.chunk_size``.
:rtype: bytes.
:raises IOError: If reading from the audio stream fails.
"""
return self.stream.read(settings.audio_config.chunk_size)
class AudioSender(SocketBase):
def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
super(AudioSender, self).__init__(str("audio"))
self.create_socket(zmq_context, zmq.PUB, port)
self.thread = threading.Thread(target=self.stream)
self.capturer = self.choose_capturer()
def start(self):
self.capturer.setup()
self.thread = threading.Thread(target=self._stream)
self.thread.start()
def close(self):
self.capturer.stop()
super(AudioSender, self).close()
def wait_until_done(self):
"""
Wait until the audio thread is done.
def stream(self):
while not state.exit_event.is_set():
chunk = self.capturer.generate_chunk()
Will block until `state.exit_event` is set. If the thread is not running, does nothing.
"""
if not self.thread: return
self.thread.join()
self.thread = None
if chunk is None or state.is_speaking:
continue
def _stream(self):
"""
Internal method to continuously read audio from the microphone and send it over the socket.
"""
audio_settings = settings.audio_config
chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
self.socket.send(chunk)
# Docs say this only raises an error if neither `input` nor `output` is True
stream = self.audio.open(
format=pyaudio.paFloat32,
channels=audio_settings.channels,
rate=audio_settings.sample_rate,
input=True,
input_device_index=self.microphone["index"],
frames_per_buffer=chunk,
)
def choose_capturer(self):
if state.qi_session and settings.audio_config.use_pepper_microphone:
return QiAudioCapturer()
return StandaloneAudioCapturer()
try:
while not state.exit_event.is_set():
data = stream.read(chunk)
self.socket.send(data)
except IOError as e:
logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e)
finally:
stream.stop_stream()
stream.close()

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
class GestureTags:
tags = ["above", "affirmative", "afford", "agitated", "all", "allright", "alright", "any",
"assuage", "assuage", "attemper", "back", "bashful", "beg", "beseech", "blank",
@@ -21,6 +14,19 @@ class GestureTags:
"think", "timid", "top", "unless", "up", "upstairs", "void", "warm", "winner", "yeah",
"yes", "yoo-hoo", "you", "your", "zero", "zestful"]
basic_gestures = [
"animations/Stand/Gestures/Hey_1",
"animations/Stand/Emotions/Neutral/Puzzled_1",
"animations/Stand/Gestures/Explain_4",
"animations/Stand/Gestures/You_1"
"animations/Stand/Emotions/Positive/Happy_1",
"animations/Stand/Emotions/Positive/Laugh_2",
"animations/Stand/Emotions/Neutral/Lonely_1",
"animations/Stand/Emotions/Negative/Surprise_1",
"animations/Stand/Emotions/Negative/Hurt_2",
"animations/Stand/Emotions/Negative/Angry_4",
]
single_gestures = [
"animations/Stand/BodyTalk/Listening/Listening_1",
"animations/Stand/BodyTalk/Listening/Listening_2",

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta, abstractmethod
from robot_interface.endpoints.socket_base import SocketBase

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta
import zmq

View File

@@ -1,21 +1,11 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import zmq
import threading
import logging
import struct
from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.core.config import settings
class VideoSender(SocketBase):
"""
Video sender endpoint, responsible for sending video frames.
@@ -28,7 +18,7 @@ class VideoSender(SocketBase):
"""
def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port):
super(VideoSender, self).__init__("video")
self.create_socket(zmq_context, zmq.PUB, port, [(zmq.SNDHWM,3)])
self.create_socket(zmq_context, zmq.PUB, port, [(zmq.CONFLATE,1)])
def start_video_rcv(self):
"""
@@ -61,23 +51,10 @@ class VideoSender(SocketBase):
:param vid_stream_name: The name of a camera subscription on the video service object vid_service
:type vid_stream_name: str
"""
try:
while not state.exit_event.is_set():
try:
img = vid_service.getImageRemote(vid_stream_name)
if img is not None:
raw_data = img[6]
width = img[0]
height = img[1]
width_bytes = struct.pack('<I', width)
height_bytes = struct.pack('<I', height)
self.socket.send_multipart([width_bytes, height_bytes, raw_data])
except KeyboardInterrupt:
logging.info("Video receiving loop interrupted by user.")
except:
logging.warn("Failed to retrieve video image from robot.")
finally:
vid_service.unsubscribe(vid_stream_name)
logging.info("Unsubscribed from video stream.")
while not state.exit_event.is_set():
try:
img = vid_service.getImageRemote(vid_stream_name)
#Possibly limit images sent if queuing issues arise
self.socket.send(img[settings.video_config.image_buffer])
except:
logging.warn("Failed to retrieve video image from robot.")

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
from robot_interface.endpoints.audio_sender import AudioSender

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import signal
import threading
@@ -37,7 +30,6 @@ class State(object):
self.exit_event = None
self.sockets = []
self.qi_session = None
self.is_speaking = False
def initialize(self):
"""

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os
from dotenv import load_dotenv
@@ -16,8 +9,6 @@ def get_config(value, env, default, cast=None):
Small utility to get a configuration value, returns `value` if it is not None, else it will try to get the
environment variable cast with `cast`. If the environment variable is not set, it will return `default`.
Special handling for booleans, which are only true if the value of the variable is "true" or "yes", ignoring capitalization.
:param value: The value to check.
:type value: Any
:param env: The environment variable to check.
@@ -35,14 +26,7 @@ def get_config(value, env, default, cast=None):
env = os.environ.get(env, default)
if cast is None or env is None:
if cast is None:
return env
if cast == bool:
if isinstance(env, bool):
return env
if not isinstance(default, bool):
raise ValueError("Default value must be a boolean if the cast type is a boolean.")
return env.lower() == "true" or env.lower() == "yes"
return cast(env)

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `print` can print Unicode characters in names
import logging
import sys

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import sys
@@ -14,20 +7,6 @@ except ImportError:
qi = None
def _get_qi_url():
"""
Get the Qi URL from the command line arguments, or None if not given.
"""
if "--qi-url" in sys.argv:
return sys.argv[sys.argv.index("--qi-url") + 1]
for arg in sys.argv:
if arg.startswith("--qi-url="):
return arg[len("--qi-url="):]
return None
def get_qi_session():
"""
Create and return a Qi session if available.
@@ -39,13 +18,12 @@ def get_qi_session():
logging.info("Unable to import qi. Running in stand-alone mode.")
return None
qi_url = _get_qi_url()
if qi_url is None:
if "--qi-url" not in sys.argv:
logging.info("No Qi URL argument given. Running in stand-alone mode.")
return None
try:
app = qi.Application(["--qi-url", qi_url, "--qi-listen-url", "tcp://0.0.0.0:0"])
app = qi.Application()
app.start()
return app.session
except RuntimeError:

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can format strings with Unicode characters
import random
import sys

View File

@@ -1,17 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, MagicMock
import pytest
@pytest.fixture(autouse=True)
def mock_zmq_context():
with patch("zmq.Context") as mock:
mock.instance.return_value = MagicMock()
yield mock

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, mock
from robot_interface.core.config import Settings

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pyaudio
import pytest

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys
import mock
@@ -27,109 +20,46 @@ def zmq_context():
yield context
def test_force_speech_clears_queue(mocker):
def test_handle_unimplemented_endpoint(zmq_context):
"""
Tests that a force speech message clears the existing queue
and places the high-priority message at the front.
Tests that the ``ActuationReceiver.handle_message`` method can
handle an unknown or unimplemented endpoint without raising an error.
"""
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._message_queue.put("old_message_1")
receiver._message_queue.put("old_message_2")
assert receiver._message_queue.qsize() == 2
force_msg = {
"endpoint": "actuate/speech",
"data": "Emergency Notification",
"is_priority": True,
}
receiver.handle_message(force_msg)
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Emergency Notification"
def test_handle_unimplemented_endpoint(mocker):
"""
Tests handling of unknown endpoints.
"""
mocker.patch("threading.Thread")
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver = ActuationReceiver(zmq_context)
# Should not error
receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist",
"data": None,
})
def test_speech_message_no_data(mocker):
def test_speech_message_no_data(zmq_context, mocker):
"""
Tests that if the message data is empty, the receiver returns immediately
WITHOUT attempting to access the global robot state or session.
Tests that the message handler logs a warning when a speech actuation
request (`actuate/speech`) is received but contains empty string data.
"""
# 1. Prevent background threads from running
mocker.patch("threading.Thread")
# 2. Mock the global state object
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_warn = mocker.patch("logging.warn")
# 3. Create a PropertyMock to track whenever 'qi_session' is accessed
# We attach it to the class type of the mock so it acts like a real property
mock_session_prop = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_session_prop
# 4. Initialize Receiver (Mocking the context to avoid ZMQ errors)
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# 5. Send empty data
receiver = ActuationReceiver(zmq_context)
receiver.handle_message({"endpoint": "actuate/speech", "data": ""})
# 6. Assertion:
# Because the code does `if not text: return` BEFORE `if not state.qi_session`,
# the state property should NEVER be read.
mock_session_prop.assert_not_called()
mock_warn.assert_called_with(mock.ANY)
def test_speech_message_invalid_data(mocker):
def test_speech_message_invalid_data(zmq_context, mocker):
"""
Tests that if the message data is not a string, the function returns.
:param mocker: Description
Tests that the message handler logs a warning when a speech actuation
request (`actuate/speech`) is received with data that is not a string (e.g., a boolean).
"""
mocker.patch("threading.Thread")
mock_warn = mocker.patch("logging.warn")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_session_prop = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_session_prop
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver = ActuationReceiver(zmq_context)
receiver.handle_message({"endpoint": "actuate/speech", "data": True})
# Because the code does `if not text: return` BEFORE `if not state.qi_session`,
# the state property should NEVER be read.
mock_session_prop.assert_not_called()
mock_warn.assert_called_with(mock.ANY)
def test_speech_no_qi(mocker):
def test_speech_no_qi(zmq_context, mocker):
"""
Tests the actuation receiver's behavior when processing a speech request
but the global state does not have an active QI session.
@@ -139,21 +69,16 @@ def test_speech_no_qi(mocker):
mock_qi_session = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_qi_session
mock_tts_service = mock.Mock()
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._tts_service = mock_tts_service
receiver = ActuationReceiver(zmq_context)
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
receiver._tts_service.assert_not_called()
mock_qi_session.assert_called()
def test_speech(mocker):
def test_speech(zmq_context, mocker):
"""
Tests the core speech actuation functionality by mocking the QI TextToSpeech
service and verifying that the received message is put into the queue.
service and verifying that it is called correctly.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
@@ -164,182 +89,17 @@ def test_speech(mocker):
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver = ActuationReceiver(zmq_context)
receiver._tts_service = None
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
assert receiver._message_queue.qsize() == 1
mock_state.qi_session.service.assert_called_once_with("ALTextToSpeech")
queued_item = receiver._message_queue.get()
assert queued_item == "Some message to speak."
getattr(mock_qi, "async").assert_called_once()
call_args = getattr(mock_qi, "async").call_args[0]
assert call_args[0] == mock_tts_service.say
assert call_args[1] == "Some message to speak."
def test_speech_priority(mocker):
"""
Tests that a priority speech message is handled correctly by clearing the queue
and placing the priority message at the front.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._message_queue.put("old_message_1")
receiver._message_queue.put("old_message_2")
assert receiver._message_queue.qsize() == 2
priority_msg = {
"endpoint": "actuate/speech",
"data": "Urgent Message",
"is_priority": True,
}
receiver._handle_speech(priority_msg)
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Urgent Message"
def test_handle_messages_loop(mocker):
"""
Tests the background consumer loop (_handle_messages) processing an item.
Runs SYNCHRONOUSLY to ensure coverage tools pick up the lines.
"""
# Patch Thread so the real background thread NEVER starts automatically
mocker.patch("threading.Thread")
# Mock state
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Setup initial speaking state to False (covers "Started speaking" print)
mock_state.is_speaking = False
# Mock the TextToSpeech service
mock_tts_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
# Initialize receiver (Thread is patched, so no thread starts)
# Use Mock Context to avoid ZMQ errors
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Manually inject service (since lazy loading might handle it, but this is safer)
receiver._tts_service = mock_tts_service
# This ensures the while loop iterates exactly once
mock_state.exit_event.is_set.side_effect = [False, True]
# Put an item in the queue
receiver._message_queue.put("Hello World")
# RUN MANUALLY in the main thread
# This executes the code: while -> try -> get -> if print -> speaking=True -> say
receiver._handle_messages()
# Assertions
assert receiver._message_queue.empty()
mock_tts_service.say.assert_called_with("Hello World")
assert mock_state.is_speaking is True
def test_handle_messages_queue_empty(mocker):
"""
Tests the Queue.Empty exception handler in the consumer loop.
This covers the logic that resets 'state.is_speaking' to False.
"""
# Prevent the real background thread from starting
mocker.patch("threading.Thread")
# Mock the state object
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Setup 'is_speaking' property mock
# We set return_value=True so the code enters the 'if state.is_speaking:' block.
# We use PropertyMock to track when this attribute is set.
type(mock_state).is_speaking = True
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# This ensures the while loop body runs exactly once for our test
mock_state.exit_event.is_set.side_effect = [False, True]
# Force get() to raise Queue.Empty immediately (simulate timeout)
# We patch the 'get' method on the specific queue instance of our receiver
#mocker.patch.object(receiver._message_queue, 'get', side_effect=Queue.Empty)
# Run the loop logic manually (synchronously)
receiver._handle_messages()
# Final Assertion: Verify is_speaking was set to False
# The code execution order is: read (returns True) -> print -> set (to False)
# assert_called_with checks the arguments of the LAST call, which is the setter.
assert mock_state.is_speaking is False
def test_handle_messages_runtime_error(mocker):
"""
Tests the RuntimeError exception handler (e.g. lost WiFi connection).
Uses a Mock ZMQ context to avoid 'Address already in use' errors.
"""
# Patch Thread so we don't accidentally spawn real threads
mocker.patch("threading.Thread")
# Mock the state and logging
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Use a MOCK ZMQ context.
# This prevents the receiver from trying to bind to a real TCP port.
mock_zmq_ctx = mock.Mock()
# Initialize receiver with the mock context
receiver = ActuationReceiver(mock_zmq_ctx)
mock_state.exit_event.is_set.side_effect = [False, True]
receiver._message_queue.put("Test Message")
# Setup: ...BUT the service raises RuntimeError when asked to speak
mock_tts = mock.Mock()
mock_tts.say.side_effect = RuntimeError("Connection lost")
receiver._tts_service = mock_tts
# Run the loop logic manually
receiver._handle_messages()
# Assertions
assert mock_state.exit_event.is_set.called
def test_clear_queue(mocker):
"""
Tests that the clear_queue method properly drains all items from the message queue.
"""
mocker.patch("threading.Thread")
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Populate the queue with multiple items
receiver._message_queue.put("msg1")
receiver._message_queue.put("msg2")
receiver._message_queue.put("msg3")
assert receiver._message_queue.qsize() == 3
# Clear the queue
receiver.clear_queue()
# Assert the queue is empty
assert receiver._message_queue.qsize() == 0
def test_gesture_no_data(zmq_context, mocker):
receiver = ActuationReceiver(zmq_context)

View File

@@ -1,10 +1,4 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
# coding=utf-8
import os
import mock
@@ -33,33 +27,35 @@ def test_no_microphone(zmq_context, mocker):
mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = None
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
sender = AudioSender(zmq_context)
assert sender.capturer.microphone is None
assert sender.microphone is None
sender.capturer.setup()
sender.start()
assert sender.thread is None
mock_info_logger.assert_called()
sender.wait_until_done() # Should return early because we didn't start a thread
def test_unicode_mic_name(zmq_context, mocker):
"""
Tests the robustness of the `AudioSender` when handling microphone names
that contain Unicode characters.
"""
mocker.patch("robot_interface.endpoints.audio_sender.threading")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"• Some Unicode name", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_choose_mic.return_value = {"name": u"• Some Unicode name"}
sender = AudioSender(zmq_context)
assert sender.capturer.microphone is not None
sender.capturer.audio.open = mock.Mock(return_value=mock.Mock())
assert sender.microphone is not None
# `.setup()` logs the name of the microphone. It should not give an error if it contains Unicode
# `.start()` logs the name of the microphone. It should not give an error if it contains Unicode
# symbols.
sender.capturer.setup()
sender.start()
assert sender.thread is not None
sender.wait_until_done() # Should return instantly because we didn't start a real thread
def _fake_read(num_frames):
@@ -77,59 +73,26 @@ def test_sending_audio(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
mock_state.is_speaking = False
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
stream.read = _fake_read
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.capturer.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.start()
sender.thread.join()
sender.wait_until_done()
send_socket.assert_called()
def test_no_sending_if_speaking(mocker):
"""
Tests the successful sending of audio data over a ZeroMQ socket.
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
mock_state.is_speaking = True
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
stream.read = _fake_read
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.capturer.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream
sender.start()
sender.thread.join()
send_socket.assert_not_called()
def _fake_read_error(num_frames):
"""
Helper function to simulate an I/O error during microphone stream reading.
@@ -145,7 +108,6 @@ def test_break_microphone(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
@@ -157,11 +119,11 @@ def test_break_microphone(mocker):
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.capturer.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.start()
sender.thread.join()
sender.wait_until_done()
send_socket.assert_not_called()
@@ -172,8 +134,6 @@ def test_pyaudio_init_failure(mocker, zmq_context):
"""
# Prevent binding the ZMQ socket
mocker.patch("robot_interface.endpoints.audio_sender.AudioSender.create_socket")
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
# Simulate PyAudio() failing
mocker.patch(
@@ -183,5 +143,5 @@ def test_pyaudio_init_failure(mocker, zmq_context):
sender = AudioSender(zmq_context)
assert sender.capturer.audio is None
assert sender.capturer.microphone is None
assert sender.audio is None
assert sender.microphone is None

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from robot_interface.utils.get_config import get_config
@@ -50,58 +43,3 @@ def test_get_config_casts_default_when_env_missing(monkeypatch):
result = get_config(None, "GET_CONFIG_MISSING", "42", int)
assert result == 42
def test_get_config_unset_boolean_default(monkeypatch):
"""
When the env var is a boolean, and it's not set, ensure it uses the default value.
"""
monkeypatch.delenv("SOME_BOOLEAN_VARIABLE", raising=False)
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == False
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == True
def test_get_config_true_boolean(monkeypatch):
"""
When the env var is a boolean, and its value is "true", "TRUE", "yes", etc., it should return true.
"""
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "TRUE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "true")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "yes")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "YES")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "TrUE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
def test_get_config_false_boolean(monkeypatch):
"""
When the env var is a boolean, and its value is not "true", "TRUE", "yes", etc., it should return False.
"""
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "FALSE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "false")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "anything, tbh")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest
import threading
import zmq

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock
import pytest
import zmq

View File

@@ -1,10 +1,4 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
# coding=utf-8
import mock
import pytest

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys
# Import module under test
@@ -62,7 +55,7 @@ def test_get_qi_session_runtime_error(monkeypatch):
raise RuntimeError("boom")
class FakeQi:
Application = lambda *args, **kwargs: FakeApp()
Application = lambda self=None: FakeApp()
reload_qi_utils_with(FakeQi())
@@ -87,7 +80,7 @@ def test_get_qi_session_success(monkeypatch):
return True
class FakeQi:
Application = lambda *args, **kwargs: FakeApp()
Application = lambda self=None: FakeApp()
reload_qi_utils_with(FakeQi())

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest
from robot_interface.endpoints.receiver_base import ReceiverBase

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock
import zmq
from robot_interface.endpoints.socket_base import SocketBase

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import threading
import signal
import pytest

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time
import mock

View File

@@ -1,11 +1,5 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
# coding=utf-8
import struct
import mock
import pytest
import zmq
@@ -54,10 +48,8 @@ def test_video_streaming(zmq_context, mocker):
# Pepper's image buffer lives at index 6
mocker.patch.object(settings.video_config, "image_buffer", 6)
test_width = 320
test_height = 240
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.return_value = [test_width, test_height, None, None, None, None, b"fake_img"]
mock_video_service.getImageRemote.return_value = [None]*6 + ["fake_img"]
fake_session = mock.Mock()
fake_session.service.return_value = mock_video_service
@@ -71,16 +63,12 @@ def test_video_streaming(zmq_context, mocker):
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send_multipart = send_socket
sender.socket.send = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_called_with([
struct.pack('<I', 320),
struct.pack('<I', 240),
b"fake_img"
])
send_socket.assert_called_with("fake_img")
def test_video_receive_error(zmq_context, mocker):
@@ -103,30 +91,9 @@ def test_video_receive_error(zmq_context, mocker):
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send_multipart = send_socket
sender.socket.send = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_not_called()
def test_video_loop_keyboard_interrupt(zmq_context, mocker):
"""Video loop should handle KeyboardInterrupt gracefully and unsubscribe."""
_patch_basics(mocker)
_patch_exit_event(mocker)
# We mock the video service to raise KeyboardInterrupt when accessed
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.side_effect = KeyboardInterrupt
# Mock logging to verify the specific interrupt message is logged
mock_logger = mocker.patch("robot_interface.endpoints.video_sender.logging")
sender = VideoSender(zmq_context)
# Execute the loop
sender.video_rcv_loop(mock_video_service, "stream_name")
# Verify the 'finally' block executed (unsubscribe)
mock_video_service.unsubscribe.assert_called_with("stream_name")
mock_logger.info.assert_any_call("Unsubscribed from video stream.")