Compare commits

...

35 Commits

Author SHA1 Message Date
Twirre Meulenbelt
4b71981a3e fix: some bugs and some tests
ref: N25B-429
2026-01-12 09:00:50 +01:00
866d7c4958 fix: end phase loop correctly notifies about user_said
ref: N25B-429
2026-01-08 15:13:12 +01:00
133019a928 feat: trigger name and trigger checks on belief update
ref: N25B-429
2026-01-08 14:04:44 +01:00
4d0ba69443 fix: don't re-add user_said upon phase transition
ref: N25B-429
2026-01-08 13:44:25 +01:00
625ef0c365 feat: phase transition waits for all goals
ref: N25B-429
2026-01-08 13:36:03 +01:00
b88758fa76 feat: phase transition independent of response
ref: N25B-429
2026-01-08 13:33:37 +01:00
Twirre Meulenbelt
45719c580b feat: prepend more silence before speech audio for better transcription beginnings
ref: N25B-429
2026-01-08 10:49:13 +01:00
5a61225c6f feat: reset extractor history
ref: N25B-429
2026-01-07 18:10:13 +01:00
a30cea5231 Merge branch 'feat/semantic-beliefs' into feat/extra-agentspeak-functionality 2026-01-07 17:51:30 +01:00
Twirre Meulenbelt
93d67ccb66 feat: add reset functionality to semantic belief extractor
ref: N25B-432
2026-01-07 17:50:47 +01:00
240624f887 Merge branch 'dev' into feat/extra-agentspeak-functionality
# Conflicts:
#	src/control_backend/agents/bdi/bdi_program_manager.py
#	src/control_backend/agents/llm/llm_agent.py
#	test/unit/agents/bdi/test_bdi_program_manager.py
2026-01-07 17:46:48 +01:00
8a77e8e1c7 feat: check goals only for this phase
Since conversation history still remains we can still check at a later point.

ref: N25B-429
2026-01-07 17:31:24 +01:00
3b4dccc760 Merge branch 'feat/semantic-beliefs' into feat/extra-agentspeak-functionality
# Conflicts:
#	src/control_backend/agents/bdi/bdi_program_manager.py
2026-01-07 17:20:52 +01:00
3d49e44cf7 fix: complete pipeline working
User interrupts still need to be tested.

ref: N25B-429
2026-01-07 17:13:58 +01:00
Twirre Meulenbelt
aa5b386f65 feat: semantically determine goal completion
ref: N25B-432
2026-01-07 17:08:23 +01:00
Twirre Meulenbelt
3189b9fee3 fix: let belief extractor send user_said belief
ref: N25B-429
2026-01-07 15:19:23 +01:00
07d70cb781 fix: single dispatch order
ref: N25B-429
2026-01-07 13:02:23 +01:00
af832980c8 feat: general slugify method
ref: N25B-429
2026-01-07 12:24:46 +01:00
Twirre Meulenbelt
cabe35cdbd feat: integrate AgentSpeak with semantic belief extraction
ref: N25B-429
2026-01-07 11:44:48 +01:00
Twirre Meulenbelt
de8e829d3e Merge remote-tracking branch 'origin/feat/agentspeak-generation' into feat/semantic-beliefs
# Conflicts:
#	test/unit/agents/bdi/test_bdi_program_manager.py
2026-01-06 15:30:59 +01:00
Björn Otgaar
612a96940d Merge branch 'feat/environment-variables' into 'dev'
Docs for environment variables, parameterize some constants

See merge request ics/sp/2025/n25b/pepperplus-cb!38
2026-01-06 09:02:49 +00:00
Pim Hutting
4c20656c75 Merge branch 'feat/program-reset-llm' into 'dev'
feat: made program reset LLM

See merge request ics/sp/2025/n25b/pepperplus-cb!39
2026-01-02 15:13:05 +00:00
Pim Hutting
6ca86e4b81 feat: made program reset LLM 2026-01-02 15:13:04 +00:00
Twirre Meulenbelt
42ee5c76d8 test: create tests for belief extractor agent
Includes changes in schemas. Change type of `norms` in `Program` imperceptibly, big changes in schema of `BeliefMessage` to support deleting beliefs.

ref: N25B-380
2025-12-29 17:12:02 +01:00
Twirre Meulenbelt
7d798f2e77 Merge remote-tracking branch 'origin/dev' into feat/environment-variables
# Conflicts:
#	src/control_backend/core/config.py
#	test/unit/agents/actuation/test_robot_speech_agent.py
2025-12-29 12:40:16 +01:00
Twirre Meulenbelt
5282c2471f Merge remote-tracking branch 'origin/dev' into feat/environment-variables
# Conflicts:
#	src/control_backend/core/config.py
#	test/unit/agents/actuation/test_robot_speech_agent.py
2025-12-29 12:35:39 +01:00
Twirre Meulenbelt
57b1276cb5 test: make tests work again after changing Program schema
ref: N25B-380
2025-12-29 12:31:51 +01:00
Twirre Meulenbelt
7e0dc9ce1c Merge remote-tracking branch 'origin/feat/agentspeak-generation' into feat/semantic-beliefs
# Conflicts:
#	src/control_backend/schemas/program.py
2025-12-23 17:36:39 +01:00
Twirre Meulenbelt
71cefdfef3 fix: add types to all config properties
ref: N25B-380
2025-12-23 17:14:49 +01:00
Twirre Meulenbelt
33501093a1 feat: extract semantic beliefs from conversation
ref: N25B-380
2025-12-23 17:09:58 +01:00
Luijkx,S.O.H. (Storm)
adbb7ffd5c Merge branch 'feat/user-interrupt-agent' into 'dev'
create UserInterruptAgent with connection to UI

See merge request ics/sp/2025/n25b/pepperplus-cb!40
2025-12-22 13:56:03 +00:00
Pim Hutting
0501a9fba3 create UserInterruptAgent with connection to UI 2025-12-22 13:56:02 +00:00
Twirre Meulenbelt
0c682d6440 feat: introduce .env.example, docs
The example includes options that are expected to be changed. It also includes a reference to where in the docs you can find a full list of options.

ref: N25B-352
2025-12-11 13:35:19 +01:00
Twirre Meulenbelt
32d8f20dc9 feat: parameterize RI host
Was "localhost" in RI Communication Agent, now uses configurable setting. Secretly also removing "localhost" from VAD agent, as its socket should be something that's "inproc".

ref: N25B-352
2025-12-11 12:12:15 +01:00
Twirre Meulenbelt
9cc0e39955 fix: failures main tests since VAD agent initialization was changed
The test still expects the VAD agent to be started in main, rather than in the RI Communication Agent.

ref: N25B-356
2025-12-11 12:04:24 +01:00
41 changed files with 2153 additions and 311 deletions

20
.env.example Normal file
View File

@@ -0,0 +1,20 @@
# Example .env file. To use, make a copy, call it ".env" (i.e. removing the ".example" suffix), then you edit values.
# The hostname of the Robot Interface. Change if the Control Backend and Robot Interface are running on different computers.
RI_HOST="localhost"
# URL for the local LLM API. Must be an API that implements the OpenAI Chat Completions API, but most do.
LLM_SETTINGS__LOCAL_LLM_URL="http://localhost:1234/v1/chat/completions"
# Name of the local LLM model to use.
LLM_SETTINGS__LOCAL_LLM_MODEL="gpt-oss"
# Number of non-speech chunks to wait before speech ended. A chunk is approximately 31 ms. Increasing this number allows longer pauses in speech, but also increases response time.
BEHAVIOUR_SETTINGS__VAD_NON_SPEECH_PATIENCE_CHUNKS=15
# Timeout in milliseconds for socket polling. Increase this number if network latency/jitter is high, often the case when using Wi-Fi. Perhaps 500 ms. A symptom of this issue is transcriptions getting cut off.
BEHAVIOUR_SETTINGS__SOCKET_POLLER_TIMEOUT_MS=100
# For an exhaustive list of options, see the control_backend.core.config module in the docs.

View File

@@ -0,0 +1,9 @@
%{first_multiline_commit_description}
To verify:
- [ ] Style checks pass
- [ ] Pipeline (tests) pass
- [ ] Documentation is up to date
- [ ] Tests are up to date (new code is covered)
- [ ] ...

View File

@@ -27,6 +27,7 @@ This + part might differ based on what model you choose.
copy the model name in the module loaded and replace local_llm_modelL. In settings.
## Running
To run the project (development server), execute the following command (while inside the root repository):
@@ -34,6 +35,14 @@ To run the project (development server), execute the following command (while in
uv run fastapi dev src/control_backend/main.py
```
### Environment Variables
You can use environment variables to change settings. Make a copy of the [`.env.example`](.env.example) file, name it `.env` and put it in the root directory. The file itself describes how to do the configuration.
For an exhaustive list of environment options, see the `control_backend.core.config` module in the docs.
## Testing
Testing happens automatically when opening a merge request to any branch. If you want to manually run the test suite, you can do so by running the following for unit tests:

View File

@@ -28,15 +28,18 @@ class RobotGestureAgent(BaseAgent):
address = ""
bind = False
gesture_data = []
single_gesture_data = []
def __init__(
self,
name: str,
address=settings.zmq_settings.ri_command_address,
address: str,
bind=False,
gesture_data=None,
single_gesture_data=None,
):
self.gesture_data = gesture_data or []
self.single_gesture_data = single_gesture_data or []
super().__init__(name)
self.address = address
self.bind = bind
@@ -99,7 +102,13 @@ class RobotGestureAgent(BaseAgent):
gesture_command.data,
)
return
elif gesture_command.endpoint == RIEndpoint.GESTURE_SINGLE:
if gesture_command.data not in self.single_gesture_data:
self.logger.warning(
"Received gesture '%s' which is not in available gestures. Early returning",
gesture_command.data,
)
return
await self.pubsocket.send_json(gesture_command.model_dump())
except Exception:
self.logger.exception("Error processing internal message.")

View File

@@ -145,7 +145,10 @@ class AgentSpeakGenerator:
type=TriggerType.ADDED_BELIEF,
trigger_literal=AstLiteral("user_said", [AstVar("Message")]),
context=[AstLiteral("phase", [AstString("end")])],
body=[AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply"))],
body=[
AstStatement(StatementType.DO_ACTION, AstLiteral("notify_user_said")),
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply")),
],
)
)
@@ -157,7 +160,7 @@ class AgentSpeakGenerator:
previous_goal = None
for goal in phase.goals:
self._process_goal(goal, phase, previous_goal)
self._process_goal(goal, phase, previous_goal, main_goal=True)
previous_goal = goal
for trigger in phase.triggers:
@@ -171,26 +174,41 @@ class AgentSpeakGenerator:
self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")])
)
context = [from_phase_ast, ~AstLiteral("responded_this_turn")]
if from_phase and from_phase.goals:
context.append(self._astify(from_phase.goals[-1], achieved=True))
context = [from_phase_ast]
if from_phase:
for goal in from_phase.goals:
context.append(self._astify(goal, achieved=True))
body = [
AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast),
AstStatement(StatementType.ADD_BELIEF, to_phase_ast),
]
if from_phase:
body.extend(
[
AstStatement(
StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")])
),
AstStatement(
StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")])
),
]
# if from_phase:
# body.extend(
# [
# AstStatement(
# StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")])
# ),
# AstStatement(
# StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")])
# ),
# ]
# )
# Notify outside world about transition
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"notify_transition_phase",
[
AstString(str(from_phase.id)),
AstString(str(to_phase.id) if to_phase else "end"),
],
),
)
)
self._asp.plans.append(
AstPlan(TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), context, body)
@@ -213,6 +231,11 @@ class AgentSpeakGenerator:
def _add_default_loop(self, phase: Phase) -> None:
actions = []
actions.append(
AstStatement(
StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")])
)
)
actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn")))
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers")))
@@ -236,6 +259,7 @@ class AgentSpeakGenerator:
phase: Phase,
previous_goal: Goal | None = None,
continues_response: bool = False,
main_goal: bool = False,
) -> None:
context: list[AstExpression] = [self._astify(phase)]
context.append(~self._astify(goal, achieved=True))
@@ -245,6 +269,13 @@ class AgentSpeakGenerator:
context.append(~AstLiteral("responded_this_turn"))
body = []
if main_goal: # UI only needs to know about the main goals
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_goal_start", [AstString(self.slugify(goal))]),
)
)
subgoals = []
for step in goal.plan.steps:
@@ -283,11 +314,23 @@ class AgentSpeakGenerator:
body = []
subgoals = []
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_trigger_start", [AstString(self.slugify(trigger))]),
)
)
for step in trigger.plan.steps:
body.append(self._step_to_statement(step))
if isinstance(step, Goal):
step.can_fail = False # triggers are continuous sequence
subgoals.append(step)
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_trigger_end", [AstString(self.slugify(trigger))]),
)
)
self._asp.plans.append(
AstPlan(
@@ -298,6 +341,9 @@ class AgentSpeakGenerator:
)
)
# Force trigger (from UI)
self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(trigger), [], body))
for subgoal in subgoals:
self._process_goal(subgoal, phase, continues_response=True)
@@ -332,7 +378,7 @@ class AgentSpeakGenerator:
@_astify.register
def _(self, sb: SemanticBelief) -> AstExpression:
return AstLiteral(f"semantic_{self._slugify_str(sb.description)}")
return AstLiteral(self.slugify(sb))
@_astify.register
def _(self, ib: InferredBelief) -> AstExpression:
@@ -355,6 +401,10 @@ class AgentSpeakGenerator:
def _(self, goal: Goal, achieved: bool = False) -> AstExpression:
return AstLiteral(f"{'achieved_' if achieved else ''}{self._slugify_str(goal.name)}")
@_astify.register
def _(self, trigger: Trigger) -> AstExpression:
return AstLiteral(self.slugify(trigger))
@_astify.register
def _(self, sa: SpeechAction) -> AstExpression:
return AstLiteral("say", [AstString(sa.text)])
@@ -368,6 +418,26 @@ class AgentSpeakGenerator:
def _(self, la: LLMAction) -> AstExpression:
return AstLiteral("reply_with_goal", [AstString(la.goal)])
@singledispatchmethod
@staticmethod
def slugify(element: ProgramElement) -> str:
raise NotImplementedError(f"Cannot convert element {element} to a slug.")
@slugify.register
@staticmethod
def _(sb: SemanticBelief) -> str:
return f"semantic_{AgentSpeakGenerator._slugify_str(sb.name)}"
@slugify.register
@staticmethod
def _(g: Goal) -> str:
return AgentSpeakGenerator._slugify_str(g.name)
@slugify.register
@staticmethod
def _(t: Trigger):
return f"trigger_{AgentSpeakGenerator._slugify_str(t.name)}"
@staticmethod
def _slugify_str(text: str) -> str:
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])

View File

@@ -1,5 +1,6 @@
import asyncio
import copy
import json
import time
from collections.abc import Iterable
@@ -11,9 +12,9 @@ from pydantic import ValidationError
from control_backend.agents.base import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.llm_prompt_message import LLMPromptMessage
from control_backend.schemas.ri_message import SpeechCommand
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak
@@ -100,7 +101,6 @@ class BDICoreAgent(BaseAgent):
maybe_more_work = True
while maybe_more_work:
maybe_more_work = False
self.logger.debug("Stepping BDI.")
if self.bdi_agent.step():
maybe_more_work = True
@@ -128,8 +128,8 @@ class BDICoreAgent(BaseAgent):
if msg.thread == "beliefs":
try:
beliefs = BeliefMessage.model_validate_json(msg.body).beliefs
self._apply_beliefs(beliefs)
belief_changes = BeliefMessage.model_validate_json(msg.body)
self._apply_belief_changes(belief_changes)
except ValidationError:
self.logger.exception("Error processing belief.")
return
@@ -155,22 +155,40 @@ class BDICoreAgent(BaseAgent):
body=cmd.model_dump_json(),
)
await self.send(out_msg)
case settings.agent_settings.user_interrupt_name:
content = msg.body
self.logger.debug("Received user interruption: %s", content)
def _apply_beliefs(self, beliefs: list[Belief]):
match msg.thread:
case "force_phase_transition":
self._set_goal("transition_phase")
case "force_trigger":
self._force_trigger(msg.body)
case _:
self.logger.warning("Received unknow user interruption: %s", msg)
def _apply_belief_changes(self, belief_changes: BeliefMessage):
"""
Update the belief base with a list of new beliefs.
If ``replace=True`` is set on a belief, it removes all existing beliefs with that name
before adding the new one.
For beliefs in ``belief_changes.replace``, it removes all existing beliefs with that name
before adding one new one.
:param belief_changes: The changes in beliefs to apply.
"""
if not beliefs:
if not belief_changes.create and not belief_changes.replace and not belief_changes.delete:
return
for belief in beliefs:
if belief.replace:
self._remove_all_with_name(belief.name)
for belief in belief_changes.create:
self._add_belief(belief.name, belief.arguments)
for belief in belief_changes.replace:
self._remove_all_with_name(belief.name)
self._add_belief(belief.name, belief.arguments)
for belief in belief_changes.delete:
self._remove_belief(belief.name, belief.arguments)
def _add_belief(self, name: str, args: list[str] = None):
"""
Add a single belief to the BDI agent.
@@ -194,16 +212,35 @@ class BDICoreAgent(BaseAgent):
agentspeak.runtime.Intention(),
)
# Check for transitions
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
agentspeak.Literal("transition_phase"),
agentspeak.runtime.Intention(),
)
# Check triggers
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
agentspeak.Literal("check_triggers"),
agentspeak.runtime.Intention(),
)
self._wake_bdi_loop.set()
self.logger.debug(f"Added belief {self.format_belief_string(name, args)}")
def _remove_belief(self, name: str, args: Iterable[str]):
def _remove_belief(self, name: str, args: Iterable[str] | None):
"""
Removes a specific belief (with arguments), if it exists.
"""
new_args = (agentspeak.Literal(arg) for arg in args)
term = agentspeak.Literal(name, new_args)
if args is None:
term = agentspeak.Literal(name)
else:
new_args = (agentspeak.Literal(arg) for arg in args)
term = agentspeak.Literal(name, new_args)
result = self.bdi_agent.call(
agentspeak.Trigger.removal,
@@ -243,6 +280,37 @@ class BDICoreAgent(BaseAgent):
self.logger.debug(f"Removed {removed_count} beliefs.")
def _set_goal(self, name: str, args: Iterable[str] | None = None):
args = args or []
if args:
merged_args = DELIMITER.join(arg for arg in args)
new_args = (agentspeak.Literal(merged_args),)
term = agentspeak.Literal(name, new_args)
else:
term = agentspeak.Literal(name)
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
term,
agentspeak.runtime.Intention(),
)
self._wake_bdi_loop.set()
self.logger.debug(f"Set goal !{self.format_belief_string(name, args)}.")
def _force_trigger(self, name: str):
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
agentspeak.Literal(name),
agentspeak.runtime.Intention(),
)
self.logger.info("Manually forced trigger %s.", name)
def _add_custom_actions(self) -> None:
"""
Add any custom actions here. Inside `@self.actions.add()`, the first argument is
@@ -251,7 +319,7 @@ class BDICoreAgent(BaseAgent):
"""
@self.actions.add(".reply", 2)
def _reply(agent: "BDICoreAgent", term, intention):
def _reply(agent, term, intention):
"""
Let the LLM generate a response to a user's utterance with the current norms and goals.
"""
@@ -284,7 +352,7 @@ class BDICoreAgent(BaseAgent):
yield
@self.actions.add(".say", 1)
def _say(agent: "BDICoreAgent", term, intention):
def _say(agent, term, intention):
"""
Make the robot say the given text instantly.
"""
@@ -298,12 +366,21 @@ class BDICoreAgent(BaseAgent):
sender=settings.agent_settings.bdi_core_name,
body=speech_command.model_dump_json(),
)
# TODO: add to conversation history
self.add_behavior(self.send(speech_message))
chat_history_message = InternalMessage(
to=settings.agent_settings.llm_name,
thread="assistant_message",
body=str(message_text),
)
self.add_behavior(self.send(chat_history_message))
yield
@self.actions.add(".gesture", 2)
def _gesture(agent: "BDICoreAgent", term, intention):
def _gesture(agent, term, intention):
"""
Make the robot perform the given gesture instantly.
"""
@@ -316,13 +393,113 @@ class BDICoreAgent(BaseAgent):
gesture_name,
)
# gesture = Gesture(type=gesture_type, name=gesture_name)
# gesture_message = InternalMessage(
# to=settings.agent_settings.robot_gesture_name,
# sender=settings.agent_settings.bdi_core_name,
# body=gesture.model_dump_json(),
# )
# asyncio.create_task(agent.send(gesture_message))
if str(gesture_type) == "single":
endpoint = RIEndpoint.GESTURE_SINGLE
elif str(gesture_type) == "tag":
endpoint = RIEndpoint.GESTURE_TAG
else:
self.logger.warning("Gesture type %s could not be resolved.", gesture_type)
endpoint = RIEndpoint.GESTURE_SINGLE
gesture_command = GestureCommand(endpoint=endpoint, data=gesture_name)
gesture_message = InternalMessage(
to=settings.agent_settings.robot_gesture_name,
sender=settings.agent_settings.bdi_core_name,
body=gesture_command.model_dump_json(),
)
self.add_behavior(self.send(gesture_message))
yield
@self.actions.add(".notify_user_said", 1)
def _notify_user_said(agent, term, intention):
user_said = agentspeak.grounded(term.args[0], intention.scope)
msg = InternalMessage(
to=settings.agent_settings.llm_name, thread="user_message", body=str(user_said)
)
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_trigger_start", 1)
def _notify_trigger_start(agent, term, intention):
"""
Notify the UI about the trigger we just started doing.
"""
trigger_name = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("Started trigger %s", trigger_name)
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
sender=self.name,
thread="trigger_start",
body=str(trigger_name),
)
# TODO: check with Pim
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_trigger_end", 1)
def _notify_trigger_end(agent, term, intention):
"""
Notify the UI about the trigger we just started doing.
"""
trigger_name = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("Finished trigger %s", trigger_name)
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
sender=self.name,
thread="trigger_end",
body=str(trigger_name),
)
# TODO: check with Pim
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_goal_start", 1)
def _notify_goal_start(agent, term, intention):
"""
Notify the UI about the goal we just started chasing.
"""
goal_name = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("Started chasing goal %s", goal_name)
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
sender=self.name,
thread="goal_start",
body=str(goal_name),
)
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_transition_phase", 2)
def _notify_transition_phase(agent, term, intention):
"""
Notify the BDI program manager about a phase transition.
"""
old = agentspeak.grounded(term.args[0], intention.scope)
new = agentspeak.grounded(term.args[1], intention.scope)
msg = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
thread="transition_phase",
body=json.dumps({"old": str(old), "new": str(new)}),
)
self.add_behavior(self.send(msg))
yield
async def _send_to_llm(self, text: str, norms: str, goals: str):
@@ -334,13 +511,14 @@ class BDICoreAgent(BaseAgent):
to=settings.agent_settings.llm_name,
sender=self.name,
body=prompt.model_dump_json(),
thread="prompt_message",
)
await self.send(msg)
self.logger.info("Message sent to LLM agent: %s", text)
@staticmethod
def format_belief_string(name: str, args: Iterable[str] = []):
def format_belief_string(name: str, args: Iterable[str] | None = []):
"""
Given a belief's name and its args, return a string of the form "name(*args)"
"""
return f"{name}{'(' if args else ''}{','.join(args)}{')' if args else ''}"
return f"{name}{'(' if args else ''}{','.join(args or [])}{')' if args else ''}"

View File

@@ -1,3 +1,6 @@
import asyncio
import json
import zmq
from pydantic import ValidationError
from zmq.asyncio import Context
@@ -5,8 +8,16 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList, GoalList
from control_backend.schemas.internal_message import InternalMessage
from control_backend.schemas.program import Program
from control_backend.schemas.program import (
Belief,
ConditionalNorm,
Goal,
InferredBelief,
Phase,
Program,
)
class BDIProgramManager(BaseAgent):
@@ -21,20 +32,20 @@ class BDIProgramManager(BaseAgent):
:ivar sub_socket: The ZMQ SUB socket used to receive program updates.
"""
_program: Program
_phase: Phase | None
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sub_socket = None
def _initialize_internal_state(self, program: Program):
self._program = program
self._phase = program.phases[0] # start in first phase
async def _create_agentspeak_and_send_to_bdi(self, program: Program):
"""
Convert a received program into BDI beliefs and send them to the BDI Core Agent.
Currently, it takes the **first phase** of the program and extracts:
- **Norms**: Constraints or rules the agent must follow.
- **Goals**: Objectives the agent must achieve.
These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will
overwrite any existing norms/goals of the same name in the BDI agent.
Convert a received program into an AgentSpeak file and send it to the BDI Core Agent.
:param program: The program object received from the API.
"""
@@ -56,12 +67,134 @@ class BDIProgramManager(BaseAgent):
await self.send(msg)
async def handle_message(self, msg: InternalMessage):
match msg.thread:
case "transition_phase":
phases = json.loads(msg.body)
await self._transition_phase(phases["old"], phases["new"])
async def _transition_phase(self, old: str, new: str):
assert old == str(self._phase.id)
if new == "end":
self._phase = None
return
for phase in self._program.phases:
if str(phase.id) == new:
self._phase = phase
await self._send_beliefs_to_semantic_belief_extractor()
await self._send_goals_to_semantic_belief_extractor()
# Notify user interaction agent
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
thread="transition_phase",
body=str(self._phase.id),
)
self.add_behavior(self.send(msg))
def _extract_current_beliefs(self) -> list[Belief]:
beliefs: list[Belief] = []
for norm in self._phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_beliefs_from_belief(norm.condition)
for trigger in self._phase.triggers:
beliefs += self._extract_beliefs_from_belief(trigger.condition)
return beliefs
@staticmethod
def _extract_beliefs_from_belief(belief: Belief) -> list[Belief]:
if isinstance(belief, InferredBelief):
return BDIProgramManager._extract_beliefs_from_belief(
belief.left
) + BDIProgramManager._extract_beliefs_from_belief(belief.right)
return [belief]
async def _send_beliefs_to_semantic_belief_extractor(self):
"""
Extract beliefs from the program and send them to the Semantic Belief Extractor Agent.
"""
beliefs = BeliefList(beliefs=self._extract_current_beliefs())
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=beliefs.model_dump_json(),
thread="beliefs",
)
await self.send(message)
def _extract_current_goals(self) -> list[Goal]:
"""
Extract all goals from the program, including subgoals.
:return: A list of Goal objects.
"""
goals: list[Goal] = []
def extract_goals_from_goal(goal_: Goal) -> list[Goal]:
goals_: list[Goal] = [goal]
for plan in goal_.plan:
if isinstance(plan, Goal):
goals_.extend(extract_goals_from_goal(plan))
return goals_
for goal in self._phase.goals:
goals.extend(extract_goals_from_goal(goal))
return goals
async def _send_goals_to_semantic_belief_extractor(self):
"""
Extract goals for the current phase and send them to the Semantic Belief Extractor Agent.
"""
goals = GoalList(goals=self._extract_current_goals())
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=goals.model_dump_json(),
thread="goals",
)
await self.send(message)
async def _send_clear_llm_history(self):
"""
Clear the LLM Agent's conversation history.
Sends an empty history to the LLM Agent to reset its state.
"""
message = InternalMessage(
to=settings.agent_settings.llm_name,
body="clear_history",
)
await self.send(message)
self.logger.debug("Sent message to LLM agent to clear history.")
extractor_msg = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
thread="conversation_history",
body="reset",
)
await self.send(extractor_msg)
self.logger.debug("Sent message to extractor agent to clear history.")
async def _receive_programs(self):
"""
Continuous loop that receives program updates from the HTTP endpoint.
It listens to the ``program`` topic on the internal ZMQ SUB socket.
When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`.
Additionally, the LLM history is cleared via :meth:`_send_clear_llm_history`.
"""
while True:
topic, body = await self.sub_socket.recv_multipart()
@@ -69,10 +202,18 @@ class BDIProgramManager(BaseAgent):
try:
program = Program.model_validate_json(body)
except ValidationError:
self.logger.exception("Received an invalid program.")
self.logger.warning("Received an invalid program.")
continue
await self._create_agentspeak_and_send_to_bdi(program)
self._initialize_internal_state(program)
await self._send_clear_llm_history()
await asyncio.gather(
self._create_agentspeak_and_send_to_bdi(program),
self._send_beliefs_to_semantic_belief_extractor(),
self._send_goals_to_semantic_belief_extractor(),
)
async def setup(self):
"""

View File

@@ -101,7 +101,7 @@ class BDIBeliefCollectorAgent(BaseAgent):
:return: A Belief object if the input is valid or None.
"""
try:
return Belief(name=name, arguments=arguments, replace=name == "user_said")
return Belief(name=name, arguments=arguments)
except ValidationError:
return None
@@ -144,7 +144,7 @@ class BDIBeliefCollectorAgent(BaseAgent):
msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=BeliefMessage(beliefs=beliefs).model_dump_json(),
body=BeliefMessage(create=beliefs).model_dump_json(),
thread="beliefs",
)

View File

@@ -1,5 +1,6 @@
norms("").
+user_said(Message) : norms(Norms) <-
.notify_user_said(Message);
-user_said(Message);
.reply(Message, Norms).

View File

@@ -1,8 +1,46 @@
import asyncio
import json
import httpx
from pydantic import BaseModel, ValidationError
from control_backend.agents.base import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList, GoalList
from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import Goal, SemanticBelief
type JSONLike = None | bool | int | float | str | list["JSONLike"] | dict[str, "JSONLike"]
class BeliefState(BaseModel):
true: set[InternalBelief] = set()
false: set[InternalBelief] = set()
def difference(self, other: "BeliefState") -> "BeliefState":
return BeliefState(
true=self.true - other.true,
false=self.false - other.false,
)
def union(self, other: "BeliefState") -> "BeliefState":
return BeliefState(
true=self.true | other.true,
false=self.false | other.false,
)
def __sub__(self, other):
return self.difference(other)
def __or__(self, other):
return self.union(other)
def __bool__(self):
return bool(self.true) or bool(self.false)
class TextBeliefExtractorAgent(BaseAgent):
@@ -12,54 +50,433 @@ class TextBeliefExtractorAgent(BaseAgent):
This agent is responsible for processing raw text (e.g., from speech transcription) and
extracting semantic beliefs from it.
In the current demonstration version, it performs a simple wrapping of the user's input
into a ``user_said`` belief. In a full implementation, this agent would likely interact
with an LLM or NLU engine to extract intent, entities, and other structured information.
It uses the available beliefs received from the program manager to try to extract beliefs from a
user's message, sends and updated beliefs to the BDI core, and forms a ``user_said`` belief from
the message itself.
"""
def __init__(self, name: str):
super().__init__(name)
self._llm = self.LLM(self, settings.llm_settings.n_parallel)
self.belief_inferrer = SemanticBeliefInferrer(self._llm)
self.goal_inferrer = GoalAchievementInferrer(self._llm)
self._current_beliefs = BeliefState()
self._current_goal_completions: dict[str, bool] = {}
self.conversation = ChatHistory(messages=[])
async def setup(self):
"""
Initialize the agent and its resources.
"""
self.logger.info("Settting up %s.", self.name)
# Setup LLM belief context if needed (currently demo is just passthrough)
self.beliefs = {"mood": ["X"], "car": ["Y"]}
self.logger.info("Setting up %s.", self.name)
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages, primarily from the Transcription Agent.
Handle incoming messages. Expect messages from the Transcriber agent, LLM agent, and the
Program manager agent.
:param msg: The received message containing transcribed text.
:param msg: The received message.
"""
sender = msg.sender
if sender == settings.agent_settings.transcription_name:
self.logger.debug("Received text from transcriber: %s", msg.body)
await self._process_transcription_demo(msg.body)
else:
self.logger.info("Discarding message from %s", sender)
async def _process_transcription_demo(self, txt: str):
match sender:
case settings.agent_settings.transcription_name:
self.logger.debug("Received text from transcriber: %s", msg.body)
self._apply_conversation_message(ChatMessage(role="user", content=msg.body))
await self._user_said(msg.body)
await self._infer_new_beliefs()
await self._infer_goal_completions()
case settings.agent_settings.llm_name:
self.logger.debug("Received text from LLM: %s", msg.body)
self._apply_conversation_message(ChatMessage(role="assistant", content=msg.body))
case settings.agent_settings.bdi_program_manager_name:
await self._handle_program_manager_message(msg)
case _:
self.logger.info("Discarding message from %s", sender)
return
def _apply_conversation_message(self, message: ChatMessage):
"""
Process the transcribed text and generate beliefs.
Save the chat message to our conversation history, taking into account the conversation
length limit.
**Demo Implementation:**
Currently, this method takes the raw text ``txt`` and wraps it into a belief structure:
``user_said("txt")``.
This belief is then sent to the :class:`BDIBeliefCollectorAgent`.
:param txt: The raw transcribed text string.
:param message: The chat message to add to the conversation history.
"""
# For demo, just wrapping user text as user_said belief
belief = {"beliefs": {"user_said": [txt]}, "type": "belief_extraction_text"}
payload = json.dumps(belief)
length_limit = settings.behaviour_settings.conversation_history_length_limit
self.conversation.messages = (self.conversation.messages + [message])[-length_limit:]
belief_msg = InternalMessage(
to=settings.agent_settings.bdi_belief_collector_name,
sender=self.name,
body=payload,
thread="beliefs",
async def _handle_program_manager_message(self, msg: InternalMessage):
"""
Handle a message from the program manager: extract available beliefs and goals from it.
:param msg: The received message from the program manager.
"""
match msg.thread:
case "beliefs":
self._handle_beliefs_message(msg)
await self._infer_new_beliefs()
case "goals":
self._handle_goals_message(msg)
await self._infer_goal_completions()
case "conversation_history":
if msg.body == "reset":
self._reset()
case _:
self.logger.warning("Received unexpected message from %s", msg.sender)
def _reset(self):
self.conversation = ChatHistory(messages=[])
self.belief_inferrer.available_beliefs.clear()
self._current_beliefs = BeliefState()
self.goal_inferrer.goals.clear()
self._current_goal_completions = {}
def _handle_beliefs_message(self, msg: InternalMessage):
try:
belief_list = BeliefList.model_validate_json(msg.body)
except ValidationError:
self.logger.warning(
"Received message from program manager but it is not a valid list of beliefs."
)
return
available_beliefs = [b for b in belief_list.beliefs if isinstance(b, SemanticBelief)]
self.belief_inferrer.available_beliefs = available_beliefs
self.logger.debug(
"Received %d semantic beliefs from the program manager: %s",
len(available_beliefs),
", ".join(b.name for b in available_beliefs),
)
def _handle_goals_message(self, msg: InternalMessage):
try:
goals_list = GoalList.model_validate_json(msg.body)
except ValidationError:
self.logger.warning(
"Received message from program manager but it is not a valid list of goals."
)
return
# Use only goals that can fail, as the others are always assumed to be completed
available_goals = [g for g in goals_list.goals if g.can_fail]
self.goal_inferrer.goals = available_goals
self.logger.debug(
"Received %d failable goals from the program manager: %s",
len(available_goals),
", ".join(g.name for g in available_goals),
)
async def _user_said(self, text: str):
"""
Create a belief for the user's full speech.
:param text: User's transcribed text.
"""
belief_msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=BeliefMessage(
replace=[InternalBelief(name="user_said", arguments=[text])],
).model_dump_json(),
thread="beliefs",
)
await self.send(belief_msg)
self.logger.info("Sent %d beliefs to the belief collector.", len(belief["beliefs"]))
async def _infer_new_beliefs(self):
conversation_beliefs = await self.belief_inferrer.infer_from_conversation(self.conversation)
new_beliefs = conversation_beliefs - self._current_beliefs
if not new_beliefs:
self.logger.debug("No new beliefs detected.")
return
self._current_beliefs |= new_beliefs
belief_changes = BeliefMessage(
create=list(new_beliefs.true),
delete=list(new_beliefs.false),
)
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=belief_changes.model_dump_json(),
thread="beliefs",
)
await self.send(message)
async def _infer_goal_completions(self):
goal_completions = await self.goal_inferrer.infer_from_conversation(self.conversation)
new_achieved = [
InternalBelief(name=goal, arguments=None)
for goal, achieved in goal_completions.items()
if achieved and self._current_goal_completions.get(goal) != achieved
]
new_not_achieved = [
InternalBelief(name=goal, arguments=None)
for goal, achieved in goal_completions.items()
if not achieved and self._current_goal_completions.get(goal) != achieved
]
for goal, achieved in goal_completions.items():
self._current_goal_completions[goal] = achieved
if not new_achieved and not new_not_achieved:
self.logger.debug("No goal achievement changes detected.")
return
belief_changes = BeliefMessage(
create=new_achieved,
delete=new_not_achieved,
)
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=belief_changes.model_dump_json(),
thread="beliefs",
)
await self.send(message)
class LLM:
"""
Class that handles sending structured generation requests to an LLM.
"""
def __init__(self, agent: "TextBeliefExtractorAgent", n_parallel: int):
self._agent = agent
self._semaphore = asyncio.Semaphore(n_parallel)
async def query(self, prompt: str, schema: dict, tries: int = 3) -> JSONLike | None:
"""
Query the LLM with the given prompt and schema, return an instance of a dict conforming
to this schema. Try ``tries`` times, or return None.
:param prompt: Prompt to be queried.
:param schema: Schema to be queried.
:param tries: Number of times to try to query the LLM.
:return: An instance of a dict conforming to this schema, or None if failed.
"""
try_count = 0
while try_count < tries:
try_count += 1
try:
return await self._query_llm(prompt, schema)
except (httpx.HTTPError, json.JSONDecodeError, KeyError) as e:
if try_count < tries:
continue
self._agent.logger.exception(
"Failed to get LLM response after %d tries.",
try_count,
exc_info=e,
)
return None
async def _query_llm(self, prompt: str, schema: dict) -> JSONLike:
"""
Query an LLM with the given prompt and schema, return an instance of a dict conforming
to that schema.
:param prompt: The prompt to be queried.
:param schema: Schema to use during response.
:return: A dict conforming to this schema.
:raises httpx.HTTPStatusError: If the LLM server responded with an error.
:raises json.JSONDecodeError: If the LLM response was not valid JSON. May happen if the
response was cut off early due to length limitations.
:raises KeyError: If the LLM server responded with no error, but the response was
invalid.
"""
async with self._semaphore:
async with httpx.AsyncClient() as client:
response = await client.post(
settings.llm_settings.local_llm_url,
json={
"model": settings.llm_settings.local_llm_model,
"messages": [{"role": "user", "content": prompt}],
"response_format": {
"type": "json_schema",
"json_schema": {
"name": "Beliefs",
"strict": True,
"schema": schema,
},
},
"reasoning_effort": "low",
"temperature": settings.llm_settings.code_temperature,
"stream": False,
},
timeout=30.0,
)
response.raise_for_status()
response_json = response.json()
json_message = response_json["choices"][0]["message"]["content"]
return json.loads(json_message)
class SemanticBeliefInferrer:
"""
Class that handles only prompting an LLM for semantic beliefs.
"""
def __init__(
self,
llm: "TextBeliefExtractorAgent.LLM",
available_beliefs: list[SemanticBelief] | None = None,
):
self._llm = llm
self.available_beliefs: list[SemanticBelief] = available_beliefs or []
async def infer_from_conversation(self, conversation: ChatHistory) -> BeliefState:
"""
Process conversation history to extract beliefs, semantically. The result is an object that
describes all beliefs that hold or don't hold based on the full conversation.
:param conversation: The conversation history to be processed.
:return: An object that describes beliefs.
"""
# Return instantly if there are no beliefs to infer
if not self.available_beliefs:
return BeliefState()
n_parallel = max(1, min(settings.llm_settings.n_parallel - 1, len(self.available_beliefs)))
all_beliefs: list[dict[str, bool | None] | None] = await asyncio.gather(
*[
self._infer_beliefs(conversation, beliefs)
for beliefs in self._split_into_chunks(self.available_beliefs, n_parallel)
]
)
retval = BeliefState()
for beliefs in all_beliefs:
if beliefs is None:
continue
for belief_name, belief_holds in beliefs.items():
if belief_holds is None:
continue
belief = InternalBelief(name=belief_name, arguments=None)
if belief_holds:
retval.true.add(belief)
else:
retval.false.add(belief)
return retval
@staticmethod
def _split_into_chunks[T](items: list[T], n: int) -> list[list[T]]:
"""
Split a list into ``n`` chunks, making each chunk approximately ``len(items) / n`` long.
:param items: The list of items to split.
:param n: The number of desired chunks.
:return: A list of chunks each approximately ``len(items) / n`` long.
"""
k, m = divmod(len(items), n)
return [items[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)]
async def _infer_beliefs(
self,
conversation: ChatHistory,
beliefs: list[SemanticBelief],
) -> dict[str, bool | None] | None:
"""
Infer given beliefs based on the given conversation.
:param conversation: The conversation to infer beliefs from.
:param beliefs: The beliefs to infer.
:return: A dict containing belief names and a boolean whether they hold, or None if the
belief cannot be inferred based on the given conversation.
"""
example = {
"example_belief": True,
}
prompt = f"""{self._format_conversation(conversation)}
Given the above conversation, what beliefs can be inferred?
If there is no relevant information about a belief belief, give null.
In case messages conflict, prefer using the most recent messages for inference.
Choose from the following list of beliefs, formatted as `- <belief_name>: <description>`:
{self._format_beliefs(beliefs)}
Respond with a JSON similar to the following, but with the property names as given above:
{json.dumps(example, indent=2)}
"""
schema = self._create_beliefs_schema(beliefs)
return await self._llm.query(prompt, schema)
@staticmethod
def _create_belief_schema(belief: SemanticBelief) -> tuple[str, dict]:
return AgentSpeakGenerator.slugify(belief), {
"type": ["boolean", "null"],
"description": belief.description,
}
@staticmethod
def _create_beliefs_schema(beliefs: list[SemanticBelief]) -> dict:
belief_schemas = [
SemanticBeliefInferrer._create_belief_schema(belief) for belief in beliefs
]
return {
"type": "object",
"properties": dict(belief_schemas),
"required": [name for name, _ in belief_schemas],
}
@staticmethod
def _format_message(message: ChatMessage):
return f"{message.role.upper()}:\n{message.content}"
@staticmethod
def _format_conversation(conversation: ChatHistory):
return "\n\n".join(
[SemanticBeliefInferrer._format_message(message) for message in conversation.messages]
)
@staticmethod
def _format_beliefs(beliefs: list[SemanticBelief]):
return "\n".join(
[f"- {AgentSpeakGenerator.slugify(belief)}: {belief.description}" for belief in beliefs]
)
class GoalAchievementInferrer(SemanticBeliefInferrer):
def __init__(self, llm: TextBeliefExtractorAgent.LLM):
super().__init__(llm)
self.goals = []
async def infer_from_conversation(self, conversation: ChatHistory) -> dict[str, bool]:
"""
Determine which goals have been achieved based on the given conversation.
:param conversation: The conversation to infer goal completion from.
:return: A mapping of goals and a boolean whether they have been achieved.
"""
if not self.goals:
return {}
goals_achieved = await asyncio.gather(
*[self._infer_goal(conversation, g) for g in self.goals]
)
return {
f"achieved_{AgentSpeakGenerator.slugify(goal)}": achieved
for goal, achieved in zip(self.goals, goals_achieved, strict=True)
}
async def _infer_goal(self, conversation: ChatHistory, goal: Goal) -> bool:
prompt = f"""{self._format_conversation(conversation)}
Given the above conversation, what has the following goal been achieved?
The name of the goal: {goal.name}
Description of the goal: {goal.description}
Answer with literally only `true` or `false` (without backticks)."""
schema = {
"type": "boolean",
}
return await self._llm.query(prompt, schema)

View File

@@ -38,7 +38,7 @@ class RICommunicationAgent(BaseAgent):
def __init__(
self,
name: str,
address=settings.zmq_settings.ri_command_address,
address=settings.zmq_settings.ri_communication_address,
bind=False,
):
super().__init__(name)
@@ -168,7 +168,7 @@ class RICommunicationAgent(BaseAgent):
bind = port_data["bind"]
if not bind:
addr = f"tcp://localhost:{port}"
addr = f"tcp://{settings.ri_host}:{port}"
else:
addr = f"tcp://*:{port}"
@@ -182,6 +182,7 @@ class RICommunicationAgent(BaseAgent):
self._req_socket.bind(addr)
case "actuation":
gesture_data = port_data.get("gestures", [])
single_gesture_data = port_data.get("single_gestures", [])
robot_speech_agent = RobotSpeechAgent(
settings.agent_settings.robot_speech_name,
address=addr,
@@ -192,6 +193,7 @@ class RICommunicationAgent(BaseAgent):
address=addr,
bind=bind,
gesture_data=gesture_data,
single_gesture_data=single_gesture_data,
)
await robot_speech_agent.start()
await asyncio.sleep(0.1) # Small delay
@@ -246,7 +248,8 @@ class RICommunicationAgent(BaseAgent):
self._req_socket.recv_json(), timeout=seconds_to_wait_total / 2
)
self.logger.debug(f'Received message "{message}" from RI.')
if "endpoint" in message and message["endpoint"] != "ping":
self.logger.debug(f'Received message "{message}" from RI.')
if "endpoint" not in message:
self.logger.warning("No received endpoint in message, expected ping endpoint.")
continue

View File

@@ -46,14 +46,23 @@ class LLMAgent(BaseAgent):
:param msg: The received internal message.
"""
if msg.sender == settings.agent_settings.bdi_core_name:
self.logger.debug("Processing message from BDI core.")
try:
prompt_message = LLMPromptMessage.model_validate_json(msg.body)
await self._process_bdi_message(prompt_message)
except ValidationError:
self.logger.debug("Prompt message from BDI core is invalid.")
match msg.thread:
case "prompt_message":
try:
prompt_message = LLMPromptMessage.model_validate_json(msg.body)
await self._process_bdi_message(prompt_message)
except ValidationError:
self.logger.debug("Prompt message from BDI core is invalid.")
case "assistant_message":
self.history.append({"role": "assistant", "content": msg.body})
case "user_message":
self.history.append({"role": "user", "content": msg.body})
elif msg.sender == settings.agent_settings.bdi_program_manager_name:
if msg.body == "clear_history":
self.logger.debug("Clearing conversation history.")
self.history.clear()
else:
self.logger.debug("Message ignored (not from BDI core.")
self.logger.debug("Message ignored.")
async def _process_bdi_message(self, message: LLMPromptMessage):
"""
@@ -64,11 +73,12 @@ class LLMAgent(BaseAgent):
:param message: The parsed prompt message containing text, norms, and goals.
"""
full_message = ""
async for chunk in self._query_llm(message.text, message.norms, message.goals):
await self._send_reply(chunk)
self.logger.debug(
"Finished processing BDI message. Response sent in chunks to BDI core."
)
full_message += chunk
self.logger.debug("Finished processing BDI message. Response sent in chunks to BDI core.")
await self._send_full_reply(full_message)
async def _send_reply(self, msg: str):
"""
@@ -83,6 +93,19 @@ class LLMAgent(BaseAgent):
)
await self.send(reply)
async def _send_full_reply(self, msg: str):
"""
Sends a response message (full) to agents that need it.
:param msg: The text content of the message.
"""
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=msg,
)
await self.send(message)
async def _query_llm(
self, prompt: str, norms: list[str], goals: list[str]
) -> AsyncGenerator[str]:
@@ -100,13 +123,6 @@ class LLMAgent(BaseAgent):
:param goals: Goals the LLM should achieve.
:yield: Fragments of the LLM-generated content (e.g., sentences/phrases).
"""
self.history.append(
{
"role": "user",
"content": prompt,
}
)
instructions = LLMInstructions(norms if norms else None, goals if goals else None)
messages = [
{
@@ -172,7 +188,7 @@ class LLMAgent(BaseAgent):
json={
"model": settings.llm_settings.local_llm_model,
"messages": messages,
"temperature": 0.3,
"temperature": settings.llm_settings.chat_temperature,
"stream": True,
},
) as response:

View File

@@ -103,12 +103,11 @@ class VADAgent(BaseAgent):
self._connect_audio_in_socket()
audio_out_port = self._connect_audio_out_socket()
if audio_out_port is None:
audio_out_address = self._connect_audio_out_socket()
if audio_out_address is None:
self.logger.error("Could not bind output socket, stopping.")
await self.stop()
return
audio_out_address = f"tcp://localhost:{audio_out_port}"
# Connect to internal communication socket
self.program_sub_socket = azmq.Context.instance().socket(zmq.SUB)
@@ -161,13 +160,14 @@ class VADAgent(BaseAgent):
self.audio_in_socket.connect(self.audio_in_address)
self.audio_in_poller = SocketPoller[bytes](self.audio_in_socket)
def _connect_audio_out_socket(self) -> int | None:
def _connect_audio_out_socket(self) -> str | None:
"""
Returns the port bound, or None if binding failed.
Returns the address that was bound to, or None if binding failed.
"""
try:
self.audio_out_socket = azmq.Context.instance().socket(zmq.PUB)
return self.audio_out_socket.bind_to_random_port("tcp://localhost", max_tries=100)
self.audio_out_socket.bind(settings.zmq_settings.vad_pub_address)
return settings.zmq_settings.vad_pub_address
except zmq.ZMQBindError:
self.logger.error("Failed to bind an audio output socket after 100 tries.")
self.audio_out_socket = None
@@ -229,10 +229,11 @@ class VADAgent(BaseAgent):
assert self.model is not None
prob = self.model(torch.from_numpy(chunk), settings.vad_settings.sample_rate_hz).item()
non_speech_patience = settings.behaviour_settings.vad_non_speech_patience_chunks
begin_silence_length = settings.behaviour_settings.vad_begin_silence_chunks
prob_threshold = settings.behaviour_settings.vad_prob_threshold
if prob > prob_threshold:
if self.i_since_speech > non_speech_patience:
if self.i_since_speech > non_speech_patience + begin_silence_length:
self.logger.debug("Speech started.")
self.audio_buffer = np.append(self.audio_buffer, chunk)
self.i_since_speech = 0
@@ -246,11 +247,12 @@ class VADAgent(BaseAgent):
continue
# Speech probably ended. Make sure we have a usable amount of data.
if len(self.audio_buffer) >= 3 * len(chunk):
if len(self.audio_buffer) > begin_silence_length * len(chunk):
self.logger.debug("Speech ended.")
assert self.audio_out_socket is not None
await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes())
# At this point, we know that the speech has ended.
# Prepend the last chunk that had no speech, for a more fluent boundary
self.audio_buffer = chunk
# At this point, we know that there is no speech.
# Prepend the last few chunks that had no speech, for a more fluent boundary.
self.audio_buffer = np.append(self.audio_buffer, chunk)
self.audio_buffer = self.audio_buffer[-begin_silence_length * len(chunk) :]

View File

@@ -0,0 +1,146 @@
import json
import zmq
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
class UserInterruptAgent(BaseAgent):
"""
User Interrupt Agent.
This agent receives button_pressed events from the external HTTP API
(via ZMQ) and uses the associated context to trigger one of the following actions:
- Send a prioritized message to the `RobotSpeechAgent`
- Send a prioritized gesture to the `RobotGestureAgent`
- Send a belief override to the `BDIProgramManager`in order to activate a
trigger/conditional norm or complete a goal.
Prioritized actions clear the current RI queue before inserting the new item,
ensuring they are executed immediately after Pepper's current action has been fulfilled.
:ivar sub_socket: The ZMQ SUB socket used to receive user intterupts.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sub_socket = None
async def _receive_button_event(self):
"""
The behaviour of the UserInterruptAgent.
Continuous loop that receives button_pressed events from the button_pressed HTTP endpoint.
These events contain a type and a context.
These are the different types and contexts:
- type: "speech", context: string that the robot has to say.
- type: "gesture", context: single gesture name that the robot has to perform.
- type: "override", context: belief_id that overrides the goal/trigger/conditional norm.
"""
while True:
topic, body = await self.sub_socket.recv_multipart()
try:
event_data = json.loads(body)
event_type = event_data.get("type") # e.g., "speech", "gesture"
event_context = event_data.get("context") # e.g., "Hello, I am Pepper!"
except json.JSONDecodeError:
self.logger.error("Received invalid JSON payload on topic %s", topic)
continue
if event_type == "speech":
await self._send_to_speech_agent(event_context)
self.logger.info(
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
event_context,
)
elif event_type == "gesture":
await self._send_to_gesture_agent(event_context)
self.logger.info(
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
event_context,
)
elif event_type == "override":
await self._send_to_program_manager(event_context)
self.logger.info(
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
event_context,
)
else:
self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').",
event_type,
event_context,
)
async def _send_to_speech_agent(self, text_to_say: str):
"""
method to send prioritized speech command to RobotSpeechAgent.
:param text_to_say: The string that the robot has to say.
"""
cmd = SpeechCommand(data=text_to_say, is_priority=True)
out_msg = InternalMessage(
to=settings.agent_settings.robot_speech_name,
sender=self.name,
body=cmd.model_dump_json(),
)
await self.send(out_msg)
async def _send_to_gesture_agent(self, single_gesture_name: str):
"""
method to send prioritized gesture command to RobotGestureAgent.
:param single_gesture_name: The gesture tag that the robot has to perform.
"""
# the endpoint is set to always be GESTURE_SINGLE for user interrupts
cmd = GestureCommand(
endpoint=RIEndpoint.GESTURE_SINGLE, data=single_gesture_name, is_priority=True
)
out_msg = InternalMessage(
to=settings.agent_settings.robot_gesture_name,
sender=self.name,
body=cmd.model_dump_json(),
)
await self.send(out_msg)
async def _send_to_program_manager(self, belief_id: str):
"""
Send a button_override belief to the BDIProgramManager.
:param belief_id: The belief_id that overrides the goal/trigger/conditional norm.
this id can belong to a basic belief or an inferred belief.
See also: https://utrechtuniversity.youtrack.cloud/articles/N25B-A-27/UI-components
"""
data = {"belief": belief_id}
message = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
sender=self.name,
body=json.dumps(data),
thread="belief_override_id",
)
await self.send(message)
self.logger.info(
"Sent button_override belief with id '%s' to Program manager.",
belief_id,
)
async def setup(self):
"""
Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic.
Starts the background behavior to receive the user interrupts.
"""
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("button_pressed")
self.add_behavior(self._receive_button_event())

View File

@@ -0,0 +1,31 @@
import logging
from fastapi import APIRouter, Request
from control_backend.schemas.events import ButtonPressedEvent
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/button_pressed", status_code=202)
async def receive_button_event(event: ButtonPressedEvent, request: Request):
"""
Endpoint to handle external button press events.
Validates the event payload and publishes it to the internal 'button_pressed' topic.
Subscribers (in this case user_interrupt_agent) will pick this up to trigger
specific behaviors or state changes.
:param event: The parsed ButtonPressedEvent object.
:param request: The FastAPI request object.
"""
logger.debug("Received button event: %s | %s", event.type, event.context)
topic = b"button_pressed"
body = event.model_dump_json().encode()
pub_socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, body])
return {"status": "Event received"}

View File

@@ -1,6 +1,6 @@
from fastapi.routing import APIRouter
from control_backend.api.v1.endpoints import logs, message, program, robot, sse
from control_backend.api.v1.endpoints import button_pressed, logs, message, program, robot, sse
api_router = APIRouter()
@@ -13,3 +13,5 @@ api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Command
api_router.include_router(logs.router, tags=["Logs"])
api_router.include_router(program.router, tags=["Program"])
api_router.include_router(button_pressed.router, tags=["Button Pressed Events"])

View File

@@ -131,6 +131,7 @@ class BaseAgent(ABC):
:param message: The message to send.
"""
target = AgentDirectory.get(message.to)
message.sender = self.name
if target:
await target.inbox.put(message)
self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.")
@@ -192,7 +193,16 @@ class BaseAgent(ABC):
:param coro: The coroutine to execute as a task.
"""
task = asyncio.create_task(coro)
async def try_coro(coro_: Coroutine):
try:
await coro_
except asyncio.CancelledError:
self.logger.debug("A behavior was canceled successfully: %s", coro_)
except Exception:
self.logger.warning("An exception occurred in a behavior.", exc_info=True)
task = asyncio.create_task(try_coro(coro))
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)
return task

View File

@@ -1,3 +1,12 @@
"""
An exhaustive overview of configurable options. All of these can be set using environment variables
by nesting with double underscores (__). Start from the ``Settings`` class.
For example, ``settings.ri_host`` becomes ``RI_HOST``, and
``settings.zmq_settings.ri_communication_address`` becomes
``ZMQ_SETTINGS__RI_COMMUNICATION_ADDRESS``.
"""
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -8,16 +17,17 @@ class ZMQSettings(BaseModel):
:ivar internal_pub_address: Address for the internal PUB socket.
:ivar internal_sub_address: Address for the internal SUB socket.
:ivar ri_command_address: Address for sending commands to the Robot Interface.
:ivar ri_communication_address: Address for receiving communication from the Robot Interface.
:ivar vad_agent_address: Address for the Voice Activity Detection (VAD) agent.
:ivar ri_communication_address: Address for the endpoint that the Robot Interface connects to.
:ivar vad_pub_address: Address that the VAD agent binds to and publishes audio segments to.
"""
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
internal_pub_address: str = "tcp://localhost:5560"
internal_sub_address: str = "tcp://localhost:5561"
ri_command_address: str = "tcp://localhost:0000"
ri_communication_address: str = "tcp://*:5555"
internal_gesture_rep_adress: str = "tcp://localhost:7788"
vad_pub_address: str = "inproc://vad_stream"
class AgentSettings(BaseModel):
@@ -36,6 +46,8 @@ class AgentSettings(BaseModel):
:ivar robot_speech_name: Name of the Robot Speech Agent.
"""
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
# agent names
bdi_core_name: str = "bdi_core_agent"
bdi_belief_collector_name: str = "belief_collector_agent"
@@ -48,6 +60,7 @@ class AgentSettings(BaseModel):
ri_communication_name: str = "ri_communication_agent"
robot_speech_name: str = "robot_speech_agent"
robot_gesture_name: str = "robot_gesture_agent"
user_interrupt_name: str = "user_interrupt_agent"
class BehaviourSettings(BaseModel):
@@ -60,12 +73,16 @@ class BehaviourSettings(BaseModel):
:ivar vad_prob_threshold: Probability threshold for Voice Activity Detection.
:ivar vad_initial_since_speech: Initial value for 'since speech' counter in VAD.
:ivar vad_non_speech_patience_chunks: Number of non-speech chunks to wait before speech ended.
:ivar vad_begin_silence_chunks: The number of chunks of silence to prepend to speech chunks.
:ivar transcription_max_concurrent_tasks: Maximum number of concurrent transcription tasks.
:ivar transcription_words_per_minute: Estimated words per minute for transcription timing.
:ivar transcription_words_per_token: Estimated words per token for transcription timing.
:ivar transcription_token_buffer: Buffer for transcription tokens.
:ivar conversation_history_length_limit: The maximum amount of messages to extract beliefs from.
"""
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
sleep_s: float = 1.0
comm_setup_max_retries: int = 5
socket_poller_timeout_ms: int = 100
@@ -73,7 +90,8 @@ class BehaviourSettings(BaseModel):
# VAD settings
vad_prob_threshold: float = 0.5
vad_initial_since_speech: int = 100
vad_non_speech_patience_chunks: int = 3
vad_non_speech_patience_chunks: int = 15
vad_begin_silence_chunks: int = 6
# transcription behaviour
transcription_max_concurrent_tasks: int = 3
@@ -81,6 +99,9 @@ class BehaviourSettings(BaseModel):
transcription_words_per_token: float = 0.75 # (3 words = 4 tokens)
transcription_token_buffer: int = 10
# Text belief extractor settings
conversation_history_length_limit: int = 10
class LLMSettings(BaseModel):
"""
@@ -88,10 +109,19 @@ class LLMSettings(BaseModel):
:ivar local_llm_url: URL for the local LLM API.
:ivar local_llm_model: Name of the local LLM model to use.
:ivar chat_temperature: The temperature to use while generating chat responses.
:ivar code_temperature: The temperature to use while generating code-like responses like during
belief inference.
:ivar n_parallel: The number of parallel calls allowed to be made to the LLM.
"""
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
local_llm_url: str = "http://localhost:1234/v1/chat/completions"
local_llm_model: str = "gpt-oss"
chat_temperature: float = 1.0
code_temperature: float = 0.3
n_parallel: int = 4
class VADSettings(BaseModel):
@@ -103,6 +133,8 @@ class VADSettings(BaseModel):
:ivar sample_rate_hz: Sample rate in Hz for the VAD model.
"""
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
repo_or_dir: str = "snakers4/silero-vad"
model_name: str = "silero_vad"
sample_rate_hz: int = 16000
@@ -116,6 +148,8 @@ class SpeechModelSettings(BaseModel):
:ivar openai_model_name: Model name for OpenAI-based speech recognition.
"""
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
# model identifiers for speech recognition
mlx_model_name: str = "mlx-community/whisper-small.en-mlx"
openai_model_name: str = "small.en"
@@ -127,6 +161,7 @@ class Settings(BaseSettings):
:ivar app_title: Title of the application.
:ivar ui_url: URL of the frontend UI.
:ivar ri_host: The hostname of the Robot Interface.
:ivar zmq_settings: ZMQ configuration.
:ivar agent_settings: Agent name configuration.
:ivar behaviour_settings: Behavior configuration.
@@ -139,6 +174,8 @@ class Settings(BaseSettings):
ui_url: str = "http://localhost:5173"
ri_host: str = "localhost"
zmq_settings: ZMQSettings = ZMQSettings()
agent_settings: AgentSettings = AgentSettings()

View File

@@ -39,6 +39,9 @@ from control_backend.agents.communication import RICommunicationAgent
# LLM Agents
from control_backend.agents.llm import LLMAgent
# User Interrupt Agent
from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
# Other backend imports
from control_backend.api.v1.router import api_router
from control_backend.core.config import settings
@@ -137,6 +140,12 @@ async def lifespan(app: FastAPI):
"name": settings.agent_settings.bdi_program_manager_name,
},
),
"UserInterruptAgent": (
UserInterruptAgent,
{
"name": settings.agent_settings.user_interrupt_name,
},
),
}
agents = []

View File

@@ -0,0 +1,19 @@
from pydantic import BaseModel
from control_backend.schemas.program import Belief as ProgramBelief
from control_backend.schemas.program import Goal
class BeliefList(BaseModel):
"""
Represents a list of beliefs, separated from a program. Useful in agents which need to
communicate beliefs.
:ivar: beliefs: The list of beliefs.
"""
beliefs: list[ProgramBelief]
class GoalList(BaseModel):
goals: list[Goal]

View File

@@ -6,18 +6,30 @@ class Belief(BaseModel):
Represents a single belief in the BDI system.
:ivar name: The functor or name of the belief (e.g., 'user_said').
:ivar arguments: A list of string arguments for the belief.
:ivar replace: If True, existing beliefs with this name should be replaced by this one.
:ivar arguments: A list of string arguments for the belief, or None if the belief has no
arguments.
"""
name: str
arguments: list[str]
replace: bool = False
arguments: list[str] | None
# To make it hashable
model_config = {"frozen": True}
class BeliefMessage(BaseModel):
"""
A container for transporting a list of beliefs between agents.
A container for communicating beliefs between agents.
:ivar create: Beliefs to create.
:ivar delete: Beliefs to delete.
:ivar replace: Beliefs to replace. Deletes all beliefs with the same name, replacing them with
one new belief.
"""
beliefs: list[Belief]
create: list[Belief] = []
delete: list[Belief] = []
replace: list[Belief] = []
def has_values(self) -> bool:
return len(self.create) > 0 or len(self.delete) > 0 or len(self.replace) > 0

View File

@@ -0,0 +1,10 @@
from pydantic import BaseModel
class ChatMessage(BaseModel):
role: str
content: str
class ChatHistory(BaseModel):
messages: list[ChatMessage]

View File

@@ -0,0 +1,6 @@
from pydantic import BaseModel
class ButtonPressedEvent(BaseModel):
type: str
context: str

View File

@@ -12,6 +12,6 @@ class InternalMessage(BaseModel):
"""
to: str
sender: str
sender: str | None = None
body: str
thread: str | None = None

View File

@@ -43,7 +43,6 @@ class SemanticBelief(ProgramElement):
:ivar description: Description of how to form the belief, used by the LLM.
"""
name: str = ""
description: str
@@ -113,10 +112,12 @@ class Goal(ProgramElement):
for example when the achieving of the goal is dependent on the user's reply, this means
that the achieved status will be set from somewhere else in the program.
:ivar description: A description of the goal, used to determine if it has been achieved.
:ivar plan: The plan to execute.
:ivar can_fail: Whether we can fail to achieve the goal after executing the plan.
"""
description: str = ""
plan: Plan
can_fail: bool = True
@@ -179,7 +180,6 @@ class Trigger(ProgramElement):
:ivar plan: The plan to execute.
"""
name: str = ""
condition: Belief
plan: Plan

View File

@@ -38,6 +38,7 @@ class SpeechCommand(RIMessage):
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
data: str
is_priority: bool = False
class GestureCommand(RIMessage):
@@ -52,6 +53,7 @@ class GestureCommand(RIMessage):
RIEndpoint.GESTURE_SINGLE, RIEndpoint.GESTURE_TAG
]
data: str
is_priority: bool = False
@model_validator(mode="after")
def check_endpoint(self):

View File

@@ -91,7 +91,7 @@ def test_out_socket_creation(zmq_context):
assert per_vad_agent.audio_out_socket is not None
zmq_context.return_value.socket.assert_called_once_with(zmq.PUB)
zmq_context.return_value.socket.return_value.bind_to_random_port.assert_called_once()
zmq_context.return_value.socket.return_value.bind.assert_called_once_with("inproc://vad_stream")
@pytest.mark.asyncio

View File

@@ -73,7 +73,7 @@ async def test_setup_connect(zmq_context, mocker):
async def test_handle_message_sends_valid_gesture_command():
"""Internal message with valid gesture tag is forwarded to robot pub socket."""
pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.pubsocket = pubsocket
payload = {
@@ -91,7 +91,7 @@ async def test_handle_message_sends_valid_gesture_command():
async def test_handle_message_sends_non_gesture_command():
"""Internal message with non-gesture endpoint is not forwarded by this agent."""
pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.pubsocket = pubsocket
payload = {"endpoint": "some_other_endpoint", "data": "invalid_tag_not_in_list"}
@@ -107,7 +107,7 @@ async def test_handle_message_sends_non_gesture_command():
async def test_handle_message_rejects_invalid_gesture_tag():
"""Internal message with invalid gesture tag is not forwarded."""
pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.pubsocket = pubsocket
# Use a tag that's not in gesture_data
@@ -123,7 +123,7 @@ async def test_handle_message_rejects_invalid_gesture_tag():
async def test_handle_message_invalid_payload():
"""Invalid payload is caught and does not send."""
pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.pubsocket = pubsocket
msg = InternalMessage(to="robot", sender="tester", body=json.dumps({"bad": "data"}))
@@ -142,12 +142,12 @@ async def test_zmq_command_loop_valid_gesture_payload():
async def recv_once():
# stop after first iteration
agent._running = False
return (b"command", json.dumps(command).encode("utf-8"))
return b"command", json.dumps(command).encode("utf-8")
fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent._running = True
@@ -165,12 +165,12 @@ async def test_zmq_command_loop_valid_non_gesture_payload():
async def recv_once():
agent._running = False
return (b"command", json.dumps(command).encode("utf-8"))
return b"command", json.dumps(command).encode("utf-8")
fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent._running = True
@@ -188,12 +188,12 @@ async def test_zmq_command_loop_invalid_gesture_tag():
async def recv_once():
agent._running = False
return (b"command", json.dumps(command).encode("utf-8"))
return b"command", json.dumps(command).encode("utf-8")
fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent._running = True
@@ -210,12 +210,12 @@ async def test_zmq_command_loop_invalid_json():
async def recv_once():
agent._running = False
return (b"command", b"{not_json}")
return b"command", b"{not_json}"
fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent._running = True
@@ -232,12 +232,12 @@ async def test_zmq_command_loop_ignores_send_gestures_topic():
async def recv_once():
agent._running = False
return (b"send_gestures", b"{}")
return b"send_gestures", b"{}"
fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent._running = True
@@ -259,7 +259,9 @@ async def test_fetch_gestures_loop_without_amount():
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no", "wave", "point"])
agent = RobotGestureAgent(
"robot_gesture", gesture_data=["hello", "yes", "no", "wave", "point"], address=""
)
agent.repsocket = fake_repsocket
agent._running = True
@@ -287,7 +289,9 @@ async def test_fetch_gestures_loop_with_amount():
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no", "wave", "point"])
agent = RobotGestureAgent(
"robot_gesture", gesture_data=["hello", "yes", "no", "wave", "point"], address=""
)
agent.repsocket = fake_repsocket
agent._running = True
@@ -315,7 +319,7 @@ async def test_fetch_gestures_loop_with_integer_request():
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.repsocket = fake_repsocket
agent._running = True
@@ -340,7 +344,7 @@ async def test_fetch_gestures_loop_with_invalid_json():
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.repsocket = fake_repsocket
agent._running = True
@@ -365,7 +369,7 @@ async def test_fetch_gestures_loop_with_non_integer_json():
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.repsocket = fake_repsocket
agent._running = True
@@ -381,7 +385,7 @@ async def test_fetch_gestures_loop_with_non_integer_json():
def test_gesture_data_attribute():
"""Test that gesture_data returns the expected list."""
gesture_data = ["hello", "yes", "no", "wave"]
agent = RobotGestureAgent("robot_gesture", gesture_data=gesture_data)
agent = RobotGestureAgent("robot_gesture", gesture_data=gesture_data, address="")
assert agent.gesture_data == gesture_data
assert isinstance(agent.gesture_data, list)
@@ -398,7 +402,7 @@ async def test_stop_closes_sockets():
pubsocket = MagicMock()
subsocket = MagicMock()
repsocket = MagicMock()
agent = RobotGestureAgent("robot_gesture")
agent = RobotGestureAgent("robot_gesture", address="")
agent.pubsocket = pubsocket
agent.subsocket = subsocket
agent.repsocket = repsocket
@@ -415,7 +419,7 @@ async def test_stop_closes_sockets():
async def test_initialization_with_custom_gesture_data():
"""Agent can be initialized with custom gesture data."""
custom_gestures = ["custom1", "custom2", "custom3"]
agent = RobotGestureAgent("robot_gesture", gesture_data=custom_gestures)
agent = RobotGestureAgent("robot_gesture", gesture_data=custom_gestures, address="")
assert agent.gesture_data == custom_gestures
@@ -432,7 +436,7 @@ async def test_fetch_gestures_loop_handles_exception():
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.repsocket = fake_repsocket
agent.logger = MagicMock()
agent._running = True

View File

@@ -64,7 +64,7 @@ async def test_handle_message_sends_command():
agent = mock_speech_agent()
agent.pubsocket = pubsocket
payload = {"endpoint": "actuate/speech", "data": "hello"}
payload = {"endpoint": "actuate/speech", "data": "hello", "is_priority": False}
msg = InternalMessage(to="robot", sender="tester", body=json.dumps(payload))
await agent.handle_message(msg)
@@ -75,7 +75,7 @@ async def test_handle_message_sends_command():
@pytest.mark.asyncio
async def test_zmq_command_loop_valid_payload(zmq_context):
"""UI command is read from SUB and published."""
command = {"endpoint": "actuate/speech", "data": "hello"}
command = {"endpoint": "actuate/speech", "data": "hello", "is_priority": False}
fake_socket = AsyncMock()
async def recv_once():

View File

@@ -20,7 +20,7 @@ def mock_agentspeak_env():
@pytest.fixture
def agent():
agent = BDICoreAgent("bdi_agent", "dummy.asl")
agent = BDICoreAgent("bdi_agent")
agent.send = AsyncMock()
agent.bdi_agent = MagicMock()
return agent
@@ -51,7 +51,7 @@ async def test_handle_belief_collector_message(agent, mock_settings):
msg = InternalMessage(
to="bdi_agent",
sender=mock_settings.agent_settings.bdi_belief_collector_name,
body=BeliefMessage(beliefs=beliefs).model_dump_json(),
body=BeliefMessage(create=beliefs).model_dump_json(),
thread="beliefs",
)
@@ -64,6 +64,26 @@ async def test_handle_belief_collector_message(agent, mock_settings):
assert args[2] == agentspeak.Literal("user_said", (agentspeak.Literal("Hello"),))
@pytest.mark.asyncio
async def test_handle_delete_belief_message(agent, mock_settings):
"""Test that incoming beliefs to be deleted are removed from the BDI agent"""
beliefs = [Belief(name="user_said", arguments=["Hello"])]
msg = InternalMessage(
to="bdi_agent",
sender=mock_settings.agent_settings.bdi_belief_collector_name,
body=BeliefMessage(delete=beliefs).model_dump_json(),
thread="beliefs",
)
await agent.handle_message(msg)
# Expect bdi_agent.call to be triggered to remove belief
args = agent.bdi_agent.call.call_args.args
assert args[0] == agentspeak.Trigger.removal
assert args[1] == agentspeak.GoalType.belief
assert args[2] == agentspeak.Literal("user_said", (agentspeak.Literal("Hello"),))
@pytest.mark.asyncio
async def test_incorrect_belief_collector_message(agent, mock_settings):
"""Test that incorrect message format triggers an exception."""
@@ -113,14 +133,14 @@ async def test_custom_actions(agent):
# Invoke action
mock_term = MagicMock()
mock_term.args = ["Hello", "Norm", "Goal"]
mock_term.args = ["Hello", "Norm"]
mock_intention = MagicMock()
# Run generator
gen = action_fn(agent, mock_term, mock_intention)
next(gen) # Execute
agent._send_to_llm.assert_called_with("Hello", "Norm", "Goal")
agent._send_to_llm.assert_called_with("Hello", "Norm", "")
def test_add_belief_sets_event(agent):
@@ -128,7 +148,8 @@ def test_add_belief_sets_event(agent):
agent._wake_bdi_loop = MagicMock()
belief = Belief(name="test_belief", arguments=["a", "b"])
agent._apply_beliefs([belief])
belief_changes = BeliefMessage(replace=[belief])
agent._apply_belief_changes(belief_changes)
assert agent.bdi_agent.call.called
agent._wake_bdi_loop.set.assert_called()
@@ -137,7 +158,7 @@ def test_add_belief_sets_event(agent):
def test_apply_beliefs_empty_returns(agent):
"""Line: if not beliefs: return"""
agent._wake_bdi_loop = MagicMock()
agent._apply_beliefs([])
agent._apply_belief_changes(BeliefMessage())
agent.bdi_agent.call.assert_not_called()
agent._wake_bdi_loop.set.assert_not_called()
@@ -220,8 +241,9 @@ def test_replace_belief_calls_remove_all(agent):
agent._remove_all_with_name = MagicMock()
agent._wake_bdi_loop = MagicMock()
belief = Belief(name="user_said", arguments=["Hello"], replace=True)
agent._apply_beliefs([belief])
belief = Belief(name="user_said", arguments=["Hello"])
belief_changes = BeliefMessage(replace=[belief])
agent._apply_belief_changes(belief_changes)
agent._remove_all_with_name.assert_called_with("user_said")

View File

@@ -1,6 +1,6 @@
import asyncio
import json
import sys
import uuid
from unittest.mock import AsyncMock
import pytest
@@ -8,31 +8,47 @@ import pytest
from control_backend.agents.bdi.bdi_program_manager import BDIProgramManager
from control_backend.core.agent_system import InternalMessage
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.program import Program
from control_backend.schemas.program import BasicNorm, Goal, Phase, Plan, Program
# Fix Windows Proactor loop for zmq
if sys.platform.startswith("win"):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
def make_valid_program_json(norm="N1", goal="G1"):
return json.dumps(
{
"phases": [
{
"id": "phase1",
"label": "Phase 1",
"triggers": [],
"norms": [{"id": "n1", "label": "Norm 1", "norm": norm}],
"goals": [
{"id": "g1", "label": "Goal 1", "description": goal, "achieved": False}
],
}
]
}
)
def make_valid_program_json(norm="N1", goal="G1") -> str:
return Program(
phases=[
Phase(
id=uuid.uuid4(),
name="Basic Phase",
norms=[
BasicNorm(
id=uuid.uuid4(),
name=norm,
norm=norm,
),
],
goals=[
Goal(
id=uuid.uuid4(),
name=goal,
description="This description can be used to determine whether the goal "
"has been achieved.",
plan=Plan(
id=uuid.uuid4(),
name="Goal Plan",
steps=[],
),
can_fail=False,
),
],
triggers=[],
),
],
).model_dump_json()
@pytest.mark.skip(reason="Functionality being rebuilt.")
@pytest.mark.asyncio
async def test_send_to_bdi():
manager = BDIProgramManager(name="program_manager_test")
@@ -61,8 +77,10 @@ async def test_receive_programs_valid_and_invalid():
]
manager = BDIProgramManager(name="program_manager_test")
manager._internal_pub_socket = AsyncMock()
manager.sub_socket = sub
manager._create_agentspeak_and_send_to_bdi = AsyncMock()
manager._send_clear_llm_history = AsyncMock()
try:
# Will give StopAsyncIteration when the predefined `sub.recv_multipart` side-effects run out
@@ -73,5 +91,26 @@ async def test_receive_programs_valid_and_invalid():
# Only valid Program should have triggered _send_to_bdi
assert manager._create_agentspeak_and_send_to_bdi.await_count == 1
forwarded: Program = manager._create_agentspeak_and_send_to_bdi.await_args[0][0]
assert forwarded.phases[0].norms[0].norm == "N1"
assert forwarded.phases[0].goals[0].description == "G1"
assert forwarded.phases[0].norms[0].name == "N1"
assert forwarded.phases[0].goals[0].name == "G1"
# Verify history clear was triggered
assert manager._send_clear_llm_history.await_count == 1
@pytest.mark.asyncio
async def test_send_clear_llm_history(mock_settings):
# Ensure the mock returns a string for the agent name (just like in your LLM tests)
mock_settings.agent_settings.llm_agent_name = "llm_agent"
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
await manager._send_clear_llm_history()
assert manager.send.await_count == 2
msg: InternalMessage = manager.send.await_args_list[0][0][0]
# Verify the content and recipient
assert msg.body == "clear_history"
assert msg.to == "llm_agent"

View File

@@ -86,7 +86,7 @@ async def test_send_beliefs_to_bdi(agent):
sent: InternalMessage = agent.send.call_args.args[0]
assert sent.to == settings.agent_settings.bdi_core_name
assert sent.thread == "beliefs"
assert json.loads(sent.body)["beliefs"] == [belief.model_dump() for belief in beliefs]
assert json.loads(sent.body)["create"] == [belief.model_dump() for belief in beliefs]
@pytest.mark.asyncio

View File

@@ -0,0 +1,376 @@
import json
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from control_backend.agents.bdi import TextBeliefExtractorAgent
from control_backend.agents.bdi.text_belief_extractor_agent import BeliefState
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList
from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import (
ConditionalNorm,
KeywordBelief,
LLMAction,
Phase,
Plan,
Program,
SemanticBelief,
Trigger,
)
@pytest.fixture
def llm():
llm = TextBeliefExtractorAgent.LLM(MagicMock(), 4)
llm._query_llm = AsyncMock()
return llm
@pytest.fixture
def agent(llm):
with patch(
"control_backend.agents.bdi.text_belief_extractor_agent.TextBeliefExtractorAgent.LLM",
return_value=llm,
):
agent = TextBeliefExtractorAgent("text_belief_agent")
agent.send = AsyncMock()
return agent
@pytest.fixture
def sample_program():
return Program(
phases=[
Phase(
name="Some phase",
id=uuid.uuid4(),
norms=[
ConditionalNorm(
name="Some norm",
id=uuid.uuid4(),
norm="Use nautical terms.",
critical=False,
condition=SemanticBelief(
name="is_pirate",
id=uuid.uuid4(),
description="The user is a pirate. Perhaps because they say "
"they are, or because they speak like a pirate "
'with terms like "arr".',
),
),
],
goals=[],
triggers=[
Trigger(
name="Some trigger",
id=uuid.uuid4(),
condition=SemanticBelief(
name="no_more_booze",
id=uuid.uuid4(),
description="There is no more alcohol.",
),
plan=Plan(
name="Some plan",
id=uuid.uuid4(),
steps=[
LLMAction(
name="Some action",
id=uuid.uuid4(),
goal="Suggest eating chocolate instead.",
),
],
),
),
],
),
],
)
def make_msg(sender: str, body: str, thread: str | None = None) -> InternalMessage:
return InternalMessage(to="unused", sender=sender, body=body, thread=thread)
@pytest.mark.asyncio
async def test_handle_message_ignores_other_agents(agent):
msg = make_msg("unknown", "some data", None)
await agent.handle_message(msg)
agent.send.assert_not_called() # noqa # `agent.send` has no such property, but we mock it.
@pytest.mark.asyncio
async def test_handle_message_from_transcriber(agent, mock_settings):
transcription = "hello world"
msg = make_msg(mock_settings.agent_settings.transcription_name, transcription, None)
await agent.handle_message(msg)
agent.send.assert_awaited_once() # noqa # `agent.send` has no such property, but we mock it.
sent: InternalMessage = agent.send.call_args.args[0] # noqa
assert sent.to == mock_settings.agent_settings.bdi_core_name
assert sent.thread == "beliefs"
parsed = BeliefMessage.model_validate_json(sent.body)
replaced_last = parsed.replace.pop()
assert replaced_last.name == "user_said"
assert replaced_last.arguments == [transcription]
@pytest.mark.asyncio
async def test_query_llm():
mock_response = MagicMock()
mock_response.json.return_value = {
"choices": [
{
"message": {
"content": "null",
}
}
]
}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
mock_async_client = MagicMock()
mock_async_client.__aenter__.return_value = mock_client
mock_async_client.__aexit__.return_value = None
with patch(
"control_backend.agents.bdi.text_belief_extractor_agent.httpx.AsyncClient",
return_value=mock_async_client,
):
llm = TextBeliefExtractorAgent.LLM(MagicMock(), 4)
res = await llm._query_llm("hello world", {"type": "null"})
# Response content was set as "null", so should be deserialized as None
assert res is None
@pytest.mark.asyncio
async def test_retry_query_llm_success(llm):
llm._query_llm.return_value = None
res = await llm.query("hello world", {"type": "null"})
llm._query_llm.assert_called_once()
assert res is None
@pytest.mark.asyncio
async def test_retry_query_llm_success_after_failure(llm):
llm._query_llm.side_effect = [KeyError(), "real value"]
res = await llm.query("hello world", {"type": "string"})
assert llm._query_llm.call_count == 2
assert res == "real value"
@pytest.mark.asyncio
async def test_retry_query_llm_failures(llm):
llm._query_llm.side_effect = [KeyError(), KeyError(), KeyError(), "real value"]
res = await llm.query("hello world", {"type": "string"})
assert llm._query_llm.call_count == 3
assert res is None
@pytest.mark.asyncio
async def test_retry_query_llm_fail_immediately(llm):
llm._query_llm.side_effect = [KeyError(), "real value"]
res = await llm.query("hello world", {"type": "string"}, tries=1)
assert llm._query_llm.call_count == 1
assert res is None
@pytest.mark.asyncio
async def test_extracting_semantic_beliefs(agent):
"""
The Program Manager sends beliefs to this agent. Test whether the agent handles them correctly.
"""
assert len(agent.belief_inferrer.available_beliefs) == 0
beliefs = BeliefList(
beliefs=[
KeywordBelief(
id=uuid.uuid4(),
name="keyword_hello",
keyword="hello",
),
SemanticBelief(
id=uuid.uuid4(), name="semantic_hello_1", description="Some semantic belief 1"
),
SemanticBelief(
id=uuid.uuid4(), name="semantic_hello_2", description="Some semantic belief 2"
),
]
)
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.bdi_program_manager_name,
body=beliefs.model_dump_json(),
thread="beliefs",
),
)
assert len(agent.belief_inferrer.available_beliefs) == 2
@pytest.mark.asyncio
async def test_handle_invalid_beliefs(agent, sample_program):
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
assert len(agent.belief_inferrer.available_beliefs) == 2
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.bdi_program_manager_name,
body=json.dumps({"phases": "Invalid"}),
thread="beliefs",
),
)
assert len(agent.belief_inferrer.available_beliefs) == 2
@pytest.mark.asyncio
async def test_handle_robot_response(agent):
initial_length = len(agent.conversation.messages)
response = "Hi, I'm Pepper. What's your name?"
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.llm_name,
body=response,
),
)
assert len(agent.conversation.messages) == initial_length + 1
assert agent.conversation.messages[-1].role == "assistant"
assert agent.conversation.messages[-1].content == response
@pytest.mark.asyncio
async def test_simulated_real_turn_with_beliefs(agent, llm, sample_program):
"""Test sending user message to extract beliefs from."""
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
# Send a user message with the belief that there's no more booze
llm._query_llm.return_value = {"is_pirate": None, "no_more_booze": True}
assert len(agent.conversation.messages) == 0
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.transcription_name,
body="We're all out of schnaps.",
),
)
assert len(agent.conversation.messages) == 1
# There should be a belief set and sent to the BDI core, as well as the user_said belief
assert agent.send.call_count == 2
# First should be the beliefs message
message: InternalMessage = agent.send.call_args_list[1].args[0]
beliefs = BeliefMessage.model_validate_json(message.body)
assert len(beliefs.create) == 1
assert beliefs.create[0].name == "no_more_booze"
@pytest.mark.asyncio
async def test_simulated_real_turn_no_beliefs(agent, llm, sample_program):
"""Test a user message to extract beliefs from, but no beliefs are formed."""
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
# Send a user message with no new beliefs
llm._query_llm.return_value = {"is_pirate": None, "no_more_booze": None}
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.transcription_name,
body="Hello there!",
),
)
# Only the user_said belief should've been sent
agent.send.assert_called_once()
@pytest.mark.asyncio
async def test_simulated_real_turn_no_new_beliefs(agent, llm, sample_program):
"""
Test a user message to extract beliefs from, but no new beliefs are formed because they already
existed.
"""
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
agent._current_beliefs = BeliefState(true={InternalBelief(name="is_pirate", arguments=None)})
# Send a user message with the belief the user is a pirate, still
llm._query_llm.return_value = {"is_pirate": True, "no_more_booze": None}
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.transcription_name,
body="Arr, nice to meet you, matey.",
),
)
# Only the user_said belief should've been sent, as no beliefs have changed
agent.send.assert_called_once()
@pytest.mark.asyncio
async def test_simulated_real_turn_remove_belief(agent, llm, sample_program):
"""
Test a user message to extract beliefs from, but an existing belief is determined no longer to
hold.
"""
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
agent._current_beliefs = BeliefState(
true={InternalBelief(name="no_more_booze", arguments=None)},
)
# Send a user message with the belief the user is a pirate, still
llm._query_llm.return_value = {"is_pirate": None, "no_more_booze": False}
await agent.handle_message(
InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.transcription_name,
body="I found an untouched barrel of wine!",
),
)
# Both user_said and belief change should've been sent
assert agent.send.call_count == 2
# Agent's current beliefs should've changed
assert any(b.name == "no_more_booze" for b in agent._current_beliefs.false)
@pytest.mark.asyncio
async def test_llm_failure_handling(agent, llm, sample_program):
"""
Check that the agent handles failures gracefully without crashing.
"""
llm._query_llm.side_effect = httpx.HTTPError("")
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
belief_changes = await agent.belief_inferrer.infer_from_conversation(
ChatHistory(
messages=[ChatMessage(role="user", content="Good day!")],
),
)
assert len(belief_changes.true) == 0
assert len(belief_changes.false) == 0

View File

@@ -1,65 +0,0 @@
import json
from unittest.mock import AsyncMock
import pytest
from control_backend.agents.bdi import (
TextBeliefExtractorAgent,
)
from control_backend.core.agent_system import InternalMessage
@pytest.fixture
def agent():
agent = TextBeliefExtractorAgent("text_belief_agent")
agent.send = AsyncMock()
return agent
def make_msg(sender: str, body: str, thread: str | None = None) -> InternalMessage:
return InternalMessage(to="unused", sender=sender, body=body, thread=thread)
@pytest.mark.asyncio
async def test_handle_message_ignores_other_agents(agent):
msg = make_msg("unknown", "some data", None)
await agent.handle_message(msg)
agent.send.assert_not_called() # noqa # `agent.send` has no such property, but we mock it.
@pytest.mark.asyncio
async def test_handle_message_from_transcriber(agent, mock_settings):
transcription = "hello world"
msg = make_msg(mock_settings.agent_settings.transcription_name, transcription, None)
await agent.handle_message(msg)
agent.send.assert_awaited_once() # noqa # `agent.send` has no such property, but we mock it.
sent: InternalMessage = agent.send.call_args.args[0] # noqa
assert sent.to == mock_settings.agent_settings.bdi_belief_collector_name
assert sent.thread == "beliefs"
parsed = json.loads(sent.body)
assert parsed == {"beliefs": {"user_said": [transcription]}, "type": "belief_extraction_text"}
@pytest.mark.asyncio
async def test_process_transcription_demo(agent, mock_settings):
transcription = "this is a test"
await agent._process_transcription_demo(transcription)
agent.send.assert_awaited_once() # noqa # `agent.send` has no such property, but we mock it.
sent: InternalMessage = agent.send.call_args.args[0] # noqa
assert sent.to == mock_settings.agent_settings.bdi_belief_collector_name
assert sent.thread == "beliefs"
parsed = json.loads(sent.body)
assert parsed["beliefs"]["user_said"] == [transcription]
@pytest.mark.asyncio
async def test_setup_initializes_beliefs(agent):
"""Covers the setup method and ensures beliefs are initialized."""
await agent.setup()
assert agent.beliefs == {"mood": ["X"], "car": ["Y"]}

View File

@@ -67,6 +67,7 @@ async def test_setup_success_connects_and_starts_robot(zmq_context):
address="tcp://localhost:5556",
bind=False,
gesture_data=[],
single_gesture_data=[],
)
agent.add_behavior.assert_called_once()

View File

@@ -66,7 +66,7 @@ async def test_llm_processing_success(mock_httpx_client, mock_settings):
# "Hello world." constitutes one sentence/chunk based on punctuation split
# The agent should call send once with the full sentence
assert agent.send.called
args = agent.send.call_args[0][0]
args = agent.send.call_args_list[0][0][0]
assert args.to == mock_settings.agent_settings.bdi_core_name
assert "Hello world." in args.body
@@ -197,6 +197,9 @@ async def test_query_llm_yields_final_tail_chunk(mock_settings):
agent = LLMAgent("llm_agent")
agent.send = AsyncMock()
agent.logger = MagicMock()
agent.logger.llm = MagicMock()
# Patch _stream_query_llm to yield tokens that do NOT end with punctuation
async def fake_stream(messages):
yield "Hello"
@@ -262,3 +265,23 @@ async def test_stream_query_llm_skips_non_data_lines(mock_httpx_client, mock_set
# Only the valid 'data:' line should yield content
assert tokens == ["Hi"]
@pytest.mark.asyncio
async def test_clear_history_command(mock_settings):
"""Test that the 'clear_history' message clears the agent's memory."""
# setup LLM to have some history
mock_settings.agent_settings.bdi_program_manager_name = "bdi_program_manager_agent"
agent = LLMAgent("llm_agent")
agent.history = [
{"role": "user", "content": "Old conversation context"},
{"role": "assistant", "content": "Old response"},
]
assert len(agent.history) == 2
msg = InternalMessage(
to="llm_agent",
sender=mock_settings.agent_settings.bdi_program_manager_name,
body="clear_history",
)
await agent.handle_message(msg)
assert len(agent.history) == 0

View File

@@ -7,6 +7,15 @@ import zmq
from control_backend.agents.perception.vad_agent import VADAgent
# We don't want to use real ZMQ in unit tests, for example because it can give errors when sockets
# aren't closed properly.
@pytest.fixture(autouse=True)
def mock_zmq():
with patch("zmq.asyncio.Context") as mock:
mock.instance.return_value = MagicMock()
yield mock
@pytest.fixture
def audio_out_socket():
return AsyncMock()
@@ -140,12 +149,10 @@ async def test_vad_model_load_failure_stops_agent(vad_agent):
# Patch stop to an AsyncMock so we can check it was awaited
vad_agent.stop = AsyncMock()
result = await vad_agent.setup()
await vad_agent.setup()
# Assert stop was called
vad_agent.stop.assert_awaited_once()
# Assert setup returned None
assert result is None
@pytest.mark.asyncio
@@ -155,7 +162,7 @@ async def test_audio_out_bind_failure_sets_none_and_logs(vad_agent, caplog):
audio_out_socket is set to None, None is returned, and an error is logged.
"""
mock_socket = MagicMock()
mock_socket.bind_to_random_port.side_effect = zmq.ZMQBindError()
mock_socket.bind.side_effect = zmq.ZMQBindError()
with patch("control_backend.agents.perception.vad_agent.azmq.Context.instance") as mock_ctx:
mock_ctx.return_value.socket.return_value = mock_socket

View File

@@ -0,0 +1,146 @@
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock
import pytest
from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import RIEndpoint
@pytest.fixture
def agent():
agent = UserInterruptAgent(name="user_interrupt_agent")
agent.send = AsyncMock()
agent.logger = MagicMock()
agent.sub_socket = AsyncMock()
return agent
@pytest.mark.asyncio
async def test_send_to_speech_agent(agent):
"""Verify speech command format."""
await agent._send_to_speech_agent("Hello World")
agent.send.assert_awaited_once()
sent_msg: InternalMessage = agent.send.call_args.args[0]
assert sent_msg.to == settings.agent_settings.robot_speech_name
body = json.loads(sent_msg.body)
assert body["data"] == "Hello World"
assert body["is_priority"] is True
@pytest.mark.asyncio
async def test_send_to_gesture_agent(agent):
"""Verify gesture command format."""
await agent._send_to_gesture_agent("wave_hand")
agent.send.assert_awaited_once()
sent_msg: InternalMessage = agent.send.call_args.args[0]
assert sent_msg.to == settings.agent_settings.robot_gesture_name
body = json.loads(sent_msg.body)
assert body["data"] == "wave_hand"
assert body["is_priority"] is True
assert body["endpoint"] == RIEndpoint.GESTURE_SINGLE.value
@pytest.mark.asyncio
async def test_send_to_program_manager(agent):
"""Verify belief update format."""
context_str = "2"
await agent._send_to_program_manager(context_str)
agent.send.assert_awaited_once()
sent_msg: InternalMessage = agent.send.call_args.args[0]
assert sent_msg.to == settings.agent_settings.bdi_program_manager_name
assert sent_msg.thread == "belief_override_id"
body = json.loads(sent_msg.body)
assert body["belief"] == context_str
@pytest.mark.asyncio
async def test_receive_loop_routing_success(agent):
"""
Test that the loop correctly:
1. Receives 'button_pressed' topic from ZMQ
2. Parses the JSON payload to find 'type' and 'context'
3. Calls the correct handler method based on 'type'
"""
# Prepare JSON payloads as bytes
payload_speech = json.dumps({"type": "speech", "context": "Hello Speech"}).encode()
payload_gesture = json.dumps({"type": "gesture", "context": "Hello Gesture"}).encode()
payload_override = json.dumps({"type": "override", "context": "Hello Override"}).encode()
agent.sub_socket.recv_multipart.side_effect = [
(b"button_pressed", payload_speech),
(b"button_pressed", payload_gesture),
(b"button_pressed", payload_override),
asyncio.CancelledError, # Stop the infinite loop
]
agent._send_to_speech_agent = AsyncMock()
agent._send_to_gesture_agent = AsyncMock()
agent._send_to_program_manager = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
await asyncio.sleep(0)
# Speech
agent._send_to_speech_agent.assert_awaited_once_with("Hello Speech")
# Gesture
agent._send_to_gesture_agent.assert_awaited_once_with("Hello Gesture")
# Override
agent._send_to_program_manager.assert_awaited_once_with("Hello Override")
assert agent._send_to_speech_agent.await_count == 1
assert agent._send_to_gesture_agent.await_count == 1
assert agent._send_to_program_manager.await_count == 1
@pytest.mark.asyncio
async def test_receive_loop_unknown_type(agent):
"""Test that unknown 'type' values in the JSON log a warning and do not crash."""
# Prepare a payload with an unknown type
payload_unknown = json.dumps({"type": "unknown_thing", "context": "some_data"}).encode()
agent.sub_socket.recv_multipart.side_effect = [
(b"button_pressed", payload_unknown),
asyncio.CancelledError,
]
agent._send_to_speech_agent = AsyncMock()
agent._send_to_gesture_agent = AsyncMock()
agent._send_to_belief_collector = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
await asyncio.sleep(0)
# Ensure no handlers were called
agent._send_to_speech_agent.assert_not_called()
agent._send_to_gesture_agent.assert_not_called()
agent._send_to_belief_collector.assert_not_called()
agent.logger.warning.assert_called_with(
"Received button press with unknown type '%s' (context: '%s').",
"unknown_thing",
"some_data",
)

View File

@@ -1,4 +1,5 @@
import json
import uuid
from unittest.mock import AsyncMock
import pytest
@@ -6,7 +7,7 @@ from fastapi import FastAPI
from fastapi.testclient import TestClient
from control_backend.api.v1.endpoints import program
from control_backend.schemas.program import Program
from control_backend.schemas.program import BasicNorm, Goal, Phase, Plan, Program
@pytest.fixture
@@ -25,29 +26,39 @@ def client(app):
def make_valid_program_dict():
"""Helper to create a valid Program JSON structure."""
return {
"phases": [
{
"id": "phase1",
"label": "basephase",
"norms": [{"id": "n1", "label": "norm", "norm": "be nice"}],
"goals": [
{"id": "g1", "label": "goal", "description": "test goal", "achieved": False}
# Converting to JSON using Pydantic because it knows how to convert a UUID object
program_json_str = Program(
phases=[
Phase(
id=uuid.uuid4(),
name="Basic Phase",
norms=[
BasicNorm(
id=uuid.uuid4(),
name="Some norm",
norm="Do normal.",
),
],
"triggers": [
{
"id": "t1",
"label": "trigger",
"type": "keywords",
"keywords": [
{"id": "kw1", "keyword": "keyword1"},
{"id": "kw2", "keyword": "keyword2"},
],
},
goals=[
Goal(
id=uuid.uuid4(),
name="Some goal",
description="This description can be used to determine whether the goal "
"has been achieved.",
plan=Plan(
id=uuid.uuid4(),
name="Goal Plan",
steps=[],
),
can_fail=False,
),
],
}
]
}
triggers=[],
),
],
).model_dump_json()
# Converting back to a dict because that's what's expected
return json.loads(program_json_str)
def test_receive_program_success(client):
@@ -71,7 +82,8 @@ def test_receive_program_success(client):
sent_bytes = args[0][1]
sent_obj = json.loads(sent_bytes.decode())
expected_obj = Program.model_validate(program_dict).model_dump()
# Converting to JSON using Pydantic because it knows how to handle UUIDs
expected_obj = json.loads(Program.model_validate(program_dict).model_dump_json())
assert sent_obj == expected_obj

View File

@@ -1,49 +1,66 @@
import uuid
import pytest
from pydantic import ValidationError
from control_backend.schemas.program import (
BasicNorm,
ConditionalNorm,
Goal,
KeywordTrigger,
Norm,
InferredBelief,
KeywordBelief,
LogicalOperator,
Phase,
Plan,
Program,
TriggerKeyword,
SemanticBelief,
Trigger,
)
def base_norm() -> Norm:
return Norm(
id="norm1",
label="testNorm",
def base_norm() -> BasicNorm:
return BasicNorm(
id=uuid.uuid4(),
name="testNormName",
norm="testNormNorm",
critical=False,
)
def base_goal() -> Goal:
return Goal(
id="goal1",
label="testGoal",
description="testGoalDescription",
achieved=False,
id=uuid.uuid4(),
name="testGoalName",
description="This description can be used to determine whether the goal has been achieved.",
plan=Plan(
id=uuid.uuid4(),
name="testGoalPlanName",
steps=[],
),
can_fail=False,
)
def base_trigger() -> KeywordTrigger:
return KeywordTrigger(
id="trigger1",
label="testTrigger",
type="keywords",
keywords=[
TriggerKeyword(id="keyword1", keyword="testKeyword1"),
TriggerKeyword(id="keyword1", keyword="testKeyword2"),
],
def base_trigger() -> Trigger:
return Trigger(
id=uuid.uuid4(),
name="testTriggerName",
condition=KeywordBelief(
id=uuid.uuid4(),
name="testTriggerKeywordBeliefTriggerName",
keyword="Keyword",
),
plan=Plan(
id=uuid.uuid4(),
name="testTriggerPlanName",
steps=[],
),
)
def base_phase() -> Phase:
return Phase(
id="phase1",
label="basephase",
id=uuid.uuid4(),
norms=[base_norm()],
goals=[base_goal()],
triggers=[base_trigger()],
@@ -58,7 +75,7 @@ def invalid_program() -> dict:
# wrong types inside phases list (not Phase objects)
return {
"phases": [
{"id": "phase1"}, # incomplete
{"id": uuid.uuid4()}, # incomplete
{"not_a_phase": True},
]
}
@@ -77,11 +94,112 @@ def test_valid_deepprogram():
# validate nested components directly
phase = validated.phases[0]
assert isinstance(phase.goals[0], Goal)
assert isinstance(phase.triggers[0], KeywordTrigger)
assert isinstance(phase.norms[0], Norm)
assert isinstance(phase.triggers[0], Trigger)
assert isinstance(phase.norms[0], BasicNorm)
def test_invalid_program():
bad = invalid_program()
with pytest.raises(ValidationError):
Program.model_validate(bad)
def test_conditional_norm_parsing():
"""
Check that pydantic is able to preserve the type of the norm, that it doesn't lose its
"condition" field when serializing and deserializing.
"""
norm = ConditionalNorm(
name="testNormName",
id=uuid.uuid4(),
norm="testNormNorm",
critical=False,
condition=KeywordBelief(
name="testKeywordBelief",
id=uuid.uuid4(),
keyword="testKeywordBelief",
),
)
program = Program(
phases=[
Phase(
name="Some phase",
id=uuid.uuid4(),
norms=[norm],
goals=[],
triggers=[],
),
],
)
parsed_program = Program.model_validate_json(program.model_dump_json())
parsed_norm = parsed_program.phases[0].norms[0]
assert hasattr(parsed_norm, "condition")
assert isinstance(parsed_norm, ConditionalNorm)
def test_belief_type_parsing():
"""
Check that pydantic is able to discern between the different types of beliefs when serializing
and deserializing.
"""
keyword_belief = KeywordBelief(
name="testKeywordBelief",
id=uuid.uuid4(),
keyword="something",
)
semantic_belief = SemanticBelief(
name="testSemanticBelief",
id=uuid.uuid4(),
description="something",
)
inferred_belief = InferredBelief(
name="testInferredBelief",
id=uuid.uuid4(),
operator=LogicalOperator.OR,
left=keyword_belief,
right=semantic_belief,
)
program = Program(
phases=[
Phase(
name="Some phase",
id=uuid.uuid4(),
norms=[],
goals=[],
triggers=[
Trigger(
name="testTriggerKeywordTrigger",
id=uuid.uuid4(),
condition=keyword_belief,
plan=Plan(name="testTriggerPlanName", id=uuid.uuid4(), steps=[]),
),
Trigger(
name="testTriggerSemanticTrigger",
id=uuid.uuid4(),
condition=semantic_belief,
plan=Plan(name="testTriggerPlanName", id=uuid.uuid4(), steps=[]),
),
Trigger(
name="testTriggerInferredTrigger",
id=uuid.uuid4(),
condition=inferred_belief,
plan=Plan(name="testTriggerPlanName", id=uuid.uuid4(), steps=[]),
),
],
),
],
)
parsed_program = Program.model_validate_json(program.model_dump_json())
parsed_keyword_belief = parsed_program.phases[0].triggers[0].condition
assert isinstance(parsed_keyword_belief, KeywordBelief)
parsed_semantic_belief = parsed_program.phases[0].triggers[1].condition
assert isinstance(parsed_semantic_belief, SemanticBelief)
parsed_inferred_belief = parsed_program.phases[0].triggers[2].condition
assert isinstance(parsed_inferred_belief, InferredBelief)