From 9eea4ee3454881e5a846b9eb775647d46513cab9 Mon Sep 17 00:00:00 2001 From: Kasper Date: Fri, 2 Jan 2026 12:08:20 +0100 Subject: [PATCH] feat: new ASL generation ref: N25B-376 --- src/control_backend/agents/bdi/astv2.py | 3 +- src/control_backend/agents/bdi/gen.py | 0 src/control_backend/agents/bdi/genv2.py | 354 ++++++++++++++++++++++++ 3 files changed, 356 insertions(+), 1 deletion(-) delete mode 100644 src/control_backend/agents/bdi/gen.py create mode 100644 src/control_backend/agents/bdi/genv2.py diff --git a/src/control_backend/agents/bdi/astv2.py b/src/control_backend/agents/bdi/astv2.py index f88fb6a..188b4f3 100644 --- a/src/control_backend/agents/bdi/astv2.py +++ b/src/control_backend/agents/bdi/astv2.py @@ -187,9 +187,10 @@ class StatementType(StrEnum): EMPTY = "" DO_ACTION = "." ACHIEVE_GOAL = "!" - # TEST_GOAL = "?" # TODO + TEST_GOAL = "?" ADD_BELIEF = "+" REMOVE_BELIEF = "-" + REPLACE_BELIEF = "-+" @dataclass diff --git a/src/control_backend/agents/bdi/gen.py b/src/control_backend/agents/bdi/gen.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/control_backend/agents/bdi/genv2.py b/src/control_backend/agents/bdi/genv2.py new file mode 100644 index 0000000..61980e4 --- /dev/null +++ b/src/control_backend/agents/bdi/genv2.py @@ -0,0 +1,354 @@ +import asyncio +import time +from functools import singledispatchmethod + +from slugify import slugify + +from control_backend.agents.bdi import BDICoreAgent +from control_backend.agents.bdi.astv2 import ( + AstBinaryOp, + AstExpression, + AstLiteral, + AstPlan, + AstProgram, + AstRule, + AstStatement, + AstString, + AstVar, + BinaryOperatorType, + StatementType, + TriggerType, +) +from control_backend.agents.bdi.bdi_program_manager import test_program +from control_backend.schemas.program import ( + BasicNorm, + ConditionalNorm, + GestureAction, + Goal, + InferredBelief, + KeywordBelief, + LLMAction, + LogicalOperator, + Norm, + Phase, + PlanElement, + Program, + ProgramElement, + SemanticBelief, + SpeechAction, + Trigger, +) + + +def do_things(): + program = AgentSpeakGenerator().generate(test_program) + print(program) + + +async def do_other_things(): + res = input("Wanna generate") + if res == "y": + program = AgentSpeakGenerator().generate(test_program) + filename = f"{int(time.time())}.asl" + with open(filename, "w") as f: + f.write(program) + else: + filename = "temp.asl" + bdi_agent = BDICoreAgent("BDICoreAgent", filename) + flag = asyncio.Event() + await bdi_agent.start() + await flag.wait() + + +class AgentSpeakGenerator: + _asp: AstProgram + + def generate(self, program: Program) -> str: + self._asp = AstProgram() + + self._asp.rules.append(AstRule(AstLiteral("phase", [AstString("start")]))) + self._add_keyword_inference() + self._add_response_goal() + + self._process_phases(program.phases) + + self._add_fallbacks() + + return str(self._asp) + + def _add_keyword_inference(self) -> None: + keyword = AstVar("Keyword") + message = AstVar("Message") + position = AstVar("Pos") + + self._asp.rules.append( + AstRule( + AstLiteral("keyword_said", [keyword]), + AstLiteral("user_said", [message]) + & AstLiteral(".substring", [keyword, message, position]) + & (position >= 0), + ) + ) + + def _add_response_goal(self): + self._asp.plans.append( + AstPlan( + TriggerType.ADDED_GOAL, + AstLiteral("generate_response_with_goal", [AstVar("Goal")]), + [AstLiteral("user_said", [AstVar("Message")])], + [ + AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")), + AstStatement( + StatementType.DO_ACTION, + AstLiteral( + "findall", + [AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")], + ), + ), + AstStatement( + StatementType.DO_ACTION, + AstLiteral( + "reply_with_goal", [AstVar("Message"), AstVar("Norms"), AstVar("Goal")] + ), + ), + ], + ) + ) + + def _process_phases(self, phases: list[Phase]) -> None: + for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True): + if curr_phase: + self._process_phase(curr_phase) + self._add_phase_transition(curr_phase, next_phase) + + def _process_phase(self, phase: Phase) -> None: + for norm in phase.norms: + self._process_norm(norm, phase) + + self._add_default_loop(phase) + + previous_goal = None + for goal in phase.goals: + self._process_goal(goal, phase, previous_goal) + previous_goal = goal + + for trigger in phase.triggers: + self._process_trigger(trigger, phase) + + def _add_phase_transition(self, from_phase: Phase | None, to_phase: Phase | None) -> None: + from_phase_ast = ( + self._astify(from_phase) if from_phase else AstLiteral("phase", [AstString("start")]) + ) + to_phase_ast = ( + self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")]) + ) + + context = [from_phase_ast] + if from_phase and from_phase.goals: + context.append(self._astify(from_phase.goals[-1], achieved=True)) + + body = [ + AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast), + AstStatement(StatementType.ADD_BELIEF, to_phase_ast), + ] + + if from_phase: + body.extend( + [ + AstStatement( + StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")]) + ), + AstStatement( + StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")]) + ), + ] + ) + + self._asp.plans.append( + AstPlan(TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), context, body) + ) + + def _process_norm(self, norm: Norm, phase: Phase) -> None: + rule: AstRule | None = None + + match norm: + case ConditionalNorm(condition=cond): + rule = AstRule(self._astify(norm), self._astify(phase) & self._astify(cond)) + case BasicNorm(): + rule = AstRule(self._astify(norm), self._astify(phase)) + + if not rule: + return + + self._asp.rules.append(rule) + + def _add_default_loop(self, phase: Phase) -> None: + actions = [] + + actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn"))) + actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers"))) + + for goal in phase.goals: + actions.append(AstStatement(StatementType.ACHIEVE_GOAL, self._astify(goal))) + + actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("transition_phase"))) + + self._asp.plans.append( + AstPlan( + TriggerType.ADDED_BELIEF, + AstLiteral("user_said", [AstVar("Message")]), + [self._astify(phase)], + actions, + ) + ) + + def _process_goal( + self, + goal: Goal, + phase: Phase, + previous_goal: Goal | None = None, + continues_response: bool = False, + ) -> None: + context: list[AstExpression] = [self._astify(phase)] + context.append(~self._astify(goal, achieved=True)) + if previous_goal and previous_goal.can_fail: + context.append(self._astify(previous_goal, achieved=True)) + if not continues_response: + context.append(~AstLiteral("responded_this_turn")) + + body = [] + + subgoals = [] + for step in goal.plan.steps: + body.append(self._step_to_statement(step)) + if isinstance(step, Goal): + subgoals.append(step) + + if not goal.can_fail and not continues_response: + body.append(AstStatement(StatementType.ADD_BELIEF, self._astify(goal, achieved=True))) + + self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(goal), context, body)) + + self._asp.plans.append( + AstPlan( + TriggerType.ADDED_GOAL, + self._astify(goal), + context=[], + body=[AstStatement(StatementType.EMPTY, AstLiteral("true"))], + ) + ) + + prev_goal = None + for subgoal in subgoals: + self._process_goal(subgoal, phase, prev_goal) + prev_goal = subgoal + + def _step_to_statement(self, step: PlanElement) -> AstStatement: + match step: + case Goal() as g: + return AstStatement(StatementType.ACHIEVE_GOAL, self._astify(g)) + case SpeechAction() | GestureAction() as a: + return AstStatement(StatementType.DO_ACTION, self._astify(a)) + case LLMAction() as la: + return AstStatement( + StatementType.ACHIEVE_GOAL, self._astify(la) + ) # LLM action is a goal in ASL + + # TODO: separate handling of keyword and others + def _process_trigger(self, trigger: Trigger, phase: Phase) -> None: + body = [] + subgoals = [] + + for step in trigger.plan.steps: + body.append(self._step_to_statement(step)) + if isinstance(step, Goal): + step.can_fail = False # triggers are continuous sequence + subgoals.append(step) + + self._asp.plans.append( + AstPlan( + TriggerType.ADDED_GOAL, + AstLiteral("check_triggers"), + [self._astify(phase), self._astify(trigger.condition)], + body, + ) + ) + + for subgoal in subgoals: + self._process_goal(subgoal, phase, continues_response=True) + + def _add_fallbacks(self): + # Trigger fallback + self._asp.plans.append( + AstPlan( + TriggerType.ADDED_GOAL, + AstLiteral("check_triggers"), + [], + [AstStatement(StatementType.EMPTY, AstLiteral("true"))], + ) + ) + + # Phase transition fallback + self._asp.plans.append( + AstPlan( + TriggerType.ADDED_GOAL, + AstLiteral("transition_phase"), + [], + [AstStatement(StatementType.EMPTY, AstLiteral("true"))], + ) + ) + + @singledispatchmethod + def _astify(self, element: ProgramElement) -> AstExpression: + raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.") + + @_astify.register + def _(self, kwb: KeywordBelief) -> AstExpression: + return AstLiteral("keyword_said", [AstString(kwb.keyword)]) + + @_astify.register + def _(self, sb: SemanticBelief) -> AstExpression: + return AstLiteral(f"semantic_{self._slugify_str(sb.description)}") + + @_astify.register + def _(self, ib: InferredBelief) -> AstExpression: + return AstBinaryOp( + self._astify(ib.left), + BinaryOperatorType.AND if ib.operator == LogicalOperator.AND else BinaryOperatorType.OR, + self._astify(ib.right), + ) + + @_astify.register + def _(self, norm: Norm) -> AstExpression: + functor = "critical_norm" if norm.critical else "norm" + return AstLiteral(functor, [AstString(norm.norm)]) + + @_astify.register + def _(self, phase: Phase) -> AstExpression: + return AstLiteral("phase", [AstString(str(phase.id))]) + + @_astify.register + def _(self, goal: Goal, achieved: bool = False) -> AstExpression: + return AstLiteral(f"{'achieved_' if achieved else ''}{self._slugify_str(goal.name)}") + + @_astify.register + def _(self, sa: SpeechAction) -> AstExpression: + return AstLiteral("say", [AstString(sa.text)]) + + @_astify.register + def _(self, ga: GestureAction) -> AstExpression: + gesture = ga.gesture + return AstLiteral("gesture", [AstString(gesture.type), AstString(gesture.name)]) + + @_astify.register + def _(self, la: LLMAction) -> AstExpression: + return AstLiteral("generate_response_with_goal", [AstString(la.goal)]) + + @staticmethod + def _slugify_str(text: str) -> str: + return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"]) + + +if __name__ == "__main__": + # do_things() + asyncio.run(do_other_things())