Files
pepperplus-cb/src/control_backend/agents/bdi/agentspeak_generator.py
2026-02-02 14:31:26 +01:00

897 lines
31 KiB
Python

"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi.agentspeak_ast import (
AstAtom,
AstBinaryOp,
AstExpression,
AstLiteral,
AstNumber,
AstPlan,
AstProgram,
AstRule,
AstStatement,
AstString,
AstVar,
BinaryOperatorType,
StatementType,
TriggerType,
)
from control_backend.core.config import settings
from control_backend.schemas.program import (
BaseGoal,
BasicNorm,
ConditionalNorm,
EmotionBelief,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Norm,
Phase,
PlanElement,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
Trigger,
)
class AgentSpeakGenerator:
"""
Generator class that translates a high-level :class:`~control_backend.schemas.program.Program`
into AgentSpeak(L) source code.
It handles the conversion of phases, norms, goals, and triggers into AgentSpeak rules and plans,
ensuring the robot follows the defined behavioral logic.
The generator follows a systematic approach:
1. Sets up initial phase and cycle notification rules
2. Adds keyword inference capabilities for natural language processing
3. Creates default plans for common operations
4. Processes each phase with its norms, goals, and triggers
5. Adds fallback plans for robust execution
:ivar _asp: The internal AgentSpeak program representation being built.
"""
_asp: AstProgram
logger = logging.getLogger(__name__)
def generate(self, program: Program) -> str:
"""
Translates a Program object into an AgentSpeak source string.
This is the main entry point for the code generation process. It initializes
the AgentSpeak program structure and orchestrates the conversion of all
program elements into their AgentSpeak representations.
:param program: The behavior program to translate.
:return: The generated AgentSpeak code as a string.
"""
self._asp = AstProgram()
if program.phases:
self._asp.rules.append(AstRule(self._astify(program.phases[0])))
else:
self._asp.rules.append(AstRule(AstLiteral("phase", [AstString("end")])))
self._asp.rules.append(AstRule(AstLiteral("!notify_cycle")))
self._add_keyword_inference()
self._add_default_plans()
self._process_phases(program.phases)
self._add_fallbacks()
return str(self._asp)
def _add_keyword_inference(self) -> None:
"""
Adds inference rules for keyword detection in user messages.
This method creates rules that allow the system to detect when specific
keywords are mentioned in user messages. It uses string operations to
check if a keyword is a substring of the user's message.
The generated rule has the form:
keyword_said(Keyword) :- user_said(Message) & .substring_case_insensitive(Keyword, Message, Pos) & Pos >= 0
This enables the system to trigger behaviors based on keyword detection.
"""
keyword = AstVar("Keyword")
message = AstVar("Message")
position = AstVar("Pos")
self._asp.rules.append(
AstRule(
AstLiteral("keyword_said", [keyword]),
AstLiteral("user_said", [message])
& AstLiteral(".substring_case_insensitive", [keyword, message, position])
& (position >= 0),
)
)
def _add_default_plans(self):
"""
Adds default plans for common operations.
This method sets up the standard plans that handle fundamental operations
like replying with goals, simple speech actions, general replies, and
cycle notifications. These plans provide the basic infrastructure for
the agent's reactive behavior.
"""
self._add_reply_with_goal_plan()
self._add_say_plan()
self._add_notify_cycle_plan()
def _add_reply_with_goal_plan(self):
"""
Adds a plan for replying with a specific conversational goal.
This plan handles the case where the agent needs to respond to user input
while pursuing a specific conversational goal. It:
1. Marks that the agent has responded this turn
2. Gathers all active norms
3. Generates a reply that considers both the user message and the goal
Trigger: +!reply_with_goal(Goal)
Context: user_said(Message)
"""
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("reply_with_goal", [AstVar("Goal")]),
[AstLiteral("user_said", [AstVar("Message")])],
[
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"findall",
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
),
),
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"reply_with_goal", [AstVar("Message"), AstVar("Norms"), AstVar("Goal")]
),
),
],
)
)
def _add_say_plan(self):
"""
Adds a plan for simple speech actions.
This plan handles direct speech actions where the agent needs to say
a specific text. It:
1. Marks that the agent has responded this turn
2. Executes the speech action
Trigger: +!say(Text)
Context: None (can be executed anytime)
"""
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("say", [AstVar("Text")]),
[],
[
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
AstStatement(StatementType.DO_ACTION, AstLiteral("say", [AstVar("Text")])),
],
)
)
def _add_notify_cycle_plan(self):
"""
Adds a plan for cycle notification.
This plan handles the periodic notification cycle that allows the system
to monitor and report on the current state. It:
1. Gathers all active norms
2. Notifies the system about the current norms
3. Waits briefly to allow processing
4. Recursively triggers the next cycle
Trigger: +!notify_cycle
Context: None (can be executed anytime)
"""
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("notify_cycle"),
[],
[
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"findall",
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
),
),
AstStatement(
StatementType.DO_ACTION, AstLiteral("notify_norms", [AstVar("Norms")])
),
AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(100)])),
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("notify_cycle")),
],
)
)
def _add_stop_plan(self, phase: Phase):
"""
Adds a plan to stop the program. This just skips to the end phase,
where there is no behavior defined.
"""
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("stop"),
[AstLiteral("phase", [AstString(phase.id)])],
[
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"notify_transition_phase",
[
AstString(phase.id),
AstString("end")
]
)
),
AstStatement(
StatementType.REMOVE_BELIEF,
AstLiteral("phase", [AstVar("Phase")]),
),
AstStatement(
StatementType.ADD_BELIEF,
AstLiteral("phase", [AstString("end")])
)
]
)
)
def _process_phases(self, phases: list[Phase]) -> None:
"""
Processes all phases in the program and their transitions.
This method iterates through each phase and:
1. Processes the current phase (norms, goals, triggers)
2. Sets up transitions between phases
3. Adds special handling for the end phase
:param phases: The list of phases to process.
"""
for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True):
if curr_phase:
self._process_phase(curr_phase)
self._add_phase_transition(curr_phase, next_phase)
def _process_phase(self, phase: Phase) -> None:
"""
Processes a single phase, including its norms, goals, and triggers.
This method handles the complete processing of a phase by:
1. Processing all norms in the phase
2. Setting up the default execution loop for the phase
3. Processing all goals in sequence
4. Processing all triggers for reactive behavior
:param phase: The phase to process.
"""
for norm in phase.norms:
self._process_norm(norm, phase)
self._add_default_loop(phase)
previous_goal = None
for goal in phase.goals:
self._process_goal(goal, phase, previous_goal, main_goal=True)
previous_goal = goal
for trigger in phase.triggers:
self._process_trigger(trigger, phase)
# Add force transition to end phase
self._add_stop_plan(phase)
def _add_phase_transition(self, from_phase: Phase | None, to_phase: Phase | None) -> None:
"""
Adds plans for transitioning between phases.
This method creates two plans for each phase transition:
1. A check plan that verifies if transition conditions are met
2. A force plan that actually performs the transition (can be forced externally)
The transition involves:
- Notifying the system about the phase change
- Removing the current phase belief
- Adding the next phase belief
:param from_phase: The phase being transitioned from (or None for initial setup).
:param to_phase: The phase being transitioned to (or None for end phase).
"""
if from_phase is None:
return
from_phase_ast = self._astify(from_phase)
to_phase_ast = (
self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")])
)
check_context = [from_phase_ast]
if from_phase:
for goal in from_phase.goals:
check_context.append(self._astify(goal, achieved=True))
force_context = [from_phase_ast]
body = [
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"notify_transition_phase",
[
AstString(str(from_phase.id)),
AstString(str(to_phase.id) if to_phase else "end"),
],
),
),
AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast),
AstStatement(StatementType.ADD_BELIEF, to_phase_ast),
]
# Check
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("transition_phase"),
check_context,
[
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("force_transition_phase")),
],
)
)
# Force
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL, AstLiteral("force_transition_phase"), force_context, body
)
)
def _process_norm(self, norm: Norm, phase: Phase) -> None:
"""
Processes a norm and adds it as an inference rule.
This method converts norms into AgentSpeak rules that define when
the norm should be active. It handles both basic norms (always active
in their phase) and conditional norms (active only when their condition
is met).
:param norm: The norm to process.
:param phase: The phase this norm belongs to.
"""
rule: AstRule | None = None
match norm:
case ConditionalNorm(condition=cond):
rule = AstRule(
self._astify(norm),
self._astify(phase) & self._astify(cond)
| AstAtom(f"force_{self.slugify(norm)}"),
)
case BasicNorm():
rule = AstRule(self._astify(norm), self._astify(phase))
if not rule:
return
self._asp.rules.append(rule)
def _add_default_loop(self, phase: Phase) -> None:
"""
Adds the default execution loop for a phase.
This method creates the main reactive loop that runs when the agent
receives user input during a phase. The loop:
1. Notifies the system about the user input
2. Resets the response tracking
3. Executes all phase goals
4. Attempts phase transition
:param phase: The phase to create the loop for.
"""
actions = []
actions.append(
AstStatement(
StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")])
)
)
actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn")))
for goal in phase.goals:
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, self._astify(goal)))
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("transition_phase")))
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_BELIEF,
AstLiteral("user_said", [AstVar("Message")]),
[self._astify(phase)],
actions,
)
)
def _process_goal(
self,
goal: Goal,
phase: Phase,
previous_goal: Goal | None = None,
continues_response: bool = False,
main_goal: bool = False,
) -> None:
"""
Processes a goal and creates plans for achieving it.
This method creates two plans for each goal:
1. A main plan that executes the goal's steps when conditions are met
2. A fallback plan that provides a default empty implementation (prevents crashes)
The method also recursively processes any subgoals contained within
the goal's plan.
:param goal: The goal to process.
:param phase: The phase this goal belongs to.
:param previous_goal: The previous goal in sequence (for dependency tracking).
:param continues_response: Whether this goal continues an existing response.
:param main_goal: Whether this is a main goal (for UI notification purposes).
"""
context: list[AstExpression] = [self._astify(phase)]
if goal.can_fail:
context.append(~self._astify(goal, achieved=True))
if previous_goal and previous_goal.can_fail:
context.append(self._astify(previous_goal, achieved=True))
if not continues_response:
context.append(~AstLiteral("responded_this_turn"))
body = []
if main_goal: # UI only needs to know about the main goals
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_goal_start", [AstString(self.slugify(goal))]),
)
)
subgoals = []
for step in goal.plan.steps:
body.append(self._step_to_statement(step))
if isinstance(step, Goal):
subgoals.append(step)
if not goal.can_fail and not continues_response:
body.append(AstStatement(StatementType.ADD_BELIEF, self._astify(goal, achieved=True)))
if len(body) == 0:
self.logger.warning("Goal with no plan detected: %s", goal.name)
body.append(AstStatement(StatementType.EMPTY, AstLiteral("true")))
self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(goal), context, body))
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
self._astify(goal),
context=[],
body=[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
prev_goal = None
for subgoal in subgoals:
self._process_goal(subgoal, phase, prev_goal)
prev_goal = subgoal
def _step_to_statement(self, step: PlanElement) -> AstStatement:
"""
Converts a plan step to an AgentSpeak statement.
This method transforms different types of plan elements into their
corresponding AgentSpeak statements. Goals and speech-related actions
become achieve-goal statements, while gesture actions become do-action
statements.
:param step: The plan element to convert.
:return: The corresponding AgentSpeak statement.
"""
match step:
# Note that SpeechAction gets included in the ACHIEVE_GOAL, since it's a goal internally
case Goal() | SpeechAction() | LLMAction() as a:
return AstStatement(StatementType.ACHIEVE_GOAL, self._astify(a))
case GestureAction() as a:
return AstStatement(StatementType.DO_ACTION, self._astify(a))
def _process_trigger(self, trigger: Trigger, phase: Phase) -> None:
"""
Processes a trigger and creates plans for its execution.
This method creates plans that execute when trigger conditions are met.
It handles both automatic triggering (when conditions are detected) and
manual forcing (from UI). The trigger execution includes:
1. Notifying the system about trigger start
2. Executing all trigger steps
3. Waiting briefly for UI display
4. Notifying the system about trigger end
:param trigger: The trigger to process.
:param phase: The phase this trigger belongs to.
"""
body = []
subgoals = []
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_trigger_start", [AstString(self.slugify(trigger))]),
)
)
for step in trigger.plan.steps:
if isinstance(step, Goal):
new_step = step.model_copy(update={"can_fail": False}) # triggers are sequence
subgoals.append(new_step)
body.append(self._step_to_statement(step))
# Arbitrary wait for UI to display nicely
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("wait", [AstNumber(settings.behaviour_settings.trigger_time_to_wait)]),
)
)
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_trigger_end", [AstString(self.slugify(trigger))]),
)
)
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("check_triggers"),
[self._astify(phase), self._astify(trigger.condition)],
body,
)
)
# Force trigger (from UI)
self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(trigger), [], body))
for subgoal in subgoals:
self._process_goal(subgoal, phase, continues_response=True)
def _add_fallbacks(self):
"""
Adds fallback plans for robust execution, preventing crashes.
This method creates fallback plans that provide default empty implementations
for key goals. These fallbacks ensure that the system can continue execution
even when no specific plans are applicable, preventing crashes.
The fallbacks are created for:
- check_triggers: When no triggers are applicable
- transition_phase: When phase transition conditions aren't met
- force_transition_phase: When forced transitions aren't possible
- stop: When we are already in the end phase
"""
# Trigger fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("check_triggers"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
# Phase transition fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("transition_phase"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
# Force phase transition fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("force_transition_phase"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
# Stop fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("stop"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
@singledispatchmethod
def _astify(self, element: ProgramElement) -> AstExpression:
"""
Converts program elements to AgentSpeak expressions (base method).
This is the base method for the singledispatch mechanism that handles
conversion of different program element types to their AgentSpeak
representations. Specific implementations are provided for each
element type through registered methods.
:param element: The program element to convert.
:return: The corresponding AgentSpeak expression.
:raises NotImplementedError: If no specific implementation exists for the element type.
"""
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")
@_astify.register
def _(self, kwb: KeywordBelief) -> AstExpression:
"""
Converts a KeywordBelief to an AgentSpeak expression.
Keyword beliefs are converted to keyword_said literals that check
if the keyword was mentioned in user input.
:param kwb: The KeywordBelief to convert.
:return: An AstLiteral representing the keyword detection.
"""
return AstLiteral("keyword_said", [AstString(kwb.keyword)])
@_astify.register
def _(self, sb: SemanticBelief) -> AstExpression:
"""
Converts a SemanticBelief to an AgentSpeak expression.
Semantic beliefs are converted to literals using their slugified names,
which are used for LLM-based belief evaluation.
:param sb: The SemanticBelief to convert.
:return: An AstLiteral representing the semantic belief.
"""
return AstLiteral(self.slugify(sb))
@_astify.register
def _(self, eb: EmotionBelief) -> AstExpression:
return AstLiteral("emotion_detected", [AstAtom(eb.emotion)])
@_astify.register
def _(self, ib: InferredBelief) -> AstExpression:
"""
Converts an InferredBelief to an AgentSpeak expression.
Inferred beliefs are converted to binary operations that combine
their left and right operands using the appropriate logical operator.
:param ib: The InferredBelief to convert.
:return: An AstBinaryOp representing the logical combination.
"""
return AstBinaryOp(
self._astify(ib.left),
BinaryOperatorType.AND if ib.operator == LogicalOperator.AND else BinaryOperatorType.OR,
self._astify(ib.right),
)
@_astify.register
def _(self, norm: Norm) -> AstExpression:
"""
Converts a Norm to an AgentSpeak expression.
Norms are converted to literals with either 'norm' or 'critical_norm'
functors depending on their critical flag, with the norm text as an argument.
Note that currently, critical norms are not yet functionally supported. They are possible
to astify for future use.
:param norm: The Norm to convert.
:return: An AstLiteral representing the norm.
"""
functor = "critical_norm" if norm.critical else "norm"
return AstLiteral(functor, [AstString(norm.norm)])
@_astify.register
def _(self, phase: Phase) -> AstExpression:
"""
Converts a Phase to an AgentSpeak expression.
Phases are converted to phase literals with their unique identifier
as an argument, which is used for phase tracking and transitions.
:param phase: The Phase to convert.
:return: An AstLiteral representing the phase.
"""
return AstLiteral("phase", [AstString(str(phase.id))])
@_astify.register
def _(self, goal: Goal, achieved: bool = False) -> AstExpression:
"""
Converts a Goal to an AgentSpeak expression.
Goals are converted to literals using their slugified names. If the
achieved parameter is True, the literal is prefixed with 'achieved_'.
:param goal: The Goal to convert.
:param achieved: Whether to represent this as an achieved goal.
:return: An AstLiteral representing the goal.
"""
return AstLiteral(f"{'achieved_' if achieved else ''}{self._slugify_str(goal.name)}")
@_astify.register
def _(self, trigger: Trigger) -> AstExpression:
"""
Converts a Trigger to an AgentSpeak expression.
Triggers are converted to literals using their slugified names,
which are used to identify and execute trigger plans.
:param trigger: The Trigger to convert.
:return: An AstLiteral representing the trigger.
"""
return AstLiteral(self.slugify(trigger))
@_astify.register
def _(self, sa: SpeechAction) -> AstExpression:
"""
Converts a SpeechAction to an AgentSpeak expression.
Speech actions are converted to say literals with the text content
as an argument, which are used for direct speech output.
:param sa: The SpeechAction to convert.
:return: An AstLiteral representing the speech action.
"""
return AstLiteral("say", [AstString(sa.text)])
@_astify.register
def _(self, ga: GestureAction) -> AstExpression:
"""
Converts a GestureAction to an AgentSpeak expression.
Gesture actions are converted to gesture literals with the gesture
type and name as arguments, which are used for physical robot gestures.
:param ga: The GestureAction to convert.
:return: An AstLiteral representing the gesture action.
"""
gesture = ga.gesture
return AstLiteral("gesture", [AstString(gesture.type), AstString(gesture.name)])
@_astify.register
def _(self, la: LLMAction) -> AstExpression:
"""
Converts an LLMAction to an AgentSpeak expression.
LLM actions are converted to reply_with_goal literals with the
conversational goal as an argument, which are used for LLM-generated
responses guided by specific goals.
:param la: The LLMAction to convert.
:return: An AstLiteral representing the LLM action.
"""
return AstLiteral("reply_with_goal", [AstString(la.goal)])
@singledispatchmethod
@staticmethod
def slugify(element: ProgramElement) -> str:
"""
Converts program elements to slugs (base method).
This is the base method for the singledispatch mechanism that handles
conversion of different program element types to their slug representations.
Specific implementations are provided for each element type through
registered methods.
Slugs are used outside of AgentSpeak, mostly for identifying what to send to the AgentSpeak
program as beliefs.
:param element: The program element to convert to a slug.
:return: The slug string representation.
:raises NotImplementedError: If no specific implementation exists for the element type.
"""
raise NotImplementedError(f"Cannot convert element {element} to a slug.")
@slugify.register
@staticmethod
def _(n: Norm) -> str:
"""
Converts a Norm to a slug.
Norms are converted to slugs with the 'norm_' prefix followed by
the slugified norm text.
:param n: The Norm to convert.
:return: The slug string representation.
"""
return f"norm_{AgentSpeakGenerator._slugify_str(n.norm)}"
@slugify.register
@staticmethod
def _(sb: SemanticBelief) -> str:
"""
Converts a SemanticBelief to a slug.
Semantic beliefs are converted to slugs with the 'semantic_' prefix
followed by the slugified belief name.
:param sb: The SemanticBelief to convert.
:return: The slug string representation.
"""
return f"semantic_{AgentSpeakGenerator._slugify_str(sb.name)}"
@slugify.register
@staticmethod
def _(g: BaseGoal) -> str:
"""
Converts a BaseGoal to a slug.
Goals are converted to slugs using their slugified names directly.
:param g: The BaseGoal to convert.
:return: The slug string representation.
"""
return AgentSpeakGenerator._slugify_str(g.name)
@slugify.register
@staticmethod
def _(t: Trigger) -> str:
"""
Converts a Trigger to a slug.
Triggers are converted to slugs with the 'trigger_' prefix followed by
the slugified trigger name.
:param t: The Trigger to convert.
:return: The slug string representation.
"""
return f"trigger_{AgentSpeakGenerator._slugify_str(t.name)}"
@staticmethod
def _slugify_str(text: str) -> str:
"""
Converts a text string to a slug.
This helper method converts arbitrary text to a URL-friendly slug format
by converting to lowercase, removing special characters, and replacing
spaces with underscores. It also removes common stopwords.
:param text: The text string to convert.
:return: The slugified string.
"""
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])