import json import zmq from zmq.asyncio import Context from control_backend.agents import BaseAgent from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings from control_backend.schemas.belief_message import Belief, BeliefMessage from control_backend.schemas.program import ConditionalNorm, Program from control_backend.schemas.ri_message import ( GestureCommand, PauseCommand, RIEndpoint, SpeechCommand, ) class UserInterruptAgent(BaseAgent): """ User Interrupt Agent. This agent receives button_pressed events from the external HTTP API (via ZMQ) and uses the associated context to trigger one of the following actions: - Send a prioritized message to the `RobotSpeechAgent` - Send a prioritized gesture to the `RobotGestureAgent` - Send a belief override to the `BDI Core` in order to activate a trigger/conditional norm or complete a goal. Prioritized actions clear the current RI queue before inserting the new item, ensuring they are executed immediately after Pepper's current action has been fulfilled. :ivar sub_socket: The ZMQ SUB socket used to receive user interrupts. """ def __init__(self, **kwargs): super().__init__(**kwargs) self.sub_socket = None self.pub_socket = None self._trigger_map = {} self._trigger_reverse_map = {} self._goal_map = {} # id -> sluggified goal self._goal_reverse_map = {} # sluggified goal -> id self._cond_norm_map = {} # id -> sluggified cond norm self._cond_norm_reverse_map = {} # sluggified cond norm -> id async def setup(self): """ Initialize the agent by setting up ZMQ sockets for receiving button events and publishing updates. """ context = Context.instance() self.sub_socket = context.socket(zmq.SUB) self.sub_socket.connect(settings.zmq_settings.internal_sub_address) self.sub_socket.subscribe("button_pressed") self.pub_socket = context.socket(zmq.PUB) self.pub_socket.connect(settings.zmq_settings.internal_pub_address) self.add_behavior(self._receive_button_event()) async def _receive_button_event(self): """ Main loop to receive and process button press events from the UI. Handles different event types: - `speech`: Triggers immediate robot speech. - `gesture`: Triggers an immediate robot gesture. - `override`: Forces a belief, trigger, or goal completion in the BDI core. - `override_unachieve`: Removes a belief from the BDI core. - `pause`: Toggles the system's pause state. - `next_phase` / `reset_phase`: Controls experiment flow. """ while True: topic, body = await self.sub_socket.recv_multipart() try: event_data = json.loads(body) event_type = event_data.get("type") # e.g., "speech", "gesture" event_context = event_data.get("context") # e.g., "Hello, I am Pepper!" except json.JSONDecodeError: self.logger.error("Received invalid JSON payload on topic %s", topic) continue self.logger.debug("Received event type %s", event_type) match event_type: case "speech": await self._send_to_speech_agent(event_context) self.logger.info( "Forwarded button press (speech) with context '%s' to RobotSpeechAgent.", event_context, ) case "gesture": await self._send_to_gesture_agent(event_context) self.logger.info( "Forwarded button press (gesture) with context '%s' to RobotGestureAgent.", event_context, ) case "override": ui_id = str(event_context) if asl_trigger := self._trigger_map.get(ui_id): await self._send_to_bdi("force_trigger", asl_trigger) self.logger.info( "Forwarded button press (override) with context '%s' to BDI Core.", event_context, ) elif asl_cond_norm := self._cond_norm_map.get(ui_id): await self._send_to_bdi_belief(asl_cond_norm, "cond_norm") self.logger.info( "Forwarded button press (override) with context '%s' to BDI Core.", event_context, ) elif asl_goal := self._goal_map.get(ui_id): await self._send_to_bdi_belief(asl_goal, "goal") self.logger.info( "Forwarded button press (override) with context '%s' to BDI Core.", event_context, ) # Send achieve_goal to program manager to update semantic belief extractor goal_achieve_msg = InternalMessage( to=settings.agent_settings.bdi_program_manager_name, thread="achieve_goal", body=ui_id, ) await self.send(goal_achieve_msg) else: self.logger.warning("Could not determine which element to override.") case "override_unachieve": ui_id = str(event_context) if asl_cond_norm := self._cond_norm_map.get(ui_id): await self._send_to_bdi_belief(asl_cond_norm, "cond_norm", True) self.logger.info( "Forwarded button press (override_unachieve)" "with context '%s' to BDI Core.", event_context, ) else: self.logger.warning( "Could not determine which conditional norm to unachieve." ) case "pause": self.logger.debug( "Received pause/resume button press with context '%s'.", event_context ) await self._send_pause_command(event_context) if event_context: self.logger.info("Sent pause command.") else: self.logger.info("Sent resume command.") case "next_phase" | "reset_phase": await self._send_experiment_control_to_bdi_core(event_type) case _: self.logger.warning( "Received button press with unknown type '%s' (context: '%s').", event_type, event_context, ) async def handle_message(self, msg: InternalMessage): """ Handles internal messages from other agents, such as program updates or trigger notifications. :param msg: The incoming :class:`~control_backend.core.agent_system.InternalMessage`. """ match msg.thread: case "new_program": self._create_mapping(msg.body) case "trigger_start": # msg.body is the sluggified trigger asl_slug = msg.body ui_id = self._trigger_reverse_map.get(asl_slug) if ui_id: payload = {"type": "trigger_update", "id": ui_id, "achieved": True} await self._send_experiment_update(payload) self.logger.info(f"UI Update: Trigger {asl_slug} started (ID: {ui_id})") case "trigger_end": asl_slug = msg.body ui_id = self._trigger_reverse_map.get(asl_slug) if ui_id: payload = {"type": "trigger_update", "id": ui_id, "achieved": False} await self._send_experiment_update(payload) self.logger.info(f"UI Update: Trigger {asl_slug} ended (ID: {ui_id})") case "transition_phase": new_phase_id = msg.body self.logger.info(f"Phase transition detected: {new_phase_id}") payload = {"type": "phase_update", "id": new_phase_id} await self._send_experiment_update(payload) case "goal_start": goal_name = msg.body ui_id = self._goal_reverse_map.get(goal_name) if ui_id: payload = {"type": "goal_update", "id": ui_id, "active": True} await self._send_experiment_update(payload) self.logger.info(f"UI Update: Goal {goal_name} started (ID: {ui_id})") case "active_norms_update": active_norms_asl = [ s.strip("() '\",") for s in msg.body.split(",") if s.strip("() '\",") ] await self._broadcast_cond_norms(active_norms_asl) case _: self.logger.debug(f"Received internal message on unhandled thread: {msg.thread}") async def _broadcast_cond_norms(self, active_slugs: list[str]): """ Broadcasts the current activation state of all conditional norms to the UI. :param active_slugs: A list of sluggified norm names currently active in the BDI core. """ updates = [] for asl_slug, ui_id in self._cond_norm_reverse_map.items(): is_active = asl_slug in active_slugs updates.append({"id": ui_id, "active": is_active}) payload = {"type": "cond_norms_state_update", "norms": updates} if self.pub_socket: topic = b"status" body = json.dumps(payload).encode("utf-8") await self.pub_socket.send_multipart([topic, body]) # self.logger.info(f"UI Update: Active norms {updates}") def _create_mapping(self, program_json: str): """ Creates a bidirectional mapping between UI identifiers and AgentSpeak slugs. :param program_json: The JSON representation of the behavioral program. """ try: program = Program.model_validate_json(program_json) self._trigger_map = {} self._trigger_reverse_map = {} self._goal_map = {} self._cond_norm_map = {} self._cond_norm_reverse_map = {} for phase in program.phases: for trigger in phase.triggers: slug = AgentSpeakGenerator.slugify(trigger) self._trigger_map[str(trigger.id)] = slug self._trigger_reverse_map[slug] = str(trigger.id) for goal in phase.goals: self._goal_map[str(goal.id)] = AgentSpeakGenerator.slugify(goal) self._goal_reverse_map[AgentSpeakGenerator.slugify(goal)] = str(goal.id) for goal, id in self._goal_reverse_map.items(): self.logger.debug(f"Goal mapping: UI ID {goal} -> {id}") for norm in phase.norms: if isinstance(norm, ConditionalNorm): asl_slug = AgentSpeakGenerator.slugify(norm) norm_id = str(norm.id) self._cond_norm_map[norm_id] = asl_slug self._cond_norm_reverse_map[norm.norm] = norm_id self.logger.debug("Added conditional norm %s", asl_slug) self.logger.info( f"Mapped {len(self._trigger_map)} triggers and {len(self._goal_map)} goals " f"and {len(self._cond_norm_map)} conditional norms for UserInterruptAgent." ) except Exception as e: self.logger.error(f"Mapping failed: {e}") async def _send_experiment_update(self, data, should_log: bool = True): """ Publishes an experiment state update to the internal ZMQ bus for the UI. :param data: The update payload. :param should_log: Whether to log the update. """ if self.pub_socket: topic = b"experiment" body = json.dumps(data).encode("utf-8") await self.pub_socket.send_multipart([topic, body]) if should_log: self.logger.debug(f"Sent experiment update: {data}") async def _send_to_speech_agent(self, text_to_say: str): """ method to send prioritized speech command to RobotSpeechAgent. :param text_to_say: The string that the robot has to say. """ cmd = SpeechCommand(data=text_to_say, is_priority=True) out_msg = InternalMessage( to=settings.agent_settings.robot_speech_name, sender=self.name, body=cmd.model_dump_json(), ) await self.send(out_msg) async def _send_to_gesture_agent(self, single_gesture_name: str): """ method to send prioritized gesture command to RobotGestureAgent. :param single_gesture_name: The gesture tag that the robot has to perform. """ # the endpoint is set to always be GESTURE_SINGLE for user interrupts cmd = GestureCommand( endpoint=RIEndpoint.GESTURE_SINGLE, data=single_gesture_name, is_priority=True ) out_msg = InternalMessage( to=settings.agent_settings.robot_gesture_name, sender=self.name, body=cmd.model_dump_json(), ) await self.send(out_msg) async def _send_to_bdi(self, thread: str, body: str): """Send slug of trigger to BDI""" msg = InternalMessage(to=settings.agent_settings.bdi_core_name, thread=thread, body=body) await self.send(msg) self.logger.info(f"Directly forced {thread} in BDI: {body}") async def _send_to_bdi_belief(self, asl: str, asl_type: str, unachieve: bool = False): """Send belief to BDI Core""" if asl_type == "goal": belief_name = f"achieved_{asl}" elif asl_type == "cond_norm": belief_name = f"force_{asl}" else: self.logger.warning("Tried to send belief with unknown type") return belief = Belief(name=belief_name, arguments=None) self.logger.debug(f"Sending belief to BDI Core: {belief_name}") # Conditional norms are unachieved by removing the belief belief_message = ( BeliefMessage(delete=[belief]) if unachieve else BeliefMessage(create=[belief]) ) msg = InternalMessage( to=settings.agent_settings.bdi_core_name, thread="beliefs", body=belief_message.model_dump_json(), ) await self.send(msg) async def _send_experiment_control_to_bdi_core(self, type): """ method to send experiment control buttons to bdi core. :param type: the type of control button we should send to the bdi core. """ # Switch which thread we should send to bdi core thread = "" match type: case "next_phase": thread = "force_next_phase" case "reset_phase": thread = "reset_current_phase" case "reset_experiment": thread = "reset_experiment" case _: self.logger.warning( "Received unknown experiment control type '%s' to send to BDI Core.", type, ) out_msg = InternalMessage( to=settings.agent_settings.bdi_core_name, sender=self.name, thread=thread, body="", ) self.logger.debug("Sending experiment control '%s' to BDI Core.", thread) await self.send(out_msg) async def _send_pause_command(self, pause): """ Send a pause command to the Robot Interface via the RI Communication Agent. Send a pause command to the other internal agents; for now just VAD agent. """ cmd = PauseCommand(data=pause) message = InternalMessage( to=settings.agent_settings.ri_communication_name, sender=self.name, body=cmd.model_dump_json(), ) await self.send(message) if pause == "true": # Send pause to VAD agent vad_message = InternalMessage( to=settings.agent_settings.vad_name, sender=self.name, body="PAUSE", ) await self.send(vad_message) self.logger.info("Sent pause command to VAD Agent and RI Communication Agent.") else: # Send resume to VAD agent vad_message = InternalMessage( to=settings.agent_settings.vad_name, sender=self.name, body="RESUME", ) await self.send(vad_message) self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.")