The Big One #43
@@ -187,9 +187,10 @@ class StatementType(StrEnum):
|
|||||||
EMPTY = ""
|
EMPTY = ""
|
||||||
DO_ACTION = "."
|
DO_ACTION = "."
|
||||||
ACHIEVE_GOAL = "!"
|
ACHIEVE_GOAL = "!"
|
||||||
# TEST_GOAL = "?" # TODO
|
TEST_GOAL = "?"
|
||||||
ADD_BELIEF = "+"
|
ADD_BELIEF = "+"
|
||||||
REMOVE_BELIEF = "-"
|
REMOVE_BELIEF = "-"
|
||||||
|
REPLACE_BELIEF = "-+"
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|||||||
354
src/control_backend/agents/bdi/genv2.py
Normal file
354
src/control_backend/agents/bdi/genv2.py
Normal file
@@ -0,0 +1,354 @@
|
|||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
from functools import singledispatchmethod
|
||||||
|
|
||||||
|
from slugify import slugify
|
||||||
|
|
||||||
|
from control_backend.agents.bdi import BDICoreAgent
|
||||||
|
from control_backend.agents.bdi.astv2 import (
|
||||||
|
AstBinaryOp,
|
||||||
|
AstExpression,
|
||||||
|
AstLiteral,
|
||||||
|
AstPlan,
|
||||||
|
AstProgram,
|
||||||
|
AstRule,
|
||||||
|
AstStatement,
|
||||||
|
AstString,
|
||||||
|
AstVar,
|
||||||
|
BinaryOperatorType,
|
||||||
|
StatementType,
|
||||||
|
TriggerType,
|
||||||
|
)
|
||||||
|
from control_backend.agents.bdi.bdi_program_manager import test_program
|
||||||
|
from control_backend.schemas.program import (
|
||||||
|
BasicNorm,
|
||||||
|
ConditionalNorm,
|
||||||
|
GestureAction,
|
||||||
|
Goal,
|
||||||
|
InferredBelief,
|
||||||
|
KeywordBelief,
|
||||||
|
LLMAction,
|
||||||
|
LogicalOperator,
|
||||||
|
Norm,
|
||||||
|
Phase,
|
||||||
|
PlanElement,
|
||||||
|
Program,
|
||||||
|
ProgramElement,
|
||||||
|
SemanticBelief,
|
||||||
|
SpeechAction,
|
||||||
|
Trigger,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def do_things():
|
||||||
|
program = AgentSpeakGenerator().generate(test_program)
|
||||||
|
print(program)
|
||||||
|
|
||||||
|
|
||||||
|
async def do_other_things():
|
||||||
|
res = input("Wanna generate")
|
||||||
|
if res == "y":
|
||||||
|
program = AgentSpeakGenerator().generate(test_program)
|
||||||
|
filename = f"{int(time.time())}.asl"
|
||||||
|
with open(filename, "w") as f:
|
||||||
|
f.write(program)
|
||||||
|
else:
|
||||||
|
filename = "temp.asl"
|
||||||
|
bdi_agent = BDICoreAgent("BDICoreAgent", filename)
|
||||||
|
flag = asyncio.Event()
|
||||||
|
await bdi_agent.start()
|
||||||
|
await flag.wait()
|
||||||
|
|
||||||
|
|
||||||
|
class AgentSpeakGenerator:
|
||||||
|
_asp: AstProgram
|
||||||
|
|
||||||
|
def generate(self, program: Program) -> str:
|
||||||
|
self._asp = AstProgram()
|
||||||
|
|
||||||
|
self._asp.rules.append(AstRule(AstLiteral("phase", [AstString("start")])))
|
||||||
|
self._add_keyword_inference()
|
||||||
|
self._add_response_goal()
|
||||||
|
|
||||||
|
self._process_phases(program.phases)
|
||||||
|
|
||||||
|
self._add_fallbacks()
|
||||||
|
|
||||||
|
return str(self._asp)
|
||||||
|
|
||||||
|
def _add_keyword_inference(self) -> None:
|
||||||
|
keyword = AstVar("Keyword")
|
||||||
|
message = AstVar("Message")
|
||||||
|
position = AstVar("Pos")
|
||||||
|
|
||||||
|
self._asp.rules.append(
|
||||||
|
AstRule(
|
||||||
|
AstLiteral("keyword_said", [keyword]),
|
||||||
|
AstLiteral("user_said", [message])
|
||||||
|
& AstLiteral(".substring", [keyword, message, position])
|
||||||
|
& (position >= 0),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _add_response_goal(self):
|
||||||
|
self._asp.plans.append(
|
||||||
|
AstPlan(
|
||||||
|
TriggerType.ADDED_GOAL,
|
||||||
|
AstLiteral("generate_response_with_goal", [AstVar("Goal")]),
|
||||||
|
[AstLiteral("user_said", [AstVar("Message")])],
|
||||||
|
[
|
||||||
|
AstStatement(StatementType.ADD_BELIEF, AstLiteral("responded_this_turn")),
|
||||||
|
AstStatement(
|
||||||
|
StatementType.DO_ACTION,
|
||||||
|
AstLiteral(
|
||||||
|
"findall",
|
||||||
|
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
|
||||||
|
),
|
||||||
|
),
|
||||||
|
AstStatement(
|
||||||
|
StatementType.DO_ACTION,
|
||||||
|
AstLiteral(
|
||||||
|
"reply_with_goal", [AstVar("Message"), AstVar("Norms"), AstVar("Goal")]
|
||||||
|
),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _process_phases(self, phases: list[Phase]) -> None:
|
||||||
|
for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True):
|
||||||
|
if curr_phase:
|
||||||
|
self._process_phase(curr_phase)
|
||||||
|
self._add_phase_transition(curr_phase, next_phase)
|
||||||
|
|
||||||
|
def _process_phase(self, phase: Phase) -> None:
|
||||||
|
for norm in phase.norms:
|
||||||
|
self._process_norm(norm, phase)
|
||||||
|
|
||||||
|
self._add_default_loop(phase)
|
||||||
|
|
||||||
|
previous_goal = None
|
||||||
|
for goal in phase.goals:
|
||||||
|
self._process_goal(goal, phase, previous_goal)
|
||||||
|
previous_goal = goal
|
||||||
|
|
||||||
|
for trigger in phase.triggers:
|
||||||
|
self._process_trigger(trigger, phase)
|
||||||
|
|
||||||
|
def _add_phase_transition(self, from_phase: Phase | None, to_phase: Phase | None) -> None:
|
||||||
|
from_phase_ast = (
|
||||||
|
self._astify(from_phase) if from_phase else AstLiteral("phase", [AstString("start")])
|
||||||
|
)
|
||||||
|
to_phase_ast = (
|
||||||
|
self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")])
|
||||||
|
)
|
||||||
|
|
||||||
|
context = [from_phase_ast]
|
||||||
|
if from_phase and from_phase.goals:
|
||||||
|
context.append(self._astify(from_phase.goals[-1], achieved=True))
|
||||||
|
|
||||||
|
body = [
|
||||||
|
AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast),
|
||||||
|
AstStatement(StatementType.ADD_BELIEF, to_phase_ast),
|
||||||
|
]
|
||||||
|
|
||||||
|
if from_phase:
|
||||||
|
body.extend(
|
||||||
|
[
|
||||||
|
AstStatement(
|
||||||
|
StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")])
|
||||||
|
),
|
||||||
|
AstStatement(
|
||||||
|
StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")])
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
self._asp.plans.append(
|
||||||
|
AstPlan(TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), context, body)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _process_norm(self, norm: Norm, phase: Phase) -> None:
|
||||||
|
rule: AstRule | None = None
|
||||||
|
|
||||||
|
match norm:
|
||||||
|
case ConditionalNorm(condition=cond):
|
||||||
|
rule = AstRule(self._astify(norm), self._astify(phase) & self._astify(cond))
|
||||||
|
case BasicNorm():
|
||||||
|
rule = AstRule(self._astify(norm), self._astify(phase))
|
||||||
|
|
||||||
|
if not rule:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._asp.rules.append(rule)
|
||||||
|
|
||||||
|
def _add_default_loop(self, phase: Phase) -> None:
|
||||||
|
actions = []
|
||||||
|
|
||||||
|
actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn")))
|
||||||
|
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers")))
|
||||||
|
|
||||||
|
for goal in phase.goals:
|
||||||
|
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, self._astify(goal)))
|
||||||
|
|
||||||
|
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("transition_phase")))
|
||||||
|
|
||||||
|
self._asp.plans.append(
|
||||||
|
AstPlan(
|
||||||
|
TriggerType.ADDED_BELIEF,
|
||||||
|
AstLiteral("user_said", [AstVar("Message")]),
|
||||||
|
[self._astify(phase)],
|
||||||
|
actions,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _process_goal(
|
||||||
|
self,
|
||||||
|
goal: Goal,
|
||||||
|
phase: Phase,
|
||||||
|
previous_goal: Goal | None = None,
|
||||||
|
continues_response: bool = False,
|
||||||
|
) -> None:
|
||||||
|
context: list[AstExpression] = [self._astify(phase)]
|
||||||
|
context.append(~self._astify(goal, achieved=True))
|
||||||
|
if previous_goal and previous_goal.can_fail:
|
||||||
|
context.append(self._astify(previous_goal, achieved=True))
|
||||||
|
if not continues_response:
|
||||||
|
context.append(~AstLiteral("responded_this_turn"))
|
||||||
|
|
||||||
|
body = []
|
||||||
|
|
||||||
|
subgoals = []
|
||||||
|
for step in goal.plan.steps:
|
||||||
|
body.append(self._step_to_statement(step))
|
||||||
|
if isinstance(step, Goal):
|
||||||
|
subgoals.append(step)
|
||||||
|
|
||||||
|
if not goal.can_fail and not continues_response:
|
||||||
|
body.append(AstStatement(StatementType.ADD_BELIEF, self._astify(goal, achieved=True)))
|
||||||
|
|
||||||
|
self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(goal), context, body))
|
||||||
|
|
||||||
|
self._asp.plans.append(
|
||||||
|
AstPlan(
|
||||||
|
TriggerType.ADDED_GOAL,
|
||||||
|
self._astify(goal),
|
||||||
|
context=[],
|
||||||
|
body=[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
prev_goal = None
|
||||||
|
for subgoal in subgoals:
|
||||||
|
self._process_goal(subgoal, phase, prev_goal)
|
||||||
|
prev_goal = subgoal
|
||||||
|
|
||||||
|
def _step_to_statement(self, step: PlanElement) -> AstStatement:
|
||||||
|
match step:
|
||||||
|
case Goal() as g:
|
||||||
|
return AstStatement(StatementType.ACHIEVE_GOAL, self._astify(g))
|
||||||
|
case SpeechAction() | GestureAction() as a:
|
||||||
|
return AstStatement(StatementType.DO_ACTION, self._astify(a))
|
||||||
|
case LLMAction() as la:
|
||||||
|
return AstStatement(
|
||||||
|
StatementType.ACHIEVE_GOAL, self._astify(la)
|
||||||
|
) # LLM action is a goal in ASL
|
||||||
|
|
||||||
|
# TODO: separate handling of keyword and others
|
||||||
|
def _process_trigger(self, trigger: Trigger, phase: Phase) -> None:
|
||||||
|
body = []
|
||||||
|
subgoals = []
|
||||||
|
|
||||||
|
for step in trigger.plan.steps:
|
||||||
|
body.append(self._step_to_statement(step))
|
||||||
|
if isinstance(step, Goal):
|
||||||
|
step.can_fail = False # triggers are continuous sequence
|
||||||
|
subgoals.append(step)
|
||||||
|
|
||||||
|
self._asp.plans.append(
|
||||||
|
AstPlan(
|
||||||
|
TriggerType.ADDED_GOAL,
|
||||||
|
AstLiteral("check_triggers"),
|
||||||
|
[self._astify(phase), self._astify(trigger.condition)],
|
||||||
|
body,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
for subgoal in subgoals:
|
||||||
|
self._process_goal(subgoal, phase, continues_response=True)
|
||||||
|
|
||||||
|
def _add_fallbacks(self):
|
||||||
|
# Trigger fallback
|
||||||
|
self._asp.plans.append(
|
||||||
|
AstPlan(
|
||||||
|
TriggerType.ADDED_GOAL,
|
||||||
|
AstLiteral("check_triggers"),
|
||||||
|
[],
|
||||||
|
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Phase transition fallback
|
||||||
|
self._asp.plans.append(
|
||||||
|
AstPlan(
|
||||||
|
TriggerType.ADDED_GOAL,
|
||||||
|
AstLiteral("transition_phase"),
|
||||||
|
[],
|
||||||
|
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
@singledispatchmethod
|
||||||
|
def _astify(self, element: ProgramElement) -> AstExpression:
|
||||||
|
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")
|
||||||
|
|
||||||
|
@_astify.register
|
||||||
|
def _(self, kwb: KeywordBelief) -> AstExpression:
|
||||||
|
return AstLiteral("keyword_said", [AstString(kwb.keyword)])
|
||||||
|
|
||||||
|
@_astify.register
|
||||||
|
def _(self, sb: SemanticBelief) -> AstExpression:
|
||||||
|
return AstLiteral(f"semantic_{self._slugify_str(sb.description)}")
|
||||||
|
|
||||||
|
@_astify.register
|
||||||
|
def _(self, ib: InferredBelief) -> AstExpression:
|
||||||
|
return AstBinaryOp(
|
||||||
|
self._astify(ib.left),
|
||||||
|
BinaryOperatorType.AND if ib.operator == LogicalOperator.AND else BinaryOperatorType.OR,
|
||||||
|
self._astify(ib.right),
|
||||||
|
)
|
||||||
|
|
||||||
|
@_astify.register
|
||||||
|
def _(self, norm: Norm) -> AstExpression:
|
||||||
|
functor = "critical_norm" if norm.critical else "norm"
|
||||||
|
return AstLiteral(functor, [AstString(norm.norm)])
|
||||||
|
|
||||||
|
@_astify.register
|
||||||
|
def _(self, phase: Phase) -> AstExpression:
|
||||||
|
return AstLiteral("phase", [AstString(str(phase.id))])
|
||||||
|
|
||||||
|
@_astify.register
|
||||||
|
def _(self, goal: Goal, achieved: bool = False) -> AstExpression:
|
||||||
|
return AstLiteral(f"{'achieved_' if achieved else ''}{self._slugify_str(goal.name)}")
|
||||||
|
|
||||||
|
@_astify.register
|
||||||
|
def _(self, sa: SpeechAction) -> AstExpression:
|
||||||
|
return AstLiteral("say", [AstString(sa.text)])
|
||||||
|
|
||||||
|
@_astify.register
|
||||||
|
def _(self, ga: GestureAction) -> AstExpression:
|
||||||
|
gesture = ga.gesture
|
||||||
|
return AstLiteral("gesture", [AstString(gesture.type), AstString(gesture.name)])
|
||||||
|
|
||||||
|
@_astify.register
|
||||||
|
def _(self, la: LLMAction) -> AstExpression:
|
||||||
|
return AstLiteral("generate_response_with_goal", [AstString(la.goal)])
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _slugify_str(text: str) -> str:
|
||||||
|
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# do_things()
|
||||||
|
asyncio.run(do_other_things())
|
||||||
Reference in New Issue
Block a user