diff --git a/src/control_backend/agents/bdi/asl_ast.py b/src/control_backend/agents/bdi/asl_ast.py new file mode 100644 index 0000000..6543b63 --- /dev/null +++ b/src/control_backend/agents/bdi/asl_ast.py @@ -0,0 +1,172 @@ +import typing +from dataclasses import dataclass, field + +# --- Types --- + + +@dataclass +class BeliefLiteral: + """ + Represents a literal or atom. + Example: phase(1), user_said("hello"), ~started + """ + + functor: str + args: list[str] = field(default_factory=list) + negated: bool = False + + def __str__(self): + # In ASL, 'not' is usually for closed-world assumption (prolog style), + # '~' is for explicit negation in beliefs. + # For simplicity in behavior trees, we often use 'not' for conditions. + prefix = "not " if self.negated else "" + if not self.args: + return f"{prefix}{self.functor}" + + # Clean args to ensure strings are quoted if they look like strings, + # but usually the converter handles the quoting of string literals. + args_str = ", ".join(self.args) + return f"{prefix}{self.functor}({args_str})" + + +@dataclass +class GoalLiteral: + name: str + + def __str__(self): + return f"!{self.name}" + + +@dataclass +class ActionLiteral: + """ + Represents a step in a plan body. + Example: .say("Hello") or !achieve_goal + """ + + code: str + + def __str__(self): + return self.code + + +@dataclass +class BinaryOp: + """ + Represents logical operations. + Example: (A & B) | C + """ + + left: "Expression | str" + operator: typing.Literal["&", "|"] + right: "Expression | str" + + def __str__(self): + l_str = str(self.left) + r_str = str(self.right) + + if isinstance(self.left, BinaryOp): + l_str = f"({l_str})" + if isinstance(self.right, BinaryOp): + r_str = f"({r_str})" + + return f"{l_str} {self.operator} {r_str}" + + +Literal = BeliefLiteral | GoalLiteral | ActionLiteral +Expression = Literal | BinaryOp | str + + +@dataclass +class Rule: + """ + Represents an inference rule. + Example: head :- body. + """ + + head: Expression + body: Expression | None = None + + def __str__(self): + if not self.body: + return f"{self.head}." + return f"{self.head} :- {self.body}." + + +@dataclass +class Plan: + """ + Represents a plan. + Syntax: +trigger : context <- body. + """ + + trigger: BeliefLiteral | GoalLiteral + context: list[Expression] = field(default_factory=list) + body: list[ActionLiteral] = field(default_factory=list) + + def __str__(self): + # Indentation settings + INDENT = " " + ARROW = "\n <- " + COLON = "\n : " + + # Build Header + header = f"+{self.trigger}" + if self.context: + ctx_str = f" &\n{INDENT}".join(str(c) for c in self.context) + header += f"{COLON}{ctx_str}" + + # Case 1: Empty body + if not self.body: + return f"{header}." + + # Case 2: Short body (optional optimization, keeping it uniform usually better) + header += ARROW + + lines = [] + # We start the first action on the same line or next line. + # Let's put it on the next line for readability if there are multiple. + + if len(self.body) == 1: + return f"{header}{self.body[0]}." + + # First item + lines.append(f"{header}{self.body[0]};") + # Middle items + for item in self.body[1:-1]: + lines.append(f"{INDENT}{item};") + # Last item + lines.append(f"{INDENT}{self.body[-1]}.") + + return "\n".join(lines) + + +@dataclass +class AgentSpeakFile: + """ + Root element representing the entire generated file. + """ + + initial_beliefs: list[Rule] = field(default_factory=list) + inference_rules: list[Rule] = field(default_factory=list) + plans: list[Plan] = field(default_factory=list) + + def __str__(self): + sections = [] + + if self.initial_beliefs: + sections.append("// --- Initial Beliefs & Facts ---") + sections.extend(str(rule) for rule in self.initial_beliefs) + sections.append("") + + if self.inference_rules: + sections.append("// --- Inference Rules ---") + sections.extend(str(rule) for rule in self.inference_rules) + sections.append("") + + if self.plans: + sections.append("// --- Plans ---") + # Separate plans by a newline for readability + sections.extend(str(plan) + "\n" for plan in self.plans) + + return "\n".join(sections) diff --git a/src/control_backend/agents/bdi/asl_gen.py b/src/control_backend/agents/bdi/asl_gen.py new file mode 100644 index 0000000..f78108a --- /dev/null +++ b/src/control_backend/agents/bdi/asl_gen.py @@ -0,0 +1,295 @@ +from functools import singledispatchmethod + +from slugify import slugify + +# Import the AST we defined above +from control_backend.agents.bdi.asl_ast import ( + ActionLiteral, + AgentSpeakFile, + BeliefLiteral, + BinaryOp, + Expression, + GoalLiteral, + Plan, + Rule, +) +from control_backend.agents.bdi.bdi_program_manager import test_program + +# Import your Pydantic models (adjust import based on your file structure) +from control_backend.schemas.program import ( + Belief, + ConditionalNorm, + GestureAction, + Goal, + InferredBelief, + KeywordBelief, + LLMAction, + LogicalOperator, + Phase, + Program, + ProgramElement, + SemanticBelief, + SpeechAction, +) + + +def do_things(): + print(AgentSpeakGenerator().generate(test_program)) + + +class AgentSpeakGenerator: + """ + Converts a Pydantic Program behavior model into an AgentSpeak(L) AST, + then renders it to a string. + """ + + def generate(self, program: Program) -> str: + asl = AgentSpeakFile() + + self._generate_startup(program, asl) + + for i, phase in enumerate(program.phases): + next_phase = program.phases[i + 1] if i < len(program.phases) - 1 else None + + self._generate_phase_flow(phase, next_phase, asl) + + self._generate_norms(phase, asl) + + self._generate_goals(phase, asl) + + self._generate_triggers(phase, asl) + + return str(asl) + + # --- Section: Startup & Phase Management --- + + def _generate_startup(self, program: Program, asl: AgentSpeakFile): + if not program.phases: + return + + # Initial belief: phase(start). + asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ["start"]))) + + # Startup plan: +started : phase(start) <- -+phase(first_id). + asl.plans.append( + Plan( + trigger=BeliefLiteral("started"), + context=[BeliefLiteral("phase", ["start"])], + body=[ActionLiteral("!transition_phase")], + ) + ) + + def _generate_phase_flow(self, phase: Phase, next_phase: Phase | None, asl: AgentSpeakFile): + """Generates the main loop listener and the transition logic for this phase.""" + + # +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase. + goal_actions = [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals] + goal_actions.append(ActionLiteral("!transition_phase")) + + asl.plans.append( + Plan( + trigger=BeliefLiteral("user_said", ["Message"]), + context=[BeliefLiteral("phase", [str(phase.id)])], + body=goal_actions, + ) + ) + + # +!transition_phase : phase(ID) <- -+phase(NEXT_ID). + next_id = next_phase.id if next_phase else "end" + + asl.plans.append( + Plan( + trigger=GoalLiteral("transition_phase"), + context=[BeliefLiteral("phase", [str(phase.id)])], + body=[ActionLiteral(f"-+phase({next_id})")], + ) + ) + + # --- Section: Norms & Beliefs --- + + def _generate_norms(self, phase: Phase, asl: AgentSpeakFile): + for norm in phase.norms: + norm_slug = f'"{norm.norm}"' + head = BeliefLiteral("norm", [norm_slug]) + + # Base context is the phase + phase_lit = BeliefLiteral("phase", [str(phase.id)]) + + if isinstance(norm, ConditionalNorm): + self._ensure_belief_inference(norm.condition, asl) + + condition_expr = self._belief_to_expr(norm.condition) + body = BinaryOp(phase_lit, "&", condition_expr) + else: + body = phase_lit + + asl.inference_rules.append(Rule(head=head, body=body)) + + def _ensure_belief_inference(self, belief: Belief, asl: AgentSpeakFile): + """ + Recursively adds rules to infer beliefs. + Checks strictly to avoid duplicates if necessary, + though ASL engines often handle redefinition or we can use a set to track processed IDs. + """ + if isinstance(belief, KeywordBelief): + # Rule: keyword_said("word") :- user_said(M) & .substring(M, "word", P) & P >= 0. + kwd_slug = f'"{belief.keyword}"' + head = BeliefLiteral("keyword_said", [kwd_slug]) + + # Avoid duplicates + if any(str(r.head) == str(head) for r in asl.inference_rules): + return + + body = BinaryOp( + BeliefLiteral("user_said", ["Message"]), + "&", + BinaryOp(f".substring(Message, {kwd_slug}, Pos)", "&", "Pos >= 0"), + ) + + asl.inference_rules.append(Rule(head=head, body=body)) + + elif isinstance(belief, InferredBelief): + self._ensure_belief_inference(belief.left, asl) + self._ensure_belief_inference(belief.right, asl) + + slug = self._slugify(belief) + head = BeliefLiteral(slug) + + if any(str(r.head) == str(head) for r in asl.inference_rules): + return + + op_char = "&" if belief.operator == LogicalOperator.AND else "|" + body = BinaryOp( + self._belief_to_expr(belief.left), op_char, self._belief_to_expr(belief.right) + ) + asl.inference_rules.append(Rule(head=head, body=body)) + + def _belief_to_expr(self, belief: Belief) -> Expression: + if isinstance(belief, KeywordBelief): + return BeliefLiteral("keyword_said", [f'"{belief.keyword}"']) + else: + return BeliefLiteral(self._slugify(belief)) + + # --- Section: Goals --- + + def _generate_goals(self, phase: Phase, asl: AgentSpeakFile): + previous_goal: Goal | None = None + for goal in phase.goals: + self._generate_goal_plan_recursive(goal, str(phase.id), previous_goal, asl) + previous_goal = goal + + def _generate_goal_plan_recursive( + self, goal: Goal, phase_id: str, previous_goal: Goal | None, asl: AgentSpeakFile + ): + goal_slug = self._slugify(goal) + + # phase(ID) & not responded_this_turn & not achieved_goal + context = [ + BeliefLiteral("phase", [phase_id]), + BeliefLiteral("responded_this_turn", negated=True), + BeliefLiteral(f"achieved_{goal_slug}", negated=True), + ] + + if previous_goal: + prev_slug = self._slugify(previous_goal) + context.append(BeliefLiteral(f"achieved_{prev_slug}")) + + body_actions = [] + sub_goals_to_process = [] + + for step in goal.plan.steps: + if isinstance(step, Goal): + sub_slug = self._slugify(step) + body_actions.append(ActionLiteral(f"!{sub_slug}")) + sub_goals_to_process.append(step) + elif isinstance(step, SpeechAction): + body_actions.append(ActionLiteral(f'.say("{step.text}")')) + elif isinstance(step, GestureAction): + body_actions.append(ActionLiteral(f'.gesture("{step.gesture}")')) + elif isinstance(step, LLMAction): + body_actions.append(ActionLiteral(f'!generate_response_with_goal("{step.goal}")')) + + # Mark achievement + if not goal.can_fail: + body_actions.append(ActionLiteral(f"+achieved_{goal_slug}")) + + asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions)) + + prev_sub = None + for sub_goal in sub_goals_to_process: + self._generate_goal_plan_recursive(sub_goal, phase_id, prev_sub, asl) + prev_sub = sub_goal + + # --- Section: Triggers --- + + def _generate_triggers(self, phase: Phase, asl: AgentSpeakFile): + for trigger in phase.triggers: + self._ensure_belief_inference(trigger.condition, asl) + + trigger_belief_slug = self._belief_to_expr(trigger.condition) + + body_actions = [] + sub_goals = [] + + for step in trigger.plan.steps: + if isinstance(step, Goal): + sub_slug = self._slugify(step) + body_actions.append(ActionLiteral(f"!{sub_slug}")) + sub_goals.append(step) + elif isinstance(step, SpeechAction): + body_actions.append(ActionLiteral(f'.say("{step.text}")')) + elif isinstance(step, GestureAction): + body_actions.append( + ActionLiteral(f'.gesture("{step.gesture.type}", "{step.gesture.name}")') + ) + elif isinstance(step, LLMAction): + body_actions.append( + ActionLiteral(f'!generate_response_with_goal("{step.goal}")') + ) + + asl.plans.append( + Plan( + trigger=BeliefLiteral(trigger_belief_slug), + context=[BeliefLiteral("phase", [str(phase.id)])], + body=body_actions, + ) + ) + + # Recurse for triggered goals + prev_sub = None + for sub_goal in sub_goals: + self._generate_goal_plan_recursive(sub_goal, str(phase.id), prev_sub, asl) + prev_sub = sub_goal + + # --- Helpers --- + + @singledispatchmethod + def _slugify(self, element: ProgramElement) -> str: + if element.name: + raise NotImplementedError("Cannot slugify this element.") + return self._slugify_str(element.name) + + @_slugify.register + def _(self, goal: Goal) -> str: + if goal.name: + return self._slugify_str(goal.name) + return f"goal_{goal.id}" + + @_slugify.register + def _(self, kwb: KeywordBelief) -> str: + return f"keyword_said({kwb.keyword})" + + @_slugify.register + def _(self, sb: SemanticBelief) -> str: + return self._slugify_str(sb.description) + + @_slugify.register + def _(self, ib: InferredBelief) -> str: + return self._slugify_str(ib.name) + + def _slugify_str(self, text: str) -> str: + return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"]) + + +if __name__ == "__main__": + do_things() diff --git a/src/control_backend/agents/bdi/bdi_program_manager.py b/src/control_backend/agents/bdi/bdi_program_manager.py index 11c0b00..9925cfb 100644 --- a/src/control_backend/agents/bdi/bdi_program_manager.py +++ b/src/control_backend/agents/bdi/bdi_program_manager.py @@ -1,3 +1,4 @@ +import uuid from collections.abc import Iterable import zmq @@ -32,53 +33,72 @@ test_program = Program( phases=[ Phase( norms=[ - BasicNorm(norm="Talk like a pirate"), + BasicNorm(norm="Talk like a pirate", id=uuid.uuid4()), ConditionalNorm( condition=InferredBelief( - left=KeywordBelief(keyword="Arr"), - right=SemanticBelief(description="testing", name="semantic belief"), + left=KeywordBelief(keyword="Arr", id=uuid.uuid4()), + right=SemanticBelief( + description="testing", name="semantic belief", id=uuid.uuid4() + ), operator=LogicalOperator.OR, name="Talking to a pirate", + id=uuid.uuid4(), ), norm="Use nautical terms", + id=uuid.uuid4(), ), ConditionalNorm( condition=SemanticBelief( - description="We are talking to a child", name="talking to child" + description="We are talking to a child", + name="talking to child", + id=uuid.uuid4(), ), norm="Do not use cuss words", + id=uuid.uuid4(), ), ], triggers=[ Trigger( condition=InferredBelief( - left=KeywordBelief(keyword="key"), + left=KeywordBelief(keyword="key", id=uuid.uuid4()), right=InferredBelief( - left=KeywordBelief(keyword="key2"), + left=KeywordBelief(keyword="key2", id=uuid.uuid4()), right=SemanticBelief( - description="Decode this", name="semantic belief 2" + description="Decode this", name="semantic belief 2", id=uuid.uuid4() ), operator=LogicalOperator.OR, name="test trigger inferred inner", + id=uuid.uuid4(), ), operator=LogicalOperator.OR, name="test trigger inferred outer", + id=uuid.uuid4(), ), plan=Plan( steps=[ - SpeechAction(text="Testing trigger"), + SpeechAction(text="Testing trigger", id=uuid.uuid4()), Goal( name="Testing trigger", - plan=Plan(steps=[LLMAction(goal="Do something")]), + plan=Plan( + steps=[LLMAction(goal="Do something", id=uuid.uuid4())], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), ), - ] + ], + id=uuid.uuid4(), ), + id=uuid.uuid4(), ) ], goals=[ Goal( name="Determine user age", - plan=Plan(steps=[LLMAction(goal="Determine the age of the user.")]), + plan=Plan( + steps=[LLMAction(goal="Determine the age of the user.", id=uuid.uuid4())], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), ), Goal( name="Find the user's name", @@ -86,38 +106,62 @@ test_program = Program( steps=[ Goal( name="Greet the user", - plan=Plan(steps=[LLMAction(goal="Greet the user.")]), + plan=Plan( + steps=[LLMAction(goal="Greet the user.", id=uuid.uuid4())], + id=uuid.uuid4(), + ), can_fail=False, + id=uuid.uuid4(), ), Goal( name="Ask for name", - plan=Plan(steps=[LLMAction(goal="Obtain the user's name.")]), + plan=Plan( + steps=[ + LLMAction(goal="Obtain the user's name.", id=uuid.uuid4()) + ], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), ), - ] + ], + id=uuid.uuid4(), ), + id=uuid.uuid4(), + ), + Goal( + name="Tell a joke", + plan=Plan( + steps=[LLMAction(goal="Tell a joke.", id=uuid.uuid4())], id=uuid.uuid4() + ), + id=uuid.uuid4(), ), - Goal(name="Tell a joke", plan=Plan(steps=[LLMAction(goal="Tell a joke.")])), ], - id=1, + id=uuid.uuid4(), ), Phase( - id=2, + id=uuid.uuid4(), norms=[ - BasicNorm(norm="Use very gentle speech."), + BasicNorm(norm="Use very gentle speech.", id=uuid.uuid4()), ConditionalNorm( condition=SemanticBelief( - description="We are talking to a child", name="talking to child" + description="We are talking to a child", + name="talking to child", + id=uuid.uuid4(), ), norm="Do not use cuss words", + id=uuid.uuid4(), ), ], triggers=[ Trigger( condition=InferredBelief( - left=KeywordBelief(keyword="help"), - right=SemanticBelief(description="User is stuck", name="stuck"), + left=KeywordBelief(keyword="help", id=uuid.uuid4()), + right=SemanticBelief( + description="User is stuck", name="stuck", id=uuid.uuid4() + ), operator=LogicalOperator.OR, name="help_or_stuck", + id=uuid.uuid4(), ), plan=Plan( steps=[ @@ -127,13 +171,18 @@ test_program = Program( steps=[ LLMAction( goal="Provide a step-by-step path to " - "resolve the user's issue." + "resolve the user's issue.", + id=uuid.uuid4(), ) - ] + ], + id=uuid.uuid4(), ), + id=uuid.uuid4(), ), - ] + ], + id=uuid.uuid4(), ), + id=uuid.uuid4(), ), ], goals=[ @@ -143,20 +192,38 @@ test_program = Program( steps=[ LLMAction( goal="Ask 1-2 targeted questions to clarify the " - "user's intent, then proceed." + "user's intent, then proceed.", + id=uuid.uuid4(), ) - ] + ], + id=uuid.uuid4(), ), + id=uuid.uuid4(), ), Goal( name="Provide solution", plan=Plan( - steps=[LLMAction(goal="Deliver a solution to complete the user's goal.")] + steps=[ + LLMAction( + goal="Deliver a solution to complete the user's goal.", + id=uuid.uuid4(), + ) + ], + id=uuid.uuid4(), ), + id=uuid.uuid4(), ), Goal( name="Summarize next steps", - plan=Plan(steps=[LLMAction(goal="Summarize what the user should do next.")]), + plan=Plan( + steps=[ + LLMAction( + goal="Summarize what the user should do next.", id=uuid.uuid4() + ) + ], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), ), ], ), @@ -198,10 +265,16 @@ class AgentSpeakGenerator: return "\n".join(lines) def _generate_initial_beliefs(self, program: Program) -> Iterable[str]: - yield "// --- Initial beliefs ---" + yield "// --- Initial beliefs and agent startup ---" yield "phase(start)." + yield "" + + yield "+started" + yield f"{self.colon_prefix}phase(start)" + yield f"{self.arrow_prefix}phase({program.phases[0].id if program.phases else 'end'})." + yield from ["", ""] def _generate_basic_flow(self, program: Program) -> Iterable[str]: diff --git a/src/control_backend/schemas/program.py b/src/control_backend/schemas/program.py index 7c73a6a..529a23d 100644 --- a/src/control_backend/schemas/program.py +++ b/src/control_backend/schemas/program.py @@ -1,4 +1,5 @@ from enum import Enum +from typing import Literal from pydantic import UUID4, BaseModel @@ -133,9 +134,17 @@ class SpeechAction(ProgramElement): text: str -# TODO: gestures -class Gesture(Enum): - RAISE_HAND = "RAISE_HAND" +class Gesture(BaseModel): + """ + Represents a gesture to be performed. Can be either a single gesture, + or a random gesture from a category (tag). + + :ivar type: The type of the gesture, "tag" or "single". + :ivar name: The name of the single gesture or tag. + """ + + type: Literal["tag", "single"] + name: str class GestureAction(ProgramElement):