feat: gestures to ri

This commit is contained in:
JobvAlewijk
2025-12-16 08:35:26 +00:00
committed by Twirre
parent df702f1e44
commit a8fe887c48
3 changed files with 634 additions and 3 deletions

View File

@@ -7,6 +7,7 @@ from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
from robot_interface.endpoints.gesture_settings import GestureTags
class ActuationReceiver(ReceiverBase):
@@ -21,12 +22,16 @@ class ActuationReceiver(ReceiverBase):
:ivar _tts_service: The text-to-speech service object from the Qi session.
:vartype _tts_service: qi.Session | None
:ivar _animation_service: The animation/gesture service object from the Qi session.
:vartype _animation_service: qi.Session | None
"""
def __init__(self, zmq_context, port=settings.agent_settings.actuation_receiver_port):
super(ActuationReceiver, self).__init__("actuation")
self.create_socket(zmq_context, zmq.SUB, port)
self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options
self._tts_service = None
self._animation_service = None
def _handle_speech(self, message):
"""
@@ -54,7 +59,54 @@ class ActuationReceiver(ReceiverBase):
self._tts_service = state.qi_session.service("ALTextToSpeech")
# Returns instantly. Messages received while speaking will be queued.
qi.async(self._tts_service.say, text)
getattr(qi, "async")(self._tts_service.say, text)
def _handle_gesture(self, message, is_single):
"""
Handle a gesture actuation request.
:param message: The gesture to do, must contain properties "endpoint" and "data".
:type message: dict
:param is_single: Whether it's a specific single gesture or a gesture tag.
:type is_single: bool
"""
gesture = message.get("data")
if not gesture:
logging.warn("Received gesture to do, but it lacks data.")
return
if not isinstance(gesture, (str, unicode)):
logging.warn("Received gesture to do but it is not a string.")
return
logging.debug("Received gesture to do: {}".format(gesture))
if is_single:
if gesture not in GestureTags.single_gestures:
logging.warn("Received single gesture to do, but it does not exist in settings")
return
else:
if gesture not in GestureTags.tags:
logging.warn("Received single tag to do, but it does not exist in settings")
return
if not state.qi_session: return
# If state has a qi_session, we know that we can import qi
import qi # Takes a while only the first time it's imported
if not self._animation_service:
self._animation_service = state.qi_session.service("ALAnimationPlayer")
# Play the gesture. Pepper comes with predefined animations like "Wave", "Greet", "Clap"
# You can also create custom animations using Choregraphe and upload them to the robot.
if is_single:
logging.debug("Playing single gesture: {}".format(gesture))
getattr(qi, "async")(self._animation_service.run, gesture)
else:
logging.debug("Playing tag gesture: {}".format(gesture))
getattr(qi, "async")(self._animation_service.runTag, gesture)
def handle_message(self, message):
"""
@@ -65,3 +117,17 @@ class ActuationReceiver(ReceiverBase):
"""
if message["endpoint"] == "actuate/speech":
self._handle_speech(message)
if message["endpoint"] == "actuate/gesture/tag":
self._handle_gesture(message, False)
if message["endpoint"] == "actuate/gesture/single":
self._handle_gesture(message, True)
def endpoint_description(self):
"""
Extend the default endpoint description with gesture tags.
Returned during negotiate/ports so the CB knows available gestures.
"""
desc = super(ActuationReceiver, self).endpoint_description()
desc["gestures"] = GestureTags.tags
desc["single_gestures"] = GestureTags.single_gestures
return desc