302 lines
10 KiB
Python
302 lines
10 KiB
Python
import asyncio
|
|
import json
|
|
|
|
import zmq
|
|
from pydantic import ValidationError
|
|
from zmq.asyncio import Context
|
|
|
|
from control_backend.agents import BaseAgent
|
|
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
|
|
from control_backend.core.config import settings
|
|
from control_backend.schemas.belief_list import BeliefList, GoalList
|
|
from control_backend.schemas.internal_message import InternalMessage
|
|
from control_backend.schemas.program import (
|
|
Belief,
|
|
ConditionalNorm,
|
|
Goal,
|
|
InferredBelief,
|
|
Phase,
|
|
Program,
|
|
)
|
|
|
|
|
|
class BDIProgramManager(BaseAgent):
|
|
"""
|
|
BDI Program Manager Agent.
|
|
|
|
This agent is responsible for receiving high-level programs (sequences of instructions/goals)
|
|
from the external HTTP API (via ZMQ) and translating them into core beliefs (norms and goals)
|
|
for the BDI Core Agent. In the future, it will be responsible for determining when goals are
|
|
met, and passing on new norms and goals accordingly.
|
|
|
|
:ivar sub_socket: The ZMQ SUB socket used to receive program updates.
|
|
"""
|
|
|
|
_program: Program
|
|
_phase: Phase | None
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.sub_socket = None
|
|
|
|
def _initialize_internal_state(self, program: Program):
|
|
self._program = program
|
|
self._phase = program.phases[0] # start in first phase
|
|
self._goal_mapping: dict[str, Goal] = {}
|
|
for phase in program.phases:
|
|
for goal in phase.goals:
|
|
self._populate_goal_mapping_with_goal(goal)
|
|
|
|
def _populate_goal_mapping_with_goal(self, goal: Goal):
|
|
self._goal_mapping[str(goal.id)] = goal
|
|
for step in goal.plan.steps:
|
|
if isinstance(step, Goal):
|
|
self._populate_goal_mapping_with_goal(step)
|
|
|
|
async def _create_agentspeak_and_send_to_bdi(self, program: Program):
|
|
"""
|
|
Convert a received program into an AgentSpeak file and send it to the BDI Core Agent.
|
|
|
|
:param program: The program object received from the API.
|
|
"""
|
|
asg = AgentSpeakGenerator()
|
|
|
|
asl_str = asg.generate(program)
|
|
|
|
file_name = "src/control_backend/agents/bdi/agentspeak.asl"
|
|
|
|
with open(file_name, "w") as f:
|
|
f.write(asl_str)
|
|
|
|
msg = InternalMessage(
|
|
sender=self.name,
|
|
to=settings.agent_settings.bdi_core_name,
|
|
body=file_name,
|
|
thread="new_program",
|
|
)
|
|
|
|
await self.send(msg)
|
|
|
|
async def handle_message(self, msg: InternalMessage):
|
|
match msg.thread:
|
|
case "transition_phase":
|
|
phases = json.loads(msg.body)
|
|
|
|
await self._transition_phase(phases["old"], phases["new"])
|
|
case "achieve_goal":
|
|
goal_id = msg.body
|
|
await self._send_achieved_goal_to_semantic_belief_extractor(goal_id)
|
|
|
|
async def _transition_phase(self, old: str, new: str):
|
|
if old != str(self._phase.id):
|
|
self.logger.warning(
|
|
f"Phase transition desync detected! ASL requested move from '{old}', "
|
|
f"but Python is currently in '{self._phase.id}'. Request ignored."
|
|
)
|
|
return
|
|
|
|
if new == "end":
|
|
self._phase = None
|
|
# Notify user interaction agent
|
|
msg = InternalMessage(
|
|
to=settings.agent_settings.user_interrupt_name,
|
|
thread="transition_phase",
|
|
body="end",
|
|
)
|
|
self.logger.info("Transitioned to end phase, notifying UserInterruptAgent.")
|
|
|
|
self.add_behavior(self.send(msg))
|
|
return
|
|
|
|
for phase in self._program.phases:
|
|
if str(phase.id) == new:
|
|
self._phase = phase
|
|
|
|
await self._send_beliefs_to_semantic_belief_extractor()
|
|
await self._send_goals_to_semantic_belief_extractor()
|
|
|
|
# Notify user interaction agent
|
|
msg = InternalMessage(
|
|
to=settings.agent_settings.user_interrupt_name,
|
|
thread="transition_phase",
|
|
body=str(self._phase.id),
|
|
)
|
|
self.logger.info(f"Transitioned to phase {new}, notifying UserInterruptAgent.")
|
|
|
|
self.add_behavior(self.send(msg))
|
|
|
|
def _extract_current_beliefs(self) -> list[Belief]:
|
|
beliefs: list[Belief] = []
|
|
|
|
for norm in self._phase.norms:
|
|
if isinstance(norm, ConditionalNorm):
|
|
beliefs += self._extract_beliefs_from_belief(norm.condition)
|
|
|
|
for trigger in self._phase.triggers:
|
|
beliefs += self._extract_beliefs_from_belief(trigger.condition)
|
|
|
|
return beliefs
|
|
|
|
@staticmethod
|
|
def _extract_beliefs_from_belief(belief: Belief) -> list[Belief]:
|
|
if isinstance(belief, InferredBelief):
|
|
return BDIProgramManager._extract_beliefs_from_belief(
|
|
belief.left
|
|
) + BDIProgramManager._extract_beliefs_from_belief(belief.right)
|
|
return [belief]
|
|
|
|
async def _send_beliefs_to_semantic_belief_extractor(self):
|
|
"""
|
|
Extract beliefs from the program and send them to the Semantic Belief Extractor Agent.
|
|
"""
|
|
beliefs = BeliefList(beliefs=self._extract_current_beliefs())
|
|
|
|
message = InternalMessage(
|
|
to=settings.agent_settings.text_belief_extractor_name,
|
|
sender=self.name,
|
|
body=beliefs.model_dump_json(),
|
|
thread="beliefs",
|
|
)
|
|
|
|
await self.send(message)
|
|
|
|
@staticmethod
|
|
def _extract_goals_from_goal(goal: Goal) -> list[Goal]:
|
|
"""
|
|
Extract all goals from a given goal, that is: the goal itself and any subgoals.
|
|
|
|
:return: All goals within and including the given goal.
|
|
"""
|
|
goals: list[Goal] = [goal]
|
|
for plan in goal.plan:
|
|
if isinstance(plan, Goal):
|
|
goals.extend(BDIProgramManager._extract_goals_from_goal(plan))
|
|
return goals
|
|
|
|
def _extract_current_goals(self) -> list[Goal]:
|
|
"""
|
|
Extract all goals from the program, including subgoals.
|
|
|
|
:return: A list of Goal objects.
|
|
"""
|
|
goals: list[Goal] = []
|
|
|
|
for goal in self._phase.goals:
|
|
goals.extend(self._extract_goals_from_goal(goal))
|
|
|
|
return goals
|
|
|
|
async def _send_goals_to_semantic_belief_extractor(self):
|
|
"""
|
|
Extract goals for the current phase and send them to the Semantic Belief Extractor Agent.
|
|
"""
|
|
goals = GoalList(goals=self._extract_current_goals())
|
|
|
|
message = InternalMessage(
|
|
to=settings.agent_settings.text_belief_extractor_name,
|
|
sender=self.name,
|
|
body=goals.model_dump_json(),
|
|
thread="goals",
|
|
)
|
|
|
|
await self.send(message)
|
|
|
|
async def _send_achieved_goal_to_semantic_belief_extractor(self, achieved_goal_id: str):
|
|
"""
|
|
Inform the semantic belief extractor when a goal is marked achieved.
|
|
|
|
:param achieved_goal_id: The id of the achieved goal.
|
|
"""
|
|
goal = self._goal_mapping.get(achieved_goal_id)
|
|
if goal is None:
|
|
self.logger.debug(f"Goal with ID {achieved_goal_id} marked achieved but was not found.")
|
|
return
|
|
|
|
goals = self._extract_goals_from_goal(goal)
|
|
message = InternalMessage(
|
|
to=settings.agent_settings.text_belief_extractor_name,
|
|
body=GoalList(goals=goals).model_dump_json(),
|
|
thread="achieved_goals",
|
|
)
|
|
await self.send(message)
|
|
|
|
async def _send_clear_llm_history(self):
|
|
"""
|
|
Clear the LLM Agent's conversation history.
|
|
|
|
Sends an empty history to the LLM Agent to reset its state.
|
|
"""
|
|
message = InternalMessage(
|
|
to=settings.agent_settings.llm_name,
|
|
body="clear_history",
|
|
)
|
|
await self.send(message)
|
|
self.logger.debug("Sent message to LLM agent to clear history.")
|
|
|
|
extractor_msg = InternalMessage(
|
|
to=settings.agent_settings.text_belief_extractor_name,
|
|
thread="conversation_history",
|
|
body="reset",
|
|
)
|
|
await self.send(extractor_msg)
|
|
self.logger.debug("Sent message to extractor agent to clear history.")
|
|
|
|
async def _receive_programs(self):
|
|
"""
|
|
Continuous loop that receives program updates from the HTTP endpoint.
|
|
|
|
It listens to the ``program`` topic on the internal ZMQ SUB socket.
|
|
When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`.
|
|
Additionally, the LLM history is cleared via :meth:`_send_clear_llm_history`.
|
|
"""
|
|
while True:
|
|
topic, body = await self.sub_socket.recv_multipart()
|
|
|
|
try:
|
|
program = Program.model_validate_json(body)
|
|
except ValidationError:
|
|
self.logger.warning("Received an invalid program.")
|
|
continue
|
|
|
|
self._initialize_internal_state(program)
|
|
await self._send_program_to_user_interrupt(program)
|
|
await self._send_clear_llm_history()
|
|
|
|
await asyncio.gather(
|
|
self._create_agentspeak_and_send_to_bdi(program),
|
|
self._send_beliefs_to_semantic_belief_extractor(),
|
|
self._send_goals_to_semantic_belief_extractor(),
|
|
)
|
|
|
|
async def _send_program_to_user_interrupt(self, program: Program):
|
|
"""
|
|
Send the received program to the User Interrupt Agent.
|
|
|
|
:param program: The program object received from the API.
|
|
"""
|
|
msg = InternalMessage(
|
|
sender=self.name,
|
|
to=settings.agent_settings.user_interrupt_name,
|
|
body=program.model_dump_json(),
|
|
thread="new_program",
|
|
)
|
|
|
|
await self.send(msg)
|
|
|
|
async def setup(self):
|
|
"""
|
|
Initialize the agent.
|
|
|
|
Connects the internal ZMQ SUB socket and subscribes to the 'program' topic.
|
|
Starts the background behavior to receive programs. Initializes a default program.
|
|
"""
|
|
await self._create_agentspeak_and_send_to_bdi(Program(phases=[]))
|
|
|
|
context = Context.instance()
|
|
|
|
self.sub_socket = context.socket(zmq.SUB)
|
|
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
|
|
self.sub_socket.subscribe("program")
|
|
|
|
self.add_behavior(self._receive_programs())
|