diff --git a/src/control_backend/agents/bdi/asl_gen.py b/src/control_backend/agents/bdi/asl_gen.py
index 7d0fa77..845b4e3 100644
--- a/src/control_backend/agents/bdi/asl_gen.py
+++ b/src/control_backend/agents/bdi/asl_gen.py
@@ -1,7 +1,11 @@
+import asyncio
+import time
 from functools import singledispatchmethod

 from slugify import slugify

+from control_backend.agents.bdi import BDICoreAgent
+
 # Import the AST we defined above
 from control_backend.agents.bdi.asl_ast import (
     ActionLiteral,
@@ -33,8 +37,20 @@ from control_backend.schemas.program import (
 )


-def do_things():
-    print(AgentSpeakGenerator().generate(test_program))
+async def do_things():
+    res = input("Generate a new ASL file? (y/n) ")
+    if res == "y":
+        program = AgentSpeakGenerator().generate(test_program)
+        filename = f"{int(time.time())}.asl"
+        with open(filename, "w") as f:
+            f.write(program)
+    else:
+        # filename = "0test.asl"
+        filename = "1766053943.asl"
+    bdi_agent = BDICoreAgent("BDICoreAgent", filename)
+    flag = asyncio.Event()
+    await bdi_agent.start()
+    await flag.wait()


 class AgentSpeakGenerator:
@@ -59,6 +75,8 @@ class AgentSpeakGenerator:

             self._generate_triggers(phase, asl)

+        self._generate_fallbacks(program, asl)
+
         return str(asl)

     # --- Section: Startup & Phase Management ---
@@ -68,14 +86,30 @@ class AgentSpeakGenerator:
             return

         # Initial belief: phase(start).
-        asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ["start"])))
+        asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ['"start"'])))

-        # Startup plan: +started : phase(start) <- -+phase(first_id).
+        # Startup plan: +started : phase(start) <- -phase(start); +phase(first_id).
         asl.plans.append(
             Plan(
                 trigger=BeliefLiteral("started"),
-                context=[BeliefLiteral("phase", ["start"])],
-                body=[ActionLiteral("!transition_phase")],
+                context=[BeliefLiteral("phase", ['"start"'])],
+                body=[
+                    ActionLiteral('-phase("start")'),
+                    ActionLiteral(f'+phase("{program.phases[0].id}")'),
+                ],
             )
         )
+
+        # Initial plans:
+        asl.plans.append(
+            Plan(
+                trigger=GoalLiteral("generate_response_with_goal(Goal)"),
+                context=[BeliefLiteral("user_said", ["Message"])],
+                body=[
+                    ActionLiteral("+responded_this_turn"),
+                    ActionLiteral(".findall(Norm, norm(Norm), Norms)"),
+                    ActionLiteral(".reply_with_goal(Message, Norms, Goal)"),
+                ],
+            )
+        )

@@ -83,25 +117,33 @@ class AgentSpeakGenerator:
         """Generates the main loop listener and the transition logic for this phase."""

         # +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase.
-        goal_actions = [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
+        goal_actions = [ActionLiteral("-responded_this_turn")]
+        goal_actions += [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
         goal_actions.append(ActionLiteral("!transition_phase"))

         asl.plans.append(
             Plan(
                 trigger=BeliefLiteral("user_said", ["Message"]),
-                context=[BeliefLiteral("phase", [str(phase.id)])],
+                context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
                 body=goal_actions,
             )
         )

-        # +!transition_phase : phase(ID) <- -+phase(NEXT_ID).
-        next_id = next_phase.id if next_phase else "end"
+        # +!transition_phase : phase(ID) <- -phase(ID); +phase(NEXT_ID).
+        next_id = str(next_phase.id) if next_phase else "end"
+
+        transition_context = [BeliefLiteral("phase", [f'"{phase.id}"'])]
+        if phase.goals:
+            transition_context.append(BeliefLiteral(f"achieved_{self._slugify(phase.goals[-1])}"))

         asl.plans.append(
             Plan(
                 trigger=GoalLiteral("transition_phase"),
-                context=[BeliefLiteral("phase", [str(phase.id)])],
-                body=[ActionLiteral(f"-+phase({next_id})")],
+                context=transition_context,
+                body=[
+                    ActionLiteral(f'-phase("{phase.id}")'),
+                    ActionLiteral(f'+phase("{next_id}")'),
+                ],
             )
         )

@@ -113,7 +155,7 @@ class AgentSpeakGenerator:
         head = BeliefLiteral("norm", [norm_slug])

         # Base context is the phase
-        phase_lit = BeliefLiteral("phase", [str(phase.id)])
+        phase_lit = BeliefLiteral("phase", [f'"{phase.id}"'])

         if isinstance(norm, ConditionalNorm):
             self._ensure_belief_inference(norm.condition, asl)
@@ -132,7 +174,7 @@ class AgentSpeakGenerator:
         though ASL engines often handle redefinition or we can use a set to track processed IDs.
         """
         if isinstance(belief, KeywordBelief):
-            # Rule: keyword_said("word") :- user_said(M) & .substring(M, "word", P) & P >= 0.
+            # Rule: keyword_said("word") :- user_said(M) & .substring("word", M, P) & P >= 0.
             kwd_slug = f'"{belief.keyword}"'

             head = BeliefLiteral("keyword_said", [kwd_slug])
@@ -143,7 +185,7 @@ class AgentSpeakGenerator:
             body = BinaryOp(
                 BeliefLiteral("user_said", ["Message"]),
                 "&",
-                BinaryOp(f".substring(Message, {kwd_slug}, Pos)", "&", "Pos >= 0"),
+                BinaryOp(f".substring({kwd_slug}, Message, Pos)", "&", "Pos >= 0"),
             )
             asl.inference_rules.append(Rule(head=head, body=body))

@@ -185,7 +227,7 @@ class AgentSpeakGenerator:

         # phase(ID) & not responded_this_turn & not achieved_goal
         context = [
-            BeliefLiteral("phase", [phase_id]),
+            BeliefLiteral("phase", [f'"{phase_id}"']),
             BeliefLiteral("responded_this_turn", negated=True),
             BeliefLiteral(f"achieved_{goal_slug}", negated=True),
         ]
@@ -214,9 +256,6 @@ class AgentSpeakGenerator:
             body_actions.append(ActionLiteral(f"+achieved_{goal_slug}"))

         asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions))
-        asl.plans.append(
-            Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
-        )

         prev_sub = None
         for sub_goal in sub_goals_to_process:
@@ -253,7 +292,7 @@ class AgentSpeakGenerator:
         asl.plans.append(
             Plan(
                 trigger=BeliefLiteral(trigger_belief_slug),
-                context=[BeliefLiteral("phase", [str(phase.id)])],
+                context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
                 body=body_actions,
             )
         )
@@ -264,6 +303,28 @@ class AgentSpeakGenerator:
             self._generate_goal_plan_recursive(sub_goal, str(phase.id), prev_sub, asl)
             prev_sub = sub_goal

+    # --- Section: Fallbacks ---
+
+    def _generate_fallbacks(self, program: Program, asl: AgentSpeakFile):
+        for phase in program.phases:
+            for goal in phase.goals:
+                self._generate_goal_fallbacks_recursive(goal, asl)
+
+        asl.plans.append(
+            Plan(trigger=GoalLiteral("transition_phase"), context=[], body=[ActionLiteral("true")])
+        )
+
+    def _generate_goal_fallbacks_recursive(self, goal: Goal, asl: AgentSpeakFile):
+        goal_slug = self._slugify(goal)
+        asl.plans.append(
+            Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
+        )
+
+        for step in goal.plan.steps:
+            if not isinstance(step, Goal):
+                continue
+            self._generate_goal_fallbacks_recursive(step, asl)
+
     # --- Helpers ---

     @singledispatchmethod
@@ -276,7 +337,7 @@ class AgentSpeakGenerator:
     def _(self, goal: Goal) -> str:
         if goal.name:
             return self._slugify_str(goal.name)
-        return f"goal_{goal.id}"
+        return f"goal_{goal.id.hex}"

     @_slugify.register
     def _(self, kwb: KeywordBelief) -> str:
@@ -295,4 +356,4 @@ class AgentSpeakGenerator:


 if __name__ == "__main__":
-    do_things()
+    asyncio.run(do_things())
diff --git a/src/control_backend/agents/bdi/bdi_core_agent.py b/src/control_backend/agents/bdi/bdi_core_agent.py
index f056e09..9408ff8 100644
--- a/src/control_backend/agents/bdi/bdi_core_agent.py
+++ b/src/control_backend/agents/bdi/bdi_core_agent.py
@@ -160,7 +160,7 @@ class BDICoreAgent(BaseAgent):
             self._remove_all_with_name(belief.name)
         self._add_belief(belief.name, belief.arguments)

-    def _add_belief(self, name: str, args: Iterable[str] = []):
+    def _add_belief(self, name: str, args: list[str] | None = None):
         """
         Add a single belief to the BDI agent.

@@ -168,9 +168,12 @@ class BDICoreAgent(BaseAgent):
         :param args: Arguments for the belief.
         """
         # new_args = (agentspeak.Literal(arg) for arg in args)  # TODO: Eventually support multiple
-        merged_args = DELIMITER.join(arg for arg in args)
-        new_args = (agentspeak.Literal(merged_args),)
-        term = agentspeak.Literal(name, new_args)
+        if args:
+            merged_args = DELIMITER.join(arg for arg in args)
+            new_args = (agentspeak.Literal(merged_args),)
+            term = agentspeak.Literal(name, new_args)
+        else:
+            term = agentspeak.Literal(name)

         self.bdi_agent.call(
             agentspeak.Trigger.addition,
@@ -238,8 +241,7 @@ class BDICoreAgent(BaseAgent):
         @self.actions.add(".reply", 3)
         def _reply(agent: "BDICoreAgent", term, intention):
             """
-            Sends text to the LLM (AgentSpeak action).
-            Example: .reply("Hello LLM!", "Some norm", "Some goal")
+            Let the LLM generate a response to a user's utterance with the current norms and goals.
             """
             message_text = agentspeak.grounded(term.args[0], intention.scope)
             norms = agentspeak.grounded(term.args[1], intention.scope)
@@ -252,15 +254,71 @@ class BDICoreAgent(BaseAgent):
             asyncio.create_task(self._send_to_llm(str(message_text), str(norms), str(goals)))
             yield

-    async def _send_to_llm(self, text: str, norms: str = None, goals: str = None):
+        @self.actions.add(".reply_with_goal", 3)
+        def _reply_with_goal(agent: "BDICoreAgent", term, intention):
+            """
+            Let the LLM generate a response to a user's utterance with the current norms and a
+            specific goal.
+            """
+            message_text = agentspeak.grounded(term.args[0], intention.scope)
+            norms = agentspeak.grounded(term.args[1], intention.scope)
+            goal = agentspeak.grounded(term.args[2], intention.scope)
+
+            self.logger.debug(
+                '"reply_with_goal" action called with message=%s, norms=%s, goal=%s',
+                message_text,
+                norms,
+                goal,
+            )
+            # asyncio.create_task(self._send_to_llm(str(message_text), norms, str(goal)))
+            yield
+
+        @self.actions.add(".say", 1)
+        def _say(agent: "BDICoreAgent", term, intention):
+            """
+            Make the robot say the given text instantly.
+            """
+            message_text = agentspeak.grounded(term.args[0], intention.scope)
+
+            self.logger.debug('"say" action called with text=%s', message_text)
+
+            # speech_command = SpeechCommand(data=message_text)
+            # speech_message = InternalMessage(
+            #     to=settings.agent_settings.robot_speech_name,
+            #     sender=settings.agent_settings.bdi_core_name,
+            #     body=speech_command.model_dump_json(),
+            # )
+            # asyncio.create_task(agent.send(speech_message))
+            yield
+
+        @self.actions.add(".gesture", 2)
+        def _gesture(agent: "BDICoreAgent", term, intention):
+            """
+            Make the robot perform the given gesture instantly.
+            """
+            gesture_type = agentspeak.grounded(term.args[0], intention.scope)
+            gesture_name = agentspeak.grounded(term.args[1], intention.scope)
+
+            self.logger.debug(
+                '"gesture" action called with type=%s, name=%s',
+                gesture_type,
+                gesture_name,
+            )
+
+            # gesture = Gesture(type=gesture_type, name=gesture_name)
+            # gesture_message = InternalMessage(
+            #     to=settings.agent_settings.robot_gesture_name,
+            #     sender=settings.agent_settings.bdi_core_name,
+            #     body=gesture.model_dump_json(),
+            # )
+            # asyncio.create_task(agent.send(gesture_message))
+            yield
+
+    async def _send_to_llm(self, text: str, norms: str, goals: str):
         """
         Sends a text query to the LLM agent asynchronously.
         """
-        prompt = LLMPromptMessage(
-            text=text,
-            norms=norms.split("\n") if norms else [],
-            goals=goals.split("\n") if norms else [],
-        )
+        prompt = LLMPromptMessage(text=text, norms=norms.split("\n"), goals=goals.split("\n"))
         msg = InternalMessage(
             to=settings.agent_settings.llm_name,
             sender=self.name,
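Below is an illustrative sketch (not part of the patch) of the kind of AgentSpeak the updated generator would emit for a hypothetical single-phase program with phase id "intro" and one goal named "greet user". The phase id, goal name, and exact plan layout are assumptions made for illustration; the real output depends on the Program instance and on how asl_ast renders rules and plans. It shows the quoted phase literals, the per-turn reset of responded_this_turn, the explicit -phase/+phase transition guarded by the last goal's achieved_ belief, and the catch-all fallback plans appended by _generate_fallbacks so unmatched goals and transitions do not block the reasoning cycle.

    phase("start").

    +started : phase("start") <-
        -phase("start");
        +phase("intro").

    +user_said(Message) : phase("intro") <-
        -responded_this_turn;
        !greet_user;
        !transition_phase.

    +!transition_phase : phase("intro") & achieved_greet_user <-
        -phase("intro");
        +phase("end").

    // Fallbacks are appended last, so they only fire when no earlier plan matches.
    +!greet_user <- true.
    +!transition_phase <- true.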