chore: finalized tests, added queue

ref: N25B-387
This commit is contained in:
Pim Hutting
2025-12-18 12:55:09 +01:00
parent 5d5c8553c2
commit a55acd57b6
2 changed files with 168 additions and 27 deletions

View File

@@ -36,8 +36,11 @@ class ActuationReceiver(ReceiverBase):
self._tts_service = None
self._animation_service = None
self._message_queue = Queue.Queue()
self._gesture_queue = Queue.Queue()
self.message_thread = Thread(target=self._handle_messages)
self.message_thread.start()
self.gesture_thread = Thread(target=self._handle_gestures)
self.gesture_thread.start()
def _handle_speech(self, message):
"""
@@ -86,6 +89,20 @@ class ActuationReceiver(ReceiverBase):
pass
logging.info("Message queue cleared.")
def clear_gesture_queue(self):
"""
Safely drains all pending gestures from the gesture queue.
"""
logging.info("Gesture queue size: {}".format(self._gesture_queue.qsize()))
try:
while True:
# Remove items one by one without waiting
self._gesture_queue.get_nowait()
except Queue.Empty:
pass
logging.info("Gesture queue cleared.")
logging.info("Gesture queue size: {}".format(self._gesture_queue.qsize()))
def _handle_gesture(self, message, is_single):
"""
@@ -127,12 +144,15 @@ class ActuationReceiver(ReceiverBase):
# Play the gesture. Pepper comes with predefined animations like "Wave", "Greet", "Clap"
# You can also create custom animations using Choregraphe and upload them to the robot.
if (message.get("is_priority")):
# Clear queue and play
self.clear_gesture_queue()
logging.debug("Force playing gesture immediately: {}".format(gesture))
if is_single:
logging.debug("Playing single gesture: {}".format(gesture))
getattr(qi, "async")(self._animation_service.run, gesture)
logging.debug("Adding single gesture to queue: {}".format(gesture))
else:
logging.debug("Playing tag gesture: {}".format(gesture))
getattr(qi, "async")(self._animation_service.runTag, gesture)
logging.debug("Adding tag gesture to queue: {}".format(gesture))
self._gesture_queue.put(gesture)
def handle_message(self, message):
@@ -163,6 +183,17 @@ class ActuationReceiver(ReceiverBase):
logging.warn("Lost connection to Pepper. Please check if you're connected to the local WiFi and restart this application.")
state.exit_event.set()
def _handle_gestures(self):
while not state.exit_event.is_set():
try:
gesture = self._gesture_queue.get(timeout=0.1)
self._animation_service.run(gesture)
except Queue.Empty:
pass
except RuntimeError:
logging.warn("Lost connection to Pepper. Please check if you're connected to the local WiFi and restart this application.")
state.exit_event.set()
def endpoint_description(self):
"""
Extend the default endpoint description with gesture tags.

View File

@@ -1,5 +1,6 @@
import sys
import time
import mock
import pytest
import zmq
@@ -375,10 +376,12 @@ def test_gesture_no_qi_session(zmq_context, mocker):
def test_gesture_single_success(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Allow loops to run
mock_state.exit_event.is_set.return_value = False
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
# Setup gesture settings
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave"]
@@ -389,14 +392,21 @@ def test_gesture_single_success(zmq_context, mocker):
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "wave"}, True)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run
assert getattr(mock_qi, "async").call_args[0][1] == "wave"
time.sleep(0.2)
mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
mock_animation_service.run.assert_called_with("wave")
# CLEANUP: Signal exit AND join threads
mock_state.exit_event.is_set.return_value = True
receiver.message_thread.join(timeout=1.0)
receiver.gesture_thread.join(timeout=1.0)
def test_gesture_tag_success(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.exit_event.is_set.return_value = False
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
@@ -410,10 +420,15 @@ def test_gesture_tag_success(zmq_context, mocker):
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "greeting"}, False)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.runTag
assert getattr(mock_qi, "async").call_args[0][1] == "greeting"
time.sleep(0.2)
mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
mock_animation_service.run.assert_called_with("greeting")
# CLEANUP: Signal exit AND join threads
mock_state.exit_event.is_set.return_value = True
receiver.message_thread.join(timeout=1.0)
receiver.gesture_thread.join(timeout=1.0)
def test_handle_message_all_routes(zmq_context, mocker):
@@ -449,25 +464,18 @@ def test_endpoint_description(zmq_context, mocker):
def test_gesture_single_real_gesturetags(zmq_context, mocker):
"""
Uses the real GestureTags (no mocking) to ensure the receiver
references GestureTags.single_gestures correctly.
"""
# Ensure qi session exists so we pass the early return
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = mock.Mock()
mock_state.exit_event.is_set.return_value = False
# Mock qi.async to avoid real async calls
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
# Mock animation service
mock_animation_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context)
# Pick a real gesture from GestureTags.single_gestures
assert len(GestureTags.single_gestures) > 0, "GestureTags.single_gestures must not be empty"
gesture = GestureTags.single_gestures[0]
@@ -476,8 +484,110 @@ def test_gesture_single_real_gesturetags(zmq_context, mocker):
is_single=True,
)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run
assert getattr(mock_qi, "async").call_args[0][1] == gesture
time.sleep(0.2)
mock_state.qi_session.service.assert_called_with("ALAnimationPlayer")
mock_animation_service.run.assert_called_with(gesture)
# CLEANUP: Signal exit AND join threads
mock_state.exit_event.is_set.return_value = True
receiver.message_thread.join(timeout=1.0)
receiver.gesture_thread.join(timeout=1.0)
def test_clear_gesture_queue(mocker):
# Prevent background threads from eating the items
mocker.patch("threading.Thread")
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Populate the queue
receiver._gesture_queue.put("gesture1")
receiver._gesture_queue.put("gesture2")
assert receiver._gesture_queue.qsize() == 2
# Clear the queue
receiver.clear_gesture_queue()
# Assert the queue is empty
assert receiver._gesture_queue.qsize() == 0
def test_gesture_priority_clears_queue(mocker):
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Mock QI and Tags so valid checks pass
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["urgent_wave"]
# Setup Animation Service
mock_anim = mock.Mock()
mock_state.qi_session.service.return_value = mock_anim
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Pre-fill queue with "slow" gestures
receiver._gesture_queue.put("slow_gesture_1")
receiver._gesture_queue.put("slow_gesture_2")
assert receiver._gesture_queue.qsize() == 2
# Send priority gesture
priority_msg = {
"endpoint": "actuate/gesture/single",
"data": "urgent_wave",
"is_priority": True,
}
receiver._handle_gesture(priority_msg, is_single=True)
# Assert old items are gone and only new one remains
assert receiver._gesture_queue.qsize() == 1
assert receiver._gesture_queue.get() == "urgent_wave"
def test_handle_gestures_loop_empty(mocker):
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Run loop exactly once
mock_state.exit_event.is_set.side_effect = [False, True]
# We don't put anything in the queue, so .get(timeout=0.1) will raise Queue.Empty.
# The code should catch it and pass.
receiver._handle_gestures()
# If we reached here without raising an exception, the test passes.
# We can assert that the queue is still valid/empty.
assert receiver._gesture_queue.empty()
def test_handle_gestures_runtime_error(mocker):
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Run loop exactly once
mock_state.exit_event.is_set.side_effect = [False, True]
# Setup the service to fail
mock_anim = mock.Mock()
mock_anim.run.side_effect = RuntimeError("Wifi Lost")
receiver._animation_service = mock_anim
# Add item to trigger the service call
receiver._gesture_queue.put("wave")
receiver._handle_gestures()
# Assert that the exit_event was triggered
assert mock_state.exit_event.set.called