from __future__ import unicode_literals # So that we can log texts with Unicode characters import logging import zmq from robot_interface.endpoints.receiver_base import ReceiverBase from robot_interface.state import state from robot_interface.core.config import settings class ActuationReceiver(ReceiverBase): """ The actuation receiver endpoint, responsible for handling speech and gesture requests. :param zmq_context: The ZeroMQ context to use. :type zmq_context: zmq.Context :param port: The port to use. :type port: int :ivar _tts_service: The text-to-speech service object from the Qi session. :vartype _tts_service: ssl.SSLSession | None """ def __init__(self, zmq_context, port=settings.agent_settings.actuating_receiver_port): super(ActuationReceiver, self).__init__("actuation") self.create_socket(zmq_context, zmq.SUB, port) self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options self._tts_service = None def _handle_speech(self, message): """ Handle a speech actuation request. :param message: The message to handle, must contain properties "endpoint" and "data". :type message: dict """ text = message.get("data") if not text: logging.warn("Received message to speak, but it lacks data.") return if not isinstance(text, (str, unicode)): logging.warn("Received message to speak but it is not a string.") return logging.debug("Received message to speak: {}".format(text)) if not state.qi_session: return # If state has a qi_session, we know that we can import qi import qi # Takes a while only the first time it's imported if not self._tts_service: self._tts_service = state.qi_session.service("ALTextToSpeech") # Returns instantly. Messages received while speaking will be queued. qi.async(self._tts_service.say, text) def handle_message(self, message): """ Handle an actuation/speech message with the receiver. :param message: The message to handle, must contain properties "endpoint" and "data". :type message: dict """ if message["endpoint"] == "actuate/speech": self._handle_speech(message)