import asyncio import json import zmq from zmq.asyncio import Context from control_backend.agents import BaseAgent from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand class UserInterruptAgent(BaseAgent): """ User Interrupt Agent. This agent receives button_pressed events from the external HTTP API (via ZMQ) and uses the associated context to trigger one of the following actions: - Send a prioritized message to the `RobotSpeechAgent` - Send a prioritized gesture to the `RobotGestureAgent` - Send a belief override to the `BDIProgramManager`in order to activate a trigger/conditional norm or complete a goal. Prioritized actions clear the current RI queue before inserting the new item, ensuring they are executed immediately after Pepper's current action has been fulfilled. :ivar sub_socket: The ZMQ SUB socket used to receive user intterupts. """ def __init__(self, **kwargs): super().__init__(**kwargs) self.sub_socket = None self.pub_socket = None async def setup(self): """ Initialize the agent. Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic. Starts the background behavior to receive the user interrupts. """ context = Context.instance() self.sub_socket = context.socket(zmq.SUB) self.sub_socket.connect(settings.zmq_settings.internal_sub_address) self.sub_socket.subscribe("button_pressed") self.pub_socket = context.socket(zmq.PUB) self.pub_socket.connect(settings.zmq_settings.internal_pub_address) self.add_behavior(self._receive_button_event()) # self.add_behavior(self.test_sending_behaviour()) async def _receive_button_event(self): """ The behaviour of the UserInterruptAgent. Continuous loop that receives button_pressed events from the button_pressed HTTP endpoint. These events contain a type and a context. These are the different types and contexts: - type: "speech", context: string that the robot has to say. - type: "gesture", context: single gesture name that the robot has to perform. - type: "override", context: belief_id that overrides the goal/trigger/conditional norm. """ while True: topic, body = await self.sub_socket.recv_multipart() try: event_data = json.loads(body) event_type = event_data.get("type") # e.g., "speech", "gesture" event_context = event_data.get("context") # e.g., "Hello, I am Pepper!" except json.JSONDecodeError: self.logger.error("Received invalid JSON payload on topic %s", topic) continue if event_type == "speech": await self._send_to_speech_agent(event_context) self.logger.info( "Forwarded button press (speech) with context '%s' to RobotSpeechAgent.", event_context, ) elif event_type == "gesture": await self._send_to_gesture_agent(event_context) self.logger.info( "Forwarded button press (gesture) with context '%s' to RobotGestureAgent.", event_context, ) elif event_type == "override": await self._send_to_program_manager(event_context) self.logger.info( "Forwarded button press (override) with context '%s' to BDIProgramManager.", event_context, ) else: self.logger.warning( "Received button press with unknown type '%s' (context: '%s').", event_type, event_context, ) async def handle_message(self, msg: InternalMessage): """ Handle commands received from other internal Python agents. """ match msg.thread: case "transition_phase": new_phase_id = msg.body self.logger.info(f"Phase transition detected: {new_phase_id}") payload = {"type": "phase_update", "phase_id": new_phase_id} await self._send_experiment_update(payload) case _: self.logger.debug(f"Received internal message on unhandled thread: {msg.thread}") # moet weg!!!!! async def test_sending_behaviour(self): self.logger.info("Starting simple test sending behaviour...") while True: try: test_data = {"type": "heartbeat", "status": "ok"} await self._send_experiment_update(test_data) except zmq.ZMQError as ze: self.logger.error(f"ZMQ error: {ze}") except Exception as e: self.logger.error(f"Error: {e}") await asyncio.sleep(2) async def _send_experiment_update(self, data): """ Sends an update to the 'experiment' topic. The SSE endpoint will pick this up and push it to the UI. """ if self.pub_socket: topic = b"experiment" body = json.dumps(data).encode("utf-8") await self.pub_socket.send_multipart([topic, body]) self.logger.debug(f"Sent experiment update: {data}") async def _send_to_speech_agent(self, text_to_say: str): """ method to send prioritized speech command to RobotSpeechAgent. :param text_to_say: The string that the robot has to say. """ cmd = SpeechCommand(data=text_to_say, is_priority=True) out_msg = InternalMessage( to=settings.agent_settings.robot_speech_name, sender=self.name, body=cmd.model_dump_json(), ) await self.send(out_msg) async def _send_to_gesture_agent(self, single_gesture_name: str): """ method to send prioritized gesture command to RobotGestureAgent. :param single_gesture_name: The gesture tag that the robot has to perform. """ # the endpoint is set to always be GESTURE_SINGLE for user interrupts cmd = GestureCommand( endpoint=RIEndpoint.GESTURE_SINGLE, data=single_gesture_name, is_priority=True ) out_msg = InternalMessage( to=settings.agent_settings.robot_gesture_name, sender=self.name, body=cmd.model_dump_json(), ) await self.send(out_msg) async def _send_to_program_manager(self, belief_id: str): """ Send a button_override belief to the BDIProgramManager. :param belief_id: The belief_id that overrides the goal/trigger/conditional norm. this id can belong to a basic belief or an inferred belief. See also: https://utrechtuniversity.youtrack.cloud/articles/N25B-A-27/UI-components """ data = {"belief": belief_id} message = InternalMessage( to=settings.agent_settings.bdi_program_manager_name, sender=self.name, body=json.dumps(data), thread="belief_override_id", ) await self.send(message) self.logger.info( "Sent button_override belief with id '%s' to Program manager.", belief_id, )