Files
pepperplus-ri/src/robot_interface/endpoints/actuation_receiver.py
Twirre Meulenbelt df985a8cbc fix: log speech commands even when Pepper SDK is not connected
Previously, the `_handle_speech` function had an early return when no Pepper session was available, causing incoming messages not to get logged. Now messages are logged even when there is no session with the Pepper SDK.

ref: N25B-168
2025-10-15 14:58:31 +02:00

69 lines
2.1 KiB
Python

import logging
import sys
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
class ActuationReceiver(ReceiverBase):
    """
    The actuation receiver endpoint, responsible for handling speech and gesture requests.

    Messages arrive on a ZeroMQ SUB socket. Speech requests are always logged and,
    when a Qi session is available, forwarded to the ``ALTextToSpeech`` service.
    Without a Qi session the receiver runs in stand-alone mode and only logs.
    """

    def __init__(self, zmq_context, port=5556):
        """
        Create the subscriber socket and try to obtain a Qi session.

        :param zmq_context: The ZeroMQ context to use.
        :type zmq_context: zmq.Context

        :param port: The port to use.
        :type port: int
        """
        super(ActuationReceiver, self).__init__("actuation", "json")
        self.create_socket(zmq_context, zmq.SUB, port)
        self.socket.setsockopt_string(zmq.SUBSCRIBE, u"")  # Subscribe to all topics
        self._qi_session = self._get_session()
        # The text-to-speech service is looked up lazily on the first speech request.
        self._tts_service = None

    @staticmethod
    def _get_session():
        """
        Try to start a Qi application and return its session.

        :return: The Qi session, or ``None`` when no ``--qi-url`` argument was given,
            the ``qi`` module cannot be imported, or starting the application raises
            a ``RuntimeError`` (stand-alone mode).
        """
        if "--qi-url" not in sys.argv:
            logging.info("No Qi URL argument given. Running in stand-alone mode.")
            return None

        # Imported here so the module stays usable on machines without the Qi SDK.
        try:
            import qi
        except ImportError:
            logging.info("Unable to import qi. Running in stand-alone mode.")
            return None

        try:
            app = qi.Application()
            app.start()
            return app.session
        except RuntimeError:
            logging.info("Unable to connect to the robot. Running in stand-alone mode.")
            return None

    def _handle_speech(self, message):
        """
        Log a speech request and, if a Qi session exists, let the robot say it.

        :param message: The received message; the text to speak is read from its
            ``"data"`` key.
        :type message: dict
        """
        text = message.get("data")
        if not text:
            # `logging.warn` is a deprecated alias; `logging.warning` is the supported name.
            logging.warning("Received message to speak, but it lacks data.")
            return

        # Lazy %-style arguments leave the formatting to the logging framework, so no
        # work is done when DEBUG is disabled. The emitted text is unchanged.
        logging.debug("Received message to speak: %s", text)

        # Stand-alone mode: the request has been logged, but there is no robot to speak.
        if not self._qi_session:
            return

        if not self._tts_service:
            self._tts_service = self._qi_session.service("ALTextToSpeech")

        self._tts_service.say(text)

    def handle_message(self, message):
        """
        Dispatch an incoming message to the handler for its endpoint.

        :param message: The received message; it is expected to contain an
            ``"endpoint"`` key.
        :type message: dict

        :return: An error message when the endpoint is missing or not supported,
            ``None`` when the message was handled.
        :rtype: dict | None
        """
        if "endpoint" not in message:
            return {"endpoint": "error", "data": "No endpoint provided."}

        if message["endpoint"] == "actuate/speech":
            self._handle_speech(message)
            # Return here so a handled request is not reported as unsupported below.
            # NOTE(review): assumes callers accept None as "handled, nothing to
            # reply" -- confirm against the ReceiverBase contract.
            return None

        return {"endpoint": "error", "data": "The requested endpoint is not supported."}