Files
pepperplus-ri/src/robot_interface/endpoints/actuation_receiver.py
Twirre Meulenbelt 9ea446275e fix: allow speaking text with Unicode characters
When speaking, the actuation receiver logs the message to speak. If the message includes Unicode characters, it will now no longer crash.

ref: N25B-119
2025-11-02 14:59:16 +01:00

51 lines
1.7 KiB
Python

from __future__ import unicode_literals # So that we can log texts with Unicode characters
import logging
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
class ActuationReceiver(ReceiverBase):
    """
    Receiver endpoint that turns incoming actuation messages into robot actions.

    It listens on a ZeroMQ SUB socket with an empty subscription filter and, for
    messages whose endpoint is ``actuate/speech``, passes the text on to the
    ``ALTextToSpeech`` service of the current qi session.
    """

    def __init__(self, zmq_context, port=5557):
        """
        The actuation receiver endpoint, responsible for handling speech and gesture requests.

        :param zmq_context: The ZeroMQ context to use.
        :type zmq_context: zmq.Context

        :param port: The port to use.
        :type port: int
        """
        super(ActuationReceiver, self).__init__("actuation")
        self.create_socket(zmq_context, zmq.SUB, port)
        self.socket.setsockopt_string(zmq.SUBSCRIBE, u"")  # Causes block if given in options
        # The text-to-speech service is looked up on first use and then reused.
        self._tts_service = None

    def _handle_speech(self, message):
        """
        Speak the text carried by a speech message.

        Messages without data, or whose data is not a string, are logged as a
        warning and ignored. When there is no qi session, the text is only logged.

        :param message: The received message; its ``"data"`` entry holds the text to speak.
        :type message: dict
        """
        text = message.get("data")
        if not text:
            logging.warning("Received message to speak, but it lacks data.")
            return
        if not isinstance(text, (str, unicode)):
            logging.warning("Received message to speak but it is not a string.")
            return

        # Hand the text to logging as an argument, so the message is only
        # formatted when a handler actually emits the record.
        logging.debug("Received message to speak: %s", text)

        if not state.qi_session:
            return
        # If state has a qi_session, we know that we can import qi
        import qi  # Takes a while only the first time it's imported

        if not self._tts_service:
            self._tts_service = state.qi_session.service("ALTextToSpeech")

        # `async` is a reserved word from Python 3.7 on, so writing `qi.async`
        # makes this module unparseable there; getattr fetches the same function.
        # Returns instantly. Messages received while speaking will be queued.
        getattr(qi, "async")(self._tts_service.say, text)

    def handle_message(self, message):
        """
        Dispatch a received message to the handler that matches its endpoint.

        Only ``actuate/speech`` is handled here; any other message, including one
        without an ``"endpoint"`` entry, is ignored.

        :param message: The received message.
        :type message: dict
        """
        if message.get("endpoint") == "actuate/speech":
            self._handle_speech(message)