import zmq
from pydantic import ValidationError
from zmq.asyncio import Context

from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
from control_backend.schemas.program import Program


class BDIProgramManager(BaseAgent):
    """
    Interpret programs received from the HTTP endpoint.

    Each valid program's first phase is reduced to two beliefs, ``norms`` and
    ``goals``, which are forwarded to the BDI core agent. Only the first phase
    is read here; triggers are not forwarded by this class.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # ZMQ SUB socket; created in ``setup`` and read by ``_receive_programs``.
        self.sub_socket = None

    async def _send_to_bdi(self, program: Program) -> None:
        """
        Forward the norms and goals of the program's first phase to the BDI core.

        Both beliefs are sent with ``replace=True`` in a single message on the
        ``"beliefs"`` thread. A program without any phases is skipped with a
        warning instead of raising.

        :param program: The validated program whose first phase is forwarded.
        """
        if not program.phases:
            # Indexing an empty phase list would raise IndexError, which would
            # propagate out of the receive loop and stop it for good.
            self.logger.warning("Received a program without phases; nothing sent to the BDI agent.")
            return

        first_phase = program.phases[0]
        norms_belief = Belief(
            name="norms",
            arguments=[norm.norm for norm in first_phase.norms],
            replace=True,
        )
        goals_belief = Belief(
            name="goals",
            arguments=[goal.description for goal in first_phase.goals],
            replace=True,
        )
        program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief])

        message = InternalMessage(
            to=settings.agent_settings.bdi_core_name,
            sender=self.name,
            body=program_beliefs.model_dump_json(),
            thread="beliefs",
        )
        await self.send(message)
        self.logger.debug("Sent new norms and goals to the BDI agent.")

    async def _receive_programs(self) -> None:
        """
        Continuously receive programs from the HTTP endpoint, sent to us over ZMQ.

        Each message is a two-part frame (topic, body). The body is validated as
        a ``Program``; invalid bodies are logged and skipped so the loop keeps
        running.
        """
        while True:
            # The topic frame is not needed: the socket only subscribes to "program".
            _topic, body = await self.sub_socket.recv_multipart()

            try:
                program = Program.model_validate_json(body)
            except ValidationError:
                # logger.exception already records the active exception and traceback.
                self.logger.exception("Received an invalid program.")
                continue

            await self._send_to_bdi(program)

    async def setup(self) -> None:
        """
        Connect the SUB socket, subscribe to the "program" topic and start receiving.
        """
        context = Context.instance()

        self.sub_socket = context.socket(zmq.SUB)
        self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
        self.sub_socket.subscribe("program")

        self.add_behavior(self._receive_programs())