Compare commits

...

41 Commits

Author SHA1 Message Date
Twirre Meulenbelt
4b71981a3e fix: some bugs and some tests
ref: N25B-429
2026-01-12 09:00:50 +01:00
866d7c4958 fix: end phase loop correctly notifies about user_said
ref: N25B-429
2026-01-08 15:13:12 +01:00
133019a928 feat: trigger name and trigger checks on belief update
ref: N25B-429
2026-01-08 14:04:44 +01:00
4d0ba69443 fix: don't re-add user_said upon phase transition
ref: N25B-429
2026-01-08 13:44:25 +01:00
625ef0c365 feat: phase transition waits for all goals
ref: N25B-429
2026-01-08 13:36:03 +01:00
b88758fa76 feat: phase transition independent of response
ref: N25B-429
2026-01-08 13:33:37 +01:00
Twirre Meulenbelt
45719c580b feat: prepend more silence before speech audio for better transcription beginnings
ref: N25B-429
2026-01-08 10:49:13 +01:00
5a61225c6f feat: reset extractor history
ref: N25B-429
2026-01-07 18:10:13 +01:00
a30cea5231 Merge branch 'feat/semantic-beliefs' into feat/extra-agentspeak-functionality 2026-01-07 17:51:30 +01:00
Twirre Meulenbelt
93d67ccb66 feat: add reset functionality to semantic belief extractor
ref: N25B-432
2026-01-07 17:50:47 +01:00
240624f887 Merge branch 'dev' into feat/extra-agentspeak-functionality
# Conflicts:
#	src/control_backend/agents/bdi/bdi_program_manager.py
#	src/control_backend/agents/llm/llm_agent.py
#	test/unit/agents/bdi/test_bdi_program_manager.py
2026-01-07 17:46:48 +01:00
8a77e8e1c7 feat: check goals only for this phase
Since conversation history still remains we can still check at a later point.

ref: N25B-429
2026-01-07 17:31:24 +01:00
3b4dccc760 Merge branch 'feat/semantic-beliefs' into feat/extra-agentspeak-functionality
# Conflicts:
#	src/control_backend/agents/bdi/bdi_program_manager.py
2026-01-07 17:20:52 +01:00
3d49e44cf7 fix: complete pipeline working
User interrupts still need to be tested.

ref: N25B-429
2026-01-07 17:13:58 +01:00
Twirre Meulenbelt
aa5b386f65 feat: semantically determine goal completion
ref: N25B-432
2026-01-07 17:08:23 +01:00
Twirre Meulenbelt
3189b9fee3 fix: let belief extractor send user_said belief
ref: N25B-429
2026-01-07 15:19:23 +01:00
07d70cb781 fix: single dispatch order
ref: N25B-429
2026-01-07 13:02:23 +01:00
af832980c8 feat: general slugify method
ref: N25B-429
2026-01-07 12:24:46 +01:00
Twirre Meulenbelt
cabe35cdbd feat: integrate AgentSpeak with semantic belief extraction
ref: N25B-429
2026-01-07 11:44:48 +01:00
Twirre Meulenbelt
de8e829d3e Merge remote-tracking branch 'origin/feat/agentspeak-generation' into feat/semantic-beliefs
# Conflicts:
#	test/unit/agents/bdi/test_bdi_program_manager.py
2026-01-06 15:30:59 +01:00
Twirre Meulenbelt
3406e9ac2f feat: make the pipeline work with Program and AgentSpeak
ref: N25B-429
2026-01-06 15:26:44 +01:00
a357b6990b feat: send program to bdi core
ref: N25B-376
2026-01-06 12:11:37 +01:00
9eea4ee345 feat: new ASL generation
ref: N25B-376
2026-01-02 12:08:20 +01:00
Twirre Meulenbelt
42ee5c76d8 test: create tests for belief extractor agent
Includes changes in schemas. Change type of `norms` in `Program` imperceptibly, big changes in schema of `BeliefMessage` to support deleting beliefs.

ref: N25B-380
2025-12-29 17:12:02 +01:00
Twirre Meulenbelt
57b1276cb5 test: make tests work again after changing Program schema
ref: N25B-380
2025-12-29 12:31:51 +01:00
Twirre Meulenbelt
7e0dc9ce1c Merge remote-tracking branch 'origin/feat/agentspeak-generation' into feat/semantic-beliefs
# Conflicts:
#	src/control_backend/schemas/program.py
2025-12-23 17:36:39 +01:00
3253760ef1 feat: new AST representation
File names will be changed eventually.

ref: N25B-376
2025-12-23 17:30:35 +01:00
Twirre Meulenbelt
71cefdfef3 fix: add types to all config properties
ref: N25B-380
2025-12-23 17:14:49 +01:00
Twirre Meulenbelt
33501093a1 feat: extract semantic beliefs from conversation
ref: N25B-380
2025-12-23 17:09:58 +01:00
756e1f0dc5 feat: persistent rules and stuff
So ugly

ref: N25B-376
2025-12-18 14:33:42 +01:00
Twirre Meulenbelt
f91cec6708 fix: things in AgentSpeak, add custom actions
ref: N25B-376
2025-12-18 11:50:16 +01:00
28262eb27e fix: default case for plans
ref: N25B-376
2025-12-17 16:20:37 +01:00
1d36d2e089 feat: (hopefully) better intermediate representation
ref: N25B-376
2025-12-17 15:33:27 +01:00
742e36b94f chore: non-optional uuid id
ref: N25B-376
2025-12-17 14:30:14 +01:00
Twirre Meulenbelt
57fe3ae3f6 Merge remote-tracking branch 'origin/dev' into feat/agentspeak-generation 2025-12-17 13:20:14 +01:00
e704ec5ed4 feat: basic flow and phase transitions
ref: N25B-376
2025-12-16 17:00:32 +01:00
Twirre Meulenbelt
27f04f0958 style: use yield instead of returning arrays
ref: N25B-376
2025-12-16 16:11:01 +01:00
Twirre Meulenbelt
8cc177041a feat: add a second phase in test_program
ref: N25B-376
2025-12-16 15:12:22 +01:00
4a432a603f fix: separate trigger plan generation
ref: N25B-376
2025-12-16 14:12:04 +01:00
bab4800698 feat: add trigger generation
ref: N25B-376
2025-12-16 12:10:52 +01:00
d043c54336 refactor: program restructure
Also includes some AgentSpeak generation.

ref: N25B-376
2025-12-16 10:21:50 +01:00
32 changed files with 3175 additions and 327 deletions

View File

@@ -10,7 +10,7 @@ LLM_SETTINGS__LOCAL_LLM_URL="http://localhost:1234/v1/chat/completions"
LLM_SETTINGS__LOCAL_LLM_MODEL="gpt-oss"
# Number of non-speech chunks to wait before speech ended. A chunk is approximately 31 ms. Increasing this number allows longer pauses in speech, but also increases response time.
BEHAVIOUR_SETTINGS__VAD_NON_SPEECH_PATIENCE_CHUNKS=3
BEHAVIOUR_SETTINGS__VAD_NON_SPEECH_PATIENCE_CHUNKS=15
# Timeout in milliseconds for socket polling. Increase this number if network latency/jitter is high, often the case when using Wi-Fi. Perhaps 500 ms. A symptom of this issue is transcriptions getting cut off.
BEHAVIOUR_SETTINGS__SOCKET_POLLER_TIMEOUT_MS=100
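For reference (not part of the diff), a back-of-the-envelope check of what this setting means in wall-clock time, assuming the ~31 ms per chunk stated in the comment above:

# Rough conversion of VAD patience chunks into milliseconds of tolerated silence.
# Assumes ~31 ms per audio chunk, as documented in the .env comment above.
CHUNK_MS = 31
for chunks in (3, 15):
    print(f"{chunks} chunks ~= {chunks * CHUNK_MS} ms of allowed pause")
# 3 chunks ~= 93 ms, 15 chunks ~= 465 ms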

View File

@@ -15,6 +15,7 @@ dependencies = [
"pydantic>=2.12.0",
"pydantic-settings>=2.11.0",
"python-json-logger>=4.0.0",
"python-slugify>=8.0.4",
"pyyaml>=6.0.3",
"pyzmq>=27.1.0",
"silero-vad>=6.0.0",

View File

@@ -0,0 +1,273 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import StrEnum
class AstNode(ABC):
"""
Abstract base class for all elements of an AgentSpeak program.
"""
@abstractmethod
def _to_agentspeak(self) -> str:
"""
Generates the AgentSpeak code string.
"""
pass
def __str__(self) -> str:
return self._to_agentspeak()
class AstExpression(AstNode, ABC):
"""
Intermediate class for anything that can be used in a logical expression.
"""
def __and__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.AND, _coalesce_expr(other))
def __or__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.OR, _coalesce_expr(other))
def __invert__(self) -> AstLogicalExpression:
if isinstance(self, AstLogicalExpression):
self.negated = not self.negated
return self
return AstLogicalExpression(self, negated=True)
type ExprCoalescible = AstExpression | str | int | float
def _coalesce_expr(value: ExprCoalescible) -> AstExpression:
if isinstance(value, AstExpression):
return value
if isinstance(value, str):
return AstString(value)
if isinstance(value, (int, float)):
return AstNumber(value)
raise TypeError(f"Cannot coalesce type {type(value)} into an AstTerm.")
@dataclass
class AstTerm(AstExpression, ABC):
"""
Base class for terms appearing inside literals.
"""
def __ge__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.GREATER_EQUALS, _coalesce_expr(other))
def __gt__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.GREATER_THAN, _coalesce_expr(other))
def __le__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.LESS_EQUALS, _coalesce_expr(other))
def __lt__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.LESS_THAN, _coalesce_expr(other))
def __eq__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.EQUALS, _coalesce_expr(other))
def __ne__(self, other: ExprCoalescible) -> AstBinaryOp:
return AstBinaryOp(self, BinaryOperatorType.NOT_EQUALS, _coalesce_expr(other))
@dataclass
class AstAtom(AstTerm):
"""
Grounded expression in all lowercase.
"""
value: str
def _to_agentspeak(self) -> str:
return self.value.lower()
@dataclass
class AstVar(AstTerm):
"""
Ungrounded variable expression. First letter capitalized.
"""
name: str
def _to_agentspeak(self) -> str:
return self.name.capitalize()
@dataclass
class AstNumber(AstTerm):
value: int | float
def _to_agentspeak(self) -> str:
return str(self.value)
@dataclass
class AstString(AstTerm):
value: str
def _to_agentspeak(self) -> str:
return f'"{self.value}"'
@dataclass
class AstLiteral(AstTerm):
functor: str
terms: list[AstTerm] = field(default_factory=list)
def _to_agentspeak(self) -> str:
if not self.terms:
return self.functor
args = ", ".join(map(str, self.terms))
return f"{self.functor}({args})"
class BinaryOperatorType(StrEnum):
AND = "&"
OR = "|"
GREATER_THAN = ">"
LESS_THAN = "<"
EQUALS = "=="
NOT_EQUALS = "\\=="
GREATER_EQUALS = ">="
LESS_EQUALS = "<="
@dataclass
class AstBinaryOp(AstExpression):
left: AstExpression
operator: BinaryOperatorType
right: AstExpression
def __post_init__(self):
self.left = _as_logical(self.left)
self.right = _as_logical(self.right)
def _to_agentspeak(self) -> str:
l_str = str(self.left)
r_str = str(self.right)
assert isinstance(self.left, AstLogicalExpression)
assert isinstance(self.right, AstLogicalExpression)
if isinstance(self.left.expression, AstBinaryOp) or self.left.negated:
l_str = f"({l_str})"
if isinstance(self.right.expression, AstBinaryOp) or self.right.negated:
r_str = f"({r_str})"
return f"{l_str} {self.operator.value} {r_str}"
@dataclass
class AstLogicalExpression(AstExpression):
expression: AstExpression
negated: bool = False
def _to_agentspeak(self) -> str:
expr_str = str(self.expression)
if isinstance(self.expression, AstBinaryOp) and self.negated:
expr_str = f"({expr_str})"
return f"{'not ' if self.negated else ''}{expr_str}"
def _as_logical(expr: AstExpression) -> AstLogicalExpression:
if isinstance(expr, AstLogicalExpression):
return expr
return AstLogicalExpression(expr)
class StatementType(StrEnum):
EMPTY = ""
DO_ACTION = "."
ACHIEVE_GOAL = "!"
TEST_GOAL = "?"
ADD_BELIEF = "+"
REMOVE_BELIEF = "-"
REPLACE_BELIEF = "-+"
@dataclass
class AstStatement(AstNode):
"""
A statement that can appear inside a plan.
"""
type: StatementType
expression: AstExpression
def _to_agentspeak(self) -> str:
return f"{self.type.value}{self.expression}"
@dataclass
class AstRule(AstNode):
result: AstExpression
condition: AstExpression | None = None
def __post_init__(self):
if self.condition is not None:
self.condition = _as_logical(self.condition)
def _to_agentspeak(self) -> str:
if not self.condition:
return f"{self.result}."
return f"{self.result} :- {self.condition}."
class TriggerType(StrEnum):
ADDED_BELIEF = "+"
# REMOVED_BELIEF = "-" # TODO
# MODIFIED_BELIEF = "^" # TODO
ADDED_GOAL = "+!"
# REMOVED_GOAL = "-!" # TODO
@dataclass
class AstPlan(AstNode):
type: TriggerType
trigger_literal: AstExpression
context: list[AstExpression]
body: list[AstStatement]
def _to_agentspeak(self) -> str:
assert isinstance(self.trigger_literal, AstLiteral)
indent = " " * 6
colon = " : "
arrow = " <- "
lines = []
lines.append(f"{self.type.value}{self.trigger_literal}")
if self.context:
lines.append(colon + f" &\n{indent}".join(str(c) for c in self.context))
if self.body:
lines.append(arrow + f";\n{indent}".join(str(s) for s in self.body) + ".")
lines.append("")
return "\n".join(lines)
@dataclass
class AstProgram(AstNode):
rules: list[AstRule] = field(default_factory=list)
plans: list[AstPlan] = field(default_factory=list)
def _to_agentspeak(self) -> str:
lines = []
lines.extend(map(str, self.rules))
lines.extend(["", ""])
lines.extend(map(str, self.plans))
return "\n".join(lines)

View File

@@ -0,0 +1,443 @@
from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi.agentspeak_ast import (
AstBinaryOp,
AstExpression,
AstLiteral,
AstPlan,
AstProgram,
AstRule,
AstStatement,
AstString,
AstVar,
BinaryOperatorType,
StatementType,
TriggerType,
)
from control_backend.schemas.program import (
BasicNorm,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Norm,
Phase,
PlanElement,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
Trigger,
)
class AgentSpeakGenerator:
_asp: AstProgram
def generate(self, program: Program) -> str:
self._asp = AstProgram()
self._asp.rules.append(AstRule(self._astify(program.phases[0])))
self._add_keyword_inference()
self._add_default_plans()
self._process_phases(program.phases)
self._add_fallbacks()
return str(self._asp)
def _add_keyword_inference(self) -> None:
keyword = AstVar("Keyword")
message = AstVar("Message")
position = AstVar("Pos")
self._asp.rules.append(
AstRule(
AstLiteral("keyword_said", [keyword]),
AstLiteral("user_said", [message])
& AstLiteral(".substring", [keyword, message, position])
& (position >= 0),
)
)
def _add_default_plans(self):
self._add_reply_with_goal_plan()
self._add_say_plan()
self._add_reply_plan()
def _add_reply_with_goal_plan(self):
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("reply_with_goal", [AstVar("Goal")]),
[AstLiteral("user_said", [AstVar("Message")])],
[
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"findall",
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
),
),
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"reply_with_goal", [AstVar("Message"), AstVar("Norms"), AstVar("Goal")]
),
),
],
)
)
def _add_say_plan(self):
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("say", [AstVar("Text")]),
[],
[
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
AstStatement(StatementType.DO_ACTION, AstLiteral("say", [AstVar("Text")])),
],
)
)
def _add_reply_plan(self):
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("reply"),
[AstLiteral("user_said", [AstVar("Message")])],
[
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"findall",
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
),
),
AstStatement(
StatementType.DO_ACTION,
AstLiteral("reply", [AstVar("Message"), AstVar("Norms")]),
),
],
)
)
def _process_phases(self, phases: list[Phase]) -> None:
for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True):
if curr_phase:
self._process_phase(curr_phase)
self._add_phase_transition(curr_phase, next_phase)
# End phase behavior
# When deleting this, the entire `reply` plan and action can be deleted
self._asp.plans.append(
AstPlan(
type=TriggerType.ADDED_BELIEF,
trigger_literal=AstLiteral("user_said", [AstVar("Message")]),
context=[AstLiteral("phase", [AstString("end")])],
body=[
AstStatement(StatementType.DO_ACTION, AstLiteral("notify_user_said")),
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply")),
],
)
)
def _process_phase(self, phase: Phase) -> None:
for norm in phase.norms:
self._process_norm(norm, phase)
self._add_default_loop(phase)
previous_goal = None
for goal in phase.goals:
self._process_goal(goal, phase, previous_goal, main_goal=True)
previous_goal = goal
for trigger in phase.triggers:
self._process_trigger(trigger, phase)
def _add_phase_transition(self, from_phase: Phase | None, to_phase: Phase | None) -> None:
if from_phase is None:
return
from_phase_ast = self._astify(from_phase)
to_phase_ast = (
self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")])
)
context = [from_phase_ast]
if from_phase:
for goal in from_phase.goals:
context.append(self._astify(goal, achieved=True))
body = [
AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast),
AstStatement(StatementType.ADD_BELIEF, to_phase_ast),
]
# if from_phase:
# body.extend(
# [
# AstStatement(
# StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")])
# ),
# AstStatement(
# StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")])
# ),
# ]
# )
# Notify outside world about transition
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"notify_transition_phase",
[
AstString(str(from_phase.id)),
AstString(str(to_phase.id) if to_phase else "end"),
],
),
)
)
self._asp.plans.append(
AstPlan(TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), context, body)
)
def _process_norm(self, norm: Norm, phase: Phase) -> None:
rule: AstRule | None = None
match norm:
case ConditionalNorm(condition=cond):
rule = AstRule(self._astify(norm), self._astify(phase) & self._astify(cond))
case BasicNorm():
rule = AstRule(self._astify(norm), self._astify(phase))
if not rule:
return
self._asp.rules.append(rule)
def _add_default_loop(self, phase: Phase) -> None:
actions = []
actions.append(
AstStatement(
StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")])
)
)
actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn")))
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers")))
for goal in phase.goals:
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, self._astify(goal)))
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("transition_phase")))
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_BELIEF,
AstLiteral("user_said", [AstVar("Message")]),
[self._astify(phase)],
actions,
)
)
def _process_goal(
self,
goal: Goal,
phase: Phase,
previous_goal: Goal | None = None,
continues_response: bool = False,
main_goal: bool = False,
) -> None:
context: list[AstExpression] = [self._astify(phase)]
context.append(~self._astify(goal, achieved=True))
if previous_goal and previous_goal.can_fail:
context.append(self._astify(previous_goal, achieved=True))
if not continues_response:
context.append(~AstLiteral("responded_this_turn"))
body = []
if main_goal: # UI only needs to know about the main goals
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_goal_start", [AstString(self.slugify(goal))]),
)
)
subgoals = []
for step in goal.plan.steps:
body.append(self._step_to_statement(step))
if isinstance(step, Goal):
subgoals.append(step)
if not goal.can_fail and not continues_response:
body.append(AstStatement(StatementType.ADD_BELIEF, self._astify(goal, achieved=True)))
self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(goal), context, body))
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
self._astify(goal),
context=[],
body=[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
prev_goal = None
for subgoal in subgoals:
self._process_goal(subgoal, phase, prev_goal)
prev_goal = subgoal
def _step_to_statement(self, step: PlanElement) -> AstStatement:
match step:
case Goal() | SpeechAction() | LLMAction() as a:
return AstStatement(StatementType.ACHIEVE_GOAL, self._astify(a))
case GestureAction() as a:
return AstStatement(StatementType.DO_ACTION, self._astify(a))
# TODO: separate handling of keyword and others
def _process_trigger(self, trigger: Trigger, phase: Phase) -> None:
body = []
subgoals = []
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_trigger_start", [AstString(self.slugify(trigger))]),
)
)
for step in trigger.plan.steps:
body.append(self._step_to_statement(step))
if isinstance(step, Goal):
step.can_fail = False # triggers are continuous sequence
subgoals.append(step)
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_trigger_end", [AstString(self.slugify(trigger))]),
)
)
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("check_triggers"),
[self._astify(phase), self._astify(trigger.condition)],
body,
)
)
# Force trigger (from UI)
self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(trigger), [], body))
for subgoal in subgoals:
self._process_goal(subgoal, phase, continues_response=True)
def _add_fallbacks(self):
# Trigger fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("check_triggers"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
# Phase transition fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("transition_phase"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
@singledispatchmethod
def _astify(self, element: ProgramElement) -> AstExpression:
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")
@_astify.register
def _(self, kwb: KeywordBelief) -> AstExpression:
return AstLiteral("keyword_said", [AstString(kwb.keyword)])
@_astify.register
def _(self, sb: SemanticBelief) -> AstExpression:
return AstLiteral(self.slugify(sb))
@_astify.register
def _(self, ib: InferredBelief) -> AstExpression:
return AstBinaryOp(
self._astify(ib.left),
BinaryOperatorType.AND if ib.operator == LogicalOperator.AND else BinaryOperatorType.OR,
self._astify(ib.right),
)
@_astify.register
def _(self, norm: Norm) -> AstExpression:
functor = "critical_norm" if norm.critical else "norm"
return AstLiteral(functor, [AstString(norm.norm)])
@_astify.register
def _(self, phase: Phase) -> AstExpression:
return AstLiteral("phase", [AstString(str(phase.id))])
@_astify.register
def _(self, goal: Goal, achieved: bool = False) -> AstExpression:
return AstLiteral(f"{'achieved_' if achieved else ''}{self._slugify_str(goal.name)}")
@_astify.register
def _(self, trigger: Trigger) -> AstExpression:
return AstLiteral(self.slugify(trigger))
@_astify.register
def _(self, sa: SpeechAction) -> AstExpression:
return AstLiteral("say", [AstString(sa.text)])
@_astify.register
def _(self, ga: GestureAction) -> AstExpression:
gesture = ga.gesture
return AstLiteral("gesture", [AstString(gesture.type), AstString(gesture.name)])
@_astify.register
def _(self, la: LLMAction) -> AstExpression:
return AstLiteral("reply_with_goal", [AstString(la.goal)])
@singledispatchmethod
@staticmethod
def slugify(element: ProgramElement) -> str:
raise NotImplementedError(f"Cannot convert element {element} to a slug.")
@slugify.register
@staticmethod
def _(sb: SemanticBelief) -> str:
return f"semantic_{AgentSpeakGenerator._slugify_str(sb.name)}"
@slugify.register
@staticmethod
def _(g: Goal) -> str:
return AgentSpeakGenerator._slugify_str(g.name)
@slugify.register
@staticmethod
def _(t: Trigger):
return f"trigger_{AgentSpeakGenerator._slugify_str(t.name)}"
@staticmethod
def _slugify_str(text: str) -> str:
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])
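Aside (not part of the diff): a sketch of what the default reply plan built by _add_reply_plan() renders to, assuming AstPlan._to_agentspeak behaves as defined above (6-space body indent, ";"-separated statements).

# Render only the default reply plan in isolation.
gen = AgentSpeakGenerator()
gen._asp = AstProgram()
gen._add_reply_plan()
print(gen._asp.plans[0])
# Expected output, roughly:
# +!reply
#  : user_said(Message)
#  <- +responded_this_turn;
#       .findall(Norm, norm(Norm), Norms);
#       .reply(Message, Norms).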

View File

@@ -0,0 +1,203 @@
import typing
from dataclasses import dataclass, field
# --- Types ---
@dataclass
class BeliefLiteral:
"""
Represents a literal or atom.
Example: phase(1), user_said("hello"), ~started
"""
functor: str
args: list[str] = field(default_factory=list)
negated: bool = False
def __str__(self):
# In ASL, 'not' is usually for closed-world assumption (prolog style),
# '~' is for explicit negation in beliefs.
# For simplicity in plan contexts, we use 'not' for conditions.
prefix = "not " if self.negated else ""
if not self.args:
return f"{prefix}{self.functor}"
# Clean args to ensure strings are quoted if they look like strings,
# but usually the converter handles the quoting of string literals.
args_str = ", ".join(self.args)
return f"{prefix}{self.functor}({args_str})"
@dataclass
class GoalLiteral:
name: str
def __str__(self):
return f"!{self.name}"
@dataclass
class ActionLiteral:
"""
Represents a step in a plan body.
Example: .say("Hello") or !achieve_goal
"""
code: str
def __str__(self):
return self.code
@dataclass
class BinaryOp:
"""
Represents logical operations.
Example: (A & B) | C
"""
left: "Expression | str"
operator: typing.Literal["&", "|"]
right: "Expression | str"
def __str__(self):
l_str = str(self.left)
r_str = str(self.right)
if isinstance(self.left, BinaryOp):
l_str = f"({l_str})"
if isinstance(self.right, BinaryOp):
r_str = f"({r_str})"
return f"{l_str} {self.operator} {r_str}"
Literal = BeliefLiteral | GoalLiteral | ActionLiteral
Expression = Literal | BinaryOp | str
@dataclass
class Rule:
"""
Represents an inference rule.
Example: head :- body.
"""
head: Expression
body: Expression | None = None
def __str__(self):
if not self.body:
return f"{self.head}."
return f"{self.head} :- {self.body}."
@dataclass
class PersistentRule:
"""
Represents an inference rule, where the inferred belief is persistent when formed.
"""
head: Expression
body: Expression
def __str__(self):
if not self.body:
raise Exception("Rule without body should not be persistent.")
lines = []
if isinstance(self.body, BinaryOp):
lines.append(f"+{self.body.left}")
if self.body.operator == "&":
lines.append(f" : {self.body.right}")
lines.append(f" <- +{self.head}.")
if self.body.operator == "|":
lines.append(f"+{self.body.right}")
lines.append(f" <- +{self.head}.")
return "\n".join(lines)
@dataclass
class Plan:
"""
Represents a plan.
Syntax: +trigger : context <- body.
"""
trigger: BeliefLiteral | GoalLiteral
context: list[Expression] = field(default_factory=list)
body: list[ActionLiteral] = field(default_factory=list)
def __str__(self):
# Indentation settings
INDENT = " "
ARROW = "\n <- "
COLON = "\n : "
# Build Header
header = f"+{self.trigger}"
if self.context:
ctx_str = f" &\n{INDENT}".join(str(c) for c in self.context)
header += f"{COLON}{ctx_str}"
# Case 1: Empty body
if not self.body:
return f"{header}."
# Case 2: Short body (optional optimization, keeping it uniform usually better)
header += ARROW
lines = []
# We start the first action on the same line or next line.
# Let's put it on the next line for readability if there are multiple.
if len(self.body) == 1:
return f"{header}{self.body[0]}."
# First item
lines.append(f"{header}{self.body[0]};")
# Middle items
for item in self.body[1:-1]:
lines.append(f"{INDENT}{item};")
# Last item
lines.append(f"{INDENT}{self.body[-1]}.")
return "\n".join(lines)
@dataclass
class AgentSpeakFile:
"""
Root element representing the entire generated file.
"""
initial_beliefs: list[Rule] = field(default_factory=list)
inference_rules: list[Rule | PersistentRule] = field(default_factory=list)
plans: list[Plan] = field(default_factory=list)
def __str__(self):
sections = []
if self.initial_beliefs:
sections.append("// --- Initial Beliefs & Facts ---")
sections.extend(str(rule) for rule in self.initial_beliefs)
sections.append("")
if self.inference_rules:
sections.append("// --- Inference Rules ---")
sections.extend(str(rule) for rule in self.inference_rules if isinstance(rule, Rule))
sections.append("")
sections.extend(
str(rule) for rule in self.inference_rules if isinstance(rule, PersistentRule)
)
sections.append("")
if self.plans:
sections.append("// --- Plans ---")
# Separate plans by a newline for readability
sections.extend(str(plan) + "\n" for plan in self.plans)
return "\n".join(sections)

View File

@@ -0,0 +1,425 @@
import asyncio
import time
from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi import BDICoreAgent
from control_backend.agents.bdi.asl_ast import (
ActionLiteral,
AgentSpeakFile,
BeliefLiteral,
BinaryOp,
Expression,
GoalLiteral,
PersistentRule,
Plan,
Rule,
)
from control_backend.agents.bdi.bdi_program_manager import test_program
from control_backend.schemas.program import (
BasicBelief,
Belief,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Phase,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
)
async def do_things():
res = input("Wanna generate")
if res == "y":
program = AgentSpeakGenerator().generate(test_program)
filename = f"{int(time.time())}.asl"
with open(filename, "w") as f:
f.write(program)
else:
# filename = "0test.asl"
filename = "1766062491.asl"
bdi_agent = BDICoreAgent("BDICoreAgent", filename)
flag = asyncio.Event()
await bdi_agent.start()
await flag.wait()
def do_other_things():
print(AgentSpeakGenerator().generate(test_program))
class AgentSpeakGenerator:
"""
Converts a Pydantic Program behavior model into an AgentSpeak(L) AST,
then renders it to a string.
"""
def generate(self, program: Program) -> str:
asl = AgentSpeakFile()
self._generate_startup(program, asl)
for i, phase in enumerate(program.phases):
next_phase = program.phases[i + 1] if i < len(program.phases) - 1 else None
self._generate_phase_flow(phase, next_phase, asl)
self._generate_norms(phase, asl)
self._generate_goals(phase, asl)
self._generate_triggers(phase, asl)
self._generate_fallbacks(program, asl)
return str(asl)
# --- Section: Startup & Phase Management ---
def _generate_startup(self, program: Program, asl: AgentSpeakFile):
if not program.phases:
return
# Initial belief: phase(start).
asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ['"start"'])))
# Startup plan: +started : phase(start) <- -phase(start); +phase(first_id).
asl.plans.append(
Plan(
trigger=BeliefLiteral("started"),
context=[BeliefLiteral("phase", ['"start"'])],
body=[
ActionLiteral('-phase("start")'),
ActionLiteral(f'+phase("{program.phases[0].id}")'),
],
)
)
# Initial plans:
asl.plans.append(
Plan(
trigger=GoalLiteral("generate_response_with_goal(Goal)"),
context=[BeliefLiteral("user_said", ["Message"])],
body=[
ActionLiteral("+responded_this_turn"),
ActionLiteral(".findall(Norm, norm(Norm), Norms)"),
ActionLiteral(".reply_with_goal(Message, Norms, Goal)"),
],
)
)
def _generate_phase_flow(self, phase: Phase, next_phase: Phase | None, asl: AgentSpeakFile):
"""Generates the main loop listener and the transition logic for this phase."""
# +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase.
goal_actions = [ActionLiteral("-responded_this_turn")]
goal_actions += [
ActionLiteral(f"!check_{self._slugify_str(keyword)}")
for keyword in self._get_keyword_conditionals(phase)
]
goal_actions += [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
goal_actions.append(ActionLiteral("!transition_phase"))
asl.plans.append(
Plan(
trigger=BeliefLiteral("user_said", ["Message"]),
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=goal_actions,
)
)
# +!transition_phase : phase(ID) <- -phase(ID); +(NEXT_ID).
next_id = str(next_phase.id) if next_phase else "end"
transition_context = [BeliefLiteral("phase", [f'"{phase.id}"'])]
if phase.goals:
transition_context.append(BeliefLiteral(f"achieved_{self._slugify(phase.goals[-1])}"))
asl.plans.append(
Plan(
trigger=GoalLiteral("transition_phase"),
context=transition_context,
body=[
ActionLiteral(f'-phase("{phase.id}")'),
ActionLiteral(f'+phase("{next_id}")'),
ActionLiteral("user_said(Anything)"),
ActionLiteral("-+user_said(Anything)"),
],
)
)
def _get_keyword_conditionals(self, phase: Phase) -> list[str]:
res = []
for belief in self._extract_basic_beliefs_from_phase(phase):
if isinstance(belief, KeywordBelief):
res.append(belief.keyword)
return res
# --- Section: Norms & Beliefs ---
def _generate_norms(self, phase: Phase, asl: AgentSpeakFile):
for norm in phase.norms:
norm_slug = f'"{norm.norm}"'
head = BeliefLiteral("norm", [norm_slug])
# Base context is the phase
phase_lit = BeliefLiteral("phase", [f'"{phase.id}"'])
if isinstance(norm, ConditionalNorm):
self._ensure_belief_inference(norm.condition, asl)
condition_expr = self._belief_to_expr(norm.condition)
body = BinaryOp(phase_lit, "&", condition_expr)
else:
body = phase_lit
asl.inference_rules.append(Rule(head=head, body=body))
def _ensure_belief_inference(self, belief: Belief, asl: AgentSpeakFile):
"""
Recursively adds rules to infer beliefs.
Checks existing rules to avoid duplicates where necessary;
ASL engines often tolerate redefinition, but a set of processed IDs could also be used.
"""
if isinstance(belief, KeywordBelief):
pass
# # Rule: keyword_said("word") :- user_said(M) & .substring("word", M, P) & P >= 0.
# kwd_slug = f'"{belief.keyword}"'
# head = BeliefLiteral("keyword_said", [kwd_slug])
#
# # Avoid duplicates
# if any(str(r.head) == str(head) for r in asl.inference_rules):
# return
#
# body = BinaryOp(
# BeliefLiteral("user_said", ["Message"]),
# "&",
# BinaryOp(f".substring({kwd_slug}, Message, Pos)", "&", "Pos >= 0"),
# )
#
# asl.inference_rules.append(Rule(head=head, body=body))
elif isinstance(belief, InferredBelief):
self._ensure_belief_inference(belief.left, asl)
self._ensure_belief_inference(belief.right, asl)
slug = self._slugify(belief)
head = BeliefLiteral(slug)
if any(str(r.head) == str(head) for r in asl.inference_rules):
return
op_char = "&" if belief.operator == LogicalOperator.AND else "|"
body = BinaryOp(
self._belief_to_expr(belief.left), op_char, self._belief_to_expr(belief.right)
)
asl.inference_rules.append(PersistentRule(head=head, body=body))
def _belief_to_expr(self, belief: Belief) -> Expression:
if isinstance(belief, KeywordBelief):
return BeliefLiteral("keyword_said", [f'"{belief.keyword}"'])
else:
return BeliefLiteral(self._slugify(belief))
# --- Section: Goals ---
def _generate_goals(self, phase: Phase, asl: AgentSpeakFile):
previous_goal: Goal | None = None
for goal in phase.goals:
self._generate_goal_plan_recursive(goal, str(phase.id), previous_goal, asl)
previous_goal = goal
def _generate_goal_plan_recursive(
self,
goal: Goal,
phase_id: str,
previous_goal: Goal | None,
asl: AgentSpeakFile,
responded_needed: bool = True,
can_fail: bool = True,
):
goal_slug = self._slugify(goal)
# phase(ID) & not responded_this_turn & not achieved_goal
context = [
BeliefLiteral("phase", [f'"{phase_id}"']),
]
if responded_needed:
context.append(BeliefLiteral("responded_this_turn", negated=True))
if can_fail:
context.append(BeliefLiteral(f"achieved_{goal_slug}", negated=True))
if previous_goal:
prev_slug = self._slugify(previous_goal)
context.append(BeliefLiteral(f"achieved_{prev_slug}"))
body_actions = []
sub_goals_to_process = []
for step in goal.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals_to_process.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(ActionLiteral(f'.gesture("{step.gesture}")'))
elif isinstance(step, LLMAction):
body_actions.append(ActionLiteral(f'!generate_response_with_goal("{step.goal}")'))
# Mark achievement
if not goal.can_fail:
body_actions.append(ActionLiteral(f"+achieved_{goal_slug}"))
asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions))
asl.plans.append(
Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
)
prev_sub = None
for sub_goal in sub_goals_to_process:
self._generate_goal_plan_recursive(sub_goal, phase_id, prev_sub, asl)
prev_sub = sub_goal
# --- Section: Triggers ---
def _generate_triggers(self, phase: Phase, asl: AgentSpeakFile):
for keyword in self._get_keyword_conditionals(phase):
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
context=[
ActionLiteral(
f'user_said(Message) & .substring("{keyword}", Message, Pos) & Pos >= 0'
)
],
body=[
ActionLiteral(f'+keyword_said("{keyword}")'),
ActionLiteral(f'-keyword_said("{keyword}")'),
],
)
)
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
body=[ActionLiteral("true")],
)
)
for trigger in phase.triggers:
self._ensure_belief_inference(trigger.condition, asl)
trigger_belief_slug = self._belief_to_expr(trigger.condition)
body_actions = []
sub_goals = []
for step in trigger.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(
ActionLiteral(f'.gesture("{step.gesture.type}", "{step.gesture.name}")')
)
elif isinstance(step, LLMAction):
body_actions.append(
ActionLiteral(f'!generate_response_with_goal("{step.goal}")')
)
asl.plans.append(
Plan(
trigger=BeliefLiteral(trigger_belief_slug),
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=body_actions,
)
)
# Recurse for triggered goals
prev_sub = None
for sub_goal in sub_goals:
self._generate_goal_plan_recursive(
sub_goal, str(phase.id), prev_sub, asl, False, False
)
prev_sub = sub_goal
# --- Section: Fallbacks ---
def _generate_fallbacks(self, program: Program, asl: AgentSpeakFile):
asl.plans.append(
Plan(trigger=GoalLiteral("transition_phase"), context=[], body=[ActionLiteral("true")])
)
# --- Helpers ---
@singledispatchmethod
def _slugify(self, element: ProgramElement) -> str:
if element.name:
raise NotImplementedError("Cannot slugify this element.")
return self._slugify_str(element.name)
@_slugify.register
def _(self, goal: Goal) -> str:
if goal.name:
return self._slugify_str(goal.name)
return f"goal_{goal.id.hex}"
@_slugify.register
def _(self, kwb: KeywordBelief) -> str:
return f"keyword_said({kwb.keyword})"
@_slugify.register
def _(self, sb: SemanticBelief) -> str:
return self._slugify_str(sb.description)
@_slugify.register
def _(self, ib: InferredBelief) -> str:
return self._slugify_str(ib.name)
def _slugify_str(self, text: str) -> str:
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])
def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]:
beliefs = []
for phase in program.phases:
beliefs.extend(self._extract_basic_beliefs_from_phase(phase))
return beliefs
def _extract_basic_beliefs_from_phase(self, phase: Phase) -> list[BasicBelief]:
beliefs = []
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_basic_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += self._extract_basic_beliefs_from_belief(trigger.condition)
return beliefs
def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]:
if isinstance(belief, InferredBelief):
return self._extract_basic_beliefs_from_belief(
belief.left
) + self._extract_basic_beliefs_from_belief(belief.right)
return [belief]
if __name__ == "__main__":
asyncio.run(do_things())
# do_other_things()

View File

@@ -1,5 +1,6 @@
import asyncio
import copy
import json
import time
from collections.abc import Iterable
@@ -11,9 +12,9 @@ from pydantic import ValidationError
from control_backend.agents.base import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.llm_prompt_message import LLMPromptMessage
from control_backend.schemas.ri_message import SpeechCommand
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak
@@ -42,13 +43,13 @@ class BDICoreAgent(BaseAgent):
bdi_agent: agentspeak.runtime.Agent
def __init__(self, name: str, asl: str):
def __init__(self, name: str):
super().__init__(name)
self.asl_file = asl
self.env = agentspeak.runtime.Environment()
# Deep copy because we don't actually want to modify the standard actions globally
self.actions = copy.deepcopy(agentspeak.stdlib.actions)
self._wake_bdi_loop = asyncio.Event()
self._bdi_loop_task = None
async def setup(self) -> None:
"""
@@ -65,19 +66,22 @@ class BDICoreAgent(BaseAgent):
await self._load_asl()
# Start the BDI cycle loop
self.add_behavior(self._bdi_loop())
self._bdi_loop_task = self.add_behavior(self._bdi_loop())
self._wake_bdi_loop.set()
self.logger.debug("Setup complete.")
async def _load_asl(self):
async def _load_asl(self, file_name: str | None = None) -> None:
"""
Load and parse the AgentSpeak source file.
"""
file_name = file_name or "src/control_backend/agents/bdi/default_behavior.asl"
try:
with open(self.asl_file) as source:
with open(file_name) as source:
self.bdi_agent = self.env.build_agent(source, self.actions)
self.logger.info(f"Loaded new ASL from {file_name}.")
except FileNotFoundError:
self.logger.warning(f"Could not find the specified ASL file at {self.asl_file}.")
self.logger.warning(f"Could not find the specified ASL file at {file_name}.")
self.bdi_agent = agentspeak.runtime.Agent(self.env, self.name)
async def _bdi_loop(self):
@@ -97,7 +101,6 @@ class BDICoreAgent(BaseAgent):
maybe_more_work = True
while maybe_more_work:
maybe_more_work = False
self.logger.debug("Stepping BDI.")
if self.bdi_agent.step():
maybe_more_work = True
@@ -116,6 +119,7 @@ class BDICoreAgent(BaseAgent):
Handle incoming messages.
- **Beliefs**: Updates the internal belief base.
- **Program**: Updates the internal agentspeak file to match the current program.
- **LLM Responses**: Forwards the generated text to the Robot Speech Agent (actuation).
:param msg: The received internal message.
@@ -124,12 +128,19 @@ class BDICoreAgent(BaseAgent):
if msg.thread == "beliefs":
try:
beliefs = BeliefMessage.model_validate_json(msg.body).beliefs
self._apply_beliefs(beliefs)
belief_changes = BeliefMessage.model_validate_json(msg.body)
self._apply_belief_changes(belief_changes)
except ValidationError:
self.logger.exception("Error processing belief.")
return
# New agentspeak file
if msg.thread == "new_program":
if self._bdi_loop_task:
self._bdi_loop_task.cancel()
await self._load_asl(msg.body)
self.add_behavior(self._bdi_loop())
# The message was not a belief, handle special cases based on sender
match msg.sender:
case settings.agent_settings.llm_name:
@@ -144,23 +155,41 @@ class BDICoreAgent(BaseAgent):
body=cmd.model_dump_json(),
)
await self.send(out_msg)
case settings.agent_settings.user_interrupt_name:
content = msg.body
self.logger.debug("Received user interruption: %s", content)
def _apply_beliefs(self, beliefs: list[Belief]):
match msg.thread:
case "force_phase_transition":
self._set_goal("transition_phase")
case "force_trigger":
self._force_trigger(msg.body)
case _:
self.logger.warning("Received unknow user interruption: %s", msg)
def _apply_belief_changes(self, belief_changes: BeliefMessage):
"""
Update the belief base with a list of new beliefs.
If ``replace=True`` is set on a belief, it removes all existing beliefs with that name
before adding the new one.
For beliefs in ``belief_changes.replace``, it removes all existing beliefs with that name
before adding one new one.
:param belief_changes: The changes in beliefs to apply.
"""
if not beliefs:
if not belief_changes.create and not belief_changes.replace and not belief_changes.delete:
return
for belief in beliefs:
if belief.replace:
self._remove_all_with_name(belief.name)
for belief in belief_changes.create:
self._add_belief(belief.name, belief.arguments)
def _add_belief(self, name: str, args: Iterable[str] = []):
for belief in belief_changes.replace:
self._remove_all_with_name(belief.name)
self._add_belief(belief.name, belief.arguments)
for belief in belief_changes.delete:
self._remove_belief(belief.name, belief.arguments)
def _add_belief(self, name: str, args: list[str] = None):
"""
Add a single belief to the BDI agent.
@@ -168,9 +197,13 @@ class BDICoreAgent(BaseAgent):
:param args: Arguments for the belief.
"""
# new_args = (agentspeak.Literal(arg) for arg in args) # TODO: Eventually support multiple
merged_args = DELIMITER.join(arg for arg in args)
new_args = (agentspeak.Literal(merged_args),)
term = agentspeak.Literal(name, new_args)
args = args or []
if args:
merged_args = DELIMITER.join(arg for arg in args)
new_args = (agentspeak.Literal(merged_args),)
term = agentspeak.Literal(name, new_args)
else:
term = agentspeak.Literal(name)
self.bdi_agent.call(
agentspeak.Trigger.addition,
@@ -179,16 +212,35 @@ class BDICoreAgent(BaseAgent):
agentspeak.runtime.Intention(),
)
# Check for transitions
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
agentspeak.Literal("transition_phase"),
agentspeak.runtime.Intention(),
)
# Check triggers
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
agentspeak.Literal("check_triggers"),
agentspeak.runtime.Intention(),
)
self._wake_bdi_loop.set()
self.logger.debug(f"Added belief {self.format_belief_string(name, args)}")
def _remove_belief(self, name: str, args: Iterable[str]):
def _remove_belief(self, name: str, args: Iterable[str] | None):
"""
Removes a specific belief (with arguments), if it exists.
"""
new_args = (agentspeak.Literal(arg) for arg in args)
term = agentspeak.Literal(name, new_args)
if args is None:
term = agentspeak.Literal(name)
else:
new_args = (agentspeak.Literal(arg) for arg in args)
term = agentspeak.Literal(name, new_args)
result = self.bdi_agent.call(
agentspeak.Trigger.removal,
@@ -228,6 +280,37 @@ class BDICoreAgent(BaseAgent):
self.logger.debug(f"Removed {removed_count} beliefs.")
def _set_goal(self, name: str, args: Iterable[str] | None = None):
args = args or []
if args:
merged_args = DELIMITER.join(arg for arg in args)
new_args = (agentspeak.Literal(merged_args),)
term = agentspeak.Literal(name, new_args)
else:
term = agentspeak.Literal(name)
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
term,
agentspeak.runtime.Intention(),
)
self._wake_bdi_loop.set()
self.logger.debug(f"Set goal !{self.format_belief_string(name, args)}.")
def _force_trigger(self, name: str):
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
agentspeak.Literal(name),
agentspeak.runtime.Intention(),
)
self.logger.info("Manually forced trigger %s.", name)
def _add_custom_actions(self) -> None:
"""
Add any custom actions here. Inside `@self.actions.add()`, the first argument is
@@ -235,43 +318,207 @@ class BDICoreAgent(BaseAgent):
the function expects (which will be located in `term.args`).
"""
@self.actions.add(".reply", 3)
def _reply(agent: "BDICoreAgent", term, intention):
@self.actions.add(".reply", 2)
def _reply(agent, term, intention):
"""
Sends text to the LLM (AgentSpeak action).
Example: .reply("Hello LLM!", "Some norm", "Some goal")
Let the LLM generate a response to a user's utterance with the current norms and goals.
"""
message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope)
goals = agentspeak.grounded(term.args[2], intention.scope)
self.logger.debug("Norms: %s", norms)
self.logger.debug("Goals: %s", goals)
self.logger.debug("User text: %s", message_text)
asyncio.create_task(self._send_to_llm(str(message_text), str(norms), str(goals)))
self.add_behavior(self._send_to_llm(str(message_text), str(norms), ""))
yield
async def _send_to_llm(self, text: str, norms: str = None, goals: str = None):
@self.actions.add(".reply_with_goal", 3)
def _reply_with_goal(agent: "BDICoreAgent", term, intention):
"""
Let the LLM generate a response to a user's utterance with the current norms and a
specific goal.
"""
message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope)
goal = agentspeak.grounded(term.args[2], intention.scope)
self.logger.debug(
'"reply_with_goal" action called with message=%s, norms=%s, goal=%s',
message_text,
norms,
goal,
)
self.add_behavior(self._send_to_llm(str(message_text), str(norms), str(goal)))
yield
@self.actions.add(".say", 1)
def _say(agent, term, intention):
"""
Make the robot say the given text instantly.
"""
message_text = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug('"say" action called with text=%s', message_text)
speech_command = SpeechCommand(data=message_text)
speech_message = InternalMessage(
to=settings.agent_settings.robot_speech_name,
sender=settings.agent_settings.bdi_core_name,
body=speech_command.model_dump_json(),
)
self.add_behavior(self.send(speech_message))
chat_history_message = InternalMessage(
to=settings.agent_settings.llm_name,
thread="assistant_message",
body=str(message_text),
)
self.add_behavior(self.send(chat_history_message))
yield
@self.actions.add(".gesture", 2)
def _gesture(agent, term, intention):
"""
Make the robot perform the given gesture instantly.
"""
gesture_type = agentspeak.grounded(term.args[0], intention.scope)
gesture_name = agentspeak.grounded(term.args[1], intention.scope)
self.logger.debug(
'"gesture" action called with type=%s, name=%s',
gesture_type,
gesture_name,
)
if str(gesture_type) == "single":
endpoint = RIEndpoint.GESTURE_SINGLE
elif str(gesture_type) == "tag":
endpoint = RIEndpoint.GESTURE_TAG
else:
self.logger.warning("Gesture type %s could not be resolved.", gesture_type)
endpoint = RIEndpoint.GESTURE_SINGLE
gesture_command = GestureCommand(endpoint=endpoint, data=gesture_name)
gesture_message = InternalMessage(
to=settings.agent_settings.robot_gesture_name,
sender=settings.agent_settings.bdi_core_name,
body=gesture_command.model_dump_json(),
)
self.add_behavior(self.send(gesture_message))
yield
@self.actions.add(".notify_user_said", 1)
def _notify_user_said(agent, term, intention):
user_said = agentspeak.grounded(term.args[0], intention.scope)
msg = InternalMessage(
to=settings.agent_settings.llm_name, thread="user_message", body=str(user_said)
)
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_trigger_start", 1)
def _notify_trigger_start(agent, term, intention):
"""
Notify the UI about the trigger we just started doing.
"""
trigger_name = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("Started trigger %s", trigger_name)
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
sender=self.name,
thread="trigger_start",
body=str(trigger_name),
)
# TODO: check with Pim
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_trigger_end", 1)
def _notify_trigger_end(agent, term, intention):
"""
Notify the UI about the trigger we just finished doing.
"""
trigger_name = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("Finished trigger %s", trigger_name)
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
sender=self.name,
thread="trigger_end",
body=str(trigger_name),
)
# TODO: check with Pim
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_goal_start", 1)
def _notify_goal_start(agent, term, intention):
"""
Notify the UI about the goal we just started chasing.
"""
goal_name = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("Started chasing goal %s", goal_name)
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
sender=self.name,
thread="goal_start",
body=str(goal_name),
)
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_transition_phase", 2)
def _notify_transition_phase(agent, term, intention):
"""
Notify the BDI program manager about a phase transition.
"""
old = agentspeak.grounded(term.args[0], intention.scope)
new = agentspeak.grounded(term.args[1], intention.scope)
msg = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
thread="transition_phase",
body=json.dumps({"old": str(old), "new": str(new)}),
)
self.add_behavior(self.send(msg))
yield
async def _send_to_llm(self, text: str, norms: str, goals: str):
"""
Sends a text query to the LLM agent asynchronously.
"""
prompt = LLMPromptMessage(
text=text,
norms=norms.split("\n") if norms else [],
goals=goals.split("\n") if norms else [],
)
prompt = LLMPromptMessage(text=text, norms=norms.split("\n"), goals=goals.split("\n"))
msg = InternalMessage(
to=settings.agent_settings.llm_name,
sender=self.name,
body=prompt.model_dump_json(),
thread="prompt_message",
)
await self.send(msg)
self.logger.info("Message sent to LLM agent: %s", text)
@staticmethod
def format_belief_string(name: str, args: Iterable[str] = []):
def format_belief_string(name: str, args: Iterable[str] | None = []):
"""
Given a belief's name and its args, return a string of the form "name(*args)"
"""
return f"{name}{'(' if args else ''}{','.join(args)}{')' if args else ''}"
return f"{name}{'(' if args else ''}{','.join(args or [])}{')' if args else ''}"

View File

@@ -1,12 +1,23 @@
import asyncio
import json
import zmq
from pydantic import ValidationError
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
from control_backend.schemas.program import Program
from control_backend.schemas.belief_list import BeliefList, GoalList
from control_backend.schemas.internal_message import InternalMessage
from control_backend.schemas.program import (
Belief,
ConditionalNorm,
Goal,
InferredBelief,
Phase,
Program,
)
class BDIProgramManager(BaseAgent):
@@ -21,44 +32,140 @@ class BDIProgramManager(BaseAgent):
:ivar sub_socket: The ZMQ SUB socket used to receive program updates.
"""
_program: Program
_phase: Phase | None
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sub_socket = None
async def _send_to_bdi(self, program: Program):
def _initialize_internal_state(self, program: Program):
self._program = program
self._phase = program.phases[0] # start in first phase
async def _create_agentspeak_and_send_to_bdi(self, program: Program):
"""
Convert a received program into BDI beliefs and send them to the BDI Core Agent.
Currently, it takes the **first phase** of the program and extracts:
- **Norms**: Constraints or rules the agent must follow.
- **Goals**: Objectives the agent must achieve.
These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will
overwrite any existing norms/goals of the same name in the BDI agent.
Convert a received program into an AgentSpeak file and send it to the BDI Core Agent.
:param program: The program object received from the API.
"""
first_phase = program.phases[0]
norms_belief = Belief(
name="norms",
arguments=[norm.norm for norm in first_phase.norms],
replace=True,
asg = AgentSpeakGenerator()
asl_str = asg.generate(program)
file_name = "src/control_backend/agents/bdi/agentspeak.asl"
with open(file_name, "w") as f:
f.write(asl_str)
msg = InternalMessage(
sender=self.name,
to=settings.agent_settings.bdi_core_name,
body=file_name,
thread="new_program",
)
goals_belief = Belief(
name="goals",
arguments=[goal.description for goal in first_phase.goals],
replace=True,
await self.send(msg)
async def handle_message(self, msg: InternalMessage):
match msg.thread:
case "transition_phase":
phases = json.loads(msg.body)
await self._transition_phase(phases["old"], phases["new"])
async def _transition_phase(self, old: str, new: str):
assert old == str(self._phase.id)
if new == "end":
self._phase = None
return
for phase in self._program.phases:
if str(phase.id) == new:
self._phase = phase
await self._send_beliefs_to_semantic_belief_extractor()
await self._send_goals_to_semantic_belief_extractor()
# Notify user interaction agent
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
thread="transition_phase",
body=str(self._phase.id),
)
program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief])
self.add_behavior(self.send(msg))
def _extract_current_beliefs(self) -> list[Belief]:
beliefs: list[Belief] = []
for norm in self._phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_beliefs_from_belief(norm.condition)
for trigger in self._phase.triggers:
beliefs += self._extract_beliefs_from_belief(trigger.condition)
return beliefs
@staticmethod
def _extract_beliefs_from_belief(belief: Belief) -> list[Belief]:
if isinstance(belief, InferredBelief):
return BDIProgramManager._extract_beliefs_from_belief(
belief.left
) + BDIProgramManager._extract_beliefs_from_belief(belief.right)
return [belief]
async def _send_beliefs_to_semantic_belief_extractor(self):
"""
Extract beliefs from the program and send them to the Semantic Belief Extractor Agent.
"""
beliefs = BeliefList(beliefs=self._extract_current_beliefs())
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=program_beliefs.model_dump_json(),
body=beliefs.model_dump_json(),
thread="beliefs",
)
await self.send(message)
def _extract_current_goals(self) -> list[Goal]:
"""
Extract all goals from the program, including subgoals.
:return: A list of Goal objects.
"""
goals: list[Goal] = []
def extract_goals_from_goal(goal_: Goal) -> list[Goal]:
goals_: list[Goal] = [goal_]
for plan in goal_.plan.steps:
if isinstance(plan, Goal):
goals_.extend(extract_goals_from_goal(plan))
return goals_
for goal in self._phase.goals:
goals.extend(extract_goals_from_goal(goal))
return goals
async def _send_goals_to_semantic_belief_extractor(self):
"""
Extract goals for the current phase and send them to the Semantic Belief Extractor Agent.
"""
goals = GoalList(goals=self._extract_current_goals())
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=goals.model_dump_json(),
thread="goals",
)
await self.send(message)
self.logger.debug("Sent new norms and goals to the BDI agent.")
async def _send_clear_llm_history(self):
"""
@@ -68,13 +175,19 @@ class BDIProgramManager(BaseAgent):
"""
message = InternalMessage(
to=settings.agent_settings.llm_name,
sender=self.name,
body="clear_history",
thread="clear history message",
)
await self.send(message)
self.logger.debug("Sent message to LLM agent to clear history.")
extractor_msg = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
thread="conversation_history",
body="reset",
)
await self.send(extractor_msg)
self.logger.debug("Sent message to extractor agent to clear history.")
async def _receive_programs(self):
"""
Continuous loop that receives program updates from the HTTP endpoint.
@@ -88,13 +201,20 @@ class BDIProgramManager(BaseAgent):
try:
program = Program.model_validate_json(body)
except ValidationError:
self.logger.warning("Received an invalid program.")
continue
self._initialize_internal_state(program)
await self._send_clear_llm_history()
await asyncio.gather(
self._create_agentspeak_and_send_to_bdi(program),
self._send_beliefs_to_semantic_belief_extractor(),
self._send_goals_to_semantic_belief_extractor(),
)
async def setup(self):
"""
Initialize the agent.

View File

@@ -144,7 +144,7 @@ class BDIBeliefCollectorAgent(BaseAgent):
msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=BeliefMessage(create=beliefs).model_dump_json(),
thread="beliefs",
)

View File

@@ -0,0 +1,6 @@
norms("").
+user_said(Message) : norms(Norms) <-
.notify_user_said(Message);
-user_said(Message);
.reply(Message, Norms).

View File

@@ -1,6 +0,0 @@
norms("").
goals("").
+user_said(Message) : norms(Norms) & goals(Goals) <-
-user_said(Message);
.reply(Message, Norms, Goals).

View File

@@ -1,8 +1,46 @@
import asyncio
import json
import httpx
from pydantic import BaseModel, ValidationError
from control_backend.agents.base import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList, GoalList
from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import Goal, SemanticBelief
type JSONLike = None | bool | int | float | str | list["JSONLike"] | dict[str, "JSONLike"]
class BeliefState(BaseModel):
true: set[InternalBelief] = set()
false: set[InternalBelief] = set()
def difference(self, other: "BeliefState") -> "BeliefState":
return BeliefState(
true=self.true - other.true,
false=self.false - other.false,
)
def union(self, other: "BeliefState") -> "BeliefState":
return BeliefState(
true=self.true | other.true,
false=self.false | other.false,
)
def __sub__(self, other):
return self.difference(other)
def __or__(self, other):
return self.union(other)
def __bool__(self):
return bool(self.true) or bool(self.false)
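As a small sketch of the set algebra above (belief names made up), ``difference`` isolates only the truth values that are newly established, which is how ``_infer_new_beliefs`` decides whether an update needs to be sent at all:
    previous = BeliefState(true={InternalBelief(name="is_pirate", arguments=None)})
    current = BeliefState(
        true={InternalBelief(name="is_pirate", arguments=None)},
        false={InternalBelief(name="no_more_booze", arguments=None)},
    )
    delta = current - previous
    assert not delta.true                                      # is_pirate was already known
    assert {b.name for b in delta.false} == {"no_more_booze"}  # only the new information remains
    assert bool(delta)                                         # truthy, so an update is warranted
    previous |= delta                                          # union keeps the running state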
class TextBeliefExtractorAgent(BaseAgent):
@@ -12,54 +50,433 @@ class TextBeliefExtractorAgent(BaseAgent):
This agent is responsible for processing raw text (e.g., from speech transcription) and
extracting semantic beliefs from it.
It uses the available beliefs received from the program manager to try to extract beliefs from a
user's message, sends any updated beliefs to the BDI core, and forms a ``user_said`` belief from
the message itself.
"""
def __init__(self, name: str):
super().__init__(name)
self._llm = self.LLM(self, settings.llm_settings.n_parallel)
self.belief_inferrer = SemanticBeliefInferrer(self._llm)
self.goal_inferrer = GoalAchievementInferrer(self._llm)
self._current_beliefs = BeliefState()
self._current_goal_completions: dict[str, bool] = {}
self.conversation = ChatHistory(messages=[])
async def setup(self):
"""
Initialize the agent and its resources.
"""
self.logger.info("Settting up %s.", self.name)
# Setup LLM belief context if needed (currently demo is just passthrough)
self.beliefs = {"mood": ["X"], "car": ["Y"]}
self.logger.info("Setting up %s.", self.name)
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages. Expect messages from the Transcriber agent, LLM agent, and the
Program manager agent.
:param msg: The received message.
"""
sender = msg.sender
match sender:
case settings.agent_settings.transcription_name:
self.logger.debug("Received text from transcriber: %s", msg.body)
self._apply_conversation_message(ChatMessage(role="user", content=msg.body))
await self._user_said(msg.body)
await self._infer_new_beliefs()
await self._infer_goal_completions()
case settings.agent_settings.llm_name:
self.logger.debug("Received text from LLM: %s", msg.body)
self._apply_conversation_message(ChatMessage(role="assistant", content=msg.body))
case settings.agent_settings.bdi_program_manager_name:
await self._handle_program_manager_message(msg)
case _:
self.logger.info("Discarding message from %s", sender)
return
def _apply_conversation_message(self, message: ChatMessage):
"""
Save the chat message to our conversation history, taking into account the conversation
length limit.
:param message: The chat message to add to the conversation history.
"""
length_limit = settings.behaviour_settings.conversation_history_length_limit
self.conversation.messages = (self.conversation.messages + [message])[-length_limit:]
async def _handle_program_manager_message(self, msg: InternalMessage):
"""
Handle a message from the program manager: extract available beliefs and goals from it.
:param msg: The received message from the program manager.
"""
match msg.thread:
case "beliefs":
self._handle_beliefs_message(msg)
await self._infer_new_beliefs()
case "goals":
self._handle_goals_message(msg)
await self._infer_goal_completions()
case "conversation_history":
if msg.body == "reset":
self._reset()
case _:
self.logger.warning("Received unexpected message from %s", msg.sender)
def _reset(self):
self.conversation = ChatHistory(messages=[])
self.belief_inferrer.available_beliefs.clear()
self._current_beliefs = BeliefState()
self.goal_inferrer.goals.clear()
self._current_goal_completions = {}
def _handle_beliefs_message(self, msg: InternalMessage):
try:
belief_list = BeliefList.model_validate_json(msg.body)
except ValidationError:
self.logger.warning(
"Received message from program manager but it is not a valid list of beliefs."
)
return
available_beliefs = [b for b in belief_list.beliefs if isinstance(b, SemanticBelief)]
self.belief_inferrer.available_beliefs = available_beliefs
self.logger.debug(
"Received %d semantic beliefs from the program manager: %s",
len(available_beliefs),
", ".join(b.name for b in available_beliefs),
)
def _handle_goals_message(self, msg: InternalMessage):
try:
goals_list = GoalList.model_validate_json(msg.body)
except ValidationError:
self.logger.warning(
"Received message from program manager but it is not a valid list of goals."
)
return
# Use only goals that can fail, as the others are always assumed to be completed
available_goals = [g for g in goals_list.goals if g.can_fail]
self.goal_inferrer.goals = available_goals
self.logger.debug(
"Received %d failable goals from the program manager: %s",
len(available_goals),
", ".join(g.name for g in available_goals),
)
async def _user_said(self, text: str):
"""
Create a belief for the user's full speech.
:param text: User's transcribed text.
"""
belief_msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=BeliefMessage(
replace=[InternalBelief(name="user_said", arguments=[text])],
).model_dump_json(),
thread="beliefs",
)
await self.send(belief_msg)
self.logger.info("Sent %d beliefs to the belief collector.", len(belief["beliefs"]))
async def _infer_new_beliefs(self):
conversation_beliefs = await self.belief_inferrer.infer_from_conversation(self.conversation)
new_beliefs = conversation_beliefs - self._current_beliefs
if not new_beliefs:
self.logger.debug("No new beliefs detected.")
return
self._current_beliefs |= new_beliefs
belief_changes = BeliefMessage(
create=list(new_beliefs.true),
delete=list(new_beliefs.false),
)
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=belief_changes.model_dump_json(),
thread="beliefs",
)
await self.send(message)
async def _infer_goal_completions(self):
goal_completions = await self.goal_inferrer.infer_from_conversation(self.conversation)
new_achieved = [
InternalBelief(name=goal, arguments=None)
for goal, achieved in goal_completions.items()
if achieved and self._current_goal_completions.get(goal) != achieved
]
new_not_achieved = [
InternalBelief(name=goal, arguments=None)
for goal, achieved in goal_completions.items()
if not achieved and self._current_goal_completions.get(goal) != achieved
]
for goal, achieved in goal_completions.items():
self._current_goal_completions[goal] = achieved
if not new_achieved and not new_not_achieved:
self.logger.debug("No goal achievement changes detected.")
return
belief_changes = BeliefMessage(
create=new_achieved,
delete=new_not_achieved,
)
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=belief_changes.model_dump_json(),
thread="beliefs",
)
await self.send(message)
class LLM:
"""
Class that handles sending structured generation requests to an LLM.
"""
def __init__(self, agent: "TextBeliefExtractorAgent", n_parallel: int):
self._agent = agent
self._semaphore = asyncio.Semaphore(n_parallel)
async def query(self, prompt: str, schema: dict, tries: int = 3) -> JSONLike | None:
"""
Query the LLM with the given prompt and schema, return an instance of a dict conforming
to this schema. Try ``tries`` times, or return None.
:param prompt: Prompt to be queried.
:param schema: Schema to be queried.
:param tries: Number of times to try to query the LLM.
:return: An instance of a dict conforming to this schema, or None if failed.
"""
try_count = 0
while try_count < tries:
try_count += 1
try:
return await self._query_llm(prompt, schema)
except (httpx.HTTPError, json.JSONDecodeError, KeyError) as e:
if try_count < tries:
continue
self._agent.logger.exception(
"Failed to get LLM response after %d tries.",
try_count,
exc_info=e,
)
return None
async def _query_llm(self, prompt: str, schema: dict) -> JSONLike:
"""
Query an LLM with the given prompt and schema, return an instance of a dict conforming
to that schema.
:param prompt: The prompt to be queried.
:param schema: Schema to use during response.
:return: A dict conforming to this schema.
:raises httpx.HTTPStatusError: If the LLM server responded with an error.
:raises json.JSONDecodeError: If the LLM response was not valid JSON. May happen if the
response was cut off early due to length limitations.
:raises KeyError: If the LLM server responded with no error, but the response was
invalid.
"""
async with self._semaphore:
async with httpx.AsyncClient() as client:
response = await client.post(
settings.llm_settings.local_llm_url,
json={
"model": settings.llm_settings.local_llm_model,
"messages": [{"role": "user", "content": prompt}],
"response_format": {
"type": "json_schema",
"json_schema": {
"name": "Beliefs",
"strict": True,
"schema": schema,
},
},
"reasoning_effort": "low",
"temperature": settings.llm_settings.code_temperature,
"stream": False,
},
timeout=30.0,
)
response.raise_for_status()
response_json = response.json()
json_message = response_json["choices"][0]["message"]["content"]
return json.loads(json_message)
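A hedged usage sketch of the wrapper above: the caller passes a prompt plus a JSON schema, and the schema is forwarded verbatim as the ``json_schema`` portion of ``response_format`` to the chat-completions endpoint configured at ``local_llm_url`` (the schema and prompt below are made up):
    llm = TextBeliefExtractorAgent.LLM(agent, n_parallel=4)  # `agent` is the owning extractor agent
    schema = {
        "type": "object",
        "properties": {"user_is_greeting": {"type": ["boolean", "null"]}},
        "required": ["user_is_greeting"],
    }
    # Retries up to three times; returns a schema-conforming dict, or None on repeated failure.
    result = await llm.query("USER:\nHello there!\n\nIs the user greeting the robot?", schema)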
class SemanticBeliefInferrer:
"""
Class that handles only prompting an LLM for semantic beliefs.
"""
def __init__(
self,
llm: "TextBeliefExtractorAgent.LLM",
available_beliefs: list[SemanticBelief] | None = None,
):
self._llm = llm
self.available_beliefs: list[SemanticBelief] = available_beliefs or []
async def infer_from_conversation(self, conversation: ChatHistory) -> BeliefState:
"""
Process conversation history to extract beliefs, semantically. The result is an object that
describes all beliefs that hold or don't hold based on the full conversation.
:param conversation: The conversation history to be processed.
:return: An object that describes beliefs.
"""
# Return instantly if there are no beliefs to infer
if not self.available_beliefs:
return BeliefState()
n_parallel = max(1, min(settings.llm_settings.n_parallel - 1, len(self.available_beliefs)))
all_beliefs: list[dict[str, bool | None] | None] = await asyncio.gather(
*[
self._infer_beliefs(conversation, beliefs)
for beliefs in self._split_into_chunks(self.available_beliefs, n_parallel)
]
)
retval = BeliefState()
for beliefs in all_beliefs:
if beliefs is None:
continue
for belief_name, belief_holds in beliefs.items():
if belief_holds is None:
continue
belief = InternalBelief(name=belief_name, arguments=None)
if belief_holds:
retval.true.add(belief)
else:
retval.false.add(belief)
return retval
@staticmethod
def _split_into_chunks[T](items: list[T], n: int) -> list[list[T]]:
"""
Split a list into ``n`` chunks, making each chunk approximately ``len(items) / n`` long.
:param items: The list of items to split.
:param n: The number of desired chunks.
:return: A list of chunks each approximately ``len(items) / n`` long.
"""
k, m = divmod(len(items), n)
return [items[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)]
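For example, seven beliefs split over three parallel calls end up in chunks of three, two, and two:
    items = ["b1", "b2", "b3", "b4", "b5", "b6", "b7"]
    chunks = SemanticBeliefInferrer._split_into_chunks(items, 3)
    assert chunks == [["b1", "b2", "b3"], ["b4", "b5"], ["b6", "b7"]]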
async def _infer_beliefs(
self,
conversation: ChatHistory,
beliefs: list[SemanticBelief],
) -> dict[str, bool | None] | None:
"""
Infer given beliefs based on the given conversation.
:param conversation: The conversation to infer beliefs from.
:param beliefs: The beliefs to infer.
:return: A dict mapping belief names to a boolean indicating whether they hold (or None when
a belief cannot be inferred from the given conversation), or None if the LLM query failed.
"""
example = {
"example_belief": True,
}
prompt = f"""{self._format_conversation(conversation)}
Given the above conversation, what beliefs can be inferred?
If there is no relevant information about a belief, give null.
In case messages conflict, prefer using the most recent messages for inference.
Choose from the following list of beliefs, formatted as `- <belief_name>: <description>`:
{self._format_beliefs(beliefs)}
Respond with a JSON similar to the following, but with the property names as given above:
{json.dumps(example, indent=2)}
"""
schema = self._create_beliefs_schema(beliefs)
return await self._llm.query(prompt, schema)
@staticmethod
def _create_belief_schema(belief: SemanticBelief) -> tuple[str, dict]:
return AgentSpeakGenerator.slugify(belief), {
"type": ["boolean", "null"],
"description": belief.description,
}
@staticmethod
def _create_beliefs_schema(beliefs: list[SemanticBelief]) -> dict:
belief_schemas = [
SemanticBeliefInferrer._create_belief_schema(belief) for belief in beliefs
]
return {
"type": "object",
"properties": dict(belief_schemas),
"required": [name for name, _ in belief_schemas],
}
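For two semantic beliefs whose slugified names were, say, ``is_pirate`` and ``no_more_booze``, the schema handed to the LLM would look roughly like this:
    schema = {
        "type": "object",
        "properties": {
            "is_pirate": {"type": ["boolean", "null"], "description": "The user is a pirate."},
            "no_more_booze": {"type": ["boolean", "null"], "description": "There is no more alcohol."},
        },
        "required": ["is_pirate", "no_more_booze"],
    }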
@staticmethod
def _format_message(message: ChatMessage):
return f"{message.role.upper()}:\n{message.content}"
@staticmethod
def _format_conversation(conversation: ChatHistory):
return "\n\n".join(
[SemanticBeliefInferrer._format_message(message) for message in conversation.messages]
)
@staticmethod
def _format_beliefs(beliefs: list[SemanticBelief]):
return "\n".join(
[f"- {AgentSpeakGenerator.slugify(belief)}: {belief.description}" for belief in beliefs]
)
class GoalAchievementInferrer(SemanticBeliefInferrer):
def __init__(self, llm: TextBeliefExtractorAgent.LLM):
super().__init__(llm)
self.goals = []
async def infer_from_conversation(self, conversation: ChatHistory) -> dict[str, bool]:
"""
Determine which goals have been achieved based on the given conversation.
:param conversation: The conversation to infer goal completion from.
:return: A mapping from goal belief names to a boolean indicating whether each goal has been achieved.
"""
if not self.goals:
return {}
goals_achieved = await asyncio.gather(
*[self._infer_goal(conversation, g) for g in self.goals]
)
return {
f"achieved_{AgentSpeakGenerator.slugify(goal)}": achieved
for goal, achieved in zip(self.goals, goals_achieved, strict=True)
}
async def _infer_goal(self, conversation: ChatHistory, goal: Goal) -> bool:
prompt = f"""{self._format_conversation(conversation)}
Given the above conversation, has the following goal been achieved?
The name of the goal: {goal.name}
Description of the goal: {goal.description}
Answer with literally only `true` or `false` (without backticks)."""
schema = {
"type": "boolean",
}
return await self._llm.query(prompt, schema)
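So for a single failable goal named, hypothetically, "Ask name", a call like the one below would come back keyed by the prefixed, slugified belief name (the exact slug depends on how ``AgentSpeakGenerator.slugify`` renders the goal):
    # `inferrer` is a GoalAchievementInferrer whose `goals` list holds the "Ask name" goal.
    completions = await inferrer.infer_from_conversation(conversation)
    # e.g. {"achieved_ask_name": True} once the user has actually given their name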

View File

@@ -248,7 +248,8 @@ class RICommunicationAgent(BaseAgent):
self._req_socket.recv_json(), timeout=seconds_to_wait_total / 2
)
self.logger.debug(f'Received message "{message}" from RI.')
if "endpoint" in message and message["endpoint"] != "ping":
self.logger.debug(f'Received message "{message}" from RI.')
if "endpoint" not in message:
self.logger.warning("No received endpoint in message, expected ping endpoint.")
continue

View File

@@ -46,18 +46,23 @@ class LLMAgent(BaseAgent):
:param msg: The received internal message.
"""
if msg.sender == settings.agent_settings.bdi_core_name:
self.logger.debug("Processing message from BDI core.")
try:
prompt_message = LLMPromptMessage.model_validate_json(msg.body)
await self._process_bdi_message(prompt_message)
except ValidationError:
self.logger.debug("Prompt message from BDI core is invalid.")
match msg.thread:
case "prompt_message":
try:
prompt_message = LLMPromptMessage.model_validate_json(msg.body)
await self._process_bdi_message(prompt_message)
except ValidationError:
self.logger.debug("Prompt message from BDI core is invalid.")
case "assistant_message":
self.history.append({"role": "assistant", "content": msg.body})
case "user_message":
self.history.append({"role": "user", "content": msg.body})
elif msg.sender == settings.agent_settings.bdi_program_manager_name:
if msg.body == "clear_history":
self.logger.debug("Clearing conversation history.")
self.history.clear()
else:
self.logger.debug("Message ignored (not from BDI core.")
self.logger.debug("Message ignored.")
async def _process_bdi_message(self, message: LLMPromptMessage):
"""
@@ -68,11 +73,12 @@ class LLMAgent(BaseAgent):
:param message: The parsed prompt message containing text, norms, and goals.
"""
full_message = ""
async for chunk in self._query_llm(message.text, message.norms, message.goals):
await self._send_reply(chunk)
full_message += chunk
self.logger.debug("Finished processing BDI message. Response sent in chunks to BDI core.")
await self._send_full_reply(full_message)
async def _send_reply(self, msg: str):
"""
@@ -87,6 +93,19 @@ class LLMAgent(BaseAgent):
)
await self.send(reply)
async def _send_full_reply(self, msg: str):
"""
Sends a response message (full) to agents that need it.
:param msg: The text content of the message.
"""
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=msg,
)
await self.send(message)
async def _query_llm(
self, prompt: str, norms: list[str], goals: list[str]
) -> AsyncGenerator[str]:
@@ -104,13 +123,6 @@ class LLMAgent(BaseAgent):
:param goals: Goals the LLM should achieve.
:yield: Fragments of the LLM-generated content (e.g., sentences/phrases).
"""
self.history.append(
{
"role": "user",
"content": prompt,
}
)
instructions = LLMInstructions(norms if norms else None, goals if goals else None)
messages = [
{
@@ -176,7 +188,7 @@ class LLMAgent(BaseAgent):
json={
"model": settings.llm_settings.local_llm_model,
"messages": messages,
"temperature": 0.3,
"temperature": settings.llm_settings.chat_temperature,
"stream": True,
},
) as response:

View File

@@ -229,10 +229,11 @@ class VADAgent(BaseAgent):
assert self.model is not None
prob = self.model(torch.from_numpy(chunk), settings.vad_settings.sample_rate_hz).item()
non_speech_patience = settings.behaviour_settings.vad_non_speech_patience_chunks
begin_silence_length = settings.behaviour_settings.vad_begin_silence_chunks
prob_threshold = settings.behaviour_settings.vad_prob_threshold
if prob > prob_threshold:
if self.i_since_speech > non_speech_patience + begin_silence_length:
self.logger.debug("Speech started.")
self.audio_buffer = np.append(self.audio_buffer, chunk)
self.i_since_speech = 0
@@ -246,11 +247,12 @@ class VADAgent(BaseAgent):
continue
# Speech probably ended. Make sure we have a usable amount of data.
if len(self.audio_buffer) > begin_silence_length * len(chunk):
self.logger.debug("Speech ended.")
assert self.audio_out_socket is not None
await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes())
# At this point, we know that there is no speech.
# Prepend the last few chunks that had no speech, for a more fluent boundary.
self.audio_buffer = np.append(self.audio_buffer, chunk)
self.audio_buffer = self.audio_buffer[-begin_silence_length * len(chunk) :]

View File

@@ -131,6 +131,7 @@ class BaseAgent(ABC):
:param message: The message to send.
"""
target = AgentDirectory.get(message.to)
message.sender = self.name
if target:
await target.inbox.put(message)
self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.")
@@ -192,7 +193,16 @@ class BaseAgent(ABC):
:param coro: The coroutine to execute as a task.
"""
async def try_coro(coro_: Coroutine):
try:
await coro_
except asyncio.CancelledError:
self.logger.debug("A behavior was canceled successfully: %s", coro_)
except Exception:
self.logger.warning("An exception occurred in a behavior.", exc_info=True)
task = asyncio.create_task(try_coro(coro))
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)
return task

View File

@@ -73,10 +73,12 @@ class BehaviourSettings(BaseModel):
:ivar vad_prob_threshold: Probability threshold for Voice Activity Detection.
:ivar vad_initial_since_speech: Initial value for 'since speech' counter in VAD.
:ivar vad_non_speech_patience_chunks: Number of non-speech chunks to wait before speech ended.
:ivar vad_begin_silence_chunks: The number of chunks of silence to prepend to speech chunks.
:ivar transcription_max_concurrent_tasks: Maximum number of concurrent transcription tasks.
:ivar transcription_words_per_minute: Estimated words per minute for transcription timing.
:ivar transcription_words_per_token: Estimated words per token for transcription timing.
:ivar transcription_token_buffer: Buffer for transcription tokens.
:ivar conversation_history_length_limit: The maximum amount of messages to extract beliefs from.
"""
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
@@ -88,7 +90,8 @@ class BehaviourSettings(BaseModel):
# VAD settings
vad_prob_threshold: float = 0.5
vad_initial_since_speech: int = 100
vad_non_speech_patience_chunks: int = 15
vad_begin_silence_chunks: int = 6
# transcription behaviour
transcription_max_concurrent_tasks: int = 3
@@ -96,6 +99,9 @@ class BehaviourSettings(BaseModel):
transcription_words_per_token: float = 0.75 # (3 words = 4 tokens)
transcription_token_buffer: int = 10
# Text belief extractor settings
conversation_history_length_limit: int = 10
class LLMSettings(BaseModel):
"""
@@ -103,12 +109,19 @@ class LLMSettings(BaseModel):
:ivar local_llm_url: URL for the local LLM API.
:ivar local_llm_model: Name of the local LLM model to use.
:ivar chat_temperature: The temperature to use while generating chat responses.
:ivar code_temperature: The temperature to use while generating code-like responses like during
belief inference.
:ivar n_parallel: The number of parallel calls allowed to be made to the LLM.
"""
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
local_llm_url: str = "http://localhost:1234/v1/chat/completions"
local_llm_model: str = "gpt-oss"
chat_temperature: float = 1.0
code_temperature: float = 0.3
n_parallel: int = 4
class VADSettings(BaseModel):

View File

@@ -120,7 +120,6 @@ async def lifespan(app: FastAPI):
BDICoreAgent,
{
"name": settings.agent_settings.bdi_core_name,
"asl": "src/control_backend/agents/bdi/rules.asl",
},
),
"BeliefCollectorAgent": (

View File

@@ -0,0 +1,19 @@
from pydantic import BaseModel
from control_backend.schemas.program import Belief as ProgramBelief
from control_backend.schemas.program import Goal
class BeliefList(BaseModel):
"""
Represents a list of beliefs, separate from a program. Useful for agents that need to
communicate beliefs.
:ivar beliefs: The list of beliefs.
"""
beliefs: list[ProgramBelief]
class GoalList(BaseModel):
goals: list[Goal]

View File

@@ -6,18 +6,30 @@ class Belief(BaseModel):
Represents a single belief in the BDI system.
:ivar name: The functor or name of the belief (e.g., 'user_said').
:ivar arguments: A list of string arguments for the belief, or None if the belief has no
arguments.
"""
name: str
arguments: list[str] | None
# To make it hashable
model_config = {"frozen": True}
class BeliefMessage(BaseModel):
"""
A container for communicating beliefs between agents.
:ivar create: Beliefs to create.
:ivar delete: Beliefs to delete.
:ivar replace: Beliefs to replace. Deletes all beliefs with the same name, replacing them with
one new belief.
"""
create: list[Belief] = []
delete: list[Belief] = []
replace: list[Belief] = []
def has_values(self) -> bool:
return len(self.create) > 0 or len(self.delete) > 0 or len(self.replace) > 0
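A small construction sketch of the new message shape (belief names made up); because ``Belief`` is frozen it is hashable, and the message round-trips cleanly through ``model_dump_json``:
    from control_backend.schemas.belief_message import Belief, BeliefMessage

    changes = BeliefMessage(
        create=[Belief(name="no_more_booze", arguments=None)],
        delete=[Belief(name="is_pirate", arguments=None)],
        replace=[Belief(name="user_said", arguments=["Hello there"])],
    )
    assert changes.has_values()
    # Frozen beliefs can live in sets, which is what BeliefState relies on.
    assert len({Belief(name="x", arguments=None), Belief(name="x", arguments=None)}) == 1
    assert BeliefMessage.model_validate_json(changes.model_dump_json()) == changes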

View File

@@ -0,0 +1,10 @@
from pydantic import BaseModel
class ChatMessage(BaseModel):
role: str
content: str
class ChatHistory(BaseModel):
messages: list[ChatMessage]
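A minimal sketch of how the extractor keeps this history bounded, mirroring ``_apply_conversation_message`` with a hypothetical limit of two messages:
    from control_backend.schemas.chat_history import ChatHistory, ChatMessage

    history = ChatHistory(messages=[])
    limit = 2  # stands in for settings.behaviour_settings.conversation_history_length_limit
    for content in ["Hi", "Hello!", "What's your name?"]:
        history.messages = (history.messages + [ChatMessage(role="user", content=content)])[-limit:]
    assert [m.content for m in history.messages] == ["Hello!", "What's your name?"]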

View File

@@ -12,6 +12,6 @@ class InternalMessage(BaseModel):
"""
to: str
sender: str | None = None
body: str
thread: str | None = None

View File

@@ -1,64 +1,202 @@
from enum import Enum
from typing import Literal
from pydantic import UUID4, BaseModel
class ProgramElement(BaseModel):
"""
Represents a basic element of our behavior program.
:ivar name: The researcher-assigned name of the element.
:ivar id: Unique identifier.
"""
name: str
id: UUID4
class LogicalOperator(Enum):
AND = "AND"
OR = "OR"
type Belief = KeywordBelief | SemanticBelief | InferredBelief
type BasicBelief = KeywordBelief | SemanticBelief
class KeywordBelief(ProgramElement):
"""
Represents an objective to be achieved.
Represents a belief that is set when the user spoken text contains a certain keyword.
:ivar id: Unique identifier.
:ivar label: Human-readable label.
:ivar description: Detailed description of the goal.
:ivar achieved: Status flag indicating if the goal has been met.
:ivar keyword: The keyword on which this belief gets set.
"""
id: str
label: str
description: str
achieved: bool
class TriggerKeyword(BaseModel):
id: str
name: str = ""
keyword: str
class KeywordTrigger(BaseModel):
id: str
label: str
type: str
keywords: list[TriggerKeyword]
class SemanticBelief(ProgramElement):
"""
Represents a belief that is set by semantic LLM validation.
:ivar description: Description of how to form the belief, used by the LLM.
"""
description: str
class InferredBelief(ProgramElement):
"""
Represents a belief that gets formed by combining two beliefs with a logical AND or OR.
These beliefs can also be :class:`InferredBelief`, leading to arbitrarily deep nesting.
:ivar operator: The logical operator to apply.
:ivar left: The left part of the logical expression.
:ivar right: The right part of the logical expression.
"""
name: str = ""
operator: LogicalOperator
left: Belief
right: Belief
class Norm(ProgramElement):
name: str = ""
norm: str
critical: bool = False
class BasicNorm(Norm):
"""
Represents a behavioral norm.
:ivar norm: The actual norm text describing the behavior.
:ivar critical: When true, this norm should absolutely not be violated (checked separately).
"""
pass
class ConditionalNorm(Norm):
"""
Represents a norm that is only active when a condition is met (i.e., a certain belief holds).
:ivar condition: When to activate this norm.
"""
condition: Belief
type PlanElement = Goal | Action
class Plan(ProgramElement):
"""
Represents a list of steps to execute. Each of these steps can be a goal (with its own plan)
or a simple action.
:ivar steps: The actions or subgoals to execute, in order.
"""
name: str = ""
steps: list[PlanElement]
class Goal(ProgramElement):
"""
Represents an objective to be achieved. To reach the goal, we execute the corresponding plan.
If the goal can still fail after the plan has been executed, for example because achieving it
depends on the user's reply, its achieved status will be set from elsewhere in the program.
:ivar description: A description of the goal, used to determine if it has been achieved.
:ivar plan: The plan to execute.
:ivar can_fail: Whether we can fail to achieve the goal after executing the plan.
"""
description: str = ""
plan: Plan
can_fail: bool = True
type Action = SpeechAction | GestureAction | LLMAction
class SpeechAction(ProgramElement):
"""
Represents the action of the robot speaking a literal text.
:ivar text: The text to speak.
"""
name: str = ""
text: str
class Gesture(BaseModel):
"""
Represents a gesture to be performed. Can be either a single gesture,
or a random gesture from a category (tag).
:ivar type: The type of the gesture, "tag" or "single".
:ivar name: The name of the single gesture or tag.
"""
type: Literal["tag", "single"]
name: str
class GestureAction(ProgramElement):
"""
Represents the action of the robot performing a gesture.
:ivar gesture: The gesture to perform.
"""
name: str = ""
gesture: Gesture
class LLMAction(ProgramElement):
"""
Represents the action of letting an LLM generate a reply based on its chat history
and an additional goal added in the prompt.
:ivar goal: The extra (temporary) goal to add to the LLM.
"""
name: str = ""
goal: str
class Trigger(ProgramElement):
"""
Represents a belief-based trigger. When a belief is set, the corresponding plan is executed.
:ivar condition: When to activate the trigger.
:ivar plan: The plan to execute.
"""
condition: Belief
plan: Plan
class Phase(ProgramElement):
"""
A distinct phase within a program, containing norms, goals, and triggers.
:ivar norms: List of norms active in this phase.
:ivar goals: List of goals to pursue in this phase.
:ivar triggers: List of triggers that define transitions out of this phase.
"""
name: str = ""
norms: list[BasicNorm | ConditionalNorm]
goals: list[Goal]
triggers: list[Trigger]
class Program(BaseModel):

View File

@@ -20,7 +20,7 @@ def mock_agentspeak_env():
@pytest.fixture
def agent():
agent = BDICoreAgent("bdi_agent", "dummy.asl")
agent = BDICoreAgent("bdi_agent")
agent.send = AsyncMock()
agent.bdi_agent = MagicMock()
return agent
@@ -51,7 +51,7 @@ async def test_handle_belief_collector_message(agent, mock_settings):
msg = InternalMessage(
to="bdi_agent",
sender=mock_settings.agent_settings.bdi_belief_collector_name,
body=BeliefMessage(create=beliefs).model_dump_json(),
thread="beliefs",
)
@@ -64,6 +64,26 @@ async def test_handle_belief_collector_message(agent, mock_settings):
assert args[2] == agentspeak.Literal("user_said", (agentspeak.Literal("Hello"),))
@pytest.mark.asyncio
async def test_handle_delete_belief_message(agent, mock_settings):
"""Test that incoming beliefs to be deleted are removed from the BDI agent"""
beliefs = [Belief(name="user_said", arguments=["Hello"])]
msg = InternalMessage(
to="bdi_agent",
sender=mock_settings.agent_settings.bdi_belief_collector_name,
body=BeliefMessage(delete=beliefs).model_dump_json(),
thread="beliefs",
)
await agent.handle_message(msg)
# Expect bdi_agent.call to be triggered to remove belief
args = agent.bdi_agent.call.call_args.args
assert args[0] == agentspeak.Trigger.removal
assert args[1] == agentspeak.GoalType.belief
assert args[2] == agentspeak.Literal("user_said", (agentspeak.Literal("Hello"),))
@pytest.mark.asyncio
async def test_incorrect_belief_collector_message(agent, mock_settings):
"""Test that incorrect message format triggers an exception."""
@@ -113,14 +133,14 @@ async def test_custom_actions(agent):
# Invoke action
mock_term = MagicMock()
mock_term.args = ["Hello", "Norm", "Goal"]
mock_term.args = ["Hello", "Norm"]
mock_intention = MagicMock()
# Run generator
gen = action_fn(agent, mock_term, mock_intention)
next(gen) # Execute
agent._send_to_llm.assert_called_with("Hello", "Norm", "Goal")
agent._send_to_llm.assert_called_with("Hello", "Norm", "")
def test_add_belief_sets_event(agent):
@@ -128,7 +148,8 @@ def test_add_belief_sets_event(agent):
agent._wake_bdi_loop = MagicMock()
belief = Belief(name="test_belief", arguments=["a", "b"])
belief_changes = BeliefMessage(replace=[belief])
agent._apply_belief_changes(belief_changes)
assert agent.bdi_agent.call.called
agent._wake_bdi_loop.set.assert_called()
@@ -137,7 +158,7 @@ def test_add_belief_sets_event(agent):
def test_apply_beliefs_empty_returns(agent):
"""Line: if not beliefs: return"""
agent._wake_bdi_loop = MagicMock()
agent._apply_belief_changes(BeliefMessage())
agent.bdi_agent.call.assert_not_called()
agent._wake_bdi_loop.set.assert_not_called()
@@ -220,8 +241,9 @@ def test_replace_belief_calls_remove_all(agent):
agent._remove_all_with_name = MagicMock()
agent._wake_bdi_loop = MagicMock()
belief = Belief(name="user_said", arguments=["Hello"], replace=True)
agent._apply_beliefs([belief])
belief = Belief(name="user_said", arguments=["Hello"])
belief_changes = BeliefMessage(replace=[belief])
agent._apply_belief_changes(belief_changes)
agent._remove_all_with_name.assert_called_with("user_said")

View File

@@ -1,6 +1,6 @@
import asyncio
import sys
import uuid
from unittest.mock import AsyncMock
import pytest
@@ -8,38 +8,54 @@ import pytest
from control_backend.agents.bdi.bdi_program_manager import BDIProgramManager
from control_backend.core.agent_system import InternalMessage
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.program import BasicNorm, Goal, Phase, Plan, Program
# Fix Windows Proactor loop for zmq
if sys.platform.startswith("win"):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
def make_valid_program_json(norm="N1", goal="G1"):
return json.dumps(
{
"phases": [
{
"id": "phase1",
"label": "Phase 1",
"triggers": [],
"norms": [{"id": "n1", "label": "Norm 1", "norm": norm}],
"goals": [
{"id": "g1", "label": "Goal 1", "description": goal, "achieved": False}
],
}
]
}
)
def make_valid_program_json(norm="N1", goal="G1") -> str:
return Program(
phases=[
Phase(
id=uuid.uuid4(),
name="Basic Phase",
norms=[
BasicNorm(
id=uuid.uuid4(),
name=norm,
norm=norm,
),
],
goals=[
Goal(
id=uuid.uuid4(),
name=goal,
description="This description can be used to determine whether the goal "
"has been achieved.",
plan=Plan(
id=uuid.uuid4(),
name="Goal Plan",
steps=[],
),
can_fail=False,
),
],
triggers=[],
),
],
).model_dump_json()
@pytest.mark.skip(reason="Functionality being rebuilt.")
@pytest.mark.asyncio
async def test_send_to_bdi():
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
program = Program.model_validate_json(make_valid_program_json())
await manager._create_agentspeak_and_send_to_bdi(program)
assert manager.send.await_count == 1
msg: InternalMessage = manager.send.await_args[0][0]
@@ -61,8 +77,9 @@ async def test_receive_programs_valid_and_invalid():
]
manager = BDIProgramManager(name="program_manager_test")
manager._internal_pub_socket = AsyncMock()
manager.sub_socket = sub
manager._create_agentspeak_and_send_to_bdi = AsyncMock()
manager._send_clear_llm_history = AsyncMock()
try:
@@ -72,10 +89,10 @@ async def test_receive_programs_valid_and_invalid():
pass
# Only valid Program should have triggered _send_to_bdi
assert manager._create_agentspeak_and_send_to_bdi.await_count == 1
forwarded: Program = manager._create_agentspeak_and_send_to_bdi.await_args[0][0]
assert forwarded.phases[0].norms[0].name == "N1"
assert forwarded.phases[0].goals[0].name == "G1"
# Verify history clear was triggered
assert manager._send_clear_llm_history.await_count == 1
@@ -91,8 +108,8 @@ async def test_send_clear_llm_history(mock_settings):
await manager._send_clear_llm_history()
assert manager.send.await_count == 2
msg: InternalMessage = manager.send.await_args_list[0][0][0]
# Verify the content and recipient
assert msg.body == "clear_history"

View File

@@ -86,7 +86,7 @@ async def test_send_beliefs_to_bdi(agent):
sent: InternalMessage = agent.send.call_args.args[0]
assert sent.to == settings.agent_settings.bdi_core_name
assert sent.thread == "beliefs"
assert json.loads(sent.body)["beliefs"] == [belief.model_dump() for belief in beliefs]
assert json.loads(sent.body)["create"] == [belief.model_dump() for belief in beliefs]
@pytest.mark.asyncio

View File

@@ -0,0 +1,376 @@
import json
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from control_backend.agents.bdi import TextBeliefExtractorAgent
from control_backend.agents.bdi.text_belief_extractor_agent import BeliefState
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList
from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import (
ConditionalNorm,
KeywordBelief,
LLMAction,
Phase,
Plan,
Program,
SemanticBelief,
Trigger,
)
@pytest.fixture
def llm():
llm = TextBeliefExtractorAgent.LLM(MagicMock(), 4)
llm._query_llm = AsyncMock()
return llm
@pytest.fixture
def agent(llm):
with patch(
"control_backend.agents.bdi.text_belief_extractor_agent.TextBeliefExtractorAgent.LLM",
return_value=llm,
):
agent = TextBeliefExtractorAgent("text_belief_agent")
agent.send = AsyncMock()
return agent
@pytest.fixture
def sample_program():
return Program(
phases=[
Phase(
name="Some phase",
id=uuid.uuid4(),
norms=[
ConditionalNorm(
name="Some norm",
id=uuid.uuid4(),
norm="Use nautical terms.",
critical=False,
condition=SemanticBelief(
name="is_pirate",
id=uuid.uuid4(),
description="The user is a pirate. Perhaps because they say "
"they are, or because they speak like a pirate "
'with terms like "arr".',
),
),
],
goals=[],
triggers=[
Trigger(
name="Some trigger",
id=uuid.uuid4(),
condition=SemanticBelief(
name="no_more_booze",
id=uuid.uuid4(),
description="There is no more alcohol.",
),
plan=Plan(
name="Some plan",
id=uuid.uuid4(),
steps=[
LLMAction(
name="Some action",
id=uuid.uuid4(),
goal="Suggest eating chocolate instead.",
),
],
),
),
],
),
],
)
def make_msg(sender: str, body: str, thread: str | None = None) -> InternalMessage:
return InternalMessage(to="unused", sender=sender, body=body, thread=thread)
@pytest.mark.asyncio
async def test_handle_message_ignores_other_agents(agent):
msg = make_msg("unknown", "some data", None)
await agent.handle_message(msg)
agent.send.assert_not_called() # noqa # `agent.send` has no such property, but we mock it.
@pytest.mark.asyncio
async def test_handle_message_from_transcriber(agent, mock_settings):
transcription = "hello world"
msg = make_msg(mock_settings.agent_settings.transcription_name, transcription, None)
await agent.handle_message(msg)
agent.send.assert_awaited_once() # noqa # `agent.send` has no such property, but we mock it.
sent: InternalMessage = agent.send.call_args.args[0] # noqa
assert sent.to == mock_settings.agent_settings.bdi_core_name
assert sent.thread == "beliefs"
parsed = BeliefMessage.model_validate_json(sent.body)
replaced_last = parsed.replace.pop()
assert replaced_last.name == "user_said"
assert replaced_last.arguments == [transcription]
@pytest.mark.asyncio
async def test_query_llm():
mock_response = MagicMock()
mock_response.json.return_value = {
"choices": [
{
"message": {
"content": "null",
}
}
]
}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
mock_async_client = MagicMock()
mock_async_client.__aenter__.return_value = mock_client
mock_async_client.__aexit__.return_value = None
with patch(
"control_backend.agents.bdi.text_belief_extractor_agent.httpx.AsyncClient",
return_value=mock_async_client,
):
llm = TextBeliefExtractorAgent.LLM(MagicMock(), 4)
res = await llm._query_llm("hello world", {"type": "null"})
# Response content was set as "null", so should be deserialized as None
assert res is None
@pytest.mark.asyncio
async def test_retry_query_llm_success(llm):
llm._query_llm.return_value = None
res = await llm.query("hello world", {"type": "null"})
llm._query_llm.assert_called_once()
assert res is None
@pytest.mark.asyncio
async def test_retry_query_llm_success_after_failure(llm):
llm._query_llm.side_effect = [KeyError(), "real value"]
res = await llm.query("hello world", {"type": "string"})
assert llm._query_llm.call_count == 2
assert res == "real value"
@pytest.mark.asyncio
async def test_retry_query_llm_failures(llm):
llm._query_llm.side_effect = [KeyError(), KeyError(), KeyError(), "real value"]
res = await llm.query("hello world", {"type": "string"})
assert llm._query_llm.call_count == 3
assert res is None
@pytest.mark.asyncio
async def test_retry_query_llm_fail_immediately(llm):
llm._query_llm.side_effect = [KeyError(), "real value"]
res = await llm.query("hello world", {"type": "string"}, tries=1)
assert llm._query_llm.call_count == 1
assert res is None
@pytest.mark.asyncio
async def test_extracting_semantic_beliefs(agent):
"""
The Program Manager sends beliefs to this agent. Test whether the agent handles them correctly.
"""
assert len(agent.belief_inferrer.available_beliefs) == 0
beliefs = BeliefList(
beliefs=[
KeywordBelief(
id=uuid.uuid4(),
name="keyword_hello",
keyword="hello",
),
SemanticBelief(
id=uuid.uuid4(), name="semantic_hello_1", description="Some semantic belief 1"
),
SemanticBelief(
id=uuid.uuid4(), name="semantic_hello_2", description="Some semantic belief 2"
),
]
)
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.bdi_program_manager_name,
body=beliefs.model_dump_json(),
thread="beliefs",
),
)
assert len(agent.belief_inferrer.available_beliefs) == 2
@pytest.mark.asyncio
async def test_handle_invalid_beliefs(agent, sample_program):
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
assert len(agent.belief_inferrer.available_beliefs) == 2
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.bdi_program_manager_name,
body=json.dumps({"phases": "Invalid"}),
thread="beliefs",
),
)
assert len(agent.belief_inferrer.available_beliefs) == 2
@pytest.mark.asyncio
async def test_handle_robot_response(agent):
initial_length = len(agent.conversation.messages)
response = "Hi, I'm Pepper. What's your name?"
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.llm_name,
body=response,
),
)
assert len(agent.conversation.messages) == initial_length + 1
assert agent.conversation.messages[-1].role == "assistant"
assert agent.conversation.messages[-1].content == response
@pytest.mark.asyncio
async def test_simulated_real_turn_with_beliefs(agent, llm, sample_program):
"""Test sending user message to extract beliefs from."""
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
# Send a user message with the belief that there's no more booze
llm._query_llm.return_value = {"is_pirate": None, "no_more_booze": True}
assert len(agent.conversation.messages) == 0
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.transcription_name,
body="We're all out of schnaps.",
),
)
assert len(agent.conversation.messages) == 1
# There should be a belief set and sent to the BDI core, as well as the user_said belief
assert agent.send.call_count == 2
# First should be the beliefs message
message: InternalMessage = agent.send.call_args_list[1].args[0]
beliefs = BeliefMessage.model_validate_json(message.body)
assert len(beliefs.create) == 1
assert beliefs.create[0].name == "no_more_booze"
@pytest.mark.asyncio
async def test_simulated_real_turn_no_beliefs(agent, llm, sample_program):
"""Test a user message to extract beliefs from, but no beliefs are formed."""
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
# Send a user message with no new beliefs
llm._query_llm.return_value = {"is_pirate": None, "no_more_booze": None}
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.transcription_name,
body="Hello there!",
),
)
# Only the user_said belief should've been sent
agent.send.assert_called_once()
@pytest.mark.asyncio
async def test_simulated_real_turn_no_new_beliefs(agent, llm, sample_program):
"""
Test a user message to extract beliefs from, but no new beliefs are formed because they already
existed.
"""
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
agent._current_beliefs = BeliefState(true={InternalBelief(name="is_pirate", arguments=None)})
# Send a user message with the belief the user is a pirate, still
llm._query_llm.return_value = {"is_pirate": True, "no_more_booze": None}
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.transcription_name,
body="Arr, nice to meet you, matey.",
),
)
# Only the user_said belief should've been sent, as no beliefs have changed
agent.send.assert_called_once()
@pytest.mark.asyncio
async def test_simulated_real_turn_remove_belief(agent, llm, sample_program):
"""
Test a user message to extract beliefs from, but an existing belief is determined no longer to
hold.
"""
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
agent._current_beliefs = BeliefState(
true={InternalBelief(name="no_more_booze", arguments=None)},
)
# Send a user message with the belief the user is a pirate, still
llm._query_llm.return_value = {"is_pirate": None, "no_more_booze": False}
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.transcription_name,
body="I found an untouched barrel of wine!",
),
)
# Both user_said and belief change should've been sent
assert agent.send.call_count == 2
# Agent's current beliefs should've changed
assert any(b.name == "no_more_booze" for b in agent._current_beliefs.false)
@pytest.mark.asyncio
async def test_llm_failure_handling(agent, llm, sample_program):
"""
Check that the agent handles failures gracefully without crashing.
"""
llm._query_llm.side_effect = httpx.HTTPError("")
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
belief_changes = await agent.belief_inferrer.infer_from_conversation(
ChatHistory(
messages=[ChatMessage(role="user", content="Good day!")],
),
)
assert len(belief_changes.true) == 0
assert len(belief_changes.false) == 0

View File

@@ -1,65 +0,0 @@
import json
from unittest.mock import AsyncMock
import pytest
from control_backend.agents.bdi import (
TextBeliefExtractorAgent,
)
from control_backend.core.agent_system import InternalMessage
@pytest.fixture
def agent():
agent = TextBeliefExtractorAgent("text_belief_agent")
agent.send = AsyncMock()
return agent
def make_msg(sender: str, body: str, thread: str | None = None) -> InternalMessage:
return InternalMessage(to="unused", sender=sender, body=body, thread=thread)
@pytest.mark.asyncio
async def test_handle_message_ignores_other_agents(agent):
msg = make_msg("unknown", "some data", None)
await agent.handle_message(msg)
agent.send.assert_not_called() # noqa # `agent.send` has no such property, but we mock it.
@pytest.mark.asyncio
async def test_handle_message_from_transcriber(agent, mock_settings):
transcription = "hello world"
msg = make_msg(mock_settings.agent_settings.transcription_name, transcription, None)
await agent.handle_message(msg)
agent.send.assert_awaited_once() # noqa # `agent.send` has no such property, but we mock it.
sent: InternalMessage = agent.send.call_args.args[0] # noqa
assert sent.to == mock_settings.agent_settings.bdi_belief_collector_name
assert sent.thread == "beliefs"
parsed = json.loads(sent.body)
assert parsed == {"beliefs": {"user_said": [transcription]}, "type": "belief_extraction_text"}
@pytest.mark.asyncio
async def test_process_transcription_demo(agent, mock_settings):
transcription = "this is a test"
await agent._process_transcription_demo(transcription)
agent.send.assert_awaited_once() # noqa # `agent.send` has no such property, but we mock it.
sent: InternalMessage = agent.send.call_args.args[0] # noqa
assert sent.to == mock_settings.agent_settings.bdi_belief_collector_name
assert sent.thread == "beliefs"
parsed = json.loads(sent.body)
assert parsed["beliefs"]["user_said"] == [transcription]
@pytest.mark.asyncio
async def test_setup_initializes_beliefs(agent):
"""Covers the setup method and ensures beliefs are initialized."""
await agent.setup()
assert agent.beliefs == {"mood": ["X"], "car": ["Y"]}

View File

@@ -66,7 +66,7 @@ async def test_llm_processing_success(mock_httpx_client, mock_settings):
# "Hello world." constitutes one sentence/chunk based on punctuation split
# The agent should call send once with the full sentence
assert agent.send.called
args = agent.send.call_args_list[0][0][0]
assert args.to == mock_settings.agent_settings.bdi_core_name
assert "Hello world." in args.body

View File

@@ -1,4 +1,5 @@
import json
import uuid
from unittest.mock import AsyncMock
import pytest
@@ -6,7 +7,7 @@ from fastapi import FastAPI
from fastapi.testclient import TestClient
from control_backend.api.v1.endpoints import program
from control_backend.schemas.program import Program
from control_backend.schemas.program import BasicNorm, Goal, Phase, Plan, Program
@pytest.fixture
@@ -25,29 +26,39 @@ def client(app):
def make_valid_program_dict():
"""Helper to create a valid Program JSON structure."""
# Converting to JSON using Pydantic because it knows how to convert a UUID object
program_json_str = Program(
phases=[
Phase(
id=uuid.uuid4(),
name="Basic Phase",
norms=[
BasicNorm(
id=uuid.uuid4(),
name="Some norm",
norm="Do normal.",
),
],
"triggers": [
{
"id": "t1",
"label": "trigger",
"type": "keywords",
"keywords": [
{"id": "kw1", "keyword": "keyword1"},
{"id": "kw2", "keyword": "keyword2"},
],
},
goals=[
Goal(
id=uuid.uuid4(),
name="Some goal",
description="This description can be used to determine whether the goal "
"has been achieved.",
plan=Plan(
id=uuid.uuid4(),
name="Goal Plan",
steps=[],
),
can_fail=False,
),
],
triggers=[],
),
],
).model_dump_json()
# Converting back to a dict because that's what's expected
return json.loads(program_json_str)
def test_receive_program_success(client):
@@ -71,7 +82,8 @@ def test_receive_program_success(client):
sent_bytes = args[0][1]
sent_obj = json.loads(sent_bytes.decode())
expected_obj = Program.model_validate(program_dict).model_dump()
# Converting to JSON using Pydantic because it knows how to handle UUIDs
expected_obj = json.loads(Program.model_validate(program_dict).model_dump_json())
assert sent_obj == expected_obj

View File

@@ -1,49 +1,66 @@
import uuid
import pytest
from pydantic import ValidationError
from control_backend.schemas.program import (
BasicNorm,
ConditionalNorm,
Goal,
InferredBelief,
KeywordBelief,
LogicalOperator,
Phase,
Plan,
Program,
SemanticBelief,
Trigger,
)
def base_norm() -> BasicNorm:
return BasicNorm(
id=uuid.uuid4(),
name="testNormName",
norm="testNormNorm",
critical=False,
)
def base_goal() -> Goal:
return Goal(
id="goal1",
label="testGoal",
description="testGoalDescription",
achieved=False,
id=uuid.uuid4(),
name="testGoalName",
description="This description can be used to determine whether the goal has been achieved.",
plan=Plan(
id=uuid.uuid4(),
name="testGoalPlanName",
steps=[],
),
can_fail=False,
)
def base_trigger() -> Trigger:
return Trigger(
id=uuid.uuid4(),
name="testTriggerName",
condition=KeywordBelief(
id=uuid.uuid4(),
name="testTriggerKeywordBeliefTriggerName",
keyword="Keyword",
),
plan=Plan(
id=uuid.uuid4(),
name="testTriggerPlanName",
steps=[],
),
)
def base_phase() -> Phase:
return Phase(
id="phase1",
label="basephase",
id=uuid.uuid4(),
norms=[base_norm()],
goals=[base_goal()],
triggers=[base_trigger()],
@@ -58,7 +75,7 @@ def invalid_program() -> dict:
# wrong types inside phases list (not Phase objects)
return {
"phases": [
{"id": "phase1"}, # incomplete
{"id": uuid.uuid4()}, # incomplete
{"not_a_phase": True},
]
}
@@ -77,11 +94,112 @@ def test_valid_deepprogram():
# validate nested components directly
phase = validated.phases[0]
assert isinstance(phase.goals[0], Goal)
assert isinstance(phase.triggers[0], KeywordTrigger)
assert isinstance(phase.norms[0], Norm)
assert isinstance(phase.triggers[0], Trigger)
assert isinstance(phase.norms[0], BasicNorm)
def test_invalid_program():
bad = invalid_program()
with pytest.raises(ValidationError):
Program.model_validate(bad)
def test_conditional_norm_parsing():
"""
Check that pydantic is able to preserve the type of the norm, that it doesn't lose its
"condition" field when serializing and deserializing.
"""
norm = ConditionalNorm(
name="testNormName",
id=uuid.uuid4(),
norm="testNormNorm",
critical=False,
condition=KeywordBelief(
name="testKeywordBelief",
id=uuid.uuid4(),
keyword="testKeywordBelief",
),
)
program = Program(
phases=[
Phase(
name="Some phase",
id=uuid.uuid4(),
norms=[norm],
goals=[],
triggers=[],
),
],
)
parsed_program = Program.model_validate_json(program.model_dump_json())
parsed_norm = parsed_program.phases[0].norms[0]
assert hasattr(parsed_norm, "condition")
assert isinstance(parsed_norm, ConditionalNorm)
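The round trip keeps the ConditionalNorm subclass only if the schema tells Pydantic how to tell norm variants apart. A common way to do that in Pydantic v2 is a tagged (discriminated) union; the field name "type" and these demo classes are assumptions for illustration, not necessarily how program.py defines the real models:

import uuid
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field

class DemoBasicNorm(BaseModel):
    type: Literal["basic"] = "basic"
    id: uuid.UUID
    name: str
    norm: str

class DemoConditionalNorm(DemoBasicNorm):
    type: Literal["conditional"] = "conditional"
    condition: str  # simplified stand-in for a Belief

# The discriminator lets model_validate_json pick the right subclass again.
DemoNorm = Annotated[Union[DemoBasicNorm, DemoConditionalNorm], Field(discriminator="type")]

class DemoPhase(BaseModel):
    norms: list[DemoNorm]

phase = DemoPhase(
    norms=[DemoConditionalNorm(id=uuid.uuid4(), name="n", norm="be nice", condition="c")]
)
parsed = DemoPhase.model_validate_json(phase.model_dump_json())
assert isinstance(parsed.norms[0], DemoConditionalNorm)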
def test_belief_type_parsing():
"""
Check that pydantic is able to discern between the different types of beliefs when serializing
and deserializing.
"""
keyword_belief = KeywordBelief(
name="testKeywordBelief",
id=uuid.uuid4(),
keyword="something",
)
semantic_belief = SemanticBelief(
name="testSemanticBelief",
id=uuid.uuid4(),
description="something",
)
inferred_belief = InferredBelief(
name="testInferredBelief",
id=uuid.uuid4(),
operator=LogicalOperator.OR,
left=keyword_belief,
right=semantic_belief,
)
program = Program(
phases=[
Phase(
name="Some phase",
id=uuid.uuid4(),
norms=[],
goals=[],
triggers=[
Trigger(
name="testTriggerKeywordTrigger",
id=uuid.uuid4(),
condition=keyword_belief,
plan=Plan(name="testTriggerPlanName", id=uuid.uuid4(), steps=[]),
),
Trigger(
name="testTriggerSemanticTrigger",
id=uuid.uuid4(),
condition=semantic_belief,
plan=Plan(name="testTriggerPlanName", id=uuid.uuid4(), steps=[]),
),
Trigger(
name="testTriggerInferredTrigger",
id=uuid.uuid4(),
condition=inferred_belief,
plan=Plan(name="testTriggerPlanName", id=uuid.uuid4(), steps=[]),
),
],
),
],
)
parsed_program = Program.model_validate_json(program.model_dump_json())
parsed_keyword_belief = parsed_program.phases[0].triggers[0].condition
assert isinstance(parsed_keyword_belief, KeywordBelief)
parsed_semantic_belief = parsed_program.phases[0].triggers[1].condition
assert isinstance(parsed_semantic_belief, SemanticBelief)
parsed_inferred_belief = parsed_program.phases[0].triggers[2].condition
assert isinstance(parsed_inferred_belief, InferredBelief)
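Since InferredBelief nests other beliefs under a logical operator, evaluating it is naturally recursive. A hypothetical evaluator over a set of currently held belief names (an illustration only, not the project's actual inference logic; it assumes OR and AND are the only operators):

def belief_holds(belief, held_names: set[str]) -> bool:
    # Hypothetical helper: composite nodes combine their children, leaves are
    # looked up by name in the set of beliefs currently held.
    if isinstance(belief, InferredBelief):
        left = belief_holds(belief.left, held_names)
        right = belief_holds(belief.right, held_names)
        if belief.operator == LogicalOperator.OR:
            return left or right
        return left and right  # assumes AND is the only other operator
    return belief.name in held_names

# With only the keyword belief held, the OR node above already holds:
# belief_holds(inferred_belief, {"testKeywordBelief"}) would return True.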

uv.lock (generated, 23 changes)
View File

@@ -997,6 +997,7 @@ dependencies = [
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "python-json-logger" },
{ name = "python-slugify" },
{ name = "pyyaml" },
{ name = "pyzmq" },
{ name = "silero-vad" },
@@ -1046,6 +1047,7 @@ requires-dist = [
{ name = "pydantic", specifier = ">=2.12.0" },
{ name = "pydantic-settings", specifier = ">=2.11.0" },
{ name = "python-json-logger", specifier = ">=4.0.0" },
{ name = "python-slugify", specifier = ">=8.0.4" },
{ name = "pyyaml", specifier = ">=6.0.3" },
{ name = "pyzmq", specifier = ">=27.1.0" },
{ name = "silero-vad", specifier = ">=6.0.0" },
@@ -1341,6 +1343,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/45/58/38b5afbc1a800eeea951b9285d3912613f2603bdf897a4ab0f4bd7f405fc/python_multipart-0.0.20-py3-none-any.whl", hash = "sha256:8a62d3a8335e06589fe01f2a3e178cdcc632f3fbe0d492ad9ee0ec35aab1f104", size = 24546, upload-time = "2024-12-16T19:45:44.423Z" },
]
[[package]]
name = "python-slugify"
version = "8.0.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "text-unidecode" },
]
sdist = { url = "https://files.pythonhosted.org/packages/87/c7/5e1547c44e31da50a460df93af11a535ace568ef89d7a811069ead340c4a/python-slugify-8.0.4.tar.gz", hash = "sha256:59202371d1d05b54a9e7720c5e038f928f45daaffe41dd10822f3907b937c856", size = 10921, upload-time = "2024-02-08T18:32:45.488Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a4/62/02da182e544a51a5c3ccf4b03ab79df279f9c60c5e82d5e8bec7ca26ac11/python_slugify-8.0.4-py2.py3-none-any.whl", hash = "sha256:276540b79961052b66b7d116620b36518847f52d5fd9e3a70164fc8c50faa6b8", size = 10051, upload-time = "2024-02-08T18:32:43.911Z" },
]
[[package]]
name = "pyyaml"
version = "6.0.3"
@@ -1864,6 +1878,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a2/09/77d55d46fd61b4a135c444fc97158ef34a095e5681d0a6c10b75bf356191/sympy-1.14.0-py3-none-any.whl", hash = "sha256:e091cc3e99d2141a0ba2847328f5479b05d94a6635cb96148ccb3f34671bd8f5", size = 6299353, upload-time = "2025-04-27T18:04:59.103Z" },
]
[[package]]
name = "text-unidecode"
version = "1.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ab/e2/e9a00f0ccb71718418230718b3d900e71a5d16e701a3dae079a21e9cd8f8/text-unidecode-1.3.tar.gz", hash = "sha256:bad6603bb14d279193107714b288be206cac565dfa49aa5b105294dd5c4aab93", size = 76885, upload-time = "2019-08-30T21:36:45.405Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a6/a5/c0b6468d3824fe3fde30dbb5e1f687b291608f9473681bbf7dabbf5a87d7/text_unidecode-1.3-py2.py3-none-any.whl", hash = "sha256:1311f10e8b895935241623731c2ba64f4c455287888b18189350b67134a822e8", size = 78154, upload-time = "2019-08-30T21:37:03.543Z" },
]
[[package]]
name = "tiktoken"
version = "0.12.0"