import json

from pydantic import ValidationError

from control_backend.agents.base import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage


class BDIBeliefCollectorAgent(BaseAgent):
    """
    BDI Belief Collector Agent.

    This agent acts as a central aggregator for beliefs derived from various sources
    (e.g., text, emotion, vision). It receives raw extracted data from other agents,
    normalizes them into valid :class:`Belief` objects, and forwards them as a unified
    packet to the BDI Core Agent.

    It serves as a funnel to ensure the BDI agent receives a consistent stream of beliefs.

    Only text-based belief extraction is implemented at the moment; the emotion
    handler is a placeholder.
    """

    async def setup(self):
        """
        Initialize the agent.

        Currently this only logs that the agent is being set up.
        """
        self.logger.info("Setting up %s", self.name)

    async def handle_message(self, msg: InternalMessage):
        """
        Handle incoming messages from other extractor agents.

        Parses ``msg.body`` as JSON and routes the payload to a specific handler
        based on its ``type`` field. Supported types:

        - ``belief_extraction_text``: Handled by :meth:`_handle_belief_text`
        - ``emotion_extraction_text``: Handled by :meth:`_handle_emo_text`

        Messages whose body is not valid JSON, is not a JSON object, or carries an
        unknown ``type`` are logged with a warning and ignored.

        :param msg: The received internal message.
        """
        sender_node = msg.sender

        # Parse JSON payload. This is a message boundary: malformed input from
        # another agent must never crash the collector, so failures are logged
        # and the message is dropped.
        try:
            payload = json.loads(msg.body)
        except Exception as e:
            self.logger.warning(
                "BeliefCollector: failed to parse JSON from %s. Body=%r Error=%s",
                sender_node,
                msg.body,
                e,
            )
            return

        # json.loads also succeeds for lists, strings, numbers, booleans and null;
        # those have no ``.get`` and would otherwise raise AttributeError below.
        if not isinstance(payload, dict):
            self.logger.warning(
                "BeliefCollector: expected a JSON object from %s, got %s. Ignoring.",
                sender_node,
                type(payload).__name__,
            )
            return

        msg_type = payload.get("type")

        # Prefer explicit 'type' field
        if msg_type == "belief_extraction_text":
            self.logger.debug("Message routed to _handle_belief_text (sender=%s)", sender_node)
            await self._handle_belief_text(payload, sender_node)
        # This is not implemented yet, but we keep the structure for future use
        elif msg_type == "emotion_extraction_text":
            self.logger.debug("Message routed to _handle_emo_text (sender=%s)", sender_node)
            await self._handle_emo_text(payload, sender_node)
        else:
            self.logger.warning(
                "Unrecognized message (sender=%s, type=%r). Ignoring.", sender_node, msg_type
            )

    async def _handle_belief_text(self, payload: dict, origin: str):
        """
        Process text-based belief extraction payloads.

        Expected payload format::

            {
                "type": "belief_extraction_text",
                "beliefs": {
                    "user_said": ["Can you help me?"],
                    "intention": ["ask_help"]
                }
            }

        Validates and converts the dictionary items into :class:`Belief` objects.
        Entries that fail validation are dropped silently; the remaining beliefs
        are forwarded via :meth:`_send_beliefs_to_bdi`. A missing or empty
        ``beliefs`` value is ignored, and a non-empty value that is not a mapping
        is logged with a warning and ignored.

        :param payload: The dictionary payload containing belief data.
        :param origin: The name of the sender agent.
        """
        beliefs = payload.get("beliefs", {})

        if not beliefs:
            self.logger.debug("Received empty beliefs set.")
            return

        # A truthy non-mapping (e.g. a list or a string) has no ``.items()``;
        # reject it explicitly instead of failing with AttributeError.
        if not isinstance(beliefs, dict):
            self.logger.warning(
                "BeliefCollector: 'beliefs' from %s must be a JSON object, got %s. Ignoring.",
                origin,
                type(beliefs).__name__,
            )
            return

        def try_create_belief(name, arguments) -> Belief | None:
            """
            Create a belief object from name and arguments, or return None silently
            if the input is not correct.

            :param name: The name of the belief.
            :param arguments: The arguments of the belief.
            :return: A Belief object if the input is valid or None.
            """
            try:
                return Belief(name=name, arguments=arguments)
            except ValidationError:
                return None

        valid_beliefs = [
            belief
            for name, arguments in beliefs.items()
            if (belief := try_create_belief(name, arguments)) is not None
        ]

        self.logger.debug("Forwarding %d beliefs.", len(valid_beliefs))
        for belief in valid_beliefs:
            for argument in belief.arguments:
                self.logger.debug("  - %s %s", belief.name, argument)

        await self._send_beliefs_to_bdi(valid_beliefs, origin=origin)

    async def _handle_emo_text(self, payload: dict, origin: str):
        """
        Process emotion extraction payloads.

        **TODO**: Implement this method once emotion recognition is integrated.

        :param payload: The dictionary payload containing emotion data.
        :param origin: The name of the sender agent.
        """
        pass

    async def _send_beliefs_to_bdi(self, beliefs: list[Belief], origin: str | None = None):
        """
        Send a list of aggregated beliefs to the BDI Core Agent.

        Wraps the beliefs in a :class:`BeliefMessage` and sends it via the 'beliefs'
        thread to the agent named by ``settings.agent_settings.bdi_core_name``.
        Nothing is sent when the list is empty.

        :param beliefs: The list of Belief objects to send.
        :param origin: (Optional) The original source of the beliefs (unused currently).
        """
        if not beliefs:
            return

        msg = InternalMessage(
            to=settings.agent_settings.bdi_core_name,
            sender=self.name,
            body=BeliefMessage(beliefs=beliefs).model_dump_json(),
            thread="beliefs",
        )

        await self.send(msg)
        self.logger.info("Sent %d belief(s) to BDI core.", len(beliefs))