feat: persistent rules and stuff

So ugly

ref: N25B-376
This commit is contained in:
2025-12-18 14:33:42 +01:00
parent f91cec6708
commit 756e1f0dc5
3 changed files with 143 additions and 45 deletions

View File

@@ -93,6 +93,33 @@ class Rule:
return f"{self.head} :- {self.body}."
@dataclass
class PersistentRule:
"""
Represents an inference rule, where the inferred belief is persistent when formed.
"""
head: Expression
body: Expression
def __str__(self):
if not self.body:
raise Exception("Rule without body should not be persistent.")
lines = []
if isinstance(self.body, BinaryOp):
lines.append(f"+{self.body.left}")
if self.body.operator == "&":
lines.append(f" : {self.body.right}")
lines.append(f" <- +{self.head}.")
if self.body.operator == "|":
lines.append(f"+{self.body.right}")
lines.append(f" <- +{self.head}.")
return "\n".join(lines)
@dataclass
class Plan:
"""
@@ -148,7 +175,7 @@ class AgentSpeakFile:
"""
initial_beliefs: list[Rule] = field(default_factory=list)
inference_rules: list[Rule] = field(default_factory=list)
inference_rules: list[Rule | PersistentRule] = field(default_factory=list)
plans: list[Plan] = field(default_factory=list)
def __str__(self):
@@ -161,7 +188,11 @@ class AgentSpeakFile:
if self.inference_rules:
sections.append("// --- Inference Rules ---")
sections.extend(str(rule) for rule in self.inference_rules)
sections.extend(str(rule) for rule in self.inference_rules if isinstance(rule, Rule))
sections.append("")
sections.extend(
str(rule) for rule in self.inference_rules if isinstance(rule, PersistentRule)
)
sections.append("")
if self.plans:

View File

@@ -5,8 +5,6 @@ from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi import BDICoreAgent
# Import the AST we defined above
from control_backend.agents.bdi.asl_ast import (
ActionLiteral,
AgentSpeakFile,
@@ -14,13 +12,13 @@ from control_backend.agents.bdi.asl_ast import (
BinaryOp,
Expression,
GoalLiteral,
PersistentRule,
Plan,
Rule,
)
from control_backend.agents.bdi.bdi_program_manager import test_program
# Import your Pydantic models (adjust import based on your file structure)
from control_backend.schemas.program import (
BasicBelief,
Belief,
ConditionalNorm,
GestureAction,
@@ -46,13 +44,17 @@ async def do_things():
f.write(program)
else:
# filename = "0test.asl"
filename = "1766053943.asl"
filename = "1766062491.asl"
bdi_agent = BDICoreAgent("BDICoreAgent", filename)
flag = asyncio.Event()
await bdi_agent.start()
await flag.wait()
def do_other_things():
print(AgentSpeakGenerator().generate(test_program))
class AgentSpeakGenerator:
"""
Converts a Pydantic Program behavior model into an AgentSpeak(L) AST,
@@ -118,6 +120,10 @@ class AgentSpeakGenerator:
# +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase.
goal_actions = [ActionLiteral("-responded_this_turn")]
goal_actions += [
ActionLiteral(f"!check_{self._slugify_str(keyword)}")
for keyword in self._get_keyword_conditionals(phase)
]
goal_actions += [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
goal_actions.append(ActionLiteral("!transition_phase"))
@@ -143,10 +149,20 @@ class AgentSpeakGenerator:
body=[
ActionLiteral(f'-phase("{phase.id}")'),
ActionLiteral(f'+phase("{next_id}")'),
ActionLiteral("user_said(Anything)"),
ActionLiteral("-+user_said(Anything)"),
],
)
)
def _get_keyword_conditionals(self, phase: Phase) -> list[str]:
res = []
for belief in self._extract_basic_beliefs_from_phase(phase):
if isinstance(belief, KeywordBelief):
res.append(belief.keyword)
return res
# --- Section: Norms & Beliefs ---
def _generate_norms(self, phase: Phase, asl: AgentSpeakFile):
@@ -174,21 +190,22 @@ class AgentSpeakGenerator:
though ASL engines often handle redefinition or we can use a set to track processed IDs.
"""
if isinstance(belief, KeywordBelief):
# Rule: keyword_said("word") :- user_said(M) & .substring("word", M, P) & P >= 0.
kwd_slug = f'"{belief.keyword}"'
head = BeliefLiteral("keyword_said", [kwd_slug])
# Avoid duplicates
if any(str(r.head) == str(head) for r in asl.inference_rules):
return
body = BinaryOp(
BeliefLiteral("user_said", ["Message"]),
"&",
BinaryOp(f".substring({kwd_slug}, Message, Pos)", "&", "Pos >= 0"),
)
asl.inference_rules.append(Rule(head=head, body=body))
pass
# # Rule: keyword_said("word") :- user_said(M) & .substring("word", M, P) & P >= 0.
# kwd_slug = f'"{belief.keyword}"'
# head = BeliefLiteral("keyword_said", [kwd_slug])
#
# # Avoid duplicates
# if any(str(r.head) == str(head) for r in asl.inference_rules):
# return
#
# body = BinaryOp(
# BeliefLiteral("user_said", ["Message"]),
# "&",
# BinaryOp(f".substring({kwd_slug}, Message, Pos)", "&", "Pos >= 0"),
# )
#
# asl.inference_rules.append(Rule(head=head, body=body))
elif isinstance(belief, InferredBelief):
self._ensure_belief_inference(belief.left, asl)
@@ -204,7 +221,7 @@ class AgentSpeakGenerator:
body = BinaryOp(
self._belief_to_expr(belief.left), op_char, self._belief_to_expr(belief.right)
)
asl.inference_rules.append(Rule(head=head, body=body))
asl.inference_rules.append(PersistentRule(head=head, body=body))
def _belief_to_expr(self, belief: Belief) -> Expression:
if isinstance(belief, KeywordBelief):
@@ -221,17 +238,26 @@ class AgentSpeakGenerator:
previous_goal = goal
def _generate_goal_plan_recursive(
self, goal: Goal, phase_id: str, previous_goal: Goal | None, asl: AgentSpeakFile
self,
goal: Goal,
phase_id: str,
previous_goal: Goal | None,
asl: AgentSpeakFile,
responded_needed: bool = True,
can_fail: bool = True,
):
goal_slug = self._slugify(goal)
# phase(ID) & not responded_this_turn & not achieved_goal
context = [
BeliefLiteral("phase", [f'"{phase_id}"']),
BeliefLiteral("responded_this_turn", negated=True),
BeliefLiteral(f"achieved_{goal_slug}", negated=True),
]
if responded_needed:
context.append(BeliefLiteral("responded_this_turn", negated=True))
if can_fail:
context.append(BeliefLiteral(f"achieved_{goal_slug}", negated=True))
if previous_goal:
prev_slug = self._slugify(previous_goal)
context.append(BeliefLiteral(f"achieved_{prev_slug}"))
@@ -256,6 +282,9 @@ class AgentSpeakGenerator:
body_actions.append(ActionLiteral(f"+achieved_{goal_slug}"))
asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions))
asl.plans.append(
Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
)
prev_sub = None
for sub_goal in sub_goals_to_process:
@@ -265,6 +294,28 @@ class AgentSpeakGenerator:
# --- Section: Triggers ---
def _generate_triggers(self, phase: Phase, asl: AgentSpeakFile):
for keyword in self._get_keyword_conditionals(phase):
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
context=[
ActionLiteral(
f'user_said(Message) & .substring("{keyword}", Message, Pos) & Pos >= 0'
)
],
body=[
ActionLiteral(f'+keyword_said("{keyword}")'),
ActionLiteral(f'-keyword_said("{keyword}")'),
],
)
)
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
body=[ActionLiteral("true")],
)
)
for trigger in phase.triggers:
self._ensure_belief_inference(trigger.condition, asl)
@@ -300,31 +351,18 @@ class AgentSpeakGenerator:
# Recurse for triggered goals
prev_sub = None
for sub_goal in sub_goals:
self._generate_goal_plan_recursive(sub_goal, str(phase.id), prev_sub, asl)
self._generate_goal_plan_recursive(
sub_goal, str(phase.id), prev_sub, asl, False, False
)
prev_sub = sub_goal
# --- Section: Fallbacks ---
def _generate_fallbacks(self, program: Program, asl: AgentSpeakFile):
for phase in program.phases:
for goal in phase.goals:
self._generate_goal_fallbacks_recursive(goal, asl)
asl.plans.append(
Plan(trigger=GoalLiteral("transition_phase"), context=[], body=[ActionLiteral("true")])
)
def _generate_goal_fallbacks_recursive(self, goal: Goal, asl: AgentSpeakFile):
goal_slug = self._slugify(goal)
asl.plans.append(
Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
)
for step in goal.plan.steps:
if not isinstance(step, Goal):
continue
self._generate_goal_fallbacks_recursive(step, asl)
# --- Helpers ---
@singledispatchmethod
@@ -354,6 +392,34 @@ class AgentSpeakGenerator:
def _slugify_str(self, text: str) -> str:
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])
def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]:
beliefs = []
for phase in program.phases:
beliefs.extend(self._extract_basic_beliefs_from_phase(phase))
return beliefs
def _extract_basic_beliefs_from_phase(self, phase: Phase) -> list[BasicBelief]:
beliefs = []
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_basic_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += self._extract_basic_beliefs_from_belief(trigger.condition)
return beliefs
def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]:
if isinstance(belief, InferredBelief):
return self._extract_basic_beliefs_from_belief(
belief.left
) + self._extract_basic_beliefs_from_belief(belief.right)
return [belief]
if __name__ == "__main__":
asyncio.run(do_things())
# do_other_things()y

View File

@@ -89,9 +89,9 @@ class BDICoreAgent(BaseAgent):
the agent has deferred intentions (deadlines).
"""
while self._running:
await (
self._wake_bdi_loop.wait()
) # gets set whenever there's an update to the belief base
# await (
# self._wake_bdi_loop.wait()
# ) # gets set whenever there's an update to the belief base
# Agent knows when it's expected to have to do its next thing
maybe_more_work = True
@@ -168,6 +168,7 @@ class BDICoreAgent(BaseAgent):
:param args: Arguments for the belief.
"""
# new_args = (agentspeak.Literal(arg) for arg in args) # TODO: Eventually support multiple
args = args or []
if args:
merged_args = DELIMITER.join(arg for arg in args)
new_args = (agentspeak.Literal(merged_args),)