diff --git a/src/control_backend/agents/bdi/bdi_program_manager/bdi_program_manager.py b/src/control_backend/agents/bdi/bdi_program_manager/bdi_program_manager.py new file mode 100644 index 0000000..e35e56c --- /dev/null +++ b/src/control_backend/agents/bdi/bdi_program_manager/bdi_program_manager.py @@ -0,0 +1,27 @@ +import zmq +from zmq.asyncio import Context + +from control_backend.agents import BaseAgent +from control_backend.core.config import settings + +from .receive_programs_behavior import ReceiveProgramsBehavior + + +class BDIProgramManager(BaseAgent): + """ + Will interpret programs received from the HTTP endpoint. Extracts norms, goals, triggers and + forwards them to the BDI as beliefs. + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.sub_socket = None + + async def setup(self): + context = Context.instance() + + self.sub_socket = context.socket(zmq.SUB) + self.sub_socket.connect(settings.zmq_settings.internal_sub_address) + self.sub_socket.subscribe("program") + + self.add_behaviour(ReceiveProgramsBehavior()) diff --git a/src/control_backend/agents/bdi/bdi_program_manager/receive_programs_behavior.py b/src/control_backend/agents/bdi/bdi_program_manager/receive_programs_behavior.py new file mode 100644 index 0000000..b5248fa --- /dev/null +++ b/src/control_backend/agents/bdi/bdi_program_manager/receive_programs_behavior.py @@ -0,0 +1,59 @@ +import json + +from pydantic import ValidationError +from spade.behaviour import CyclicBehaviour +from spade.message import Message + +from control_backend.core.config import settings +from control_backend.schemas.program import Program + + +class ReceiveProgramsBehavior(CyclicBehaviour): + async def _receive(self) -> Program | None: + topic, body = await self.agent.sub_socket.recv_multipart() + + try: + return Program.model_validate_json(body) + except ValidationError as e: + self.agent.logger.error("Received an invalid program.", exc_info=e) + return None + + def _extract_norms(self, program: Program) -> str: + """First phase only for now, as a single newline delimited string.""" + if not program.phases: + return "" + if not program.phases[0].phaseData.norms: + return "" + norm_values = [norm.value for norm in program.phases[0].phaseData.norms] + return "\n".join(norm_values) + + def _extract_goals(self, program: Program) -> str: + """First phase only for now, as a single newline delimited string.""" + if not program.phases: + return "" + if not program.phases[0].phaseData.goals: + return "" + goal_descriptions = [goal.description for goal in program.phases[0].phaseData.goals] + return "\n".join(goal_descriptions) + + async def _send_to_bdi(self, program: Program): + temp_allowed_parts = { + "norms": [self._extract_norms(program)], + "goals": [self._extract_goals(program)], + } + + message = Message( + to=settings.agent_settings.bdi_core_agent_name + "@" + settings.agent_settings.host, + sender=self.agent.jid, + body=json.dumps(temp_allowed_parts), + thread="beliefs", + ) + await self.send(message) + self.agent.logger.debug("Sent new norms and goals to the BDI agent.") + + async def run(self): + program = await self._receive() + if not program: + return + + await self._send_to_bdi(program) diff --git a/src/control_backend/agents/bdi/behaviours/belief_setter.py b/src/control_backend/agents/bdi/behaviours/belief_setter.py index 3a98f02..57bd37a 100644 --- a/src/control_backend/agents/bdi/behaviours/belief_setter.py +++ b/src/control_backend/agents/bdi/behaviours/belief_setter.py @@ -39,7 +39,7 @@ class BeliefSetterBehaviour(CyclicBehaviour): "Message is from the belief collector agent. Processing as belief message." ) self._process_belief_message(message) - case settings.agent_settings.program_manager_name: + case settings.agent_settings.program_manager_agent_name: self.agent.logger.debug( "Processing message from the program manager. Processing as belief message." ) diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index 2ea3b3d..76d6431 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -136,7 +136,7 @@ class RICommunicationAgent(BaseAgent): settings.agent_settings.ri_command_agent_name + "@" + settings.agent_settings.host, - "pohpu7-huqsyH-qutduk", + settings.agent_settings.ri_command_agent_name, address=addr, bind=bind, ) diff --git a/src/control_backend/core/config.py b/src/control_backend/core/config.py index 8de2403..a080aab 100644 --- a/src/control_backend/core/config.py +++ b/src/control_backend/core/config.py @@ -16,6 +16,7 @@ class AgentSettings(BaseModel): llm_agent_name: str = "llm_agent" test_agent_name: str = "test_agent" transcription_agent_name: str = "transcription_agent" + program_manager_agent_name: str = "program_manager" ri_communication_agent_name: str = "ri_communication_agent" ri_command_agent_name: str = "ri_command_agent" diff --git a/src/control_backend/main.py b/src/control_backend/main.py index f31105f..f44bc8e 100644 --- a/src/control_backend/main.py +++ b/src/control_backend/main.py @@ -14,6 +14,7 @@ from control_backend.agents import ( VADAgent, ) from control_backend.agents.bdi import BDICoreAgent, TBeliefExtractorAgent +from control_backend.agents.bdi.bdi_program_manager.bdi_program_manager import BDIProgramManager from control_backend.api.v1.router import api_router from control_backend.core.config import settings from control_backend.logging import setup_logging @@ -115,6 +116,15 @@ async def lifespan(app: FastAPI): VADAgent, {"audio_in_address": "tcp://localhost:5558", "audio_in_bind": False}, ), + "ProgramManager": ( + BDIProgramManager, + { + "name": settings.agent_settings.program_manager_agent_name, + "jid": f"{settings.agent_settings.program_manager_agent_name}@" + f"{settings.agent_settings.host}", + "password": settings.agent_settings.program_manager_agent_name, + }, + ), } vad_agent_instance = None