from __future__ import unicode_literals  # So that we can log texts with Unicode characters

import logging
import time
from threading import Thread

import Queue
import zmq

from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
from robot_interface.endpoints.gesture_settings import GestureTags


class ActuationReceiver(ReceiverBase):
    """
    The actuation receiver endpoint, responsible for handling speech and gesture requests.

    :param zmq_context: The ZeroMQ context to use.
    :type zmq_context: zmq.Context
    :param port: The port to use.
    :type port: int
    :ivar _tts_service: The text-to-speech service object from the Qi session.
    :vartype _tts_service: qi.Session | None
    :ivar _animation_service: The animation/gesture service object from the Qi session.
    :vartype _animation_service: qi.Session | None
    """

    def __init__(self, zmq_context, port=settings.agent_settings.actuation_receiver_port):
        super(ActuationReceiver, self).__init__("actuation")
        self.create_socket(zmq_context, zmq.SUB, port)
        self.socket.setsockopt_string(zmq.SUBSCRIBE, u"")  # Causes block if given in options
        self._tts_service = None
        self._message_queue = Queue.Queue()
        self.message_thread = Thread(target=self._handle_messages)
        self.message_thread.start()
        self._animation_service = None

    def _handle_speech(self, message):
        """
        Handle a speech actuation request.

        :param message: The message to handle, must contain properties "endpoint" and "data".
        :type message: dict
        """
        text = message.get("data")
        if not text:
            logging.warn("Received message to speak, but it lacks data.")
            return
        if not isinstance(text, (str, unicode)):
            logging.warn("Received message to speak, but it is not a string.")
            return
        logging.debug("Received message to speak: {}".format(text))

        if not state.qi_session:
            return
        # If state has a qi_session, we know that we can import qi
        import qi  # Takes a while only the first time it's imported
        if not self._tts_service:
            self._tts_service = state.qi_session.service("ALTextToSpeech")

        if message.get("is_priority"):
            # Bypass queue and speak immediately
            self.clear_queue()
            self._message_queue.put(text)
            logging.debug("Force speaking immediately: {}".format(text))
        else:
            self._message_queue.put(text)

    def clear_queue(self):
        """
        Safely drain all pending messages from the queue.
        """
        try:
            while True:
                # Remove items one by one without waiting
                self._message_queue.get_nowait()
        except Queue.Empty:
            pass
        logging.info("Message queue cleared.")

    def _handle_gesture(self, message, is_single):
        """
        Handle a gesture actuation request.

        :param message: The message to handle, must contain properties "endpoint" and "data".
        :type message: dict
        :param is_single: Whether it's a specific single gesture or a gesture tag.
        :type is_single: bool
        """
        gesture = message.get("data")
        if not gesture:
            logging.warn("Received gesture to do, but it lacks data.")
            return
        if not isinstance(gesture, (str, unicode)):
            logging.warn("Received gesture to do, but it is not a string.")
            return
        logging.debug("Received gesture to do: {}".format(gesture))

        if is_single:
            if gesture not in GestureTags.single_gestures:
                logging.warn("Received single gesture to do, but it does not exist in settings.")
                return
        else:
            if gesture not in GestureTags.tags:
                logging.warn("Received gesture tag to do, but it does not exist in settings.")
                return

        if not state.qi_session:
            return
        # If state has a qi_session, we know that we can import qi
        import qi  # Takes a while only the first time it's imported
        if not self._animation_service:
            self._animation_service = state.qi_session.service("ALAnimationPlayer")

        # Play the gesture. Pepper comes with predefined animations like "Wave", "Greet", "Clap".
        # You can also create custom animations using Choregraphe and upload them to the robot.
        if is_single:
            logging.debug("Playing single gesture: {}".format(gesture))
            getattr(qi, "async")(self._animation_service.run, gesture)
        else:
            logging.debug("Playing tag gesture: {}".format(gesture))
            getattr(qi, "async")(self._animation_service.runTag, gesture)

    def handle_message(self, message):
        """
        Handle an actuation/speech message with the receiver.

        :param message: The message to handle, must contain properties "endpoint" and "data".
        :type message: dict
        """
        if message["endpoint"] == "actuate/speech":
            self._handle_speech(message)
        if message["endpoint"] == "actuate/gesture/tag":
            self._handle_gesture(message, False)
        if message["endpoint"] == "actuate/gesture/single":
            self._handle_gesture(message, True)

    def _handle_messages(self):
        while not state.exit_event.is_set():
            try:
                text = self._message_queue.get(timeout=0.1)
                if not state.is_speaking:
                    print("Started speaking.")
                    state.is_speaking = True
                self._tts_service.say(text)
            except Queue.Empty:
                if state.is_speaking:
                    print("Finished speaking.")
                    state.is_speaking = False
            except RuntimeError:
                logging.warn("Lost connection to Pepper. Please check if you're connected to "
                             "the local WiFi and restart this application.")
                state.exit_event.set()

    def endpoint_description(self):
        """
        Extend the default endpoint description with gesture tags.

        Returned during negotiate/ports so the CB knows which gestures are available.
        """
        desc = super(ActuationReceiver, self).endpoint_description()
        desc["gestures"] = GestureTags.tags
        desc["single_gestures"] = GestureTags.single_gestures
        return desc
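
# A minimal sketch of the publisher side, for reference. It assumes that ReceiverBase
# delivers JSON-decoded dicts from the SUB socket and that the publisher binds on the same
# port as settings.agent_settings.actuation_receiver_port; both are assumptions, not
# confirmed by this module. The "endpoint", "data", and "is_priority" keys match what
# handle_message() and _handle_speech() expect; the "hello" tag is hypothetical and must
# exist in GestureTags.tags on the receiving side.
#
#     import json
#     import zmq
#
#     context = zmq.Context()
#     publisher = context.socket(zmq.PUB)
#     publisher.bind("tcp://*:{}".format(settings.agent_settings.actuation_receiver_port))
#
#     # Ask Pepper to speak, jumping ahead of anything already queued.
#     publisher.send_string(json.dumps({
#         "endpoint": "actuate/speech",
#         "data": "Hello, I am Pepper.",
#         "is_priority": True,
#     }))
#
#     # Play a gesture tag.
#     publisher.send_string(json.dumps({
#         "endpoint": "actuate/gesture/tag",
#         "data": "hello",
#     }))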