Compare commits

...

10 Commits

Author SHA1 Message Date
Twirre Meulenbelt
93d67ccb66 feat: add reset functionality to semantic belief extractor
ref: N25B-432
2026-01-07 17:50:47 +01:00
Twirre Meulenbelt
aa5b386f65 feat: semantically determine goal completion
ref: N25B-432
2026-01-07 17:08:23 +01:00
Twirre Meulenbelt
3189b9fee3 fix: let belief extractor send user_said belief
ref: N25B-429
2026-01-07 15:19:23 +01:00
07d70cb781 fix: single dispatch order
ref: N25B-429
2026-01-07 13:02:23 +01:00
af832980c8 feat: general slugify method
ref: N25B-429
2026-01-07 12:24:46 +01:00
Twirre Meulenbelt
cabe35cdbd feat: integrate AgentSpeak with semantic belief extraction
ref: N25B-429
2026-01-07 11:44:48 +01:00
Twirre Meulenbelt
de8e829d3e Merge remote-tracking branch 'origin/feat/agentspeak-generation' into feat/semantic-beliefs
# Conflicts:
#	test/unit/agents/bdi/test_bdi_program_manager.py
2026-01-06 15:30:59 +01:00
Twirre Meulenbelt
3406e9ac2f feat: make the pipeline work with Program and AgentSpeak
ref: N25B-429
2026-01-06 15:26:44 +01:00
a357b6990b feat: send program to bdi core
ref: N25B-376
2026-01-06 12:11:37 +01:00
9eea4ee345 feat: new ASL generation
ref: N25B-376
2026-01-02 12:08:20 +01:00
19 changed files with 955 additions and 860 deletions

View File

@@ -187,9 +187,10 @@ class StatementType(StrEnum):
EMPTY = "" EMPTY = ""
DO_ACTION = "." DO_ACTION = "."
ACHIEVE_GOAL = "!" ACHIEVE_GOAL = "!"
# TEST_GOAL = "?" # TODO TEST_GOAL = "?"
ADD_BELIEF = "+" ADD_BELIEF = "+"
REMOVE_BELIEF = "-" REMOVE_BELIEF = "-"
REPLACE_BELIEF = "-+"
@dataclass @dataclass

View File

@@ -0,0 +1,403 @@
from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi.agentspeak_ast import (
AstBinaryOp,
AstExpression,
AstLiteral,
AstPlan,
AstProgram,
AstRule,
AstStatement,
AstString,
AstVar,
BinaryOperatorType,
StatementType,
TriggerType,
)
from control_backend.schemas.program import (
BasicNorm,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Norm,
Phase,
PlanElement,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
Trigger,
)
class AgentSpeakGenerator:
_asp: AstProgram
def generate(self, program: Program) -> str:
self._asp = AstProgram()
self._asp.rules.append(AstRule(self._astify(program.phases[0])))
self._add_keyword_inference()
self._add_default_plans()
self._process_phases(program.phases)
self._add_fallbacks()
return str(self._asp)
def _add_keyword_inference(self) -> None:
keyword = AstVar("Keyword")
message = AstVar("Message")
position = AstVar("Pos")
self._asp.rules.append(
AstRule(
AstLiteral("keyword_said", [keyword]),
AstLiteral("user_said", [message])
& AstLiteral(".substring", [keyword, message, position])
& (position >= 0),
)
)
def _add_default_plans(self):
self._add_reply_with_goal_plan()
self._add_say_plan()
self._add_reply_plan()
def _add_reply_with_goal_plan(self):
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("reply_with_goal", [AstVar("Goal")]),
[AstLiteral("user_said", [AstVar("Message")])],
[
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"findall",
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
),
),
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"reply_with_goal", [AstVar("Message"), AstVar("Norms"), AstVar("Goal")]
),
),
],
)
)
def _add_say_plan(self):
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("say", [AstVar("Text")]),
[],
[
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
AstStatement(StatementType.DO_ACTION, AstLiteral("say", [AstVar("Text")])),
],
)
)
def _add_reply_plan(self):
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("reply"),
[AstLiteral("user_said", [AstVar("Message")])],
[
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"findall",
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
),
),
AstStatement(
StatementType.DO_ACTION,
AstLiteral("reply", [AstVar("Message"), AstVar("Norms")]),
),
],
)
)
def _process_phases(self, phases: list[Phase]) -> None:
for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True):
if curr_phase:
self._process_phase(curr_phase)
self._add_phase_transition(curr_phase, next_phase)
# End phase behavior
# When deleting this, the entire `reply` plan and action can be deleted
self._asp.plans.append(
AstPlan(
type=TriggerType.ADDED_BELIEF,
trigger_literal=AstLiteral("user_said", [AstVar("Message")]),
context=[AstLiteral("phase", [AstString("end")])],
body=[AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply"))],
)
)
def _process_phase(self, phase: Phase) -> None:
for norm in phase.norms:
self._process_norm(norm, phase)
self._add_default_loop(phase)
previous_goal = None
for goal in phase.goals:
self._process_goal(goal, phase, previous_goal)
previous_goal = goal
for trigger in phase.triggers:
self._process_trigger(trigger, phase)
def _add_phase_transition(self, from_phase: Phase | None, to_phase: Phase | None) -> None:
if from_phase is None:
return
from_phase_ast = self._astify(from_phase)
to_phase_ast = (
self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")])
)
context = [from_phase_ast, ~AstLiteral("responded_this_turn")]
if from_phase and from_phase.goals:
context.append(self._astify(from_phase.goals[-1], achieved=True))
body = [
AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast),
AstStatement(StatementType.ADD_BELIEF, to_phase_ast),
]
if from_phase:
body.extend(
[
AstStatement(
StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")])
),
AstStatement(
StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")])
),
]
)
self._asp.plans.append(
AstPlan(TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), context, body)
)
def _process_norm(self, norm: Norm, phase: Phase) -> None:
rule: AstRule | None = None
match norm:
case ConditionalNorm(condition=cond):
rule = AstRule(self._astify(norm), self._astify(phase) & self._astify(cond))
case BasicNorm():
rule = AstRule(self._astify(norm), self._astify(phase))
if not rule:
return
self._asp.rules.append(rule)
def _add_default_loop(self, phase: Phase) -> None:
actions = []
actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn")))
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers")))
for goal in phase.goals:
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, self._astify(goal)))
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("transition_phase")))
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_BELIEF,
AstLiteral("user_said", [AstVar("Message")]),
[self._astify(phase)],
actions,
)
)
def _process_goal(
self,
goal: Goal,
phase: Phase,
previous_goal: Goal | None = None,
continues_response: bool = False,
) -> None:
context: list[AstExpression] = [self._astify(phase)]
context.append(~self._astify(goal, achieved=True))
if previous_goal and previous_goal.can_fail:
context.append(self._astify(previous_goal, achieved=True))
if not continues_response:
context.append(~AstLiteral("responded_this_turn"))
body = []
subgoals = []
for step in goal.plan.steps:
body.append(self._step_to_statement(step))
if isinstance(step, Goal):
subgoals.append(step)
if not goal.can_fail and not continues_response:
body.append(AstStatement(StatementType.ADD_BELIEF, self._astify(goal, achieved=True)))
self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(goal), context, body))
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
self._astify(goal),
context=[],
body=[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
prev_goal = None
for subgoal in subgoals:
self._process_goal(subgoal, phase, prev_goal)
prev_goal = subgoal
def _step_to_statement(self, step: PlanElement) -> AstStatement:
match step:
case Goal() | SpeechAction() | LLMAction() as a:
return AstStatement(StatementType.ACHIEVE_GOAL, self._astify(a))
case GestureAction() as a:
return AstStatement(StatementType.DO_ACTION, self._astify(a))
# TODO: separate handling of keyword and others
def _process_trigger(self, trigger: Trigger, phase: Phase) -> None:
body = []
subgoals = []
for step in trigger.plan.steps:
body.append(self._step_to_statement(step))
if isinstance(step, Goal):
step.can_fail = False # triggers are continuous sequence
subgoals.append(step)
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("check_triggers"),
[self._astify(phase), self._astify(trigger.condition)],
body,
)
)
for subgoal in subgoals:
self._process_goal(subgoal, phase, continues_response=True)
def _add_fallbacks(self):
# Trigger fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("check_triggers"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
# Phase transition fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("transition_phase"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
@singledispatchmethod
def _astify(self, element: ProgramElement) -> AstExpression:
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")
@_astify.register
def _(self, kwb: KeywordBelief) -> AstExpression:
return AstLiteral("keyword_said", [AstString(kwb.keyword)])
@_astify.register
def _(self, sb: SemanticBelief) -> AstExpression:
return AstLiteral(self.get_semantic_belief_slug(sb))
@staticmethod
def get_semantic_belief_slug(sb: SemanticBelief) -> str:
# If you need a method like this for other types, make a public slugify singledispatch for
# all types.
return f"semantic_{AgentSpeakGenerator._slugify_str(sb.name)}"
@_astify.register
def _(self, ib: InferredBelief) -> AstExpression:
return AstBinaryOp(
self._astify(ib.left),
BinaryOperatorType.AND if ib.operator == LogicalOperator.AND else BinaryOperatorType.OR,
self._astify(ib.right),
)
@_astify.register
def _(self, norm: Norm) -> AstExpression:
functor = "critical_norm" if norm.critical else "norm"
return AstLiteral(functor, [AstString(norm.norm)])
@_astify.register
def _(self, phase: Phase) -> AstExpression:
return AstLiteral("phase", [AstString(str(phase.id))])
@_astify.register
def _(self, goal: Goal, achieved: bool = False) -> AstExpression:
return AstLiteral(f"{'achieved_' if achieved else ''}{self._slugify_str(goal.name)}")
@_astify.register
def _(self, trigger: Trigger) -> AstExpression:
return AstLiteral(self.slugify(trigger))
@_astify.register
def _(self, sa: SpeechAction) -> AstExpression:
return AstLiteral("say", [AstString(sa.text)])
@_astify.register
def _(self, ga: GestureAction) -> AstExpression:
gesture = ga.gesture
return AstLiteral("gesture", [AstString(gesture.type), AstString(gesture.name)])
@_astify.register
def _(self, la: LLMAction) -> AstExpression:
return AstLiteral("reply_with_goal", [AstString(la.goal)])
@singledispatchmethod
@staticmethod
def slugify(element: ProgramElement) -> str:
raise NotImplementedError(f"Cannot convert element {element} to a slug.")
@slugify.register
@staticmethod
def _(sb: SemanticBelief) -> str:
return f"semantic_{AgentSpeakGenerator._slugify_str(sb.name)}"
@slugify.register
@staticmethod
def _(g: Goal) -> str:
return AgentSpeakGenerator._slugify_str(g.name)
@slugify.register
@staticmethod
def _(t: Trigger):
return f"trigger_{AgentSpeakGenerator._slugify_str(t.name)}"
@staticmethod
def _slugify_str(text: str) -> str:
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])

View File

@@ -42,13 +42,13 @@ class BDICoreAgent(BaseAgent):
bdi_agent: agentspeak.runtime.Agent bdi_agent: agentspeak.runtime.Agent
def __init__(self, name: str, asl: str): def __init__(self, name: str):
super().__init__(name) super().__init__(name)
self.asl_file = asl
self.env = agentspeak.runtime.Environment() self.env = agentspeak.runtime.Environment()
# Deep copy because we don't actually want to modify the standard actions globally # Deep copy because we don't actually want to modify the standard actions globally
self.actions = copy.deepcopy(agentspeak.stdlib.actions) self.actions = copy.deepcopy(agentspeak.stdlib.actions)
self._wake_bdi_loop = asyncio.Event() self._wake_bdi_loop = asyncio.Event()
self._bdi_loop_task = None
async def setup(self) -> None: async def setup(self) -> None:
""" """
@@ -65,19 +65,22 @@ class BDICoreAgent(BaseAgent):
await self._load_asl() await self._load_asl()
# Start the BDI cycle loop # Start the BDI cycle loop
self.add_behavior(self._bdi_loop()) self._bdi_loop_task = self.add_behavior(self._bdi_loop())
self._wake_bdi_loop.set() self._wake_bdi_loop.set()
self.logger.debug("Setup complete.") self.logger.debug("Setup complete.")
async def _load_asl(self): async def _load_asl(self, file_name: str | None = None) -> None:
""" """
Load and parse the AgentSpeak source file. Load and parse the AgentSpeak source file.
""" """
file_name = file_name or "src/control_backend/agents/bdi/default_behavior.asl"
try: try:
with open(self.asl_file) as source: with open(file_name) as source:
self.bdi_agent = self.env.build_agent(source, self.actions) self.bdi_agent = self.env.build_agent(source, self.actions)
self.logger.info(f"Loaded new ASL from {file_name}.")
except FileNotFoundError: except FileNotFoundError:
self.logger.warning(f"Could not find the specified ASL file at {self.asl_file}.") self.logger.warning(f"Could not find the specified ASL file at {file_name}.")
self.bdi_agent = agentspeak.runtime.Agent(self.env, self.name) self.bdi_agent = agentspeak.runtime.Agent(self.env, self.name)
async def _bdi_loop(self): async def _bdi_loop(self):
@@ -116,6 +119,7 @@ class BDICoreAgent(BaseAgent):
Handle incoming messages. Handle incoming messages.
- **Beliefs**: Updates the internal belief base. - **Beliefs**: Updates the internal belief base.
- **Program**: Updates the internal agentspeak file to match the current program.
- **LLM Responses**: Forwards the generated text to the Robot Speech Agent (actuation). - **LLM Responses**: Forwards the generated text to the Robot Speech Agent (actuation).
:param msg: The received internal message. :param msg: The received internal message.
@@ -130,6 +134,13 @@ class BDICoreAgent(BaseAgent):
self.logger.exception("Error processing belief.") self.logger.exception("Error processing belief.")
return return
# New agentspeak file
if msg.thread == "new_program":
if self._bdi_loop_task:
self._bdi_loop_task.cancel()
await self._load_asl(msg.body)
self.add_behavior(self._bdi_loop())
# The message was not a belief, handle special cases based on sender # The message was not a belief, handle special cases based on sender
match msg.sender: match msg.sender:
case settings.agent_settings.llm_name: case settings.agent_settings.llm_name:
@@ -194,10 +205,13 @@ class BDICoreAgent(BaseAgent):
self.logger.debug(f"Added belief {self.format_belief_string(name, args)}") self.logger.debug(f"Added belief {self.format_belief_string(name, args)}")
def _remove_belief(self, name: str, args: Iterable[str]): def _remove_belief(self, name: str, args: Iterable[str] | None):
""" """
Removes a specific belief (with arguments), if it exists. Removes a specific belief (with arguments), if it exists.
""" """
if args is None:
term = agentspeak.Literal(name)
else:
new_args = (agentspeak.Literal(arg) for arg in args) new_args = (agentspeak.Literal(arg) for arg in args)
term = agentspeak.Literal(name, new_args) term = agentspeak.Literal(name, new_args)
@@ -246,20 +260,18 @@ class BDICoreAgent(BaseAgent):
the function expects (which will be located in `term.args`). the function expects (which will be located in `term.args`).
""" """
@self.actions.add(".reply", 3) @self.actions.add(".reply", 2)
def _reply(agent: "BDICoreAgent", term, intention): def _reply(agent: "BDICoreAgent", term, intention):
""" """
Let the LLM generate a response to a user's utterance with the current norms and goals. Let the LLM generate a response to a user's utterance with the current norms and goals.
""" """
message_text = agentspeak.grounded(term.args[0], intention.scope) message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope) norms = agentspeak.grounded(term.args[1], intention.scope)
goals = agentspeak.grounded(term.args[2], intention.scope)
self.logger.debug("Norms: %s", norms) self.logger.debug("Norms: %s", norms)
self.logger.debug("Goals: %s", goals)
self.logger.debug("User text: %s", message_text) self.logger.debug("User text: %s", message_text)
asyncio.create_task(self._send_to_llm(str(message_text), str(norms), str(goals))) self.add_behavior(self._send_to_llm(str(message_text), str(norms), ""))
yield yield
@self.actions.add(".reply_with_goal", 3) @self.actions.add(".reply_with_goal", 3)
@@ -278,7 +290,7 @@ class BDICoreAgent(BaseAgent):
norms, norms,
goal, goal,
) )
# asyncio.create_task(self._send_to_llm(str(message_text), norms, str(goal))) self.add_behavior(self._send_to_llm(str(message_text), str(norms), str(goal)))
yield yield
@self.actions.add(".say", 1) @self.actions.add(".say", 1)
@@ -290,13 +302,14 @@ class BDICoreAgent(BaseAgent):
self.logger.debug('"say" action called with text=%s', message_text) self.logger.debug('"say" action called with text=%s', message_text)
# speech_command = SpeechCommand(data=message_text) speech_command = SpeechCommand(data=message_text)
# speech_message = InternalMessage( speech_message = InternalMessage(
# to=settings.agent_settings.robot_speech_name, to=settings.agent_settings.robot_speech_name,
# sender=settings.agent_settings.bdi_core_name, sender=settings.agent_settings.bdi_core_name,
# body=speech_command.model_dump_json(), body=speech_command.model_dump_json(),
# ) )
# asyncio.create_task(agent.send(speech_message)) # TODO: add to conversation history
self.add_behavior(self.send(speech_message))
yield yield
@self.actions.add(".gesture", 2) @self.actions.add(".gesture", 2)
@@ -336,8 +349,8 @@ class BDICoreAgent(BaseAgent):
self.logger.info("Message sent to LLM agent: %s", text) self.logger.info("Message sent to LLM agent: %s", text)
@staticmethod @staticmethod
def format_belief_string(name: str, args: Iterable[str] = []): def format_belief_string(name: str, args: Iterable[str] | None = []):
""" """
Given a belief's name and its args, return a string of the form "name(*args)" Given a belief's name and its args, return a string of the form "name(*args)"
""" """
return f"{name}{'(' if args else ''}{','.join(args)}{')' if args else ''}" return f"{name}{'(' if args else ''}{','.join(args or [])}{')' if args else ''}"

View File

@@ -1,598 +1,15 @@
import uuid import asyncio
from collections.abc import Iterable
import zmq import zmq
from pydantic import ValidationError from pydantic import ValidationError
from slugify import slugify
from zmq.asyncio import Context from zmq.asyncio import Context
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.program import ( from control_backend.schemas.belief_list import BeliefList, GoalList
Action, from control_backend.schemas.internal_message import InternalMessage
BasicBelief, from control_backend.schemas.program import Belief, ConditionalNorm, Goal, InferredBelief, Program
BasicNorm,
Belief,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Phase,
Plan,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
Trigger,
)
test_program = Program(
phases=[
Phase(
norms=[
BasicNorm(norm="Talk like a pirate", id=uuid.uuid4()),
ConditionalNorm(
condition=InferredBelief(
left=KeywordBelief(keyword="Arr", id=uuid.uuid4()),
right=SemanticBelief(
description="testing", name="semantic belief", id=uuid.uuid4()
),
operator=LogicalOperator.OR,
name="Talking to a pirate",
id=uuid.uuid4(),
),
norm="Use nautical terms",
id=uuid.uuid4(),
),
ConditionalNorm(
condition=SemanticBelief(
description="We are talking to a child",
name="talking to child",
id=uuid.uuid4(),
),
norm="Do not use cuss words",
id=uuid.uuid4(),
),
],
triggers=[
Trigger(
condition=InferredBelief(
left=KeywordBelief(keyword="key", id=uuid.uuid4()),
right=InferredBelief(
left=KeywordBelief(keyword="key2", id=uuid.uuid4()),
right=SemanticBelief(
description="Decode this", name="semantic belief 2", id=uuid.uuid4()
),
operator=LogicalOperator.OR,
name="test trigger inferred inner",
id=uuid.uuid4(),
),
operator=LogicalOperator.OR,
name="test trigger inferred outer",
id=uuid.uuid4(),
),
plan=Plan(
steps=[
SpeechAction(text="Testing trigger", id=uuid.uuid4()),
Goal(
name="Testing trigger",
plan=Plan(
steps=[LLMAction(goal="Do something", id=uuid.uuid4())],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
)
],
goals=[
Goal(
name="Determine user age",
plan=Plan(
steps=[LLMAction(goal="Determine the age of the user.", id=uuid.uuid4())],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
Goal(
name="Find the user's name",
plan=Plan(
steps=[
Goal(
name="Greet the user",
plan=Plan(
steps=[LLMAction(goal="Greet the user.", id=uuid.uuid4())],
id=uuid.uuid4(),
),
can_fail=False,
id=uuid.uuid4(),
),
Goal(
name="Ask for name",
plan=Plan(
steps=[
LLMAction(goal="Obtain the user's name.", id=uuid.uuid4())
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
Goal(
name="Tell a joke",
plan=Plan(
steps=[LLMAction(goal="Tell a joke.", id=uuid.uuid4())], id=uuid.uuid4()
),
id=uuid.uuid4(),
),
],
id=uuid.uuid4(),
),
Phase(
id=uuid.uuid4(),
norms=[
BasicNorm(norm="Use very gentle speech.", id=uuid.uuid4()),
ConditionalNorm(
condition=SemanticBelief(
description="We are talking to a child",
name="talking to child",
id=uuid.uuid4(),
),
norm="Do not use cuss words",
id=uuid.uuid4(),
),
],
triggers=[
Trigger(
condition=InferredBelief(
left=KeywordBelief(keyword="help", id=uuid.uuid4()),
right=SemanticBelief(
description="User is stuck", name="stuck", id=uuid.uuid4()
),
operator=LogicalOperator.OR,
name="help_or_stuck",
id=uuid.uuid4(),
),
plan=Plan(
steps=[
Goal(
name="Unblock user",
plan=Plan(
steps=[
LLMAction(
goal="Provide a step-by-step path to "
"resolve the user's issue.",
id=uuid.uuid4(),
)
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
],
goals=[
Goal(
name="Clarify intent",
plan=Plan(
steps=[
LLMAction(
goal="Ask 1-2 targeted questions to clarify the "
"user's intent, then proceed.",
id=uuid.uuid4(),
)
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
Goal(
name="Provide solution",
plan=Plan(
steps=[
LLMAction(
goal="Deliver a solution to complete the user's goal.",
id=uuid.uuid4(),
)
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
Goal(
name="Summarize next steps",
plan=Plan(
steps=[
LLMAction(
goal="Summarize what the user should do next.", id=uuid.uuid4()
)
],
id=uuid.uuid4(),
),
id=uuid.uuid4(),
),
],
),
]
)
def do_things():
print(AgentSpeakGenerator().generate(test_program))
class AgentSpeakGenerator:
"""
Converts Pydantic representation of behavior programs into AgentSpeak(L) code string.
"""
arrow_prefix = f"{' ' * 2}<-{' ' * 2}"
colon_prefix = f"{' ' * 2}:{' ' * 3}"
indent_prefix = " " * 6
def generate(self, program: Program) -> str:
lines = []
lines.append("")
lines += self._generate_initial_beliefs(program)
lines += self._generate_basic_flow(program)
lines += self._generate_phase_transitions(program)
lines += self._generate_norms(program)
lines += self._generate_belief_inference(program)
lines += self._generate_goals(program)
lines += self._generate_triggers(program)
return "\n".join(lines)
def _generate_initial_beliefs(self, program: Program) -> Iterable[str]:
yield "// --- Initial beliefs and agent startup ---"
yield "phase(start)."
yield ""
yield "+started"
yield f"{self.colon_prefix}phase(start)"
yield f"{self.arrow_prefix}phase({program.phases[0].id if program.phases else 'end'})."
yield from ["", ""]
def _generate_basic_flow(self, program: Program) -> Iterable[str]:
yield "// --- Basic flow ---"
for phase in program.phases:
yield from self._generate_basic_flow_per_phase(phase)
yield from ["", ""]
def _generate_basic_flow_per_phase(self, phase: Phase) -> Iterable[str]:
yield "+user_said(Message)"
yield f"{self.colon_prefix}phase({phase.id})"
goals = phase.goals
if goals:
yield f"{self.arrow_prefix}{self._slugify(goals[0], include_prefix=True)}"
for goal in goals[1:]:
yield f"{self.indent_prefix}{self._slugify(goal, include_prefix=True)}"
yield f"{self.indent_prefix if goals else self.arrow_prefix}!transition_phase."
def _generate_phase_transitions(self, program: Program) -> Iterable[str]:
yield "// --- Phase transitions ---"
if len(program.phases) == 0:
yield from ["", ""]
return
# TODO: remove outdated things
for i in range(-1, len(program.phases)):
predecessor = program.phases[i] if i >= 0 else None
successor = program.phases[i + 1] if i < len(program.phases) - 1 else None
yield from self._generate_phase_transition(predecessor, successor)
yield from self._generate_phase_transition(None, None) # to avoid failing plan
yield from ["", ""]
def _generate_phase_transition(
self, phase: Phase | None = None, next_phase: Phase | None = None
) -> Iterable[str]:
yield "+!transition_phase"
if phase is None and next_phase is None: # base case true to avoid failing plan
yield f"{self.arrow_prefix}true."
return
yield f"{self.colon_prefix}phase({phase.id if phase else 'start'})"
yield f"{self.arrow_prefix}-+phase({next_phase.id if next_phase else 'end'})."
def _generate_norms(self, program: Program) -> Iterable[str]:
yield "// --- Norms ---"
for phase in program.phases:
for norm in phase.norms:
if type(norm) is BasicNorm:
yield f"{self._slugify(norm)} :- phase({phase.id})."
if type(norm) is ConditionalNorm:
yield (
f"{self._slugify(norm)} :- phase({phase.id}) & "
f"{self._slugify(norm.condition)}."
)
yield from ["", ""]
def _generate_belief_inference(self, program: Program) -> Iterable[str]:
yield "// --- Belief inference rules ---"
for phase in program.phases:
for norm in phase.norms:
if not isinstance(norm, ConditionalNorm):
continue
yield from self._belief_inference_recursive(norm.condition)
for trigger in phase.triggers:
yield from self._belief_inference_recursive(trigger.condition)
yield from ["", ""]
def _belief_inference_recursive(self, belief: Belief) -> Iterable[str]:
if type(belief) is KeywordBelief:
yield (
f"{self._slugify(belief)} :- user_said(Message) & "
f'.substring(Message, "{belief.keyword}", Pos) & Pos >= 0.'
)
if type(belief) is InferredBelief:
yield (
f"{self._slugify(belief)} :- {self._slugify(belief.left)} "
f"{'&' if belief.operator == LogicalOperator.AND else '|'} "
f"{self._slugify(belief.right)}."
)
yield from self._belief_inference_recursive(belief.left)
yield from self._belief_inference_recursive(belief.right)
def _generate_goals(self, program: Program) -> Iterable[str]:
yield "// --- Goals ---"
for phase in program.phases:
previous_goal: Goal | None = None
for goal in phase.goals:
yield from self._generate_goal_plan_recursive(goal, phase, previous_goal)
previous_goal = goal
yield from ["", ""]
def _generate_goal_plan_recursive(
self, goal: Goal, phase: Phase, previous_goal: Goal | None = None
) -> Iterable[str]:
yield f"+{self._slugify(goal, include_prefix=True)}"
# Context
yield f"{self.colon_prefix}phase({phase.id}) &"
yield f"{self.indent_prefix}not responded_this_turn &"
yield f"{self.indent_prefix}not achieved_{self._slugify(goal)} &"
if previous_goal:
yield f"{self.indent_prefix}achieved_{self._slugify(previous_goal)}"
else:
yield f"{self.indent_prefix}true"
extra_goals_to_generate = []
steps = goal.plan.steps
if len(steps) == 0:
yield f"{self.arrow_prefix}true."
return
first_step = steps[0]
yield (
f"{self.arrow_prefix}{self._slugify(first_step, include_prefix=True)}"
f"{'.' if len(steps) == 1 and goal.can_fail else ';'}"
)
if isinstance(first_step, Goal):
extra_goals_to_generate.append(first_step)
for step in steps[1:-1]:
yield f"{self.indent_prefix}{self._slugify(step, include_prefix=True)};"
if isinstance(step, Goal):
extra_goals_to_generate.append(step)
if len(steps) > 1:
last_step = steps[-1]
yield (
f"{self.indent_prefix}{self._slugify(last_step, include_prefix=True)}"
f"{'.' if goal.can_fail else ';'}"
)
if isinstance(last_step, Goal):
extra_goals_to_generate.append(last_step)
if not goal.can_fail:
yield f"{self.indent_prefix}+achieved_{self._slugify(goal)}."
yield f"+{self._slugify(goal, include_prefix=True)}"
yield f"{self.arrow_prefix}true."
yield ""
extra_previous_goal: Goal | None = None
for extra_goal in extra_goals_to_generate:
yield from self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal)
extra_previous_goal = extra_goal
def _generate_triggers(self, program: Program) -> Iterable[str]:
yield "// --- Triggers ---"
for phase in program.phases:
for trigger in phase.triggers:
yield from self._generate_trigger_plan(trigger, phase)
yield from ["", ""]
def _generate_trigger_plan(self, trigger: Trigger, phase: Phase) -> Iterable[str]:
belief_name = self._slugify(trigger.condition)
yield f"+{belief_name}"
yield f"{self.colon_prefix}phase({phase.id})"
extra_goals_to_generate = []
steps = trigger.plan.steps
if len(steps) == 0:
yield f"{self.arrow_prefix}true."
return
first_step = steps[0]
yield (
f"{self.arrow_prefix}{self._slugify(first_step, include_prefix=True)}"
f"{'.' if len(steps) == 1 else ';'}"
)
if isinstance(first_step, Goal):
extra_goals_to_generate.append(first_step)
for step in steps[1:-1]:
yield f"{self.indent_prefix}{self._slugify(step, include_prefix=True)};"
if isinstance(step, Goal):
extra_goals_to_generate.append(step)
if len(steps) > 1:
last_step = steps[-1]
yield f"{self.indent_prefix}{self._slugify(last_step, include_prefix=True)}."
if isinstance(last_step, Goal):
extra_goals_to_generate.append(last_step)
yield ""
extra_previous_goal: Goal | None = None
for extra_goal in extra_goals_to_generate:
yield from self._generate_trigger_plan_recursive(extra_goal, phase, extra_previous_goal)
extra_previous_goal = extra_goal
def _generate_trigger_plan_recursive(
self, goal: Goal, phase: Phase, previous_goal: Goal | None = None
) -> Iterable[str]:
yield f"+{self._slugify(goal, include_prefix=True)}"
extra_goals_to_generate = []
steps = goal.plan.steps
if len(steps) == 0:
yield f"{self.arrow_prefix}true."
return
first_step = steps[0]
yield (
f"{self.arrow_prefix}{self._slugify(first_step, include_prefix=True)}"
f"{'.' if len(steps) == 1 and goal.can_fail else ';'}"
)
if isinstance(first_step, Goal):
extra_goals_to_generate.append(first_step)
for step in steps[1:-1]:
yield f"{self.indent_prefix}{self._slugify(step, include_prefix=True)};"
if isinstance(step, Goal):
extra_goals_to_generate.append(step)
if len(steps) > 1:
last_step = steps[-1]
yield (
f"{self.indent_prefix}{self._slugify(last_step, include_prefix=True)}"
f"{'.' if goal.can_fail else ';'}"
)
if isinstance(last_step, Goal):
extra_goals_to_generate.append(last_step)
if not goal.can_fail:
yield f"{self.indent_prefix}+achieved_{self._slugify(goal)}."
yield f"+{self._slugify(goal, include_prefix=True)}"
yield f"{self.arrow_prefix}true."
yield ""
extra_previous_goal: Goal | None = None
for extra_goal in extra_goals_to_generate:
yield from self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal)
extra_previous_goal = extra_goal
def _slugify(self, element: ProgramElement, include_prefix: bool = False) -> str:
def base_slugify_call(text: str):
return slugify(text, separator="_", stopwords=["a", "the"])
if type(element) is KeywordBelief:
return f'keyword_said("{element.keyword}")'
if type(element) is SemanticBelief:
name = element.name
return f"semantic_{base_slugify_call(name if name else element.description)}"
if isinstance(element, BasicNorm):
return f'norm("{element.norm}")'
if isinstance(element, Goal):
return f"{'!' if include_prefix else ''}{base_slugify_call(element.name)}"
if isinstance(element, SpeechAction):
return f'.say("{element.text}")'
if isinstance(element, GestureAction):
return f'.gesture("{element.gesture}")'
if isinstance(element, LLMAction):
return f'!generate_response_with_goal("{element.goal}")'
if isinstance(element, Action.__value__):
raise NotImplementedError(
"Have not implemented an ASL string representation for this action."
)
if element.name == "":
raise ValueError("Name must be initialized for this type of ProgramElement.")
return base_slugify_call(element.name)
def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]:
beliefs = []
for phase in program.phases:
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_basic_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += self._extract_basic_beliefs_from_belief(trigger.condition)
return beliefs
def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]:
if isinstance(belief, InferredBelief):
return self._extract_basic_beliefs_from_belief(
belief.left
) + self._extract_basic_beliefs_from_belief(belief.right)
return [belief]
class BDIProgramManager(BaseAgent): class BDIProgramManager(BaseAgent):
@@ -611,40 +28,114 @@ class BDIProgramManager(BaseAgent):
super().__init__(**kwargs) super().__init__(**kwargs)
self.sub_socket = None self.sub_socket = None
# async def _send_to_bdi(self, program: Program): async def _create_agentspeak_and_send_to_bdi(self, program: Program):
# """ """
# Convert a received program into BDI beliefs and send them to the BDI Core Agent. Convert a received program into BDI beliefs and send them to the BDI Core Agent.
#
# Currently, it takes the **first phase** of the program and extracts: Currently, it takes the **first phase** of the program and extracts:
# - **Norms**: Constraints or rules the agent must follow. - **Norms**: Constraints or rules the agent must follow.
# - **Goals**: Objectives the agent must achieve. - **Goals**: Objectives the agent must achieve.
#
# These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will
# overwrite any existing norms/goals of the same name in the BDI agent. overwrite any existing norms/goals of the same name in the BDI agent.
#
# :param program: The program object received from the API. :param program: The program object received from the API.
# """ """
# first_phase = program.phases[0] asg = AgentSpeakGenerator()
# norms_belief = Belief(
# name="norms", asl_str = asg.generate(program)
# arguments=[norm.norm for norm in first_phase.norms],
# replace=True, file_name = "src/control_backend/agents/bdi/agentspeak.asl"
# )
# goals_belief = Belief( with open(file_name, "w") as f:
# name="goals", f.write(asl_str)
# arguments=[goal.description for goal in first_phase.goals],
# replace=True, msg = InternalMessage(
# ) sender=self.name,
# program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief]) to=settings.agent_settings.bdi_core_name,
# body=file_name,
# message = InternalMessage( thread="new_program",
# to=settings.agent_settings.bdi_core_name, )
# sender=self.name,
# body=program_beliefs.model_dump_json(), await self.send(msg)
# thread="beliefs",
# ) @staticmethod
# await self.send(message) def _extract_beliefs_from_program(program: Program) -> list[Belief]:
# self.logger.debug("Sent new norms and goals to the BDI agent.") beliefs: list[Belief] = []
def extract_beliefs_from_belief(belief: Belief) -> list[Belief]:
if isinstance(belief, InferredBelief):
return extract_beliefs_from_belief(belief.left) + extract_beliefs_from_belief(
belief.right
)
return [belief]
for phase in program.phases:
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += extract_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += extract_beliefs_from_belief(trigger.condition)
return beliefs
async def _send_beliefs_to_semantic_belief_extractor(self, program: Program):
"""
Extract beliefs from the program and send them to the Semantic Belief Extractor Agent.
:param program: The program received from the API.
"""
beliefs = BeliefList(beliefs=self._extract_beliefs_from_program(program))
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=beliefs.model_dump_json(),
thread="beliefs",
)
await self.send(message)
@staticmethod
def _extract_goals_from_program(program: Program) -> list[Goal]:
"""
Extract all goals from the program, including subgoals.
:param program: The program received from the API.
:return: A list of Goal objects.
"""
goals: list[Goal] = []
def extract_goals_from_goal(goal_: Goal) -> list[Goal]:
goals_: list[Goal] = [goal]
for plan in goal_.plan:
if isinstance(plan, Goal):
goals_.extend(extract_goals_from_goal(plan))
return goals_
for phase in program.phases:
for goal in phase.goals:
goals.extend(extract_goals_from_goal(goal))
return goals
async def _send_goals_to_semantic_belief_extractor(self, program: Program):
"""
Extract goals from the program and send them to the Semantic Belief Extractor Agent.
:param program: The program received from the API.
"""
goals = GoalList(goals=self._extract_goals_from_program(program))
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=goals.model_dump_json(),
thread="goals",
)
await self.send(message)
async def _receive_programs(self): async def _receive_programs(self):
""" """
@@ -662,7 +153,11 @@ class BDIProgramManager(BaseAgent):
self.logger.exception("Received an invalid program.") self.logger.exception("Received an invalid program.")
continue continue
await self._send_to_bdi(program) await asyncio.gather(
self._create_agentspeak_and_send_to_bdi(program),
self._send_beliefs_to_semantic_belief_extractor(program),
self._send_goals_to_semantic_belief_extractor(program),
)
async def setup(self): async def setup(self):
""" """
@@ -678,7 +173,3 @@ class BDIProgramManager(BaseAgent):
self.sub_socket.subscribe("program") self.sub_socket.subscribe("program")
self.add_behavior(self._receive_programs()) self.add_behavior(self._receive_programs())
if __name__ == "__main__":
do_things()

View File

@@ -0,0 +1,5 @@
norms("").
+user_said(Message) : norms(Norms) <-
-user_said(Message);
.reply(Message, Norms).

View File

@@ -1,6 +0,0 @@
norms("").
goals("").
+user_said(Message) : norms(Norms) & goals(Goals) <-
-user_said(Message);
.reply(Message, Norms, Goals).

View File

@@ -2,23 +2,46 @@ import asyncio
import json import json
import httpx import httpx
from pydantic import ValidationError from pydantic import BaseModel, ValidationError
from slugify import slugify
from control_backend.agents.base import BaseAgent from control_backend.agents.base import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.agent_system import InternalMessage from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList, GoalList
from control_backend.schemas.belief_message import Belief as InternalBelief from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import ( from control_backend.schemas.program import Goal, SemanticBelief
Belief,
ConditionalNorm, type JSONLike = None | bool | int | float | str | list["JSONLike"] | dict[str, "JSONLike"]
InferredBelief,
Program,
SemanticBelief, class BeliefState(BaseModel):
true: set[InternalBelief] = set()
false: set[InternalBelief] = set()
def difference(self, other: "BeliefState") -> "BeliefState":
return BeliefState(
true=self.true - other.true,
false=self.false - other.false,
) )
def union(self, other: "BeliefState") -> "BeliefState":
return BeliefState(
true=self.true | other.true,
false=self.false | other.false,
)
def __sub__(self, other):
return self.difference(other)
def __or__(self, other):
return self.union(other)
def __bool__(self):
return bool(self.true) or bool(self.false)
class TextBeliefExtractorAgent(BaseAgent): class TextBeliefExtractorAgent(BaseAgent):
""" """
@@ -34,8 +57,11 @@ class TextBeliefExtractorAgent(BaseAgent):
def __init__(self, name: str): def __init__(self, name: str):
super().__init__(name) super().__init__(name)
self.beliefs: dict[str, bool] = {} self._llm = self.LLM(self, settings.llm_settings.n_parallel)
self.available_beliefs: list[SemanticBelief] = [] self.belief_inferrer = SemanticBeliefInferrer(self._llm)
self.goal_inferrer = GoalAchievementInferrer(self._llm)
self._current_beliefs = BeliefState()
self._current_goal_completions: dict[str, bool] = {}
self.conversation = ChatHistory(messages=[]) self.conversation = ChatHistory(messages=[])
async def setup(self): async def setup(self):
@@ -57,8 +83,9 @@ class TextBeliefExtractorAgent(BaseAgent):
case settings.agent_settings.transcription_name: case settings.agent_settings.transcription_name:
self.logger.debug("Received text from transcriber: %s", msg.body) self.logger.debug("Received text from transcriber: %s", msg.body)
self._apply_conversation_message(ChatMessage(role="user", content=msg.body)) self._apply_conversation_message(ChatMessage(role="user", content=msg.body))
await self._infer_new_beliefs()
await self._user_said(msg.body) await self._user_said(msg.body)
await self._infer_new_beliefs()
await self._infer_goal_completions()
case settings.agent_settings.llm_name: case settings.agent_settings.llm_name:
self.logger.debug("Received text from LLM: %s", msg.body) self.logger.debug("Received text from LLM: %s", msg.body)
self._apply_conversation_message(ChatMessage(role="assistant", content=msg.body)) self._apply_conversation_message(ChatMessage(role="assistant", content=msg.body))
@@ -80,49 +107,60 @@ class TextBeliefExtractorAgent(BaseAgent):
def _handle_program_manager_message(self, msg: InternalMessage): def _handle_program_manager_message(self, msg: InternalMessage):
""" """
Handle a message from the program manager: extract available beliefs from it. Handle a message from the program manager: extract available beliefs and goals from it.
:param msg: The received message from the program manager. :param msg: The received message from the program manager.
""" """
match msg.thread:
case "beliefs":
self._handle_beliefs_message(msg)
case "goals":
self._handle_goals_message(msg)
case "conversation_history":
if msg.body == "reset":
self._reset()
case _:
self.logger.warning("Received unexpected message from %s", msg.sender)
def _reset(self):
self.conversation = ChatHistory(messages=[])
self.belief_inferrer.available_beliefs.clear()
self._current_beliefs = BeliefState()
self.goal_inferrer.goals.clear()
self._current_goal_completions = {}
def _handle_beliefs_message(self, msg: InternalMessage):
try: try:
program = Program.model_validate_json(msg.body) belief_list = BeliefList.model_validate_json(msg.body)
except ValidationError: except ValidationError:
self.logger.warning( self.logger.warning(
"Received message from program manager but it is not a valid program." "Received message from program manager but it is not a valid list of beliefs."
) )
return return
self.logger.debug("Received a program from the program manager.") available_beliefs = [b for b in belief_list.beliefs if isinstance(b, SemanticBelief)]
self.belief_inferrer.available_beliefs = available_beliefs
self.available_beliefs = self._extract_basic_beliefs_from_program(program) self.logger.debug(
"Received %d semantic beliefs from the program manager.",
# TODO Copied from an incomplete version of the program manager. Use that one instead. len(available_beliefs),
@staticmethod
def _extract_basic_beliefs_from_program(program: Program) -> list[SemanticBelief]:
beliefs = []
for phase in program.phases:
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += TextBeliefExtractorAgent._extract_basic_beliefs_from_belief(
norm.condition
) )
for trigger in phase.triggers: def _handle_goals_message(self, msg: InternalMessage):
beliefs += TextBeliefExtractorAgent._extract_basic_beliefs_from_belief( try:
trigger.condition goals_list = GoalList.model_validate_json(msg.body)
except ValidationError:
self.logger.warning(
"Received message from program manager but it is not a valid list of goals."
) )
return
return beliefs # Use only goals that can fail, as the others are always assumed to be completed
available_goals = [g for g in goals_list.goals if g.can_fail]
# TODO Copied from an incomplete version of the program manager. Use that one instead. self.goal_inferrer.goals = available_goals
@staticmethod self.logger.debug(
def _extract_basic_beliefs_from_belief(belief: Belief) -> list[SemanticBelief]: "Received %d failable goals from the program manager.",
if isinstance(belief, InferredBelief): len(available_goals),
return TextBeliefExtractorAgent._extract_basic_beliefs_from_belief( )
belief.left
) + TextBeliefExtractorAgent._extract_basic_beliefs_from_belief(belief.right)
return [belief]
async def _user_said(self, text: str): async def _user_said(self, text: str):
""" """
@@ -130,161 +168,86 @@ class TextBeliefExtractorAgent(BaseAgent):
:param text: User's transcribed text. :param text: User's transcribed text.
""" """
belief = {"beliefs": {"user_said": [text]}, "type": "belief_extraction_text"}
payload = json.dumps(belief)
belief_msg = InternalMessage( belief_msg = InternalMessage(
to=settings.agent_settings.bdi_belief_collector_name, to=settings.agent_settings.bdi_core_name,
sender=self.name, sender=self.name,
body=payload, body=BeliefMessage(
replace=[InternalBelief(name="user_said", arguments=[text])],
).model_dump_json(),
thread="beliefs", thread="beliefs",
) )
await self.send(belief_msg) await self.send(belief_msg)
async def _infer_new_beliefs(self): async def _infer_new_beliefs(self):
""" conversation_beliefs = await self.belief_inferrer.infer_from_conversation(self.conversation)
Process conversation history to extract beliefs, semantically. Any changed beliefs are sent
to the BDI core. new_beliefs = conversation_beliefs - self._current_beliefs
""" if not new_beliefs:
# Return instantly if there are no beliefs to infer
if not self.available_beliefs:
return return
candidate_beliefs = await self._infer_turn() self._current_beliefs |= new_beliefs
belief_changes = BeliefMessage()
for belief_key, belief_value in candidate_beliefs.items():
if belief_value is None:
continue
old_belief_value = self.beliefs.get(belief_key)
if belief_value == old_belief_value:
continue
self.beliefs[belief_key] = belief_value belief_changes = BeliefMessage(
create=list(new_beliefs.true),
delete=list(new_beliefs.false),
)
belief = InternalBelief(name=belief_key, arguments=None) message = InternalMessage(
if belief_value:
belief_changes.create.append(belief)
else:
belief_changes.delete.append(belief)
# Return if there were no changes in beliefs
if not belief_changes.has_values():
return
beliefs_message = InternalMessage(
to=settings.agent_settings.bdi_core_name, to=settings.agent_settings.bdi_core_name,
sender=self.name, sender=self.name,
body=belief_changes.model_dump_json(), body=belief_changes.model_dump_json(),
thread="beliefs", thread="beliefs",
) )
await self.send(beliefs_message) await self.send(message)
@staticmethod async def _infer_goal_completions(self):
def _split_into_chunks[T](items: list[T], n: int) -> list[list[T]]: goal_completions = await self.goal_inferrer.infer_from_conversation(self.conversation)
k, m = divmod(len(items), n)
return [items[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)]
async def _infer_turn(self) -> dict: new_achieved = [
""" InternalBelief(name=goal, arguments=None)
Process the stored conversation history to extract semantic beliefs. Returns a list of for goal, achieved in goal_completions.items()
beliefs that have been set to ``True``, ``False`` or ``None``. if achieved and self._current_goal_completions.get(goal) != achieved
:return: A dict mapping belief names to a value ``True``, ``False`` or ``None``.
"""
n_parallel = max(1, min(settings.llm_settings.n_parallel - 1, len(self.available_beliefs)))
all_beliefs = await asyncio.gather(
*[
self._infer_beliefs(self.conversation, beliefs)
for beliefs in self._split_into_chunks(self.available_beliefs, n_parallel)
] ]
) new_not_achieved = [
retval = {} InternalBelief(name=goal, arguments=None)
for beliefs in all_beliefs: for goal, achieved in goal_completions.items()
if beliefs is None: if not achieved and self._current_goal_completions.get(goal) != achieved
continue
retval.update(beliefs)
return retval
@staticmethod
def _create_belief_schema(belief: SemanticBelief) -> tuple[str, dict]:
# TODO: use real belief names
return belief.name or slugify(belief.description), {
"type": ["boolean", "null"],
"description": belief.description,
}
@staticmethod
def _create_beliefs_schema(beliefs: list[SemanticBelief]) -> dict:
belief_schemas = [
TextBeliefExtractorAgent._create_belief_schema(belief) for belief in beliefs
] ]
for goal, achieved in goal_completions.items():
self._current_goal_completions[goal] = achieved
return { if not new_achieved and not new_not_achieved:
"type": "object", return
"properties": dict(belief_schemas),
"required": [name for name, _ in belief_schemas],
}
@staticmethod belief_changes = BeliefMessage(
def _format_message(message: ChatMessage): create=new_achieved,
return f"{message.role.upper()}:\n{message.content}" delete=new_not_achieved,
@staticmethod
def _format_conversation(conversation: ChatHistory):
return "\n\n".join(
[TextBeliefExtractorAgent._format_message(message) for message in conversation.messages]
) )
message = InternalMessage(
@staticmethod to=settings.agent_settings.bdi_core_name,
def _format_beliefs(beliefs: list[SemanticBelief]): sender=self.name,
# TODO: use real belief names body=belief_changes.model_dump_json(),
return "\n".join( thread="beliefs",
[
f"- {belief.name or slugify(belief.description)}: {belief.description}"
for belief in beliefs
]
) )
await self.send(message)
async def _infer_beliefs( class LLM:
self,
conversation: ChatHistory,
beliefs: list[SemanticBelief],
) -> dict | None:
""" """
Infer given beliefs based on the given conversation. Class that handles sending structured generation requests to an LLM.
:param conversation: The conversation to infer beliefs from.
:param beliefs: The beliefs to infer.
:return: A dict containing belief names and a boolean whether they hold, or None if the
belief cannot be inferred based on the given conversation.
"""
example = {
"example_belief": True,
}
prompt = f"""{self._format_conversation(conversation)}
Given the above conversation, what beliefs can be inferred?
If there is no relevant information about a belief belief, give null.
In case messages conflict, prefer using the most recent messages for inference.
Choose from the following list of beliefs, formatted as (belief_name, description):
{self._format_beliefs(beliefs)}
Respond with a JSON similar to the following, but with the property names as given above:
{json.dumps(example, indent=2)}
""" """
schema = self._create_beliefs_schema(beliefs) def __init__(self, agent: "TextBeliefExtractorAgent", n_parallel: int):
self._agent = agent
self._semaphore = asyncio.Semaphore(n_parallel)
return await self._retry_query_llm(prompt, schema) async def query(self, prompt: str, schema: dict, tries: int = 3) -> JSONLike | None:
async def _retry_query_llm(self, prompt: str, schema: dict, tries: int = 3) -> dict | None:
""" """
Query the LLM with the given prompt and schema, return an instance of a dict conforming Query the LLM with the given prompt and schema, return an instance of a dict conforming
to this schema. Try ``tries`` times, or return None. to this schema. Try ``tries`` times, or return None.
:param prompt: Prompt to be queried. :param prompt: Prompt to be queried.
:param schema: Schema to be queried. :param schema: Schema to be queried.
:param tries: Number of times to try to query the LLM.
:return: An instance of a dict conforming to this schema, or None if failed. :return: An instance of a dict conforming to this schema, or None if failed.
""" """
try_count = 0 try_count = 0
@@ -296,7 +259,7 @@ Respond with a JSON similar to the following, but with the property names as giv
except (httpx.HTTPError, json.JSONDecodeError, KeyError) as e: except (httpx.HTTPError, json.JSONDecodeError, KeyError) as e:
if try_count < tries: if try_count < tries:
continue continue
self.logger.exception( self._agent.logger.exception(
"Failed to get LLM response after %d tries.", "Failed to get LLM response after %d tries.",
try_count, try_count,
exc_info=e, exc_info=e,
@@ -304,11 +267,10 @@ Respond with a JSON similar to the following, but with the property names as giv
return None return None
@staticmethod async def _query_llm(self, prompt: str, schema: dict) -> JSONLike:
async def _query_llm(prompt: str, schema: dict) -> dict:
""" """
Query an LLM with the given prompt and schema, return an instance of a dict conforming to Query an LLM with the given prompt and schema, return an instance of a dict conforming
that schema. to that schema.
:param prompt: The prompt to be queried. :param prompt: The prompt to be queried.
:param schema: Schema to use during response. :param schema: Schema to use during response.
@@ -316,8 +278,10 @@ Respond with a JSON similar to the following, but with the property names as giv
:raises httpx.HTTPStatusError: If the LLM server responded with an error. :raises httpx.HTTPStatusError: If the LLM server responded with an error.
:raises json.JSONDecodeError: If the LLM response was not valid JSON. May happen if the :raises json.JSONDecodeError: If the LLM response was not valid JSON. May happen if the
response was cut off early due to length limitations. response was cut off early due to length limitations.
:raises KeyError: If the LLM server responded with no error, but the response was invalid. :raises KeyError: If the LLM server responded with no error, but the response was
invalid.
""" """
async with self._semaphore:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
response = await client.post( response = await client.post(
settings.llm_settings.local_llm_url, settings.llm_settings.local_llm_url,
@@ -336,10 +300,177 @@ Respond with a JSON similar to the following, but with the property names as giv
"temperature": settings.llm_settings.code_temperature, "temperature": settings.llm_settings.code_temperature,
"stream": False, "stream": False,
}, },
timeout=None, timeout=30.0,
) )
response.raise_for_status() response.raise_for_status()
response_json = response.json() response_json = response.json()
json_message = response_json["choices"][0]["message"]["content"] json_message = response_json["choices"][0]["message"]["content"]
return json.loads(json_message) return json.loads(json_message)
class SemanticBeliefInferrer:
"""
Class that handles only prompting an LLM for semantic beliefs.
"""
def __init__(
self,
llm: "TextBeliefExtractorAgent.LLM",
available_beliefs: list[SemanticBelief] | None = None,
):
self._llm = llm
self.available_beliefs: list[SemanticBelief] = available_beliefs or []
async def infer_from_conversation(self, conversation: ChatHistory) -> BeliefState:
"""
Process conversation history to extract beliefs, semantically. The result is an object that
describes all beliefs that hold or don't hold based on the full conversation.
:param conversation: The conversation history to be processed.
:return: An object that describes beliefs.
"""
# Return instantly if there are no beliefs to infer
if not self.available_beliefs:
return BeliefState()
n_parallel = max(1, min(settings.llm_settings.n_parallel - 1, len(self.available_beliefs)))
all_beliefs: list[dict[str, bool | None] | None] = await asyncio.gather(
*[
self._infer_beliefs(conversation, beliefs)
for beliefs in self._split_into_chunks(self.available_beliefs, n_parallel)
]
)
retval = BeliefState()
for beliefs in all_beliefs:
if beliefs is None:
continue
for belief_name, belief_holds in beliefs.items():
if belief_holds is None:
continue
belief = InternalBelief(name=belief_name, arguments=None)
if belief_holds:
retval.true.add(belief)
else:
retval.false.add(belief)
return retval
@staticmethod
def _split_into_chunks[T](items: list[T], n: int) -> list[list[T]]:
"""
Split a list into ``n`` chunks, making each chunk approximately ``len(items) / n`` long.
:param items: The list of items to split.
:param n: The number of desired chunks.
:return: A list of chunks each approximately ``len(items) / n`` long.
"""
k, m = divmod(len(items), n)
return [items[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)]
async def _infer_beliefs(
self,
conversation: ChatHistory,
beliefs: list[SemanticBelief],
) -> dict[str, bool | None] | None:
"""
Infer given beliefs based on the given conversation.
:param conversation: The conversation to infer beliefs from.
:param beliefs: The beliefs to infer.
:return: A dict containing belief names and a boolean whether they hold, or None if the
belief cannot be inferred based on the given conversation.
"""
example = {
"example_belief": True,
}
prompt = f"""{self._format_conversation(conversation)}
Given the above conversation, what beliefs can be inferred?
If there is no relevant information about a belief belief, give null.
In case messages conflict, prefer using the most recent messages for inference.
Choose from the following list of beliefs, formatted as `- <belief_name>: <description>`:
{self._format_beliefs(beliefs)}
Respond with a JSON similar to the following, but with the property names as given above:
{json.dumps(example, indent=2)}
"""
schema = self._create_beliefs_schema(beliefs)
return await self._llm.query(prompt, schema)
@staticmethod
def _create_belief_schema(belief: SemanticBelief) -> tuple[str, dict]:
return AgentSpeakGenerator.slugify(belief), {
"type": ["boolean", "null"],
"description": belief.description,
}
@staticmethod
def _create_beliefs_schema(beliefs: list[SemanticBelief]) -> dict:
belief_schemas = [
SemanticBeliefInferrer._create_belief_schema(belief) for belief in beliefs
]
return {
"type": "object",
"properties": dict(belief_schemas),
"required": [name for name, _ in belief_schemas],
}
@staticmethod
def _format_message(message: ChatMessage):
return f"{message.role.upper()}:\n{message.content}"
@staticmethod
def _format_conversation(conversation: ChatHistory):
return "\n\n".join(
[SemanticBeliefInferrer._format_message(message) for message in conversation.messages]
)
@staticmethod
def _format_beliefs(beliefs: list[SemanticBelief]):
return "\n".join(
[f"- {AgentSpeakGenerator.slugify(belief)}: {belief.description}" for belief in beliefs]
)
class GoalAchievementInferrer(SemanticBeliefInferrer):
def __init__(self, llm: TextBeliefExtractorAgent.LLM):
super().__init__(llm)
self.goals = []
async def infer_from_conversation(self, conversation: ChatHistory) -> dict[str, bool]:
"""
Determine which goals have been achieved based on the given conversation.
:param conversation: The conversation to infer goal completion from.
:return: A mapping of goals and a boolean whether they have been achieved.
"""
if not self.goals:
return {}
goals_achieved = await asyncio.gather(
*[self._infer_goal(conversation, g) for g in self.goals]
)
return {
f"achieved_{AgentSpeakGenerator.slugify(goal)}": achieved
for goal, achieved in zip(self.goals, goals_achieved, strict=True)
}
async def _infer_goal(self, conversation: ChatHistory, goal: Goal) -> bool:
prompt = f"""{self._format_conversation(conversation)}
Given the above conversation, what has the following goal been achieved?
The name of the goal: {goal.name}
Description of the goal: {goal.description}
Answer with literally only `true` or `false` (without backticks)."""
schema = {
"type": "boolean",
}
return await self._llm.query(prompt, schema)

View File

@@ -192,7 +192,16 @@ class BaseAgent(ABC):
:param coro: The coroutine to execute as a task. :param coro: The coroutine to execute as a task.
""" """
task = asyncio.create_task(coro)
async def try_coro(coro_: Coroutine):
try:
await coro_
except asyncio.CancelledError:
self.logger.debug("A behavior was canceled successfully: %s", coro_)
except Exception:
self.logger.warning("An exception occurred in a behavior.", exc_info=True)
task = asyncio.create_task(try_coro(coro))
self._tasks.add(task) self._tasks.add(task)
task.add_done_callback(self._tasks.discard) task.add_done_callback(self._tasks.discard)
return task return task

View File

@@ -120,7 +120,6 @@ async def lifespan(app: FastAPI):
BDICoreAgent, BDICoreAgent,
{ {
"name": settings.agent_settings.bdi_core_name, "name": settings.agent_settings.bdi_core_name,
"asl": "src/control_backend/agents/bdi/rules.asl",
}, },
), ),
"BeliefCollectorAgent": ( "BeliefCollectorAgent": (

View File

@@ -0,0 +1,19 @@
from pydantic import BaseModel
from control_backend.schemas.program import Belief as ProgramBelief
from control_backend.schemas.program import Goal
class BeliefList(BaseModel):
"""
Represents a list of beliefs, separated from a program. Useful in agents which need to
communicate beliefs.
:ivar: beliefs: The list of beliefs.
"""
beliefs: list[ProgramBelief]
class GoalList(BaseModel):
goals: list[Goal]

View File

@@ -13,6 +13,9 @@ class Belief(BaseModel):
name: str name: str
arguments: list[str] | None arguments: list[str] | None
# To make it hashable
model_config = {"frozen": True}
class BeliefMessage(BaseModel): class BeliefMessage(BaseModel):
""" """

View File

@@ -43,7 +43,6 @@ class SemanticBelief(ProgramElement):
:ivar description: Description of how to form the belief, used by the LLM. :ivar description: Description of how to form the belief, used by the LLM.
""" """
name: str = ""
description: str description: str
@@ -113,10 +112,12 @@ class Goal(ProgramElement):
for example when the achieving of the goal is dependent on the user's reply, this means for example when the achieving of the goal is dependent on the user's reply, this means
that the achieved status will be set from somewhere else in the program. that the achieved status will be set from somewhere else in the program.
:ivar description: A description of the goal, used to determine if it has been achieved.
:ivar plan: The plan to execute. :ivar plan: The plan to execute.
:ivar can_fail: Whether we can fail to achieve the goal after executing the plan. :ivar can_fail: Whether we can fail to achieve the goal after executing the plan.
""" """
description: str = ""
plan: Plan plan: Plan
can_fail: bool = True can_fail: bool = True

View File

@@ -20,7 +20,7 @@ def mock_agentspeak_env():
@pytest.fixture @pytest.fixture
def agent(): def agent():
agent = BDICoreAgent("bdi_agent", "dummy.asl") agent = BDICoreAgent("bdi_agent")
agent.send = AsyncMock() agent.send = AsyncMock()
agent.bdi_agent = MagicMock() agent.bdi_agent = MagicMock()
return agent return agent
@@ -133,14 +133,14 @@ async def test_custom_actions(agent):
# Invoke action # Invoke action
mock_term = MagicMock() mock_term = MagicMock()
mock_term.args = ["Hello", "Norm", "Goal"] mock_term.args = ["Hello", "Norm"]
mock_intention = MagicMock() mock_intention = MagicMock()
# Run generator # Run generator
gen = action_fn(agent, mock_term, mock_intention) gen = action_fn(agent, mock_term, mock_intention)
next(gen) # Execute next(gen) # Execute
agent._send_to_llm.assert_called_with("Hello", "Norm", "Goal") agent._send_to_llm.assert_called_with("Hello", "Norm", "")
def test_add_belief_sets_event(agent): def test_add_belief_sets_event(agent):

View File

@@ -32,6 +32,8 @@ def make_valid_program_json(norm="N1", goal="G1") -> str:
Goal( Goal(
id=uuid.uuid4(), id=uuid.uuid4(),
name=goal, name=goal,
description="This description can be used to determine whether the goal "
"has been achieved.",
plan=Plan( plan=Plan(
id=uuid.uuid4(), id=uuid.uuid4(),
name="Goal Plan", name="Goal Plan",
@@ -53,7 +55,7 @@ async def test_send_to_bdi():
manager.send = AsyncMock() manager.send = AsyncMock()
program = Program.model_validate_json(make_valid_program_json()) program = Program.model_validate_json(make_valid_program_json())
await manager._send_to_bdi(program) await manager._create_agentspeak_and_send_to_bdi(program)
assert manager.send.await_count == 1 assert manager.send.await_count == 1
msg: InternalMessage = manager.send.await_args[0][0] msg: InternalMessage = manager.send.await_args[0][0]
@@ -75,8 +77,9 @@ async def test_receive_programs_valid_and_invalid():
] ]
manager = BDIProgramManager(name="program_manager_test") manager = BDIProgramManager(name="program_manager_test")
manager._internal_pub_socket = AsyncMock()
manager.sub_socket = sub manager.sub_socket = sub
manager._send_to_bdi = AsyncMock() manager._create_agentspeak_and_send_to_bdi = AsyncMock()
try: try:
# Will give StopAsyncIteration when the predefined `sub.recv_multipart` side-effects run out # Will give StopAsyncIteration when the predefined `sub.recv_multipart` side-effects run out
@@ -85,7 +88,7 @@ async def test_receive_programs_valid_and_invalid():
pass pass
# Only valid Program should have triggered _send_to_bdi # Only valid Program should have triggered _send_to_bdi
assert manager._send_to_bdi.await_count == 1 assert manager._create_agentspeak_and_send_to_bdi.await_count == 1
forwarded: Program = manager._send_to_bdi.await_args[0][0] forwarded: Program = manager._create_agentspeak_and_send_to_bdi.await_args[0][0]
assert forwarded.phases[0].norms[0].name == "N1" assert forwarded.phases[0].norms[0].name == "N1"
assert forwarded.phases[0].goals[0].name == "G1" assert forwarded.phases[0].goals[0].name == "G1"

View File

@@ -8,9 +8,11 @@ import pytest
from control_backend.agents.bdi import TextBeliefExtractorAgent from control_backend.agents.bdi import TextBeliefExtractorAgent
from control_backend.core.agent_system import InternalMessage from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList
from control_backend.schemas.belief_message import BeliefMessage from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.program import ( from control_backend.schemas.program import (
ConditionalNorm, ConditionalNorm,
KeywordBelief,
LLMAction, LLMAction,
Phase, Phase,
Plan, Plan,
@@ -186,13 +188,31 @@ async def test_retry_query_llm_fail_immediately(agent):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_extracting_beliefs_from_program(agent, sample_program): async def test_extracting_semantic_beliefs(agent):
"""
The Program Manager sends beliefs to this agent. Test whether the agent handles them correctly.
"""
assert len(agent.available_beliefs) == 0 assert len(agent.available_beliefs) == 0
beliefs = BeliefList(
beliefs=[
KeywordBelief(
id=uuid.uuid4(),
name="keyword_hello",
keyword="hello",
),
SemanticBelief(
id=uuid.uuid4(), name="semantic_hello_1", description="Some semantic belief 1"
),
SemanticBelief(
id=uuid.uuid4(), name="semantic_hello_2", description="Some semantic belief 2"
),
]
)
await agent.handle_message( await agent.handle_message(
InternalMessage( InternalMessage(
to=settings.agent_settings.text_belief_extractor_name, to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.bdi_program_manager_name, sender=settings.agent_settings.bdi_program_manager_name,
body=sample_program.model_dump_json(), body=beliefs.model_dump_json(),
), ),
) )
assert len(agent.available_beliefs) == 2 assert len(agent.available_beliefs) == 2

View File

@@ -43,6 +43,8 @@ def make_valid_program_dict():
Goal( Goal(
id=uuid.uuid4(), id=uuid.uuid4(),
name="Some goal", name="Some goal",
description="This description can be used to determine whether the goal "
"has been achieved.",
plan=Plan( plan=Plan(
id=uuid.uuid4(), id=uuid.uuid4(),
name="Goal Plan", name="Goal Plan",

View File

@@ -31,6 +31,7 @@ def base_goal() -> Goal:
return Goal( return Goal(
id=uuid.uuid4(), id=uuid.uuid4(),
name="testGoalName", name="testGoalName",
description="This description can be used to determine whether the goal has been achieved.",
plan=Plan( plan=Plan(
id=uuid.uuid4(), id=uuid.uuid4(),
name="testGoalPlanName", name="testGoalPlanName",