fix: complete pipeline working

User interrupts still need to be tested.

ref: N25B-429
This commit is contained in:
2026-01-07 17:13:58 +01:00
parent 3189b9fee3
commit 3d49e44cf7
8 changed files with 276 additions and 59 deletions

View File

@@ -1,4 +1,5 @@
import asyncio
import json
import zmq
from pydantic import ValidationError
@@ -9,7 +10,7 @@ from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList
from control_backend.schemas.internal_message import InternalMessage
from control_backend.schemas.program import Belief, ConditionalNorm, InferredBelief, Program
from control_backend.schemas.program import Belief, ConditionalNorm, InferredBelief, Phase, Program
class BDIProgramManager(BaseAgent):
@@ -24,20 +25,20 @@ class BDIProgramManager(BaseAgent):
:ivar sub_socket: The ZMQ SUB socket used to receive program updates.
"""
_program: Program
_phase: Phase | None
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sub_socket = None
def _initialize_internal_state(self, program: Program):
self._program = program
self._phase = program.phases[0] # start in first phase
async def _create_agentspeak_and_send_to_bdi(self, program: Program):
"""
Convert a received program into BDI beliefs and send them to the BDI Core Agent.
Currently, it takes the **first phase** of the program and extracts:
- **Norms**: Constraints or rules the agent must follow.
- **Goals**: Objectives the agent must achieve.
These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will
overwrite any existing norms/goals of the same name in the BDI agent.
Convert a received program into an AgentSpeak file and send it to the BDI Core Agent.
:param program: The program object received from the API.
"""
@@ -59,17 +60,44 @@ class BDIProgramManager(BaseAgent):
await self.send(msg)
@staticmethod
def _extract_beliefs_from_program(program: Program) -> list[Belief]:
def handle_message(self, msg: InternalMessage):
match msg.thread:
case "transition_phase":
phases = json.loads(msg.body)
self._transition_phase(phases["old"], phases["new"])
def _transition_phase(self, old: str, new: str):
assert old == str(self._phase.id)
if new == "end":
self._phase = None
return
for phase in self._program.phases:
if str(phase.id) == new:
self._phase = phase
self._send_beliefs_to_semantic_belief_extractor()
# Notify user interaction agent
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
thread="transition_phase",
body=str(self._phase.id),
)
self.add_behavior(self.send(msg))
def _extract_current_beliefs(self) -> list[Belief]:
beliefs: list[Belief] = []
for phase in program.phases:
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += BDIProgramManager._extract_beliefs_from_belief(norm.condition)
for norm in self._phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += BDIProgramManager._extract_beliefs_from_belief(trigger.condition)
for trigger in self._phase.triggers:
beliefs += self._extract_beliefs_from_belief(trigger.condition)
return beliefs
@@ -81,13 +109,11 @@ class BDIProgramManager(BaseAgent):
) + BDIProgramManager._extract_beliefs_from_belief(belief.right)
return [belief]
async def _send_beliefs_to_semantic_belief_extractor(self, program: Program):
async def _send_beliefs_to_semantic_belief_extractor(self):
"""
Extract beliefs from the program and send them to the Semantic Belief Extractor Agent.
:param program: The program received from the API.
"""
beliefs = BeliefList(beliefs=self._extract_beliefs_from_program(program))
beliefs = BeliefList(beliefs=self._extract_current_beliefs())
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
@@ -111,12 +137,14 @@ class BDIProgramManager(BaseAgent):
try:
program = Program.model_validate_json(body)
except ValidationError:
self.logger.exception("Received an invalid program.")
self.logger.warning("Received an invalid program.")
continue
self._initialize_internal_state(program)
await asyncio.gather(
self._create_agentspeak_and_send_to_bdi(program),
self._send_beliefs_to_semantic_belief_extractor(program),
self._send_beliefs_to_semantic_belief_extractor(),
)
async def setup(self):