enable playing a gesture with priority, clearing the queue #25

Closed
8464960 wants to merge 7 commits from feat/force-gesture into dev
8464960 commented 2026-01-02 15:38:56 +00:00 (Migrated from git.science.uu.nl)

Please first complete the merge request : "feat: implemented forced speech and speech queue" https://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-ri/-/merge_requests/23

Implemented queued gesture system and forced gesture functionality. Pepper's gestures are now queued up in a python queue. When a forced gesture message is received, the current gesture queue is emptied and the forced gesture is enqueued.

To test:

  • Create a new mock agent TestMixedGesture in CB (dev branch) with the code below. Also make sure to import and add it in the main.py file. (Code can be found below)
  • Run both the CB (dev branch) and RI.
  • Every second, a gesture without priority and every 5 seconds a gesture with priority is added to the queue by the mock agent, observe logs in RI (or better observe at Pepper) to see if the prioritized gesture clears the queue and is then added to the queue.
  • Verify that all RI tests pass and actuation_receiver.py is 100% covered.

Mock agent code:

import asyncio
import random

from control_backend.agents.base import BaseAgent
from control_backend.core.config import settings
from control_backend.core.agent_system import InternalMessage
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint

import json

class TestMixedGesture(BaseAgent):
    def __init__(self, name: str):
        super().__init__(name)
        self.gesture_list = [
            "animations/Stand/Gestures/Thinking_1",
            "animations/Stand/Gestures/Thinking_4",
            "animations/Stand/Gestures/Yes_1",
            "animations/Stand/Gestures/Yes_2",
            "animations/Stand/Gestures/You_1",
            "animations/Stand/Gestures/WhatSThis_1",
            "animations/Stand/Gestures/Wings_1",
            "animations/Stand/Reactions/Heat_1",
            "animations/Stand/Gestures/Take_1"
        ]

    async def setup(self):
        self.logger.debug("TestMixedGesture setup complete.")
        # We add two separate behaviors that run at the same time
        self.add_behavior(self._normal_gesture_loop())
        self.add_behavior(self._force_gesture_loop())


    async def _send_gesture(self, is_priority: bool):
        """
        method to send prioritized gesture command to RobotGestureAgent.

        :param single_gesture_name: The gesture tag that the robot has to perform.
        """
        # the endpoint is set to always be GESTURE_SINGLE for user interrupts
        selected_gesture = random.choice(self.gesture_list)
        cmd = GestureCommand(
            endpoint=RIEndpoint.GESTURE_SINGLE, data=selected_gesture, is_priority=is_priority
        )
        out_msg = InternalMessage(
            to=settings.agent_settings.robot_gesture_name,
            sender=self.name,
            body=cmd.model_dump_json(),
        )
        await self.send(out_msg)

    async def _normal_gesture_loop(self):
        """Sends a standard gesture every 3 seconds to fill the queue."""
        print("Starting normal gesture loop (every 3s).")
        while True:
            await asyncio.sleep(1)
            await self._send_gesture(is_priority=False)

    async def _force_gesture_loop(self):
        """Sends a PRIORITY gesture every 15 seconds to clear the queue."""
        print("Starting force gesture loop (every 15s).")
        while True:
            await asyncio.sleep(5)
            await self._send_gesture(is_priority=True)
Please first complete the merge request : "**feat: implemented forced speech and speech queue"** https://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-ri/-/merge_requests/23 Implemented queued gesture system and forced gesture functionality. Pepper's gestures are now queued up in a python queue. When a forced gesture message is received, the current gesture queue is emptied and the forced gesture is enqueued. To test: * [x] <span dir="">Create a new mock agent </span>`TestMixedGesture`<span dir=""> in </span>`CB`<span dir=""> (</span>`dev`<span dir=""> branch) with the code below. Also make sure to import and add it in the </span>`main.py`<span dir=""> file.</span> (Code can be found below) * [x] Run both the `CB` (dev branch) and `RI`. * [x] Every second, a gesture without priority and every 5 seconds a gesture with priority is added to the queue by the mock agent, observe logs in RI (or better observe at Pepper) to see if the prioritized gesture clears the queue and is then added to the queue. * [ ] Verify that all `RI` tests pass and `actuation_receiver.py` is 100% covered. Mock agent code: ```python import asyncio import random from control_backend.agents.base import BaseAgent from control_backend.core.config import settings from control_backend.core.agent_system import InternalMessage from control_backend.schemas.ri_message import GestureCommand, RIEndpoint import json class TestMixedGesture(BaseAgent): def __init__(self, name: str): super().__init__(name) self.gesture_list = [ "animations/Stand/Gestures/Thinking_1", "animations/Stand/Gestures/Thinking_4", "animations/Stand/Gestures/Yes_1", "animations/Stand/Gestures/Yes_2", "animations/Stand/Gestures/You_1", "animations/Stand/Gestures/WhatSThis_1", "animations/Stand/Gestures/Wings_1", "animations/Stand/Reactions/Heat_1", "animations/Stand/Gestures/Take_1" ] async def setup(self): self.logger.debug("TestMixedGesture setup complete.") # We add two separate behaviors that run at the same time self.add_behavior(self._normal_gesture_loop()) self.add_behavior(self._force_gesture_loop()) async def _send_gesture(self, is_priority: bool): """ method to send prioritized gesture command to RobotGestureAgent. :param single_gesture_name: The gesture tag that the robot has to perform. """ # the endpoint is set to always be GESTURE_SINGLE for user interrupts selected_gesture = random.choice(self.gesture_list) cmd = GestureCommand( endpoint=RIEndpoint.GESTURE_SINGLE, data=selected_gesture, is_priority=is_priority ) out_msg = InternalMessage( to=settings.agent_settings.robot_gesture_name, sender=self.name, body=cmd.model_dump_json(), ) await self.send(out_msg) async def _normal_gesture_loop(self): """Sends a standard gesture every 3 seconds to fill the queue.""" print("Starting normal gesture loop (every 3s).") while True: await asyncio.sleep(1) await self._send_gesture(is_priority=False) async def _force_gesture_loop(self): """Sends a PRIORITY gesture every 15 seconds to clear the queue.""" print("Starting force gesture loop (every 15s).") while True: await asyncio.sleep(5) await self._send_gesture(is_priority=True) ```
8464960 commented 2026-01-02 15:38:57 +00:00 (Migrated from git.science.uu.nl)

assigned to @8464960

assigned to @8464960
2584433 commented 2026-01-04 19:46:33 +00:00 (Migrated from git.science.uu.nl)

Your tests are flaky, sometimes all pass, sometimes these 2 or others fail.

______________________ test_handle_messages_queue_empty _______________________

mocker = <pytest_mock.plugin.MockFixture object at 0x0000000004B14588>

def test_handle_messages_queue_empty(mocker):
    """
    Tests the Queue.Empty exception handler in the consumer loop.
    This covers the logic that resets 'state.is_speaking' to False.
    """
    # Prevent the real background thread from starting
    mocker.patch("threading.Thread")

    # Mock the state object
    mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")

    # Setup 'is_speaking' property mock
    # We set return_value=True so the code enters the 'if state.is_speaking:' block.
    # We use PropertyMock to track when this attribute is set.
    type(mock_state).is_speaking = True

    mock_zmq_ctx = mock.Mock()
    receiver = ActuationReceiver(mock_zmq_ctx)

    # This ensures the while loop body runs exactly once for our test
    mock_state.exit_event.is_set.side_effect = [False, True]

    # Force get() to raise Queue.Empty immediately (simulate timeout)
    # We patch the 'get' method on the specific queue instance of our receiver
    #mocker.patch.object(receiver._message_queue, 'get', side_effect=Queue.Empty)

    # Run the loop logic manually (synchronously)
    receiver._handle_messages()

    # Final Assertion: Verify is_speaking was set to False
    # The code execution order is: read (returns True) -> print -> set (to False)
    # assert_called_with checks the arguments of the LAST call, which is the setter.
  assert mock_state.is_speaking is False

E AssertionError: assert True is False
E + where True = .is_speaking

test\unit\test_actuation_receiver.py:276: AssertionError
_____________________ test_handle_gestures_runtime_error ______________________

mocker = <pytest_mock.plugin.MockFixture object at 0x000000000446A648>

def test_handle_gestures_runtime_error(mocker):
    mocker.patch("threading.Thread")
    mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")

    mock_zmq_ctx = mock.Mock()
    receiver = ActuationReceiver(mock_zmq_ctx)

    # Run loop exactly once
    mock_state.exit_event.is_set.side_effect = [False, True]

    # Setup the service to fail
    mock_anim = mock.Mock()
    mock_anim.run.side_effect = RuntimeError("Wifi Lost")
    receiver._animation_service = mock_anim

    # Add item to trigger the service call
    receiver._gesture_queue.put("wave")

    receiver._handle_gestures()

    # Assert that the exit_event was triggered
  assert mock_state.exit_event.set.called

E AssertionError: assert False
E + where False = .called
E + where = .set
E + where = .exit_event

test\unit\test_actuation_receiver.py:592: AssertionError
===================== 2 failed, 88 passed in 2.25 seconds =====================

Your tests are flaky, sometimes all pass, sometimes these 2 or others fail. ______________________ test_handle_messages_queue_empty _______________________ mocker = <pytest_mock.plugin.MockFixture object at 0x0000000004B14588> def test_handle_messages_queue_empty(mocker): """ Tests the Queue.Empty exception handler in the consumer loop. This covers the logic that resets 'state.is_speaking' to False. """ # Prevent the real background thread from starting mocker.patch("threading.Thread") # Mock the state object mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") # Setup 'is_speaking' property mock # We set return_value=True so the code enters the 'if state.is_speaking:' block. # We use PropertyMock to track when this attribute is set. type(mock_state).is_speaking = True mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) # This ensures the while loop body runs exactly once for our test mock_state.exit_event.is_set.side_effect = [False, True] # Force get() to raise Queue.Empty immediately (simulate timeout) # We patch the 'get' method on the specific queue instance of our receiver #mocker.patch.object(receiver._message_queue, 'get', side_effect=Queue.Empty) # Run the loop logic manually (synchronously) receiver._handle_messages() # Final Assertion: Verify is_speaking was set to False # The code execution order is: read (returns True) -> print -> set (to False) # assert_called_with checks the arguments of the LAST call, which is the setter. > assert mock_state.is_speaking is False E AssertionError: assert True is False E + where True = <MagicMock name='state' id='78713928'>.is_speaking test\unit\test_actuation_receiver.py:276: AssertionError _____________________ test_handle_gestures_runtime_error ______________________ mocker = <pytest_mock.plugin.MockFixture object at 0x000000000446A648> def test_handle_gestures_runtime_error(mocker): mocker.patch("threading.Thread") mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") mock_zmq_ctx = mock.Mock() receiver = ActuationReceiver(mock_zmq_ctx) # Run loop exactly once mock_state.exit_event.is_set.side_effect = [False, True] # Setup the service to fail mock_anim = mock.Mock() mock_anim.run.side_effect = RuntimeError("Wifi Lost") receiver._animation_service = mock_anim # Add item to trigger the service call receiver._gesture_queue.put("wave") receiver._handle_gestures() # Assert that the exit_event was triggered > assert mock_state.exit_event.set.called E AssertionError: assert False E + where False = <MagicMock name='state.exit_event.set' id='80709512'>.called E + where <MagicMock name='state.exit_event.set' id='80709512'> = <MagicMock name='state.exit_event' id='71916232'>.set E + where <MagicMock name='state.exit_event' id='71916232'> = <MagicMock name='state' id='72459528'>.exit_event test\unit\test_actuation_receiver.py:592: AssertionError ===================== 2 failed, 88 passed in 2.25 seconds =====================
2584433 commented 2026-01-04 19:46:33 +00:00 (Migrated from git.science.uu.nl)

left review comments

left review comments
2584433 commented 2026-01-04 19:57:24 +00:00 (Migrated from git.science.uu.nl)

marked the checklist item Create a new mock agent TestMixedGesture in CB (dev branch) with the code below. Also make sure to import and add it in the main.py file. (Code can be found below) as completed

marked the checklist item **<span dir="">Create a new mock agent </span>`TestMixedGesture`<span dir=""> in </span>`CB`<span dir=""> (</span>`dev`<span dir=""> branch) with the code below. Also make sure to import and add it in the </span>`main.py`<span dir=""> file.</span> (Code can be found below)** as completed
2584433 commented 2026-01-04 19:57:25 +00:00 (Migrated from git.science.uu.nl)

marked the checklist item Run both the CB (dev branch) and RI. as completed

marked the checklist item **Run both the `CB` (dev branch) and `RI`.** as completed
2584433 commented 2026-01-04 19:58:55 +00:00 (Migrated from git.science.uu.nl)

marked the checklist item Every second, a gesture without priority and every 5 seconds a gesture with priority is added to the queue by the mock agent, observe logs in RI (or better observe at Pepper) to see if the prioritized gesture clears the queue and is then added to the queue. as completed

marked the checklist item **Every second, a gesture without priority and every 5 seconds a gesture with priority is added to the queue by the mock agent, observe logs in RI (or better observe at Pepper) to see if the prioritized gesture clears the queue and is then added to the queue.** as completed
2584433 commented 2026-01-04 20:03:01 +00:00 (Migrated from git.science.uu.nl)

I cannot observe if a priorty "clears" the cue, i am not getting the info logs for them. This is either a device issue or maybe i messed up in the CB?

I cannot observe if a priorty "clears" the cue, i am not getting the info logs for them. This is either a device issue or maybe i messed up in the CB?
2584433 commented 2026-01-04 20:03:01 +00:00 (Migrated from git.science.uu.nl)

left review comments

left review comments
2584433 commented 2026-01-04 20:06:41 +00:00 (Migrated from git.science.uu.nl)

requested review from @2584433

requested review from @2584433
8464960 commented 2026-01-14 15:35:47 +00:00 (Migrated from git.science.uu.nl)

added 1 commit

  • eab2481b - chore: fixed flakiness of tests

Compare with previous version

added 1 commit <ul><li>eab2481b - chore: fixed flakiness of tests</li></ul> [Compare with previous version](/ics/sp/2025/n25b/pepperplus-ri/-/merge_requests/25/diffs?diff_id=136791&start_sha=a55acd57b67d1b817df7f0b26d105c6807213bd1)
8464960 (Migrated from git.science.uu.nl) closed this pull request 2026-01-23 10:40:09 +00:00

Pull request closed

Sign in to join this conversation.
No Reviewers
No Label
1 Participants
Notifications
Due Date
No due date set.
Dependencies

No dependencies set.

Reference: pepperplus/pepperplus-ri#25