From bab48006981f7a99614690da1fdf1d8e2ffacd4c Mon Sep 17 00:00:00 2001 From: Kasper Marinus Date: Tue, 16 Dec 2025 12:10:52 +0100 Subject: [PATCH] feat: add trigger generation ref: N25B-376 --- .../agents/bdi/bdi_program_manager.py | 104 ++++++++++++++---- 1 file changed, 85 insertions(+), 19 deletions(-) diff --git a/src/control_backend/agents/bdi/bdi_program_manager.py b/src/control_backend/agents/bdi/bdi_program_manager.py index 4213cfa..5b2d484 100644 --- a/src/control_backend/agents/bdi/bdi_program_manager.py +++ b/src/control_backend/agents/bdi/bdi_program_manager.py @@ -23,6 +23,7 @@ from control_backend.schemas.program import ( ProgramElement, SemanticBelief, SpeechAction, + Trigger, ) test_program = Program( @@ -47,22 +48,30 @@ test_program = Program( ), ], triggers=[ - # Trigger( - # condition=InferredBelief( - # left=KeywordBelief(keyword="key"), - # right=InferredBelief( - # left=KeywordBelief(keyword="key2"), - # right=SemanticBelief( - # description="Decode this", name="semantic belief 2" - # ), - # operator=LogicalOperator.OR, - # name="test trigger inferred inner", - # ), - # operator=LogicalOperator.OR, - # name="test trigger inferred outer", - # ), - # plan=Plan(steps=[]), - # ) + Trigger( + condition=InferredBelief( + left=KeywordBelief(keyword="key"), + right=InferredBelief( + left=KeywordBelief(keyword="key2"), + right=SemanticBelief( + description="Decode this", name="semantic belief 2" + ), + operator=LogicalOperator.OR, + name="test trigger inferred inner", + ), + operator=LogicalOperator.OR, + name="test trigger inferred outer", + ), + plan=Plan( + steps=[ + SpeechAction(text="Testing trigger"), + Goal( + name="Testing trigger", + plan=Plan(steps=[LLMAction(goal="Do something")]), + ), + ] + ), + ) ], goals=[ Goal( @@ -93,6 +102,10 @@ test_program = Program( ) +def do_things(): + print(AgentSpeakGenerator().generate(test_program)) + + class AgentSpeakGenerator: """ Converts Pydantic representation of behavior programs into AgentSpeak(L) code string. @@ -186,13 +199,13 @@ class AgentSpeakGenerator: for phase in program.phases: previous_goal: Goal | None = None for goal in phase.goals: - lines += self._generate_plan_recursive(goal, phase, previous_goal) + lines += self._generate_goal_plan_recursive(goal, phase, previous_goal) previous_goal = goal lines += ["", ""] return lines - def _generate_plan_recursive( + def _generate_goal_plan_recursive( self, goal: Goal, phase: Phase, previous_goal: Goal | None = None ) -> list[str]: lines = [] @@ -210,6 +223,11 @@ class AgentSpeakGenerator: extra_goals_to_generate = [] steps = goal.plan.steps + + if len(steps) == 0: + lines.append(f"{' ' * 2}<-{' ' * 2}true.") + return lines + first_step = steps[0] lines.append( f"{' ' * 2}<-{' ' * 2}{self._slugify(first_step, include_prefix=True)}" @@ -239,7 +257,7 @@ class AgentSpeakGenerator: extra_previous_goal: Goal | None = None for extra_goal in extra_goals_to_generate: - lines += self._generate_plan_recursive(extra_goal, phase, extra_previous_goal) + lines += self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal) extra_previous_goal = extra_goal return lines @@ -248,9 +266,57 @@ class AgentSpeakGenerator: lines = [] lines.append("// --- Triggers ---") + for phase in program.phases: + for trigger in phase.triggers: + lines += self._generate_trigger_plan(trigger, phase) + lines += ["", ""] return lines + def _generate_trigger_plan(self, trigger: Trigger, phase: Phase) -> list[str]: + lines = [] + + belief_name = self._slugify(trigger.condition) + + lines.append(f"+{belief_name}") + lines.append(f"{' ' * 2}:{' ' * 3}phase({phase.id})") + + extra_goals_to_generate = [] + + steps = trigger.plan.steps + + if len(steps) == 0: + lines.append(f"{' ' * 2}<-{' ' * 2}true.") + return lines + + first_step = steps[0] + lines.append( + f"{' ' * 2}<-{' ' * 2}{self._slugify(first_step, include_prefix=True)}" + f"{'.' if len(steps) == 1 else ';'}" + ) + if isinstance(first_step, Goal): + extra_goals_to_generate.append(first_step) + + for step in steps[1:-1]: + lines.append(f"{' ' * 6}{self._slugify(step, include_prefix=True)};") + if isinstance(step, Goal): + extra_goals_to_generate.append(step) + + if len(steps) > 1: + last_step = steps[-1] + lines.append(f"{' ' * 6}{self._slugify(last_step, include_prefix=True)}.") + if isinstance(last_step, Goal): + extra_goals_to_generate.append(last_step) + + lines.append("") + + extra_previous_goal: Goal | None = None + for extra_goal in extra_goals_to_generate: + lines += self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal) + extra_previous_goal = extra_goal + + return lines + def _slugify(self, element: ProgramElement, include_prefix: bool = False) -> str: def base_slugify_call(text: str): return slugify(text, separator="_", stopwords=["a", "the"])