fix: things in AgentSpeak, add custom actions

ref: N25B-376
This commit is contained in:
Twirre Meulenbelt
2025-12-18 11:50:16 +01:00
parent 28262eb27e
commit f91cec6708
2 changed files with 153 additions and 34 deletions

View File

@@ -1,7 +1,11 @@
import asyncio
import time
from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi import BDICoreAgent
# Import the AST we defined above
from control_backend.agents.bdi.asl_ast import (
ActionLiteral,
@@ -33,8 +37,20 @@ from control_backend.schemas.program import (
)
def do_things():
print(AgentSpeakGenerator().generate(test_program))
async def do_things():
res = input("Wanna generate")
if res == "y":
program = AgentSpeakGenerator().generate(test_program)
filename = f"{int(time.time())}.asl"
with open(filename, "w") as f:
f.write(program)
else:
# filename = "0test.asl"
filename = "1766053943.asl"
bdi_agent = BDICoreAgent("BDICoreAgent", filename)
flag = asyncio.Event()
await bdi_agent.start()
await flag.wait()
class AgentSpeakGenerator:
@@ -59,6 +75,8 @@ class AgentSpeakGenerator:
self._generate_triggers(phase, asl)
self._generate_fallbacks(program, asl)
return str(asl)
# --- Section: Startup & Phase Management ---
@@ -68,14 +86,30 @@ class AgentSpeakGenerator:
return
# Initial belief: phase(start).
asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ["start"])))
asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ['"start"'])))
# Startup plan: +started : phase(start) <- -+phase(first_id).
# Startup plan: +started : phase(start) <- -phase(start); +phase(first_id).
asl.plans.append(
Plan(
trigger=BeliefLiteral("started"),
context=[BeliefLiteral("phase", ["start"])],
body=[ActionLiteral("!transition_phase")],
context=[BeliefLiteral("phase", ['"start"'])],
body=[
ActionLiteral('-phase("start")'),
ActionLiteral(f'+phase("{program.phases[0].id}")'),
],
)
)
# Initial plans:
asl.plans.append(
Plan(
trigger=GoalLiteral("generate_response_with_goal(Goal)"),
context=[BeliefLiteral("user_said", ["Message"])],
body=[
ActionLiteral("+responded_this_turn"),
ActionLiteral(".findall(Norm, norm(Norm), Norms)"),
ActionLiteral(".reply_with_goal(Message, Norms, Goal)"),
],
)
)
@@ -83,25 +117,33 @@ class AgentSpeakGenerator:
"""Generates the main loop listener and the transition logic for this phase."""
# +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase.
goal_actions = [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
goal_actions = [ActionLiteral("-responded_this_turn")]
goal_actions += [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
goal_actions.append(ActionLiteral("!transition_phase"))
asl.plans.append(
Plan(
trigger=BeliefLiteral("user_said", ["Message"]),
context=[BeliefLiteral("phase", [str(phase.id)])],
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=goal_actions,
)
)
# +!transition_phase : phase(ID) <- -+phase(NEXT_ID).
next_id = next_phase.id if next_phase else "end"
# +!transition_phase : phase(ID) <- -phase(ID); +(NEXT_ID).
next_id = str(next_phase.id) if next_phase else "end"
transition_context = [BeliefLiteral("phase", [f'"{phase.id}"'])]
if phase.goals:
transition_context.append(BeliefLiteral(f"achieved_{self._slugify(phase.goals[-1])}"))
asl.plans.append(
Plan(
trigger=GoalLiteral("transition_phase"),
context=[BeliefLiteral("phase", [str(phase.id)])],
body=[ActionLiteral(f"-+phase({next_id})")],
context=transition_context,
body=[
ActionLiteral(f'-phase("{phase.id}")'),
ActionLiteral(f'+phase("{next_id}")'),
],
)
)
@@ -113,7 +155,7 @@ class AgentSpeakGenerator:
head = BeliefLiteral("norm", [norm_slug])
# Base context is the phase
phase_lit = BeliefLiteral("phase", [str(phase.id)])
phase_lit = BeliefLiteral("phase", [f'"{phase.id}"'])
if isinstance(norm, ConditionalNorm):
self._ensure_belief_inference(norm.condition, asl)
@@ -132,7 +174,7 @@ class AgentSpeakGenerator:
though ASL engines often handle redefinition or we can use a set to track processed IDs.
"""
if isinstance(belief, KeywordBelief):
# Rule: keyword_said("word") :- user_said(M) & .substring(M, "word", P) & P >= 0.
# Rule: keyword_said("word") :- user_said(M) & .substring("word", M, P) & P >= 0.
kwd_slug = f'"{belief.keyword}"'
head = BeliefLiteral("keyword_said", [kwd_slug])
@@ -143,7 +185,7 @@ class AgentSpeakGenerator:
body = BinaryOp(
BeliefLiteral("user_said", ["Message"]),
"&",
BinaryOp(f".substring(Message, {kwd_slug}, Pos)", "&", "Pos >= 0"),
BinaryOp(f".substring({kwd_slug}, Message, Pos)", "&", "Pos >= 0"),
)
asl.inference_rules.append(Rule(head=head, body=body))
@@ -185,7 +227,7 @@ class AgentSpeakGenerator:
# phase(ID) & not responded_this_turn & not achieved_goal
context = [
BeliefLiteral("phase", [phase_id]),
BeliefLiteral("phase", [f'"{phase_id}"']),
BeliefLiteral("responded_this_turn", negated=True),
BeliefLiteral(f"achieved_{goal_slug}", negated=True),
]
@@ -214,9 +256,6 @@ class AgentSpeakGenerator:
body_actions.append(ActionLiteral(f"+achieved_{goal_slug}"))
asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions))
asl.plans.append(
Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
)
prev_sub = None
for sub_goal in sub_goals_to_process:
@@ -253,7 +292,7 @@ class AgentSpeakGenerator:
asl.plans.append(
Plan(
trigger=BeliefLiteral(trigger_belief_slug),
context=[BeliefLiteral("phase", [str(phase.id)])],
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=body_actions,
)
)
@@ -264,6 +303,28 @@ class AgentSpeakGenerator:
self._generate_goal_plan_recursive(sub_goal, str(phase.id), prev_sub, asl)
prev_sub = sub_goal
# --- Section: Fallbacks ---
def _generate_fallbacks(self, program: Program, asl: AgentSpeakFile):
for phase in program.phases:
for goal in phase.goals:
self._generate_goal_fallbacks_recursive(goal, asl)
asl.plans.append(
Plan(trigger=GoalLiteral("transition_phase"), context=[], body=[ActionLiteral("true")])
)
def _generate_goal_fallbacks_recursive(self, goal: Goal, asl: AgentSpeakFile):
goal_slug = self._slugify(goal)
asl.plans.append(
Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
)
for step in goal.plan.steps:
if not isinstance(step, Goal):
continue
self._generate_goal_fallbacks_recursive(step, asl)
# --- Helpers ---
@singledispatchmethod
@@ -276,7 +337,7 @@ class AgentSpeakGenerator:
def _(self, goal: Goal) -> str:
if goal.name:
return self._slugify_str(goal.name)
return f"goal_{goal.id}"
return f"goal_{goal.id.hex}"
@_slugify.register
def _(self, kwb: KeywordBelief) -> str:
@@ -295,4 +356,4 @@ class AgentSpeakGenerator:
if __name__ == "__main__":
do_things()
asyncio.run(do_things())

View File

@@ -160,7 +160,7 @@ class BDICoreAgent(BaseAgent):
self._remove_all_with_name(belief.name)
self._add_belief(belief.name, belief.arguments)
def _add_belief(self, name: str, args: Iterable[str] = []):
def _add_belief(self, name: str, args: list[str] = None):
"""
Add a single belief to the BDI agent.
@@ -168,9 +168,12 @@ class BDICoreAgent(BaseAgent):
:param args: Arguments for the belief.
"""
# new_args = (agentspeak.Literal(arg) for arg in args) # TODO: Eventually support multiple
merged_args = DELIMITER.join(arg for arg in args)
new_args = (agentspeak.Literal(merged_args),)
term = agentspeak.Literal(name, new_args)
if args:
merged_args = DELIMITER.join(arg for arg in args)
new_args = (agentspeak.Literal(merged_args),)
term = agentspeak.Literal(name, new_args)
else:
term = agentspeak.Literal(name)
self.bdi_agent.call(
agentspeak.Trigger.addition,
@@ -238,8 +241,7 @@ class BDICoreAgent(BaseAgent):
@self.actions.add(".reply", 3)
def _reply(agent: "BDICoreAgent", term, intention):
"""
Sends text to the LLM (AgentSpeak action).
Example: .reply("Hello LLM!", "Some norm", "Some goal")
Let the LLM generate a response to a user's utterance with the current norms and goals.
"""
message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope)
@@ -252,15 +254,71 @@ class BDICoreAgent(BaseAgent):
asyncio.create_task(self._send_to_llm(str(message_text), str(norms), str(goals)))
yield
async def _send_to_llm(self, text: str, norms: str = None, goals: str = None):
@self.actions.add(".reply_with_goal", 3)
def _reply_with_goal(agent: "BDICoreAgent", term, intention):
"""
Let the LLM generate a response to a user's utterance with the current norms and a
specific goal.
"""
message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope)
goal = agentspeak.grounded(term.args[2], intention.scope)
self.logger.debug(
'"reply_with_goal" action called with message=%s, norms=%s, goal=%s',
message_text,
norms,
goal,
)
# asyncio.create_task(self._send_to_llm(str(message_text), norms, str(goal)))
yield
@self.actions.add(".say", 1)
def _say(agent: "BDICoreAgent", term, intention):
"""
Make the robot say the given text instantly.
"""
message_text = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug('"say" action called with text=%s', message_text)
# speech_command = SpeechCommand(data=message_text)
# speech_message = InternalMessage(
# to=settings.agent_settings.robot_speech_name,
# sender=settings.agent_settings.bdi_core_name,
# body=speech_command.model_dump_json(),
# )
# asyncio.create_task(agent.send(speech_message))
yield
@self.actions.add(".gesture", 2)
def _gesture(agent: "BDICoreAgent", term, intention):
"""
Make the robot perform the given gesture instantly.
"""
gesture_type = agentspeak.grounded(term.args[0], intention.scope)
gesture_name = agentspeak.grounded(term.args[1], intention.scope)
self.logger.debug(
'"gesture" action called with type=%s, name=%s',
gesture_type,
gesture_name,
)
# gesture = Gesture(type=gesture_type, name=gesture_name)
# gesture_message = InternalMessage(
# to=settings.agent_settings.robot_gesture_name,
# sender=settings.agent_settings.bdi_core_name,
# body=gesture.model_dump_json(),
# )
# asyncio.create_task(agent.send(gesture_message))
yield
async def _send_to_llm(self, text: str, norms: str, goals: str):
"""
Sends a text query to the LLM agent asynchronously.
"""
prompt = LLMPromptMessage(
text=text,
norms=norms.split("\n") if norms else [],
goals=goals.split("\n") if norms else [],
)
prompt = LLMPromptMessage(text=text, norms=norms.split("\n"), goals=goals.split("\n"))
msg = InternalMessage(
to=settings.agent_settings.llm_name,
sender=self.name,