Merge branch 'test/increase-coverage' into feat/reset-experiment-and-phase

This commit is contained in:
2026-01-16 15:08:34 +01:00
26 changed files with 1780 additions and 392 deletions

View File

@@ -1,8 +1,5 @@
from control_backend.agents.bdi.bdi_core_agent import BDICoreAgent as BDICoreAgent
from .belief_collector_agent import (
BDIBeliefCollectorAgent as BDIBeliefCollectorAgent,
)
from .text_belief_extractor_agent import (
TextBeliefExtractorAgent as TextBeliefExtractorAgent,
)

View File

@@ -77,7 +77,7 @@ class AstTerm(AstExpression, ABC):
return AstBinaryOp(self, BinaryOperatorType.NOT_EQUALS, _coalesce_expr(other))
@dataclass
@dataclass(eq=False)
class AstAtom(AstTerm):
"""
Grounded expression in all lowercase.
@@ -89,7 +89,7 @@ class AstAtom(AstTerm):
return self.value.lower()
@dataclass
@dataclass(eq=False)
class AstVar(AstTerm):
"""
Ungrounded variable expression. First letter capitalized.
@@ -101,7 +101,7 @@ class AstVar(AstTerm):
return self.name.capitalize()
@dataclass
@dataclass(eq=False)
class AstNumber(AstTerm):
value: int | float
@@ -109,7 +109,7 @@ class AstNumber(AstTerm):
return str(self.value)
@dataclass
@dataclass(eq=False)
class AstString(AstTerm):
value: str
@@ -117,7 +117,7 @@ class AstString(AstTerm):
return f'"{self.value}"'
@dataclass
@dataclass(eq=False)
class AstLiteral(AstTerm):
functor: str
terms: list[AstTerm] = field(default_factory=list)

View File

@@ -167,7 +167,7 @@ class BDICoreAgent(BaseAgent):
case "force_next_phase":
self._force_next_phase()
case _:
self.logger.warning("Received unknow user interruption: %s", msg)
self.logger.warning("Received unknown user interruption: %s", msg)
def _apply_belief_changes(self, belief_changes: BeliefMessage):
"""

View File

@@ -1,152 +0,0 @@
import json
from pydantic import ValidationError
from control_backend.agents.base import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
class BDIBeliefCollectorAgent(BaseAgent):
"""
BDI Belief Collector Agent.
This agent acts as a central aggregator for beliefs derived from various sources (e.g., text,
emotion, vision). It receives raw extracted data from other agents,
normalizes them into valid :class:`Belief` objects, and forwards them as a unified packet to the
BDI Core Agent.
It serves as a funnel to ensure the BDI agent receives a consistent stream of beliefs.
"""
async def setup(self):
"""
Initialize the agent.
"""
self.logger.info("Setting up %s", self.name)
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages from other extractor agents.
Routes the message to specific handlers based on the 'type' field in the JSON body.
Supported types:
- ``belief_extraction_text``: Handled by :meth:`_handle_belief_text`
- ``emotion_extraction_text``: Handled by :meth:`_handle_emo_text`
:param msg: The received internal message.
"""
sender_node = msg.sender
# Parse JSON payload
try:
payload = json.loads(msg.body)
except Exception as e:
self.logger.warning(
"BeliefCollector: failed to parse JSON from %s. Body=%r Error=%s",
sender_node,
msg.body,
e,
)
return
msg_type = payload.get("type")
# Prefer explicit 'type' field
if msg_type == "belief_extraction_text":
self.logger.debug("Message routed to _handle_belief_text (sender=%s)", sender_node)
await self._handle_belief_text(payload, sender_node)
# This is not implemented yet, but we keep the structure for future use
elif msg_type == "emotion_extraction_text":
self.logger.debug("Message routed to _handle_emo_text (sender=%s)", sender_node)
await self._handle_emo_text(payload, sender_node)
else:
self.logger.warning(
"Unrecognized message (sender=%s, type=%r). Ignoring.", sender_node, msg_type
)
async def _handle_belief_text(self, payload: dict, origin: str):
"""
Process text-based belief extraction payloads.
Expected payload format::
{
"type": "belief_extraction_text",
"beliefs": {
"user_said": ["Can you help me?"],
"intention": ["ask_help"]
}
}
Validates and converts the dictionary items into :class:`Belief` objects.
:param payload: The dictionary payload containing belief data.
:param origin: The name of the sender agent.
"""
beliefs = payload.get("beliefs", {})
if not beliefs:
self.logger.debug("Received empty beliefs set.")
return
def try_create_belief(name, arguments) -> Belief | None:
"""
Create a belief object from name and arguments, or return None silently if the input is
not correct.
:param name: The name of the belief.
:param arguments: The arguments of the belief.
:return: A Belief object if the input is valid or None.
"""
try:
return Belief(name=name, arguments=arguments)
except ValidationError:
return None
beliefs = [
belief
for name, arguments in beliefs.items()
if (belief := try_create_belief(name, arguments)) is not None
]
self.logger.debug("Forwarding %d beliefs.", len(beliefs))
for belief in beliefs:
for argument in belief.arguments:
self.logger.debug(" - %s %s", belief.name, argument)
await self._send_beliefs_to_bdi(beliefs, origin=origin)
async def _handle_emo_text(self, payload: dict, origin: str):
"""
Process emotion extraction payloads.
**TODO**: Implement this method once emotion recognition is integrated.
:param payload: The dictionary payload containing emotion data.
:param origin: The name of the sender agent.
"""
pass
async def _send_beliefs_to_bdi(self, beliefs: list[Belief], origin: str | None = None):
"""
Send a list of aggregated beliefs to the BDI Core Agent.
Wraps the beliefs in a :class:`BeliefMessage` and sends it via the 'beliefs' thread.
:param beliefs: The list of Belief objects to send.
:param origin: (Optional) The original source of the beliefs (unused currently).
"""
if not beliefs:
return
msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=BeliefMessage(create=beliefs).model_dump_json(),
thread="beliefs",
)
await self.send(msg)
self.logger.info("Sent %d belief(s) to BDI core.", len(beliefs))

View File

@@ -324,7 +324,7 @@ class RICommunicationAgent(BaseAgent):
async def handle_message(self, msg: InternalMessage):
try:
pause_command = PauseCommand.model_validate_json(msg.body)
self._req_socket.send_json(pause_command.model_dump())
self.logger.debug(self._req_socket.recv_json())
await self._req_socket.send_json(pause_command.model_dump())
self.logger.debug(await self._req_socket.recv_json())
except ValidationError:
self.logger.warning("Incorrect message format for PauseCommand.")

View File

@@ -35,7 +35,6 @@ class AgentSettings(BaseModel):
Names of the various agents in the system. These names are used for routing messages.
:ivar bdi_core_name: Name of the BDI Core Agent.
:ivar bdi_belief_collector_name: Name of the Belief Collector Agent.
:ivar bdi_program_manager_name: Name of the BDI Program Manager Agent.
:ivar text_belief_extractor_name: Name of the Text Belief Extractor Agent.
:ivar vad_name: Name of the Voice Activity Detection (VAD) Agent.
@@ -50,7 +49,6 @@ class AgentSettings(BaseModel):
# agent names
bdi_core_name: str = "bdi_core_agent"
bdi_belief_collector_name: str = "belief_collector_agent"
bdi_program_manager_name: str = "bdi_program_manager_agent"
text_belief_extractor_name: str = "text_belief_extractor_agent"
vad_name: str = "vad_agent"

View File

@@ -26,7 +26,6 @@ from zmq.asyncio import Context
# BDI agents
from control_backend.agents.bdi import (
BDIBeliefCollectorAgent,
BDICoreAgent,
TextBeliefExtractorAgent,
)
@@ -122,12 +121,6 @@ async def lifespan(app: FastAPI):
"name": settings.agent_settings.bdi_core_name,
},
),
"BeliefCollectorAgent": (
BDIBeliefCollectorAgent,
{
"name": settings.agent_settings.bdi_belief_collector_name,
},
),
"TextBeliefExtractorAgent": (
TextBeliefExtractorAgent,
{
@@ -172,6 +165,8 @@ async def lifespan(app: FastAPI):
await endpoints_pub_socket.send_multipart([PROGRAM_STATUS, ProgramStatus.STOPPING.value])
# Additional shutdown logic goes here
for agent in agents:
await agent.stop()
logger.info("Application shutdown complete.")