import json import zmq import zmq.asyncio as azmq from control_backend.agents import BaseAgent from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings from control_backend.schemas.ri_message import SpeechCommand class RobotSpeechAgent(BaseAgent): """ This agent acts as a bridge between the control backend and the Robot Interface (RI). It receives speech commands from other agents or from the UI, and forwards them to the robot via a ZMQ PUB socket. :ivar subsocket: ZMQ SUB socket for receiving external commands (e.g., from UI). :ivar pubsocket: ZMQ PUB socket for sending commands to the Robot Interface. :ivar address: Address to bind/connect the PUB socket. :ivar bind: Whether to bind or connect the PUB socket. """ subsocket: azmq.Socket pubsocket: azmq.Socket address = "" bind = False def __init__( self, name: str, address: str, bind=False, ): super().__init__(name) self.address = address self.bind = bind async def setup(self): """ Initialize the agent. 1. Sets up the PUB socket to talk to the robot. 2. Sets up the SUB socket to listen for "command" topics (from UI/External). 3. Starts the loop for handling ZMQ commands. """ self.logger.info("Setting up %s", self.name) context = azmq.Context.instance() # To the robot self.pubsocket = context.socket(zmq.PUB) if self.bind: # TODO: Should this ever be the case? self.pubsocket.bind(self.address) else: self.pubsocket.connect(self.address) # Receive internal topics regarding commands self.subsocket = context.socket(zmq.SUB) self.subsocket.connect(settings.zmq_settings.internal_sub_address) self.subsocket.setsockopt(zmq.SUBSCRIBE, b"command") self.add_behavior(self._zmq_command_loop()) self.logger.info("Finished setting up %s", self.name) async def stop(self): if self.subsocket: self.subsocket.close() if self.pubsocket: self.pubsocket.close() await super().stop() async def handle_message(self, msg: InternalMessage): """ Handle commands received from other internal Python agents. Validates the message as a :class:`SpeechCommand` and forwards it to the robot. :param msg: The internal message containing the command. """ try: speech_command = SpeechCommand.model_validate_json(msg.body) await self.pubsocket.send_json(speech_command.model_dump()) except Exception: self.logger.exception("Error processing internal message.") async def _zmq_command_loop(self): """ Loop to handle commands received via ZMQ (e.g., from the UI). Listens on the 'command' topic, validates the JSON, and forwards it to the robot. """ while self._running: try: _, body = await self.subsocket.recv_multipart() body = json.loads(body) message = SpeechCommand.model_validate(body) await self.pubsocket.send_json(message.model_dump()) except Exception: self.logger.exception("Error processing ZMQ message.")