import asyncio import json import logging import zmq from pydantic import ValidationError from zmq.asyncio import Context import control_backend from control_backend.agents import BaseAgent from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator from control_backend.core.config import settings from control_backend.schemas.belief_list import BeliefList, GoalList from control_backend.schemas.internal_message import InternalMessage from control_backend.schemas.program import ( Belief, ConditionalNorm, Goal, InferredBelief, Phase, Program, ) experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name) class BDIProgramManager(BaseAgent): """ BDI Program Manager Agent. This agent is responsible for receiving high-level programs (sequences of instructions/goals) from the external HTTP API (via ZMQ), transforming it into an AgentSpeak program, sharing the program and its components to other agents, and keeping agents informed of the current state. :ivar sub_socket: The ZMQ SUB socket used to receive program updates. :ivar _program: The current Program. :ivar _phase: The current Phase. :ivar _goal_mapping: A mapping of goal IDs to goals. """ _program: Program _phase: Phase | None def __init__(self, **kwargs): super().__init__(**kwargs) self.sub_socket = None self._goal_mapping: dict[str, Goal] = {} def _initialize_internal_state(self, program: Program): """ Initialize the state of the program manager given a new Program. Reset the tracking of the current phase to the first phase, make a mapping of goal IDs to goals, used during the life of the program. :param program: The new program. """ self._program = program self._phase = program.phases[0] # start in first phase self._goal_mapping = {} for phase in program.phases: for goal in phase.goals: self._populate_goal_mapping_with_goal(goal) def _populate_goal_mapping_with_goal(self, goal: Goal): """ Recurse through the given goal and its subgoals and add all goals found to the ``self._goal_mapping``. :param goal: The goal to add to the ``self._goal_mapping``, including subgoals. """ self._goal_mapping[str(goal.id)] = goal for step in goal.plan.steps: if isinstance(step, Goal): self._populate_goal_mapping_with_goal(step) async def _create_agentspeak_and_send_to_bdi(self, program: Program): """ Convert a received program into an AgentSpeak file and send it to the BDI Core Agent. :param program: The program object received from the API. """ asg = AgentSpeakGenerator() asl_str = asg.generate(program) file_name = settings.behaviour_settings.agentspeak_file with open(file_name, "w") as f: f.write(asl_str) msg = InternalMessage( sender=self.name, to=settings.agent_settings.bdi_core_name, body=file_name, thread="new_program", ) await self.send(msg) async def handle_message(self, msg: InternalMessage): match msg.thread: case "transition_phase": phases = json.loads(msg.body) await self._transition_phase(phases["old"], phases["new"]) case "achieve_goal": goal_id = msg.body await self._send_achieved_goal_to_semantic_belief_extractor(goal_id) async def _transition_phase(self, old: str, new: str): """ When receiving a signal from the BDI core that the phase has changed, apply this change to the current state and inform other agents about the change. :param old: The ID of the old phase. :param new: The ID of the new phase. """ if self._phase is None: return if old != str(self._phase.id): self.logger.warning( f"Phase transition desync detected! ASL requested move from '{old}', " f"but Python is currently in '{self._phase.id}'. Request ignored." ) return if new == "end": self._phase = None # Notify user interaction agent msg = InternalMessage( to=settings.agent_settings.user_interrupt_name, thread="transition_phase", body="end", ) self.logger.info("Transitioned to end phase, notifying UserInterruptAgent.") self.add_behavior(self.send(msg)) return for phase in self._program.phases: if str(phase.id) == new: self._phase = phase await self._send_beliefs_to_semantic_belief_extractor() await self._send_goals_to_semantic_belief_extractor() # Notify user interaction agent msg = InternalMessage( to=settings.agent_settings.user_interrupt_name, thread="transition_phase", body=str(self._phase.id), ) self.logger.info(f"Transitioned to phase {new}, notifying UserInterruptAgent.") self.add_behavior(self.send(msg)) def _extract_current_beliefs(self) -> list[Belief]: """Extract beliefs from the current phase.""" assert self._phase is not None, ( "Invalid state, no phase set. Call this method only when " "a program has been received and the end-phase has not " "been reached." ) beliefs: list[Belief] = [] for norm in self._phase.norms: if isinstance(norm, ConditionalNorm): beliefs += self._extract_beliefs_from_belief(norm.condition) for trigger in self._phase.triggers: beliefs += self._extract_beliefs_from_belief(trigger.condition) return beliefs @staticmethod def _extract_beliefs_from_belief(belief: Belief) -> list[Belief]: """Recursively extract beliefs from the given belief.""" if isinstance(belief, InferredBelief): return BDIProgramManager._extract_beliefs_from_belief( belief.left ) + BDIProgramManager._extract_beliefs_from_belief(belief.right) return [belief] async def _send_beliefs_to_semantic_belief_extractor(self): """Extract beliefs from the program and send them to the Semantic Belief Extractor Agent.""" beliefs = BeliefList(beliefs=self._extract_current_beliefs()) message = InternalMessage( to=settings.agent_settings.text_belief_extractor_name, sender=self.name, body=beliefs.model_dump_json(), thread="beliefs", ) await self.send(message) @staticmethod def _extract_goals_from_goal(goal: Goal) -> list[Goal]: """ Extract all goals from a given goal, that is: the goal itself and any subgoals. :return: All goals within and including the given goal. """ goals: list[Goal] = [goal] for step in goal.plan.steps: if isinstance(step, Goal): goals.extend(BDIProgramManager._extract_goals_from_goal(step)) return goals def _extract_current_goals(self) -> list[Goal]: """ Extract all goals from the program, including subgoals. :return: A list of Goal objects. """ assert self._phase is not None, ( "Invalid state, no phase set. Call this method only when " "a program has been received and the end-phase has not " "been reached." ) goals: list[Goal] = [] for goal in self._phase.goals: goals.extend(self._extract_goals_from_goal(goal)) return goals async def _send_goals_to_semantic_belief_extractor(self): """ Extract goals for the current phase and send them to the Semantic Belief Extractor Agent. """ goals = GoalList(goals=self._extract_current_goals()) message = InternalMessage( to=settings.agent_settings.text_belief_extractor_name, sender=self.name, body=goals.model_dump_json(), thread="goals", ) await self.send(message) async def _send_achieved_goal_to_semantic_belief_extractor(self, achieved_goal_id: str): """ Inform the semantic belief extractor when a goal is marked achieved. :param achieved_goal_id: The id of the achieved goal. """ goal = self._goal_mapping.get(achieved_goal_id) if goal is None: self.logger.debug(f"Goal with ID {achieved_goal_id} marked achieved but was not found.") return goals = self._extract_goals_from_goal(goal) message = InternalMessage( to=settings.agent_settings.text_belief_extractor_name, body=GoalList(goals=goals).model_dump_json(), thread="achieved_goals", ) await self.send(message) async def _send_clear_llm_history(self): """ Clear the LLM Agent's conversation history. Sends an empty history to the LLM Agent to reset its state. """ message = InternalMessage( to=settings.agent_settings.llm_name, body="clear_history", ) await self.send(message) self.logger.debug("Sent message to LLM agent to clear history.") extractor_msg = InternalMessage( to=settings.agent_settings.text_belief_extractor_name, thread="conversation_history", body="reset", ) await self.send(extractor_msg) self.logger.debug("Sent message to extractor agent to clear history.") @staticmethod def _rollover_experiment_logs(): """ A new experiment program started; make a new experiment log file. """ handlers = logging.getLogger(settings.logging_settings.experiment_logger_name).handlers for handler in handlers: if isinstance(handler, control_backend.logging.DatedFileHandler): experiment_logger.action("Doing rollover...") handler.do_rollover() experiment_logger.debug("Finished rollover.") async def _receive_programs(self): """ Continuous loop that receives program updates from the HTTP endpoint. It listens to the ``program`` topic on the internal ZMQ SUB socket. When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`. Additionally, the LLM history is cleared via :meth:`_send_clear_llm_history`. """ while True: topic, body = await self.sub_socket.recv_multipart() try: program = Program.model_validate_json(body) except ValidationError: self.logger.warning("Received an invalid program.") continue self._initialize_internal_state(program) await self._send_program_to_user_interrupt(program) await self._send_clear_llm_history() self._rollover_experiment_logs() await asyncio.gather( self._create_agentspeak_and_send_to_bdi(program), self._send_beliefs_to_semantic_belief_extractor(), self._send_goals_to_semantic_belief_extractor(), ) async def _send_program_to_user_interrupt(self, program: Program): """ Send the received program to the User Interrupt Agent. :param program: The program object received from the API. """ msg = InternalMessage( sender=self.name, to=settings.agent_settings.user_interrupt_name, body=program.model_dump_json(), thread="new_program", ) await self.send(msg) async def setup(self): """ Initialize the agent. Connects the internal ZMQ SUB socket and subscribes to the 'program' topic. Starts the background behavior to receive programs. Initializes a default program. """ await self._create_agentspeak_and_send_to_bdi(Program(phases=[])) context = Context.instance() self.sub_socket = context.socket(zmq.SUB) self.sub_socket.connect(settings.zmq_settings.internal_sub_address) self.sub_socket.subscribe("program") self.add_behavior(self._receive_programs())