7 Commits

Author SHA1 Message Date
Pim Hutting
eab2481b85 chore: fixed flakiness of tests 2026-01-14 16:34:34 +01:00
Pim Hutting
a55acd57b6 chore: finalized tests, added queue
ref: N25B-387
2025-12-18 12:55:09 +01:00
Storm
5d5c8553c2 test: added test configuration to always mock zmq context to fix zmq port congestion issues
ref: N25B-386
2025-12-16 16:37:36 +01:00
Storm
79db2c77c8 feat: added queue-size log message
ref: N25B-386
2025-12-16 16:17:09 +01:00
Storm
b6f2893c25 style: moved one line
ref: N25B-386
2025-12-16 14:54:02 +01:00
Storm
b3e3a1eb80 feat: implemented force speech functionality in RI and refactored actuation_receiver tests
Before actuation_receiver tests started a receiver with real zmq context. This led to flaky tests because of port congestion issues.

ref: N25B-386
2025-12-16 14:49:44 +01:00
Storm
912af8d821 feat: implemented force speech functionality in RI and refactored actuation_receiver tests
Before actuation_receiver tests started a receiver with real zmq context. This led to flaky tests because of port congestion issues.

ref: N25B-386
2025-12-12 14:38:06 +01:00
4 changed files with 468 additions and 90 deletions

View File

@@ -1,6 +1,9 @@
from __future__ import unicode_literals # So that we can log texts with Unicode characters from __future__ import unicode_literals # So that we can log texts with Unicode characters
import logging import logging
import time
from threading import Thread
import Queue
import zmq import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase from robot_interface.endpoints.receiver_base import ReceiverBase
@@ -32,6 +35,12 @@ class ActuationReceiver(ReceiverBase):
self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options
self._tts_service = None self._tts_service = None
self._animation_service = None self._animation_service = None
self._message_queue = Queue.Queue()
self._gesture_queue = Queue.Queue()
self.message_thread = Thread(target=self._handle_messages)
self.message_thread.start()
self.gesture_thread = Thread(target=self._handle_gestures)
self.gesture_thread.start()
def _handle_speech(self, message): def _handle_speech(self, message):
""" """
@@ -58,8 +67,42 @@ class ActuationReceiver(ReceiverBase):
if not self._tts_service: if not self._tts_service:
self._tts_service = state.qi_session.service("ALTextToSpeech") self._tts_service = state.qi_session.service("ALTextToSpeech")
# Returns instantly. Messages received while speaking will be queued. if (message.get("is_priority")):
getattr(qi, "async")(self._tts_service.say, text) # Bypass queue and speak immediately
self.clear_queue()
self._message_queue.put(text)
logging.debug("Force speaking immediately: {}".format(text))
else:
self._message_queue.put(text)
def clear_queue(self):
"""
Safely drains all pending messages from the queue.
"""
logging.info("Message queue size: {}".format(self._message_queue.qsize()))
try:
while True:
# Remove items one by one without waiting
self._message_queue.get_nowait()
except Queue.Empty:
pass
logging.info("Message queue cleared.")
def clear_gesture_queue(self):
"""
Safely drains all pending gestures from the gesture queue.
"""
logging.info("Gesture queue size: {}".format(self._gesture_queue.qsize()))
try:
while True:
# Remove items one by one without waiting
self._gesture_queue.get_nowait()
except Queue.Empty:
pass
logging.info("Gesture queue cleared.")
logging.info("Gesture queue size: {}".format(self._gesture_queue.qsize()))
def _handle_gesture(self, message, is_single): def _handle_gesture(self, message, is_single):
""" """
@@ -101,12 +144,16 @@ class ActuationReceiver(ReceiverBase):
# Play the gesture. Pepper comes with predefined animations like "Wave", "Greet", "Clap" # Play the gesture. Pepper comes with predefined animations like "Wave", "Greet", "Clap"
# You can also create custom animations using Choregraphe and upload them to the robot. # You can also create custom animations using Choregraphe and upload them to the robot.
if (message.get("is_priority")):
# Clear queue and play
self.clear_gesture_queue()
logging.debug("Force playing gesture immediately: {}".format(gesture))
if is_single: if is_single:
logging.debug("Playing single gesture: {}".format(gesture)) logging.debug("Adding single gesture to queue: {}".format(gesture))
getattr(qi, "async")(self._animation_service.run, gesture)
else: else:
logging.debug("Playing tag gesture: {}".format(gesture)) logging.debug("Adding tag gesture to queue: {}".format(gesture))
getattr(qi, "async")(self._animation_service.runTag, gesture) self._gesture_queue.put(gesture)
def handle_message(self, message): def handle_message(self, message):
""" """
@@ -122,6 +169,31 @@ class ActuationReceiver(ReceiverBase):
if message["endpoint"] == "actuate/gesture/single": if message["endpoint"] == "actuate/gesture/single":
self._handle_gesture(message, True) self._handle_gesture(message, True)
def _handle_messages(self):
while not state.exit_event.is_set():
try:
text = self._message_queue.get(timeout=0.1)
if not state.is_speaking: print("Started speaking.")
state.is_speaking = True
self._tts_service.say(text)
except Queue.Empty:
if state.is_speaking: print("Finished speaking.")
state.is_speaking = False
except RuntimeError:
logging.warn("Lost connection to Pepper. Please check if you're connected to the local WiFi and restart this application.")
state.exit_event.set()
def _handle_gestures(self):
while not state.exit_event.is_set():
try:
gesture = self._gesture_queue.get(timeout=0.1)
self._animation_service.run(gesture)
except Queue.Empty:
pass
except RuntimeError:
logging.warn("Lost connection to Pepper. Please check if you're connected to the local WiFi and restart this application.")
state.exit_event.set()
def endpoint_description(self): def endpoint_description(self):
""" """
Extend the default endpoint description with gesture tags. Extend the default endpoint description with gesture tags.

View File

@@ -30,6 +30,7 @@ class State(object):
self.exit_event = None self.exit_event = None
self.sockets = [] self.sockets = []
self.qi_session = None self.qi_session = None
self.is_speaking = False
def initialize(self): def initialize(self):
""" """

10
test/conftest.py Normal file
View File

@@ -0,0 +1,10 @@
from mock import patch, MagicMock
import pytest
@pytest.fixture(autouse=True)
def mock_zmq_context():
with patch("zmq.Context") as mock:
mock.instance.return_value = MagicMock()
yield mock

View File

@@ -1,65 +1,114 @@
import sys import sys
import time
import mock import mock
import pytest import pytest
import zmq import zmq
import Queue
from robot_interface.endpoints.actuation_receiver import ActuationReceiver from robot_interface.endpoints.actuation_receiver import ActuationReceiver
from robot_interface.endpoints.gesture_settings import GestureTags from robot_interface.endpoints.gesture_settings import GestureTags
def test_force_speech_clears_queue(mocker):
@pytest.fixture
def zmq_context():
""" """
A pytest fixture that creates and yields a ZMQ context. Tests that a force speech message clears the existing queue
and places the high-priority message at the front.
:return: An initialized ZeroMQ context.
:rtype: zmq.Context
""" """
context = zmq.Context() mocker.patch("threading.Thread")
yield context mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
def test_handle_unimplemented_endpoint(zmq_context): # Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._message_queue.put("old_message_1")
receiver._message_queue.put("old_message_2")
assert receiver._message_queue.qsize() == 2
force_msg = {
"endpoint": "actuate/speech",
"data": "Emergency Notification",
"is_priority": True,
}
receiver.handle_message(force_msg)
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Emergency Notification"
def test_handle_unimplemented_endpoint(mocker):
""" """
Tests that the ``ActuationReceiver.handle_message`` method can Tests handling of unknown endpoints.
handle an unknown or unimplemented endpoint without raising an error.
""" """
receiver = ActuationReceiver(zmq_context) mocker.patch("threading.Thread")
# Should not error
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver.handle_message({ receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist", "endpoint": "some_endpoint_that_definitely_does_not_exist",
"data": None, "data": None,
}) })
def test_speech_message_no_data(mocker):
def test_speech_message_no_data(zmq_context, mocker):
""" """
Tests that the message handler logs a warning when a speech actuation Tests that if the message data is empty, the receiver returns immediately
request (`actuate/speech`) is received but contains empty string data. WITHOUT attempting to access the global robot state or session.
""" """
mock_warn = mocker.patch("logging.warn") # 1. Prevent background threads from running
mocker.patch("threading.Thread")
# 2. Mock the global state object
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
receiver = ActuationReceiver(zmq_context) # 3. Create a PropertyMock to track whenever 'qi_session' is accessed
# We attach it to the class type of the mock so it acts like a real property
mock_session_prop = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_session_prop
# 4. Initialize Receiver (Mocking the context to avoid ZMQ errors)
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# 5. Send empty data
receiver.handle_message({"endpoint": "actuate/speech", "data": ""}) receiver.handle_message({"endpoint": "actuate/speech", "data": ""})
mock_warn.assert_called_with(mock.ANY) # 6. Assertion:
# Because the code does `if not text: return` BEFORE `if not state.qi_session`,
# the state property should NEVER be read.
mock_session_prop.assert_not_called()
def test_speech_message_invalid_data(zmq_context, mocker): def test_speech_message_invalid_data(mocker):
""" """
Tests that the message handler logs a warning when a speech actuation Tests that if the message data is not a string, the function returns.
request (`actuate/speech`) is received with data that is not a string (e.g., a boolean). :param mocker: Description
""" """
mock_warn = mocker.patch("logging.warn") mocker.patch("threading.Thread")
receiver = ActuationReceiver(zmq_context) mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_session_prop = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_session_prop
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver.handle_message({"endpoint": "actuate/speech", "data": True}) receiver.handle_message({"endpoint": "actuate/speech", "data": True})
mock_warn.assert_called_with(mock.ANY) # Because the code does `if not text: return` BEFORE `if not state.qi_session`,
# the state property should NEVER be read.
mock_session_prop.assert_not_called()
def test_speech_no_qi(mocker):
def test_speech_no_qi(zmq_context, mocker):
""" """
Tests the actuation receiver's behavior when processing a speech request Tests the actuation receiver's behavior when processing a speech request
but the global state does not have an active QI session. but the global state does not have an active QI session.
@@ -69,16 +118,21 @@ def test_speech_no_qi(zmq_context, mocker):
mock_qi_session = mock.PropertyMock(return_value=None) mock_qi_session = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_qi_session type(mock_state).qi_session = mock_qi_session
receiver = ActuationReceiver(zmq_context) mock_tts_service = mock.Mock()
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._tts_service = mock_tts_service
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
mock_qi_session.assert_called() receiver._tts_service.assert_not_called()
def test_speech(zmq_context, mocker): def test_speech(mocker):
""" """
Tests the core speech actuation functionality by mocking the QI TextToSpeech Tests the core speech actuation functionality by mocking the QI TextToSpeech
service and verifying that it is called correctly. service and verifying that the received message is put into the queue.
""" """
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
@@ -89,66 +143,222 @@ def test_speech(zmq_context, mocker):
mock_state.qi_session = mock.Mock() mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service mock_state.qi_session.service.return_value = mock_tts_service
receiver = ActuationReceiver(zmq_context) mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._tts_service = None receiver._tts_service = None
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
mock_state.qi_session.service.assert_called_once_with("ALTextToSpeech") assert receiver._message_queue.qsize() == 1
getattr(mock_qi, "async").assert_called_once() queued_item = receiver._message_queue.get()
call_args = getattr(mock_qi, "async").call_args[0] assert queued_item == "Some message to speak."
assert call_args[0] == mock_tts_service.say
assert call_args[1] == "Some message to speak." def test_speech_priority(mocker):
"""
Tests that a priority speech message is handled correctly by clearing the queue
and placing the priority message at the front.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._message_queue.put("old_message_1")
receiver._message_queue.put("old_message_2")
assert receiver._message_queue.qsize() == 2
priority_msg = {
"endpoint": "actuate/speech",
"data": "Urgent Message",
"is_priority": True,
}
receiver._handle_speech(priority_msg)
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Urgent Message"
def test_handle_messages_loop(mocker):
"""
Tests the background consumer loop (_handle_messages) processing an item.
Runs SYNCHRONOUSLY to ensure coverage tools pick up the lines.
"""
# Patch Thread so the real background thread NEVER starts automatically
mocker.patch("threading.Thread")
# Mock state
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Setup initial speaking state to False (covers "Started speaking" print)
mock_state.is_speaking = False
# Mock the TextToSpeech service
mock_tts_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
# Initialize receiver (Thread is patched, so no thread starts)
# Use Mock Context to avoid ZMQ errors
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Manually inject service (since lazy loading might handle it, but this is safer)
receiver._tts_service = mock_tts_service
# This ensures the while loop iterates exactly once
mock_state.exit_event.is_set.side_effect = [False, True, True , True, True]
# Put an item in the queue
receiver._message_queue.put("Hello World")
# RUN MANUALLY in the main thread
# This executes the code: while -> try -> get -> if print -> speaking=True -> say
receiver._handle_messages()
# Assertions
assert receiver._message_queue.empty()
mock_tts_service.say.assert_called_with("Hello World")
assert mock_state.is_speaking is True
def test_gesture_no_data(zmq_context, mocker): def test_handle_gestures_runtime_error(mocker):
receiver = ActuationReceiver(zmq_context) mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Use a side_effect that returns False then True thereafter
mock_state.exit_event.is_set.side_effect = [False, True, True, True, True]
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# ... rest of your setup ...
mock_anim = mock.Mock()
mock_anim.run.side_effect = RuntimeError("Wifi Lost")
receiver._animation_service = mock_anim
receiver._gesture_queue.put("wave")
receiver._handle_gestures()
def test_handle_messages_runtime_error(mocker):
"""
Tests the RuntimeError exception handler (e.g. lost WiFi connection).
Uses a Mock ZMQ context to avoid 'Address already in use' errors.
"""
# Patch Thread so we don't accidentally spawn real threads
mocker.patch("threading.Thread")
# Mock the state and logging
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Use a MOCK ZMQ context.
# This prevents the receiver from trying to bind to a real TCP port.
mock_zmq_ctx = mock.Mock()
# Initialize receiver with the mock context
receiver = ActuationReceiver(mock_zmq_ctx)
mock_state.exit_event.is_set.side_effect = [False, True, True, True ]
receiver._message_queue.put("Test Message")
# Setup: ...BUT the service raises RuntimeError when asked to speak
mock_tts = mock.Mock()
mock_tts.say.side_effect = RuntimeError("Connection lost")
receiver._tts_service = mock_tts
# Run the loop logic manually
receiver._handle_messages()
# Assertions
assert mock_state.exit_event.is_set.called
def test_clear_queue(mocker):
"""
Tests that the clear_queue method properly drains all items from the message queue.
"""
mocker.patch("threading.Thread")
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Populate the queue with multiple items
receiver._message_queue.put("msg1")
receiver._message_queue.put("msg2")
receiver._message_queue.put("msg3")
assert receiver._message_queue.qsize() == 3
# Clear the queue
receiver.clear_queue()
# Assert the queue is empty
assert receiver._message_queue.qsize() == 0
def test_gesture_no_data(mocker):
mock_zmq = mock.Mock()
receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": ""}, True) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": ""}, True)
# Just ensuring no crash # Just ensuring no crash
def test_gesture_invalid_data(zmq_context, mocker): def test_gesture_invalid_data(mocker):
receiver = ActuationReceiver(zmq_context) mock_zmq = mock.Mock()
receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": 123}, True) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": 123}, True)
# No crash expected # No crash expected
def test_gesture_single_not_found(zmq_context, mocker): def test_gesture_single_not_found(mocker):
mock_zmq = mock.Mock()
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave", "bow"] # allowed single gestures mock_tags.single_gestures = ["wave", "bow"] # allowed single gestures
receiver = ActuationReceiver(mock_zmq)
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "unknown_gesture"}, True) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "unknown_gesture"}, True)
# No crash expected # No crash expected
def test_gesture_tag_not_found(zmq_context, mocker): def test_gesture_tag_not_found(mocker):
mock_zmq = mock.Mock()
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["happy", "sad"] mock_tags.tags = ["happy", "sad"]
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "not_a_tag"}, False) receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "not_a_tag"}, False)
# No crash expected # No crash expected
def test_gesture_no_qi_session(zmq_context, mocker): def test_gesture_no_qi_session( mocker):
mock_zmq = mock.Mock()
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = None mock_state.qi_session = None
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["hello"] mock_tags.single_gestures = ["hello"]
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "hello"}, True) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "hello"}, True)
# No crash, path returns early # No crash, path returns early
def test_gesture_single_success(zmq_context, mocker): def test_gesture_single_success(mocker):
mock_zmq = mock.Mock()
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Allow loops to run
mock_state.exit_event.is_set.return_value = False
mock_qi = mock.Mock() mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi sys.modules["qi"] = mock_qi
# Setup gesture settings
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave"] mock_tags.single_gestures = ["wave"]
@@ -156,17 +366,25 @@ def test_gesture_single_success(zmq_context, mocker):
mock_state.qi_session = mock.Mock() mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "wave"}, True) receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "wave"}, True)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer") time.sleep(0.2)
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
assert getattr(mock_qi, "async").call_args[0][1] == "wave" mock_animation_service.run.assert_called_with("wave")
# CLEANUP: Signal exit AND join threads
mock_state.exit_event.is_set.return_value = True
receiver.message_thread.join(timeout=1.0)
receiver.gesture_thread.join(timeout=1.0)
def test_gesture_tag_success(zmq_context, mocker): def test_gesture_tag_success(mocker):
mock_zmq = mock.Mock()
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.exit_event.is_set.return_value = False
mock_qi = mock.Mock() mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi sys.modules["qi"] = mock_qi
@@ -177,21 +395,26 @@ def test_gesture_tag_success(zmq_context, mocker):
mock_state.qi_session = mock.Mock() mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "greeting"}, False) receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "greeting"}, False)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer") time.sleep(0.2)
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.runTag mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
assert getattr(mock_qi, "async").call_args[0][1] == "greeting" mock_animation_service.run.assert_called_with("greeting")
# CLEANUP: Signal exit AND join threads
mock_state.exit_event.is_set.return_value = True
receiver.message_thread.join(timeout=1.0)
receiver.gesture_thread.join(timeout=1.0)
def test_handle_message_all_routes(zmq_context, mocker): def test_handle_message_all_routes(mocker):
""" """
Ensures all handle_message endpoint branches route correctly. Ensures all handle_message endpoint branches route correctly.
""" """
receiver = ActuationReceiver(zmq_context) mock_zmq = mock.Mock()
receiver = ActuationReceiver(mock_zmq)
mock_speech = mocker.patch.object(receiver, "_handle_speech") mock_speech = mocker.patch.object(receiver, "_handle_speech")
mock_gesture = mocker.patch.object(receiver, "_handle_gesture") mock_gesture = mocker.patch.object(receiver, "_handle_gesture")
@@ -203,12 +426,13 @@ def test_handle_message_all_routes(zmq_context, mocker):
assert mock_gesture.call_count == 2 assert mock_gesture.call_count == 2
def test_endpoint_description(zmq_context, mocker): def test_endpoint_description(mocker):
mock_zmq = mock.Mock()
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags") mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["happy"] mock_tags.tags = ["happy"]
mock_tags.single_gestures = ["wave"] mock_tags.single_gestures = ["wave"]
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
desc = receiver.endpoint_description() desc = receiver.endpoint_description()
assert "gestures" in desc assert "gestures" in desc
@@ -218,26 +442,20 @@ def test_endpoint_description(zmq_context, mocker):
assert desc["single_gestures"] == ["wave"] assert desc["single_gestures"] == ["wave"]
def test_gesture_single_real_gesturetags(zmq_context, mocker): def test_gesture_single_real_gesturetags(mocker):
""" mock_zmq = mock.Mock()
Uses the real GestureTags (no mocking) to ensure the receiver
references GestureTags.single_gestures correctly.
"""
# Ensure qi session exists so we pass the early return
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = mock.Mock() mock_state.qi_session = mock.Mock()
mock_state.exit_event.is_set.return_value = False
# Mock qi.async to avoid real async calls
mock_qi = mock.Mock() mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi sys.modules["qi"] = mock_qi
# Mock animation service
mock_animation_service = mock.Mock() mock_animation_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context) receiver = ActuationReceiver(mock_zmq)
# Pick a real gesture from GestureTags.single_gestures
assert len(GestureTags.single_gestures) > 0, "GestureTags.single_gestures must not be empty" assert len(GestureTags.single_gestures) > 0, "GestureTags.single_gestures must not be empty"
gesture = GestureTags.single_gestures[0] gesture = GestureTags.single_gestures[0]
@@ -246,8 +464,85 @@ def test_gesture_single_real_gesturetags(zmq_context, mocker):
is_single=True, is_single=True,
) )
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer") time.sleep(0.2)
getattr(mock_qi, "async").assert_called_once() mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run mock_animation_service.run.assert_called_with(gesture)
assert getattr(mock_qi, "async").call_args[0][1] == gesture
# CLEANUP: Signal exit AND join threads
mock_state.exit_event.is_set.return_value = True
receiver.message_thread.join(timeout=1.0)
receiver.gesture_thread.join(timeout=1.0)
def test_clear_gesture_queue(mocker):
# Prevent background threads from eating the items
mocker.patch("threading.Thread")
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Populate the queue
receiver._gesture_queue.put("gesture1")
receiver._gesture_queue.put("gesture2")
assert receiver._gesture_queue.qsize() == 2
# Clear the queue
receiver.clear_gesture_queue()
# Assert the queue is empty
assert receiver._gesture_queue.qsize() == 0
def test_gesture_priority_clears_queue(mocker):
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Mock QI and Tags so valid checks pass
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["urgent_wave"]
# Setup Animation Service
mock_anim = mock.Mock()
mock_state.qi_session.service.return_value = mock_anim
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Pre-fill queue with "slow" gestures
receiver._gesture_queue.put("slow_gesture_1")
receiver._gesture_queue.put("slow_gesture_2")
assert receiver._gesture_queue.qsize() == 2
# Send priority gesture
priority_msg = {
"endpoint": "actuate/gesture/single",
"data": "urgent_wave",
"is_priority": True,
}
receiver._handle_gesture(priority_msg, is_single=True)
# Assert old items are gone and only new one remains
assert receiver._gesture_queue.qsize() == 1
assert receiver._gesture_queue.get() == "urgent_wave"
def test_handle_gestures_loop_empty(mocker):
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Run loop exactly once
mock_state.exit_event.is_set.side_effect = [False, True, True, True]
# We don't put anything in the queue, so .get(timeout=0.1) will raise Queue.Empty.
# The code should catch it and pass.
receiver._handle_gestures()
# If we reached here without raising an exception, the test passes.
# We can assert that the queue is still valid/empty.
assert receiver._gesture_queue.empty()