import asyncio import json import zmq from pydantic import ValidationError from zmq.asyncio import Context from control_backend.agents import BaseAgent from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator from control_backend.core.config import settings from control_backend.schemas.belief_list import BeliefList, GoalList from control_backend.schemas.internal_message import InternalMessage from control_backend.schemas.program import ( Belief, ConditionalNorm, Goal, InferredBelief, Phase, Program, ) class BDIProgramManager(BaseAgent): """ BDI Program Manager Agent. This agent is responsible for receiving high-level programs (sequences of instructions/goals) from the external HTTP API (via ZMQ) and translating them into core beliefs (norms and goals) for the BDI Core Agent. In the future, it will be responsible for determining when goals are met, and passing on new norms and goals accordingly. :ivar sub_socket: The ZMQ SUB socket used to receive program updates. """ _program: Program _phase: Phase | None def __init__(self, **kwargs): super().__init__(**kwargs) self.sub_socket = None def _initialize_internal_state(self, program: Program): self._program = program self._phase = program.phases[0] # start in first phase async def _create_agentspeak_and_send_to_bdi(self, program: Program): """ Convert a received program into an AgentSpeak file and send it to the BDI Core Agent. :param program: The program object received from the API. """ asg = AgentSpeakGenerator() asl_str = asg.generate(program) file_name = "src/control_backend/agents/bdi/agentspeak.asl" with open(file_name, "w") as f: f.write(asl_str) msg = InternalMessage( sender=self.name, to=settings.agent_settings.bdi_core_name, body=file_name, thread="new_program", ) await self.send(msg) async def handle_message(self, msg: InternalMessage): match msg.thread: case "transition_phase": phases = json.loads(msg.body) await self._transition_phase(phases["old"], phases["new"]) async def _transition_phase(self, old: str, new: str): if old != str(self._phase.id): self.logger.warning( f"Phase transition desync detected! ASL requested move from '{old}', " f"but Python is currently in '{self._phase.id}'. Request ignored." ) return if new == "end": self._phase = None return for phase in self._program.phases: if str(phase.id) == new: self._phase = phase await self._send_beliefs_to_semantic_belief_extractor() await self._send_goals_to_semantic_belief_extractor() # Notify user interaction agent msg = InternalMessage( to=settings.agent_settings.user_interrupt_name, thread="transition_phase", body=str(self._phase.id), ) self.logger.info(f"Transitioned to phase {new}, notifying UserInterruptAgent.") self.add_behavior(self.send(msg)) def _extract_current_beliefs(self) -> list[Belief]: beliefs: list[Belief] = [] for norm in self._phase.norms: if isinstance(norm, ConditionalNorm): beliefs += self._extract_beliefs_from_belief(norm.condition) for trigger in self._phase.triggers: beliefs += self._extract_beliefs_from_belief(trigger.condition) return beliefs @staticmethod def _extract_beliefs_from_belief(belief: Belief) -> list[Belief]: if isinstance(belief, InferredBelief): return BDIProgramManager._extract_beliefs_from_belief( belief.left ) + BDIProgramManager._extract_beliefs_from_belief(belief.right) return [belief] async def _send_beliefs_to_semantic_belief_extractor(self): """ Extract beliefs from the program and send them to the Semantic Belief Extractor Agent. """ beliefs = BeliefList(beliefs=self._extract_current_beliefs()) message = InternalMessage( to=settings.agent_settings.text_belief_extractor_name, sender=self.name, body=beliefs.model_dump_json(), thread="beliefs", ) await self.send(message) def _extract_current_goals(self) -> list[Goal]: """ Extract all goals from the program, including subgoals. :return: A list of Goal objects. """ goals: list[Goal] = [] def extract_goals_from_goal(goal_: Goal) -> list[Goal]: goals_: list[Goal] = [goal] for plan in goal_.plan: if isinstance(plan, Goal): goals_.extend(extract_goals_from_goal(plan)) return goals_ for goal in self._phase.goals: goals.extend(extract_goals_from_goal(goal)) return goals async def _send_goals_to_semantic_belief_extractor(self): """ Extract goals for the current phase and send them to the Semantic Belief Extractor Agent. """ goals = GoalList(goals=self._extract_current_goals()) message = InternalMessage( to=settings.agent_settings.text_belief_extractor_name, sender=self.name, body=goals.model_dump_json(), thread="goals", ) await self.send(message) async def _send_clear_llm_history(self): """ Clear the LLM Agent's conversation history. Sends an empty history to the LLM Agent to reset its state. """ message = InternalMessage( to=settings.agent_settings.llm_name, body="clear_history", ) await self.send(message) self.logger.debug("Sent message to LLM agent to clear history.") extractor_msg = InternalMessage( to=settings.agent_settings.text_belief_extractor_name, thread="conversation_history", body="reset", ) await self.send(extractor_msg) self.logger.debug("Sent message to extractor agent to clear history.") async def _receive_programs(self): """ Continuous loop that receives program updates from the HTTP endpoint. It listens to the ``program`` topic on the internal ZMQ SUB socket. When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`. Additionally, the LLM history is cleared via :meth:`_send_clear_llm_history`. """ while True: topic, body = await self.sub_socket.recv_multipart() try: program = Program.model_validate_json(body) except ValidationError: self.logger.warning("Received an invalid program.") continue self._initialize_internal_state(program) await self._send_program_to_user_interrupt(program) await self._send_clear_llm_history() await asyncio.gather( self._create_agentspeak_and_send_to_bdi(program), self._send_beliefs_to_semantic_belief_extractor(), self._send_goals_to_semantic_belief_extractor(), ) async def _send_program_to_user_interrupt(self, program: Program): """ Send the received program to the User Interrupt Agent. :param program: The program object received from the API. """ msg = InternalMessage( sender=self.name, to=settings.agent_settings.user_interrupt_name, body=program.model_dump_json(), thread="new_program", ) await self.send(msg) async def setup(self): """ Initialize the agent. Connects the internal ZMQ SUB socket and subscribes to the 'program' topic. Starts the background behavior to receive programs. """ context = Context.instance() self.sub_socket = context.socket(zmq.SUB) self.sub_socket.connect(settings.zmq_settings.internal_sub_address) self.sub_socket.subscribe("program") self.add_behavior(self._receive_programs())