From 95c7585bf194171579ea307d334d1e194694a9b6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?=
Date: Tue, 2 Dec 2025 15:00:10 +0100
Subject: [PATCH] feat: setup gesture agent and adjust command port for the UI

ref: N25B-334
---
 .../agents/actuation/robot_gesture_agent.py   | 124 ++++++++++++++++++
 .../agents/actuation/robot_speech_agent.py    |   4 +-
 .../communication/ri_communication_agent.py   |  18 ++-
 src/control_backend/api/v1/endpoints/robot.py |  21 ++-
 src/control_backend/schemas/ri_message.py     |  13 ++
 5 files changed, 172 insertions(+), 8 deletions(-)
 create mode 100644 src/control_backend/agents/actuation/robot_gesture_agent.py

diff --git a/src/control_backend/agents/actuation/robot_gesture_agent.py b/src/control_backend/agents/actuation/robot_gesture_agent.py
new file mode 100644
index 0000000..1cda099
--- /dev/null
+++ b/src/control_backend/agents/actuation/robot_gesture_agent.py
@@ -0,0 +1,124 @@
+import json
+
+import zmq
+import zmq.asyncio as azmq
+
+from control_backend.agents import BaseAgent
+from control_backend.core.agent_system import InternalMessage
+from control_backend.core.config import settings
+from control_backend.schemas.ri_message import GestureCommand, RIEndpoint
+
+
+class RobotGestureAgent(BaseAgent):
+    """
+    This agent acts as a bridge between the control backend and the Robot Interface (RI).
+    It receives gesture commands from other agents or from the UI,
+    and forwards them to the robot via a ZMQ PUB socket.
+
+    :ivar subsocket: ZMQ SUB socket for receiving external commands (e.g., from UI).
+    :ivar pubsocket: ZMQ PUB socket for sending commands to the Robot Interface.
+    :ivar address: Address to bind/connect the PUB socket.
+    :ivar bind: Whether to bind or connect the PUB socket.
+    :ivar gesture_data: A list of strings for available gestures.
+    """
+
+    subsocket: azmq.Socket
+    pubsocket: azmq.Socket
+    address = ""
+    bind = False
+    gesture_data = []
+
+    def __init__(
+        self,
+        name: str,
+        address=settings.zmq_settings.ri_command_address,
+        bind=False,
+        gesture_data=None,
+    ):
+        """
+        Create the agent without opening any sockets; that happens in :meth:`setup`.
+
+        :param name: Agent name, passed on to :class:`BaseAgent`.
+        :param address: Address to bind/connect the PUB socket.
+        :param bind: Whether to bind (True) or connect (False) the PUB socket.
+        :param gesture_data: Available gestures; defaults to an empty list.
+        """
+        super().__init__(name)
+        self.address = address
+        self.bind = bind
+        # Keep a per-instance list so the class-level default is never shared.
+        self.gesture_data = [] if gesture_data is None else gesture_data
+
+    async def setup(self):
+        """
+        Initialize the agent.
+
+        1. Sets up the PUB socket to talk to the robot.
+        2. Sets up the SUB socket to listen for "command" topics (from UI/External).
+        3. Starts the loop for handling ZMQ commands.
+        """
+        self.logger.info("Setting up %s", self.name)
+
+        context = azmq.Context.instance()
+
+        # To the robot
+        self.pubsocket = context.socket(zmq.PUB)
+        if self.bind:  # TODO: Should this ever be the case?
+            self.pubsocket.bind(self.address)
+        else:
+            self.pubsocket.connect(self.address)
+
+        # Receive internal topics regarding commands
+        self.subsocket = context.socket(zmq.SUB)
+        self.subsocket.connect(settings.zmq_settings.internal_sub_address)
+        self.subsocket.setsockopt(zmq.SUBSCRIBE, b"command")
+
+        self.add_behavior(self._zmq_command_loop())
+
+        self.logger.info("Finished setting up %s", self.name)
+
+    async def stop(self):
+        """
+        Close both ZMQ sockets and stop the agent.
+        """
+        if self.subsocket:
+            self.subsocket.close()
+        if self.pubsocket:
+            self.pubsocket.close()
+        await super().stop()
+
+    async def handle_message(self, msg: InternalMessage):
+        """
+        Handle commands received from other internal Python agents.
+
+        Validates the message as a :class:`GestureCommand` and forwards it to the robot.
+
+        :param msg: The internal message containing the command.
+        """
+        try:
+            gesture_command = GestureCommand.model_validate_json(msg.body)
+            await self.pubsocket.send_json(gesture_command.model_dump())
+        except Exception:
+            self.logger.exception("Error processing internal message.")
+
+    async def _zmq_command_loop(self):
+        """
+        Loop to handle commands received via ZMQ (e.g., from the UI).
+
+        Listens on the 'command' topic, validates the JSON and forwards it to the robot.
+        Commands for any endpoint other than ``RIEndpoint.GESTURE`` are skipped.
+        """
+        while self._running:
+            try:
+                _, body = await self.subsocket.recv_multipart()
+
+                body = json.loads(body)
+                message = GestureCommand.model_validate(body)
+
+                # The 'command' topic is shared with speech commands; forward gestures only.
+                if message.endpoint != RIEndpoint.GESTURE:
+                    continue
+
+                await self.pubsocket.send_json(message.model_dump())
+            except Exception:
+                self.logger.exception("Error processing ZMQ message.")
diff --git a/src/control_backend/agents/actuation/robot_speech_agent.py b/src/control_backend/agents/actuation/robot_speech_agent.py
index 15fa07f..674b270 100644
--- a/src/control_backend/agents/actuation/robot_speech_agent.py
+++ b/src/control_backend/agents/actuation/robot_speech_agent.py
@@ -21,8 +21,8 @@ class RobotSpeechAgent(BaseAgent):
     :ivar bind: Whether to bind or connect the PUB socket.
     """
 
-    subsocket: zmq.Socket
-    pubsocket: zmq.Socket
+    subsocket: azmq.Socket
+    pubsocket: azmq.Socket
     address = ""
     bind = False
 
diff --git a/src/control_backend/agents/communication/ri_communication_agent.py b/src/control_backend/agents/communication/ri_communication_agent.py
index f4e3ef0..a9223c3 100644
--- a/src/control_backend/agents/communication/ri_communication_agent.py
+++ b/src/control_backend/agents/communication/ri_communication_agent.py
@@ -6,6 +6,7 @@ import zmq.asyncio as azmq
 from zmq.asyncio import Context
 
 from control_backend.agents import BaseAgent
+from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
 from control_backend.core.config import settings
 
 from ..actuation.robot_speech_agent import RobotSpeechAgent
@@ -179,12 +180,25 @@ class RICommunicationAgent(BaseAgent):
                             else:
                                 self._req_socket.bind(addr)
                     case "actuation":
-                        ri_commands_agent = RobotSpeechAgent(
+                        gesture_data = port_data.get("gestures", [])
+                        robot_speech_agent = RobotSpeechAgent(
                             settings.agent_settings.robot_speech_name,
                             address=addr,
                             bind=bind,
                         )
-                        await ri_commands_agent.start()
+                        # Each agent needs its own name; fall back to a fixed one
+                        # when the settings do not define ``robot_gesture_name``.
+                        robot_gesture_name = getattr(
+                            settings.agent_settings, "robot_gesture_name", "robot_gesture_agent"
+                        )
+                        robot_gesture_agent = RobotGestureAgent(
+                            robot_gesture_name,
+                            address=addr,
+                            bind=bind,
+                            gesture_data=gesture_data,
+                        )
+                        await robot_speech_agent.start()
+                        await robot_gesture_agent.start()
                     case _:
                         self.logger.warning("Unhandled negotiation id: %s", id)
 
diff --git a/src/control_backend/api/v1/endpoints/robot.py b/src/control_backend/api/v1/endpoints/robot.py
index ae0fe66..12f2fa5 100644
--- a/src/control_backend/api/v1/endpoints/robot.py
+++ b/src/control_backend/api/v1/endpoints/robot.py
@@ -3,12 +3,13 @@ import json
 import logging
 
 import zmq.asyncio
-from fastapi import APIRouter, Request
+from fastapi import APIRouter, HTTPException, Request
 from fastapi.responses import StreamingResponse
+from pydantic import ValidationError
 from zmq.asyncio import Context, Socket
 
 from control_backend.core.config import settings
-from control_backend.schemas.ri_message import SpeechCommand
+from control_backend.schemas.ri_message import GestureCommand, SpeechCommand
 
 logger = logging.getLogger(__name__)
 
@@ -22,17 +23,29 @@ async def receive_command(command: SpeechCommand, request: Request):
 
     Publishes the command to the internal 'command' topic. The
     :class:`~control_backend.agents.actuation.robot_speech_agent.RobotSpeechAgent`
+    or
+    :class:`~control_backend.agents.actuation.robot_gesture_agent.RobotGestureAgent`
     will forward this to the robot.
 
     :param command: The speech command payload.
     :param request: The FastAPI request object.
     """
     # Validate and retrieve data.
-    SpeechCommand.model_validate(command)
+    validated = None
+    valid_commands = (GestureCommand, SpeechCommand)
+    for command_model in valid_commands:
+        try:
+            validated = command_model.model_validate(command)
+        except ValidationError:
+            continue
+
+    if validated is None:
+        raise HTTPException(status_code=422, detail="Payload is not valid for command models")
+
     topic = b"command"
 
     pub_socket: Socket = request.app.state.endpoints_pub_socket
-    await pub_socket.send_multipart([topic, command.model_dump_json().encode()])
+    await pub_socket.send_multipart([topic, validated.model_dump_json().encode()])
 
     return {"status": "Command received"}
 
diff --git a/src/control_backend/schemas/ri_message.py b/src/control_backend/schemas/ri_message.py
index 3f0e5d2..88656b0 100644
--- a/src/control_backend/schemas/ri_message.py
+++ b/src/control_backend/schemas/ri_message.py
@@ -10,6 +10,7 @@ class RIEndpoint(str, Enum):
     """
 
     SPEECH = "actuate/speech"
+    GESTURE = "actuate/gesture"
     PING = "ping"
     NEGOTIATE_PORTS = "negotiate/ports"
 
@@ -36,3 +37,15 @@ class SpeechCommand(RIMessage):
 
     endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
     data: str
+
+
+class GestureCommand(RIMessage):
+    """
+    A specific command to make the robot do a gesture.
+
+    :ivar endpoint: Fixed to ``RIEndpoint.GESTURE``.
+    :ivar data: The id of the gesture to be executed.
+    """
+
+    endpoint: RIEndpoint = RIEndpoint(RIEndpoint.GESTURE)
+    data: str