7 Commits

Author SHA1 Message Date
Pim Hutting
eab2481b85 chore: fixed flakiness of tests 2026-01-14 16:34:34 +01:00
Pim Hutting
a55acd57b6 chore: finalized tests, added queue
ref: N25B-387
2025-12-18 12:55:09 +01:00
Storm
5d5c8553c2 test: added test configuration to always mock zmq context to fix zmq port congestion issues
ref: N25B-386
2025-12-16 16:37:36 +01:00
Storm
79db2c77c8 feat: added queue-size log message
ref: N25B-386
2025-12-16 16:17:09 +01:00
Storm
b6f2893c25 style: moved one line
ref: N25B-386
2025-12-16 14:54:02 +01:00
Storm
b3e3a1eb80 feat: implemented force speech functionality in RI and refactored actuation_receiver tests
Before actuation_receiver tests started a receiver with real zmq context. This led to flaky tests because of port congestion issues.

ref: N25B-386
2025-12-16 14:49:44 +01:00
Storm
912af8d821 feat: implemented force speech functionality in RI and refactored actuation_receiver tests
Before actuation_receiver tests started a receiver with real zmq context. This led to flaky tests because of port congestion issues.

ref: N25B-386
2025-12-12 14:38:06 +01:00
37 changed files with 289 additions and 838 deletions

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals from __future__ import unicode_literals
from robot_interface.utils.get_config import get_config from robot_interface.utils.get_config import get_config

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,12 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can log texts with Unicode characters from __future__ import unicode_literals # So that we can log texts with Unicode characters
import logging import logging
import time
from threading import Thread from threading import Thread
import Queue import Queue
@@ -14,6 +8,7 @@ import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state from robot_interface.state import state
from robot_interface.core.config import settings from robot_interface.core.config import settings
from robot_interface.endpoints.gesture_settings import GestureTags from robot_interface.endpoints.gesture_settings import GestureTags
@@ -41,8 +36,11 @@ class ActuationReceiver(ReceiverBase):
self._tts_service = None self._tts_service = None
self._animation_service = None self._animation_service = None
self._message_queue = Queue.Queue() self._message_queue = Queue.Queue()
self._gesture_queue = Queue.Queue()
self.message_thread = Thread(target=self._handle_messages) self.message_thread = Thread(target=self._handle_messages)
self.message_thread.start() self.message_thread.start()
self.gesture_thread = Thread(target=self._handle_gestures)
self.gesture_thread.start()
def _handle_speech(self, message): def _handle_speech(self, message):
""" """
@@ -69,7 +67,7 @@ class ActuationReceiver(ReceiverBase):
if not self._tts_service: if not self._tts_service:
self._tts_service = state.qi_session.service("ALTextToSpeech") self._tts_service = state.qi_session.service("ALTextToSpeech")
if message.get("is_priority"): if (message.get("is_priority")):
# Bypass queue and speak immediately # Bypass queue and speak immediately
self.clear_queue() self.clear_queue()
self._message_queue.put(text) self._message_queue.put(text)
@@ -77,6 +75,7 @@ class ActuationReceiver(ReceiverBase):
else: else:
self._message_queue.put(text) self._message_queue.put(text)
def clear_queue(self): def clear_queue(self):
""" """
Safely drains all pending messages from the queue. Safely drains all pending messages from the queue.
@@ -90,6 +89,21 @@ class ActuationReceiver(ReceiverBase):
pass pass
logging.info("Message queue cleared.") logging.info("Message queue cleared.")
def clear_gesture_queue(self):
"""
Safely drains all pending gestures from the gesture queue.
"""
logging.info("Gesture queue size: {}".format(self._gesture_queue.qsize()))
try:
while True:
# Remove items one by one without waiting
self._gesture_queue.get_nowait()
except Queue.Empty:
pass
logging.info("Gesture queue cleared.")
logging.info("Gesture queue size: {}".format(self._gesture_queue.qsize()))
def _handle_gesture(self, message, is_single): def _handle_gesture(self, message, is_single):
""" """
Handle a gesture actuation request. Handle a gesture actuation request.
@@ -130,12 +144,16 @@ class ActuationReceiver(ReceiverBase):
# Play the gesture. Pepper comes with predefined animations like "Wave", "Greet", "Clap" # Play the gesture. Pepper comes with predefined animations like "Wave", "Greet", "Clap"
# You can also create custom animations using Choregraphe and upload them to the robot. # You can also create custom animations using Choregraphe and upload them to the robot.
if (message.get("is_priority")):
# Clear queue and play
self.clear_gesture_queue()
logging.debug("Force playing gesture immediately: {}".format(gesture))
if is_single: if is_single:
logging.debug("Playing single gesture: {}".format(gesture)) logging.debug("Adding single gesture to queue: {}".format(gesture))
getattr(qi, "async")(self._animation_service.run, gesture)
else: else:
logging.debug("Playing tag gesture: {}".format(gesture)) logging.debug("Adding tag gesture to queue: {}".format(gesture))
getattr(qi, "async")(self._animation_service.runTag, gesture) self._gesture_queue.put(gesture)
def handle_message(self, message): def handle_message(self, message):
""" """
@@ -155,13 +173,25 @@ class ActuationReceiver(ReceiverBase):
while not state.exit_event.is_set(): while not state.exit_event.is_set():
try: try:
text = self._message_queue.get(timeout=0.1) text = self._message_queue.get(timeout=0.1)
if not state.is_speaking: print("Started speaking.")
state.is_speaking = True state.is_speaking = True
self._tts_service.say(text) self._tts_service.say(text)
except Queue.Empty: except Queue.Empty:
if state.is_speaking: print("Finished speaking.")
state.is_speaking = False state.is_speaking = False
except RuntimeError: except RuntimeError:
logging.error("Lost connection to Pepper. Please check if you're connected to the " logging.warn("Lost connection to Pepper. Please check if you're connected to the local WiFi and restart this application.")
"local WiFi and restart this application.") state.exit_event.set()
def _handle_gestures(self):
while not state.exit_event.is_set():
try:
gesture = self._gesture_queue.get(timeout=0.1)
self._animation_service.run(gesture)
except Queue.Empty:
pass
except RuntimeError:
logging.warn("Lost connection to Pepper. Please check if you're connected to the local WiFi and restart this application.")
state.exit_event.set() state.exit_event.set()
def endpoint_description(self): def endpoint_description(self):

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `logging` can use Unicode characters in names from __future__ import unicode_literals # So that `logging` can use Unicode characters in names
import threading import threading
import logging import logging
@@ -84,33 +77,21 @@ class AudioSender(SocketBase):
chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
# Docs say this only raises an error if neither `input` nor `output` is True # Docs say this only raises an error if neither `input` nor `output` is True
def open_stream(): stream = self.audio.open(
return self.audio.open( format=pyaudio.paFloat32,
format=pyaudio.paFloat32, channels=audio_settings.channels,
channels=audio_settings.channels, rate=audio_settings.sample_rate,
rate=audio_settings.sample_rate, input=True,
input=True, input_device_index=self.microphone["index"],
input_device_index=self.microphone["index"], frames_per_buffer=chunk,
frames_per_buffer=chunk, )
)
stream = None
try: try:
# Test in case exit_event was set while waiting
if not state.exit_event.is_set():
stream = open_stream()
while not state.exit_event.is_set(): while not state.exit_event.is_set():
data = stream.read(chunk) data = stream.read(chunk)
if (state.is_speaking): continue # Do not send audio while the robot is speaking
self.socket.send(data) self.socket.send(data)
except IOError as e: except IOError as e:
logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e) logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e)
finally: finally:
if stream: stream.stop_stream()
try: stream.close()
stream.stop_stream()
stream.close()
except IOError:
pass # Ignore errors on closing

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
class GestureTags: class GestureTags:
tags = ["above", "affirmative", "afford", "agitated", "all", "allright", "alright", "any", tags = ["above", "affirmative", "afford", "agitated", "all", "allright", "alright", "any",
"assuage", "assuage", "attemper", "back", "bashful", "beg", "beseech", "blank", "assuage", "assuage", "attemper", "back", "bashful", "beg", "beseech", "blank",

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import zmq import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase from robot_interface.endpoints.receiver_base import ReceiverBase
@@ -79,30 +72,6 @@ class MainReceiver(ReceiverBase):
return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."} return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."}
@staticmethod
def _handle_pause(message):
"""
Handle a pause request. Pauses or resumes the video and audio streams.
:param message: The pause request message.
:type message: dict
:return: A response dictionary indicating success.
:rtype: dict[str, str]
"""
if message.get("data"):
if state.active_event.is_set():
state.active_event.clear()
return {"endpoint": "pause", "data": "Streams paused."}
else:
return {"endpoint": "pause", "data": "Streams are already paused."}
else:
if not state.active_event.is_set():
state.active_event.set()
return {"endpoint": "pause", "data": "Streams resumed."}
else:
return {"endpoint": "pause", "data": "Streams are already running."}
def handle_message(self, message): def handle_message(self, message):
""" """
Main entry point for handling incoming messages. Main entry point for handling incoming messages.
@@ -119,7 +88,5 @@ class MainReceiver(ReceiverBase):
return self._handle_ping(message) return self._handle_ping(message)
elif message["endpoint"].startswith("negotiate"): elif message["endpoint"].startswith("negotiate"):
return self._handle_negotiation(message) return self._handle_negotiation(message)
elif message["endpoint"] == "pause":
return self._handle_pause(message)
return {"endpoint": "error", "data": "The requested endpoint is not supported."} return {"endpoint": "error", "data": "The requested endpoint is not supported."}

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from robot_interface.endpoints.socket_base import SocketBase from robot_interface.endpoints.socket_base import SocketBase

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta from abc import ABCMeta
import zmq import zmq

View File

@@ -1,22 +1,11 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import struct
import zmq import zmq
import threading import threading
import logging import logging
import numpy as np
import cv2
from robot_interface.endpoints.socket_base import SocketBase from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state from robot_interface.state import state
from robot_interface.core.config import settings from robot_interface.core.config import settings
class VideoSender(SocketBase): class VideoSender(SocketBase):
""" """
Video sender endpoint, responsible for sending video frames. Video sender endpoint, responsible for sending video frames.
@@ -29,7 +18,7 @@ class VideoSender(SocketBase):
""" """
def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port): def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port):
super(VideoSender, self).__init__("video") super(VideoSender, self).__init__("video")
self.create_socket(zmq_context, zmq.PUB, port, [(zmq.SNDHWM,3)]) self.create_socket(zmq_context, zmq.PUB, port, [(zmq.CONFLATE,1)])
def start_video_rcv(self): def start_video_rcv(self):
""" """
@@ -39,9 +28,6 @@ class VideoSender(SocketBase):
""" """
if not state.qi_session: if not state.qi_session:
logging.info("No Qi session available. Not starting video loop.") logging.info("No Qi session available. Not starting video loop.")
logging.info("Starting test video stream from local webcam.")
thread = threading.Thread(target=self.test_video_stream)
thread.start()
return return
video = state.qi_session.service("ALVideoDevice") video = state.qi_session.service("ALVideoDevice")
@@ -55,40 +41,9 @@ class VideoSender(SocketBase):
thread = threading.Thread(target=self.video_rcv_loop, args=(video, vid_stream_name)) thread = threading.Thread(target=self.video_rcv_loop, args=(video, vid_stream_name))
thread.start() thread.start()
def test_video_stream(self):
"""
Test function to send video from a local webcam instead of the robot.
"""
cap = cv2.VideoCapture(0)
if not cap.isOpened():
logging.error("Could not open webcam for video stream test.")
return
while not state.exit_event.is_set():
ret, frame = cap.read()
if not ret:
logging.warning("Failed to read frame from webcam.")
continue
if cv2.waitKey(1) & 0xFF == ord('q'): # << Add this: Updates the window
break
height, width, channels = frame.shape
pixel_data = frame.tobytes()
width_bytes = struct.pack('<I', width)
height_bytes = struct.pack('<I', height)
self.socket.send_multipart([width_bytes, height_bytes, pixel_data])
cap.release()
def video_rcv_loop(self, vid_service, vid_stream_name): def video_rcv_loop(self, vid_service, vid_stream_name):
""" """
The main loop of retrieving video images from the robot. The main loop of retrieving video images from the robot.
Sends the image data over the ZMQ socket in 3 parts: image width, image height and raw image bytes.
:param vid_service: The video service object that the active Qi session is connected to. :param vid_service: The video service object that the active Qi session is connected to.
:type vid_service: Object (Qi service object) :type vid_service: Object (Qi service object)
@@ -96,23 +51,10 @@ class VideoSender(SocketBase):
:param vid_stream_name: The name of a camera subscription on the video service object vid_service :param vid_stream_name: The name of a camera subscription on the video service object vid_service
:type vid_stream_name: str :type vid_stream_name: str
""" """
try: while not state.exit_event.is_set():
while not state.exit_event.is_set(): try:
try: img = vid_service.getImageRemote(vid_stream_name)
img = vid_service.getImageRemote(vid_stream_name) #Possibly limit images sent if queuing issues arise
if img is not None: self.socket.send(img[settings.video_config.image_buffer])
image_bytes = img[6] except:
width = img[0] logging.warn("Failed to retrieve video image from robot.")
height = img[1]
width_bytes = struct.pack('<I', width)
height_bytes = struct.pack('<I', height)
self.socket.send_multipart([width_bytes, height_bytes, image_bytes])
except:
logging.warn("Failed to retrieve video image from robot.")
except KeyboardInterrupt:
logging.info("Video receiving loop interrupted by user.")
finally:
vid_service.unsubscribe(vid_stream_name)
logging.info("Unsubscribed from video stream.")

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging import logging
from robot_interface.endpoints.audio_sender import AudioSender from robot_interface.endpoints.audio_sender import AudioSender
@@ -91,7 +84,6 @@ def main():
context = zmq.Context() context = zmq.Context()
state.initialize() state.initialize()
state.active_event.set()
try: try:
main_loop(context) main_loop(context)

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging import logging
import signal import signal
import threading import threading
@@ -56,8 +49,6 @@ class State(object):
signal.signal(signal.SIGINT, handle_exit) signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit) signal.signal(signal.SIGTERM, handle_exit)
self.active_event = threading.Event()
self.qi_session = get_qi_session() self.qi_session = get_qi_session()
self.is_initialized = True self.is_initialized = True

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `print` can print Unicode characters in names from __future__ import unicode_literals # So that `print` can print Unicode characters in names
import logging import logging
import sys import sys

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging import logging
import sys import sys

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time import time

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can format strings with Unicode characters from __future__ import unicode_literals # So that we can format strings with Unicode characters
import random import random
import sys import sys

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, MagicMock from mock import patch, MagicMock
import pytest import pytest

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, mock from mock import patch, mock
from robot_interface.core.config import Settings from robot_interface.core.config import Settings

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pyaudio import pyaudio
import pytest import pytest

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,32 +1,13 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys import sys
import time
import mock import mock
import pytest import pytest
import zmq import zmq
import Queue
from robot_interface.endpoints.actuation_receiver import ActuationReceiver from robot_interface.endpoints.actuation_receiver import ActuationReceiver
from robot_interface.endpoints.gesture_settings import GestureTags from robot_interface.endpoints.gesture_settings import GestureTags
@pytest.fixture
def zmq_context():
"""
A pytest fixture that creates and yields a ZMQ context.
:return: An initialized ZeroMQ context.
:rtype: zmq.Context
"""
context = zmq.Context()
yield context
def test_force_speech_clears_queue(mocker): def test_force_speech_clears_queue(mocker):
""" """
Tests that a force speech message clears the existing queue Tests that a force speech message clears the existing queue
@@ -61,7 +42,6 @@ def test_force_speech_clears_queue(mocker):
queued_item = receiver._message_queue.get() queued_item = receiver._message_queue.get()
assert queued_item == "Emergency Notification" assert queued_item == "Emergency Notification"
def test_handle_unimplemented_endpoint(mocker): def test_handle_unimplemented_endpoint(mocker):
""" """
Tests handling of unknown endpoints. Tests handling of unknown endpoints.
@@ -77,7 +57,6 @@ def test_handle_unimplemented_endpoint(mocker):
"data": None, "data": None,
}) })
def test_speech_message_no_data(mocker): def test_speech_message_no_data(mocker):
""" """
Tests that if the message data is empty, the receiver returns immediately Tests that if the message data is empty, the receiver returns immediately
@@ -235,7 +214,7 @@ def test_handle_messages_loop(mocker):
receiver._tts_service = mock_tts_service receiver._tts_service = mock_tts_service
# This ensures the while loop iterates exactly once # This ensures the while loop iterates exactly once
mock_state.exit_event.is_set.side_effect = [False, True] mock_state.exit_event.is_set.side_effect = [False, True, True , True, True]
# Put an item in the queue # Put an item in the queue
receiver._message_queue.put("Hello World") receiver._message_queue.put("Hello World")
@@ -250,40 +229,24 @@ def test_handle_messages_loop(mocker):
assert mock_state.is_speaking is True assert mock_state.is_speaking is True
def test_handle_messages_queue_empty(mocker): def test_handle_gestures_runtime_error(mocker):
"""
Tests the Queue.Empty exception handler in the consumer loop.
This covers the logic that resets 'state.is_speaking' to False.
"""
# Prevent the real background thread from starting
mocker.patch("threading.Thread") mocker.patch("threading.Thread")
# Mock the state object
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Setup 'is_speaking' property mock # Use a side_effect that returns False then True thereafter
# We set return_value=True so the code enters the 'if state.is_speaking:' block. mock_state.exit_event.is_set.side_effect = [False, True, True, True, True]
# We use PropertyMock to track when this attribute is set.
type(mock_state).is_speaking = True
mock_zmq_ctx = mock.Mock() mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx) receiver = ActuationReceiver(mock_zmq_ctx)
# This ensures the while loop body runs exactly once for our test # ... rest of your setup ...
mock_state.exit_event.is_set.side_effect = [False, True] mock_anim = mock.Mock()
mock_anim.run.side_effect = RuntimeError("Wifi Lost")
receiver._animation_service = mock_anim
# Force get() to raise Queue.Empty immediately (simulate timeout) receiver._gesture_queue.put("wave")
# We patch the 'get' method on the specific queue instance of our receiver
#mocker.patch.object(receiver._message_queue, 'get', side_effect=Queue.Empty)
# Run the loop logic manually (synchronously)
receiver._handle_messages()
# Final Assertion: Verify is_speaking was set to False
# The code execution order is: read (returns True) -> print -> set (to False)
# assert_called_with checks the arguments of the LAST call, which is the setter.
assert mock_state.is_speaking is False
receiver._handle_gestures()
def test_handle_messages_runtime_error(mocker): def test_handle_messages_runtime_error(mocker):
""" """
@@ -303,7 +266,7 @@ def test_handle_messages_runtime_error(mocker):
# Initialize receiver with the mock context # Initialize receiver with the mock context
receiver = ActuationReceiver(mock_zmq_ctx) receiver = ActuationReceiver(mock_zmq_ctx)
mock_state.exit_event.is_set.side_effect = [False, True] mock_state.exit_event.is_set.side_effect = [False, True, True, True ]
receiver._message_queue.put("Test Message") receiver._message_queue.put("Test Message")
@@ -341,54 +304,61 @@ def test_clear_queue(mocker):
# Assert the queue is empty # Assert the queue is empty
assert receiver._message_queue.qsize() == 0 assert receiver._message_queue.qsize() == 0
def test_gesture_no_data(zmq_context, mocker): def test_gesture_no_data(mocker):
receiver = ActuationReceiver(zmq_context) mock_zmq = mock.Mock()
receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": ""}, True) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": ""}, True)
# Just ensuring no crash # Just ensuring no crash
def test_gesture_invalid_data(zmq_context, mocker): def test_gesture_invalid_data(mocker):
receiver = ActuationReceiver(zmq_context) mock_zmq = mock.Mock()
receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": 123}, True) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": 123}, True)
# No crash expected # No crash expected
def test_gesture_single_not_found(zmq_context, mocker): def test_gesture_single_not_found(mocker):
mock_zmq = mock.Mock()
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave", "bow"] # allowed single gestures mock_tags.single_gestures = ["wave", "bow"] # allowed single gestures
receiver = ActuationReceiver(mock_zmq)
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "unknown_gesture"}, True) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "unknown_gesture"}, True)
# No crash expected # No crash expected
def test_gesture_tag_not_found(zmq_context, mocker): def test_gesture_tag_not_found(mocker):
mock_zmq = mock.Mock()
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["happy", "sad"] mock_tags.tags = ["happy", "sad"]
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "not_a_tag"}, False) receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "not_a_tag"}, False)
# No crash expected # No crash expected
def test_gesture_no_qi_session(zmq_context, mocker): def test_gesture_no_qi_session( mocker):
mock_zmq = mock.Mock()
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = None mock_state.qi_session = None
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["hello"] mock_tags.single_gestures = ["hello"]
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "hello"}, True) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "hello"}, True)
# No crash, path returns early # No crash, path returns early
def test_gesture_single_success(zmq_context, mocker): def test_gesture_single_success(mocker):
mock_zmq = mock.Mock()
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Allow loops to run
mock_state.exit_event.is_set.return_value = False
mock_qi = mock.Mock() mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi sys.modules["qi"] = mock_qi
# Setup gesture settings
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave"] mock_tags.single_gestures = ["wave"]
@@ -396,17 +366,25 @@ def test_gesture_single_success(zmq_context, mocker):
mock_state.qi_session = mock.Mock() mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "wave"}, True) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "wave"}, True)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer") time.sleep(0.2)
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
assert getattr(mock_qi, "async").call_args[0][1] == "wave" mock_animation_service.run.assert_called_with("wave")
# CLEANUP: Signal exit AND join threads
mock_state.exit_event.is_set.return_value = True
receiver.message_thread.join(timeout=1.0)
receiver.gesture_thread.join(timeout=1.0)
def test_gesture_tag_success(zmq_context, mocker): def test_gesture_tag_success(mocker):
mock_zmq = mock.Mock()
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.exit_event.is_set.return_value = False
mock_qi = mock.Mock() mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi sys.modules["qi"] = mock_qi
@@ -417,21 +395,26 @@ def test_gesture_tag_success(zmq_context, mocker):
mock_state.qi_session = mock.Mock() mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "greeting"}, False) receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "greeting"}, False)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer") time.sleep(0.2)
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.runTag mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
assert getattr(mock_qi, "async").call_args[0][1] == "greeting" mock_animation_service.run.assert_called_with("greeting")
# CLEANUP: Signal exit AND join threads
mock_state.exit_event.is_set.return_value = True
receiver.message_thread.join(timeout=1.0)
receiver.gesture_thread.join(timeout=1.0)
def test_handle_message_all_routes(zmq_context, mocker): def test_handle_message_all_routes(mocker):
""" """
Ensures all handle_message endpoint branches route correctly. Ensures all handle_message endpoint branches route correctly.
""" """
receiver = ActuationReceiver(zmq_context) mock_zmq = mock.Mock()
receiver = ActuationReceiver(mock_zmq)
mock_speech = mocker.patch.object(receiver, "_handle_speech") mock_speech = mocker.patch.object(receiver, "_handle_speech")
mock_gesture = mocker.patch.object(receiver, "_handle_gesture") mock_gesture = mocker.patch.object(receiver, "_handle_gesture")
@@ -443,12 +426,13 @@ def test_handle_message_all_routes(zmq_context, mocker):
assert mock_gesture.call_count == 2 assert mock_gesture.call_count == 2
def test_endpoint_description(zmq_context, mocker): def test_endpoint_description(mocker):
mock_zmq = mock.Mock()
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["happy"] mock_tags.tags = ["happy"]
mock_tags.single_gestures = ["wave"] mock_tags.single_gestures = ["wave"]
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
desc = receiver.endpoint_description() desc = receiver.endpoint_description()
assert "gestures" in desc assert "gestures" in desc
@@ -458,26 +442,20 @@ def test_endpoint_description(zmq_context, mocker):
assert desc["single_gestures"] == ["wave"] assert desc["single_gestures"] == ["wave"]
def test_gesture_single_real_gesturetags(zmq_context, mocker): def test_gesture_single_real_gesturetags(mocker):
""" mock_zmq = mock.Mock()
Uses the real GestureTags (no mocking) to ensure the receiver
references GestureTags.single_gestures correctly.
"""
# Ensure qi session exists so we pass the early return
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = mock.Mock() mock_state.qi_session = mock.Mock()
mock_state.exit_event.is_set.return_value = False
# Mock qi.async to avoid real async calls
mock_qi = mock.Mock() mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi sys.modules["qi"] = mock_qi
# Mock animation service
mock_animation_service = mock.Mock() mock_animation_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
# Pick a real gesture from GestureTags.single_gestures
assert len(GestureTags.single_gestures) > 0, "GestureTags.single_gestures must not be empty" assert len(GestureTags.single_gestures) > 0, "GestureTags.single_gestures must not be empty"
gesture = GestureTags.single_gestures[0] gesture = GestureTags.single_gestures[0]
@@ -486,8 +464,85 @@ def test_gesture_single_real_gesturetags(zmq_context, mocker):
is_single=True, is_single=True,
) )
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer") time.sleep(0.2)
getattr(mock_qi, "async").assert_called_once() mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run mock_animation_service.run.assert_called_with(gesture)
assert getattr(mock_qi, "async").call_args[0][1] == gesture
# CLEANUP: Signal exit AND join threads
mock_state.exit_event.is_set.return_value = True
receiver.message_thread.join(timeout=1.0)
receiver.gesture_thread.join(timeout=1.0)
def test_clear_gesture_queue(mocker):
# Prevent background threads from eating the items
mocker.patch("threading.Thread")
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Populate the queue
receiver._gesture_queue.put("gesture1")
receiver._gesture_queue.put("gesture2")
assert receiver._gesture_queue.qsize() == 2
# Clear the queue
receiver.clear_gesture_queue()
# Assert the queue is empty
assert receiver._gesture_queue.qsize() == 0
def test_gesture_priority_clears_queue(mocker):
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Mock QI and Tags so valid checks pass
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["urgent_wave"]
# Setup Animation Service
mock_anim = mock.Mock()
mock_state.qi_session.service.return_value = mock_anim
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Pre-fill queue with "slow" gestures
receiver._gesture_queue.put("slow_gesture_1")
receiver._gesture_queue.put("slow_gesture_2")
assert receiver._gesture_queue.qsize() == 2
# Send priority gesture
priority_msg = {
"endpoint": "actuate/gesture/single",
"data": "urgent_wave",
"is_priority": True,
}
receiver._handle_gesture(priority_msg, is_single=True)
# Assert old items are gone and only new one remains
assert receiver._gesture_queue.qsize() == 1
assert receiver._gesture_queue.get() == "urgent_wave"
def test_handle_gestures_loop_empty(mocker):
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Run loop exactly once
mock_state.exit_event.is_set.side_effect = [False, True, True, True]
# We don't put anything in the queue, so .get(timeout=0.1) will raise Queue.Empty.
# The code should catch it and pass.
receiver._handle_gestures()
# If we reached here without raising an exception, the test passes.
# We can assert that the queue is still valid/empty.
assert receiver._gesture_queue.empty()

View File

@@ -1,10 +1,4 @@
# -*- coding: utf-8 -*- # coding=utf-8
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os import os
import mock import mock
@@ -39,6 +33,7 @@ def test_no_microphone(zmq_context, mocker):
sender.start() sender.start()
assert sender.thread is None assert sender.thread is None
mock_info_logger.assert_called()
sender.wait_until_done() # Should return early because we didn't start a thread sender.wait_until_done() # Should return early because we didn't start a thread
@@ -78,12 +73,11 @@ def test_sending_audio(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.exit_event.is_set.side_effect = [False, False, True] mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock() mock_zmq_context = mock.Mock()
send_socket = mock.Mock() send_socket = mock.Mock()
mock_state.is_speaking = False
# If there's something wrong with the microphone, it will raise an IOError when `read`ing. # If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock() stream = mock.Mock()
stream.read = _fake_read stream.read = _fake_read
@@ -98,247 +92,6 @@ def test_sending_audio(mocker):
send_socket.assert_called() send_socket.assert_called()
# SENDING PAUSE RESUME?
def test_stream_initial_wait_exit(mocker):
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.exit_event.is_set.return_value = True
mock_state.active_event.is_set.return_value = False
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender._stream()
mock_pyaudio_instance.open.assert_not_called()
def test_stream_pause_and_resume(mocker):
mock_stream = mock.Mock()
mock_stream.read.return_value = b"data"
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.return_value = mock_stream
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = False
mock_state.exit_event.is_set.side_effect = [False, False, False, True]
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.socket = mock.Mock()
sender._stream()
assert mock_pyaudio_instance.open.call_count == 2
assert mock_stream.close.call_count == 2
assert mock_stream.stop_stream.call_count == 2
assert mock_state.active_event.wait.called
def test_stream_exit_during_pause(mocker):
mock_stream = mock.Mock()
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.return_value = mock_stream
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = False
mock_state.exit_event.is_set.side_effect = [False, False, True]
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender._stream()
assert mock_pyaudio_instance.open.call_count == 1
assert mock_stream.close.call_count == 1
def test_stream_read_error_recovery(mocker):
stream_fail = mock.Mock()
stream_fail.read.side_effect = IOError("Overflow")
stream_ok = mock.Mock()
stream_ok.read.return_value = b"data"
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.side_effect = [stream_fail, stream_ok]
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = True
mock_state.exit_event.is_set.side_effect = [False, False, False, True]
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.socket = mock.Mock()
sender._stream()
stream_fail.close.assert_called()
assert mock_pyaudio_instance.open.call_count == 2
sender.socket.send.assert_called_with(b"data")
def test_stream_fatal_error(mocker):
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.side_effect = IOError("Fatal error")
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = True
mock_state.exit_event.is_set.return_value = False
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender._stream()
def test_wait_until_done(mocker):
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.wait_until_done()
mock_thread = mocker.Mock()
sender.thread = mock_thread
sender.wait_until_done()
mock_thread.join.assert_called_once()
assert sender.thread is None
def test_stream_pause_close_error(mocker):
"""
Tests that an IOError during stream closure (when pausing) is ignored,
covering the 'pass' statement in the pause logic.
"""
mock_stream = mock.Mock()
mock_stream.read.return_value = b"data"
# Raise IOError when stopping the stream during pause
mock_stream.stop_stream.side_effect = IOError("Failed to stop")
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.return_value = mock_stream
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
# 1. First False triggers the pause block
# 2. Second True resumes the loop
mock_state.active_event.is_set.side_effect = [False, True]
mock_state.exit_event.is_set.side_effect = [False, False, False, True]
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.socket = mock.Mock()
sender._stream()
# Verification: The error should be swallowed, and the stream should re-open
assert mock_stream.stop_stream.called
assert mock_pyaudio_instance.open.call_count == 2
def test_stream_finally_close_error(mocker):
"""
Tests that an IOError during stream closure in the finally block is ignored,
covering the 'pass' statement in the finally logic.
"""
mock_stream = mock.Mock()
mock_stream.read.return_value = b"data"
# Raise IOError when stopping the stream at exit
mock_stream.stop_stream.side_effect = IOError("Cleanup failed")
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.return_value = mock_stream
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = True
mock_state.exit_event.is_set.side_effect = [False, False, True]
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.socket = mock.Mock()
# Run
sender._stream()
# Assert: Should finish without raising exception despite the IOError in finally
assert mock_stream.stop_stream.called
def test_stream_recovery_failure(mocker):
"""
Tests the case where recovering from a read error (re-opening stream) also fails.
This ensures the outer try-except catches exceptions from the inner except block.
"""
mock_stream_initial = mock.Mock()
# Trigger the read error logic
mock_stream_initial.read.side_effect = IOError("Read failed")
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
# First open works, Second open (recovery) fails fatally
mock_pyaudio_instance.open.side_effect = [
mock_stream_initial,
IOError("Recovery failed")
]
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = True
mock_state.exit_event.is_set.return_value = False
mock_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger")
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.socket = mock.Mock()
sender._stream()
# Assert we hit the outer error log
mock_logger.error.assert_called()
def test_no_sending_if_speaking(mocker):
"""
Tests the successful sending of audio data over a ZeroMQ socket.
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
mock_state.is_speaking = True
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
stream.read = _fake_read
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.start()
sender.wait_until_done()
send_socket.assert_not_called()
def _fake_read_error(num_frames): def _fake_read_error(num_frames):
""" """

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from robot_interface.utils.get_config import get_config from robot_interface.utils.get_config import get_config

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest import pytest
import threading import threading
import zmq import zmq

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock import mock
import pytest import pytest
import zmq import zmq

View File

@@ -1,10 +1,4 @@
# -*- coding: utf-8 -*- # coding=utf-8
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock import mock
import pytest import pytest

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys import sys
# Import module under test # Import module under test

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest import pytest
from robot_interface.endpoints.receiver_base import ReceiverBase from robot_interface.endpoints.receiver_base import ReceiverBase

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock import mock
import zmq import zmq
from robot_interface.endpoints.socket_base import SocketBase from robot_interface.endpoints.socket_base import SocketBase

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import threading import threading
import signal import signal
import pytest import pytest

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time import time
import mock import mock

View File

@@ -1,171 +1,99 @@
# -*- coding: utf-8 -*- # coding=utf-8
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock import mock
import pytest import pytest
import zmq import zmq
from robot_interface.endpoints.video_sender import VideoSender from robot_interface.endpoints.video_sender import VideoSender
from robot_interface.state import state
from robot_interface.core.config import settings
@pytest.fixture @pytest.fixture
def zmq_context(): def zmq_context():
""" """Provide a ZMQ context."""
Yields a real ZMQ context for socket creation. yield zmq.Context()
"""
context = zmq.Context()
yield context
context.term()
def test_init_defaults(zmq_context, mocker):
"""
Test initialization of the VideoSender.
"""
# We patch settings to ensure valid port access inside the class logic,
# although the default arg is evaluated at import time.
mocker.patch("robot_interface.endpoints.video_sender.settings")
mock_zmq = mock.Mock() def _patch_basics(mocker):
sender = VideoSender(mock_zmq) """Common patches: prevent real threads, port binds, and state errors."""
mocker.patch("robot_interface.endpoints.socket_base.zmq.Socket.bind")
mocker.patch("robot_interface.endpoints.video_sender.threading.Thread")
mocker.patch.object(state, "is_initialized", True)
# Verify socket type is PUB
assert sender.identifier == "video"
def test_start_no_qi_session(mocker): def _patch_exit_event(mocker):
""" """Make exit_event stop the loop after one iteration."""
Test that the loop does not start if no Qi session is available. fake_event = mock.Mock()
""" fake_event.is_set.side_effect = [False, True]
# Mock state to return None for qi_session mocker.patch.object(state, "exit_event", fake_event)
mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
mock_state.qi_session = None
mock_threading = mocker.patch("robot_interface.endpoints.video_sender.threading")
mock_zmq = mock.Mock() def test_no_qi_session(zmq_context, mocker):
sender = VideoSender(mock_zmq) """Video loop should not start without a qi_session."""
_patch_basics(mocker)
mocker.patch.object(state, "qi_session", None)
sender = VideoSender(zmq_context)
sender.start_video_rcv() sender.start_video_rcv()
# Assertions assert not hasattr(sender, "thread")
mock_threading.Thread.assert_not_called()
def test_start_success(mocker):
""" def test_video_streaming(zmq_context, mocker):
Test successful startup of the video receiver thread. """VideoSender should send retrieved image data."""
""" _patch_basics(mocker)
# Mock the Qi Session and Service _patch_exit_event(mocker)
mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
mock_session = mock.Mock() # Pepper's image buffer lives at index 6
mock_state.qi_session = mock_session mocker.patch.object(settings.video_config, "image_buffer", 6)
mock_video_service = mock.Mock() mock_video_service = mock.Mock()
mock_session.service.return_value = mock_video_service mock_video_service.getImageRemote.return_value = [None]*6 + ["fake_img"]
mock_video_service.subscribeCamera.return_value = "test_subscriber_id"
# Mock Settings fake_session = mock.Mock()
mock_settings = mocker.patch("robot_interface.endpoints.video_sender.settings") fake_session.service.return_value = mock_video_service
mock_settings.video_config.camera_index = 0 mocker.patch.object(state, "qi_session", fake_session)
mock_settings.video_config.resolution = 2
mock_settings.video_config.color_space = 11
mock_settings.video_config.fps = 30
mock_settings.video_config.stream_name = "test_stream"
mock_threading = mocker.patch("robot_interface.endpoints.video_sender.threading") mocker.patch.object(
fake_session.service("ALVideoDevice"),
"subscribeCamera",
return_value="stream_name"
)
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send = send_socket
# Run
mock_zmq = mock.Mock()
sender = VideoSender(mock_zmq)
sender.start_video_rcv() sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
# Assertions send_socket.assert_called_with("fake_img")
mock_session.service.assert_called_with("ALVideoDevice")
mock_video_service.subscribeCamera.assert_called_with("test_stream", 0, 2, 11, 30)
mock_threading.Thread.assert_called_once()
# Verify arguments passed to the thread target
call_args = mock_threading.Thread.call_args[1]
assert call_args["target"] == sender.video_rcv_loop
assert call_args["args"] == (mock_video_service, "test_subscriber_id")
# Ensure thread was started def test_video_receive_error(zmq_context, mocker):
mock_threading.Thread.return_value.start.assert_called_once() """Errors retrieving images should not call send()."""
_patch_basics(mocker)
_patch_exit_event(mocker)
def test_video_loop_happy_path(mocker): mock_video_service = mock.Mock()
""" mock_video_service.getImageRemote.side_effect = Exception("boom")
Test the main loop: Wait -> Get Image -> Send -> Repeat/Exit.
"""
# Mock settings for image buffer index
mock_settings = mocker.patch("robot_interface.endpoints.video_sender.settings")
mock_settings.video_config.image_buffer = 6
# Mock Video Service to return a fake image structure fake_session = mock.Mock()
# Standard NaoQi image is a list, binary data is usually at index 6 fake_session.service.return_value = mock_video_service
fake_image_data = b"binary_jpeg_data" mocker.patch.object(state, "qi_session", fake_session)
fake_image_list = [0] * 7
fake_image_list[6] = fake_image_data
mock_service = mock.Mock() mocker.patch.object(
mock_service.getImageRemote.return_value = fake_image_list fake_session.service("ALVideoDevice"),
"subscribeCamera",
return_value="stream_name"
)
# Mock Events: sender = VideoSender(zmq_context)
# exit_event: False (start), False (loop once), True (break) send_socket = mock.Mock()
mock_state = mocker.patch("robot_interface.endpoints.video_sender.state") sender.socket.send = send_socket
mock_state.exit_event.is_set.side_effect = [False, False, True]
# Run sender.start_video_rcv()
mock_zmq = mock.Mock() sender.video_rcv_loop(mock_video_service, "stream_name")
sender = VideoSender(mock_zmq)
sender.socket = mock.Mock() # Mock the socket to verify send
sender.video_rcv_loop(mock_service, "sub_id") send_socket.assert_not_called()
# Assertions
mock_state.active_event.wait.assert_called()
mock_service.getImageRemote.assert_called_with("sub_id")
sender.socket.send.assert_called_with(fake_image_data)
def test_video_loop_exit_during_wait(zmq_context, mocker):
"""
Test that the loop breaks immediately if exit_event is set while waiting.
"""
mock_service = mock.Mock()
mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
# 1. Loop check: False (enter loop)
# 2. Wait happens (mock returns instantly)
# 3. Post-wait check: True (break)
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zqm = mock.Mock()
sender = VideoSender(mock_zqm)
sender.video_rcv_loop(mock_service, "sub_id")
# Assert we never tried to get an image
mock_service.getImageRemote.assert_not_called()
def test_video_loop_exception_handling(zmq_context, mocker):
"""
Test that exceptions during image retrieval are caught and logged,
and do not crash the thread.
"""
mock_settings = mocker.patch("robot_interface.endpoints.video_sender.settings")
mock_service = mock.Mock()
# First call raises Exception, Second call works (if we allowed it, but we exit)
mock_service.getImageRemote.side_effect = Exception("Camera disconnected")
mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
# Loop runs once then exits
mock_state.exit_event.is_set.side_effect = [False, False, True]
mock_zmq = mock.Mock()
sender = VideoSender(mock_zmq)
sender.socket = mock.Mock()
sender.video_rcv_loop(mock_service, "sub_id")
# Assertions
# Ensure loop didn't crash; it should have completed the iteration and checked exit_event
assert mock_state.exit_event.is_set.call_count >= 2