Compare commits

...

16 Commits

Author SHA1 Message Date
Twirre Meulenbelt
3406e9ac2f feat: make the pipeline work with Program and AgentSpeak
ref: N25B-429
2026-01-06 15:26:44 +01:00
a357b6990b feat: send program to bdi core
ref: N25B-376
2026-01-06 12:11:37 +01:00
9eea4ee345 feat: new ASL generation
ref: N25B-376
2026-01-02 12:08:20 +01:00
3253760ef1 feat: new AST representation
File names will be changed eventually.

ref: N25B-376
2025-12-23 17:30:35 +01:00
756e1f0dc5 feat: persistent rules and stuff
So ugly

ref: N25B-376
2025-12-18 14:33:42 +01:00
Twirre Meulenbelt
f91cec6708 fix: things in AgentSpeak, add custom actions
ref: N25B-376
2025-12-18 11:50:16 +01:00
28262eb27e fix: default case for plans
ref: N25B-376
2025-12-17 16:20:37 +01:00
1d36d2e089 feat: (hopefully) better intermediate representation
ref: N25B-376
2025-12-17 15:33:27 +01:00
742e36b94f chore: non-optional uuid id
ref: N25B-376
2025-12-17 14:30:14 +01:00
Twirre Meulenbelt
57fe3ae3f6 Merge remote-tracking branch 'origin/dev' into feat/agentspeak-generation 2025-12-17 13:20:14 +01:00
e704ec5ed4 feat: basic flow and phase transitions
ref: N25B-376
2025-12-16 17:00:32 +01:00
Twirre Meulenbelt
27f04f0958 style: use yield instead of returning arrays
ref: N25B-376
2025-12-16 16:11:01 +01:00
Twirre Meulenbelt
8cc177041a feat: add a second phase in test_program
ref: N25B-376
2025-12-16 15:12:22 +01:00
4a432a603f fix: separate trigger plan generation
ref: N25B-376
2025-12-16 14:12:04 +01:00
bab4800698 feat: add trigger generation
ref: N25B-376
2025-12-16 12:10:52 +01:00
d043c54336 refactor: program restructure
Also includes some AgentSpeak generation.

ref: N25B-376
2025-12-16 10:21:50 +01:00
14 changed files with 1589 additions and 90 deletions

View File

@@ -15,6 +15,7 @@ dependencies = [
"pydantic>=2.12.0",
"pydantic-settings>=2.11.0",
"python-json-logger>=4.0.0",
"python-slugify>=8.0.4",
"pyyaml>=6.0.3",
"pyzmq>=27.1.0",
"silero-vad>=6.0.0",

View File

@@ -0,0 +1,273 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import StrEnum
class AstNode(ABC):
"""
Abstract base class for all elements of an AgentSpeak program.
"""
@abstractmethod
def _to_agentspeak(self) -> str:
"""
Generates the AgentSpeak code string.
"""
pass
def __str__(self) -> str:
return self._to_agentspeak()
class AstExpression(AstNode, ABC):
"""
Intermediate class for anything that can be used in a logical expression.
"""
def __and__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.AND, _coalesce_expr(other))
def __or__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.OR, _coalesce_expr(other))
def __invert__(self) -> AstLogicalExpression:
if isinstance(self, AstLogicalExpression):
self.negated = not self.negated
return self
return AstLogicalExpression(self, negated=True)
type ExprCoalescible = AstExpression | str | int | float
def _coalesce_expr(value: ExprCoalescible) -> AstExpression:
if isinstance(value, AstExpression):
return value
if isinstance(value, str):
return AstString(value)
if isinstance(value, (int, float)):
return AstNumber(value)
raise TypeError(f"Cannot coalesce type {type(value)} into an AstTerm.")
@dataclass
class AstTerm(AstExpression, ABC):
"""
Base class for terms appearing inside literals.
"""
def __ge__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.GREATER_EQUALS, _coalesce_expr(other))
def __gt__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.GREATER_THAN, _coalesce_expr(other))
def __le__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.LESS_EQUALS, _coalesce_expr(other))
def __lt__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.LESS_THAN, _coalesce_expr(other))
def __eq__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.EQUALS, _coalesce_expr(other))
def __ne__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.NOT_EQUALS, _coalesce_expr(other))
@dataclass
class AstAtom(AstTerm):
"""
Grounded expression in all lowercase.
"""
value: str
def _to_agentspeak(self) -> str:
return self.value.lower()
@dataclass
class AstVar(AstTerm):
"""
Ungrounded variable expression. First letter capitalized.
"""
name: str
def _to_agentspeak(self) -> str:
return self.name.capitalize()
@dataclass
class AstNumber(AstTerm):
value: int | float
def _to_agentspeak(self) -> str:
return str(self.value)
@dataclass
class AstString(AstTerm):
value: str
def _to_agentspeak(self) -> str:
return f'"{self.value}"'
@dataclass
class AstLiteral(AstTerm):
functor: str
terms: list[AstTerm] = field(default_factory=list)
def _to_agentspeak(self) -> str:
if not self.terms:
return self.functor
args = ", ".join(map(str, self.terms))
return f"{self.functor}({args})"
class BinaryOperatorType(StrEnum):
AND = "&"
OR = "|"
GREATER_THAN = ">"
LESS_THAN = "<"
EQUALS = "=="
NOT_EQUALS = "\\=="
GREATER_EQUALS = ">="
LESS_EQUALS = "<="
@dataclass
class AstBinaryOp(AstExpression):
left: AstExpression
operator: BinaryOperatorType
right: AstExpression
def __post_init__(self):
self.left = _as_logical(self.left)
self.right = _as_logical(self.right)
def _to_agentspeak(self) -> str:
l_str = str(self.left)
r_str = str(self.right)
assert isinstance(self.left, AstLogicalExpression)
assert isinstance(self.right, AstLogicalExpression)
if isinstance(self.left.expression, AstBinaryOp) or self.left.negated:
l_str = f"({l_str})"
if isinstance(self.right.expression, AstBinaryOp) or self.right.negated:
r_str = f"({r_str})"
return f"{l_str} {self.operator.value} {r_str}"
@dataclass
class AstLogicalExpression(AstExpression):
expression: AstExpression
negated: bool = False
def _to_agentspeak(self) -> str:
expr_str = str(self.expression)
if isinstance(self.expression, AstBinaryOp) and self.negated:
expr_str = f"({expr_str})"
return f"{'not ' if self.negated else ''}{expr_str}"
def _as_logical(expr: AstExpression) -> AstLogicalExpression:
if isinstance(expr, AstLogicalExpression):
return expr
return AstLogicalExpression(expr)
class StatementType(StrEnum):
EMPTY = ""
DO_ACTION = "."
ACHIEVE_GOAL = "!"
TEST_GOAL = "?"
ADD_BELIEF = "+"
REMOVE_BELIEF = "-"
REPLACE_BELIEF = "-+"
@dataclass
class AstStatement(AstNode):
"""
A statement that can appear inside a plan.
"""
type: StatementType
expression: AstExpression
def _to_agentspeak(self) -> str:
return f"{self.type.value}{self.expression}"
@dataclass
class AstRule(AstNode):
result: AstExpression
condition: AstExpression | None = None
def __post_init__(self):
if self.condition is not None:
self.condition = _as_logical(self.condition)
def _to_agentspeak(self) -> str:
if not self.condition:
return f"{self.result}."
return f"{self.result} :- {self.condition}."
class TriggerType(StrEnum):
ADDED_BELIEF = "+"
# REMOVED_BELIEF = "-" # TODO
# MODIFIED_BELIEF = "^" # TODO
ADDED_GOAL = "+!"
# REMOVED_GOAL = "-!" # TODO
@dataclass
class AstPlan(AstNode):
type: TriggerType
trigger_literal: AstExpression
context: list[AstExpression]
body: list[AstStatement]
def _to_agentspeak(self) -> str:
assert isinstance(self.trigger_literal, AstLiteral)
indent = " " * 6
colon = " : "
arrow = " <- "
lines = []
lines.append(f"{self.type.value}{self.trigger_literal}")
if self.context:
lines.append(colon + f" &\n{indent}".join(str(c) for c in self.context))
if self.body:
lines.append(arrow + f";\n{indent}".join(str(s) for s in self.body) + ".")
lines.append("")
return "\n".join(lines)
@dataclass
class AstProgram(AstNode):
rules: list[AstRule] = field(default_factory=list)
plans: list[AstPlan] = field(default_factory=list)
def _to_agentspeak(self) -> str:
lines = []
lines.extend(map(str, self.rules))
lines.extend(["", ""])
lines.extend(map(str, self.plans))
return "\n".join(lines)

View File

@@ -0,0 +1,373 @@
from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi.agentspeak_ast import (
AstBinaryOp,
AstExpression,
AstLiteral,
AstPlan,
AstProgram,
AstRule,
AstStatement,
AstString,
AstVar,
BinaryOperatorType,
StatementType,
TriggerType,
)
from control_backend.schemas.program import (
BasicNorm,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Norm,
Phase,
PlanElement,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
Trigger,
)
class AgentSpeakGenerator:
_asp: AstProgram
def generate(self, program: Program) -> str:
self._asp = AstProgram()
self._asp.rules.append(AstRule(self._astify(program.phases[0])))
self._add_keyword_inference()
self._add_default_plans()
self._process_phases(program.phases)
self._add_fallbacks()
return str(self._asp)
def _add_keyword_inference(self) -> None:
keyword = AstVar("Keyword")
message = AstVar("Message")
position = AstVar("Pos")
self._asp.rules.append(
AstRule(
AstLiteral("keyword_said", [keyword]),
AstLiteral("user_said", [message])
& AstLiteral(".substring", [keyword, message, position])
& (position >= 0),
)
)
def _add_default_plans(self):
self._add_reply_with_goal_plan()
self._add_say_plan()
self._add_reply_plan()
def _add_reply_with_goal_plan(self):
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("reply_with_goal", [AstVar("Goal")]),
[AstLiteral("user_said", [AstVar("Message")])],
[
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"findall",
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
),
),
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"reply_with_goal", [AstVar("Message"), AstVar("Norms"), AstVar("Goal")]
),
),
],
)
)
def _add_say_plan(self):
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("say", [AstVar("Text")]),
[],
[
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
AstStatement(StatementType.DO_ACTION, AstLiteral("say", [AstVar("Text")])),
],
)
)
def _add_reply_plan(self):
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("reply"),
[AstLiteral("user_said", [AstVar("Message")])],
[
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"findall",
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
),
),
AstStatement(
StatementType.DO_ACTION,
AstLiteral("reply", [AstVar("Message"), AstVar("Norms")]),
),
],
)
)
def _process_phases(self, phases: list[Phase]) -> None:
for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True):
if curr_phase:
self._process_phase(curr_phase)
self._add_phase_transition(curr_phase, next_phase)
# End phase behavior
# When deleting this, the entire `reply` plan and action can be deleted
self._asp.plans.append(
AstPlan(
type=TriggerType.ADDED_BELIEF,
trigger_literal=AstLiteral("user_said", [AstVar("Message")]),
context=[AstLiteral("phase", [AstString("end")])],
body=[AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply"))],
)
)
def _process_phase(self, phase: Phase) -> None:
for norm in phase.norms:
self._process_norm(norm, phase)
self._add_default_loop(phase)
previous_goal = None
for goal in phase.goals:
self._process_goal(goal, phase, previous_goal)
previous_goal = goal
for trigger in phase.triggers:
self._process_trigger(trigger, phase)
def _add_phase_transition(self, from_phase: Phase | None, to_phase: Phase | None) -> None:
if from_phase is None:
return
from_phase_ast = self._astify(from_phase)
to_phase_ast = (
self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")])
)
context = [from_phase_ast, ~AstLiteral("responded_this_turn")]
if from_phase and from_phase.goals:
context.append(self._astify(from_phase.goals[-1], achieved=True))
body = [
AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast),
AstStatement(StatementType.ADD_BELIEF, to_phase_ast),
]
if from_phase:
body.extend(
[
AstStatement(
StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")])
),
AstStatement(
StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")])
),
]
)
self._asp.plans.append(
AstPlan(TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), context, body)
)
def _process_norm(self, norm: Norm, phase: Phase) -> None:
rule: AstRule | None = None
match norm:
case ConditionalNorm(condition=cond):
rule = AstRule(self._astify(norm), self._astify(phase) & self._astify(cond))
case BasicNorm():
rule = AstRule(self._astify(norm), self._astify(phase))
if not rule:
return
self._asp.rules.append(rule)
def _add_default_loop(self, phase: Phase) -> None:
actions = []
actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn")))
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers")))
for goal in phase.goals:
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, self._astify(goal)))
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("transition_phase")))
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_BELIEF,
AstLiteral("user_said", [AstVar("Message")]),
[self._astify(phase)],
actions,
)
)
def _process_goal(
self,
goal: Goal,
phase: Phase,
previous_goal: Goal | None = None,
continues_response: bool = False,
) -> None:
context: list[AstExpression] = [self._astify(phase)]
context.append(~self._astify(goal, achieved=True))
if previous_goal and previous_goal.can_fail:
context.append(self._astify(previous_goal, achieved=True))
if not continues_response:
context.append(~AstLiteral("responded_this_turn"))
body = []
subgoals = []
for step in goal.plan.steps:
body.append(self._step_to_statement(step))
if isinstance(step, Goal):
subgoals.append(step)
if not goal.can_fail and not continues_response:
body.append(AstStatement(StatementType.ADD_BELIEF, self._astify(goal, achieved=True)))
self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(goal), context, body))
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
self._astify(goal),
context=[],
body=[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
prev_goal = None
for subgoal in subgoals:
self._process_goal(subgoal, phase, prev_goal)
prev_goal = subgoal
def _step_to_statement(self, step: PlanElement) -> AstStatement:
match step:
case Goal() | SpeechAction() | LLMAction() as a:
return AstStatement(StatementType.ACHIEVE_GOAL, self._astify(a))
case GestureAction() as a:
return AstStatement(StatementType.DO_ACTION, self._astify(a))
# TODO: separate handling of keyword and others
def _process_trigger(self, trigger: Trigger, phase: Phase) -> None:
body = []
subgoals = []
for step in trigger.plan.steps:
body.append(self._step_to_statement(step))
if isinstance(step, Goal):
step.can_fail = False # triggers are continuous sequence
subgoals.append(step)
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("check_triggers"),
[self._astify(phase), self._astify(trigger.condition)],
body,
)
)
for subgoal in subgoals:
self._process_goal(subgoal, phase, continues_response=True)
def _add_fallbacks(self):
# Trigger fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("check_triggers"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
# Phase transition fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("transition_phase"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
@singledispatchmethod
def _astify(self, element: ProgramElement) -> AstExpression:
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")
@_astify.register
def _(self, kwb: KeywordBelief) -> AstExpression:
return AstLiteral("keyword_said", [AstString(kwb.keyword)])
@_astify.register
def _(self, sb: SemanticBelief) -> AstExpression:
return AstLiteral(f"semantic_{self._slugify_str(sb.description)}")
@_astify.register
def _(self, ib: InferredBelief) -> AstExpression:
return AstBinaryOp(
self._astify(ib.left),
BinaryOperatorType.AND if ib.operator == LogicalOperator.AND else BinaryOperatorType.OR,
self._astify(ib.right),
)
@_astify.register
def _(self, norm: Norm) -> AstExpression:
functor = "critical_norm" if norm.critical else "norm"
return AstLiteral(functor, [AstString(norm.norm)])
@_astify.register
def _(self, phase: Phase) -> AstExpression:
return AstLiteral("phase", [AstString(str(phase.id))])
@_astify.register
def _(self, goal: Goal, achieved: bool = False) -> AstExpression:
return AstLiteral(f"{'achieved_' if achieved else ''}{self._slugify_str(goal.name)}")
@_astify.register
def _(self, sa: SpeechAction) -> AstExpression:
return AstLiteral("say", [AstString(sa.text)])
@_astify.register
def _(self, ga: GestureAction) -> AstExpression:
gesture = ga.gesture
return AstLiteral("gesture", [AstString(gesture.type), AstString(gesture.name)])
@_astify.register
def _(self, la: LLMAction) -> AstExpression:
return AstLiteral("reply_with_goal", [AstString(la.goal)])
@staticmethod
def _slugify_str(text: str) -> str:
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])

View File

@@ -0,0 +1,203 @@
import typing
from dataclasses import dataclass, field
# --- Types ---
@dataclass
class BeliefLiteral:
"""
Represents a literal or atom.
Example: phase(1), user_said("hello"), ~started
"""
functor: str
args: list[str] = field(default_factory=list)
negated: bool = False
def __str__(self):
# In ASL, 'not' is usually for closed-world assumption (prolog style),
# '~' is for explicit negation in beliefs.
# For simplicity in behavior trees, we often use 'not' for conditions.
prefix = "not " if self.negated else ""
if not self.args:
return f"{prefix}{self.functor}"
# Clean args to ensure strings are quoted if they look like strings,
# but usually the converter handles the quoting of string literals.
args_str = ", ".join(self.args)
return f"{prefix}{self.functor}({args_str})"
@dataclass
class GoalLiteral:
name: str
def __str__(self):
return f"!{self.name}"
@dataclass
class ActionLiteral:
"""
Represents a step in a plan body.
Example: .say("Hello") or !achieve_goal
"""
code: str
def __str__(self):
return self.code
@dataclass
class BinaryOp:
"""
Represents logical operations.
Example: (A & B) | C
"""
left: "Expression | str"
operator: typing.Literal["&", "|"]
right: "Expression | str"
def __str__(self):
l_str = str(self.left)
r_str = str(self.right)
if isinstance(self.left, BinaryOp):
l_str = f"({l_str})"
if isinstance(self.right, BinaryOp):
r_str = f"({r_str})"
return f"{l_str} {self.operator} {r_str}"
Literal = BeliefLiteral | GoalLiteral | ActionLiteral
Expression = Literal | BinaryOp | str
@dataclass
class Rule:
"""
Represents an inference rule.
Example: head :- body.
"""
head: Expression
body: Expression | None = None
def __str__(self):
if not self.body:
return f"{self.head}."
return f"{self.head} :- {self.body}."
@dataclass
class PersistentRule:
"""
Represents an inference rule, where the inferred belief is persistent when formed.
"""
head: Expression
body: Expression
def __str__(self):
if not self.body:
raise Exception("Rule without body should not be persistent.")
lines = []
if isinstance(self.body, BinaryOp):
lines.append(f"+{self.body.left}")
if self.body.operator == "&":
lines.append(f" : {self.body.right}")
lines.append(f" <- +{self.head}.")
if self.body.operator == "|":
lines.append(f"+{self.body.right}")
lines.append(f" <- +{self.head}.")
return "\n".join(lines)
@dataclass
class Plan:
"""
Represents a plan.
Syntax: +trigger : context <- body.
"""
trigger: BeliefLiteral | GoalLiteral
context: list[Expression] = field(default_factory=list)
body: list[ActionLiteral] = field(default_factory=list)
def __str__(self):
# Indentation settings
INDENT = " "
ARROW = "\n <- "
COLON = "\n : "
# Build Header
header = f"+{self.trigger}"
if self.context:
ctx_str = f" &\n{INDENT}".join(str(c) for c in self.context)
header += f"{COLON}{ctx_str}"
# Case 1: Empty body
if not self.body:
return f"{header}."
# Case 2: Short body (optional optimization, keeping it uniform usually better)
header += ARROW
lines = []
# We start the first action on the same line or next line.
# Let's put it on the next line for readability if there are multiple.
if len(self.body) == 1:
return f"{header}{self.body[0]}."
# First item
lines.append(f"{header}{self.body[0]};")
# Middle items
for item in self.body[1:-1]:
lines.append(f"{INDENT}{item};")
# Last item
lines.append(f"{INDENT}{self.body[-1]}.")
return "\n".join(lines)
@dataclass
class AgentSpeakFile:
"""
Root element representing the entire generated file.
"""
initial_beliefs: list[Rule] = field(default_factory=list)
inference_rules: list[Rule | PersistentRule] = field(default_factory=list)
plans: list[Plan] = field(default_factory=list)
def __str__(self):
sections = []
if self.initial_beliefs:
sections.append("// --- Initial Beliefs & Facts ---")
sections.extend(str(rule) for rule in self.initial_beliefs)
sections.append("")
if self.inference_rules:
sections.append("// --- Inference Rules ---")
sections.extend(str(rule) for rule in self.inference_rules if isinstance(rule, Rule))
sections.append("")
sections.extend(
str(rule) for rule in self.inference_rules if isinstance(rule, PersistentRule)
)
sections.append("")
if self.plans:
sections.append("// --- Plans ---")
# Separate plans by a newline for readability
sections.extend(str(plan) + "\n" for plan in self.plans)
return "\n".join(sections)

View File

@@ -0,0 +1,425 @@
import asyncio
import time
from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi import BDICoreAgent
from control_backend.agents.bdi.asl_ast import (
ActionLiteral,
AgentSpeakFile,
BeliefLiteral,
BinaryOp,
Expression,
GoalLiteral,
PersistentRule,
Plan,
Rule,
)
from control_backend.agents.bdi.bdi_program_manager import test_program
from control_backend.schemas.program import (
BasicBelief,
Belief,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Phase,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
)
async def do_things():
res = input("Wanna generate")
if res == "y":
program = AgentSpeakGenerator().generate(test_program)
filename = f"{int(time.time())}.asl"
with open(filename, "w") as f:
f.write(program)
else:
# filename = "0test.asl"
filename = "1766062491.asl"
bdi_agent = BDICoreAgent("BDICoreAgent", filename)
flag = asyncio.Event()
await bdi_agent.start()
await flag.wait()
def do_other_things():
print(AgentSpeakGenerator().generate(test_program))
class AgentSpeakGenerator:
"""
Converts a Pydantic Program behavior model into an AgentSpeak(L) AST,
then renders it to a string.
"""
def generate(self, program: Program) -> str:
asl = AgentSpeakFile()
self._generate_startup(program, asl)
for i, phase in enumerate(program.phases):
next_phase = program.phases[i + 1] if i < len(program.phases) - 1 else None
self._generate_phase_flow(phase, next_phase, asl)
self._generate_norms(phase, asl)
self._generate_goals(phase, asl)
self._generate_triggers(phase, asl)
self._generate_fallbacks(program, asl)
return str(asl)
# --- Section: Startup & Phase Management ---
def _generate_startup(self, program: Program, asl: AgentSpeakFile):
if not program.phases:
return
# Initial belief: phase(start).
asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ['"start"'])))
# Startup plan: +started : phase(start) <- -phase(start); +phase(first_id).
asl.plans.append(
Plan(
trigger=BeliefLiteral("started"),
context=[BeliefLiteral("phase", ['"start"'])],
body=[
ActionLiteral('-phase("start")'),
ActionLiteral(f'+phase("{program.phases[0].id}")'),
],
)
)
# Initial plans:
asl.plans.append(
Plan(
trigger=GoalLiteral("generate_response_with_goal(Goal)"),
context=[BeliefLiteral("user_said", ["Message"])],
body=[
ActionLiteral("+responded_this_turn"),
ActionLiteral(".findall(Norm, norm(Norm), Norms)"),
ActionLiteral(".reply_with_goal(Message, Norms, Goal)"),
],
)
)
def _generate_phase_flow(self, phase: Phase, next_phase: Phase | None, asl: AgentSpeakFile):
"""Generates the main loop listener and the transition logic for this phase."""
# +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase.
goal_actions = [ActionLiteral("-responded_this_turn")]
goal_actions += [
ActionLiteral(f"!check_{self._slugify_str(keyword)}")
for keyword in self._get_keyword_conditionals(phase)
]
goal_actions += [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
goal_actions.append(ActionLiteral("!transition_phase"))
asl.plans.append(
Plan(
trigger=BeliefLiteral("user_said", ["Message"]),
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=goal_actions,
)
)
# +!transition_phase : phase(ID) <- -phase(ID); +(NEXT_ID).
next_id = str(next_phase.id) if next_phase else "end"
transition_context = [BeliefLiteral("phase", [f'"{phase.id}"'])]
if phase.goals:
transition_context.append(BeliefLiteral(f"achieved_{self._slugify(phase.goals[-1])}"))
asl.plans.append(
Plan(
trigger=GoalLiteral("transition_phase"),
context=transition_context,
body=[
ActionLiteral(f'-phase("{phase.id}")'),
ActionLiteral(f'+phase("{next_id}")'),
ActionLiteral("user_said(Anything)"),
ActionLiteral("-+user_said(Anything)"),
],
)
)
def _get_keyword_conditionals(self, phase: Phase) -> list[str]:
res = []
for belief in self._extract_basic_beliefs_from_phase(phase):
if isinstance(belief, KeywordBelief):
res.append(belief.keyword)
return res
# --- Section: Norms & Beliefs ---
def _generate_norms(self, phase: Phase, asl: AgentSpeakFile):
for norm in phase.norms:
norm_slug = f'"{norm.norm}"'
head = BeliefLiteral("norm", [norm_slug])
# Base context is the phase
phase_lit = BeliefLiteral("phase", [f'"{phase.id}"'])
if isinstance(norm, ConditionalNorm):
self._ensure_belief_inference(norm.condition, asl)
condition_expr = self._belief_to_expr(norm.condition)
body = BinaryOp(phase_lit, "&", condition_expr)
else:
body = phase_lit
asl.inference_rules.append(Rule(head=head, body=body))
def _ensure_belief_inference(self, belief: Belief, asl: AgentSpeakFile):
"""
Recursively adds rules to infer beliefs.
Checks strictly to avoid duplicates if necessary,
though ASL engines often handle redefinition or we can use a set to track processed IDs.
"""
if isinstance(belief, KeywordBelief):
pass
# # Rule: keyword_said("word") :- user_said(M) & .substring("word", M, P) & P >= 0.
# kwd_slug = f'"{belief.keyword}"'
# head = BeliefLiteral("keyword_said", [kwd_slug])
#
# # Avoid duplicates
# if any(str(r.head) == str(head) for r in asl.inference_rules):
# return
#
# body = BinaryOp(
# BeliefLiteral("user_said", ["Message"]),
# "&",
# BinaryOp(f".substring({kwd_slug}, Message, Pos)", "&", "Pos >= 0"),
# )
#
# asl.inference_rules.append(Rule(head=head, body=body))
elif isinstance(belief, InferredBelief):
self._ensure_belief_inference(belief.left, asl)
self._ensure_belief_inference(belief.right, asl)
slug = self._slugify(belief)
head = BeliefLiteral(slug)
if any(str(r.head) == str(head) for r in asl.inference_rules):
return
op_char = "&" if belief.operator == LogicalOperator.AND else "|"
body = BinaryOp(
self._belief_to_expr(belief.left), op_char, self._belief_to_expr(belief.right)
)
asl.inference_rules.append(PersistentRule(head=head, body=body))
def _belief_to_expr(self, belief: Belief) -> Expression:
if isinstance(belief, KeywordBelief):
return BeliefLiteral("keyword_said", [f'"{belief.keyword}"'])
else:
return BeliefLiteral(self._slugify(belief))
# --- Section: Goals ---
def _generate_goals(self, phase: Phase, asl: AgentSpeakFile):
previous_goal: Goal | None = None
for goal in phase.goals:
self._generate_goal_plan_recursive(goal, str(phase.id), previous_goal, asl)
previous_goal = goal
def _generate_goal_plan_recursive(
self,
goal: Goal,
phase_id: str,
previous_goal: Goal | None,
asl: AgentSpeakFile,
responded_needed: bool = True,
can_fail: bool = True,
):
goal_slug = self._slugify(goal)
# phase(ID) & not responded_this_turn & not achieved_goal
context = [
BeliefLiteral("phase", [f'"{phase_id}"']),
]
if responded_needed:
context.append(BeliefLiteral("responded_this_turn", negated=True))
if can_fail:
context.append(BeliefLiteral(f"achieved_{goal_slug}", negated=True))
if previous_goal:
prev_slug = self._slugify(previous_goal)
context.append(BeliefLiteral(f"achieved_{prev_slug}"))
body_actions = []
sub_goals_to_process = []
for step in goal.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals_to_process.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(ActionLiteral(f'.gesture("{step.gesture}")'))
elif isinstance(step, LLMAction):
body_actions.append(ActionLiteral(f'!generate_response_with_goal("{step.goal}")'))
# Mark achievement
if not goal.can_fail:
body_actions.append(ActionLiteral(f"+achieved_{goal_slug}"))
asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions))
asl.plans.append(
Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
)
prev_sub = None
for sub_goal in sub_goals_to_process:
self._generate_goal_plan_recursive(sub_goal, phase_id, prev_sub, asl)
prev_sub = sub_goal
# --- Section: Triggers ---
def _generate_triggers(self, phase: Phase, asl: AgentSpeakFile):
for keyword in self._get_keyword_conditionals(phase):
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
context=[
ActionLiteral(
f'user_said(Message) & .substring("{keyword}", Message, Pos) & Pos >= 0'
)
],
body=[
ActionLiteral(f'+keyword_said("{keyword}")'),
ActionLiteral(f'-keyword_said("{keyword}")'),
],
)
)
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
body=[ActionLiteral("true")],
)
)
for trigger in phase.triggers:
self._ensure_belief_inference(trigger.condition, asl)
trigger_belief_slug = self._belief_to_expr(trigger.condition)
body_actions = []
sub_goals = []
for step in trigger.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(
ActionLiteral(f'.gesture("{step.gesture.type}", "{step.gesture.name}")')
)
elif isinstance(step, LLMAction):
body_actions.append(
ActionLiteral(f'!generate_response_with_goal("{step.goal}")')
)
asl.plans.append(
Plan(
trigger=BeliefLiteral(trigger_belief_slug),
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=body_actions,
)
)
# Recurse for triggered goals
prev_sub = None
for sub_goal in sub_goals:
self._generate_goal_plan_recursive(
sub_goal, str(phase.id), prev_sub, asl, False, False
)
prev_sub = sub_goal
# --- Section: Fallbacks ---
def _generate_fallbacks(self, program: Program, asl: AgentSpeakFile):
asl.plans.append(
Plan(trigger=GoalLiteral("transition_phase"), context=[], body=[ActionLiteral("true")])
)
# --- Helpers ---
@singledispatchmethod
def _slugify(self, element: ProgramElement) -> str:
if element.name:
raise NotImplementedError("Cannot slugify this element.")
return self._slugify_str(element.name)
@_slugify.register
def _(self, goal: Goal) -> str:
if goal.name:
return self._slugify_str(goal.name)
return f"goal_{goal.id.hex}"
@_slugify.register
def _(self, kwb: KeywordBelief) -> str:
return f"keyword_said({kwb.keyword})"
@_slugify.register
def _(self, sb: SemanticBelief) -> str:
return self._slugify_str(sb.description)
@_slugify.register
def _(self, ib: InferredBelief) -> str:
return self._slugify_str(ib.name)
def _slugify_str(self, text: str) -> str:
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])
def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]:
beliefs = []
for phase in program.phases:
beliefs.extend(self._extract_basic_beliefs_from_phase(phase))
return beliefs
def _extract_basic_beliefs_from_phase(self, phase: Phase) -> list[BasicBelief]:
beliefs = []
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_basic_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += self._extract_basic_beliefs_from_belief(trigger.condition)
return beliefs
def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]:
if isinstance(belief, InferredBelief):
return self._extract_basic_beliefs_from_belief(
belief.left
) + self._extract_basic_beliefs_from_belief(belief.right)
return [belief]
if __name__ == "__main__":
asyncio.run(do_things())
# do_other_things()y

View File

@@ -42,13 +42,13 @@ class BDICoreAgent(BaseAgent):
bdi_agent: agentspeak.runtime.Agent
def __init__(self, name: str, asl: str):
def __init__(self, name: str):
super().__init__(name)
self.asl_file = asl
self.env = agentspeak.runtime.Environment()
# Deep copy because we don't actually want to modify the standard actions globally
self.actions = copy.deepcopy(agentspeak.stdlib.actions)
self._wake_bdi_loop = asyncio.Event()
self._bdi_loop_task = None
async def setup(self) -> None:
"""
@@ -65,19 +65,22 @@ class BDICoreAgent(BaseAgent):
await self._load_asl()
# Start the BDI cycle loop
self.add_behavior(self._bdi_loop())
self._bdi_loop_task = self.add_behavior(self._bdi_loop())
self._wake_bdi_loop.set()
self.logger.debug("Setup complete.")
async def _load_asl(self):
async def _load_asl(self, file_name: str | None = None) -> None:
"""
Load and parse the AgentSpeak source file.
"""
file_name = file_name or "src/control_backend/agents/bdi/default_behavior.asl"
try:
with open(self.asl_file) as source:
with open(file_name) as source:
self.bdi_agent = self.env.build_agent(source, self.actions)
self.logger.info(f"Loaded new ASL from {file_name}.")
except FileNotFoundError:
self.logger.warning(f"Could not find the specified ASL file at {self.asl_file}.")
self.logger.warning(f"Could not find the specified ASL file at {file_name}.")
self.bdi_agent = agentspeak.runtime.Agent(self.env, self.name)
async def _bdi_loop(self):
@@ -116,6 +119,7 @@ class BDICoreAgent(BaseAgent):
Handle incoming messages.
- **Beliefs**: Updates the internal belief base.
- **Program**: Updates the internal agentspeak file to match the current program.
- **LLM Responses**: Forwards the generated text to the Robot Speech Agent (actuation).
:param msg: The received internal message.
@@ -130,6 +134,13 @@ class BDICoreAgent(BaseAgent):
self.logger.exception("Error processing belief.")
return
# New agentspeak file
if msg.thread == "new_program":
if self._bdi_loop_task:
self._bdi_loop_task.cancel()
await self._load_asl(msg.body)
self.add_behavior(self._bdi_loop())
# The message was not a belief, handle special cases based on sender
match msg.sender:
case settings.agent_settings.llm_name:
@@ -160,7 +171,7 @@ class BDICoreAgent(BaseAgent):
self._remove_all_with_name(belief.name)
self._add_belief(belief.name, belief.arguments)
def _add_belief(self, name: str, args: Iterable[str] = []):
def _add_belief(self, name: str, args: list[str] = None):
"""
Add a single belief to the BDI agent.
@@ -168,9 +179,13 @@ class BDICoreAgent(BaseAgent):
:param args: Arguments for the belief.
"""
# new_args = (agentspeak.Literal(arg) for arg in args) # TODO: Eventually support multiple
merged_args = DELIMITER.join(arg for arg in args)
new_args = (agentspeak.Literal(merged_args),)
term = agentspeak.Literal(name, new_args)
args = args or []
if args:
merged_args = DELIMITER.join(arg for arg in args)
new_args = (agentspeak.Literal(merged_args),)
term = agentspeak.Literal(name, new_args)
else:
term = agentspeak.Literal(name)
self.bdi_agent.call(
agentspeak.Trigger.addition,
@@ -235,32 +250,86 @@ class BDICoreAgent(BaseAgent):
the function expects (which will be located in `term.args`).
"""
@self.actions.add(".reply", 3)
@self.actions.add(".reply", 2)
def _reply(agent: "BDICoreAgent", term, intention):
"""
Sends text to the LLM (AgentSpeak action).
Example: .reply("Hello LLM!", "Some norm", "Some goal")
Let the LLM generate a response to a user's utterance with the current norms and goals.
"""
message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope)
goals = agentspeak.grounded(term.args[2], intention.scope)
self.logger.debug("Norms: %s", norms)
self.logger.debug("Goals: %s", goals)
self.logger.debug("User text: %s", message_text)
asyncio.create_task(self._send_to_llm(str(message_text), str(norms), str(goals)))
self.add_behavior(self._send_to_llm(str(message_text), str(norms), ""))
yield
async def _send_to_llm(self, text: str, norms: str = None, goals: str = None):
@self.actions.add(".reply_with_goal", 3)
def _reply_with_goal(agent: "BDICoreAgent", term, intention):
"""
Let the LLM generate a response to a user's utterance with the current norms and a
specific goal.
"""
message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope)
goal = agentspeak.grounded(term.args[2], intention.scope)
self.logger.debug(
'"reply_with_goal" action called with message=%s, norms=%s, goal=%s',
message_text,
norms,
goal,
)
self.add_behavior(self._send_to_llm(str(message_text), str(norms), str(goal)))
yield
@self.actions.add(".say", 1)
def _say(agent: "BDICoreAgent", term, intention):
"""
Make the robot say the given text instantly.
"""
message_text = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug('"say" action called with text=%s', message_text)
speech_command = SpeechCommand(data=message_text)
speech_message = InternalMessage(
to=settings.agent_settings.robot_speech_name,
sender=settings.agent_settings.bdi_core_name,
body=speech_command.model_dump_json(),
)
# TODO: add to conversation history
self.add_behavior(self.send(speech_message))
yield
@self.actions.add(".gesture", 2)
def _gesture(agent: "BDICoreAgent", term, intention):
"""
Make the robot perform the given gesture instantly.
"""
gesture_type = agentspeak.grounded(term.args[0], intention.scope)
gesture_name = agentspeak.grounded(term.args[1], intention.scope)
self.logger.debug(
'"gesture" action called with type=%s, name=%s',
gesture_type,
gesture_name,
)
# gesture = Gesture(type=gesture_type, name=gesture_name)
# gesture_message = InternalMessage(
# to=settings.agent_settings.robot_gesture_name,
# sender=settings.agent_settings.bdi_core_name,
# body=gesture.model_dump_json(),
# )
# asyncio.create_task(agent.send(gesture_message))
yield
async def _send_to_llm(self, text: str, norms: str, goals: str):
"""
Sends a text query to the LLM agent asynchronously.
"""
prompt = LLMPromptMessage(
text=text,
norms=norms.split("\n") if norms else [],
goals=goals.split("\n") if norms else [],
)
prompt = LLMPromptMessage(text=text, norms=norms.split("\n"), goals=goals.split("\n"))
msg = InternalMessage(
to=settings.agent_settings.llm_name,
sender=self.name,

View File

@@ -3,9 +3,9 @@ from pydantic import ValidationError
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
from control_backend.schemas.internal_message import InternalMessage
from control_backend.schemas.program import Program
@@ -25,7 +25,7 @@ class BDIProgramManager(BaseAgent):
super().__init__(**kwargs)
self.sub_socket = None
async def _send_to_bdi(self, program: Program):
async def _create_agentspeak_and_send_to_bdi(self, program: Program):
"""
Convert a received program into BDI beliefs and send them to the BDI Core Agent.
@@ -38,27 +38,23 @@ class BDIProgramManager(BaseAgent):
:param program: The program object received from the API.
"""
first_phase = program.phases[0]
norms_belief = Belief(
name="norms",
arguments=[norm.norm for norm in first_phase.norms],
replace=True,
)
goals_belief = Belief(
name="goals",
arguments=[goal.description for goal in first_phase.goals],
replace=True,
)
program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief])
asg = AgentSpeakGenerator()
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
asl_str = asg.generate(program)
file_name = "src/control_backend/agents/bdi/agentspeak.asl"
with open(file_name, "w") as f:
f.write(asl_str)
msg = InternalMessage(
sender=self.name,
body=program_beliefs.model_dump_json(),
thread="beliefs",
to=settings.agent_settings.bdi_core_name,
body=file_name,
thread="new_program",
)
await self.send(message)
self.logger.debug("Sent new norms and goals to the BDI agent.")
await self.send(msg)
async def _receive_programs(self):
"""
@@ -76,7 +72,7 @@ class BDIProgramManager(BaseAgent):
self.logger.exception("Received an invalid program.")
continue
await self._send_to_bdi(program)
await self._create_agentspeak_and_send_to_bdi(program)
async def setup(self):
"""

View File

@@ -101,7 +101,7 @@ class BDIBeliefCollectorAgent(BaseAgent):
:return: A Belief object if the input is valid or None.
"""
try:
return Belief(name=name, arguments=arguments)
return Belief(name=name, arguments=arguments, replace=name == "user_said")
except ValidationError:
return None

View File

@@ -0,0 +1,5 @@
norms("").
+user_said(Message) : norms(Norms) <-
-user_said(Message);
.reply(Message, Norms).

View File

@@ -1,6 +0,0 @@
norms("").
goals("").
+user_said(Message) : norms(Norms) & goals(Goals) <-
-user_said(Message);
.reply(Message, Norms, Goals).

View File

@@ -117,7 +117,6 @@ async def lifespan(app: FastAPI):
BDICoreAgent,
{
"name": settings.agent_settings.bdi_core_name,
"asl": "src/control_backend/agents/bdi/rules.asl",
},
),
"BeliefCollectorAgent": (

View File

@@ -1,64 +1,202 @@
from pydantic import BaseModel
from enum import Enum
from typing import Literal
from pydantic import UUID4, BaseModel
class Norm(BaseModel):
class ProgramElement(BaseModel):
"""
Represents a behavioral norm.
Represents a basic element of our behavior program.
:ivar name: The researcher-assigned name of the element.
:ivar id: Unique identifier.
:ivar label: Human-readable label.
:ivar norm: The actual norm text describing the behavior.
"""
id: str
label: str
norm: str
name: str
id: UUID4
class Goal(BaseModel):
class LogicalOperator(Enum):
AND = "AND"
OR = "OR"
type Belief = KeywordBelief | SemanticBelief | InferredBelief
type BasicBelief = KeywordBelief | SemanticBelief
class KeywordBelief(ProgramElement):
"""
Represents an objective to be achieved.
Represents a belief that is set when the user spoken text contains a certain keyword.
:ivar id: Unique identifier.
:ivar label: Human-readable label.
:ivar description: Detailed description of the goal.
:ivar achieved: Status flag indicating if the goal has been met.
:ivar keyword: The keyword on which this belief gets set.
"""
id: str
label: str
description: str
achieved: bool
class TriggerKeyword(BaseModel):
id: str
name: str = ""
keyword: str
class KeywordTrigger(BaseModel):
id: str
label: str
type: str
keywords: list[TriggerKeyword]
class SemanticBelief(ProgramElement):
"""
Represents a belief that is set by semantic LLM validation.
:ivar description: Description of how to form the belief, used by the LLM.
"""
name: str = ""
description: str
class Phase(BaseModel):
class InferredBelief(ProgramElement):
"""
Represents a belief that gets formed by combining two beliefs with a logical AND or OR.
These beliefs can also be :class:`InferredBelief`, leading to arbitrarily deep nesting.
:ivar operator: The logical operator to apply.
:ivar left: The left part of the logical expression.
:ivar right: The right part of the logical expression.
"""
name: str = ""
operator: LogicalOperator
left: Belief
right: Belief
class Norm(ProgramElement):
name: str = ""
norm: str
critical: bool = False
class BasicNorm(Norm):
"""
Represents a behavioral norm.
:ivar norm: The actual norm text describing the behavior.
:ivar critical: When true, this norm should absolutely not be violated (checked separately).
"""
pass
class ConditionalNorm(Norm):
"""
Represents a norm that is only active when a condition is met (i.e., a certain belief holds).
:ivar condition: When to activate this norm.
"""
condition: Belief
type PlanElement = Goal | Action
class Plan(ProgramElement):
"""
Represents a list of steps to execute. Each of these steps can be a goal (with its own plan)
or a simple action.
:ivar steps: The actions or subgoals to execute, in order.
"""
name: str = ""
steps: list[PlanElement]
class Goal(ProgramElement):
"""
Represents an objective to be achieved. To reach the goal, we should execute
the corresponding plan. If we can fail to achieve a goal after executing the plan,
for example when the achieving of the goal is dependent on the user's reply, this means
that the achieved status will be set from somewhere else in the program.
:ivar plan: The plan to execute.
:ivar can_fail: Whether we can fail to achieve the goal after executing the plan.
"""
plan: Plan
can_fail: bool = True
type Action = SpeechAction | GestureAction | LLMAction
class SpeechAction(ProgramElement):
"""
Represents the action of the robot speaking a literal text.
:ivar text: The text to speak.
"""
name: str = ""
text: str
class Gesture(BaseModel):
"""
Represents a gesture to be performed. Can be either a single gesture,
or a random gesture from a category (tag).
:ivar type: The type of the gesture, "tag" or "single".
:ivar name: The name of the single gesture or tag.
"""
type: Literal["tag", "single"]
name: str
class GestureAction(ProgramElement):
"""
Represents the action of the robot performing a gesture.
:ivar gesture: The gesture to perform.
"""
name: str = ""
gesture: Gesture
class LLMAction(ProgramElement):
"""
Represents the action of letting an LLM generate a reply based on its chat history
and an additional goal added in the prompt.
:ivar goal: The extra (temporary) goal to add to the LLM.
"""
name: str = ""
goal: str
class Trigger(ProgramElement):
"""
Represents a belief-based trigger. When a belief is set, the corresponding plan is executed.
:ivar condition: When to activate the trigger.
:ivar plan: The plan to execute.
"""
name: str = ""
condition: Belief
plan: Plan
class Phase(ProgramElement):
"""
A distinct phase within a program, containing norms, goals, and triggers.
:ivar id: Unique identifier.
:ivar label: Human-readable label.
:ivar norms: List of norms active in this phase.
:ivar goals: List of goals to pursue in this phase.
:ivar triggers: List of triggers that define transitions out of this phase.
"""
id: str
label: str
norms: list[Norm]
name: str = ""
norms: list[BasicNorm | ConditionalNorm]
goals: list[Goal]
triggers: list[KeywordTrigger]
triggers: list[Trigger]
class Program(BaseModel):

View File

@@ -39,7 +39,7 @@ async def test_send_to_bdi():
manager.send = AsyncMock()
program = Program.model_validate_json(make_valid_program_json())
await manager._send_to_bdi(program)
await manager._create_agentspeak_and_send_to_bdi(program)
assert manager.send.await_count == 1
msg: InternalMessage = manager.send.await_args[0][0]
@@ -62,7 +62,7 @@ async def test_receive_programs_valid_and_invalid():
manager = BDIProgramManager(name="program_manager_test")
manager.sub_socket = sub
manager._send_to_bdi = AsyncMock()
manager._create_agentspeak_and_send_to_bdi = AsyncMock()
try:
# Will give StopAsyncIteration when the predefined `sub.recv_multipart` side-effects run out
@@ -71,7 +71,7 @@ async def test_receive_programs_valid_and_invalid():
pass
# Only valid Program should have triggered _send_to_bdi
assert manager._send_to_bdi.await_count == 1
forwarded: Program = manager._send_to_bdi.await_args[0][0]
assert manager._create_agentspeak_and_send_to_bdi.await_count == 1
forwarded: Program = manager._create_agentspeak_and_send_to_bdi.await_args[0][0]
assert forwarded.phases[0].norms[0].norm == "N1"
assert forwarded.phases[0].goals[0].description == "G1"

23
uv.lock generated
View File

@@ -997,6 +997,7 @@ dependencies = [
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "python-json-logger" },
{ name = "python-slugify" },
{ name = "pyyaml" },
{ name = "pyzmq" },
{ name = "silero-vad" },
@@ -1046,6 +1047,7 @@ requires-dist = [
{ name = "pydantic", specifier = ">=2.12.0" },
{ name = "pydantic-settings", specifier = ">=2.11.0" },
{ name = "python-json-logger", specifier = ">=4.0.0" },
{ name = "python-slugify", specifier = ">=8.0.4" },
{ name = "pyyaml", specifier = ">=6.0.3" },
{ name = "pyzmq", specifier = ">=27.1.0" },
{ name = "silero-vad", specifier = ">=6.0.0" },
@@ -1341,6 +1343,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/45/58/38b5afbc1a800eeea951b9285d3912613f2603bdf897a4ab0f4bd7f405fc/python_multipart-0.0.20-py3-none-any.whl", hash = "sha256:8a62d3a8335e06589fe01f2a3e178cdcc632f3fbe0d492ad9ee0ec35aab1f104", size = 24546, upload-time = "2024-12-16T19:45:44.423Z" },
]
[[package]]
name = "python-slugify"
version = "8.0.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "text-unidecode" },
]
sdist = { url = "https://files.pythonhosted.org/packages/87/c7/5e1547c44e31da50a460df93af11a535ace568ef89d7a811069ead340c4a/python-slugify-8.0.4.tar.gz", hash = "sha256:59202371d1d05b54a9e7720c5e038f928f45daaffe41dd10822f3907b937c856", size = 10921, upload-time = "2024-02-08T18:32:45.488Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a4/62/02da182e544a51a5c3ccf4b03ab79df279f9c60c5e82d5e8bec7ca26ac11/python_slugify-8.0.4-py2.py3-none-any.whl", hash = "sha256:276540b79961052b66b7d116620b36518847f52d5fd9e3a70164fc8c50faa6b8", size = 10051, upload-time = "2024-02-08T18:32:43.911Z" },
]
[[package]]
name = "pyyaml"
version = "6.0.3"
@@ -1864,6 +1878,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a2/09/77d55d46fd61b4a135c444fc97158ef34a095e5681d0a6c10b75bf356191/sympy-1.14.0-py3-none-any.whl", hash = "sha256:e091cc3e99d2141a0ba2847328f5479b05d94a6635cb96148ccb3f34671bd8f5", size = 6299353, upload-time = "2025-04-27T18:04:59.103Z" },
]
[[package]]
name = "text-unidecode"
version = "1.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ab/e2/e9a00f0ccb71718418230718b3d900e71a5d16e701a3dae079a21e9cd8f8/text-unidecode-1.3.tar.gz", hash = "sha256:bad6603bb14d279193107714b288be206cac565dfa49aa5b105294dd5c4aab93", size = 76885, upload-time = "2019-08-30T21:36:45.405Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a6/a5/c0b6468d3824fe3fde30dbb5e1f687b291608f9473681bbf7dabbf5a87d7/text_unidecode-1.3-py2.py3-none-any.whl", hash = "sha256:1311f10e8b895935241623731c2ba64f4c455287888b18189350b67134a822e8", size = 78154, upload-time = "2019-08-30T21:37:03.543Z" },
]
[[package]]
name = "tiktoken"
version = "0.12.0"