from functools import singledispatchmethod from slugify import slugify from control_backend.agents.bdi.agentspeak_ast import ( AstAtom, AstBinaryOp, AstExpression, AstLiteral, AstNumber, AstPlan, AstProgram, AstRule, AstStatement, AstString, AstVar, BinaryOperatorType, StatementType, TriggerType, ) from control_backend.schemas.program import ( BaseGoal, BasicNorm, ConditionalNorm, GestureAction, Goal, InferredBelief, KeywordBelief, LLMAction, LogicalOperator, Norm, Phase, PlanElement, Program, ProgramElement, SemanticBelief, SpeechAction, Trigger, ) class AgentSpeakGenerator: _asp: AstProgram def generate(self, program: Program) -> str: self._asp = AstProgram() if program.phases: self._asp.rules.append(AstRule(self._astify(program.phases[0]))) else: self._asp.rules.append(AstRule(AstLiteral("phase", [AstString("end")]))) self._add_keyword_inference() self._add_default_plans() self._process_phases(program.phases) self._add_fallbacks() return str(self._asp) def _add_keyword_inference(self) -> None: keyword = AstVar("Keyword") message = AstVar("Message") position = AstVar("Pos") self._asp.rules.append( AstRule( AstLiteral("keyword_said", [keyword]), AstLiteral("user_said", [message]) & AstLiteral(".substring", [keyword, message, position]) & (position >= 0), ) ) def _add_default_plans(self): self._add_reply_with_goal_plan() self._add_say_plan() self._add_reply_plan() self._add_notify_cycle_plan() def _add_reply_with_goal_plan(self): self._asp.plans.append( AstPlan( TriggerType.ADDED_GOAL, AstLiteral("reply_with_goal", [AstVar("Goal")]), [AstLiteral("user_said", [AstVar("Message")])], [ AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")), AstStatement( StatementType.DO_ACTION, AstLiteral( "findall", [AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")], ), ), AstStatement( StatementType.DO_ACTION, AstLiteral( "reply_with_goal", [AstVar("Message"), AstVar("Norms"), AstVar("Goal")] ), ), ], ) ) def _add_say_plan(self): self._asp.plans.append( AstPlan( TriggerType.ADDED_GOAL, AstLiteral("say", [AstVar("Text")]), [], [ AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")), AstStatement(StatementType.DO_ACTION, AstLiteral("say", [AstVar("Text")])), ], ) ) def _add_reply_plan(self): self._asp.plans.append( AstPlan( TriggerType.ADDED_GOAL, AstLiteral("reply"), [AstLiteral("user_said", [AstVar("Message")])], [ AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")), AstStatement( StatementType.DO_ACTION, AstLiteral( "findall", [AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")], ), ), AstStatement( StatementType.DO_ACTION, AstLiteral("reply", [AstVar("Message"), AstVar("Norms")]), ), ], ) ) def _add_notify_cycle_plan(self): self._asp.plans.append( AstPlan( TriggerType.ADDED_GOAL, AstLiteral("notify_cycle"), [], [ AstStatement(StatementType.DO_ACTION, AstLiteral("notify_ui")), AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(1)])), ], ) ) def _process_phases(self, phases: list[Phase]) -> None: for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True): if curr_phase: self._process_phase(curr_phase) self._add_phase_transition(curr_phase, next_phase) # End phase behavior # When deleting this, the entire `reply` plan and action can be deleted self._asp.plans.append( AstPlan( type=TriggerType.ADDED_BELIEF, trigger_literal=AstLiteral("user_said", [AstVar("Message")]), context=[AstLiteral("phase", [AstString("end")])], body=[ AstStatement( StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")]) ), AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply")), ], ) ) def _process_phase(self, phase: Phase) -> None: for norm in phase.norms: self._process_norm(norm, phase) self._add_default_loop(phase) previous_goal = None for goal in phase.goals: self._process_goal(goal, phase, previous_goal, main_goal=True) previous_goal = goal for trigger in phase.triggers: self._process_trigger(trigger, phase) def _add_phase_transition(self, from_phase: Phase | None, to_phase: Phase | None) -> None: if from_phase is None: return from_phase_ast = self._astify(from_phase) to_phase_ast = ( self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")]) ) check_context = [from_phase_ast] if from_phase: for goal in from_phase.goals: check_context.append(self._astify(goal, achieved=True)) force_context = [from_phase_ast] body = [ AstStatement( StatementType.DO_ACTION, AstLiteral( "notify_transition_phase", [ AstString(str(from_phase.id)), AstString(str(to_phase.id) if to_phase else "end"), ], ), ), AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast), AstStatement(StatementType.ADD_BELIEF, to_phase_ast), ] # if from_phase: # body.extend( # [ # AstStatement( # StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")]) # ), # AstStatement( # StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")]) # ), # ] # ) # Check self._asp.plans.append( AstPlan( TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), check_context, [ AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("force_transition_phase")), ], ) ) # Force self._asp.plans.append( AstPlan( TriggerType.ADDED_GOAL, AstLiteral("force_transition_phase"), force_context, body ) ) def _process_norm(self, norm: Norm, phase: Phase) -> None: rule: AstRule | None = None match norm: case ConditionalNorm(condition=cond): rule = AstRule( self._astify(norm), self._astify(phase) & self._astify(cond) | AstAtom(f"force_{self.slugify(norm)}"), ) case BasicNorm(): rule = AstRule(self._astify(norm), self._astify(phase)) if not rule: return self._asp.rules.append(rule) def _add_default_loop(self, phase: Phase) -> None: actions = [] actions.append( AstStatement( StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")]) ) ) actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn"))) actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers"))) for goal in phase.goals: actions.append(AstStatement(StatementType.ACHIEVE_GOAL, self._astify(goal))) actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("transition_phase"))) self._asp.plans.append( AstPlan( TriggerType.ADDED_BELIEF, AstLiteral("user_said", [AstVar("Message")]), [self._astify(phase)], actions, ) ) def _process_goal( self, goal: Goal, phase: Phase, previous_goal: Goal | None = None, continues_response: bool = False, main_goal: bool = False, ) -> None: context: list[AstExpression] = [self._astify(phase)] context.append(~self._astify(goal, achieved=True)) if previous_goal and previous_goal.can_fail: context.append(self._astify(previous_goal, achieved=True)) if not continues_response: context.append(~AstLiteral("responded_this_turn")) body = [] if main_goal: # UI only needs to know about the main goals body.append( AstStatement( StatementType.DO_ACTION, AstLiteral("notify_goal_start", [AstString(self.slugify(goal))]), ) ) subgoals = [] for step in goal.plan.steps: body.append(self._step_to_statement(step)) if isinstance(step, Goal): subgoals.append(step) if not goal.can_fail and not continues_response: body.append(AstStatement(StatementType.ADD_BELIEF, self._astify(goal, achieved=True))) self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(goal), context, body)) self._asp.plans.append( AstPlan( TriggerType.ADDED_GOAL, self._astify(goal), context=[], body=[AstStatement(StatementType.EMPTY, AstLiteral("true"))], ) ) prev_goal = None for subgoal in subgoals: self._process_goal(subgoal, phase, prev_goal) prev_goal = subgoal def _step_to_statement(self, step: PlanElement) -> AstStatement: match step: case Goal() | SpeechAction() | LLMAction() as a: return AstStatement(StatementType.ACHIEVE_GOAL, self._astify(a)) case GestureAction() as a: return AstStatement(StatementType.DO_ACTION, self._astify(a)) # TODO: separate handling of keyword and others def _process_trigger(self, trigger: Trigger, phase: Phase) -> None: body = [] subgoals = [] body.append( AstStatement( StatementType.DO_ACTION, AstLiteral("notify_trigger_start", [AstString(self.slugify(trigger))]), ) ) for step in trigger.plan.steps: body.append(self._step_to_statement(step)) if isinstance(step, Goal): step.can_fail = False # triggers are continuous sequence subgoals.append(step) body.append( AstStatement( StatementType.DO_ACTION, AstLiteral("notify_trigger_end", [AstString(self.slugify(trigger))]), ) ) self._asp.plans.append( AstPlan( TriggerType.ADDED_GOAL, AstLiteral("check_triggers"), [self._astify(phase), self._astify(trigger.condition)], body, ) ) # Force trigger (from UI) self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(trigger), [], body)) for subgoal in subgoals: self._process_goal(subgoal, phase, continues_response=True) def _add_fallbacks(self): # Trigger fallback self._asp.plans.append( AstPlan( TriggerType.ADDED_GOAL, AstLiteral("check_triggers"), [], [AstStatement(StatementType.EMPTY, AstLiteral("true"))], ) ) # Phase transition fallback self._asp.plans.append( AstPlan( TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), [], [AstStatement(StatementType.EMPTY, AstLiteral("true"))], ) ) @singledispatchmethod def _astify(self, element: ProgramElement) -> AstExpression: raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.") @_astify.register def _(self, kwb: KeywordBelief) -> AstExpression: return AstLiteral("keyword_said", [AstString(kwb.keyword)]) @_astify.register def _(self, sb: SemanticBelief) -> AstExpression: return AstLiteral(self.slugify(sb)) @_astify.register def _(self, ib: InferredBelief) -> AstExpression: return AstBinaryOp( self._astify(ib.left), BinaryOperatorType.AND if ib.operator == LogicalOperator.AND else BinaryOperatorType.OR, self._astify(ib.right), ) @_astify.register def _(self, norm: Norm) -> AstExpression: functor = "critical_norm" if norm.critical else "norm" return AstLiteral(functor, [AstString(norm.norm)]) @_astify.register def _(self, phase: Phase) -> AstExpression: return AstLiteral("phase", [AstString(str(phase.id))]) @_astify.register def _(self, goal: Goal, achieved: bool = False) -> AstExpression: return AstLiteral(f"{'achieved_' if achieved else ''}{self._slugify_str(goal.name)}") @_astify.register def _(self, trigger: Trigger) -> AstExpression: return AstLiteral(self.slugify(trigger)) @_astify.register def _(self, sa: SpeechAction) -> AstExpression: return AstLiteral("say", [AstString(sa.text)]) @_astify.register def _(self, ga: GestureAction) -> AstExpression: gesture = ga.gesture return AstLiteral("gesture", [AstString(gesture.type), AstString(gesture.name)]) @_astify.register def _(self, la: LLMAction) -> AstExpression: return AstLiteral("reply_with_goal", [AstString(la.goal)]) @singledispatchmethod @staticmethod def slugify(element: ProgramElement) -> str: raise NotImplementedError(f"Cannot convert element {element} to a slug.") @slugify.register @staticmethod def _(n: Norm) -> str: return f"norm_{AgentSpeakGenerator._slugify_str(n.norm)}" @slugify.register @staticmethod def _(sb: SemanticBelief) -> str: return f"semantic_{AgentSpeakGenerator._slugify_str(sb.name)}" @slugify.register @staticmethod def _(g: BaseGoal) -> str: return AgentSpeakGenerator._slugify_str(g.name) @slugify.register @staticmethod def _(t: Trigger): return f"trigger_{AgentSpeakGenerator._slugify_str(t.name)}" @staticmethod def _slugify_str(text: str) -> str: return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])