import zmq from pydantic import ValidationError from zmq.asyncio import Context from control_backend.agents import BaseAgent from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings from control_backend.schemas.belief_message import Belief, BeliefMessage from control_backend.schemas.program import Program class BDIProgramManager(BaseAgent): """ BDI Program Manager Agent. This agent is responsible for receiving high-level programs (sequences of instructions/goals) from the external HTTP API (via ZMQ) and translating them into core beliefs (norms and goals) for the BDI Core Agent. In the future, it will be responsible for determining when goals are met, and passing on new norms and goals accordingly. :ivar sub_socket: The ZMQ SUB socket used to receive program updates. """ def __init__(self, **kwargs): super().__init__(**kwargs) self.sub_socket = None async def _send_to_bdi(self, program: Program): """ Convert a received program into BDI beliefs and send them to the BDI Core Agent. Currently, it takes the **first phase** of the program and extracts: - **Norms**: Constraints or rules the agent must follow. - **Goals**: Objectives the agent must achieve. These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will overwrite any existing norms/goals of the same name in the BDI agent. :param program: The program object received from the API. """ first_phase = program.phases[0] norms_belief = Belief( name="norms", arguments=[norm.norm for norm in first_phase.norms], replace=True, ) goals_belief = Belief( name="goals", arguments=[goal.description for goal in first_phase.goals], replace=True, ) program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief]) message = InternalMessage( to=settings.agent_settings.bdi_core_name, sender=self.name, body=program_beliefs.model_dump_json(), thread="beliefs", ) await self.send(message) self.logger.debug("Sent new norms and goals to the BDI agent.") async def _send_clear_llm_history(self): """ Clear the LLM Agent's conversation history. Sends an empty history to the LLM Agent to reset its state. """ message = InternalMessage( to=settings.agent_settings.llm_name, sender=self.name, body="clear_history", threads="clear history message", ) await self.send(message) self.logger.debug("Sent message to LLM agent to clear history.") async def _receive_programs(self): """ Continuous loop that receives program updates from the HTTP endpoint. It listens to the ``program`` topic on the internal ZMQ SUB socket. When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`. Additionally, the LLM history is cleared via :meth:`_send_clear_llm_history`. """ while True: topic, body = await self.sub_socket.recv_multipart() try: program = Program.model_validate_json(body) await self._send_to_bdi(program) await self._send_clear_llm_history() except ValidationError: self.logger.exception("Received an invalid program.") continue async def setup(self): """ Initialize the agent. Connects the internal ZMQ SUB socket and subscribes to the 'program' topic. Starts the background behavior to receive programs. """ context = Context.instance() self.sub_socket = context.socket(zmq.SUB) self.sub_socket.connect(settings.zmq_settings.internal_sub_address) self.sub_socket.subscribe("program") self.add_behavior(self._receive_programs())