From a55acd57b67d1b817df7f0b26d105c6807213bd1 Mon Sep 17 00:00:00 2001
From: Pim Hutting
Date: Thu, 18 Dec 2025 12:55:09 +0100
Subject: [PATCH] chore: finalized tests, added queue
ref: N25B-387
---
.../endpoints/actuation_receiver.py | 41 ++++-
test/unit/test_actuation_receiver.py | 154 +++++++++++++++---
2 files changed, 168 insertions(+), 27 deletions(-)
diff --git a/src/robot_interface/endpoints/actuation_receiver.py b/src/robot_interface/endpoints/actuation_receiver.py
index 774cb97..42e8296 100644
--- a/src/robot_interface/endpoints/actuation_receiver.py
+++ b/src/robot_interface/endpoints/actuation_receiver.py
@@ -36,8 +36,11 @@ class ActuationReceiver(ReceiverBase):
self._tts_service = None
self._animation_service = None
self._message_queue = Queue.Queue()
+ self._gesture_queue = Queue.Queue()
self.message_thread = Thread(target=self._handle_messages)
self.message_thread.start()
+ self.gesture_thread = Thread(target=self._handle_gestures)
+ self.gesture_thread.start()
def _handle_speech(self, message):
"""
@@ -86,6 +89,20 @@ class ActuationReceiver(ReceiverBase):
pass
logging.info("Message queue cleared.")
+ def clear_gesture_queue(self):
+ """
+ Safely drains all pending gestures from the gesture queue.
+ """
+ logging.info("Gesture queue size: {}".format(self._gesture_queue.qsize()))
+ try:
+ while True:
+ # Remove items one by one without waiting
+ self._gesture_queue.get_nowait()
+ except Queue.Empty:
+ pass
+ logging.info("Gesture queue cleared.")
+ logging.info("Gesture queue size: {}".format(self._gesture_queue.qsize()))
+
def _handle_gesture(self, message, is_single):
"""
@@ -127,12 +144,15 @@ class ActuationReceiver(ReceiverBase):
# Play the gesture. Pepper comes with predefined animations like "Wave", "Greet", "Clap"
# You can also create custom animations using Choregraphe and upload them to the robot.
+ if (message.get("is_priority")):
+ # Clear queue and play
+ self.clear_gesture_queue()
+ logging.debug("Force playing gesture immediately: {}".format(gesture))
if is_single:
- logging.debug("Playing single gesture: {}".format(gesture))
- getattr(qi, "async")(self._animation_service.run, gesture)
+ logging.debug("Adding single gesture to queue: {}".format(gesture))
else:
- logging.debug("Playing tag gesture: {}".format(gesture))
- getattr(qi, "async")(self._animation_service.runTag, gesture)
+ logging.debug("Adding tag gesture to queue: {}".format(gesture))
+ self._gesture_queue.put(gesture)
def handle_message(self, message):
@@ -161,7 +181,18 @@ class ActuationReceiver(ReceiverBase):
state.is_speaking = False
except RuntimeError:
logging.warn("Lost connection to Pepper. Please check if you're connected to the local WiFi and restart this application.")
- state.exit_event.set()
+ state.exit_event.set()
+
+ def _handle_gestures(self):
+ while not state.exit_event.is_set():
+ try:
+ gesture = self._gesture_queue.get(timeout=0.1)
+ self._animation_service.run(gesture)
+ except Queue.Empty:
+ pass
+ except RuntimeError:
+ logging.warn("Lost connection to Pepper. Please check if you're connected to the local WiFi and restart this application.")
+ state.exit_event.set()
def endpoint_description(self):
"""
diff --git a/test/unit/test_actuation_receiver.py b/test/unit/test_actuation_receiver.py
index 3be2472..86e7f5b 100644
--- a/test/unit/test_actuation_receiver.py
+++ b/test/unit/test_actuation_receiver.py
@@ -1,5 +1,6 @@
import sys
+import time
import mock
import pytest
import zmq
@@ -375,10 +376,12 @@ def test_gesture_no_qi_session(zmq_context, mocker):
def test_gesture_single_success(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
+ # Allow loops to run
+ mock_state.exit_event.is_set.return_value = False
+
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
-
- # Setup gesture settings
+
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave"]
@@ -389,14 +392,21 @@ def test_gesture_single_success(zmq_context, mocker):
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "wave"}, True)
- mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
- getattr(mock_qi, "async").assert_called_once()
- assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run
- assert getattr(mock_qi, "async").call_args[0][1] == "wave"
+ time.sleep(0.2)
+
+ mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
+ mock_animation_service.run.assert_called_with("wave")
+
+ # CLEANUP: Signal exit AND join threads
+ mock_state.exit_event.is_set.return_value = True
+ receiver.message_thread.join(timeout=1.0)
+ receiver.gesture_thread.join(timeout=1.0)
def test_gesture_tag_success(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
+ mock_state.exit_event.is_set.return_value = False
+
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
@@ -410,10 +420,15 @@ def test_gesture_tag_success(zmq_context, mocker):
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "greeting"}, False)
- mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
- getattr(mock_qi, "async").assert_called_once()
- assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.runTag
- assert getattr(mock_qi, "async").call_args[0][1] == "greeting"
+ time.sleep(0.2)
+
+ mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
+ mock_animation_service.run.assert_called_with("greeting")
+
+ # CLEANUP: Signal exit AND join threads
+ mock_state.exit_event.is_set.return_value = True
+ receiver.message_thread.join(timeout=1.0)
+ receiver.gesture_thread.join(timeout=1.0)
def test_handle_message_all_routes(zmq_context, mocker):
@@ -449,25 +464,18 @@ def test_endpoint_description(zmq_context, mocker):
def test_gesture_single_real_gesturetags(zmq_context, mocker):
- """
- Uses the real GestureTags (no mocking) to ensure the receiver
- references GestureTags.single_gestures correctly.
- """
- # Ensure qi session exists so we pass the early return
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = mock.Mock()
+ mock_state.exit_event.is_set.return_value = False
- # Mock qi.async to avoid real async calls
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
- # Mock animation service
mock_animation_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context)
- # Pick a real gesture from GestureTags.single_gestures
assert len(GestureTags.single_gestures) > 0, "GestureTags.single_gestures must not be empty"
gesture = GestureTags.single_gestures[0]
@@ -476,8 +484,110 @@ def test_gesture_single_real_gesturetags(zmq_context, mocker):
is_single=True,
)
- mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
- getattr(mock_qi, "async").assert_called_once()
- assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run
- assert getattr(mock_qi, "async").call_args[0][1] == gesture
+ time.sleep(0.2)
+ mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
+ mock_animation_service.run.assert_called_with(gesture)
+
+ # CLEANUP: Signal exit AND join threads
+ mock_state.exit_event.is_set.return_value = True
+ receiver.message_thread.join(timeout=1.0)
+ receiver.gesture_thread.join(timeout=1.0)
+
+def test_clear_gesture_queue(mocker):
+ # Prevent background threads from eating the items
+ mocker.patch("threading.Thread")
+
+ mock_zmq_ctx = mock.Mock()
+ receiver = ActuationReceiver(mock_zmq_ctx)
+
+ # Populate the queue
+ receiver._gesture_queue.put("gesture1")
+ receiver._gesture_queue.put("gesture2")
+
+ assert receiver._gesture_queue.qsize() == 2
+
+ # Clear the queue
+ receiver.clear_gesture_queue()
+
+ # Assert the queue is empty
+ assert receiver._gesture_queue.qsize() == 0
+
+
+def test_gesture_priority_clears_queue(mocker):
+ mocker.patch("threading.Thread")
+ mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
+
+ # Mock QI and Tags so valid checks pass
+ mock_qi = mock.Mock()
+ sys.modules["qi"] = mock_qi
+ mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
+ mock_tags.single_gestures = ["urgent_wave"]
+
+ # Setup Animation Service
+ mock_anim = mock.Mock()
+ mock_state.qi_session.service.return_value = mock_anim
+
+ mock_zmq_ctx = mock.Mock()
+ receiver = ActuationReceiver(mock_zmq_ctx)
+
+ # Pre-fill queue with "slow" gestures
+ receiver._gesture_queue.put("slow_gesture_1")
+ receiver._gesture_queue.put("slow_gesture_2")
+
+ assert receiver._gesture_queue.qsize() == 2
+
+ # Send priority gesture
+ priority_msg = {
+ "endpoint": "actuate/gesture/single",
+ "data": "urgent_wave",
+ "is_priority": True,
+ }
+ receiver._handle_gesture(priority_msg, is_single=True)
+
+ # Assert old items are gone and only new one remains
+ assert receiver._gesture_queue.qsize() == 1
+ assert receiver._gesture_queue.get() == "urgent_wave"
+
+
+def test_handle_gestures_loop_empty(mocker):
+ mocker.patch("threading.Thread")
+ mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
+
+ mock_zmq_ctx = mock.Mock()
+ receiver = ActuationReceiver(mock_zmq_ctx)
+
+ # Run loop exactly once
+ mock_state.exit_event.is_set.side_effect = [False, True]
+
+ # We don't put anything in the queue, so .get(timeout=0.1) will raise Queue.Empty.
+ # The code should catch it and pass.
+ receiver._handle_gestures()
+
+ # If we reached here without raising an exception, the test passes.
+ # We can assert that the queue is still valid/empty.
+ assert receiver._gesture_queue.empty()
+
+
+def test_handle_gestures_runtime_error(mocker):
+ mocker.patch("threading.Thread")
+ mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
+
+ mock_zmq_ctx = mock.Mock()
+ receiver = ActuationReceiver(mock_zmq_ctx)
+
+ # Run loop exactly once
+ mock_state.exit_event.is_set.side_effect = [False, True]
+
+ # Setup the service to fail
+ mock_anim = mock.Mock()
+ mock_anim.run.side_effect = RuntimeError("Wifi Lost")
+ receiver._animation_service = mock_anim
+
+ # Add item to trigger the service call
+ receiver._gesture_queue.put("wave")
+
+ receiver._handle_gestures()
+
+ # Assert that the exit_event was triggered
+ assert mock_state.exit_event.set.called