529 lines
17 KiB
Python
529 lines
17 KiB
Python
from functools import singledispatchmethod
|
|
|
|
from slugify import slugify
|
|
|
|
from control_backend.agents.bdi.agentspeak_ast import (
|
|
AstAtom,
|
|
AstBinaryOp,
|
|
AstExpression,
|
|
AstLiteral,
|
|
AstNumber,
|
|
AstPlan,
|
|
AstProgram,
|
|
AstRule,
|
|
AstStatement,
|
|
AstString,
|
|
AstVar,
|
|
BinaryOperatorType,
|
|
StatementType,
|
|
TriggerType,
|
|
)
|
|
from control_backend.schemas.program import (
|
|
BaseGoal,
|
|
BasicNorm,
|
|
ConditionalNorm,
|
|
GestureAction,
|
|
Goal,
|
|
InferredBelief,
|
|
KeywordBelief,
|
|
LLMAction,
|
|
LogicalOperator,
|
|
Norm,
|
|
Phase,
|
|
PlanElement,
|
|
Program,
|
|
ProgramElement,
|
|
SemanticBelief,
|
|
SpeechAction,
|
|
Trigger,
|
|
)
|
|
|
|
|
|
class AgentSpeakGenerator:
|
|
"""
|
|
Generator class that translates a high-level :class:`~control_backend.schemas.program.Program`
|
|
into AgentSpeak(L) source code.
|
|
|
|
It handles the conversion of phases, norms, goals, and triggers into AgentSpeak rules and plans,
|
|
ensuring the robot follows the defined behavioral logic.
|
|
"""
|
|
|
|
_asp: AstProgram
|
|
|
|
def generate(self, program: Program) -> str:
|
|
"""
|
|
Translates a Program object into an AgentSpeak source string.
|
|
|
|
:param program: The behavior program to translate.
|
|
:return: The generated AgentSpeak code as a string.
|
|
"""
|
|
self._asp = AstProgram()
|
|
|
|
if program.phases:
|
|
self._asp.rules.append(AstRule(self._astify(program.phases[0])))
|
|
else:
|
|
self._asp.rules.append(AstRule(AstLiteral("phase", [AstString("end")])))
|
|
|
|
self._asp.rules.append(AstRule(AstLiteral("!notify_cycle")))
|
|
|
|
self._add_keyword_inference()
|
|
self._add_default_plans()
|
|
|
|
self._process_phases(program.phases)
|
|
|
|
self._add_fallbacks()
|
|
|
|
return str(self._asp)
|
|
|
|
def _add_keyword_inference(self) -> None:
|
|
keyword = AstVar("Keyword")
|
|
message = AstVar("Message")
|
|
position = AstVar("Pos")
|
|
|
|
self._asp.rules.append(
|
|
AstRule(
|
|
AstLiteral("keyword_said", [keyword]),
|
|
AstLiteral("user_said", [message])
|
|
& AstLiteral(".substring", [keyword, message, position])
|
|
& (position >= 0),
|
|
)
|
|
)
|
|
|
|
def _add_default_plans(self):
|
|
self._add_reply_with_goal_plan()
|
|
self._add_say_plan()
|
|
self._add_reply_plan()
|
|
self._add_notify_cycle_plan()
|
|
|
|
def _add_reply_with_goal_plan(self):
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_GOAL,
|
|
AstLiteral("reply_with_goal", [AstVar("Goal")]),
|
|
[AstLiteral("user_said", [AstVar("Message")])],
|
|
[
|
|
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
|
|
AstStatement(
|
|
StatementType.DO_ACTION,
|
|
AstLiteral(
|
|
"findall",
|
|
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
|
|
),
|
|
),
|
|
AstStatement(
|
|
StatementType.DO_ACTION,
|
|
AstLiteral(
|
|
"reply_with_goal", [AstVar("Message"), AstVar("Norms"), AstVar("Goal")]
|
|
),
|
|
),
|
|
],
|
|
)
|
|
)
|
|
|
|
def _add_say_plan(self):
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_GOAL,
|
|
AstLiteral("say", [AstVar("Text")]),
|
|
[],
|
|
[
|
|
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
|
|
AstStatement(StatementType.DO_ACTION, AstLiteral("say", [AstVar("Text")])),
|
|
],
|
|
)
|
|
)
|
|
|
|
def _add_reply_plan(self):
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_GOAL,
|
|
AstLiteral("reply"),
|
|
[AstLiteral("user_said", [AstVar("Message")])],
|
|
[
|
|
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
|
|
AstStatement(
|
|
StatementType.DO_ACTION,
|
|
AstLiteral(
|
|
"findall",
|
|
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
|
|
),
|
|
),
|
|
AstStatement(
|
|
StatementType.DO_ACTION,
|
|
AstLiteral("reply", [AstVar("Message"), AstVar("Norms")]),
|
|
),
|
|
],
|
|
)
|
|
)
|
|
|
|
def _add_notify_cycle_plan(self):
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_GOAL,
|
|
AstLiteral("notify_cycle"),
|
|
[],
|
|
[
|
|
AstStatement(
|
|
StatementType.DO_ACTION,
|
|
AstLiteral(
|
|
"findall",
|
|
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
|
|
),
|
|
),
|
|
AstStatement(
|
|
StatementType.DO_ACTION, AstLiteral("notify_norms", [AstVar("Norms")])
|
|
),
|
|
AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(100)])),
|
|
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("notify_cycle")),
|
|
],
|
|
)
|
|
)
|
|
|
|
def _process_phases(self, phases: list[Phase]) -> None:
|
|
for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True):
|
|
if curr_phase:
|
|
self._process_phase(curr_phase)
|
|
self._add_phase_transition(curr_phase, next_phase)
|
|
|
|
# End phase behavior
|
|
# When deleting this, the entire `reply` plan and action can be deleted
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
type=TriggerType.ADDED_BELIEF,
|
|
trigger_literal=AstLiteral("user_said", [AstVar("Message")]),
|
|
context=[AstLiteral("phase", [AstString("end")])],
|
|
body=[
|
|
AstStatement(
|
|
StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")])
|
|
),
|
|
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply")),
|
|
],
|
|
)
|
|
)
|
|
|
|
def _process_phase(self, phase: Phase) -> None:
|
|
for norm in phase.norms:
|
|
self._process_norm(norm, phase)
|
|
|
|
self._add_default_loop(phase)
|
|
|
|
previous_goal = None
|
|
for goal in phase.goals:
|
|
self._process_goal(goal, phase, previous_goal, main_goal=True)
|
|
previous_goal = goal
|
|
|
|
for trigger in phase.triggers:
|
|
self._process_trigger(trigger, phase)
|
|
|
|
def _add_phase_transition(self, from_phase: Phase | None, to_phase: Phase | None) -> None:
|
|
if from_phase is None:
|
|
return
|
|
from_phase_ast = self._astify(from_phase)
|
|
to_phase_ast = (
|
|
self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")])
|
|
)
|
|
|
|
check_context = [from_phase_ast]
|
|
if from_phase:
|
|
for goal in from_phase.goals:
|
|
check_context.append(self._astify(goal, achieved=True))
|
|
|
|
force_context = [from_phase_ast]
|
|
|
|
body = [
|
|
AstStatement(
|
|
StatementType.DO_ACTION,
|
|
AstLiteral(
|
|
"notify_transition_phase",
|
|
[
|
|
AstString(str(from_phase.id)),
|
|
AstString(str(to_phase.id) if to_phase else "end"),
|
|
],
|
|
),
|
|
),
|
|
AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast),
|
|
AstStatement(StatementType.ADD_BELIEF, to_phase_ast),
|
|
]
|
|
|
|
# if from_phase:
|
|
# body.extend(
|
|
# [
|
|
# AstStatement(
|
|
# StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")])
|
|
# ),
|
|
# AstStatement(
|
|
# StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")])
|
|
# ),
|
|
# ]
|
|
# )
|
|
|
|
# Check
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_GOAL,
|
|
AstLiteral("transition_phase"),
|
|
check_context,
|
|
[
|
|
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("force_transition_phase")),
|
|
],
|
|
)
|
|
)
|
|
|
|
# Force
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_GOAL, AstLiteral("force_transition_phase"), force_context, body
|
|
)
|
|
)
|
|
|
|
def _process_norm(self, norm: Norm, phase: Phase) -> None:
|
|
rule: AstRule | None = None
|
|
|
|
match norm:
|
|
case ConditionalNorm(condition=cond):
|
|
rule = AstRule(
|
|
self._astify(norm),
|
|
self._astify(phase) & self._astify(cond)
|
|
| AstAtom(f"force_{self.slugify(norm)}"),
|
|
)
|
|
case BasicNorm():
|
|
rule = AstRule(self._astify(norm), self._astify(phase))
|
|
|
|
if not rule:
|
|
return
|
|
|
|
self._asp.rules.append(rule)
|
|
|
|
def _add_default_loop(self, phase: Phase) -> None:
|
|
actions = []
|
|
|
|
actions.append(
|
|
AstStatement(
|
|
StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")])
|
|
)
|
|
)
|
|
actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn")))
|
|
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers")))
|
|
|
|
for goal in phase.goals:
|
|
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, self._astify(goal)))
|
|
|
|
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("transition_phase")))
|
|
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_BELIEF,
|
|
AstLiteral("user_said", [AstVar("Message")]),
|
|
[self._astify(phase)],
|
|
actions,
|
|
)
|
|
)
|
|
|
|
def _process_goal(
|
|
self,
|
|
goal: Goal,
|
|
phase: Phase,
|
|
previous_goal: Goal | None = None,
|
|
continues_response: bool = False,
|
|
main_goal: bool = False,
|
|
) -> None:
|
|
context: list[AstExpression] = [self._astify(phase)]
|
|
context.append(~self._astify(goal, achieved=True))
|
|
if previous_goal and previous_goal.can_fail:
|
|
context.append(self._astify(previous_goal, achieved=True))
|
|
if not continues_response:
|
|
context.append(~AstLiteral("responded_this_turn"))
|
|
|
|
body = []
|
|
if main_goal: # UI only needs to know about the main goals
|
|
body.append(
|
|
AstStatement(
|
|
StatementType.DO_ACTION,
|
|
AstLiteral("notify_goal_start", [AstString(self.slugify(goal))]),
|
|
)
|
|
)
|
|
|
|
subgoals = []
|
|
for step in goal.plan.steps:
|
|
body.append(self._step_to_statement(step))
|
|
if isinstance(step, Goal):
|
|
subgoals.append(step)
|
|
|
|
if not goal.can_fail and not continues_response:
|
|
body.append(AstStatement(StatementType.ADD_BELIEF, self._astify(goal, achieved=True)))
|
|
|
|
self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(goal), context, body))
|
|
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_GOAL,
|
|
self._astify(goal),
|
|
context=[],
|
|
body=[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
|
|
)
|
|
)
|
|
|
|
prev_goal = None
|
|
for subgoal in subgoals:
|
|
self._process_goal(subgoal, phase, prev_goal)
|
|
prev_goal = subgoal
|
|
|
|
def _step_to_statement(self, step: PlanElement) -> AstStatement:
|
|
match step:
|
|
case Goal() | SpeechAction() | LLMAction() as a:
|
|
return AstStatement(StatementType.ACHIEVE_GOAL, self._astify(a))
|
|
case GestureAction() as a:
|
|
return AstStatement(StatementType.DO_ACTION, self._astify(a))
|
|
|
|
# TODO: separate handling of keyword and others
|
|
def _process_trigger(self, trigger: Trigger, phase: Phase) -> None:
|
|
body = []
|
|
subgoals = []
|
|
|
|
body.append(
|
|
AstStatement(
|
|
StatementType.DO_ACTION,
|
|
AstLiteral("notify_trigger_start", [AstString(self.slugify(trigger))]),
|
|
)
|
|
)
|
|
for step in trigger.plan.steps:
|
|
body.append(self._step_to_statement(step))
|
|
if isinstance(step, Goal):
|
|
step.can_fail = False # triggers are continuous sequence
|
|
subgoals.append(step)
|
|
|
|
# Arbitrary wait for UI to display nicely
|
|
body.append(AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(2000)])))
|
|
|
|
body.append(
|
|
AstStatement(
|
|
StatementType.DO_ACTION,
|
|
AstLiteral("notify_trigger_end", [AstString(self.slugify(trigger))]),
|
|
)
|
|
)
|
|
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_GOAL,
|
|
AstLiteral("check_triggers"),
|
|
[self._astify(phase), self._astify(trigger.condition)],
|
|
body,
|
|
)
|
|
)
|
|
|
|
# Force trigger (from UI)
|
|
self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(trigger), [], body))
|
|
|
|
for subgoal in subgoals:
|
|
self._process_goal(subgoal, phase, continues_response=True)
|
|
|
|
def _add_fallbacks(self):
|
|
# Trigger fallback
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_GOAL,
|
|
AstLiteral("check_triggers"),
|
|
[],
|
|
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
|
|
)
|
|
)
|
|
|
|
# Phase transition fallback
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_GOAL,
|
|
AstLiteral("transition_phase"),
|
|
[],
|
|
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
|
|
)
|
|
)
|
|
|
|
# Force phase transition fallback
|
|
self._asp.plans.append(
|
|
AstPlan(
|
|
TriggerType.ADDED_GOAL,
|
|
AstLiteral("force_transition_phase"),
|
|
[],
|
|
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
|
|
)
|
|
)
|
|
|
|
@singledispatchmethod
|
|
def _astify(self, element: ProgramElement) -> AstExpression:
|
|
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")
|
|
|
|
@_astify.register
|
|
def _(self, kwb: KeywordBelief) -> AstExpression:
|
|
return AstLiteral("keyword_said", [AstString(kwb.keyword)])
|
|
|
|
@_astify.register
|
|
def _(self, sb: SemanticBelief) -> AstExpression:
|
|
return AstLiteral(self.slugify(sb))
|
|
|
|
@_astify.register
|
|
def _(self, ib: InferredBelief) -> AstExpression:
|
|
return AstBinaryOp(
|
|
self._astify(ib.left),
|
|
BinaryOperatorType.AND if ib.operator == LogicalOperator.AND else BinaryOperatorType.OR,
|
|
self._astify(ib.right),
|
|
)
|
|
|
|
@_astify.register
|
|
def _(self, norm: Norm) -> AstExpression:
|
|
functor = "critical_norm" if norm.critical else "norm"
|
|
return AstLiteral(functor, [AstString(norm.norm)])
|
|
|
|
@_astify.register
|
|
def _(self, phase: Phase) -> AstExpression:
|
|
return AstLiteral("phase", [AstString(str(phase.id))])
|
|
|
|
@_astify.register
|
|
def _(self, goal: Goal, achieved: bool = False) -> AstExpression:
|
|
return AstLiteral(f"{'achieved_' if achieved else ''}{self._slugify_str(goal.name)}")
|
|
|
|
@_astify.register
|
|
def _(self, trigger: Trigger) -> AstExpression:
|
|
return AstLiteral(self.slugify(trigger))
|
|
|
|
@_astify.register
|
|
def _(self, sa: SpeechAction) -> AstExpression:
|
|
return AstLiteral("say", [AstString(sa.text)])
|
|
|
|
@_astify.register
|
|
def _(self, ga: GestureAction) -> AstExpression:
|
|
gesture = ga.gesture
|
|
return AstLiteral("gesture", [AstString(gesture.type), AstString(gesture.name)])
|
|
|
|
@_astify.register
|
|
def _(self, la: LLMAction) -> AstExpression:
|
|
return AstLiteral("reply_with_goal", [AstString(la.goal)])
|
|
|
|
@singledispatchmethod
|
|
@staticmethod
|
|
def slugify(element: ProgramElement) -> str:
|
|
raise NotImplementedError(f"Cannot convert element {element} to a slug.")
|
|
|
|
@slugify.register
|
|
@staticmethod
|
|
def _(n: Norm) -> str:
|
|
return f"norm_{AgentSpeakGenerator._slugify_str(n.norm)}"
|
|
|
|
@slugify.register
|
|
@staticmethod
|
|
def _(sb: SemanticBelief) -> str:
|
|
return f"semantic_{AgentSpeakGenerator._slugify_str(sb.name)}"
|
|
|
|
@slugify.register
|
|
@staticmethod
|
|
def _(g: BaseGoal) -> str:
|
|
return AgentSpeakGenerator._slugify_str(g.name)
|
|
|
|
@slugify.register
|
|
@staticmethod
|
|
def _(t: Trigger):
|
|
return f"trigger_{AgentSpeakGenerator._slugify_str(t.name)}"
|
|
|
|
@staticmethod
|
|
def _slugify_str(text: str) -> str:
|
|
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])
|