feat: create the program manager agent to interpret programs from the UI

Extracts norms and goals and sends these to the BDI agent.

ref: N25B-200
This commit is contained in:
Twirre Meulenbelt
2025-11-12 16:28:52 +01:00
parent 8cd8988fe0
commit 5376b3bb4c
6 changed files with 99 additions and 2 deletions

View File

@@ -0,0 +1,27 @@
import zmq
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.config import settings
from .receive_programs_behavior import ReceiveProgramsBehavior
class BDIProgramManager(BaseAgent):
"""
Will interpret programs received from the HTTP endpoint. Extracts norms, goals, triggers and
forwards them to the BDI as beliefs.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sub_socket = None
async def setup(self):
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("program")
self.add_behaviour(ReceiveProgramsBehavior())

View File

@@ -0,0 +1,59 @@
import json
from pydantic import ValidationError
from spade.behaviour import CyclicBehaviour
from spade.message import Message
from control_backend.core.config import settings
from control_backend.schemas.program import Program
class ReceiveProgramsBehavior(CyclicBehaviour):
async def _receive(self) -> Program | None:
topic, body = await self.agent.sub_socket.recv_multipart()
try:
return Program.model_validate_json(body)
except ValidationError as e:
self.agent.logger.error("Received an invalid program.", exc_info=e)
return None
def _extract_norms(self, program: Program) -> str:
"""First phase only for now, as a single newline delimited string."""
if not program.phases:
return ""
if not program.phases[0].phaseData.norms:
return ""
norm_values = [norm.value for norm in program.phases[0].phaseData.norms]
return "\n".join(norm_values)
def _extract_goals(self, program: Program) -> str:
"""First phase only for now, as a single newline delimited string."""
if not program.phases:
return ""
if not program.phases[0].phaseData.goals:
return ""
goal_descriptions = [goal.description for goal in program.phases[0].phaseData.goals]
return "\n".join(goal_descriptions)
async def _send_to_bdi(self, program: Program):
temp_allowed_parts = {
"norms": [self._extract_norms(program)],
"goals": [self._extract_goals(program)],
}
message = Message(
to=settings.agent_settings.bdi_core_agent_name + "@" + settings.agent_settings.host,
sender=self.agent.jid,
body=json.dumps(temp_allowed_parts),
thread="beliefs",
)
await self.send(message)
self.agent.logger.debug("Sent new norms and goals to the BDI agent.")
async def run(self):
program = await self._receive()
if not program:
return
await self._send_to_bdi(program)

View File

@@ -39,7 +39,7 @@ class BeliefSetterBehaviour(CyclicBehaviour):
"Message is from the belief collector agent. Processing as belief message."
)
self._process_belief_message(message)
case settings.agent_settings.program_manager_name:
case settings.agent_settings.program_manager_agent_name:
self.agent.logger.debug(
"Processing message from the program manager. Processing as belief message."
)

View File

@@ -136,7 +136,7 @@ class RICommunicationAgent(BaseAgent):
settings.agent_settings.ri_command_agent_name
+ "@"
+ settings.agent_settings.host,
"pohpu7-huqsyH-qutduk",
settings.agent_settings.ri_command_agent_name,
address=addr,
bind=bind,
)

View File

@@ -16,6 +16,7 @@ class AgentSettings(BaseModel):
llm_agent_name: str = "llm_agent"
test_agent_name: str = "test_agent"
transcription_agent_name: str = "transcription_agent"
program_manager_agent_name: str = "program_manager"
ri_communication_agent_name: str = "ri_communication_agent"
ri_command_agent_name: str = "ri_command_agent"

View File

@@ -14,6 +14,7 @@ from control_backend.agents import (
VADAgent,
)
from control_backend.agents.bdi import BDICoreAgent, TBeliefExtractorAgent
from control_backend.agents.bdi.bdi_program_manager.bdi_program_manager import BDIProgramManager
from control_backend.api.v1.router import api_router
from control_backend.core.config import settings
from control_backend.logging import setup_logging
@@ -115,6 +116,15 @@ async def lifespan(app: FastAPI):
VADAgent,
{"audio_in_address": "tcp://localhost:5558", "audio_in_bind": False},
),
"ProgramManager": (
BDIProgramManager,
{
"name": settings.agent_settings.program_manager_agent_name,
"jid": f"{settings.agent_settings.program_manager_agent_name}@"
f"{settings.agent_settings.host}",
"password": settings.agent_settings.program_manager_agent_name,
},
),
}
vad_agent_instance = None