perf: improved speed of BDI
By efficiently checking when the next work has to be done, we can increase performance not having to "busy loop". Time from transcription -> message to LLM agent is now down to sub 1 millisecond. ref: N25B-316
This commit is contained in:
88
src/control_backend/agents/bdi/belief_collector_agent.py
Normal file
88
src/control_backend/agents/bdi/belief_collector_agent.py
Normal file
@@ -0,0 +1,88 @@
|
||||
import json
|
||||
|
||||
from control_backend.agents.base import BaseAgent
|
||||
from control_backend.core.agent_system import InternalMessage
|
||||
from control_backend.core.config import settings
|
||||
|
||||
|
||||
class BDIBeliefCollectorAgent(BaseAgent):
|
||||
"""
|
||||
Continuously collects beliefs/emotions from extractor agents and forwards a
|
||||
unified belief packet to the BDI agent.
|
||||
"""
|
||||
|
||||
async def setup(self):
|
||||
self.logger.info("Setting up %s", self.name)
|
||||
|
||||
async def handle_message(self, msg: InternalMessage):
|
||||
sender_node = msg.sender
|
||||
|
||||
# Parse JSON payload
|
||||
try:
|
||||
payload = json.loads(msg.body)
|
||||
except Exception as e:
|
||||
self.logger.warning(
|
||||
"BeliefCollector: failed to parse JSON from %s. Body=%r Error=%s",
|
||||
sender_node,
|
||||
msg.body,
|
||||
e,
|
||||
)
|
||||
return
|
||||
|
||||
msg_type = payload.get("type")
|
||||
|
||||
# Prefer explicit 'type' field
|
||||
if msg_type == "belief_extraction_text":
|
||||
self.logger.debug("Message routed to _handle_belief_text (sender=%s)", sender_node)
|
||||
await self._handle_belief_text(payload, sender_node)
|
||||
# This is not implemented yet, but we keep the structure for future use
|
||||
elif msg_type == "emotion_extraction_text":
|
||||
self.logger.debug("Message routed to _handle_emo_text (sender=%s)", sender_node)
|
||||
await self._handle_emo_text(payload, sender_node)
|
||||
else:
|
||||
self.logger.warning(
|
||||
"Unrecognized message (sender=%s, type=%r). Ignoring.", sender_node, msg_type
|
||||
)
|
||||
|
||||
async def _handle_belief_text(self, payload: dict, origin: str):
|
||||
"""
|
||||
Expected payload:
|
||||
{
|
||||
"type": "belief_extraction_text",
|
||||
"beliefs": {"user_said": ["Can you help me?"]}
|
||||
|
||||
}
|
||||
"""
|
||||
beliefs = payload.get("beliefs", {})
|
||||
|
||||
if not beliefs:
|
||||
self.logger.debug("Received empty beliefs set.")
|
||||
return
|
||||
|
||||
self.logger.debug("Forwarding %d beliefs.", len(beliefs))
|
||||
for belief_name, belief_list in beliefs.items():
|
||||
for belief in belief_list:
|
||||
self.logger.debug(" - %s %s", belief_name, str(belief))
|
||||
|
||||
await self._send_beliefs_to_bdi(beliefs, origin=origin)
|
||||
|
||||
async def _handle_emo_text(self, payload: dict, origin: str):
|
||||
"""TODO: implement (after we have emotional recognition)"""
|
||||
pass
|
||||
|
||||
async def _send_beliefs_to_bdi(self, beliefs: dict, origin: str | None = None):
|
||||
"""
|
||||
Sends a unified belief packet to the BDI agent.
|
||||
"""
|
||||
if not beliefs:
|
||||
return
|
||||
|
||||
msg = InternalMessage(
|
||||
to=settings.agent_settings.bdi_core_name,
|
||||
sender=self.name,
|
||||
body=json.dumps(beliefs),
|
||||
thread="beliefs",
|
||||
)
|
||||
|
||||
await self.send(msg)
|
||||
self.logger.info("Sent %d belief(s) to BDI core.", len(beliefs))
|
||||
Reference in New Issue
Block a user