feat: add trigger generation

ref: N25B-376
This commit is contained in:
2025-12-16 12:10:52 +01:00
parent d043c54336
commit bab4800698

View File

@@ -23,6 +23,7 @@ from control_backend.schemas.program import (
ProgramElement,
SemanticBelief,
SpeechAction,
Trigger,
)
test_program = Program(
@@ -47,22 +48,30 @@ test_program = Program(
),
],
triggers=[
# Trigger(
# condition=InferredBelief(
# left=KeywordBelief(keyword="key"),
# right=InferredBelief(
# left=KeywordBelief(keyword="key2"),
# right=SemanticBelief(
# description="Decode this", name="semantic belief 2"
# ),
# operator=LogicalOperator.OR,
# name="test trigger inferred inner",
# ),
# operator=LogicalOperator.OR,
# name="test trigger inferred outer",
# ),
# plan=Plan(steps=[]),
# )
Trigger(
condition=InferredBelief(
left=KeywordBelief(keyword="key"),
right=InferredBelief(
left=KeywordBelief(keyword="key2"),
right=SemanticBelief(
description="Decode this", name="semantic belief 2"
),
operator=LogicalOperator.OR,
name="test trigger inferred inner",
),
operator=LogicalOperator.OR,
name="test trigger inferred outer",
),
plan=Plan(
steps=[
SpeechAction(text="Testing trigger"),
Goal(
name="Testing trigger",
plan=Plan(steps=[LLMAction(goal="Do something")]),
),
]
),
)
],
goals=[
Goal(
@@ -93,6 +102,10 @@ test_program = Program(
)
def do_things():
print(AgentSpeakGenerator().generate(test_program))
class AgentSpeakGenerator:
"""
Converts Pydantic representation of behavior programs into AgentSpeak(L) code string.
@@ -186,13 +199,13 @@ class AgentSpeakGenerator:
for phase in program.phases:
previous_goal: Goal | None = None
for goal in phase.goals:
lines += self._generate_plan_recursive(goal, phase, previous_goal)
lines += self._generate_goal_plan_recursive(goal, phase, previous_goal)
previous_goal = goal
lines += ["", ""]
return lines
def _generate_plan_recursive(
def _generate_goal_plan_recursive(
self, goal: Goal, phase: Phase, previous_goal: Goal | None = None
) -> list[str]:
lines = []
@@ -210,6 +223,11 @@ class AgentSpeakGenerator:
extra_goals_to_generate = []
steps = goal.plan.steps
if len(steps) == 0:
lines.append(f"{' ' * 2}<-{' ' * 2}true.")
return lines
first_step = steps[0]
lines.append(
f"{' ' * 2}<-{' ' * 2}{self._slugify(first_step, include_prefix=True)}"
@@ -239,7 +257,7 @@ class AgentSpeakGenerator:
extra_previous_goal: Goal | None = None
for extra_goal in extra_goals_to_generate:
lines += self._generate_plan_recursive(extra_goal, phase, extra_previous_goal)
lines += self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal)
extra_previous_goal = extra_goal
return lines
@@ -248,9 +266,57 @@ class AgentSpeakGenerator:
lines = []
lines.append("// --- Triggers ---")
for phase in program.phases:
for trigger in phase.triggers:
lines += self._generate_trigger_plan(trigger, phase)
lines += ["", ""]
return lines
def _generate_trigger_plan(self, trigger: Trigger, phase: Phase) -> list[str]:
lines = []
belief_name = self._slugify(trigger.condition)
lines.append(f"+{belief_name}")
lines.append(f"{' ' * 2}:{' ' * 3}phase({phase.id})")
extra_goals_to_generate = []
steps = trigger.plan.steps
if len(steps) == 0:
lines.append(f"{' ' * 2}<-{' ' * 2}true.")
return lines
first_step = steps[0]
lines.append(
f"{' ' * 2}<-{' ' * 2}{self._slugify(first_step, include_prefix=True)}"
f"{'.' if len(steps) == 1 else ';'}"
)
if isinstance(first_step, Goal):
extra_goals_to_generate.append(first_step)
for step in steps[1:-1]:
lines.append(f"{' ' * 6}{self._slugify(step, include_prefix=True)};")
if isinstance(step, Goal):
extra_goals_to_generate.append(step)
if len(steps) > 1:
last_step = steps[-1]
lines.append(f"{' ' * 6}{self._slugify(last_step, include_prefix=True)}.")
if isinstance(last_step, Goal):
extra_goals_to_generate.append(last_step)
lines.append("")
extra_previous_goal: Goal | None = None
for extra_goal in extra_goals_to_generate:
lines += self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal)
extra_previous_goal = extra_goal
return lines
def _slugify(self, element: ProgramElement, include_prefix: bool = False) -> str:
def base_slugify_call(text: str):
return slugify(text, separator="_", stopwords=["a", "the"])