From 3d49e44cf7c4e877e612d52919c44abf3e977706 Mon Sep 17 00:00:00 2001 From: Kasper Marinus Date: Wed, 7 Jan 2026 17:13:58 +0100 Subject: [PATCH] fix: complete pipeline working User interrupts still need to be tested. ref: N25B-429 --- .../agents/bdi/agentspeak_generator.py | 52 ++++- .../agents/bdi/bdi_core_agent.py | 177 ++++++++++++++++-- .../agents/bdi/bdi_program_manager.py | 74 +++++--- .../communication/ri_communication_agent.py | 3 +- src/control_backend/agents/llm/llm_agent.py | 24 ++- src/control_backend/core/agent_system.py | 1 + src/control_backend/core/config.py | 2 +- .../schemas/internal_message.py | 2 +- 8 files changed, 276 insertions(+), 59 deletions(-) diff --git a/src/control_backend/agents/bdi/agentspeak_generator.py b/src/control_backend/agents/bdi/agentspeak_generator.py index 1c313ce..17248a8 100644 --- a/src/control_backend/agents/bdi/agentspeak_generator.py +++ b/src/control_backend/agents/bdi/agentspeak_generator.py @@ -157,7 +157,7 @@ class AgentSpeakGenerator: previous_goal = None for goal in phase.goals: - self._process_goal(goal, phase, previous_goal) + self._process_goal(goal, phase, previous_goal, main_goal=True) previous_goal = goal for trigger in phase.triggers: @@ -192,6 +192,20 @@ class AgentSpeakGenerator: ] ) + # Notify outside world about transition + body.append( + AstStatement( + StatementType.DO_ACTION, + AstLiteral( + "notify_transition_phase", + [ + AstString(str(from_phase.id)), + AstString(str(to_phase.id) if to_phase else "end"), + ], + ), + ) + ) + self._asp.plans.append( AstPlan(TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), context, body) ) @@ -213,6 +227,11 @@ class AgentSpeakGenerator: def _add_default_loop(self, phase: Phase) -> None: actions = [] + actions.append( + AstStatement( + StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")]) + ) + ) actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn"))) 
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers"))) @@ -236,6 +255,7 @@ class AgentSpeakGenerator: phase: Phase, previous_goal: Goal | None = None, continues_response: bool = False, + main_goal: bool = False, ) -> None: context: list[AstExpression] = [self._astify(phase)] context.append(~self._astify(goal, achieved=True)) @@ -245,6 +265,13 @@ class AgentSpeakGenerator: context.append(~AstLiteral("responded_this_turn")) body = [] + if main_goal: # UI only needs to know about the main goals + body.append( + AstStatement( + StatementType.DO_ACTION, + AstLiteral("notify_goal_start", [AstString(self.slugify(goal))]), + ) + ) subgoals = [] for step in goal.plan.steps: @@ -283,11 +310,23 @@ class AgentSpeakGenerator: body = [] subgoals = [] + body.append( + AstStatement( + StatementType.DO_ACTION, + AstLiteral("notify_trigger_start", [AstString(self.slugify(trigger))]), + ) + ) for step in trigger.plan.steps: body.append(self._step_to_statement(step)) if isinstance(step, Goal): step.can_fail = False # triggers are continuous sequence subgoals.append(step) + body.append( + AstStatement( + StatementType.DO_ACTION, + AstLiteral("notify_trigger_end", [AstString(self.slugify(trigger))]), + ) + ) self._asp.plans.append( AstPlan( @@ -298,6 +337,9 @@ class AgentSpeakGenerator: ) ) + # Force trigger (from UI) + self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(trigger), [], body)) + for subgoal in subgoals: self._process_goal(subgoal, phase, continues_response=True) @@ -332,13 +374,7 @@ class AgentSpeakGenerator: @_astify.register def _(self, sb: SemanticBelief) -> AstExpression: - return AstLiteral(self.get_semantic_belief_slug(sb)) - - @staticmethod - def get_semantic_belief_slug(sb: SemanticBelief) -> str: - # If you need a method like this for other types, make a public slugify singledispatch for - # all types. 
- return f"semantic_{AgentSpeakGenerator._slugify_str(sb.name)}" + return AstLiteral(self.slugify(sb)) @_astify.register def _(self, ib: InferredBelief) -> AstExpression: diff --git a/src/control_backend/agents/bdi/bdi_core_agent.py b/src/control_backend/agents/bdi/bdi_core_agent.py index 58ece29..aec8343 100644 --- a/src/control_backend/agents/bdi/bdi_core_agent.py +++ b/src/control_backend/agents/bdi/bdi_core_agent.py @@ -1,5 +1,6 @@ import asyncio import copy +import json import time from collections.abc import Iterable @@ -13,7 +14,7 @@ from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings from control_backend.schemas.belief_message import BeliefMessage from control_backend.schemas.llm_prompt_message import LLMPromptMessage -from control_backend.schemas.ri_message import SpeechCommand +from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak @@ -155,6 +156,17 @@ class BDICoreAgent(BaseAgent): body=cmd.model_dump_json(), ) await self.send(out_msg) + case settings.agent_settings.user_interrupt_name: + content = msg.body + self.logger.debug("Received user interruption: %s", content) + + match msg.thread: + case "force_phase_transition": + self._set_goal("transition_phase") + case "force_trigger": + self._force_trigger(msg.body) + case _: + self.logger.warning("Received unknow user interruption: %s", msg) def _apply_belief_changes(self, belief_changes: BeliefMessage): """ @@ -250,6 +262,37 @@ class BDICoreAgent(BaseAgent): self.logger.debug(f"Removed {removed_count} beliefs.") + def _set_goal(self, name: str, args: Iterable[str] | None = None): + args = args or [] + + if args: + merged_args = DELIMITER.join(arg for arg in args) + new_args = (agentspeak.Literal(merged_args),) + term = agentspeak.Literal(name, new_args) + else: + term = agentspeak.Literal(name) + + self.bdi_agent.call( + 
agentspeak.Trigger.addition, + agentspeak.GoalType.achievement, + term, + agentspeak.runtime.Intention(), + ) + + self._wake_bdi_loop.set() + + self.logger.debug(f"Set goal !{self.format_belief_string(name, args)}.") + + def _force_trigger(self, name: str): + self.bdi_agent.call( + agentspeak.Trigger.addition, + agentspeak.GoalType.achievement, + agentspeak.Literal(name), + agentspeak.runtime.Intention(), + ) + + self.logger.info("Manually forced trigger %s.", name) + def _add_custom_actions(self) -> None: """ Add any custom actions here. Inside `@self.actions.add()`, the first argument is @@ -258,7 +301,7 @@ class BDICoreAgent(BaseAgent): """ @self.actions.add(".reply", 2) - def _reply(agent: "BDICoreAgent", term, intention): + def _reply(agent, term, intention): """ Let the LLM generate a response to a user's utterance with the current norms and goals. """ @@ -291,7 +334,7 @@ class BDICoreAgent(BaseAgent): yield @self.actions.add(".say", 1) - def _say(agent: "BDICoreAgent", term, intention): + def _say(agent, term, intention): """ Make the robot say the given text instantly. """ @@ -305,12 +348,21 @@ class BDICoreAgent(BaseAgent): sender=settings.agent_settings.bdi_core_name, body=speech_command.model_dump_json(), ) - # TODO: add to conversation history + self.add_behavior(self.send(speech_message)) + + chat_history_message = InternalMessage( + to=settings.agent_settings.llm_name, + thread="assistant_message", + body=str(message_text), + ) + + self.add_behavior(self.send(chat_history_message)) + yield @self.actions.add(".gesture", 2) - def _gesture(agent: "BDICoreAgent", term, intention): + def _gesture(agent, term, intention): """ Make the robot perform the given gesture instantly. 
""" @@ -323,13 +375,113 @@ class BDICoreAgent(BaseAgent): gesture_name, ) - # gesture = Gesture(type=gesture_type, name=gesture_name) - # gesture_message = InternalMessage( - # to=settings.agent_settings.robot_gesture_name, - # sender=settings.agent_settings.bdi_core_name, - # body=gesture.model_dump_json(), - # ) - # asyncio.create_task(agent.send(gesture_message)) + if str(gesture_type) == "single": + endpoint = RIEndpoint.GESTURE_SINGLE + elif str(gesture_type) == "tag": + endpoint = RIEndpoint.GESTURE_TAG + else: + self.logger.warning("Gesture type %s could not be resolved.", gesture_type) + endpoint = RIEndpoint.GESTURE_SINGLE + + gesture_command = GestureCommand(endpoint=endpoint, data=gesture_name) + gesture_message = InternalMessage( + to=settings.agent_settings.robot_gesture_name, + sender=settings.agent_settings.bdi_core_name, + body=gesture_command.model_dump_json(), + ) + self.add_behavior(self.send(gesture_message)) + yield + + @self.actions.add(".notify_user_said", 1) + def _notify_user_said(agent, term, intention): + user_said = agentspeak.grounded(term.args[0], intention.scope) + + msg = InternalMessage( + to=settings.agent_settings.llm_name, thread="user_message", body=str(user_said) + ) + + self.add_behavior(self.send(msg)) + + yield + + @self.actions.add(".notify_trigger_start", 1) + def _notify_trigger_start(agent, term, intention): + """ + Notify the UI about the trigger we just started doing. + """ + trigger_name = agentspeak.grounded(term.args[0], intention.scope) + + self.logger.debug("Started trigger %s", trigger_name) + + msg = InternalMessage( + to=settings.agent_settings.user_interrupt_name, + sender=self.name, + thread="trigger_start", + body=str(trigger_name), + ) + + # TODO: check with Pim + self.add_behavior(self.send(msg)) + + yield + + @self.actions.add(".notify_trigger_end", 1) + def _notify_trigger_end(agent, term, intention): + """ + Notify the UI about the trigger we just finished doing. 
+ """ + trigger_name = agentspeak.grounded(term.args[0], intention.scope) + + self.logger.debug("Finished trigger %s", trigger_name) + + msg = InternalMessage( + to=settings.agent_settings.user_interrupt_name, + sender=self.name, + thread="trigger_end", + body=str(trigger_name), + ) + + # TODO: check with Pim + self.add_behavior(self.send(msg)) + + yield + + @self.actions.add(".notify_goal_start", 1) + def _notify_goal_start(agent, term, intention): + """ + Notify the UI about the goal we just started chasing. + """ + goal_name = agentspeak.grounded(term.args[0], intention.scope) + + self.logger.debug("Started chasing goal %s", goal_name) + + msg = InternalMessage( + to=settings.agent_settings.user_interrupt_name, + sender=self.name, + thread="goal_start", + body=str(goal_name), + ) + + self.add_behavior(self.send(msg)) + + yield + + @self.actions.add(".notify_transition_phase", 2) + def _notify_transition_phase(agent, term, intention): + """ + Notify the BDI program manager about a phase transition. 
+ """ + old = agentspeak.grounded(term.args[0], intention.scope) + new = agentspeak.grounded(term.args[1], intention.scope) + + msg = InternalMessage( + to=settings.agent_settings.bdi_program_manager_name, + thread="transition_phase", + body=json.dumps({"old": str(old), "new": str(new)}), + ) + + self.add_behavior(self.send(msg)) + yield async def _send_to_llm(self, text: str, norms: str, goals: str): @@ -341,6 +493,7 @@ class BDICoreAgent(BaseAgent): to=settings.agent_settings.llm_name, sender=self.name, body=prompt.model_dump_json(), + thread="prompt_message", ) await self.send(msg) self.logger.info("Message sent to LLM agent: %s", text) diff --git a/src/control_backend/agents/bdi/bdi_program_manager.py b/src/control_backend/agents/bdi/bdi_program_manager.py index 54e7196..ba000de 100644 --- a/src/control_backend/agents/bdi/bdi_program_manager.py +++ b/src/control_backend/agents/bdi/bdi_program_manager.py @@ -1,4 +1,5 @@ import asyncio +import json import zmq from pydantic import ValidationError @@ -9,7 +10,7 @@ from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator from control_backend.core.config import settings from control_backend.schemas.belief_list import BeliefList from control_backend.schemas.internal_message import InternalMessage -from control_backend.schemas.program import Belief, ConditionalNorm, InferredBelief, Program +from control_backend.schemas.program import Belief, ConditionalNorm, InferredBelief, Phase, Program class BDIProgramManager(BaseAgent): @@ -24,20 +25,20 @@ class BDIProgramManager(BaseAgent): :ivar sub_socket: The ZMQ SUB socket used to receive program updates. 
""" + _program: Program + _phase: Phase | None + def __init__(self, **kwargs): super().__init__(**kwargs) self.sub_socket = None + def _initialize_internal_state(self, program: Program): + self._program = program + self._phase = program.phases[0] # start in first phase + async def _create_agentspeak_and_send_to_bdi(self, program: Program): """ - Convert a received program into BDI beliefs and send them to the BDI Core Agent. - - Currently, it takes the **first phase** of the program and extracts: - - **Norms**: Constraints or rules the agent must follow. - - **Goals**: Objectives the agent must achieve. - - These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will - overwrite any existing norms/goals of the same name in the BDI agent. + Convert a received program into an AgentSpeak file and send it to the BDI Core Agent. :param program: The program object received from the API. """ @@ -59,17 +60,44 @@ class BDIProgramManager(BaseAgent): await self.send(msg) - @staticmethod - def _extract_beliefs_from_program(program: Program) -> list[Belief]: + async def handle_message(self, msg: InternalMessage): + match msg.thread: + case "transition_phase": + phases = json.loads(msg.body) + + self._transition_phase(phases["old"], phases["new"]) + + def _transition_phase(self, old: str, new: str): + assert old == str(self._phase.id) + + if new == "end": + self._phase = None + return + + for phase in self._program.phases: + if str(phase.id) == new: + self._phase = phase + + self.add_behavior(self._send_beliefs_to_semantic_belief_extractor()) + + # Notify user interaction agent + msg = InternalMessage( + to=settings.agent_settings.user_interrupt_name, + thread="transition_phase", + body=str(self._phase.id), + ) + + self.add_behavior(self.send(msg)) + + def _extract_current_beliefs(self) -> list[Belief]: beliefs: list[Belief] = [] - for phase in program.phases: - for norm in phase.norms: - if isinstance(norm, ConditionalNorm): - beliefs += 
BDIProgramManager._extract_beliefs_from_belief(norm.condition) + for norm in self._phase.norms: + if isinstance(norm, ConditionalNorm): + beliefs += self._extract_beliefs_from_belief(norm.condition) - for trigger in phase.triggers: - beliefs += BDIProgramManager._extract_beliefs_from_belief(trigger.condition) + for trigger in self._phase.triggers: + beliefs += self._extract_beliefs_from_belief(trigger.condition) return beliefs @@ -81,13 +109,11 @@ class BDIProgramManager(BaseAgent): ) + BDIProgramManager._extract_beliefs_from_belief(belief.right) return [belief] - async def _send_beliefs_to_semantic_belief_extractor(self, program: Program): + async def _send_beliefs_to_semantic_belief_extractor(self): """ Extract beliefs from the program and send them to the Semantic Belief Extractor Agent. - - :param program: The program received from the API. """ - beliefs = BeliefList(beliefs=self._extract_beliefs_from_program(program)) + beliefs = BeliefList(beliefs=self._extract_current_beliefs()) message = InternalMessage( to=settings.agent_settings.text_belief_extractor_name, @@ -111,12 +137,14 @@ class BDIProgramManager(BaseAgent): try: program = Program.model_validate_json(body) except ValidationError: - self.logger.exception("Received an invalid program.") + self.logger.warning("Received an invalid program.") continue + self._initialize_internal_state(program) + await asyncio.gather( self._create_agentspeak_and_send_to_bdi(program), - self._send_beliefs_to_semantic_belief_extractor(program), + self._send_beliefs_to_semantic_belief_extractor(), ) async def setup(self): diff --git a/src/control_backend/agents/communication/ri_communication_agent.py b/src/control_backend/agents/communication/ri_communication_agent.py index 34e5b25..34d3a5a 100644 --- a/src/control_backend/agents/communication/ri_communication_agent.py +++ b/src/control_backend/agents/communication/ri_communication_agent.py @@ -248,7 +248,8 @@ class RICommunicationAgent(BaseAgent): 
self._req_socket.recv_json(), timeout=seconds_to_wait_total / 2 ) - self.logger.debug(f'Received message "{message}" from RI.') + if message.get("endpoint") and message["endpoint"] != "ping": + self.logger.debug(f'Received message "{message}" from RI.') if "endpoint" not in message: self.logger.warning("No received endpoint in message, expected ping endpoint.") continue diff --git a/src/control_backend/agents/llm/llm_agent.py b/src/control_backend/agents/llm/llm_agent.py index 17edec9..3e19c49 100644 --- a/src/control_backend/agents/llm/llm_agent.py +++ b/src/control_backend/agents/llm/llm_agent.py @@ -46,12 +46,17 @@ class LLMAgent(BaseAgent): :param msg: The received internal message. """ if msg.sender == settings.agent_settings.bdi_core_name: - self.logger.debug("Processing message from BDI core.") - try: - prompt_message = LLMPromptMessage.model_validate_json(msg.body) - await self._process_bdi_message(prompt_message) - except ValidationError: - self.logger.debug("Prompt message from BDI core is invalid.") + match msg.thread: + case "prompt_message": + try: + prompt_message = LLMPromptMessage.model_validate_json(msg.body) + await self._process_bdi_message(prompt_message) + except ValidationError: + self.logger.debug("Prompt message from BDI core is invalid.") + case "assistant_message": + self.history.append({"role": "assistant", "content": msg.body}) + case "user_message": + self.history.append({"role": "user", "content": msg.body}) else: self.logger.debug("Message ignored (not from BDI core.") @@ -114,13 +119,6 @@ class LLMAgent(BaseAgent): :param goals: Goals the LLM should achieve. :yield: Fragments of the LLM-generated content (e.g., sentences/phrases). 
""" - self.history.append( - { - "role": "user", - "content": prompt, - } - ) - instructions = LLMInstructions(norms if norms else None, goals if goals else None) messages = [ { diff --git a/src/control_backend/core/agent_system.py b/src/control_backend/core/agent_system.py index 9d7a47f..fc418bb 100644 --- a/src/control_backend/core/agent_system.py +++ b/src/control_backend/core/agent_system.py @@ -131,6 +131,7 @@ class BaseAgent(ABC): :param message: The message to send. """ target = AgentDirectory.get(message.to) + message.sender = self.name if target: await target.inbox.put(message) self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.") diff --git a/src/control_backend/core/config.py b/src/control_backend/core/config.py index 8a7267c..353a408 100644 --- a/src/control_backend/core/config.py +++ b/src/control_backend/core/config.py @@ -75,7 +75,7 @@ class BehaviourSettings(BaseModel): # VAD settings vad_prob_threshold: float = 0.5 vad_initial_since_speech: int = 100 - vad_non_speech_patience_chunks: int = 3 + vad_non_speech_patience_chunks: int = 15 # transcription behaviour transcription_max_concurrent_tasks: int = 3 diff --git a/src/control_backend/schemas/internal_message.py b/src/control_backend/schemas/internal_message.py index 071d884..14278c0 100644 --- a/src/control_backend/schemas/internal_message.py +++ b/src/control_backend/schemas/internal_message.py @@ -12,6 +12,6 @@ class InternalMessage(BaseModel): """ to: str - sender: str + sender: str | None = None body: str thread: str | None = None