feat: (hopefully) better intermediate representation

ref: N25B-376
This commit is contained in:
2025-12-17 15:33:27 +01:00
parent 742e36b94f
commit 1d36d2e089
4 changed files with 581 additions and 32 deletions

View File

@@ -0,0 +1,172 @@
import typing
from dataclasses import dataclass, field
# --- Types ---
@dataclass
class BeliefLiteral:
"""
Represents a literal or atom.
Example: phase(1), user_said("hello"), ~started
"""
functor: str
args: list[str] = field(default_factory=list)
negated: bool = False
def __str__(self):
# In ASL, 'not' is usually for closed-world assumption (prolog style),
# '~' is for explicit negation in beliefs.
# For simplicity in behavior trees, we often use 'not' for conditions.
prefix = "not " if self.negated else ""
if not self.args:
return f"{prefix}{self.functor}"
# Clean args to ensure strings are quoted if they look like strings,
# but usually the converter handles the quoting of string literals.
args_str = ", ".join(self.args)
return f"{prefix}{self.functor}({args_str})"
@dataclass
class GoalLiteral:
name: str
def __str__(self):
return f"!{self.name}"
@dataclass
class ActionLiteral:
"""
Represents a step in a plan body.
Example: .say("Hello") or !achieve_goal
"""
code: str
def __str__(self):
return self.code
@dataclass
class BinaryOp:
"""
Represents logical operations.
Example: (A & B) | C
"""
left: "Expression | str"
operator: typing.Literal["&", "|"]
right: "Expression | str"
def __str__(self):
l_str = str(self.left)
r_str = str(self.right)
if isinstance(self.left, BinaryOp):
l_str = f"({l_str})"
if isinstance(self.right, BinaryOp):
r_str = f"({r_str})"
return f"{l_str} {self.operator} {r_str}"
Literal = BeliefLiteral | GoalLiteral | ActionLiteral
Expression = Literal | BinaryOp | str
@dataclass
class Rule:
"""
Represents an inference rule.
Example: head :- body.
"""
head: Expression
body: Expression | None = None
def __str__(self):
if not self.body:
return f"{self.head}."
return f"{self.head} :- {self.body}."
@dataclass
class Plan:
"""
Represents a plan.
Syntax: +trigger : context <- body.
"""
trigger: BeliefLiteral | GoalLiteral
context: list[Expression] = field(default_factory=list)
body: list[ActionLiteral] = field(default_factory=list)
def __str__(self):
# Indentation settings
INDENT = " "
ARROW = "\n <- "
COLON = "\n : "
# Build Header
header = f"+{self.trigger}"
if self.context:
ctx_str = f" &\n{INDENT}".join(str(c) for c in self.context)
header += f"{COLON}{ctx_str}"
# Case 1: Empty body
if not self.body:
return f"{header}."
# Case 2: Short body (optional optimization, keeping it uniform usually better)
header += ARROW
lines = []
# We start the first action on the same line or next line.
# Let's put it on the next line for readability if there are multiple.
if len(self.body) == 1:
return f"{header}{self.body[0]}."
# First item
lines.append(f"{header}{self.body[0]};")
# Middle items
for item in self.body[1:-1]:
lines.append(f"{INDENT}{item};")
# Last item
lines.append(f"{INDENT}{self.body[-1]}.")
return "\n".join(lines)
@dataclass
class AgentSpeakFile:
"""
Root element representing the entire generated file.
"""
initial_beliefs: list[Rule] = field(default_factory=list)
inference_rules: list[Rule] = field(default_factory=list)
plans: list[Plan] = field(default_factory=list)
def __str__(self):
sections = []
if self.initial_beliefs:
sections.append("// --- Initial Beliefs & Facts ---")
sections.extend(str(rule) for rule in self.initial_beliefs)
sections.append("")
if self.inference_rules:
sections.append("// --- Inference Rules ---")
sections.extend(str(rule) for rule in self.inference_rules)
sections.append("")
if self.plans:
sections.append("// --- Plans ---")
# Separate plans by a newline for readability
sections.extend(str(plan) + "\n" for plan in self.plans)
return "\n".join(sections)

View File

@@ -0,0 +1,295 @@
from functools import singledispatchmethod
from slugify import slugify
# Import the AST we defined above
from control_backend.agents.bdi.asl_ast import (
ActionLiteral,
AgentSpeakFile,
BeliefLiteral,
BinaryOp,
Expression,
GoalLiteral,
Plan,
Rule,
)
from control_backend.agents.bdi.bdi_program_manager import test_program
# Import your Pydantic models (adjust import based on your file structure)
from control_backend.schemas.program import (
Belief,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Phase,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
)
def do_things():
print(AgentSpeakGenerator().generate(test_program))
class AgentSpeakGenerator:
"""
Converts a Pydantic Program behavior model into an AgentSpeak(L) AST,
then renders it to a string.
"""
def generate(self, program: Program) -> str:
asl = AgentSpeakFile()
self._generate_startup(program, asl)
for i, phase in enumerate(program.phases):
next_phase = program.phases[i + 1] if i < len(program.phases) - 1 else None
self._generate_phase_flow(phase, next_phase, asl)
self._generate_norms(phase, asl)
self._generate_goals(phase, asl)
self._generate_triggers(phase, asl)
return str(asl)
# --- Section: Startup & Phase Management ---
def _generate_startup(self, program: Program, asl: AgentSpeakFile):
if not program.phases:
return
# Initial belief: phase(start).
asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ["start"])))
# Startup plan: +started : phase(start) <- -+phase(first_id).
asl.plans.append(
Plan(
trigger=BeliefLiteral("started"),
context=[BeliefLiteral("phase", ["start"])],
body=[ActionLiteral("!transition_phase")],
)
)
def _generate_phase_flow(self, phase: Phase, next_phase: Phase | None, asl: AgentSpeakFile):
"""Generates the main loop listener and the transition logic for this phase."""
# +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase.
goal_actions = [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
goal_actions.append(ActionLiteral("!transition_phase"))
asl.plans.append(
Plan(
trigger=BeliefLiteral("user_said", ["Message"]),
context=[BeliefLiteral("phase", [str(phase.id)])],
body=goal_actions,
)
)
# +!transition_phase : phase(ID) <- -+phase(NEXT_ID).
next_id = next_phase.id if next_phase else "end"
asl.plans.append(
Plan(
trigger=GoalLiteral("transition_phase"),
context=[BeliefLiteral("phase", [str(phase.id)])],
body=[ActionLiteral(f"-+phase({next_id})")],
)
)
# --- Section: Norms & Beliefs ---
def _generate_norms(self, phase: Phase, asl: AgentSpeakFile):
for norm in phase.norms:
norm_slug = f'"{norm.norm}"'
head = BeliefLiteral("norm", [norm_slug])
# Base context is the phase
phase_lit = BeliefLiteral("phase", [str(phase.id)])
if isinstance(norm, ConditionalNorm):
self._ensure_belief_inference(norm.condition, asl)
condition_expr = self._belief_to_expr(norm.condition)
body = BinaryOp(phase_lit, "&", condition_expr)
else:
body = phase_lit
asl.inference_rules.append(Rule(head=head, body=body))
def _ensure_belief_inference(self, belief: Belief, asl: AgentSpeakFile):
"""
Recursively adds rules to infer beliefs.
Checks strictly to avoid duplicates if necessary,
though ASL engines often handle redefinition or we can use a set to track processed IDs.
"""
if isinstance(belief, KeywordBelief):
# Rule: keyword_said("word") :- user_said(M) & .substring(M, "word", P) & P >= 0.
kwd_slug = f'"{belief.keyword}"'
head = BeliefLiteral("keyword_said", [kwd_slug])
# Avoid duplicates
if any(str(r.head) == str(head) for r in asl.inference_rules):
return
body = BinaryOp(
BeliefLiteral("user_said", ["Message"]),
"&",
BinaryOp(f".substring(Message, {kwd_slug}, Pos)", "&", "Pos >= 0"),
)
asl.inference_rules.append(Rule(head=head, body=body))
elif isinstance(belief, InferredBelief):
self._ensure_belief_inference(belief.left, asl)
self._ensure_belief_inference(belief.right, asl)
slug = self._slugify(belief)
head = BeliefLiteral(slug)
if any(str(r.head) == str(head) for r in asl.inference_rules):
return
op_char = "&" if belief.operator == LogicalOperator.AND else "|"
body = BinaryOp(
self._belief_to_expr(belief.left), op_char, self._belief_to_expr(belief.right)
)
asl.inference_rules.append(Rule(head=head, body=body))
def _belief_to_expr(self, belief: Belief) -> Expression:
if isinstance(belief, KeywordBelief):
return BeliefLiteral("keyword_said", [f'"{belief.keyword}"'])
else:
return BeliefLiteral(self._slugify(belief))
# --- Section: Goals ---
def _generate_goals(self, phase: Phase, asl: AgentSpeakFile):
previous_goal: Goal | None = None
for goal in phase.goals:
self._generate_goal_plan_recursive(goal, str(phase.id), previous_goal, asl)
previous_goal = goal
def _generate_goal_plan_recursive(
self, goal: Goal, phase_id: str, previous_goal: Goal | None, asl: AgentSpeakFile
):
goal_slug = self._slugify(goal)
# phase(ID) & not responded_this_turn & not achieved_goal
context = [
BeliefLiteral("phase", [phase_id]),
BeliefLiteral("responded_this_turn", negated=True),
BeliefLiteral(f"achieved_{goal_slug}", negated=True),
]
if previous_goal:
prev_slug = self._slugify(previous_goal)
context.append(BeliefLiteral(f"achieved_{prev_slug}"))
body_actions = []
sub_goals_to_process = []
for step in goal.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals_to_process.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(ActionLiteral(f'.gesture("{step.gesture}")'))
elif isinstance(step, LLMAction):
body_actions.append(ActionLiteral(f'!generate_response_with_goal("{step.goal}")'))
# Mark achievement
if not goal.can_fail:
body_actions.append(ActionLiteral(f"+achieved_{goal_slug}"))
asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions))
prev_sub = None
for sub_goal in sub_goals_to_process:
self._generate_goal_plan_recursive(sub_goal, phase_id, prev_sub, asl)
prev_sub = sub_goal
# --- Section: Triggers ---
def _generate_triggers(self, phase: Phase, asl: AgentSpeakFile):
for trigger in phase.triggers:
self._ensure_belief_inference(trigger.condition, asl)
trigger_belief_slug = self._belief_to_expr(trigger.condition)
body_actions = []
sub_goals = []
for step in trigger.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(
ActionLiteral(f'.gesture("{step.gesture.type}", "{step.gesture.name}")')
)
elif isinstance(step, LLMAction):
body_actions.append(
ActionLiteral(f'!generate_response_with_goal("{step.goal}")')
)
asl.plans.append(
Plan(
trigger=BeliefLiteral(trigger_belief_slug),
context=[BeliefLiteral("phase", [str(phase.id)])],
body=body_actions,
)
)
# Recurse for triggered goals
prev_sub = None
for sub_goal in sub_goals:
self._generate_goal_plan_recursive(sub_goal, str(phase.id), prev_sub, asl)
prev_sub = sub_goal
# --- Helpers ---
@singledispatchmethod
def _slugify(self, element: ProgramElement) -> str:
if element.name:
raise NotImplementedError("Cannot slugify this element.")
return self._slugify_str(element.name)
@_slugify.register
def _(self, goal: Goal) -> str:
if goal.name:
return self._slugify_str(goal.name)
return f"goal_{goal.id}"
@_slugify.register
def _(self, kwb: KeywordBelief) -> str:
return f"keyword_said({kwb.keyword})"
@_slugify.register
def _(self, sb: SemanticBelief) -> str:
return self._slugify_str(sb.description)
@_slugify.register
def _(self, ib: InferredBelief) -> str:
return self._slugify_str(ib.name)
def _slugify_str(self, text: str) -> str:
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])
if __name__ == "__main__":
do_things()

View File

@@ -1,3 +1,4 @@
import uuid
from collections.abc import Iterable
import zmq
@@ -32,53 +33,72 @@ test_program = Program(
phases=[
Phase(
norms=[
BasicNorm(norm="Talk like a pirate"),
BasicNorm(norm="Talk like a pirate", id=uuid.uuid4()),
ConditionalNorm(
condition=InferredBelief(
left=KeywordBelief(keyword="Arr"),
right=SemanticBelief(description="testing", name="semantic belief"),
left=KeywordBelief(keyword="Arr", id=uuid.uuid4()),
right=SemanticBelief(
description="testing", name="semantic belief", id=uuid.uuid4()
),
operator=LogicalOperator.OR,
name="Talking to a pirate",
id=uuid.uuid4(),
),
norm="Use nautical terms",
id=uuid.uuid4(),
),
ConditionalNorm(
condition=SemanticBelief(
description="We are talking to a child", name="talking to child"
description="We are talking to a child",
name="talking to child",
id=uuid.uuid4(),
),
norm="Do not use cuss words",
id=uuid.uuid4(),
),
],
triggers=[
Trigger(
condition=InferredBelief(
left=KeywordBelief(keyword="key"),
left=KeywordBelief(keyword="key", id=uuid.uuid4()),
right=InferredBelief(
left=KeywordBelief(keyword="key2"),
left=KeywordBelief(keyword="key2", id=uuid.uuid4()),
right=SemanticBelief(
description="Decode this", name="semantic belief 2"
description="Decode this", name="semantic belief 2", id=uuid.uuid4()
),
operator=LogicalOperator.OR,
name="test trigger inferred inner",
id=uuid.uuid4(),
),
operator=LogicalOperator.OR,
name="test trigger inferred outer",
id=uuid.uuid4(),
),
plan=Plan(
steps=[
SpeechAction(text="Testing trigger"),
SpeechAction(text="Testing trigger", id=uuid.uuid4()),
Goal(
name="Testing trigger",
plan=Plan(steps=[LLMAction(goal="Do something")]),
plan=Plan(
steps=[LLMAction(goal="Do something", id=uuid.uuid4())],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
]
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
)
],
goals=[
Goal(
name="Determine user age",
plan=Plan(steps=[LLMAction(goal="Determine the age of the user.")]),
plan=Plan(
steps=[LLMAction(goal="Determine the age of the user.", id=uuid.uuid4())],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
Goal(
name="Find the user's name",
@@ -86,38 +106,62 @@ test_program = Program(
steps=[
Goal(
name="Greet the user",
plan=Plan(steps=[LLMAction(goal="Greet the user.")]),
plan=Plan(
steps=[LLMAction(goal="Greet the user.", id=uuid.uuid4())],
id=uuid.uuid4(),
),
can_fail=False,
id=uuid.uuid4(),
),
Goal(
name="Ask for name",
plan=Plan(steps=[LLMAction(goal="Obtain the user's name.")]),
plan=Plan(
steps=[
LLMAction(goal="Obtain the user's name.", id=uuid.uuid4())
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
]
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
Goal(
name="Tell a joke",
plan=Plan(
steps=[LLMAction(goal="Tell a joke.", id=uuid.uuid4())], id=uuid.uuid4()
),
id=uuid.uuid4(),
),
Goal(name="Tell a joke", plan=Plan(steps=[LLMAction(goal="Tell a joke.")])),
],
id=1,
id=uuid.uuid4(),
),
Phase(
id=2,
id=uuid.uuid4(),
norms=[
BasicNorm(norm="Use very gentle speech."),
BasicNorm(norm="Use very gentle speech.", id=uuid.uuid4()),
ConditionalNorm(
condition=SemanticBelief(
description="We are talking to a child", name="talking to child"
description="We are talking to a child",
name="talking to child",
id=uuid.uuid4(),
),
norm="Do not use cuss words",
id=uuid.uuid4(),
),
],
triggers=[
Trigger(
condition=InferredBelief(
left=KeywordBelief(keyword="help"),
right=SemanticBelief(description="User is stuck", name="stuck"),
left=KeywordBelief(keyword="help", id=uuid.uuid4()),
right=SemanticBelief(
description="User is stuck", name="stuck", id=uuid.uuid4()
),
operator=LogicalOperator.OR,
name="help_or_stuck",
id=uuid.uuid4(),
),
plan=Plan(
steps=[
@@ -127,13 +171,18 @@ test_program = Program(
steps=[
LLMAction(
goal="Provide a step-by-step path to "
"resolve the user's issue."
"resolve the user's issue.",
id=uuid.uuid4(),
)
]
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
]
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
],
goals=[
@@ -143,20 +192,38 @@ test_program = Program(
steps=[
LLMAction(
goal="Ask 1-2 targeted questions to clarify the "
"user's intent, then proceed."
"user's intent, then proceed.",
id=uuid.uuid4(),
)
]
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
Goal(
name="Provide solution",
plan=Plan(
steps=[LLMAction(goal="Deliver a solution to complete the user's goal.")]
steps=[
LLMAction(
goal="Deliver a solution to complete the user's goal.",
id=uuid.uuid4(),
)
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
Goal(
name="Summarize next steps",
plan=Plan(steps=[LLMAction(goal="Summarize what the user should do next.")]),
plan=Plan(
steps=[
LLMAction(
goal="Summarize what the user should do next.", id=uuid.uuid4()
)
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
],
),
@@ -198,10 +265,16 @@ class AgentSpeakGenerator:
return "\n".join(lines)
def _generate_initial_beliefs(self, program: Program) -> Iterable[str]:
yield "// --- Initial beliefs ---"
yield "// --- Initial beliefs and agent startup ---"
yield "phase(start)."
yield ""
yield "+started"
yield f"{self.colon_prefix}phase(start)"
yield f"{self.arrow_prefix}phase({program.phases[0].id if program.phases else 'end'})."
yield from ["", ""]
def _generate_basic_flow(self, program: Program) -> Iterable[str]:

View File

@@ -1,4 +1,5 @@
from enum import Enum
from typing import Literal
from pydantic import UUID4, BaseModel
@@ -133,9 +134,17 @@ class SpeechAction(ProgramElement):
text: str
# TODO: gestures
class Gesture(Enum):
RAISE_HAND = "RAISE_HAND"
class Gesture(BaseModel):
"""
Represents a gesture to be performed. Can be either a single gesture,
or a random gesture from a category (tag).
:ivar type: The type of the gesture, "tag" or "single".
:ivar name: The name of the single gesture or tag.
"""
type: Literal["tag", "single"]
name: str
class GestureAction(ProgramElement):