From 4bf2be63599998ef8d198abb5854d30ff708d2e7 Mon Sep 17 00:00:00 2001 From: Pim Hutting Date: Thu, 8 Jan 2026 09:56:10 +0100 Subject: [PATCH] feat: added a functionality for monitoring page ref: N25B-400 --- src/control_backend/agents/bdi/agentspeak.asl | 35 +++--- .../agents/bdi/agentspeak_generator.py | 24 ++-- .../agents/bdi/bdi_core_agent.py | 9 ++ .../agents/bdi/bdi_program_manager.py | 23 +++- .../user_interrupt/user_interrupt_agent.py | 110 +++++++++++++++++- 5 files changed, 161 insertions(+), 40 deletions(-) diff --git a/src/control_backend/agents/bdi/agentspeak.asl b/src/control_backend/agents/bdi/agentspeak.asl index e6e1fb0..399566c 100644 --- a/src/control_backend/agents/bdi/agentspeak.asl +++ b/src/control_backend/agents/bdi/agentspeak.asl @@ -1,5 +1,6 @@ -phase("0e0f239c-efe9-442c-bdd7-3aabfccd1c49"). +phase("db4c68c3-0316-4905-a8db-22dd5bec7abf"). keyword_said(Keyword) :- (user_said(Message) & .substring(Keyword, Message, Pos)) & (Pos >= 0). +norm("do nothing and make a little dance, do a little laugh") :- phase("db4c68c3-0316-4905-a8db-22dd5bec7abf") & keyword_said("hi"). +!reply_with_goal(Goal) @@ -19,36 +20,30 @@ keyword_said(Keyword) :- (user_said(Message) & .substring(Keyword, Message, Pos) .reply(Message, Norms). +user_said(Message) - : phase("0e0f239c-efe9-442c-bdd7-3aabfccd1c49") + : phase("db4c68c3-0316-4905-a8db-22dd5bec7abf") <- .notify_user_said(Message); -responded_this_turn; !check_triggers; !transition_phase. -+!transition_phase - : phase("0e0f239c-efe9-442c-bdd7-3aabfccd1c49") & - not responded_this_turn - <- -phase("0e0f239c-efe9-442c-bdd7-3aabfccd1c49"); - +phase("1fc60869-86db-483d-b475-b8ecdec4bba8"); - ?user_said(Message); - -+user_said(Message); - .notify_transition_phase("0e0f239c-efe9-442c-bdd7-3aabfccd1c49", "1fc60869-86db-483d-b475-b8ecdec4bba8"). ++!check_triggers + : phase("db4c68c3-0316-4905-a8db-22dd5bec7abf") & + semantic_hello + <- .notify_trigger_start("trigger_"); + .notify_trigger_end("trigger_"). -+user_said(Message) - : phase("1fc60869-86db-483d-b475-b8ecdec4bba8") - <- .notify_user_said(Message); - -responded_this_turn; - !check_triggers; - !transition_phase. ++!trigger_ + <- .notify_trigger_start("trigger_"); + .notify_trigger_end("trigger_"). +!transition_phase - : phase("1fc60869-86db-483d-b475-b8ecdec4bba8") & + : phase("db4c68c3-0316-4905-a8db-22dd5bec7abf") & not responded_this_turn - <- -phase("1fc60869-86db-483d-b475-b8ecdec4bba8"); + <- .notify_transition_phase("db4c68c3-0316-4905-a8db-22dd5bec7abf", "end"); + -phase("db4c68c3-0316-4905-a8db-22dd5bec7abf"); +phase("end"); ?user_said(Message); - -+user_said(Message); - .notify_transition_phase("1fc60869-86db-483d-b475-b8ecdec4bba8", "end"). + -+user_said(Message). +user_said(Message) : phase("end") diff --git a/src/control_backend/agents/bdi/agentspeak_generator.py b/src/control_backend/agents/bdi/agentspeak_generator.py index 17248a8..18cb794 100644 --- a/src/control_backend/agents/bdi/agentspeak_generator.py +++ b/src/control_backend/agents/bdi/agentspeak_generator.py @@ -176,6 +176,16 @@ class AgentSpeakGenerator: context.append(self._astify(from_phase.goals[-1], achieved=True)) body = [ + AstStatement( + StatementType.DO_ACTION, + AstLiteral( + "notify_transition_phase", + [ + AstString(str(from_phase.id)), + AstString(str(to_phase.id) if to_phase else "end"), + ], + ), + ), AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast), AstStatement(StatementType.ADD_BELIEF, to_phase_ast), ] @@ -192,20 +202,6 @@ class AgentSpeakGenerator: ] ) - # Notify outside world about transition - body.append( - AstStatement( - StatementType.DO_ACTION, - AstLiteral( - "notify_transition_phase", - [ - AstString(str(from_phase.id)), - AstString(str(to_phase.id) if to_phase else "end"), - ], - ), - ) - ) - self._asp.plans.append( AstPlan(TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), context, body) ) diff --git a/src/control_backend/agents/bdi/bdi_core_agent.py b/src/control_backend/agents/bdi/bdi_core_agent.py index 99bea80..94232e4 100644 --- a/src/control_backend/agents/bdi/bdi_core_agent.py +++ b/src/control_backend/agents/bdi/bdi_core_agent.py @@ -311,6 +311,15 @@ class BDICoreAgent(BaseAgent): message_text = agentspeak.grounded(term.args[0], intention.scope) norms = agentspeak.grounded(term.args[1], intention.scope) + norm_update_message = InternalMessage( + to=settings.agent_settings.user_interrupt_name, + sender=self.name, + thread="active_norms_update", + body=str(norms), + ) + + self.add_behavior(self.send(norm_update_message)) + self.logger.debug("Norms: %s", norms) self.logger.debug("User text: %s", message_text) diff --git a/src/control_backend/agents/bdi/bdi_program_manager.py b/src/control_backend/agents/bdi/bdi_program_manager.py index ba022eb..7899e3c 100644 --- a/src/control_backend/agents/bdi/bdi_program_manager.py +++ b/src/control_backend/agents/bdi/bdi_program_manager.py @@ -75,7 +75,12 @@ class BDIProgramManager(BaseAgent): self._transition_phase(phases["old"], phases["new"]) def _transition_phase(self, old: str, new: str): - assert old == str(self._phase.id) + if old != str(self._phase.id): + self.logger.warning( + f"Phase transition desync detected! ASL requested move from '{old}', " + f"but Python is currently in '{self._phase.id}'. Request ignored." + ) + return if new == "end": self._phase = None @@ -208,6 +213,7 @@ class BDIProgramManager(BaseAgent): self._initialize_internal_state(program) + await self._send_program_to_user_interrupt(program) await self._send_clear_llm_history() await asyncio.gather( @@ -216,6 +222,21 @@ class BDIProgramManager(BaseAgent): self._send_goals_to_semantic_belief_extractor(), ) + async def _send_program_to_user_interrupt(self, program: Program): + """ + Send the received program to the User Interrupt Agent. + + :param program: The program object received from the API. + """ + msg = InternalMessage( + sender=self.name, + to=settings.agent_settings.user_interrupt_name, + body=program.model_dump_json(), + thread="new_program", + ) + + await self.send(msg) + async def setup(self): """ Initialize the agent. diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index cfb6d2f..b762e68 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -5,8 +5,10 @@ import zmq from zmq.asyncio import Context from control_backend.agents import BaseAgent +from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings +from control_backend.schemas.program import Program from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand @@ -32,6 +34,16 @@ class UserInterruptAgent(BaseAgent): super().__init__(**kwargs) self.sub_socket = None self.pub_socket = None + self._trigger_map = {} + self._trigger_reverse_map = {} + + self._goal_map = {} + self._goal_reverse_map = {} + + self._cond_norm_map = {} + self._cond_norm_reverse_map = {} + + self._belief_condition_map = {} async def setup(self): """ @@ -87,11 +99,19 @@ class UserInterruptAgent(BaseAgent): event_context, ) elif event_type == "override": - await self._send_to_program_manager(event_context) - self.logger.info( - "Forwarded button press (override) with context '%s' to BDIProgramManager.", - event_context, - ) + ui_id = str(event_context) + if asl_trigger := self._trigger_map.get(ui_id): + await self._send_to_bdi("force_trigger", asl_trigger) + self.logger.info( + "Forwarded button press (override) with context '%s' to BDI Core.", + event_context, + ) + else: + await self._send_to_program_manager(event_context) + self.logger.info( + "Forwarded button press (override) with context '%s' to BDIProgramManager.", + event_context, + ) else: self.logger.warning( "Received button press with unknown type '%s' (context: '%s').", @@ -104,6 +124,26 @@ class UserInterruptAgent(BaseAgent): Handle commands received from other internal Python agents. """ match msg.thread: + case "new_program": + self._create_mapping(msg.body) + case "trigger_start": + # msg.body is the sluggified trigger + asl_slug = msg.body + ui_id = self._trigger_reverse_map.get(asl_slug) + + if ui_id: + payload = {"type": "trigger_update", "id": ui_id, "achieved": True} + await self._send_experiment_update(payload) + self.logger.info(f"UI Update: Trigger {asl_slug} started (ID: {ui_id})") + + case "trigger_end": + asl_slug = msg.body + ui_id = self._trigger_reverse_map.get(asl_slug) + + if ui_id: + payload = {"type": "trigger_update", "id": ui_id, "achieved": False} + await self._send_experiment_update(payload) + self.logger.info(f"UI Update: Trigger {asl_slug} ended (ID: {ui_id})") case "transition_phase": new_phase_id = msg.body self.logger.info(f"Phase transition detected: {new_phase_id}") @@ -111,6 +151,10 @@ class UserInterruptAgent(BaseAgent): payload = {"type": "phase_update", "phase_id": new_phase_id} await self._send_experiment_update(payload) + case "active_norms_update": + asl_slugs = [s.strip() for s in msg.body.split(";")] + + await self._broadcast_cond_norms(asl_slugs) case _: self.logger.debug(f"Received internal message on unhandled thread: {msg.thread}") @@ -132,6 +176,54 @@ class UserInterruptAgent(BaseAgent): await asyncio.sleep(2) + async def _broadcast_cond_norms(self, active_slugs: list[str]): + """ + Sends the current state of all conditional norms to the UI. + :param active_slugs: A list of slugs (strings) currently active in the BDI core. + """ + updates = [] + + for asl_slug, ui_id in self._cond_norm_reverse_map.items(): + is_active = asl_slug in active_slugs + updates.append({"id": ui_id, "name": asl_slug, "active": is_active}) + + payload = {"type": "cond_norms_state_update", "norms": updates} + + await self._send_experiment_update(payload) + self.logger.debug(f"Broadcasted state for {len(updates)} conditional norms.") + + def _create_mapping(self, program_json: str): + try: + program = Program.model_validate_json(program_json) + self._trigger_map = {} + self._trigger_reverse_map = {} + self._goal_map = {} + self._cond_norm_map = {} + self._cond_norm_reverse_map = {} + + for phase in program.phases: + for trigger in phase.triggers: + slug = AgentSpeakGenerator.slugify(trigger) + self._trigger_map[str(trigger.id)] = slug + self._trigger_reverse_map[slug] = str(trigger.id) + + for goal in phase.goals: + self._goal_map[str(goal.id)] = AgentSpeakGenerator.slugify(goal) + + for norm in phase.conditional_norms: + if norm.condition: + asl_slug = AgentSpeakGenerator.slugify(norm) + belief_id = str(norm.condition) + + self._cond_norm_map[belief_id] = asl_slug + self._cond_norm_reverse_map[asl_slug] = belief_id + + self.logger.info( + f"Mapped {len(self._trigger_map)} triggers and {len(self._goal_map)} goals." + ) + except Exception as e: + self.logger.error(f"Mapping failed: {e}") + async def _send_experiment_update(self, data): """ Sends an update to the 'experiment' topic. @@ -194,3 +286,11 @@ class UserInterruptAgent(BaseAgent): "Sent button_override belief with id '%s' to Program manager.", belief_id, ) + + async def _send_to_bdi(self, thread: str, body: str): + """Send slug of trigger to BDI""" + msg = InternalMessage( + to=settings.agent_settings.bdi_core_name, sender=self.name, thread=thread, body=body + ) + await self.send(msg) + self.logger.info(f"Directly forced {thread} in BDI: {body}")