feat: integrate AgentSpeak with semantic belief extraction

ref: N25B-429
This commit is contained in:
Twirre Meulenbelt
2026-01-07 11:44:48 +01:00
parent de8e829d3e
commit cabe35cdbd
10 changed files with 120 additions and 60 deletions

View File

@@ -332,7 +332,13 @@ class AgentSpeakGenerator:
@_astify.register @_astify.register
def _(self, sb: SemanticBelief) -> AstExpression: def _(self, sb: SemanticBelief) -> AstExpression:
return AstLiteral(f"semantic_{self._slugify_str(sb.description)}") return AstLiteral(self.get_semantic_belief_slug(sb))
@staticmethod
def get_semantic_belief_slug(sb: SemanticBelief) -> str:
# If you need a method like this for other types, make a public slugify singledispatch for
# all types.
return f"semantic_{AgentSpeakGenerator._slugify_str(sb.name)}"
@_astify.register @_astify.register
def _(self, ib: InferredBelief) -> AstExpression: def _(self, ib: InferredBelief) -> AstExpression:

View File

@@ -1,3 +1,5 @@
import asyncio
import zmq import zmq
from pydantic import ValidationError from pydantic import ValidationError
from zmq.asyncio import Context from zmq.asyncio import Context
@@ -5,8 +7,9 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList
from control_backend.schemas.internal_message import InternalMessage from control_backend.schemas.internal_message import InternalMessage
from control_backend.schemas.program import Program from control_backend.schemas.program import Belief, ConditionalNorm, InferredBelief, Program
class BDIProgramManager(BaseAgent): class BDIProgramManager(BaseAgent):
@@ -56,6 +59,45 @@ class BDIProgramManager(BaseAgent):
await self.send(msg) await self.send(msg)
@staticmethod
def _extract_beliefs_from_program(program: Program) -> list[Belief]:
beliefs: list[Belief] = []
for phase in program.phases:
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += BDIProgramManager._extract_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += BDIProgramManager._extract_beliefs_from_belief(trigger.condition)
return beliefs
@staticmethod
def _extract_beliefs_from_belief(belief: Belief) -> list[Belief]:
if isinstance(belief, InferredBelief):
return BDIProgramManager._extract_beliefs_from_belief(
belief.left
) + BDIProgramManager._extract_beliefs_from_belief(belief.right)
return [belief]
async def _send_beliefs_to_semantic_belief_extractor(self, program: Program):
"""
Extract beliefs from the program and send them to the Semantic Belief Extractor Agent.
:param program: The program received from the API.
"""
beliefs = BeliefList(beliefs=self._extract_beliefs_from_program(program))
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=beliefs.model_dump_json(),
thread="beliefs",
)
await self.send(message)
async def _receive_programs(self): async def _receive_programs(self):
""" """
Continuous loop that receives program updates from the HTTP endpoint. Continuous loop that receives program updates from the HTTP endpoint.
@@ -72,7 +114,10 @@ class BDIProgramManager(BaseAgent):
self.logger.exception("Received an invalid program.") self.logger.exception("Received an invalid program.")
continue continue
await self._create_agentspeak_and_send_to_bdi(program) await asyncio.gather(
self._create_agentspeak_and_send_to_bdi(program),
self._send_beliefs_to_semantic_belief_extractor(program),
)
async def setup(self): async def setup(self):
""" """

View File

@@ -3,21 +3,16 @@ import json
import httpx import httpx
from pydantic import ValidationError from pydantic import ValidationError
from slugify import slugify
from control_backend.agents.base import BaseAgent from control_backend.agents.base import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.agent_system import InternalMessage from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList
from control_backend.schemas.belief_message import Belief as InternalBelief from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import ( from control_backend.schemas.program import SemanticBelief
Belief,
ConditionalNorm,
InferredBelief,
Program,
SemanticBelief,
)
class TextBeliefExtractorAgent(BaseAgent): class TextBeliefExtractorAgent(BaseAgent):
@@ -32,11 +27,12 @@ class TextBeliefExtractorAgent(BaseAgent):
the message itself. the message itself.
""" """
def __init__(self, name: str): def __init__(self, name: str, temperature: float = settings.llm_settings.code_temperature):
super().__init__(name) super().__init__(name)
self.beliefs: dict[str, bool] = {} self.beliefs: dict[str, bool] = {}
self.available_beliefs: list[SemanticBelief] = [] self.available_beliefs: list[SemanticBelief] = []
self.conversation = ChatHistory(messages=[]) self.conversation = ChatHistory(messages=[])
self.temperature = temperature
async def setup(self): async def setup(self):
""" """
@@ -85,45 +81,19 @@ class TextBeliefExtractorAgent(BaseAgent):
:param msg: The received message from the program manager. :param msg: The received message from the program manager.
""" """
try: try:
program = Program.model_validate_json(msg.body) belief_list = BeliefList.model_validate_json(msg.body)
except ValidationError: except ValidationError:
self.logger.warning( self.logger.warning(
"Received message from program manager but it is not a valid program." "Received message from program manager but it is not a valid list of beliefs."
) )
return return
self.logger.debug("Received a program from the program manager.") self.available_beliefs = [b for b in belief_list.beliefs if isinstance(b, SemanticBelief)]
self.logger.debug(
self.available_beliefs = self._extract_basic_beliefs_from_program(program) "Received %d beliefs from the program manager.",
len(self.available_beliefs),
# TODO Copied from an incomplete version of the program manager. Use that one instead.
@staticmethod
def _extract_basic_beliefs_from_program(program: Program) -> list[SemanticBelief]:
beliefs = []
for phase in program.phases:
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += TextBeliefExtractorAgent._extract_basic_beliefs_from_belief(
norm.condition
) )
for trigger in phase.triggers:
beliefs += TextBeliefExtractorAgent._extract_basic_beliefs_from_belief(
trigger.condition
)
return beliefs
# TODO Copied from an incomplete version of the program manager. Use that one instead.
@staticmethod
def _extract_basic_beliefs_from_belief(belief: Belief) -> list[SemanticBelief]:
if isinstance(belief, InferredBelief):
return TextBeliefExtractorAgent._extract_basic_beliefs_from_belief(
belief.left
) + TextBeliefExtractorAgent._extract_basic_beliefs_from_belief(belief.right)
return [belief]
async def _user_said(self, text: str): async def _user_said(self, text: str):
""" """
Create a belief for the user's full speech. Create a belief for the user's full speech.
@@ -207,8 +177,7 @@ class TextBeliefExtractorAgent(BaseAgent):
@staticmethod @staticmethod
def _create_belief_schema(belief: SemanticBelief) -> tuple[str, dict]: def _create_belief_schema(belief: SemanticBelief) -> tuple[str, dict]:
# TODO: use real belief names return AgentSpeakGenerator.get_semantic_belief_slug(belief), {
return belief.name or slugify(belief.description), {
"type": ["boolean", "null"], "type": ["boolean", "null"],
"description": belief.description, "description": belief.description,
} }
@@ -237,10 +206,9 @@ class TextBeliefExtractorAgent(BaseAgent):
@staticmethod @staticmethod
def _format_beliefs(beliefs: list[SemanticBelief]): def _format_beliefs(beliefs: list[SemanticBelief]):
# TODO: use real belief names
return "\n".join( return "\n".join(
[ [
f"- {belief.name or slugify(belief.description)}: {belief.description}" f"- {AgentSpeakGenerator.get_semantic_belief_slug(belief)}: {belief.description}"
for belief in beliefs for belief in beliefs
] ]
) )
@@ -267,7 +235,7 @@ Given the above conversation, what beliefs can be inferred?
If there is no relevant information about a belief belief, give null. If there is no relevant information about a belief belief, give null.
In case messages conflict, prefer using the most recent messages for inference. In case messages conflict, prefer using the most recent messages for inference.
Choose from the following list of beliefs, formatted as (belief_name, description): Choose from the following list of beliefs, formatted as `- <belief_name>: <description>`:
{self._format_beliefs(beliefs)} {self._format_beliefs(beliefs)}
Respond with a JSON similar to the following, but with the property names as given above: Respond with a JSON similar to the following, but with the property names as given above:
@@ -304,8 +272,7 @@ Respond with a JSON similar to the following, but with the property names as giv
return None return None
@staticmethod async def _query_llm(self, prompt: str, schema: dict) -> dict:
async def _query_llm(prompt: str, schema: dict) -> dict:
""" """
Query an LLM with the given prompt and schema, return an instance of a dict conforming to Query an LLM with the given prompt and schema, return an instance of a dict conforming to
that schema. that schema.
@@ -333,7 +300,7 @@ Respond with a JSON similar to the following, but with the property names as giv
}, },
}, },
"reasoning_effort": "low", "reasoning_effort": "low",
"temperature": settings.llm_settings.code_temperature, "temperature": self.temperature,
"stream": False, "stream": False,
}, },
timeout=None, timeout=None,
@@ -342,4 +309,5 @@ Respond with a JSON similar to the following, but with the property names as giv
response_json = response.json() response_json = response.json()
json_message = response_json["choices"][0]["message"]["content"] json_message = response_json["choices"][0]["message"]["content"]
return json.loads(json_message) beliefs = json.loads(json_message)
return beliefs

View File

@@ -0,0 +1,14 @@
from pydantic import BaseModel
from control_backend.schemas.program import Belief as ProgramBelief
class BeliefList(BaseModel):
"""
Represents a list of beliefs, separated from a program. Useful in agents which need to
communicate beliefs.
:ivar: beliefs: The list of beliefs.
"""
beliefs: list[ProgramBelief]

View File

@@ -43,7 +43,6 @@ class SemanticBelief(ProgramElement):
:ivar description: Description of how to form the belief, used by the LLM. :ivar description: Description of how to form the belief, used by the LLM.
""" """
name: str = ""
description: str description: str
@@ -113,10 +112,12 @@ class Goal(ProgramElement):
for example when the achieving of the goal is dependent on the user's reply, this means for example when the achieving of the goal is dependent on the user's reply, this means
that the achieved status will be set from somewhere else in the program. that the achieved status will be set from somewhere else in the program.
:ivar description: A description of the goal, used to determine if it has been achieved.
:ivar plan: The plan to execute. :ivar plan: The plan to execute.
:ivar can_fail: Whether we can fail to achieve the goal after executing the plan. :ivar can_fail: Whether we can fail to achieve the goal after executing the plan.
""" """
description: str
plan: Plan plan: Plan
can_fail: bool = True can_fail: bool = True

View File

@@ -20,7 +20,7 @@ def mock_agentspeak_env():
@pytest.fixture @pytest.fixture
def agent(): def agent():
agent = BDICoreAgent("bdi_agent", "dummy.asl") agent = BDICoreAgent("bdi_agent")
agent.send = AsyncMock() agent.send = AsyncMock()
agent.bdi_agent = MagicMock() agent.bdi_agent = MagicMock()
return agent return agent
@@ -133,14 +133,14 @@ async def test_custom_actions(agent):
# Invoke action # Invoke action
mock_term = MagicMock() mock_term = MagicMock()
mock_term.args = ["Hello", "Norm", "Goal"] mock_term.args = ["Hello", "Norm"]
mock_intention = MagicMock() mock_intention = MagicMock()
# Run generator # Run generator
gen = action_fn(agent, mock_term, mock_intention) gen = action_fn(agent, mock_term, mock_intention)
next(gen) # Execute next(gen) # Execute
agent._send_to_llm.assert_called_with("Hello", "Norm", "Goal") agent._send_to_llm.assert_called_with("Hello", "Norm", "")
def test_add_belief_sets_event(agent): def test_add_belief_sets_event(agent):

View File

@@ -32,6 +32,8 @@ def make_valid_program_json(norm="N1", goal="G1") -> str:
Goal( Goal(
id=uuid.uuid4(), id=uuid.uuid4(),
name=goal, name=goal,
description="This description can be used to determine whether the goal "
"has been achieved.",
plan=Plan( plan=Plan(
id=uuid.uuid4(), id=uuid.uuid4(),
name="Goal Plan", name="Goal Plan",
@@ -75,6 +77,7 @@ async def test_receive_programs_valid_and_invalid():
] ]
manager = BDIProgramManager(name="program_manager_test") manager = BDIProgramManager(name="program_manager_test")
manager._internal_pub_socket = AsyncMock()
manager.sub_socket = sub manager.sub_socket = sub
manager._create_agentspeak_and_send_to_bdi = AsyncMock() manager._create_agentspeak_and_send_to_bdi = AsyncMock()

View File

@@ -8,9 +8,11 @@ import pytest
from control_backend.agents.bdi import TextBeliefExtractorAgent from control_backend.agents.bdi import TextBeliefExtractorAgent
from control_backend.core.agent_system import InternalMessage from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList
from control_backend.schemas.belief_message import BeliefMessage from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.program import ( from control_backend.schemas.program import (
ConditionalNorm, ConditionalNorm,
KeywordBelief,
LLMAction, LLMAction,
Phase, Phase,
Plan, Plan,
@@ -186,13 +188,31 @@ async def test_retry_query_llm_fail_immediately(agent):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_extracting_beliefs_from_program(agent, sample_program): async def test_extracting_semantic_beliefs(agent):
"""
The Program Manager sends beliefs to this agent. Test whether the agent handles them correctly.
"""
assert len(agent.available_beliefs) == 0 assert len(agent.available_beliefs) == 0
beliefs = BeliefList(
beliefs=[
KeywordBelief(
id=uuid.uuid4(),
name="keyword_hello",
keyword="hello",
),
SemanticBelief(
id=uuid.uuid4(), name="semantic_hello_1", description="Some semantic belief 1"
),
SemanticBelief(
id=uuid.uuid4(), name="semantic_hello_2", description="Some semantic belief 2"
),
]
)
await agent.handle_message( await agent.handle_message(
InternalMessage( InternalMessage(
to=settings.agent_settings.text_belief_extractor_name, to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.bdi_program_manager_name, sender=settings.agent_settings.bdi_program_manager_name,
body=sample_program.model_dump_json(), body=beliefs.model_dump_json(),
), ),
) )
assert len(agent.available_beliefs) == 2 assert len(agent.available_beliefs) == 2

View File

@@ -43,6 +43,8 @@ def make_valid_program_dict():
Goal( Goal(
id=uuid.uuid4(), id=uuid.uuid4(),
name="Some goal", name="Some goal",
description="This description can be used to determine whether the goal "
"has been achieved.",
plan=Plan( plan=Plan(
id=uuid.uuid4(), id=uuid.uuid4(),
name="Goal Plan", name="Goal Plan",

View File

@@ -31,6 +31,7 @@ def base_goal() -> Goal:
return Goal( return Goal(
id=uuid.uuid4(), id=uuid.uuid4(),
name="testGoalName", name="testGoalName",
description="This description can be used to determine whether the goal has been achieved.",
plan=Plan( plan=Plan(
id=uuid.uuid4(), id=uuid.uuid4(),
name="testGoalPlanName", name="testGoalPlanName",