import json import zmq import zmq.asyncio as azmq from control_backend.agents import BaseAgent from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings from control_backend.schemas.ri_message import GestureCommand, RIEndpoint class RobotGestureAgent(BaseAgent): """ This agent acts as a bridge between the control backend and the Robot Interface (RI). It receives gesture commands from other agents or from the UI, and forwards them to the robot via a ZMQ PUB socket. :ivar subsocket: ZMQ SUB socket for receiving external commands (e.g., from UI). :ivar pubsocket: ZMQ PUB socket for sending commands to the Robot Interface. :ivar address: Address to bind/connect the PUB socket. :ivar bind: Whether to bind or connect the PUB socket. :ivar gesture_data: A list of strings for available gestures """ subsocket: azmq.Socket repsocket: azmq.Socket pubsocket: azmq.Socket address = "" bind = False gesture_tags = [] gesture_basic = [] gesture_single = [] def __init__( self, name: str, address=settings.zmq_settings.ri_command_address, bind=False, gesture_tags=None, gesture_basic=None, gesture_single=None, ): self.gesture_tags = gesture_tags or [] self.gesture_basic = gesture_basic or [] self.gesture_single = gesture_single or [] super().__init__(name) self.address = address self.bind = bind async def setup(self): """ Initialize the agent. 1. Sets up the PUB socket to talk to the robot. 2. Sets up the SUB socket to listen for "command" topics (from UI/External). 3. Starts the loop for handling ZMQ commands. """ self.logger.info("Setting up %s", self.name) context = azmq.Context.instance() # To the robot self.pubsocket = context.socket(zmq.PUB) if self.bind: self.pubsocket.bind(self.address) else: self.pubsocket.connect(self.address) # Receive internal topics regarding commands self.subsocket = context.socket(zmq.SUB) self.subsocket.connect(settings.zmq_settings.internal_sub_address) self.subsocket.setsockopt(zmq.SUBSCRIBE, b"command") self.subsocket.setsockopt(zmq.SUBSCRIBE, b"send_gestures") # REP socket for replying to gesture requests self.repsocket = context.socket(zmq.REP) self.repsocket.bind(settings.zmq_settings.internal_gesture_rep_adress) self.add_behavior(self._zmq_command_loop()) self.add_behavior(self._fetch_gestures_loop()) self.logger.info("Finished setting up %s", self.name) async def stop(self): if self.subsocket: self.subsocket.close() if self.pubsocket: self.pubsocket.close() await super().stop() async def handle_message(self, msg: InternalMessage): """ Handle commands received from other internal Python agents. Validates the message as a :class:`GestureCommand` and forwards it to the robot. :param msg: The internal message containing the command. """ try: gesture_command = GestureCommand.model_validate_json(msg.body) # if gesture_command.endpoint == RIEndpoint.GESTURE_TAG: # if gesture_command.data not in self.gesture_data: # self.logger.warning( # "Received gesture tag '%s' which is not in available tags. # Early returning", # gesture_command.data, # ) # return await self.pubsocket.send_json(gesture_command.model_dump()) except Exception: self.logger.exception("Error processing internal message.") async def _zmq_command_loop(self): """ Loop to handle commands received via ZMQ (e.g., from the UI). Listens on the 'command' topic, validates the JSON and forwards it to the robot. """ while self._running: try: topic, body = await self.subsocket.recv_multipart() # Don't process send_gestures here if topic != b"command": continue body = json.loads(body) gesture_command = GestureCommand.model_validate(body) if gesture_command.endpoint == RIEndpoint.GESTURE_TAG: if gesture_command.data not in self.gesture_data: self.logger.warning( "Received gesture tag '%s' which is not in available tags.\ Early returning", gesture_command.data, ) continue await self.pubsocket.send_json(gesture_command.model_dump()) except Exception: self.logger.exception("Error processing ZMQ message.") async def _fetch_gestures_loop(self): """ REP socket handler for gesture queries. Supports: - tags - basic_gestures - single_gestures """ while self._running: try: req = await self.repsocket.recv_json() req_type = req.get("type") amount = req.get("count") if req_type == "tags": data = self.gesture_tags key = "tags" elif req_type == "basic": data = self.gesture_basic key = "basic_gestures" elif req_type == "single": data = self.gesture_single key = "single_gestures" else: await self.repsocket.send_json({}) continue if amount: data = data[:amount] await self.repsocket.send_json({key: data}) except Exception: self.logger.exception("Error fetching gestures.")