feat: setup gesture agent and adjust command port for the UI

ref: N25B-334
This commit is contained in:
Björn Otgaar
2025-12-02 15:00:10 +01:00
parent c85753f834
commit 95c7585bf1
5 changed files with 151 additions and 8 deletions

View File

@@ -0,0 +1,108 @@
import json
import zmq
import zmq.asyncio as azmq
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand
class RobotGestureAgent(BaseAgent):
"""
This agent acts as a bridge between the control backend and the Robot Interface (RI).
It receives speech commands from other agents or from the UI,
and forwards them to the robot via a ZMQ PUB socket.
:ivar subsocket: ZMQ SUB socket for receiving external commands (e.g., from UI).
:ivar pubsocket: ZMQ PUB socket for sending commands to the Robot Interface.
:ivar address: Address to bind/connect the PUB socket.
:ivar bind: Whether to bind or connect the PUB socket.
:ivar gesture_data: A list of strings for available gestures
"""
subsocket: azmq.Socket
pubsocket: azmq.Socket
address = ""
bind = False
gesture_data = []
def __init__(
self,
name: str,
address=settings.zmq_settings.ri_command_address,
bind=False,
gesture_data=None,
):
if gesture_data is None:
gesture_data = []
super().__init__(name)
self.address = address
self.bind = bind
async def setup(self):
"""
Initialize the agent.
1. Sets up the PUB socket to talk to the robot.
2. Sets up the SUB socket to listen for "command" topics (from UI/External).
3. Starts the loop for handling ZMQ commands.
"""
self.logger.info("Setting up %s", self.name)
context = azmq.Context.instance()
# To the robot
self.pubsocket = context.socket(zmq.PUB)
if self.bind: # TODO: Should this ever be the case?
self.pubsocket.bind(self.address)
else:
self.pubsocket.connect(self.address)
# Receive internal topics regarding commands
self.subsocket = context.socket(zmq.SUB)
self.subsocket.connect(settings.zmq_settings.internal_sub_address)
self.subsocket.setsockopt(zmq.SUBSCRIBE, b"command")
self.add_behavior(self._zmq_command_loop())
self.logger.info("Finished setting up %s", self.name)
async def stop(self):
if self.subsocket:
self.subsocket.close()
if self.pubsocket:
self.pubsocket.close()
await super().stop()
async def handle_message(self, msg: InternalMessage):
"""
Handle commands received from other internal Python agents.
Validates the message as a :class:`GestureCommand` and forwards it to the robot.
:param msg: The internal message containing the command.
"""
try:
speech_command = GestureCommand.model_validate_json(msg.body)
await self.pubsocket.send_json(speech_command.model_dump())
except Exception:
self.logger.exception("Error processing internal message.")
async def _zmq_command_loop(self):
"""
Loop to handle commands received via ZMQ (e.g., from the UI).
Listens on the 'command' topic, validates the JSON and forwards it to the robot.
"""
while self._running:
try:
_, body = await self.subsocket.recv_multipart()
body = json.loads(body)
message = GestureCommand.model_validate(body)
await self.pubsocket.send_json(message.model_dump())
except Exception:
self.logger.exception("Error processing ZMQ message.")

View File

@@ -21,8 +21,8 @@ class RobotSpeechAgent(BaseAgent):
:ivar bind: Whether to bind or connect the PUB socket.
"""
subsocket: zmq.Socket
pubsocket: zmq.Socket
subsocket: azmq.Socket
pubsocket: azmq.Socket
address = ""
bind = False

View File

@@ -6,6 +6,7 @@ import zmq.asyncio as azmq
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.core.config import settings
from ..actuation.robot_speech_agent import RobotSpeechAgent
@@ -179,12 +180,20 @@ class RICommunicationAgent(BaseAgent):
else:
self._req_socket.bind(addr)
case "actuation":
ri_commands_agent = RobotSpeechAgent(
gesture_data = port_data.get("gestures", [])
robot_speech_agent = RobotSpeechAgent(
settings.agent_settings.robot_speech_name,
address=addr,
bind=bind,
)
await ri_commands_agent.start()
robot_gesture_agent = RobotGestureAgent(
settings.agent_settings.robot_speech_name,
address=addr,
bind=bind,
gesture_data=gesture_data,
)
await robot_speech_agent.start()
await robot_gesture_agent.start()
case _:
self.logger.warning("Unhandled negotiation id: %s", id)

View File

@@ -3,12 +3,13 @@ import json
import logging
import zmq.asyncio
from fastapi import APIRouter, Request
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import ValidationError
from zmq.asyncio import Context, Socket
from control_backend.core.config import settings
from control_backend.schemas.ri_message import SpeechCommand
from control_backend.schemas.ri_message import GestureCommand, SpeechCommand
logger = logging.getLogger(__name__)
@@ -22,17 +23,29 @@ async def receive_command(command: SpeechCommand, request: Request):
Publishes the command to the internal 'command' topic. The
:class:`~control_backend.agents.actuation.robot_speech_agent.RobotSpeechAgent`
or
:class:`~control_backend.agents.actuation.robot_speech_agent.RobotGestureAgent`
will forward this to the robot.
:param command: The speech command payload.
:param request: The FastAPI request object.
"""
# Validate and retrieve data.
SpeechCommand.model_validate(command)
validated = None
valid_commands = (GestureCommand, SpeechCommand)
for command_model in valid_commands:
try:
validated = command_model.model_validate(command)
except ValidationError:
continue
if validated is None:
raise HTTPException(status_code=422, detail="Payload is not valid for command models")
topic = b"command"
pub_socket: Socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, command.model_dump_json().encode()])
await pub_socket.send_multipart([topic, validated.model_dump_json().encode()])
return {"status": "Command received"}

View File

@@ -10,6 +10,7 @@ class RIEndpoint(str, Enum):
"""
SPEECH = "actuate/speech"
GESTURE = "actuate/gesture"
PING = "ping"
NEGOTIATE_PORTS = "negotiate/ports"
@@ -36,3 +37,15 @@ class SpeechCommand(RIMessage):
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
data: str
class GestureCommand(RIMessage):
"""
A specific command to make the robot do a gesture.
:ivar endpoint: Fixed to ``RIEndpoint.GESTURE``.
:ivar data: The id of the gesture to be executed.
"""
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
data: str