feat: implement actuation receiver

The ActuationReceiver connects to the Pepper robot using the Qi library. The endpoint is automatically negotiated.

ref: N25B-168
This commit is contained in:
Twirre Meulenbelt
2025-10-13 22:08:43 +02:00
parent c6916470e9
commit ff6abbfea1
2 changed files with 72 additions and 1 deletions

View File

@@ -0,0 +1,68 @@
import logging
import sys
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
class ActuationReceiver(ReceiverBase):
def __init__(self, zmq_context, port=5556):
"""
The actuation receiver endpoint, responsible for handling speech and gesture requests.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use.
:type port: int
"""
super(ActuationReceiver, self).__init__("actuation", "json")
self.create_socket(zmq_context, zmq.SUB, port)
self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Subscribe to all topics
self._qi_session = self._get_session()
self._tts_service = None
@staticmethod
def _get_session():
if "--qi-url" not in sys.argv:
logging.info("No Qi URL argument given. Running in stand-alone mode.")
return None
try:
import qi
except ImportError:
logging.info("Unable to import qi. Running in stand-alone mode.")
return None
try:
app = qi.Application()
app.start()
return app.session
except RuntimeError:
logging.info("Unable to connect to the robot. Running in stand-alone mode.")
return None
def _handle_speech(self, message):
if not self._qi_session: return
if not self._tts_service:
self._tts_service = self._qi_session.service("ALTextToSpeech")
text = message.get("data")
if not text:
logging.warn("Received message to speak, but it lacks data.")
return
logging.debug("Speaking received message: {}".format(text))
self._tts_service.say(text)
def handle_message(self, message):
if "endpoint" not in message:
return {"endpoint": "error", "data": "No endpoint provided."}
if message["endpoint"] == "actuate/speech":
self._handle_speech(message)
return {"endpoint": "error", "data": "The requested endpoint is not supported."}

View File

@@ -4,6 +4,7 @@ import time
import zmq
from robot_interface.endpoints.actuation_receiver import ActuationReceiver
from robot_interface.endpoints.main_receiver import MainReceiver
from robot_interface.state import state
@@ -18,9 +19,11 @@ def main_loop(context):
# When creating sockets, remember to add them to the `sockets` list of the state to ensure they're deinitialized
main_receiver = MainReceiver(context)
state.sockets.append(main_receiver)
actuation_receiver = ActuationReceiver(context)
state.sockets.append(actuation_receiver)
# Sockets that can run on the main thread. These sockets' endpoints should not block for long (say 50 ms at most).
receivers = [main_receiver]
receivers = [main_receiver, actuation_receiver]
poller = zmq.Poller()
for receiver in receivers: