The Big One #43
@@ -1,203 +0,0 @@
|
|||||||
import typing
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
|
|
||||||
# --- Types ---
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class BeliefLiteral:
|
|
||||||
"""
|
|
||||||
Represents a literal or atom.
|
|
||||||
Example: phase(1), user_said("hello"), ~started
|
|
||||||
"""
|
|
||||||
|
|
||||||
functor: str
|
|
||||||
args: list[str] = field(default_factory=list)
|
|
||||||
negated: bool = False
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
# In ASL, 'not' is usually for closed-world assumption (prolog style),
|
|
||||||
# '~' is for explicit negation in beliefs.
|
|
||||||
# For simplicity in behavior trees, we often use 'not' for conditions.
|
|
||||||
prefix = "not " if self.negated else ""
|
|
||||||
if not self.args:
|
|
||||||
return f"{prefix}{self.functor}"
|
|
||||||
|
|
||||||
# Clean args to ensure strings are quoted if they look like strings,
|
|
||||||
# but usually the converter handles the quoting of string literals.
|
|
||||||
args_str = ", ".join(self.args)
|
|
||||||
return f"{prefix}{self.functor}({args_str})"
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class GoalLiteral:
|
|
||||||
name: str
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return f"!{self.name}"
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class ActionLiteral:
|
|
||||||
"""
|
|
||||||
Represents a step in a plan body.
|
|
||||||
Example: .say("Hello") or !achieve_goal
|
|
||||||
"""
|
|
||||||
|
|
||||||
code: str
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return self.code
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class BinaryOp:
|
|
||||||
"""
|
|
||||||
Represents logical operations.
|
|
||||||
Example: (A & B) | C
|
|
||||||
"""
|
|
||||||
|
|
||||||
left: "Expression | str"
|
|
||||||
operator: typing.Literal["&", "|"]
|
|
||||||
right: "Expression | str"
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
l_str = str(self.left)
|
|
||||||
r_str = str(self.right)
|
|
||||||
|
|
||||||
if isinstance(self.left, BinaryOp):
|
|
||||||
l_str = f"({l_str})"
|
|
||||||
if isinstance(self.right, BinaryOp):
|
|
||||||
r_str = f"({r_str})"
|
|
||||||
|
|
||||||
return f"{l_str} {self.operator} {r_str}"
|
|
||||||
|
|
||||||
|
|
||||||
Literal = BeliefLiteral | GoalLiteral | ActionLiteral
|
|
||||||
Expression = Literal | BinaryOp | str
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Rule:
|
|
||||||
"""
|
|
||||||
Represents an inference rule.
|
|
||||||
Example: head :- body.
|
|
||||||
"""
|
|
||||||
|
|
||||||
head: Expression
|
|
||||||
body: Expression | None = None
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
if not self.body:
|
|
||||||
return f"{self.head}."
|
|
||||||
return f"{self.head} :- {self.body}."
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class PersistentRule:
|
|
||||||
"""
|
|
||||||
Represents an inference rule, where the inferred belief is persistent when formed.
|
|
||||||
"""
|
|
||||||
|
|
||||||
head: Expression
|
|
||||||
body: Expression
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
if not self.body:
|
|
||||||
raise Exception("Rule without body should not be persistent.")
|
|
||||||
|
|
||||||
lines = []
|
|
||||||
|
|
||||||
if isinstance(self.body, BinaryOp):
|
|
||||||
lines.append(f"+{self.body.left}")
|
|
||||||
if self.body.operator == "&":
|
|
||||||
lines.append(f" : {self.body.right}")
|
|
||||||
lines.append(f" <- +{self.head}.")
|
|
||||||
if self.body.operator == "|":
|
|
||||||
lines.append(f"+{self.body.right}")
|
|
||||||
lines.append(f" <- +{self.head}.")
|
|
||||||
|
|
||||||
return "\n".join(lines)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Plan:
|
|
||||||
"""
|
|
||||||
Represents a plan.
|
|
||||||
Syntax: +trigger : context <- body.
|
|
||||||
"""
|
|
||||||
|
|
||||||
trigger: BeliefLiteral | GoalLiteral
|
|
||||||
context: list[Expression] = field(default_factory=list)
|
|
||||||
body: list[ActionLiteral] = field(default_factory=list)
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
# Indentation settings
|
|
||||||
INDENT = " "
|
|
||||||
ARROW = "\n <- "
|
|
||||||
COLON = "\n : "
|
|
||||||
|
|
||||||
# Build Header
|
|
||||||
header = f"+{self.trigger}"
|
|
||||||
if self.context:
|
|
||||||
ctx_str = f" &\n{INDENT}".join(str(c) for c in self.context)
|
|
||||||
header += f"{COLON}{ctx_str}"
|
|
||||||
|
|
||||||
# Case 1: Empty body
|
|
||||||
if not self.body:
|
|
||||||
return f"{header}."
|
|
||||||
|
|
||||||
# Case 2: Short body (optional optimization, keeping it uniform usually better)
|
|
||||||
header += ARROW
|
|
||||||
|
|
||||||
lines = []
|
|
||||||
# We start the first action on the same line or next line.
|
|
||||||
# Let's put it on the next line for readability if there are multiple.
|
|
||||||
|
|
||||||
if len(self.body) == 1:
|
|
||||||
return f"{header}{self.body[0]}."
|
|
||||||
|
|
||||||
# First item
|
|
||||||
lines.append(f"{header}{self.body[0]};")
|
|
||||||
# Middle items
|
|
||||||
for item in self.body[1:-1]:
|
|
||||||
lines.append(f"{INDENT}{item};")
|
|
||||||
# Last item
|
|
||||||
lines.append(f"{INDENT}{self.body[-1]}.")
|
|
||||||
|
|
||||||
return "\n".join(lines)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class AgentSpeakFile:
|
|
||||||
"""
|
|
||||||
Root element representing the entire generated file.
|
|
||||||
"""
|
|
||||||
|
|
||||||
initial_beliefs: list[Rule] = field(default_factory=list)
|
|
||||||
inference_rules: list[Rule | PersistentRule] = field(default_factory=list)
|
|
||||||
plans: list[Plan] = field(default_factory=list)
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
sections = []
|
|
||||||
|
|
||||||
if self.initial_beliefs:
|
|
||||||
sections.append("// --- Initial Beliefs & Facts ---")
|
|
||||||
sections.extend(str(rule) for rule in self.initial_beliefs)
|
|
||||||
sections.append("")
|
|
||||||
|
|
||||||
if self.inference_rules:
|
|
||||||
sections.append("// --- Inference Rules ---")
|
|
||||||
sections.extend(str(rule) for rule in self.inference_rules if isinstance(rule, Rule))
|
|
||||||
sections.append("")
|
|
||||||
sections.extend(
|
|
||||||
str(rule) for rule in self.inference_rules if isinstance(rule, PersistentRule)
|
|
||||||
)
|
|
||||||
sections.append("")
|
|
||||||
|
|
||||||
if self.plans:
|
|
||||||
sections.append("// --- Plans ---")
|
|
||||||
# Separate plans by a newline for readability
|
|
||||||
sections.extend(str(plan) + "\n" for plan in self.plans)
|
|
||||||
|
|
||||||
return "\n".join(sections)
|
|
||||||
@@ -1,425 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import time
|
|
||||||
from functools import singledispatchmethod
|
|
||||||
|
|
||||||
from slugify import slugify
|
|
||||||
|
|
||||||
from control_backend.agents.bdi import BDICoreAgent
|
|
||||||
from control_backend.agents.bdi.asl_ast import (
|
|
||||||
ActionLiteral,
|
|
||||||
AgentSpeakFile,
|
|
||||||
BeliefLiteral,
|
|
||||||
BinaryOp,
|
|
||||||
Expression,
|
|
||||||
GoalLiteral,
|
|
||||||
PersistentRule,
|
|
||||||
Plan,
|
|
||||||
Rule,
|
|
||||||
)
|
|
||||||
from control_backend.agents.bdi.bdi_program_manager import test_program
|
|
||||||
from control_backend.schemas.program import (
|
|
||||||
BasicBelief,
|
|
||||||
Belief,
|
|
||||||
ConditionalNorm,
|
|
||||||
GestureAction,
|
|
||||||
Goal,
|
|
||||||
InferredBelief,
|
|
||||||
KeywordBelief,
|
|
||||||
LLMAction,
|
|
||||||
LogicalOperator,
|
|
||||||
Phase,
|
|
||||||
Program,
|
|
||||||
ProgramElement,
|
|
||||||
SemanticBelief,
|
|
||||||
SpeechAction,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def do_things():
|
|
||||||
res = input("Wanna generate")
|
|
||||||
if res == "y":
|
|
||||||
program = AgentSpeakGenerator().generate(test_program)
|
|
||||||
filename = f"{int(time.time())}.asl"
|
|
||||||
with open(filename, "w") as f:
|
|
||||||
f.write(program)
|
|
||||||
else:
|
|
||||||
# filename = "0test.asl"
|
|
||||||
filename = "1766062491.asl"
|
|
||||||
bdi_agent = BDICoreAgent("BDICoreAgent", filename)
|
|
||||||
flag = asyncio.Event()
|
|
||||||
await bdi_agent.start()
|
|
||||||
await flag.wait()
|
|
||||||
|
|
||||||
|
|
||||||
def do_other_things():
|
|
||||||
print(AgentSpeakGenerator().generate(test_program))
|
|
||||||
|
|
||||||
|
|
||||||
class AgentSpeakGenerator:
|
|
||||||
"""
|
|
||||||
Converts a Pydantic Program behavior model into an AgentSpeak(L) AST,
|
|
||||||
then renders it to a string.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def generate(self, program: Program) -> str:
|
|
||||||
asl = AgentSpeakFile()
|
|
||||||
|
|
||||||
self._generate_startup(program, asl)
|
|
||||||
|
|
||||||
for i, phase in enumerate(program.phases):
|
|
||||||
next_phase = program.phases[i + 1] if i < len(program.phases) - 1 else None
|
|
||||||
|
|
||||||
self._generate_phase_flow(phase, next_phase, asl)
|
|
||||||
|
|
||||||
self._generate_norms(phase, asl)
|
|
||||||
|
|
||||||
self._generate_goals(phase, asl)
|
|
||||||
|
|
||||||
self._generate_triggers(phase, asl)
|
|
||||||
|
|
||||||
self._generate_fallbacks(program, asl)
|
|
||||||
|
|
||||||
return str(asl)
|
|
||||||
|
|
||||||
# --- Section: Startup & Phase Management ---
|
|
||||||
|
|
||||||
def _generate_startup(self, program: Program, asl: AgentSpeakFile):
|
|
||||||
if not program.phases:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Initial belief: phase(start).
|
|
||||||
asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ['"start"'])))
|
|
||||||
|
|
||||||
# Startup plan: +started : phase(start) <- -phase(start); +phase(first_id).
|
|
||||||
asl.plans.append(
|
|
||||||
Plan(
|
|
||||||
trigger=BeliefLiteral("started"),
|
|
||||||
context=[BeliefLiteral("phase", ['"start"'])],
|
|
||||||
body=[
|
|
||||||
ActionLiteral('-phase("start")'),
|
|
||||||
ActionLiteral(f'+phase("{program.phases[0].id}")'),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Initial plans:
|
|
||||||
asl.plans.append(
|
|
||||||
Plan(
|
|
||||||
trigger=GoalLiteral("generate_response_with_goal(Goal)"),
|
|
||||||
context=[BeliefLiteral("user_said", ["Message"])],
|
|
||||||
body=[
|
|
||||||
ActionLiteral("+responded_this_turn"),
|
|
||||||
ActionLiteral(".findall(Norm, norm(Norm), Norms)"),
|
|
||||||
ActionLiteral(".reply_with_goal(Message, Norms, Goal)"),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
def _generate_phase_flow(self, phase: Phase, next_phase: Phase | None, asl: AgentSpeakFile):
|
|
||||||
"""Generates the main loop listener and the transition logic for this phase."""
|
|
||||||
|
|
||||||
# +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase.
|
|
||||||
goal_actions = [ActionLiteral("-responded_this_turn")]
|
|
||||||
goal_actions += [
|
|
||||||
ActionLiteral(f"!check_{self._slugify_str(keyword)}")
|
|
||||||
for keyword in self._get_keyword_conditionals(phase)
|
|
||||||
]
|
|
||||||
goal_actions += [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
|
|
||||||
goal_actions.append(ActionLiteral("!transition_phase"))
|
|
||||||
|
|
||||||
asl.plans.append(
|
|
||||||
Plan(
|
|
||||||
trigger=BeliefLiteral("user_said", ["Message"]),
|
|
||||||
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
|
|
||||||
body=goal_actions,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# +!transition_phase : phase(ID) <- -phase(ID); +(NEXT_ID).
|
|
||||||
next_id = str(next_phase.id) if next_phase else "end"
|
|
||||||
|
|
||||||
transition_context = [BeliefLiteral("phase", [f'"{phase.id}"'])]
|
|
||||||
if phase.goals:
|
|
||||||
transition_context.append(BeliefLiteral(f"achieved_{self._slugify(phase.goals[-1])}"))
|
|
||||||
|
|
||||||
asl.plans.append(
|
|
||||||
Plan(
|
|
||||||
trigger=GoalLiteral("transition_phase"),
|
|
||||||
context=transition_context,
|
|
||||||
body=[
|
|
||||||
ActionLiteral(f'-phase("{phase.id}")'),
|
|
||||||
ActionLiteral(f'+phase("{next_id}")'),
|
|
||||||
ActionLiteral("user_said(Anything)"),
|
|
||||||
ActionLiteral("-+user_said(Anything)"),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
def _get_keyword_conditionals(self, phase: Phase) -> list[str]:
|
|
||||||
res = []
|
|
||||||
for belief in self._extract_basic_beliefs_from_phase(phase):
|
|
||||||
if isinstance(belief, KeywordBelief):
|
|
||||||
res.append(belief.keyword)
|
|
||||||
|
|
||||||
return res
|
|
||||||
|
|
||||||
# --- Section: Norms & Beliefs ---
|
|
||||||
|
|
||||||
def _generate_norms(self, phase: Phase, asl: AgentSpeakFile):
|
|
||||||
for norm in phase.norms:
|
|
||||||
norm_slug = f'"{norm.norm}"'
|
|
||||||
head = BeliefLiteral("norm", [norm_slug])
|
|
||||||
|
|
||||||
# Base context is the phase
|
|
||||||
phase_lit = BeliefLiteral("phase", [f'"{phase.id}"'])
|
|
||||||
|
|
||||||
if isinstance(norm, ConditionalNorm):
|
|
||||||
self._ensure_belief_inference(norm.condition, asl)
|
|
||||||
|
|
||||||
condition_expr = self._belief_to_expr(norm.condition)
|
|
||||||
body = BinaryOp(phase_lit, "&", condition_expr)
|
|
||||||
else:
|
|
||||||
body = phase_lit
|
|
||||||
|
|
||||||
asl.inference_rules.append(Rule(head=head, body=body))
|
|
||||||
|
|
||||||
def _ensure_belief_inference(self, belief: Belief, asl: AgentSpeakFile):
|
|
||||||
"""
|
|
||||||
Recursively adds rules to infer beliefs.
|
|
||||||
Checks strictly to avoid duplicates if necessary,
|
|
||||||
though ASL engines often handle redefinition or we can use a set to track processed IDs.
|
|
||||||
"""
|
|
||||||
if isinstance(belief, KeywordBelief):
|
|
||||||
pass
|
|
||||||
# # Rule: keyword_said("word") :- user_said(M) & .substring("word", M, P) & P >= 0.
|
|
||||||
# kwd_slug = f'"{belief.keyword}"'
|
|
||||||
# head = BeliefLiteral("keyword_said", [kwd_slug])
|
|
||||||
#
|
|
||||||
# # Avoid duplicates
|
|
||||||
# if any(str(r.head) == str(head) for r in asl.inference_rules):
|
|
||||||
# return
|
|
||||||
#
|
|
||||||
# body = BinaryOp(
|
|
||||||
# BeliefLiteral("user_said", ["Message"]),
|
|
||||||
# "&",
|
|
||||||
# BinaryOp(f".substring({kwd_slug}, Message, Pos)", "&", "Pos >= 0"),
|
|
||||||
# )
|
|
||||||
#
|
|
||||||
# asl.inference_rules.append(Rule(head=head, body=body))
|
|
||||||
|
|
||||||
elif isinstance(belief, InferredBelief):
|
|
||||||
self._ensure_belief_inference(belief.left, asl)
|
|
||||||
self._ensure_belief_inference(belief.right, asl)
|
|
||||||
|
|
||||||
slug = self._slugify(belief)
|
|
||||||
head = BeliefLiteral(slug)
|
|
||||||
|
|
||||||
if any(str(r.head) == str(head) for r in asl.inference_rules):
|
|
||||||
return
|
|
||||||
|
|
||||||
op_char = "&" if belief.operator == LogicalOperator.AND else "|"
|
|
||||||
body = BinaryOp(
|
|
||||||
self._belief_to_expr(belief.left), op_char, self._belief_to_expr(belief.right)
|
|
||||||
)
|
|
||||||
asl.inference_rules.append(PersistentRule(head=head, body=body))
|
|
||||||
|
|
||||||
def _belief_to_expr(self, belief: Belief) -> Expression:
|
|
||||||
if isinstance(belief, KeywordBelief):
|
|
||||||
return BeliefLiteral("keyword_said", [f'"{belief.keyword}"'])
|
|
||||||
else:
|
|
||||||
return BeliefLiteral(self._slugify(belief))
|
|
||||||
|
|
||||||
# --- Section: Goals ---
|
|
||||||
|
|
||||||
def _generate_goals(self, phase: Phase, asl: AgentSpeakFile):
|
|
||||||
previous_goal: Goal | None = None
|
|
||||||
for goal in phase.goals:
|
|
||||||
self._generate_goal_plan_recursive(goal, str(phase.id), previous_goal, asl)
|
|
||||||
previous_goal = goal
|
|
||||||
|
|
||||||
def _generate_goal_plan_recursive(
|
|
||||||
self,
|
|
||||||
goal: Goal,
|
|
||||||
phase_id: str,
|
|
||||||
previous_goal: Goal | None,
|
|
||||||
asl: AgentSpeakFile,
|
|
||||||
responded_needed: bool = True,
|
|
||||||
can_fail: bool = True,
|
|
||||||
):
|
|
||||||
goal_slug = self._slugify(goal)
|
|
||||||
|
|
||||||
# phase(ID) & not responded_this_turn & not achieved_goal
|
|
||||||
context = [
|
|
||||||
BeliefLiteral("phase", [f'"{phase_id}"']),
|
|
||||||
]
|
|
||||||
|
|
||||||
if responded_needed:
|
|
||||||
context.append(BeliefLiteral("responded_this_turn", negated=True))
|
|
||||||
if can_fail:
|
|
||||||
context.append(BeliefLiteral(f"achieved_{goal_slug}", negated=True))
|
|
||||||
|
|
||||||
if previous_goal:
|
|
||||||
prev_slug = self._slugify(previous_goal)
|
|
||||||
context.append(BeliefLiteral(f"achieved_{prev_slug}"))
|
|
||||||
|
|
||||||
body_actions = []
|
|
||||||
sub_goals_to_process = []
|
|
||||||
|
|
||||||
for step in goal.plan.steps:
|
|
||||||
if isinstance(step, Goal):
|
|
||||||
sub_slug = self._slugify(step)
|
|
||||||
body_actions.append(ActionLiteral(f"!{sub_slug}"))
|
|
||||||
sub_goals_to_process.append(step)
|
|
||||||
elif isinstance(step, SpeechAction):
|
|
||||||
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
|
|
||||||
elif isinstance(step, GestureAction):
|
|
||||||
body_actions.append(ActionLiteral(f'.gesture("{step.gesture}")'))
|
|
||||||
elif isinstance(step, LLMAction):
|
|
||||||
body_actions.append(ActionLiteral(f'!generate_response_with_goal("{step.goal}")'))
|
|
||||||
|
|
||||||
# Mark achievement
|
|
||||||
if not goal.can_fail:
|
|
||||||
body_actions.append(ActionLiteral(f"+achieved_{goal_slug}"))
|
|
||||||
|
|
||||||
asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions))
|
|
||||||
asl.plans.append(
|
|
||||||
Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
|
|
||||||
)
|
|
||||||
|
|
||||||
prev_sub = None
|
|
||||||
for sub_goal in sub_goals_to_process:
|
|
||||||
self._generate_goal_plan_recursive(sub_goal, phase_id, prev_sub, asl)
|
|
||||||
prev_sub = sub_goal
|
|
||||||
|
|
||||||
# --- Section: Triggers ---
|
|
||||||
|
|
||||||
def _generate_triggers(self, phase: Phase, asl: AgentSpeakFile):
|
|
||||||
for keyword in self._get_keyword_conditionals(phase):
|
|
||||||
asl.plans.append(
|
|
||||||
Plan(
|
|
||||||
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
|
|
||||||
context=[
|
|
||||||
ActionLiteral(
|
|
||||||
f'user_said(Message) & .substring("{keyword}", Message, Pos) & Pos >= 0'
|
|
||||||
)
|
|
||||||
],
|
|
||||||
body=[
|
|
||||||
ActionLiteral(f'+keyword_said("{keyword}")'),
|
|
||||||
ActionLiteral(f'-keyword_said("{keyword}")'),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
)
|
|
||||||
asl.plans.append(
|
|
||||||
Plan(
|
|
||||||
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
|
|
||||||
body=[ActionLiteral("true")],
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
for trigger in phase.triggers:
|
|
||||||
self._ensure_belief_inference(trigger.condition, asl)
|
|
||||||
|
|
||||||
trigger_belief_slug = self._belief_to_expr(trigger.condition)
|
|
||||||
|
|
||||||
body_actions = []
|
|
||||||
sub_goals = []
|
|
||||||
|
|
||||||
for step in trigger.plan.steps:
|
|
||||||
if isinstance(step, Goal):
|
|
||||||
sub_slug = self._slugify(step)
|
|
||||||
body_actions.append(ActionLiteral(f"!{sub_slug}"))
|
|
||||||
sub_goals.append(step)
|
|
||||||
elif isinstance(step, SpeechAction):
|
|
||||||
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
|
|
||||||
elif isinstance(step, GestureAction):
|
|
||||||
body_actions.append(
|
|
||||||
ActionLiteral(f'.gesture("{step.gesture.type}", "{step.gesture.name}")')
|
|
||||||
)
|
|
||||||
elif isinstance(step, LLMAction):
|
|
||||||
body_actions.append(
|
|
||||||
ActionLiteral(f'!generate_response_with_goal("{step.goal}")')
|
|
||||||
)
|
|
||||||
|
|
||||||
asl.plans.append(
|
|
||||||
Plan(
|
|
||||||
trigger=BeliefLiteral(trigger_belief_slug),
|
|
||||||
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
|
|
||||||
body=body_actions,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Recurse for triggered goals
|
|
||||||
prev_sub = None
|
|
||||||
for sub_goal in sub_goals:
|
|
||||||
self._generate_goal_plan_recursive(
|
|
||||||
sub_goal, str(phase.id), prev_sub, asl, False, False
|
|
||||||
)
|
|
||||||
prev_sub = sub_goal
|
|
||||||
|
|
||||||
# --- Section: Fallbacks ---
|
|
||||||
|
|
||||||
def _generate_fallbacks(self, program: Program, asl: AgentSpeakFile):
|
|
||||||
asl.plans.append(
|
|
||||||
Plan(trigger=GoalLiteral("transition_phase"), context=[], body=[ActionLiteral("true")])
|
|
||||||
)
|
|
||||||
|
|
||||||
# --- Helpers ---
|
|
||||||
|
|
||||||
@singledispatchmethod
|
|
||||||
def _slugify(self, element: ProgramElement) -> str:
|
|
||||||
if element.name:
|
|
||||||
raise NotImplementedError("Cannot slugify this element.")
|
|
||||||
return self._slugify_str(element.name)
|
|
||||||
|
|
||||||
@_slugify.register
|
|
||||||
def _(self, goal: Goal) -> str:
|
|
||||||
if goal.name:
|
|
||||||
return self._slugify_str(goal.name)
|
|
||||||
return f"goal_{goal.id.hex}"
|
|
||||||
|
|
||||||
@_slugify.register
|
|
||||||
def _(self, kwb: KeywordBelief) -> str:
|
|
||||||
return f"keyword_said({kwb.keyword})"
|
|
||||||
|
|
||||||
@_slugify.register
|
|
||||||
def _(self, sb: SemanticBelief) -> str:
|
|
||||||
return self._slugify_str(sb.description)
|
|
||||||
|
|
||||||
@_slugify.register
|
|
||||||
def _(self, ib: InferredBelief) -> str:
|
|
||||||
return self._slugify_str(ib.name)
|
|
||||||
|
|
||||||
def _slugify_str(self, text: str) -> str:
|
|
||||||
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])
|
|
||||||
|
|
||||||
def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]:
|
|
||||||
beliefs = []
|
|
||||||
|
|
||||||
for phase in program.phases:
|
|
||||||
beliefs.extend(self._extract_basic_beliefs_from_phase(phase))
|
|
||||||
|
|
||||||
return beliefs
|
|
||||||
|
|
||||||
def _extract_basic_beliefs_from_phase(self, phase: Phase) -> list[BasicBelief]:
|
|
||||||
beliefs = []
|
|
||||||
|
|
||||||
for norm in phase.norms:
|
|
||||||
if isinstance(norm, ConditionalNorm):
|
|
||||||
beliefs += self._extract_basic_beliefs_from_belief(norm.condition)
|
|
||||||
|
|
||||||
for trigger in phase.triggers:
|
|
||||||
beliefs += self._extract_basic_beliefs_from_belief(trigger.condition)
|
|
||||||
|
|
||||||
return beliefs
|
|
||||||
|
|
||||||
def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]:
|
|
||||||
if isinstance(belief, InferredBelief):
|
|
||||||
return self._extract_basic_beliefs_from_belief(
|
|
||||||
belief.left
|
|
||||||
) + self._extract_basic_beliefs_from_belief(belief.right)
|
|
||||||
return [belief]
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
asyncio.run(do_things())
|
|
||||||
# do_other_things()y
|
|
||||||
Reference in New Issue
Block a user