3 Commits

Author SHA1 Message Date
Storm
19b7efec05 chore: implemented test video function
If there is no qi session, the webcam of the device is used to send video.
2026-01-22 11:36:34 +01:00
Pim Hutting
da97eb8a1a Merge branch 'feat/robot-speech-agent-force-speech' into 'dev'
feat: implemented forced speech and speech queue

See merge request ics/sp/2025/n25b/pepperplus-ri!23
2026-01-14 14:26:39 +00:00
Luijkx,S.O.H. (Storm)
e51cf8fe65 feat: implemented forced speech and speech queue 2026-01-14 14:26:38 +00:00
9 changed files with 493 additions and 34 deletions

View File

@@ -7,3 +7,4 @@ sphinx
sphinx_rtd_theme
pre-commit
python-dotenv
opencv-python==4.1.2.30

View File

@@ -1,11 +1,12 @@
from __future__ import unicode_literals # So that we can log texts with Unicode characters
import logging
from threading import Thread
import Queue
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
from robot_interface.endpoints.gesture_settings import GestureTags
@@ -32,6 +33,9 @@ class ActuationReceiver(ReceiverBase):
self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options
self._tts_service = None
self._animation_service = None
self._message_queue = Queue.Queue()
self.message_thread = Thread(target=self._handle_messages)
self.message_thread.start()
def _handle_speech(self, message):
"""
@@ -58,8 +62,26 @@ class ActuationReceiver(ReceiverBase):
if not self._tts_service:
self._tts_service = state.qi_session.service("ALTextToSpeech")
# Returns instantly. Messages received while speaking will be queued.
getattr(qi, "async")(self._tts_service.say, text)
if message.get("is_priority"):
# Bypass queue and speak immediately
self.clear_queue()
self._message_queue.put(text)
logging.debug("Force speaking immediately: {}".format(text))
else:
self._message_queue.put(text)
def clear_queue(self):
"""
Safely drains all pending messages from the queue.
"""
logging.info("Message queue size: {}".format(self._message_queue.qsize()))
try:
while True:
# Remove items one by one without waiting
self._message_queue.get_nowait()
except Queue.Empty:
pass
logging.info("Message queue cleared.")
def _handle_gesture(self, message, is_single):
"""
@@ -122,6 +144,19 @@ class ActuationReceiver(ReceiverBase):
if message["endpoint"] == "actuate/gesture/single":
self._handle_gesture(message, True)
def _handle_messages(self):
while not state.exit_event.is_set():
try:
text = self._message_queue.get(timeout=0.1)
state.is_speaking = True
self._tts_service.say(text)
except Queue.Empty:
state.is_speaking = False
except RuntimeError:
logging.error("Lost connection to Pepper. Please check if you're connected to the "
"local WiFi and restart this application.")
state.exit_event.set()
def endpoint_description(self):
"""
Extend the default endpoint description with gesture tags.

View File

@@ -89,6 +89,7 @@ class AudioSender(SocketBase):
try:
while not state.exit_event.is_set():
data = stream.read(chunk)
if (state.is_speaking): continue # Do not send audio while the robot is speaking
self.socket.send(data)
except IOError as e:
logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e)

View File

@@ -1,11 +1,13 @@
import zmq
import threading
import logging
import cv2
from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.core.config import settings
class VideoSender(SocketBase):
"""
Video sender endpoint, responsible for sending video frames.
@@ -27,7 +29,9 @@ class VideoSender(SocketBase):
Will not start if no qi session is available.
"""
if not state.qi_session:
logging.info("No Qi session available. Not starting video loop.")
logging.info("No Qi session available. Starting video from webcam.")
thread = threading.Thread(target=self.test_video_stream)
thread.start()
return
video = state.qi_session.service("ALVideoDevice")
@@ -58,3 +62,29 @@ class VideoSender(SocketBase):
self.socket.send(img[settings.video_config.image_buffer])
except:
logging.warn("Failed to retrieve video image from robot.")
def test_video_stream(self):
"""
Test function to send video from local webcam instead of the robot.
"""
cap = cv2.VideoCapture(0)
if not cap.isOpened():
logging.error("Could not open webcam for video stream test.")
return
while not state.exit_event.is_set():
ret, frame = cap.read()
if not ret:
logging.warning("Failed to read frame from webcam.")
continue
cv2.waitKey(1)
small_frame = cv2.resize(frame, (320, 240), interpolation=cv2.INTER_AREA)
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 70]
_, buffer = cv2.imencode('.jpg', small_frame, encode_param)
self.socket.send(buffer.tobytes())
cap.release()

View File

@@ -30,6 +30,7 @@ class State(object):
self.exit_event = None
self.sockets = []
self.qi_session = None
self.is_speaking = False
def initialize(self):
"""

10
test/conftest.py Normal file
View File

@@ -0,0 +1,10 @@
from mock import patch, MagicMock
import pytest
@pytest.fixture(autouse=True)
def mock_zmq_context():
with patch("zmq.Context") as mock:
mock.instance.return_value = MagicMock()
yield mock

View File

@@ -20,46 +20,109 @@ def zmq_context():
yield context
def test_handle_unimplemented_endpoint(zmq_context):
def test_force_speech_clears_queue(mocker):
"""
Tests that the ``ActuationReceiver.handle_message`` method can
handle an unknown or unimplemented endpoint without raising an error.
Tests that a force speech message clears the existing queue
and places the high-priority message at the front.
"""
receiver = ActuationReceiver(zmq_context)
# Should not error
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._message_queue.put("old_message_1")
receiver._message_queue.put("old_message_2")
assert receiver._message_queue.qsize() == 2
force_msg = {
"endpoint": "actuate/speech",
"data": "Emergency Notification",
"is_priority": True,
}
receiver.handle_message(force_msg)
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Emergency Notification"
def test_handle_unimplemented_endpoint(mocker):
"""
Tests handling of unknown endpoints.
"""
mocker.patch("threading.Thread")
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist",
"data": None,
})
def test_speech_message_no_data(zmq_context, mocker):
def test_speech_message_no_data(mocker):
"""
Tests that the message handler logs a warning when a speech actuation
request (`actuate/speech`) is received but contains empty string data.
Tests that if the message data is empty, the receiver returns immediately
WITHOUT attempting to access the global robot state or session.
"""
mock_warn = mocker.patch("logging.warn")
# 1. Prevent background threads from running
mocker.patch("threading.Thread")
# 2. Mock the global state object
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
receiver = ActuationReceiver(zmq_context)
# 3. Create a PropertyMock to track whenever 'qi_session' is accessed
# We attach it to the class type of the mock so it acts like a real property
mock_session_prop = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_session_prop
# 4. Initialize Receiver (Mocking the context to avoid ZMQ errors)
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# 5. Send empty data
receiver.handle_message({"endpoint": "actuate/speech", "data": ""})
mock_warn.assert_called_with(mock.ANY)
# 6. Assertion:
# Because the code does `if not text: return` BEFORE `if not state.qi_session`,
# the state property should NEVER be read.
mock_session_prop.assert_not_called()
def test_speech_message_invalid_data(zmq_context, mocker):
def test_speech_message_invalid_data(mocker):
"""
Tests that the message handler logs a warning when a speech actuation
request (`actuate/speech`) is received with data that is not a string (e.g., a boolean).
Tests that if the message data is not a string, the function returns.
:param mocker: Description
"""
mock_warn = mocker.patch("logging.warn")
mocker.patch("threading.Thread")
receiver = ActuationReceiver(zmq_context)
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_session_prop = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_session_prop
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver.handle_message({"endpoint": "actuate/speech", "data": True})
mock_warn.assert_called_with(mock.ANY)
# Because the code does `if not text: return` BEFORE `if not state.qi_session`,
# the state property should NEVER be read.
mock_session_prop.assert_not_called()
def test_speech_no_qi(zmq_context, mocker):
def test_speech_no_qi(mocker):
"""
Tests the actuation receiver's behavior when processing a speech request
but the global state does not have an active QI session.
@@ -69,16 +132,21 @@ def test_speech_no_qi(zmq_context, mocker):
mock_qi_session = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_qi_session
receiver = ActuationReceiver(zmq_context)
mock_tts_service = mock.Mock()
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._tts_service = mock_tts_service
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
mock_qi_session.assert_called()
receiver._tts_service.assert_not_called()
def test_speech(zmq_context, mocker):
def test_speech(mocker):
"""
Tests the core speech actuation functionality by mocking the QI TextToSpeech
service and verifying that it is called correctly.
service and verifying that the received message is put into the queue.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
@@ -89,17 +157,182 @@ def test_speech(zmq_context, mocker):
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
receiver = ActuationReceiver(zmq_context)
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._tts_service = None
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
mock_state.qi_session.service.assert_called_once_with("ALTextToSpeech")
assert receiver._message_queue.qsize() == 1
getattr(mock_qi, "async").assert_called_once()
call_args = getattr(mock_qi, "async").call_args[0]
assert call_args[0] == mock_tts_service.say
assert call_args[1] == "Some message to speak."
queued_item = receiver._message_queue.get()
assert queued_item == "Some message to speak."
def test_speech_priority(mocker):
"""
Tests that a priority speech message is handled correctly by clearing the queue
and placing the priority message at the front.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._message_queue.put("old_message_1")
receiver._message_queue.put("old_message_2")
assert receiver._message_queue.qsize() == 2
priority_msg = {
"endpoint": "actuate/speech",
"data": "Urgent Message",
"is_priority": True,
}
receiver._handle_speech(priority_msg)
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Urgent Message"
def test_handle_messages_loop(mocker):
"""
Tests the background consumer loop (_handle_messages) processing an item.
Runs SYNCHRONOUSLY to ensure coverage tools pick up the lines.
"""
# Patch Thread so the real background thread NEVER starts automatically
mocker.patch("threading.Thread")
# Mock state
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Setup initial speaking state to False (covers "Started speaking" print)
mock_state.is_speaking = False
# Mock the TextToSpeech service
mock_tts_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
# Initialize receiver (Thread is patched, so no thread starts)
# Use Mock Context to avoid ZMQ errors
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Manually inject service (since lazy loading might handle it, but this is safer)
receiver._tts_service = mock_tts_service
# This ensures the while loop iterates exactly once
mock_state.exit_event.is_set.side_effect = [False, True]
# Put an item in the queue
receiver._message_queue.put("Hello World")
# RUN MANUALLY in the main thread
# This executes the code: while -> try -> get -> if print -> speaking=True -> say
receiver._handle_messages()
# Assertions
assert receiver._message_queue.empty()
mock_tts_service.say.assert_called_with("Hello World")
assert mock_state.is_speaking is True
def test_handle_messages_queue_empty(mocker):
"""
Tests the Queue.Empty exception handler in the consumer loop.
This covers the logic that resets 'state.is_speaking' to False.
"""
# Prevent the real background thread from starting
mocker.patch("threading.Thread")
# Mock the state object
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Setup 'is_speaking' property mock
# We set return_value=True so the code enters the 'if state.is_speaking:' block.
# We use PropertyMock to track when this attribute is set.
type(mock_state).is_speaking = True
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# This ensures the while loop body runs exactly once for our test
mock_state.exit_event.is_set.side_effect = [False, True]
# Force get() to raise Queue.Empty immediately (simulate timeout)
# We patch the 'get' method on the specific queue instance of our receiver
#mocker.patch.object(receiver._message_queue, 'get', side_effect=Queue.Empty)
# Run the loop logic manually (synchronously)
receiver._handle_messages()
# Final Assertion: Verify is_speaking was set to False
# The code execution order is: read (returns True) -> print -> set (to False)
# assert_called_with checks the arguments of the LAST call, which is the setter.
assert mock_state.is_speaking is False
def test_handle_messages_runtime_error(mocker):
"""
Tests the RuntimeError exception handler (e.g. lost WiFi connection).
Uses a Mock ZMQ context to avoid 'Address already in use' errors.
"""
# Patch Thread so we don't accidentally spawn real threads
mocker.patch("threading.Thread")
# Mock the state and logging
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Use a MOCK ZMQ context.
# This prevents the receiver from trying to bind to a real TCP port.
mock_zmq_ctx = mock.Mock()
# Initialize receiver with the mock context
receiver = ActuationReceiver(mock_zmq_ctx)
mock_state.exit_event.is_set.side_effect = [False, True]
receiver._message_queue.put("Test Message")
# Setup: ...BUT the service raises RuntimeError when asked to speak
mock_tts = mock.Mock()
mock_tts.say.side_effect = RuntimeError("Connection lost")
receiver._tts_service = mock_tts
# Run the loop logic manually
receiver._handle_messages()
# Assertions
assert mock_state.exit_event.is_set.called
def test_clear_queue(mocker):
"""
Tests that the clear_queue method properly drains all items from the message queue.
"""
mocker.patch("threading.Thread")
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Populate the queue with multiple items
receiver._message_queue.put("msg1")
receiver._message_queue.put("msg2")
receiver._message_queue.put("msg3")
assert receiver._message_queue.qsize() == 3
# Clear the queue
receiver.clear_queue()
# Assert the queue is empty
assert receiver._message_queue.qsize() == 0
def test_gesture_no_data(zmq_context, mocker):
receiver = ActuationReceiver(zmq_context)

View File

@@ -77,7 +77,8 @@ def test_sending_audio(mocker):
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
mock_state.is_speaking = False
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
stream.read = _fake_read
@@ -93,6 +94,36 @@ def test_sending_audio(mocker):
send_socket.assert_called()
def test_no_sending_if_speaking(mocker):
"""
Tests the successful sending of audio data over a ZeroMQ socket.
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
mock_state.is_speaking = True
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
stream.read = _fake_read
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.start()
sender.wait_until_done()
send_socket.assert_not_called()
def _fake_read_error(num_frames):
"""
Helper function to simulate an I/O error during microphone stream reading.

View File

@@ -97,3 +97,120 @@ def test_video_receive_error(zmq_context, mocker):
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_not_called()
def test_video_stream_camera_fail(zmq_context, mocker):
"""
Test that the function logs an error and returns early if
the webcam cannot be opened.
"""
_patch_basics(mocker)
# Mock cv2 and logging
mock_cv2 = mocker.patch("robot_interface.endpoints.video_sender.cv2")
mock_logging = mocker.patch("robot_interface.endpoints.video_sender.logging")
# Setup the mock capture to fail isOpened()
mock_cap = mock.Mock()
mock_cap.isOpened.return_value = False
mock_cv2.VideoCapture.return_value = mock_cap
sender = VideoSender(zmq_context)
sender.test_video_stream()
# Assertions
mock_cv2.VideoCapture.assert_called_with(0)
# Ensure the loop was never entered and cleanup didn't happen
assert not mock_cap.read.called
assert not mock_cap.release.called
def test_video_stream_read_fail(zmq_context, mocker):
"""
Test that the function logs a warning and continues the loop
if a specific frame fails to read.
"""
_patch_basics(mocker)
_patch_exit_event(mocker) # Run loop exactly once
mock_cv2 = mocker.patch("robot_interface.endpoints.video_sender.cv2")
mock_logging = mocker.patch("robot_interface.endpoints.video_sender.logging")
# Setup capture to open successfully, but fail the read()
mock_cap = mock.Mock()
mock_cap.isOpened.return_value = True
# Return (False, None) simulating a failed frame read
mock_cap.read.return_value = (False, None)
mock_cv2.VideoCapture.return_value = mock_cap
sender = VideoSender(zmq_context)
# Mock the socket to ensure nothing is sent
sender.socket = mock.Mock()
sender.test_video_stream()
# Ensure we skipped the processing steps
assert not mock_cv2.resize.called
assert not sender.socket.send.called
# Ensure cleanup happened at the end
mock_cap.release.assert_called_once()
def test_video_stream_success(zmq_context, mocker):
"""
Test the happy path: Frame read -> Resize -> Encode -> Send.
"""
_patch_basics(mocker)
_patch_exit_event(mocker) # Run loop exactly once
mock_cv2 = mocker.patch("robot_interface.endpoints.video_sender.cv2")
# Setup constants usually found in cv2
mock_cv2.IMWRITE_JPEG_QUALITY = 1
mock_cv2.INTER_AREA = 2
# Setup capture to work perfectly
mock_cap = mock.Mock()
mock_cap.isOpened.return_value = True
fake_frame = "original_frame_data"
mock_cap.read.return_value = (True, fake_frame)
mock_cv2.VideoCapture.return_value = mock_cap
# Setup Resize and Encode
mock_cv2.resize.return_value = "small_frame_data"
# Mock buffer behavior
mock_buffer = mock.Mock()
mock_buffer.tobytes.return_value = b"encoded_bytes"
# imencode returns (retval, buffer)
mock_cv2.imencode.return_value = (True, mock_buffer)
sender = VideoSender(zmq_context)
sender.socket = mock.Mock()
sender.test_video_stream()
# Assertions
# 1. Check waitKey (the 1ms delay)
mock_cv2.waitKey.assert_called_with(1)
# 2. Check Resize logic
mock_cv2.resize.assert_called_with(
fake_frame,
(320, 240),
interpolation=mock_cv2.INTER_AREA
)
# 3. Check Encode logic
mock_cv2.imencode.assert_called_with(
'.jpg',
"small_frame_data",
[mock_cv2.IMWRITE_JPEG_QUALITY, 70]
)
# 4. Check Socket Send
sender.socket.send.assert_called_with(b"encoded_bytes")
# 5. Check Cleanup
mock_cap.release.assert_called_once()