docs: update existing docstrings and add new docs

ref: N25B-453
This commit is contained in:
Twirre Meulenbelt
2026-01-26 16:04:01 +01:00
parent b9df47b7d1
commit d8dc558d3e
2 changed files with 59 additions and 10 deletions

View File

@@ -25,11 +25,12 @@ class BDIProgramManager(BaseAgent):
BDI Program Manager Agent.
This agent is responsible for receiving high-level programs (sequences of instructions/goals)
from the external HTTP API (via ZMQ) and translating them into core beliefs (norms and goals)
for the BDI Core Agent. In the future, it will be responsible for determining when goals are
met, and passing on new norms and goals accordingly.
from the external HTTP API (via ZMQ), transforming it into an AgentSpeak program, sharing the
program and its components to other agents, and keeping agents informed of the current state.
:ivar sub_socket: The ZMQ SUB socket used to receive program updates.
:ivar _program: The current Program.
:ivar _phase: The current Phase.
"""
_program: Program
@@ -40,6 +41,12 @@ class BDIProgramManager(BaseAgent):
self.sub_socket = None
def _initialize_internal_state(self, program: Program):
"""
Initialize the state of the program manager given a new Program. Reset the tracking of the
current phase to the first phase, make a mapping of goal IDs to goals, used during the life
of the program.
:param program: The new program.
"""
self._program = program
self._phase = program.phases[0] # start in first phase
self._goal_mapping: dict[str, Goal] = {}
@@ -48,6 +55,11 @@ class BDIProgramManager(BaseAgent):
self._populate_goal_mapping_with_goal(goal)
def _populate_goal_mapping_with_goal(self, goal: Goal):
"""
Recurse through the given goal and its subgoals and add all goals found to the
``self._goal_mapping``.
:param goal: The goal to add to the ``self._goal_mapping``, including subgoals.
"""
self._goal_mapping[str(goal.id)] = goal
for step in goal.plan.steps:
if isinstance(step, Goal):
@@ -88,6 +100,13 @@ class BDIProgramManager(BaseAgent):
await self._send_achieved_goal_to_semantic_belief_extractor(goal_id)
async def _transition_phase(self, old: str, new: str):
"""
When receiving a signal from the BDI core that the phase has changed, apply this change to
the current state and inform other agents about the change.
:param old: The ID of the old phase.
:param new: The ID of the new phase.
"""
if old != str(self._phase.id):
self.logger.warning(
f"Phase transition desync detected! ASL requested move from '{old}', "
@@ -126,6 +145,7 @@ class BDIProgramManager(BaseAgent):
self.add_behavior(self.send(msg))
def _extract_current_beliefs(self) -> list[Belief]:
"""Extract beliefs from the current phase."""
beliefs: list[Belief] = []
for norm in self._phase.norms:
@@ -139,6 +159,7 @@ class BDIProgramManager(BaseAgent):
@staticmethod
def _extract_beliefs_from_belief(belief: Belief) -> list[Belief]:
"""Recursively extract beliefs from the given belief."""
if isinstance(belief, InferredBelief):
return BDIProgramManager._extract_beliefs_from_belief(
belief.left
@@ -146,9 +167,7 @@ class BDIProgramManager(BaseAgent):
return [belief]
async def _send_beliefs_to_semantic_belief_extractor(self):
"""
Extract beliefs from the program and send them to the Semantic Belief Extractor Agent.
"""
"""Extract beliefs from the program and send them to the Semantic Belief Extractor Agent."""
beliefs = BeliefList(beliefs=self._extract_current_beliefs())
message = InternalMessage(

View File

@@ -134,6 +134,10 @@ class TextBeliefExtractorAgent(BaseAgent):
self.logger.warning("Received unexpected message from %s", msg.sender)
def _reset_phase(self):
"""
Delete all state about the current phase, such as what beliefs exist and which ones are
true.
"""
self.conversation = ChatHistory(messages=[])
self.belief_inferrer.available_beliefs.clear()
self._current_beliefs = BeliefState()
@@ -141,6 +145,11 @@ class TextBeliefExtractorAgent(BaseAgent):
self._current_goal_completions = {}
def _handle_beliefs_message(self, msg: InternalMessage):
"""
Handle the message from the Program Manager agent containing the beliefs that exist for this
phase.
:param msg: A list of beliefs.
"""
try:
belief_list = BeliefList.model_validate_json(msg.body)
except ValidationError:
@@ -158,6 +167,11 @@ class TextBeliefExtractorAgent(BaseAgent):
)
def _handle_goals_message(self, msg: InternalMessage):
"""
Handle the message from the Program Manager agent containing the goals that exist for this
phase.
:param msg: A list of goals.
"""
try:
goals_list = GoalList.model_validate_json(msg.body)
except ValidationError:
@@ -177,6 +191,11 @@ class TextBeliefExtractorAgent(BaseAgent):
)
def _handle_goal_achieved_message(self, msg: InternalMessage):
"""
Handle message that gets sent when goals are marked achieved from a user interrupt. This
goal should then not be changed by this agent anymore.
:param msg: List of goals that are marked achieved.
"""
# NOTE: When goals can be marked unachieved, remember to re-add them to the goal_inferrer
try:
goals_list = GoalList.model_validate_json(msg.body)
@@ -210,6 +229,10 @@ class TextBeliefExtractorAgent(BaseAgent):
await self.send(belief_msg)
async def _infer_new_beliefs(self):
"""
Determine which beliefs hold and do not hold for the current conversation state. When
beliefs change, a message is sent to the BDI core.
"""
conversation_beliefs = await self.belief_inferrer.infer_from_conversation(self.conversation)
new_beliefs = conversation_beliefs - self._current_beliefs
@@ -233,6 +256,10 @@ class TextBeliefExtractorAgent(BaseAgent):
await self.send(message)
async def _infer_goal_completions(self):
"""
Determine which goals have been achieved given the current conversation state. When
a goal's achieved state changes, a message is sent to the BDI core.
"""
goal_completions = await self.goal_inferrer.infer_from_conversation(self.conversation)
new_achieved = [
@@ -374,19 +401,22 @@ class SemanticBeliefInferrer:
for beliefs in self._split_into_chunks(self.available_beliefs, n_parallel)
]
)
retval = BeliefState()
new_beliefs = BeliefState()
# Collect beliefs from all parallel calls
for beliefs in all_beliefs:
if beliefs is None:
continue
# For each, convert them to InternalBeliefs
for belief_name, belief_holds in beliefs.items():
# Skip beliefs that were marked not possible to determine
if belief_holds is None:
continue
belief = InternalBelief(name=belief_name, arguments=None)
if belief_holds:
retval.true.add(belief)
new_beliefs.true.add(belief)
else:
retval.false.add(belief)
return retval
new_beliefs.false.add(belief)
return new_beliefs
@staticmethod
def _split_into_chunks[T](items: list[T], n: int) -> list[list[T]]: