Compare commits

..

3 Commits

Author SHA1 Message Date
Twirre Meulenbelt
93d67ccb66 feat: add reset functionality to semantic belief extractor
ref: N25B-432
2026-01-07 17:50:47 +01:00
Twirre Meulenbelt
aa5b386f65 feat: semantically determine goal completion
ref: N25B-432
2026-01-07 17:08:23 +01:00
Twirre Meulenbelt
3189b9fee3 fix: let belief extractor send user_said belief
ref: N25B-429
2026-01-07 15:19:23 +01:00
13 changed files with 396 additions and 336 deletions

View File

@@ -83,8 +83,6 @@ class RobotGestureAgent(BaseAgent):
self.subsocket.close()
if self.pubsocket:
self.pubsocket.close()
if self.repsocket:
self.repsocket.close()
await super().stop()
async def handle_message(self, msg: InternalMessage):

View File

@@ -205,12 +205,15 @@ class BDICoreAgent(BaseAgent):
self.logger.debug(f"Added belief {self.format_belief_string(name, args)}")
def _remove_belief(self, name: str, args: Iterable[str]):
def _remove_belief(self, name: str, args: Iterable[str] | None):
"""
Removes a specific belief (with arguments), if it exists.
"""
new_args = (agentspeak.Literal(arg) for arg in args)
term = agentspeak.Literal(name, new_args)
if args is None:
term = agentspeak.Literal(name)
else:
new_args = (agentspeak.Literal(arg) for arg in args)
term = agentspeak.Literal(name, new_args)
result = self.bdi_agent.call(
agentspeak.Trigger.removal,
@@ -346,8 +349,8 @@ class BDICoreAgent(BaseAgent):
self.logger.info("Message sent to LLM agent: %s", text)
@staticmethod
def format_belief_string(name: str, args: Iterable[str] = []):
def format_belief_string(name: str, args: Iterable[str] | None = []):
"""
Given a belief's name and its args, return a string of the form "name(*args)"
"""
return f"{name}{'(' if args else ''}{','.join(args)}{')' if args else ''}"
return f"{name}{'(' if args else ''}{','.join(args or [])}{')' if args else ''}"

View File

@@ -7,9 +7,9 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList
from control_backend.schemas.belief_list import BeliefList, GoalList
from control_backend.schemas.internal_message import InternalMessage
from control_backend.schemas.program import Belief, ConditionalNorm, InferredBelief, Program
from control_backend.schemas.program import Belief, ConditionalNorm, Goal, InferredBelief, Program
class BDIProgramManager(BaseAgent):
@@ -63,24 +63,23 @@ class BDIProgramManager(BaseAgent):
def _extract_beliefs_from_program(program: Program) -> list[Belief]:
beliefs: list[Belief] = []
def extract_beliefs_from_belief(belief: Belief) -> list[Belief]:
if isinstance(belief, InferredBelief):
return extract_beliefs_from_belief(belief.left) + extract_beliefs_from_belief(
belief.right
)
return [belief]
for phase in program.phases:
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += BDIProgramManager._extract_beliefs_from_belief(norm.condition)
beliefs += extract_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += BDIProgramManager._extract_beliefs_from_belief(trigger.condition)
beliefs += extract_beliefs_from_belief(trigger.condition)
return beliefs
@staticmethod
def _extract_beliefs_from_belief(belief: Belief) -> list[Belief]:
if isinstance(belief, InferredBelief):
return BDIProgramManager._extract_beliefs_from_belief(
belief.left
) + BDIProgramManager._extract_beliefs_from_belief(belief.right)
return [belief]
async def _send_beliefs_to_semantic_belief_extractor(self, program: Program):
"""
Extract beliefs from the program and send them to the Semantic Belief Extractor Agent.
@@ -98,6 +97,46 @@ class BDIProgramManager(BaseAgent):
await self.send(message)
@staticmethod
def _extract_goals_from_program(program: Program) -> list[Goal]:
"""
Extract all goals from the program, including subgoals.
:param program: The program received from the API.
:return: A list of Goal objects.
"""
goals: list[Goal] = []
def extract_goals_from_goal(goal_: Goal) -> list[Goal]:
goals_: list[Goal] = [goal]
for plan in goal_.plan:
if isinstance(plan, Goal):
goals_.extend(extract_goals_from_goal(plan))
return goals_
for phase in program.phases:
for goal in phase.goals:
goals.extend(extract_goals_from_goal(goal))
return goals
async def _send_goals_to_semantic_belief_extractor(self, program: Program):
"""
Extract goals from the program and send them to the Semantic Belief Extractor Agent.
:param program: The program received from the API.
"""
goals = GoalList(goals=self._extract_goals_from_program(program))
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=goals.model_dump_json(),
thread="goals",
)
await self.send(message)
async def _receive_programs(self):
"""
Continuous loop that receives program updates from the HTTP endpoint.
@@ -117,6 +156,7 @@ class BDIProgramManager(BaseAgent):
await asyncio.gather(
self._create_agentspeak_and_send_to_bdi(program),
self._send_beliefs_to_semantic_belief_extractor(program),
self._send_goals_to_semantic_belief_extractor(program),
)
async def setup(self):

View File

@@ -101,7 +101,7 @@ class BDIBeliefCollectorAgent(BaseAgent):
:return: A Belief object if the input is valid or None.
"""
try:
return Belief(name=name, arguments=arguments, replace=name == "user_said")
return Belief(name=name, arguments=arguments)
except ValidationError:
return None

View File

@@ -2,17 +2,45 @@ import asyncio
import json
import httpx
from pydantic import ValidationError
from pydantic import BaseModel, ValidationError
from control_backend.agents.base import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList
from control_backend.schemas.belief_list import BeliefList, GoalList
from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import SemanticBelief
from control_backend.schemas.program import Goal, SemanticBelief
type JSONLike = None | bool | int | float | str | list["JSONLike"] | dict[str, "JSONLike"]
class BeliefState(BaseModel):
true: set[InternalBelief] = set()
false: set[InternalBelief] = set()
def difference(self, other: "BeliefState") -> "BeliefState":
return BeliefState(
true=self.true - other.true,
false=self.false - other.false,
)
def union(self, other: "BeliefState") -> "BeliefState":
return BeliefState(
true=self.true | other.true,
false=self.false | other.false,
)
def __sub__(self, other):
return self.difference(other)
def __or__(self, other):
return self.union(other)
def __bool__(self):
return bool(self.true) or bool(self.false)
class TextBeliefExtractorAgent(BaseAgent):
@@ -27,12 +55,14 @@ class TextBeliefExtractorAgent(BaseAgent):
the message itself.
"""
def __init__(self, name: str, temperature: float = settings.llm_settings.code_temperature):
def __init__(self, name: str):
super().__init__(name)
self.beliefs: dict[str, bool] = {}
self.available_beliefs: list[SemanticBelief] = []
self._llm = self.LLM(self, settings.llm_settings.n_parallel)
self.belief_inferrer = SemanticBeliefInferrer(self._llm)
self.goal_inferrer = GoalAchievementInferrer(self._llm)
self._current_beliefs = BeliefState()
self._current_goal_completions: dict[str, bool] = {}
self.conversation = ChatHistory(messages=[])
self.temperature = temperature
async def setup(self):
"""
@@ -53,8 +83,9 @@ class TextBeliefExtractorAgent(BaseAgent):
case settings.agent_settings.transcription_name:
self.logger.debug("Received text from transcriber: %s", msg.body)
self._apply_conversation_message(ChatMessage(role="user", content=msg.body))
await self._infer_new_beliefs()
await self._user_said(msg.body)
await self._infer_new_beliefs()
await self._infer_goal_completions()
case settings.agent_settings.llm_name:
self.logger.debug("Received text from LLM: %s", msg.body)
self._apply_conversation_message(ChatMessage(role="assistant", content=msg.body))
@@ -76,10 +107,29 @@ class TextBeliefExtractorAgent(BaseAgent):
def _handle_program_manager_message(self, msg: InternalMessage):
"""
Handle a message from the program manager: extract available beliefs from it.
Handle a message from the program manager: extract available beliefs and goals from it.
:param msg: The received message from the program manager.
"""
match msg.thread:
case "beliefs":
self._handle_beliefs_message(msg)
case "goals":
self._handle_goals_message(msg)
case "conversation_history":
if msg.body == "reset":
self._reset()
case _:
self.logger.warning("Received unexpected message from %s", msg.sender)
def _reset(self):
self.conversation = ChatHistory(messages=[])
self.belief_inferrer.available_beliefs.clear()
self._current_beliefs = BeliefState()
self.goal_inferrer.goals.clear()
self._current_goal_completions = {}
def _handle_beliefs_message(self, msg: InternalMessage):
try:
belief_list = BeliefList.model_validate_json(msg.body)
except ValidationError:
@@ -88,10 +138,28 @@ class TextBeliefExtractorAgent(BaseAgent):
)
return
self.available_beliefs = [b for b in belief_list.beliefs if isinstance(b, SemanticBelief)]
available_beliefs = [b for b in belief_list.beliefs if isinstance(b, SemanticBelief)]
self.belief_inferrer.available_beliefs = available_beliefs
self.logger.debug(
"Received %d beliefs from the program manager.",
len(self.available_beliefs),
"Received %d semantic beliefs from the program manager.",
len(available_beliefs),
)
def _handle_goals_message(self, msg: InternalMessage):
try:
goals_list = GoalList.model_validate_json(msg.body)
except ValidationError:
self.logger.warning(
"Received message from program manager but it is not a valid list of goals."
)
return
# Use only goals that can fail, as the others are always assumed to be completed
available_goals = [g for g in goals_list.goals if g.can_fail]
self.goal_inferrer.goals = available_goals
self.logger.debug(
"Received %d failable goals from the program manager.",
len(available_goals),
)
async def _user_said(self, text: str):
@@ -100,121 +168,210 @@ class TextBeliefExtractorAgent(BaseAgent):
:param text: User's transcribed text.
"""
belief = {"beliefs": {"user_said": [text]}, "type": "belief_extraction_text"}
payload = json.dumps(belief)
belief_msg = InternalMessage(
to=settings.agent_settings.bdi_belief_collector_name,
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=payload,
body=BeliefMessage(
replace=[InternalBelief(name="user_said", arguments=[text])],
).model_dump_json(),
thread="beliefs",
)
await self.send(belief_msg)
async def _infer_new_beliefs(self):
"""
Process conversation history to extract beliefs, semantically. Any changed beliefs are sent
to the BDI core.
"""
# Return instantly if there are no beliefs to infer
if not self.available_beliefs:
conversation_beliefs = await self.belief_inferrer.infer_from_conversation(self.conversation)
new_beliefs = conversation_beliefs - self._current_beliefs
if not new_beliefs:
return
candidate_beliefs = await self._infer_turn()
belief_changes = BeliefMessage()
for belief_key, belief_value in candidate_beliefs.items():
if belief_value is None:
continue
old_belief_value = self.beliefs.get(belief_key)
if belief_value == old_belief_value:
continue
self._current_beliefs |= new_beliefs
self.beliefs[belief_key] = belief_value
belief_changes = BeliefMessage(
create=list(new_beliefs.true),
delete=list(new_beliefs.false),
)
belief = InternalBelief(name=belief_key, arguments=None)
if belief_value:
belief_changes.create.append(belief)
else:
belief_changes.delete.append(belief)
# Return if there were no changes in beliefs
if not belief_changes.has_values():
return
beliefs_message = InternalMessage(
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=belief_changes.model_dump_json(),
thread="beliefs",
)
await self.send(beliefs_message)
await self.send(message)
@staticmethod
def _split_into_chunks[T](items: list[T], n: int) -> list[list[T]]:
k, m = divmod(len(items), n)
return [items[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)]
async def _infer_goal_completions(self):
goal_completions = await self.goal_inferrer.infer_from_conversation(self.conversation)
async def _infer_turn(self) -> dict:
new_achieved = [
InternalBelief(name=goal, arguments=None)
for goal, achieved in goal_completions.items()
if achieved and self._current_goal_completions.get(goal) != achieved
]
new_not_achieved = [
InternalBelief(name=goal, arguments=None)
for goal, achieved in goal_completions.items()
if not achieved and self._current_goal_completions.get(goal) != achieved
]
for goal, achieved in goal_completions.items():
self._current_goal_completions[goal] = achieved
if not new_achieved and not new_not_achieved:
return
belief_changes = BeliefMessage(
create=new_achieved,
delete=new_not_achieved,
)
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=belief_changes.model_dump_json(),
thread="beliefs",
)
await self.send(message)
class LLM:
"""
Process the stored conversation history to extract semantic beliefs. Returns a list of
beliefs that have been set to ``True``, ``False`` or ``None``.
:return: A dict mapping belief names to a value ``True``, ``False`` or ``None``.
Class that handles sending structured generation requests to an LLM.
"""
def __init__(self, agent: "TextBeliefExtractorAgent", n_parallel: int):
self._agent = agent
self._semaphore = asyncio.Semaphore(n_parallel)
async def query(self, prompt: str, schema: dict, tries: int = 3) -> JSONLike | None:
"""
Query the LLM with the given prompt and schema, return an instance of a dict conforming
to this schema. Try ``tries`` times, or return None.
:param prompt: Prompt to be queried.
:param schema: Schema to be queried.
:param tries: Number of times to try to query the LLM.
:return: An instance of a dict conforming to this schema, or None if failed.
"""
try_count = 0
while try_count < tries:
try_count += 1
try:
return await self._query_llm(prompt, schema)
except (httpx.HTTPError, json.JSONDecodeError, KeyError) as e:
if try_count < tries:
continue
self._agent.logger.exception(
"Failed to get LLM response after %d tries.",
try_count,
exc_info=e,
)
return None
async def _query_llm(self, prompt: str, schema: dict) -> JSONLike:
"""
Query an LLM with the given prompt and schema, return an instance of a dict conforming
to that schema.
:param prompt: The prompt to be queried.
:param schema: Schema to use during response.
:return: A dict conforming to this schema.
:raises httpx.HTTPStatusError: If the LLM server responded with an error.
:raises json.JSONDecodeError: If the LLM response was not valid JSON. May happen if the
response was cut off early due to length limitations.
:raises KeyError: If the LLM server responded with no error, but the response was
invalid.
"""
async with self._semaphore:
async with httpx.AsyncClient() as client:
response = await client.post(
settings.llm_settings.local_llm_url,
json={
"model": settings.llm_settings.local_llm_model,
"messages": [{"role": "user", "content": prompt}],
"response_format": {
"type": "json_schema",
"json_schema": {
"name": "Beliefs",
"strict": True,
"schema": schema,
},
},
"reasoning_effort": "low",
"temperature": settings.llm_settings.code_temperature,
"stream": False,
},
timeout=30.0,
)
response.raise_for_status()
response_json = response.json()
json_message = response_json["choices"][0]["message"]["content"]
return json.loads(json_message)
class SemanticBeliefInferrer:
"""
Class that handles only prompting an LLM for semantic beliefs.
"""
def __init__(
self,
llm: "TextBeliefExtractorAgent.LLM",
available_beliefs: list[SemanticBelief] | None = None,
):
self._llm = llm
self.available_beliefs: list[SemanticBelief] = available_beliefs or []
async def infer_from_conversation(self, conversation: ChatHistory) -> BeliefState:
"""
Process conversation history to extract beliefs, semantically. The result is an object that
describes all beliefs that hold or don't hold based on the full conversation.
:param conversation: The conversation history to be processed.
:return: An object that describes beliefs.
"""
# Return instantly if there are no beliefs to infer
if not self.available_beliefs:
return BeliefState()
n_parallel = max(1, min(settings.llm_settings.n_parallel - 1, len(self.available_beliefs)))
all_beliefs = await asyncio.gather(
all_beliefs: list[dict[str, bool | None] | None] = await asyncio.gather(
*[
self._infer_beliefs(self.conversation, beliefs)
self._infer_beliefs(conversation, beliefs)
for beliefs in self._split_into_chunks(self.available_beliefs, n_parallel)
]
)
retval = {}
retval = BeliefState()
for beliefs in all_beliefs:
if beliefs is None:
continue
retval.update(beliefs)
for belief_name, belief_holds in beliefs.items():
if belief_holds is None:
continue
belief = InternalBelief(name=belief_name, arguments=None)
if belief_holds:
retval.true.add(belief)
else:
retval.false.add(belief)
return retval
@staticmethod
def _create_belief_schema(belief: SemanticBelief) -> tuple[str, dict]:
return AgentSpeakGenerator.slugify(belief), {
"type": ["boolean", "null"],
"description": belief.description,
}
def _split_into_chunks[T](items: list[T], n: int) -> list[list[T]]:
"""
Split a list into ``n`` chunks, making each chunk approximately ``len(items) / n`` long.
@staticmethod
def _create_beliefs_schema(beliefs: list[SemanticBelief]) -> dict:
belief_schemas = [
TextBeliefExtractorAgent._create_belief_schema(belief) for belief in beliefs
]
return {
"type": "object",
"properties": dict(belief_schemas),
"required": [name for name, _ in belief_schemas],
}
@staticmethod
def _format_message(message: ChatMessage):
return f"{message.role.upper()}:\n{message.content}"
@staticmethod
def _format_conversation(conversation: ChatHistory):
return "\n\n".join(
[TextBeliefExtractorAgent._format_message(message) for message in conversation.messages]
)
@staticmethod
def _format_beliefs(beliefs: list[SemanticBelief]):
return "\n".join(
[f"- {AgentSpeakGenerator.slugify(belief)}: {belief.description}" for belief in beliefs]
)
:param items: The list of items to split.
:param n: The number of desired chunks.
:return: A list of chunks each approximately ``len(items) / n`` long.
"""
k, m = divmod(len(items), n)
return [items[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)]
async def _infer_beliefs(
self,
conversation: ChatHistory,
beliefs: list[SemanticBelief],
) -> dict | None:
) -> dict[str, bool | None] | None:
"""
Infer given beliefs based on the given conversation.
:param conversation: The conversation to infer beliefs from.
@@ -241,70 +398,79 @@ Respond with a JSON similar to the following, but with the property names as giv
schema = self._create_beliefs_schema(beliefs)
return await self._retry_query_llm(prompt, schema)
return await self._llm.query(prompt, schema)
async def _retry_query_llm(self, prompt: str, schema: dict, tries: int = 3) -> dict | None:
@staticmethod
def _create_belief_schema(belief: SemanticBelief) -> tuple[str, dict]:
return AgentSpeakGenerator.slugify(belief), {
"type": ["boolean", "null"],
"description": belief.description,
}
@staticmethod
def _create_beliefs_schema(beliefs: list[SemanticBelief]) -> dict:
belief_schemas = [
SemanticBeliefInferrer._create_belief_schema(belief) for belief in beliefs
]
return {
"type": "object",
"properties": dict(belief_schemas),
"required": [name for name, _ in belief_schemas],
}
@staticmethod
def _format_message(message: ChatMessage):
return f"{message.role.upper()}:\n{message.content}"
@staticmethod
def _format_conversation(conversation: ChatHistory):
return "\n\n".join(
[SemanticBeliefInferrer._format_message(message) for message in conversation.messages]
)
@staticmethod
def _format_beliefs(beliefs: list[SemanticBelief]):
return "\n".join(
[f"- {AgentSpeakGenerator.slugify(belief)}: {belief.description}" for belief in beliefs]
)
class GoalAchievementInferrer(SemanticBeliefInferrer):
def __init__(self, llm: TextBeliefExtractorAgent.LLM):
super().__init__(llm)
self.goals = []
async def infer_from_conversation(self, conversation: ChatHistory) -> dict[str, bool]:
"""
Query the LLM with the given prompt and schema, return an instance of a dict conforming
to this schema. Try ``tries`` times, or return None.
Determine which goals have been achieved based on the given conversation.
:param prompt: Prompt to be queried.
:param schema: Schema to be queried.
:return: An instance of a dict conforming to this schema, or None if failed.
:param conversation: The conversation to infer goal completion from.
:return: A mapping of goals and a boolean whether they have been achieved.
"""
try_count = 0
while try_count < tries:
try_count += 1
if not self.goals:
return {}
try:
return await self._query_llm(prompt, schema)
except (httpx.HTTPError, json.JSONDecodeError, KeyError) as e:
if try_count < tries:
continue
self.logger.exception(
"Failed to get LLM response after %d tries.",
try_count,
exc_info=e,
)
goals_achieved = await asyncio.gather(
*[self._infer_goal(conversation, g) for g in self.goals]
)
return {
f"achieved_{AgentSpeakGenerator.slugify(goal)}": achieved
for goal, achieved in zip(self.goals, goals_achieved, strict=True)
}
return None
async def _infer_goal(self, conversation: ChatHistory, goal: Goal) -> bool:
prompt = f"""{self._format_conversation(conversation)}
async def _query_llm(self, prompt: str, schema: dict) -> dict:
"""
Query an LLM with the given prompt and schema, return an instance of a dict conforming to
that schema.
Given the above conversation, what has the following goal been achieved?
:param prompt: The prompt to be queried.
:param schema: Schema to use during response.
:return: A dict conforming to this schema.
:raises httpx.HTTPStatusError: If the LLM server responded with an error.
:raises json.JSONDecodeError: If the LLM response was not valid JSON. May happen if the
response was cut off early due to length limitations.
:raises KeyError: If the LLM server responded with no error, but the response was invalid.
"""
async with httpx.AsyncClient() as client:
response = await client.post(
settings.llm_settings.local_llm_url,
json={
"model": settings.llm_settings.local_llm_model,
"messages": [{"role": "user", "content": prompt}],
"response_format": {
"type": "json_schema",
"json_schema": {
"name": "Beliefs",
"strict": True,
"schema": schema,
},
},
"reasoning_effort": "low",
"temperature": self.temperature,
"stream": False,
},
timeout=None,
)
response.raise_for_status()
The name of the goal: {goal.name}
Description of the goal: {goal.description}
response_json = response.json()
json_message = response_json["choices"][0]["message"]["content"]
beliefs = json.loads(json_message)
return beliefs
Answer with literally only `true` or `false` (without backticks)."""
schema = {
"type": "boolean",
}
return await self._llm.query(prompt, schema)

View File

@@ -8,7 +8,6 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage
from ..actuation.robot_speech_agent import RobotSpeechAgent
from ..perception import VADAgent
@@ -48,8 +47,6 @@ class RICommunicationAgent(BaseAgent):
self._req_socket: azmq.Socket | None = None
self.pub_socket: azmq.Socket | None = None
self.connected = False
self.gesture_agent: RobotGestureAgent | None = None
self.speech_agent: RobotSpeechAgent | None = None
async def setup(self):
"""
@@ -143,7 +140,6 @@ class RICommunicationAgent(BaseAgent):
# At this point, we have a valid response
try:
self.logger.debug("Negotiation successful. Handling rn")
await self._handle_negotiation_response(received_message)
# Let UI know that we're connected
topic = b"ping"
@@ -192,7 +188,6 @@ class RICommunicationAgent(BaseAgent):
address=addr,
bind=bind,
)
self.speech_agent = robot_speech_agent
robot_gesture_agent = RobotGestureAgent(
settings.agent_settings.robot_gesture_name,
address=addr,
@@ -200,7 +195,6 @@ class RICommunicationAgent(BaseAgent):
gesture_data=gesture_data,
single_gesture_data=single_gesture_data,
)
self.gesture_agent = robot_gesture_agent
await robot_speech_agent.start()
await asyncio.sleep(0.1) # Small delay
await robot_gesture_agent.start()
@@ -231,7 +225,6 @@ class RICommunicationAgent(BaseAgent):
while self._running:
if not self.connected:
await asyncio.sleep(settings.behaviour_settings.sleep_s)
self.logger.debug("Not connected, skipping ping loop iteration.")
continue
# We need to listen and send pings.
@@ -295,36 +288,13 @@ class RICommunicationAgent(BaseAgent):
# Tell UI we're disconnected.
topic = b"ping"
data = json.dumps(False).encode()
self.logger.debug("1")
if self.pub_socket:
try:
self.logger.debug("2")
await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5)
except TimeoutError:
self.logger.debug("3")
self.logger.warning("Connection ping for router timed out.")
# Try to reboot/renegotiate
if self.gesture_agent is not None:
await self.gesture_agent.stop()
if self.speech_agent is not None:
await self.speech_agent.stop()
if self.pub_socket is not None:
self.pub_socket.close()
self.logger.debug("Restarting communication negotiation.")
if await self._negotiate_connection(max_retries=2):
if await self._negotiate_connection(max_retries=1):
self.connected = True
async def handle_message(self, msg: InternalMessage):
"""
Handle an incoming message.
Currently not implemented for this agent.
:param msg: The received message.
:raises NotImplementedError: Always, since this method is not implemented.
"""
self.logger.warning("custom warning for handle msg in ri coms %s", self.name)

View File

@@ -7,7 +7,6 @@ import zmq.asyncio as azmq
from control_backend.agents import BaseAgent
from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage
from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
from .transcription_agent.transcription_agent import TranscriptionAgent
@@ -87,12 +86,6 @@ class VADAgent(BaseAgent):
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._ready = asyncio.Event()
# Pause control
self._reset_needed = False
self._paused = asyncio.Event()
self._paused.set() # Not paused at start
self.model = None
async def setup(self):
@@ -220,16 +213,6 @@ class VADAgent(BaseAgent):
"""
await self._ready.wait()
while self._running:
await self._paused.wait()
# After being unpaused, reset stream and buffers
if self._reset_needed:
self.logger.debug("Resuming: resetting stream and buffers.")
await self._reset_stream()
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._reset_needed = False
assert self.audio_in_poller is not None
data = await self.audio_in_poller.poll()
if data is None:
@@ -271,27 +254,3 @@ class VADAgent(BaseAgent):
# At this point, we know that the speech has ended.
# Prepend the last chunk that had no speech, for a more fluent boundary
self.audio_buffer = chunk
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages.
Expects messages to pause or resume the VAD processing from User Interrupt Agent.
:param msg: The received internal message.
"""
sender = msg.sender
if sender == settings.agent_settings.user_interrupt_name:
if msg.body == "PAUSE":
self.logger.info("Pausing VAD processing.")
self._paused.clear()
# If the robot needs to pick up speaking where it left off, do not set _reset_needed
self._reset_needed = True
elif msg.body == "RESUME":
self.logger.info("Resuming VAD processing.")
self._paused.set()
else:
self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}")
else:
self.logger.debug(f"Ignoring message from unknown sender: {sender}")

View File

@@ -6,12 +6,7 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import (
GestureCommand,
PauseCommand,
RIEndpoint,
SpeechCommand,
)
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
class UserInterruptAgent(BaseAgent):
@@ -76,19 +71,6 @@ class UserInterruptAgent(BaseAgent):
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
event_context,
)
elif event_type == "pause":
self.logger.debug(
"Received pause/resume button press with context '%s'.", event_context
)
await self._send_pause_command(event_context)
if event_context:
self.logger.info("Sent pause command.")
else:
self.logger.info("Sent resume command.")
elif event_type in ["next_phase", "reset_phase", "reset_experiment"]:
await self._send_experiment_control_to_bdi_core(event_type)
else:
self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').",
@@ -96,36 +78,6 @@ class UserInterruptAgent(BaseAgent):
event_context,
)
async def _send_experiment_control_to_bdi_core(self, type):
"""
method to send experiment control buttons to bdi core.
:param type: the type of control button we should send to the bdi core.
"""
# Switch which thread we should send to bdi core
thread = ""
match type:
case "next_phase":
thread = "force_next_phase"
case "reset_phase":
thread = "reset_current_phase"
case "reset_experiment":
thread = "reset_experiment"
case _:
self.logger.warning(
"Received unknown experiment control type '%s' to send to BDI Core.",
type,
)
out_msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
thread=thread,
body="",
)
self.logger.debug("Sending experiment control '%s' to BDI Core.", thread)
await self.send(out_msg)
async def _send_to_speech_agent(self, text_to_say: str):
"""
method to send prioritized speech command to RobotSpeechAgent.
@@ -178,38 +130,6 @@ class UserInterruptAgent(BaseAgent):
belief_id,
)
async def _send_pause_command(self, pause):
"""
Send a pause command to the Robot Interface via the RI Communication Agent.
Send a pause command to the other internal agents; for now just VAD agent.
"""
cmd = PauseCommand(data=pause)
message = InternalMessage(
to=settings.agent_settings.ri_communication_name,
sender=self.name,
body=cmd.model_dump_json(),
)
await self.send(message)
if pause == "true":
# Send pause to VAD agent
vad_message = InternalMessage(
to=settings.agent_settings.vad_name,
sender=self.name,
body="PAUSE",
)
await self.send(vad_message)
self.logger.info("Sent pause command to VAD Agent and RI Communication Agent.")
else:
# Send resume to VAD agent
vad_message = InternalMessage(
to=settings.agent_settings.vad_name,
sender=self.name,
body="RESUME",
)
await self.send(vad_message)
self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.")
async def setup(self):
"""
Initialize the agent.

View File

@@ -192,7 +192,16 @@ class BaseAgent(ABC):
:param coro: The coroutine to execute as a task.
"""
task = asyncio.create_task(coro)
async def try_coro(coro_: Coroutine):
try:
await coro_
except asyncio.CancelledError:
self.logger.debug("A behavior was canceled successfully: %s", coro_)
except Exception:
self.logger.warning("An exception occurred in a behavior.", exc_info=True)
task = asyncio.create_task(try_coro(coro))
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)
return task

View File

@@ -1,6 +1,7 @@
from pydantic import BaseModel
from control_backend.schemas.program import Belief as ProgramBelief
from control_backend.schemas.program import Goal
class BeliefList(BaseModel):
@@ -12,3 +13,7 @@ class BeliefList(BaseModel):
"""
beliefs: list[ProgramBelief]
class GoalList(BaseModel):
goals: list[Goal]

View File

@@ -13,6 +13,9 @@ class Belief(BaseModel):
name: str
arguments: list[str] | None
# To make it hashable
model_config = {"frozen": True}
class BeliefMessage(BaseModel):
"""

View File

@@ -117,7 +117,7 @@ class Goal(ProgramElement):
:ivar can_fail: Whether we can fail to achieve the goal after executing the plan.
"""
description: str
description: str = ""
plan: Plan
can_fail: bool = True

View File

@@ -14,7 +14,6 @@ class RIEndpoint(str, Enum):
GESTURE_TAG = "actuate/gesture/tag"
PING = "ping"
NEGOTIATE_PORTS = "negotiate/ports"
PAUSE = ""
class RIMessage(BaseModel):
@@ -65,15 +64,3 @@ class GestureCommand(RIMessage):
if self.endpoint not in allowed:
raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG")
return self
class PauseCommand(RIMessage):
"""
A specific command to pause or unpause the robot's actions.
:ivar endpoint: Fixed to ``RIEndpoint.PAUSE``.
:ivar data: A boolean indicating whether to pause (True) or unpause (False).
"""
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE)
data: bool