Feat: Implement belief collector #14
@@ -0,0 +1,109 @@
|
|||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from spade.behaviour import CyclicBehaviour
|
||||||
|
from spade.message import Message
|
||||||
|
from control_backend.core.config import settings
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class ContinuousBeliefCollector(CyclicBehaviour):
|
||||||
|
"""
|
||||||
|
Continuously collects beliefs/emotions from extractor agents:
|
||||||
|
Then we send a unified belief packet to the BDI agent.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def run(self):
|
||||||
|
msg = await self.receive(timeout=0.1) # Wait for 0.1s
|
||||||
|
if msg:
|
||||||
|
await self._process_message(msg)
|
||||||
|
|
||||||
|
|
||||||
|
async def _process_message(self, msg: Message):
|
||||||
|
sender_node = self._sender_node(msg)
|
||||||
|
|
||||||
|
# Parse JSON payload
|
||||||
|
try:
|
||||||
|
payload = json.loads(msg.body)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
"BeliefCollector: failed to parse JSON from %s. Body=%r Error=%s",
|
||||||
|
sender_node, msg.body, e
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
msg_type = payload.get("type")
|
||||||
|
|
||||||
|
# Prefer explicit 'type' field
|
||||||
|
if msg_type == "belief_extraction_text" or sender_node == "belief_text_agent_mock":
|
||||||
|
logger.info("BeliefCollector: message routed to _handle_belief_text (sender=%s)", sender_node)
|
||||||
|
await self._handle_belief_text(payload, sender_node)
|
||||||
|
#This is not implemented yet, but we keep the structure for future use
|
||||||
|
elif msg_type == "emotion_extraction_text" or sender_node == "emo_text_agent_mock":
|
||||||
|
logger.info("BeliefCollector: message routed to _handle_emo_text (sender=%s)", sender_node)
|
||||||
|
await self._handle_emo_text(payload, sender_node)
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
"BeliefCollector: unrecognized message (sender=%s, type=%r). Ignoring.",
|
||||||
|
sender_node, msg_type
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _sender_node(msg: Message) -> str:
|
||||||
|
"""
|
||||||
|
Extracts the 'node' (localpart) of the sender JID.
|
||||||
|
E.g., 'agent@host/resource' -> 'agent'
|
||||||
|
"""
|
||||||
|
s = str(msg.sender) if msg.sender is not None else "no_sender"
|
||||||
|
return s.split("@", 1)[0] if "@" in s else s
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_belief_text(self, payload: dict, origin: str):
|
||||||
|
"""
|
||||||
|
Expected payload:
|
||||||
|
{
|
||||||
|
"type": "belief_extraction_text",
|
||||||
|
"beliefs": [
|
||||||
|
{"user_said": [["hello"],["Can you help me?"],["stop talking to me"],["No"],["Pepper do a dance"]]}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
"""
|
||||||
|
beliefs = payload.get("beliefs", [])
|
||||||
|
if not isinstance(beliefs, list):
|
||||||
|
logger.warning("BeliefCollector: 'beliefs' is not a list: %r", beliefs)
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info("BeliefCollector: forwarding %d beliefs.", len(beliefs))
|
||||||
|
for b in beliefs:
|
||||||
|
logger.info(" - %s", b)
|
||||||
|
|
||||||
|
await self._send_beliefs_to_bdi(beliefs, origin=origin)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_emo_text(self, payload: dict, origin: str):
|
||||||
|
"""TODO: implement (after we have emotional recogntion)"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
async def _send_beliefs_to_bdi(self, beliefs: list[str], origin: str | None = None):
|
||||||
|
"""
|
||||||
|
Sends a unified belief packet to the BDI agent.
|
||||||
|
"""
|
||||||
|
if not beliefs:
|
||||||
|
return
|
||||||
|
|
||||||
|
to_jid = f"{settings.agent_settings.bdi_core_agent_name}@{settings.agent_settings.host}"
|
||||||
|
|
||||||
|
packet = {
|
||||||
|
"type": "belief_packet",
|
||||||
|
"origin": origin,
|
||||||
|
"beliefs": beliefs,
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = Message(to=to_jid)
|
||||||
|
msg.body = json.dumps(packet)
|
||||||
|
|
||||||
|
|
||||||
|
await self.send(msg)
|
||||||
|
logger.info("BeliefCollector: sent %d belief(s) to BDI at %s", len(beliefs), to_jid)
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
import logging
|
||||||
|
from spade.agent import Agent
|
||||||
|
|
||||||
|
from .behaviours.continuous_collect import ContinuousBeliefCollector
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class BeliefCollectorAgent(Agent):
|
||||||
|
async def setup(self):
|
||||||
|
logger.info("BeliefCollectorAgent starting (%s)", self.jid)
|
||||||
|
# Attach the continuous collector behaviour (listens and forwards to BDI)
|
||||||
|
self.add_behaviour(ContinuousBeliefCollector())
|
||||||
|
logger.info("BeliefCollectorAgent ready.")
|
||||||
0
src/control_backend/agents/mock_agents/__init__.py
Normal file
0
src/control_backend/agents/mock_agents/__init__.py
Normal file
31
src/control_backend/agents/mock_agents/belief_text_mock.py
Normal file
31
src/control_backend/agents/mock_agents/belief_text_mock.py
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
import json
|
||||||
|
from spade.agent import Agent
|
||||||
|
from spade.behaviour import OneShotBehaviour
|
||||||
|
from spade.message import Message
|
||||||
|
from control_backend.core.config import settings
|
||||||
|
|
||||||
|
class BeliefTextAgent(Agent):
|
||||||
|
class SendOnceBehaviourBlfText(OneShotBehaviour):
|
||||||
|
async def run(self):
|
||||||
|
to_jid = f"{settings.agent_settings.belief_collector_agent_name}@{settings.agent_settings.host}"
|
||||||
|
|
||||||
|
# Send multiple beliefs in one JSON payload
|
||||||
|
payload = {
|
||||||
|
"type": "belief_extraction_text",
|
||||||
|
"beliefs": [
|
||||||
|
{"user_said": [["hello"],["Can you help me?"],["stop talking to me"],["No"],["Pepper do a dance"]]}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = Message(to=to_jid)
|
||||||
|
msg.body = json.dumps(payload)
|
||||||
|
await self.send(msg)
|
||||||
|
print(f"Beliefs sent to {to_jid}!")
|
||||||
|
|
||||||
|
self.exit_code = "Job Finished!"
|
||||||
|
await self.agent.stop()
|
||||||
|
|
||||||
|
async def setup(self):
|
||||||
|
print("BeliefTextAgent started")
|
||||||
|
self.b = self.SendOnceBehaviourBlfText()
|
||||||
|
self.add_behaviour(self.b)
|
||||||
@@ -10,6 +10,9 @@ class AgentSettings(BaseModel):
|
|||||||
bdi_core_agent_name: str = "bdi_core"
|
bdi_core_agent_name: str = "bdi_core"
|
||||||
belief_collector_agent_name: str = "belief_collector"
|
belief_collector_agent_name: str = "belief_collector"
|
||||||
test_agent_name: str = "test_agent"
|
test_agent_name: str = "test_agent"
|
||||||
|
#mock agents for belief collector
|
||||||
|
emo_text_agent_mock_name: str = "emo_text_agent_mock"
|
||||||
|
belief_text_agent_mock_name: str = "belief_text_agent_mock"
|
||||||
|
|
||||||
class Settings(BaseSettings):
|
class Settings(BaseSettings):
|
||||||
app_title: str = "PepperPlus"
|
app_title: str = "PepperPlus"
|
||||||
|
|||||||
@@ -12,11 +12,17 @@ from spade.behaviour import OneShotBehaviour
|
|||||||
import zmq
|
import zmq
|
||||||
|
|
||||||
# Internal imports
|
# Internal imports
|
||||||
from control_backend.agents.bdi.bdi_core import BDICoreAgent
|
|
||||||
from control_backend.api.v1.router import api_router
|
from control_backend.api.v1.router import api_router
|
||||||
from control_backend.core.config import AgentSettings, settings
|
from control_backend.core.config import AgentSettings, settings
|
||||||
from control_backend.core.zmq_context import context
|
from control_backend.core.zmq_context import context
|
||||||
|
|
||||||
|
|
||||||
|
# Agents
|
||||||
|
from control_backend.agents.bdi.bdi_core import BDICoreAgent
|
||||||
|
from control_backend.agents.belief_collector.belief_collector import BeliefCollectorAgent
|
||||||
|
from control_backend.agents.mock_agents.emo_text_mock import EmoTextAgent
|
||||||
|
from control_backend.agents.mock_agents.belief_text_mock import BeliefTextAgent
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
|
||||||
@@ -32,8 +38,26 @@ async def lifespan(app: FastAPI):
|
|||||||
logger.info("Internal publishing socket bound to %s", internal_comm_socket)
|
logger.info("Internal publishing socket bound to %s", internal_comm_socket)
|
||||||
|
|
||||||
# Initiate agents
|
# Initiate agents
|
||||||
bdi_core = BDICoreAgent(settings.agent_settings.bdi_core_agent_name + '@' + settings.agent_settings.host, settings.agent_settings.bdi_core_agent_name, "src/control_backend/agents/bdi/rules.asl")
|
host = settings.agent_settings.host
|
||||||
|
|
||||||
|
bdi_core = BDICoreAgent(
|
||||||
|
settings.agent_settings.bdi_core_agent_name + '@' + host,
|
||||||
|
settings.agent_settings.bdi_core_agent_name,
|
||||||
|
"src/control_backend/agents/bdi/rules.asl"
|
||||||
|
)
|
||||||
|
|
||||||
|
belief_collector = BeliefCollectorAgent(
|
||||||
|
settings.agent_settings.belief_collector_agent_name + '@' + host,
|
||||||
|
settings.agent_settings.belief_collector_agent_name
|
||||||
|
)
|
||||||
|
belief_text_mock = BeliefTextAgent(
|
||||||
|
settings.agent_settings.belief_text_agent_mock_name + '@' + host,
|
||||||
|
settings.agent_settings.belief_text_agent_mock_name
|
||||||
|
)
|
||||||
|
|
||||||
await bdi_core.start()
|
await bdi_core.start()
|
||||||
|
await belief_collector.start()
|
||||||
|
await belief_text_mock.start()
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user