Compare commits

...

60 Commits

Author SHA1 Message Date
Pim Hutting
7c10c50336 chore: removed resetExperiment from backened
now it happens in UI

ref: N25B-400
2026-01-16 14:29:46 +01:00
Pim Hutting
6d03ba8a41 feat: added extra endpoint for norm pings
also made sure that you cannot skip phase on end phase

ref: N25B-400
2026-01-16 14:28:27 +01:00
Pim Hutting
041fc4ab6e chore: cond_norms unachieve and via belief msg 2026-01-15 09:02:52 +01:00
39e1bb1ead fix: sync issues
ref: N25B-447
2026-01-14 15:28:29 +01:00
8f6662e64a feat: phase transitions
ref: N25B-446
2026-01-14 13:22:51 +01:00
0794c549a8 chore: remove agentspeak file from tracking 2026-01-14 11:27:29 +01:00
ff24ab7a27 fix: default behavior and end phase
ref: N25B-448
2026-01-14 11:24:19 +01:00
43ac8ad69f chore: delete outdated files
ref: N25B-446
2026-01-14 10:58:41 +01:00
Twirre Meulenbelt
f7669c021b feat: support force completed goals in semantic belief agent
ref: N25B-427
2026-01-13 17:04:44 +01:00
Björn Otgaar
8f52f8bf0c Merge branch 'feat/monitoringpage-cb' of git.science.uu.nl:ics/sp/2025/n25b/pepperplus-cb into feat/monitoringpage-cb 2026-01-13 14:03:40 +01:00
Björn Otgaar
2a94a45b34 chore: adjust 'phase_id' to 'id' for correct payload 2026-01-13 14:03:37 +01:00
f87651f691 fix: achieved goal in bdi core
ref: N25B-400
2026-01-13 12:26:18 +01:00
Pim Hutting
65e0b2d250 feat: added correct message
ref: N25B-400
2026-01-13 12:05:38 +01:00
177e844349 feat: send achieved goal from interrupt->manager->semantic
ref: N25B-400
2026-01-13 11:46:17 +01:00
Pim Hutting
0df6040444 feat: added sending goal overwrites in Userinter.
ref: N25B-400
2026-01-13 11:26:03 +01:00
Twirre Meulenbelt
af81bd8620 Merge branch 'feat/multiple-receivers' into feat/monitoringpage-cb
# Conflicts:
#	src/control_backend/core/agent_system.py
#	src/control_backend/schemas/internal_message.py
2026-01-13 11:14:18 +01:00
Twirre Meulenbelt
70e05b6c92 test: sending to multiple agents, including remote
ref: N25B-441
2026-01-13 11:10:35 +01:00
c0b8fb8612 feat: able to send to multiple receivers
ref: N25B-441
2026-01-13 11:06:42 +01:00
Pim Hutting
d499111ea4 feat: added pause functionality
Storms code wasnt fully included in Bjorns branch

ref: N25B-400
2026-01-13 00:52:04 +01:00
Pim Hutting
72c2c57f26 chore: merged button functionality and fix bug
merged björns branch that has the following button functionality
-Pause/resume
-Next phase
-Restart phase
-reset experiment
fix bug where norms where not properly sent to the user interrupt agent

ref: N25B-400
2026-01-12 19:31:50 +01:00
Pim Hutting
4a014b577a Merge remote-tracking branch 'origin/feat/reset-skip-buttons' into feat/monitoringpage-cb 2026-01-12 19:19:31 +01:00
Pim Hutting
c45a258b22 fix: fixed a bug where norms where not updated
Now in UserInterruptAgent we store the norm.norm and not the slugified norm

ref: N25B-400
2026-01-12 19:07:05 +01:00
0f09276477 fix: send norms back to UI
ref: N25B-400
2026-01-12 17:02:39 +01:00
4e113c2d5c fix: default plan and norm force
ref: N25B-400
2026-01-12 16:20:24 +01:00
Pim Hutting
54c835cc0f feat: added force_norm handling in BDI core agent
ref: N25B-400
2026-01-12 15:37:04 +01:00
Pim Hutting
c4ccbcd354 Merge remote-tracking branch 'origin/feat/extra-agentspeak-functionality' into feat/monitoringpage-cb 2026-01-12 15:24:48 +01:00
Pim Hutting
d202abcd1b fix: phases update correctly
there was a bug where phases would not update without restarting cb

ref: N25B-400
2026-01-12 12:51:24 +01:00
Twirre Meulenbelt
4b71981a3e fix: some bugs and some tests
ref: N25B-429
2026-01-12 09:00:50 +01:00
866d7c4958 fix: end phase loop correctly notifies about user_said
ref: N25B-429
2026-01-08 15:13:12 +01:00
Pim Hutting
5e2126fc21 chore: code cleanup
ref: N25B-400
2026-01-08 15:05:43 +01:00
Pim Hutting
500bbc2d82 feat: added goal start sending functionality
ref: N25B-400
2026-01-08 14:52:55 +01:00
133019a928 feat: trigger name and trigger checks on belief update
ref: N25B-429
2026-01-08 14:04:44 +01:00
4d0ba69443 fix: don't re-add user_said upon phase transition
ref: N25B-429
2026-01-08 13:44:25 +01:00
625ef0c365 feat: phase transition waits for all goals
ref: N25B-429
2026-01-08 13:36:03 +01:00
b88758fa76 feat: phase transition independent of response
ref: N25B-429
2026-01-08 13:33:37 +01:00
Pim Hutting
3a8d1730a1 fix: made mapping for conditional norms only
ref: N25B-400
2026-01-08 12:29:16 +01:00
Pim Hutting
b27e5180c4 feat: small implementation change
ref: N25B-400
2026-01-08 11:25:53 +01:00
Pim Hutting
6b34f4b82c fix: small bugfix
ref: N25B-400
2026-01-08 10:59:24 +01:00
Twirre Meulenbelt
45719c580b feat: prepend more silence before speech audio for better transcription beginnings
ref: N25B-429
2026-01-08 10:49:13 +01:00
Pim Hutting
4bf2be6359 feat: added a functionality for monitoring page
ref: N25B-400
2026-01-08 09:56:10 +01:00
Pim Hutting
20e5e46639 Merge remote-tracking branch 'origin/feat/extra-agentspeak-functionality' into feat/monitoringpage-cb 2026-01-07 22:42:40 +01:00
Pim Hutting
365d449666 feat: commit before I can merge new changes
ref: N25B-400
2026-01-07 22:41:59 +01:00
5a61225c6f feat: reset extractor history
ref: N25B-429
2026-01-07 18:10:13 +01:00
a30cea5231 Merge branch 'feat/semantic-beliefs' into feat/extra-agentspeak-functionality 2026-01-07 17:51:30 +01:00
Twirre Meulenbelt
93d67ccb66 feat: add reset functionality to semantic belief extractor
ref: N25B-432
2026-01-07 17:50:47 +01:00
240624f887 Merge branch 'dev' into feat/extra-agentspeak-functionality
# Conflicts:
#	src/control_backend/agents/bdi/bdi_program_manager.py
#	src/control_backend/agents/llm/llm_agent.py
#	test/unit/agents/bdi/test_bdi_program_manager.py
2026-01-07 17:46:48 +01:00
Pim Hutting
be6bbbb849 feat: added endpoint userinterrupt to userinterrupt
ref: N25B-400
2026-01-07 17:42:54 +01:00
8a77e8e1c7 feat: check goals only for this phase
Since conversation history still remains we can still check at a later point.

ref: N25B-429
2026-01-07 17:31:24 +01:00
3b4dccc760 Merge branch 'feat/semantic-beliefs' into feat/extra-agentspeak-functionality
# Conflicts:
#	src/control_backend/agents/bdi/bdi_program_manager.py
2026-01-07 17:20:52 +01:00
3d49e44cf7 fix: complete pipeline working
User interrupts still need to be tested.

ref: N25B-429
2026-01-07 17:13:58 +01:00
Twirre Meulenbelt
aa5b386f65 feat: semantically determine goal completion
ref: N25B-432
2026-01-07 17:08:23 +01:00
Twirre Meulenbelt
3189b9fee3 fix: let belief extractor send user_said belief
ref: N25B-429
2026-01-07 15:19:23 +01:00
Björn Otgaar
612a96940d Merge branch 'feat/environment-variables' into 'dev'
Docs for environment variables, parameterize some constants

See merge request ics/sp/2025/n25b/pepperplus-cb!38
2026-01-06 09:02:49 +00:00
Pim Hutting
4c20656c75 Merge branch 'feat/program-reset-llm' into 'dev'
feat: made program reset LLM

See merge request ics/sp/2025/n25b/pepperplus-cb!39
2026-01-02 15:13:05 +00:00
Pim Hutting
6ca86e4b81 feat: made program reset LLM 2026-01-02 15:13:04 +00:00
Twirre Meulenbelt
7d798f2e77 Merge remote-tracking branch 'origin/dev' into feat/environment-variables
# Conflicts:
#	src/control_backend/core/config.py
#	test/unit/agents/actuation/test_robot_speech_agent.py
2025-12-29 12:40:16 +01:00
Twirre Meulenbelt
5282c2471f Merge remote-tracking branch 'origin/dev' into feat/environment-variables
# Conflicts:
#	src/control_backend/core/config.py
#	test/unit/agents/actuation/test_robot_speech_agent.py
2025-12-29 12:35:39 +01:00
Twirre Meulenbelt
0c682d6440 feat: introduce .env.example, docs
The example includes options that are expected to be changed. It also includes a reference to where in the docs you can find a full list of options.

ref: N25B-352
2025-12-11 13:35:19 +01:00
Twirre Meulenbelt
32d8f20dc9 feat: parameterize RI host
Was "localhost" in RI Communication Agent, now uses configurable setting. Secretly also removing "localhost" from VAD agent, as its socket should be something that's "inproc".

ref: N25B-352
2025-12-11 12:12:15 +01:00
Twirre Meulenbelt
9cc0e39955 fix: failures main tests since VAD agent initialization was changed
The test still expects the VAD agent to be started in main, rather than in the RI Communication Agent.

ref: N25B-356
2025-12-11 12:04:24 +01:00
33 changed files with 1688 additions and 1160 deletions

20
.env.example Normal file
View File

@@ -0,0 +1,20 @@
# Example .env file. To use, make a copy, call it ".env" (i.e. removing the ".example" suffix), then you edit values.
# The hostname of the Robot Interface. Change if the Control Backend and Robot Interface are running on different computers.
RI_HOST="localhost"
# URL for the local LLM API. Must be an API that implements the OpenAI Chat Completions API, but most do.
LLM_SETTINGS__LOCAL_LLM_URL="http://localhost:1234/v1/chat/completions"
# Name of the local LLM model to use.
LLM_SETTINGS__LOCAL_LLM_MODEL="gpt-oss"
# Number of non-speech chunks to wait before speech ended. A chunk is approximately 31 ms. Increasing this number allows longer pauses in speech, but also increases response time.
BEHAVIOUR_SETTINGS__VAD_NON_SPEECH_PATIENCE_CHUNKS=15
# Timeout in milliseconds for socket polling. Increase this number if network latency/jitter is high, often the case when using Wi-Fi. Perhaps 500 ms. A symptom of this issue is transcriptions getting cut off.
BEHAVIOUR_SETTINGS__SOCKET_POLLER_TIMEOUT_MS=100
# For an exhaustive list of options, see the control_backend.core.config module in the docs.

2
.gitignore vendored
View File

@@ -222,6 +222,8 @@ __marimo__/
docs/* docs/*
!docs/conf.py !docs/conf.py
# Generated files
agentspeak.asl

View File

@@ -27,6 +27,7 @@ This + part might differ based on what model you choose.
copy the model name in the module loaded and replace local_llm_modelL. In settings. copy the model name in the module loaded and replace local_llm_modelL. In settings.
## Running ## Running
To run the project (development server), execute the following command (while inside the root repository): To run the project (development server), execute the following command (while inside the root repository):
@@ -34,6 +35,14 @@ To run the project (development server), execute the following command (while in
uv run fastapi dev src/control_backend/main.py uv run fastapi dev src/control_backend/main.py
``` ```
### Environment Variables
You can use environment variables to change settings. Make a copy of the [`.env.example`](.env.example) file, name it `.env` and put it in the root directory. The file itself describes how to do the configuration.
For an exhaustive list of environment options, see the `control_backend.core.config` module in the docs.
## Testing ## Testing
Testing happens automatically when opening a merge request to any branch. If you want to manually run the test suite, you can do so by running the following for unit tests: Testing happens automatically when opening a merge request to any branch. If you want to manually run the test suite, you can do so by running the following for unit tests:

View File

@@ -33,7 +33,7 @@ class RobotGestureAgent(BaseAgent):
def __init__( def __init__(
self, self,
name: str, name: str,
address=settings.zmq_settings.ri_command_address, address: str,
bind=False, bind=False,
gesture_data=None, gesture_data=None,
single_gesture_data=None, single_gesture_data=None,

View File

@@ -3,9 +3,11 @@ from functools import singledispatchmethod
from slugify import slugify from slugify import slugify
from control_backend.agents.bdi.agentspeak_ast import ( from control_backend.agents.bdi.agentspeak_ast import (
AstAtom,
AstBinaryOp, AstBinaryOp,
AstExpression, AstExpression,
AstLiteral, AstLiteral,
AstNumber,
AstPlan, AstPlan,
AstProgram, AstProgram,
AstRule, AstRule,
@@ -17,6 +19,7 @@ from control_backend.agents.bdi.agentspeak_ast import (
TriggerType, TriggerType,
) )
from control_backend.schemas.program import ( from control_backend.schemas.program import (
BaseGoal,
BasicNorm, BasicNorm,
ConditionalNorm, ConditionalNorm,
GestureAction, GestureAction,
@@ -42,7 +45,13 @@ class AgentSpeakGenerator:
def generate(self, program: Program) -> str: def generate(self, program: Program) -> str:
self._asp = AstProgram() self._asp = AstProgram()
self._asp.rules.append(AstRule(self._astify(program.phases[0]))) if program.phases:
self._asp.rules.append(AstRule(self._astify(program.phases[0])))
else:
self._asp.rules.append(AstRule(AstLiteral("phase", [AstString("end")])))
self._asp.rules.append(AstRule(AstLiteral("!notify_cycle")))
self._add_keyword_inference() self._add_keyword_inference()
self._add_default_plans() self._add_default_plans()
@@ -70,6 +79,7 @@ class AgentSpeakGenerator:
self._add_reply_with_goal_plan() self._add_reply_with_goal_plan()
self._add_say_plan() self._add_say_plan()
self._add_reply_plan() self._add_reply_plan()
self._add_notify_cycle_plan()
def _add_reply_with_goal_plan(self): def _add_reply_with_goal_plan(self):
self._asp.plans.append( self._asp.plans.append(
@@ -132,6 +142,29 @@ class AgentSpeakGenerator:
) )
) )
def _add_notify_cycle_plan(self):
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("notify_cycle"),
[],
[
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"findall",
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
),
),
AstStatement(
StatementType.DO_ACTION, AstLiteral("notify_norms", [AstVar("Norms")])
),
AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(100)])),
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("notify_cycle")),
],
)
)
def _process_phases(self, phases: list[Phase]) -> None: def _process_phases(self, phases: list[Phase]) -> None:
for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True): for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True):
if curr_phase: if curr_phase:
@@ -145,7 +178,12 @@ class AgentSpeakGenerator:
type=TriggerType.ADDED_BELIEF, type=TriggerType.ADDED_BELIEF,
trigger_literal=AstLiteral("user_said", [AstVar("Message")]), trigger_literal=AstLiteral("user_said", [AstVar("Message")]),
context=[AstLiteral("phase", [AstString("end")])], context=[AstLiteral("phase", [AstString("end")])],
body=[AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply"))], body=[
AstStatement(
StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")])
),
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply")),
],
) )
) )
@@ -157,7 +195,7 @@ class AgentSpeakGenerator:
previous_goal = None previous_goal = None
for goal in phase.goals: for goal in phase.goals:
self._process_goal(goal, phase, previous_goal) self._process_goal(goal, phase, previous_goal, main_goal=True)
previous_goal = goal previous_goal = goal
for trigger in phase.triggers: for trigger in phase.triggers:
@@ -171,29 +209,57 @@ class AgentSpeakGenerator:
self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")]) self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")])
) )
context = [from_phase_ast, ~AstLiteral("responded_this_turn")] check_context = [from_phase_ast]
if from_phase and from_phase.goals: if from_phase:
context.append(self._astify(from_phase.goals[-1], achieved=True)) for goal in from_phase.goals:
check_context.append(self._astify(goal, achieved=True))
force_context = [from_phase_ast]
body = [ body = [
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"notify_transition_phase",
[
AstString(str(from_phase.id)),
AstString(str(to_phase.id) if to_phase else "end"),
],
),
),
AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast), AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast),
AstStatement(StatementType.ADD_BELIEF, to_phase_ast), AstStatement(StatementType.ADD_BELIEF, to_phase_ast),
] ]
if from_phase: # if from_phase:
body.extend( # body.extend(
[ # [
AstStatement( # AstStatement(
StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")]) # StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")])
), # ),
AstStatement( # AstStatement(
StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")]) # StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")])
), # ),
] # ]
) # )
# Check
self._asp.plans.append( self._asp.plans.append(
AstPlan(TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), context, body) AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("transition_phase"),
check_context,
[
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("force_transition_phase")),
],
)
)
# Force
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL, AstLiteral("force_transition_phase"), force_context, body
)
) )
def _process_norm(self, norm: Norm, phase: Phase) -> None: def _process_norm(self, norm: Norm, phase: Phase) -> None:
@@ -201,7 +267,11 @@ class AgentSpeakGenerator:
match norm: match norm:
case ConditionalNorm(condition=cond): case ConditionalNorm(condition=cond):
rule = AstRule(self._astify(norm), self._astify(phase) & self._astify(cond)) rule = AstRule(
self._astify(norm),
self._astify(phase) & self._astify(cond)
| AstAtom(f"force_{self.slugify(norm)}"),
)
case BasicNorm(): case BasicNorm():
rule = AstRule(self._astify(norm), self._astify(phase)) rule = AstRule(self._astify(norm), self._astify(phase))
@@ -213,6 +283,11 @@ class AgentSpeakGenerator:
def _add_default_loop(self, phase: Phase) -> None: def _add_default_loop(self, phase: Phase) -> None:
actions = [] actions = []
actions.append(
AstStatement(
StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")])
)
)
actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn"))) actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn")))
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers"))) actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers")))
@@ -236,6 +311,7 @@ class AgentSpeakGenerator:
phase: Phase, phase: Phase,
previous_goal: Goal | None = None, previous_goal: Goal | None = None,
continues_response: bool = False, continues_response: bool = False,
main_goal: bool = False,
) -> None: ) -> None:
context: list[AstExpression] = [self._astify(phase)] context: list[AstExpression] = [self._astify(phase)]
context.append(~self._astify(goal, achieved=True)) context.append(~self._astify(goal, achieved=True))
@@ -245,6 +321,13 @@ class AgentSpeakGenerator:
context.append(~AstLiteral("responded_this_turn")) context.append(~AstLiteral("responded_this_turn"))
body = [] body = []
if main_goal: # UI only needs to know about the main goals
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_goal_start", [AstString(self.slugify(goal))]),
)
)
subgoals = [] subgoals = []
for step in goal.plan.steps: for step in goal.plan.steps:
@@ -283,12 +366,28 @@ class AgentSpeakGenerator:
body = [] body = []
subgoals = [] subgoals = []
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_trigger_start", [AstString(self.slugify(trigger))]),
)
)
for step in trigger.plan.steps: for step in trigger.plan.steps:
body.append(self._step_to_statement(step)) body.append(self._step_to_statement(step))
if isinstance(step, Goal): if isinstance(step, Goal):
step.can_fail = False # triggers are continuous sequence step.can_fail = False # triggers are continuous sequence
subgoals.append(step) subgoals.append(step)
# Arbitrary wait for UI to display nicely
body.append(AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(2000)])))
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("notify_trigger_end", [AstString(self.slugify(trigger))]),
)
)
self._asp.plans.append( self._asp.plans.append(
AstPlan( AstPlan(
TriggerType.ADDED_GOAL, TriggerType.ADDED_GOAL,
@@ -298,6 +397,9 @@ class AgentSpeakGenerator:
) )
) )
# Force trigger (from UI)
self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(trigger), [], body))
for subgoal in subgoals: for subgoal in subgoals:
self._process_goal(subgoal, phase, continues_response=True) self._process_goal(subgoal, phase, continues_response=True)
@@ -322,6 +424,16 @@ class AgentSpeakGenerator:
) )
) )
# Force phase transition fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("force_transition_phase"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
@singledispatchmethod @singledispatchmethod
def _astify(self, element: ProgramElement) -> AstExpression: def _astify(self, element: ProgramElement) -> AstExpression:
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.") raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")
@@ -332,13 +444,7 @@ class AgentSpeakGenerator:
@_astify.register @_astify.register
def _(self, sb: SemanticBelief) -> AstExpression: def _(self, sb: SemanticBelief) -> AstExpression:
return AstLiteral(self.get_semantic_belief_slug(sb)) return AstLiteral(self.slugify(sb))
@staticmethod
def get_semantic_belief_slug(sb: SemanticBelief) -> str:
# If you need a method like this for other types, make a public slugify singledispatch for
# all types.
return f"semantic_{AgentSpeakGenerator._slugify_str(sb.name)}"
@_astify.register @_astify.register
def _(self, ib: InferredBelief) -> AstExpression: def _(self, ib: InferredBelief) -> AstExpression:
@@ -383,6 +489,11 @@ class AgentSpeakGenerator:
def slugify(element: ProgramElement) -> str: def slugify(element: ProgramElement) -> str:
raise NotImplementedError(f"Cannot convert element {element} to a slug.") raise NotImplementedError(f"Cannot convert element {element} to a slug.")
@slugify.register
@staticmethod
def _(n: Norm) -> str:
return f"norm_{AgentSpeakGenerator._slugify_str(n.norm)}"
@slugify.register @slugify.register
@staticmethod @staticmethod
def _(sb: SemanticBelief) -> str: def _(sb: SemanticBelief) -> str:
@@ -390,7 +501,7 @@ class AgentSpeakGenerator:
@slugify.register @slugify.register
@staticmethod @staticmethod
def _(g: Goal) -> str: def _(g: BaseGoal) -> str:
return AgentSpeakGenerator._slugify_str(g.name) return AgentSpeakGenerator._slugify_str(g.name)
@slugify.register @slugify.register

View File

@@ -1,203 +0,0 @@
import typing
from dataclasses import dataclass, field
# --- Types ---
@dataclass
class BeliefLiteral:
"""
Represents a literal or atom.
Example: phase(1), user_said("hello"), ~started
"""
functor: str
args: list[str] = field(default_factory=list)
negated: bool = False
def __str__(self):
# In ASL, 'not' is usually for closed-world assumption (prolog style),
# '~' is for explicit negation in beliefs.
# For simplicity in behavior trees, we often use 'not' for conditions.
prefix = "not " if self.negated else ""
if not self.args:
return f"{prefix}{self.functor}"
# Clean args to ensure strings are quoted if they look like strings,
# but usually the converter handles the quoting of string literals.
args_str = ", ".join(self.args)
return f"{prefix}{self.functor}({args_str})"
@dataclass
class GoalLiteral:
name: str
def __str__(self):
return f"!{self.name}"
@dataclass
class ActionLiteral:
"""
Represents a step in a plan body.
Example: .say("Hello") or !achieve_goal
"""
code: str
def __str__(self):
return self.code
@dataclass
class BinaryOp:
"""
Represents logical operations.
Example: (A & B) | C
"""
left: "Expression | str"
operator: typing.Literal["&", "|"]
right: "Expression | str"
def __str__(self):
l_str = str(self.left)
r_str = str(self.right)
if isinstance(self.left, BinaryOp):
l_str = f"({l_str})"
if isinstance(self.right, BinaryOp):
r_str = f"({r_str})"
return f"{l_str} {self.operator} {r_str}"
Literal = BeliefLiteral | GoalLiteral | ActionLiteral
Expression = Literal | BinaryOp | str
@dataclass
class Rule:
"""
Represents an inference rule.
Example: head :- body.
"""
head: Expression
body: Expression | None = None
def __str__(self):
if not self.body:
return f"{self.head}."
return f"{self.head} :- {self.body}."
@dataclass
class PersistentRule:
"""
Represents an inference rule, where the inferred belief is persistent when formed.
"""
head: Expression
body: Expression
def __str__(self):
if not self.body:
raise Exception("Rule without body should not be persistent.")
lines = []
if isinstance(self.body, BinaryOp):
lines.append(f"+{self.body.left}")
if self.body.operator == "&":
lines.append(f" : {self.body.right}")
lines.append(f" <- +{self.head}.")
if self.body.operator == "|":
lines.append(f"+{self.body.right}")
lines.append(f" <- +{self.head}.")
return "\n".join(lines)
@dataclass
class Plan:
"""
Represents a plan.
Syntax: +trigger : context <- body.
"""
trigger: BeliefLiteral | GoalLiteral
context: list[Expression] = field(default_factory=list)
body: list[ActionLiteral] = field(default_factory=list)
def __str__(self):
# Indentation settings
INDENT = " "
ARROW = "\n <- "
COLON = "\n : "
# Build Header
header = f"+{self.trigger}"
if self.context:
ctx_str = f" &\n{INDENT}".join(str(c) for c in self.context)
header += f"{COLON}{ctx_str}"
# Case 1: Empty body
if not self.body:
return f"{header}."
# Case 2: Short body (optional optimization, keeping it uniform usually better)
header += ARROW
lines = []
# We start the first action on the same line or next line.
# Let's put it on the next line for readability if there are multiple.
if len(self.body) == 1:
return f"{header}{self.body[0]}."
# First item
lines.append(f"{header}{self.body[0]};")
# Middle items
for item in self.body[1:-1]:
lines.append(f"{INDENT}{item};")
# Last item
lines.append(f"{INDENT}{self.body[-1]}.")
return "\n".join(lines)
@dataclass
class AgentSpeakFile:
"""
Root element representing the entire generated file.
"""
initial_beliefs: list[Rule] = field(default_factory=list)
inference_rules: list[Rule | PersistentRule] = field(default_factory=list)
plans: list[Plan] = field(default_factory=list)
def __str__(self):
sections = []
if self.initial_beliefs:
sections.append("// --- Initial Beliefs & Facts ---")
sections.extend(str(rule) for rule in self.initial_beliefs)
sections.append("")
if self.inference_rules:
sections.append("// --- Inference Rules ---")
sections.extend(str(rule) for rule in self.inference_rules if isinstance(rule, Rule))
sections.append("")
sections.extend(
str(rule) for rule in self.inference_rules if isinstance(rule, PersistentRule)
)
sections.append("")
if self.plans:
sections.append("// --- Plans ---")
# Separate plans by a newline for readability
sections.extend(str(plan) + "\n" for plan in self.plans)
return "\n".join(sections)

View File

@@ -1,425 +0,0 @@
import asyncio
import time
from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi import BDICoreAgent
from control_backend.agents.bdi.asl_ast import (
ActionLiteral,
AgentSpeakFile,
BeliefLiteral,
BinaryOp,
Expression,
GoalLiteral,
PersistentRule,
Plan,
Rule,
)
from control_backend.agents.bdi.bdi_program_manager import test_program
from control_backend.schemas.program import (
BasicBelief,
Belief,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Phase,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
)
async def do_things():
res = input("Wanna generate")
if res == "y":
program = AgentSpeakGenerator().generate(test_program)
filename = f"{int(time.time())}.asl"
with open(filename, "w") as f:
f.write(program)
else:
# filename = "0test.asl"
filename = "1766062491.asl"
bdi_agent = BDICoreAgent("BDICoreAgent", filename)
flag = asyncio.Event()
await bdi_agent.start()
await flag.wait()
def do_other_things():
print(AgentSpeakGenerator().generate(test_program))
class AgentSpeakGenerator:
"""
Converts a Pydantic Program behavior model into an AgentSpeak(L) AST,
then renders it to a string.
"""
def generate(self, program: Program) -> str:
asl = AgentSpeakFile()
self._generate_startup(program, asl)
for i, phase in enumerate(program.phases):
next_phase = program.phases[i + 1] if i < len(program.phases) - 1 else None
self._generate_phase_flow(phase, next_phase, asl)
self._generate_norms(phase, asl)
self._generate_goals(phase, asl)
self._generate_triggers(phase, asl)
self._generate_fallbacks(program, asl)
return str(asl)
# --- Section: Startup & Phase Management ---
def _generate_startup(self, program: Program, asl: AgentSpeakFile):
if not program.phases:
return
# Initial belief: phase(start).
asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ['"start"'])))
# Startup plan: +started : phase(start) <- -phase(start); +phase(first_id).
asl.plans.append(
Plan(
trigger=BeliefLiteral("started"),
context=[BeliefLiteral("phase", ['"start"'])],
body=[
ActionLiteral('-phase("start")'),
ActionLiteral(f'+phase("{program.phases[0].id}")'),
],
)
)
# Initial plans:
asl.plans.append(
Plan(
trigger=GoalLiteral("generate_response_with_goal(Goal)"),
context=[BeliefLiteral("user_said", ["Message"])],
body=[
ActionLiteral("+responded_this_turn"),
ActionLiteral(".findall(Norm, norm(Norm), Norms)"),
ActionLiteral(".reply_with_goal(Message, Norms, Goal)"),
],
)
)
def _generate_phase_flow(self, phase: Phase, next_phase: Phase | None, asl: AgentSpeakFile):
"""Generates the main loop listener and the transition logic for this phase."""
# +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase.
goal_actions = [ActionLiteral("-responded_this_turn")]
goal_actions += [
ActionLiteral(f"!check_{self._slugify_str(keyword)}")
for keyword in self._get_keyword_conditionals(phase)
]
goal_actions += [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
goal_actions.append(ActionLiteral("!transition_phase"))
asl.plans.append(
Plan(
trigger=BeliefLiteral("user_said", ["Message"]),
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=goal_actions,
)
)
# +!transition_phase : phase(ID) <- -phase(ID); +(NEXT_ID).
next_id = str(next_phase.id) if next_phase else "end"
transition_context = [BeliefLiteral("phase", [f'"{phase.id}"'])]
if phase.goals:
transition_context.append(BeliefLiteral(f"achieved_{self._slugify(phase.goals[-1])}"))
asl.plans.append(
Plan(
trigger=GoalLiteral("transition_phase"),
context=transition_context,
body=[
ActionLiteral(f'-phase("{phase.id}")'),
ActionLiteral(f'+phase("{next_id}")'),
ActionLiteral("user_said(Anything)"),
ActionLiteral("-+user_said(Anything)"),
],
)
)
def _get_keyword_conditionals(self, phase: Phase) -> list[str]:
res = []
for belief in self._extract_basic_beliefs_from_phase(phase):
if isinstance(belief, KeywordBelief):
res.append(belief.keyword)
return res
# --- Section: Norms & Beliefs ---
def _generate_norms(self, phase: Phase, asl: AgentSpeakFile):
for norm in phase.norms:
norm_slug = f'"{norm.norm}"'
head = BeliefLiteral("norm", [norm_slug])
# Base context is the phase
phase_lit = BeliefLiteral("phase", [f'"{phase.id}"'])
if isinstance(norm, ConditionalNorm):
self._ensure_belief_inference(norm.condition, asl)
condition_expr = self._belief_to_expr(norm.condition)
body = BinaryOp(phase_lit, "&", condition_expr)
else:
body = phase_lit
asl.inference_rules.append(Rule(head=head, body=body))
def _ensure_belief_inference(self, belief: Belief, asl: AgentSpeakFile):
"""
Recursively adds rules to infer beliefs.
Checks strictly to avoid duplicates if necessary,
though ASL engines often handle redefinition or we can use a set to track processed IDs.
"""
if isinstance(belief, KeywordBelief):
pass
# # Rule: keyword_said("word") :- user_said(M) & .substring("word", M, P) & P >= 0.
# kwd_slug = f'"{belief.keyword}"'
# head = BeliefLiteral("keyword_said", [kwd_slug])
#
# # Avoid duplicates
# if any(str(r.head) == str(head) for r in asl.inference_rules):
# return
#
# body = BinaryOp(
# BeliefLiteral("user_said", ["Message"]),
# "&",
# BinaryOp(f".substring({kwd_slug}, Message, Pos)", "&", "Pos >= 0"),
# )
#
# asl.inference_rules.append(Rule(head=head, body=body))
elif isinstance(belief, InferredBelief):
self._ensure_belief_inference(belief.left, asl)
self._ensure_belief_inference(belief.right, asl)
slug = self._slugify(belief)
head = BeliefLiteral(slug)
if any(str(r.head) == str(head) for r in asl.inference_rules):
return
op_char = "&" if belief.operator == LogicalOperator.AND else "|"
body = BinaryOp(
self._belief_to_expr(belief.left), op_char, self._belief_to_expr(belief.right)
)
asl.inference_rules.append(PersistentRule(head=head, body=body))
def _belief_to_expr(self, belief: Belief) -> Expression:
if isinstance(belief, KeywordBelief):
return BeliefLiteral("keyword_said", [f'"{belief.keyword}"'])
else:
return BeliefLiteral(self._slugify(belief))
# --- Section: Goals ---
def _generate_goals(self, phase: Phase, asl: AgentSpeakFile):
previous_goal: Goal | None = None
for goal in phase.goals:
self._generate_goal_plan_recursive(goal, str(phase.id), previous_goal, asl)
previous_goal = goal
def _generate_goal_plan_recursive(
self,
goal: Goal,
phase_id: str,
previous_goal: Goal | None,
asl: AgentSpeakFile,
responded_needed: bool = True,
can_fail: bool = True,
):
goal_slug = self._slugify(goal)
# phase(ID) & not responded_this_turn & not achieved_goal
context = [
BeliefLiteral("phase", [f'"{phase_id}"']),
]
if responded_needed:
context.append(BeliefLiteral("responded_this_turn", negated=True))
if can_fail:
context.append(BeliefLiteral(f"achieved_{goal_slug}", negated=True))
if previous_goal:
prev_slug = self._slugify(previous_goal)
context.append(BeliefLiteral(f"achieved_{prev_slug}"))
body_actions = []
sub_goals_to_process = []
for step in goal.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals_to_process.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(ActionLiteral(f'.gesture("{step.gesture}")'))
elif isinstance(step, LLMAction):
body_actions.append(ActionLiteral(f'!generate_response_with_goal("{step.goal}")'))
# Mark achievement
if not goal.can_fail:
body_actions.append(ActionLiteral(f"+achieved_{goal_slug}"))
asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions))
asl.plans.append(
Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
)
prev_sub = None
for sub_goal in sub_goals_to_process:
self._generate_goal_plan_recursive(sub_goal, phase_id, prev_sub, asl)
prev_sub = sub_goal
# --- Section: Triggers ---
def _generate_triggers(self, phase: Phase, asl: AgentSpeakFile):
for keyword in self._get_keyword_conditionals(phase):
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
context=[
ActionLiteral(
f'user_said(Message) & .substring("{keyword}", Message, Pos) & Pos >= 0'
)
],
body=[
ActionLiteral(f'+keyword_said("{keyword}")'),
ActionLiteral(f'-keyword_said("{keyword}")'),
],
)
)
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
body=[ActionLiteral("true")],
)
)
for trigger in phase.triggers:
self._ensure_belief_inference(trigger.condition, asl)
trigger_belief_slug = self._belief_to_expr(trigger.condition)
body_actions = []
sub_goals = []
for step in trigger.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(
ActionLiteral(f'.gesture("{step.gesture.type}", "{step.gesture.name}")')
)
elif isinstance(step, LLMAction):
body_actions.append(
ActionLiteral(f'!generate_response_with_goal("{step.goal}")')
)
asl.plans.append(
Plan(
trigger=BeliefLiteral(trigger_belief_slug),
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=body_actions,
)
)
# Recurse for triggered goals
prev_sub = None
for sub_goal in sub_goals:
self._generate_goal_plan_recursive(
sub_goal, str(phase.id), prev_sub, asl, False, False
)
prev_sub = sub_goal
# --- Section: Fallbacks ---
def _generate_fallbacks(self, program: Program, asl: AgentSpeakFile):
asl.plans.append(
Plan(trigger=GoalLiteral("transition_phase"), context=[], body=[ActionLiteral("true")])
)
# --- Helpers ---
@singledispatchmethod
def _slugify(self, element: ProgramElement) -> str:
if element.name:
raise NotImplementedError("Cannot slugify this element.")
return self._slugify_str(element.name)
@_slugify.register
def _(self, goal: Goal) -> str:
if goal.name:
return self._slugify_str(goal.name)
return f"goal_{goal.id.hex}"
@_slugify.register
def _(self, kwb: KeywordBelief) -> str:
return f"keyword_said({kwb.keyword})"
@_slugify.register
def _(self, sb: SemanticBelief) -> str:
return self._slugify_str(sb.description)
@_slugify.register
def _(self, ib: InferredBelief) -> str:
return self._slugify_str(ib.name)
def _slugify_str(self, text: str) -> str:
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])
def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]:
beliefs = []
for phase in program.phases:
beliefs.extend(self._extract_basic_beliefs_from_phase(phase))
return beliefs
def _extract_basic_beliefs_from_phase(self, phase: Phase) -> list[BasicBelief]:
beliefs = []
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_basic_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += self._extract_basic_beliefs_from_belief(trigger.condition)
return beliefs
def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]:
if isinstance(belief, InferredBelief):
return self._extract_basic_beliefs_from_belief(
belief.left
) + self._extract_basic_beliefs_from_belief(belief.right)
return [belief]
if __name__ == "__main__":
asyncio.run(do_things())
# do_other_things()y

View File

@@ -1,5 +1,6 @@
import asyncio import asyncio
import copy import copy
import json
import time import time
from collections.abc import Iterable from collections.abc import Iterable
@@ -13,7 +14,7 @@ from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.belief_message import BeliefMessage from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.llm_prompt_message import LLMPromptMessage from control_backend.schemas.llm_prompt_message import LLMPromptMessage
from control_backend.schemas.ri_message import SpeechCommand from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak
@@ -100,14 +101,12 @@ class BDICoreAgent(BaseAgent):
maybe_more_work = True maybe_more_work = True
while maybe_more_work: while maybe_more_work:
maybe_more_work = False maybe_more_work = False
self.logger.debug("Stepping BDI.")
if self.bdi_agent.step(): if self.bdi_agent.step():
maybe_more_work = True maybe_more_work = True
if not maybe_more_work: if not maybe_more_work:
deadline = self.bdi_agent.shortest_deadline() deadline = self.bdi_agent.shortest_deadline()
if deadline: if deadline:
self.logger.debug("Sleeping until %s", deadline)
await asyncio.sleep(deadline - time.time()) await asyncio.sleep(deadline - time.time())
maybe_more_work = True maybe_more_work = True
else: else:
@@ -155,6 +154,20 @@ class BDICoreAgent(BaseAgent):
body=cmd.model_dump_json(), body=cmd.model_dump_json(),
) )
await self.send(out_msg) await self.send(out_msg)
case settings.agent_settings.user_interrupt_name:
self.logger.debug("Received user interruption: %s", msg)
match msg.thread:
case "force_phase_transition":
self._set_goal("transition_phase")
case "force_trigger":
self._force_trigger(msg.body)
case "force_norm":
self._force_norm(msg.body)
case "force_next_phase":
self._force_next_phase()
case _:
self.logger.warning("Received unknow user interruption: %s", msg)
def _apply_belief_changes(self, belief_changes: BeliefMessage): def _apply_belief_changes(self, belief_changes: BeliefMessage):
""" """
@@ -201,16 +214,35 @@ class BDICoreAgent(BaseAgent):
agentspeak.runtime.Intention(), agentspeak.runtime.Intention(),
) )
# Check for transitions
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
agentspeak.Literal("transition_phase"),
agentspeak.runtime.Intention(),
)
# Check triggers
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
agentspeak.Literal("check_triggers"),
agentspeak.runtime.Intention(),
)
self._wake_bdi_loop.set() self._wake_bdi_loop.set()
self.logger.debug(f"Added belief {self.format_belief_string(name, args)}") self.logger.debug(f"Added belief {self.format_belief_string(name, args)}")
def _remove_belief(self, name: str, args: Iterable[str]): def _remove_belief(self, name: str, args: Iterable[str] | None):
""" """
Removes a specific belief (with arguments), if it exists. Removes a specific belief (with arguments), if it exists.
""" """
new_args = (agentspeak.Literal(arg) for arg in args) if args is None:
term = agentspeak.Literal(name, new_args) term = agentspeak.Literal(name)
else:
new_args = (agentspeak.Literal(arg) for arg in args)
term = agentspeak.Literal(name, new_args)
result = self.bdi_agent.call( result = self.bdi_agent.call(
agentspeak.Trigger.removal, agentspeak.Trigger.removal,
@@ -250,6 +282,43 @@ class BDICoreAgent(BaseAgent):
self.logger.debug(f"Removed {removed_count} beliefs.") self.logger.debug(f"Removed {removed_count} beliefs.")
def _set_goal(self, name: str, args: Iterable[str] | None = None):
args = args or []
if args:
merged_args = DELIMITER.join(arg for arg in args)
new_args = (agentspeak.Literal(merged_args),)
term = agentspeak.Literal(name, new_args)
else:
term = agentspeak.Literal(name)
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
term,
agentspeak.runtime.Intention(),
)
self._wake_bdi_loop.set()
self.logger.debug(f"Set goal !{self.format_belief_string(name, args)}.")
def _force_trigger(self, name: str):
self._set_goal(name)
self.logger.info("Manually forced trigger %s.", name)
# TODO: make this compatible for critical norms
def _force_norm(self, name: str):
self._add_belief(f"force_{name}")
self.logger.info("Manually forced norm %s.", name)
def _force_next_phase(self):
self._set_goal("force_transition_phase")
self.logger.info("Manually forced phase transition.")
def _add_custom_actions(self) -> None: def _add_custom_actions(self) -> None:
""" """
Add any custom actions here. Inside `@self.actions.add()`, the first argument is Add any custom actions here. Inside `@self.actions.add()`, the first argument is
@@ -258,16 +327,13 @@ class BDICoreAgent(BaseAgent):
""" """
@self.actions.add(".reply", 2) @self.actions.add(".reply", 2)
def _reply(agent: "BDICoreAgent", term, intention): def _reply(agent, term, intention):
""" """
Let the LLM generate a response to a user's utterance with the current norms and goals. Let the LLM generate a response to a user's utterance with the current norms and goals.
""" """
message_text = agentspeak.grounded(term.args[0], intention.scope) message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope) norms = agentspeak.grounded(term.args[1], intention.scope)
self.logger.debug("Norms: %s", norms)
self.logger.debug("User text: %s", message_text)
self.add_behavior(self._send_to_llm(str(message_text), str(norms), "")) self.add_behavior(self._send_to_llm(str(message_text), str(norms), ""))
yield yield
@@ -280,18 +346,24 @@ class BDICoreAgent(BaseAgent):
message_text = agentspeak.grounded(term.args[0], intention.scope) message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope) norms = agentspeak.grounded(term.args[1], intention.scope)
goal = agentspeak.grounded(term.args[2], intention.scope) goal = agentspeak.grounded(term.args[2], intention.scope)
self.logger.debug(
'"reply_with_goal" action called with message=%s, norms=%s, goal=%s',
message_text,
norms,
goal,
)
self.add_behavior(self._send_to_llm(str(message_text), str(norms), str(goal))) self.add_behavior(self._send_to_llm(str(message_text), str(norms), str(goal)))
yield yield
@self.actions.add(".notify_norms", 1)
def _notify_norms(agent, term, intention):
norms = agentspeak.grounded(term.args[0], intention.scope)
norm_update_message = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
thread="active_norms_update",
body=str(norms),
)
self.add_behavior(self.send(norm_update_message, should_log=False))
yield
@self.actions.add(".say", 1) @self.actions.add(".say", 1)
def _say(agent: "BDICoreAgent", term, intention): def _say(agent, term, intention):
""" """
Make the robot say the given text instantly. Make the robot say the given text instantly.
""" """
@@ -305,12 +377,21 @@ class BDICoreAgent(BaseAgent):
sender=settings.agent_settings.bdi_core_name, sender=settings.agent_settings.bdi_core_name,
body=speech_command.model_dump_json(), body=speech_command.model_dump_json(),
) )
# TODO: add to conversation history
self.add_behavior(self.send(speech_message)) self.add_behavior(self.send(speech_message))
chat_history_message = InternalMessage(
to=settings.agent_settings.llm_name,
thread="assistant_message",
body=str(message_text),
)
self.add_behavior(self.send(chat_history_message))
yield yield
@self.actions.add(".gesture", 2) @self.actions.add(".gesture", 2)
def _gesture(agent: "BDICoreAgent", term, intention): def _gesture(agent, term, intention):
""" """
Make the robot perform the given gesture instantly. Make the robot perform the given gesture instantly.
""" """
@@ -323,15 +404,118 @@ class BDICoreAgent(BaseAgent):
gesture_name, gesture_name,
) )
# gesture = Gesture(type=gesture_type, name=gesture_name) if str(gesture_type) == "single":
# gesture_message = InternalMessage( endpoint = RIEndpoint.GESTURE_SINGLE
# to=settings.agent_settings.robot_gesture_name, elif str(gesture_type) == "tag":
# sender=settings.agent_settings.bdi_core_name, endpoint = RIEndpoint.GESTURE_TAG
# body=gesture.model_dump_json(), else:
# ) self.logger.warning("Gesture type %s could not be resolved.", gesture_type)
# asyncio.create_task(agent.send(gesture_message)) endpoint = RIEndpoint.GESTURE_SINGLE
gesture_command = GestureCommand(endpoint=endpoint, data=gesture_name)
gesture_message = InternalMessage(
to=settings.agent_settings.robot_gesture_name,
sender=settings.agent_settings.bdi_core_name,
body=gesture_command.model_dump_json(),
)
self.add_behavior(self.send(gesture_message))
yield yield
@self.actions.add(".notify_user_said", 1)
def _notify_user_said(agent, term, intention):
user_said = agentspeak.grounded(term.args[0], intention.scope)
msg = InternalMessage(
to=settings.agent_settings.llm_name, thread="user_message", body=str(user_said)
)
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_trigger_start", 1)
def _notify_trigger_start(agent, term, intention):
"""
Notify the UI about the trigger we just started doing.
"""
trigger_name = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("Started trigger %s", trigger_name)
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
sender=self.name,
thread="trigger_start",
body=str(trigger_name),
)
# TODO: check with Pim
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_trigger_end", 1)
def _notify_trigger_end(agent, term, intention):
"""
Notify the UI about the trigger we just started doing.
"""
trigger_name = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("Finished trigger %s", trigger_name)
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
sender=self.name,
thread="trigger_end",
body=str(trigger_name),
)
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_goal_start", 1)
def _notify_goal_start(agent, term, intention):
"""
Notify the UI about the goal we just started chasing.
"""
goal_name = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("Started chasing goal %s", goal_name)
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
sender=self.name,
thread="goal_start",
body=str(goal_name),
)
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_transition_phase", 2)
def _notify_transition_phase(agent, term, intention):
"""
Notify the BDI program manager about a phase transition.
"""
old = agentspeak.grounded(term.args[0], intention.scope)
new = agentspeak.grounded(term.args[1], intention.scope)
msg = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
thread="transition_phase",
body=json.dumps({"old": str(old), "new": str(new)}),
)
self.add_behavior(self.send(msg))
yield
@self.actions.add(".notify_ui", 0)
def _notify_ui(agent, term, intention):
pass
async def _send_to_llm(self, text: str, norms: str, goals: str): async def _send_to_llm(self, text: str, norms: str, goals: str):
""" """
Sends a text query to the LLM agent asynchronously. Sends a text query to the LLM agent asynchronously.
@@ -341,13 +525,14 @@ class BDICoreAgent(BaseAgent):
to=settings.agent_settings.llm_name, to=settings.agent_settings.llm_name,
sender=self.name, sender=self.name,
body=prompt.model_dump_json(), body=prompt.model_dump_json(),
thread="prompt_message",
) )
await self.send(msg) await self.send(msg)
self.logger.info("Message sent to LLM agent: %s", text) self.logger.info("Message sent to LLM agent: %s", text)
@staticmethod @staticmethod
def format_belief_string(name: str, args: Iterable[str] = []): def format_belief_string(name: str, args: Iterable[str] | None = []):
""" """
Given a belief's name and its args, return a string of the form "name(*args)" Given a belief's name and its args, return a string of the form "name(*args)"
""" """
return f"{name}{'(' if args else ''}{','.join(args)}{')' if args else ''}" return f"{name}{'(' if args else ''}{','.join(args or [])}{')' if args else ''}"

View File

@@ -1,4 +1,5 @@
import asyncio import asyncio
import json
import zmq import zmq
from pydantic import ValidationError from pydantic import ValidationError
@@ -7,9 +8,16 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList from control_backend.schemas.belief_list import BeliefList, GoalList
from control_backend.schemas.internal_message import InternalMessage from control_backend.schemas.internal_message import InternalMessage
from control_backend.schemas.program import Belief, ConditionalNorm, InferredBelief, Program from control_backend.schemas.program import (
Belief,
ConditionalNorm,
Goal,
InferredBelief,
Phase,
Program,
)
class BDIProgramManager(BaseAgent): class BDIProgramManager(BaseAgent):
@@ -24,20 +32,30 @@ class BDIProgramManager(BaseAgent):
:ivar sub_socket: The ZMQ SUB socket used to receive program updates. :ivar sub_socket: The ZMQ SUB socket used to receive program updates.
""" """
_program: Program
_phase: Phase | None
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.sub_socket = None self.sub_socket = None
def _initialize_internal_state(self, program: Program):
self._program = program
self._phase = program.phases[0] # start in first phase
self._goal_mapping: dict[str, Goal] = {}
for phase in program.phases:
for goal in phase.goals:
self._populate_goal_mapping_with_goal(goal)
def _populate_goal_mapping_with_goal(self, goal: Goal):
self._goal_mapping[str(goal.id)] = goal
for step in goal.plan.steps:
if isinstance(step, Goal):
self._populate_goal_mapping_with_goal(step)
async def _create_agentspeak_and_send_to_bdi(self, program: Program): async def _create_agentspeak_and_send_to_bdi(self, program: Program):
""" """
Convert a received program into BDI beliefs and send them to the BDI Core Agent. Convert a received program into an AgentSpeak file and send it to the BDI Core Agent.
Currently, it takes the **first phase** of the program and extracts:
- **Norms**: Constraints or rules the agent must follow.
- **Goals**: Objectives the agent must achieve.
These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will
overwrite any existing norms/goals of the same name in the BDI agent.
:param program: The program object received from the API. :param program: The program object received from the API.
""" """
@@ -59,17 +77,63 @@ class BDIProgramManager(BaseAgent):
await self.send(msg) await self.send(msg)
@staticmethod async def handle_message(self, msg: InternalMessage):
def _extract_beliefs_from_program(program: Program) -> list[Belief]: match msg.thread:
case "transition_phase":
phases = json.loads(msg.body)
await self._transition_phase(phases["old"], phases["new"])
case "achieve_goal":
goal_id = msg.body
await self._send_achieved_goal_to_semantic_belief_extractor(goal_id)
async def _transition_phase(self, old: str, new: str):
if old != str(self._phase.id):
self.logger.warning(
f"Phase transition desync detected! ASL requested move from '{old}', "
f"but Python is currently in '{self._phase.id}'. Request ignored."
)
return
if new == "end":
self._phase = None
# Notify user interaction agent
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
thread="transition_phase",
body="end",
)
self.logger.info("Transitioned to end phase, notifying UserInterruptAgent.")
self.add_behavior(self.send(msg))
return
for phase in self._program.phases:
if str(phase.id) == new:
self._phase = phase
await self._send_beliefs_to_semantic_belief_extractor()
await self._send_goals_to_semantic_belief_extractor()
# Notify user interaction agent
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
thread="transition_phase",
body=str(self._phase.id),
)
self.logger.info(f"Transitioned to phase {new}, notifying UserInterruptAgent.")
self.add_behavior(self.send(msg))
def _extract_current_beliefs(self) -> list[Belief]:
beliefs: list[Belief] = [] beliefs: list[Belief] = []
for phase in program.phases: for norm in self._phase.norms:
for norm in phase.norms: if isinstance(norm, ConditionalNorm):
if isinstance(norm, ConditionalNorm): beliefs += self._extract_beliefs_from_belief(norm.condition)
beliefs += BDIProgramManager._extract_beliefs_from_belief(norm.condition)
for trigger in phase.triggers: for trigger in self._phase.triggers:
beliefs += BDIProgramManager._extract_beliefs_from_belief(trigger.condition) beliefs += self._extract_beliefs_from_belief(trigger.condition)
return beliefs return beliefs
@@ -81,13 +145,11 @@ class BDIProgramManager(BaseAgent):
) + BDIProgramManager._extract_beliefs_from_belief(belief.right) ) + BDIProgramManager._extract_beliefs_from_belief(belief.right)
return [belief] return [belief]
async def _send_beliefs_to_semantic_belief_extractor(self, program: Program): async def _send_beliefs_to_semantic_belief_extractor(self):
""" """
Extract beliefs from the program and send them to the Semantic Belief Extractor Agent. Extract beliefs from the program and send them to the Semantic Belief Extractor Agent.
:param program: The program received from the API.
""" """
beliefs = BeliefList(beliefs=self._extract_beliefs_from_program(program)) beliefs = BeliefList(beliefs=self._extract_current_beliefs())
message = InternalMessage( message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name, to=settings.agent_settings.text_belief_extractor_name,
@@ -98,12 +160,94 @@ class BDIProgramManager(BaseAgent):
await self.send(message) await self.send(message)
@staticmethod
def _extract_goals_from_goal(goal: Goal) -> list[Goal]:
"""
Extract all goals from a given goal, that is: the goal itself and any subgoals.
:return: All goals within and including the given goal.
"""
goals: list[Goal] = [goal]
for plan in goal.plan:
if isinstance(plan, Goal):
goals.extend(BDIProgramManager._extract_goals_from_goal(plan))
return goals
def _extract_current_goals(self) -> list[Goal]:
"""
Extract all goals from the program, including subgoals.
:return: A list of Goal objects.
"""
goals: list[Goal] = []
for goal in self._phase.goals:
goals.extend(self._extract_goals_from_goal(goal))
return goals
async def _send_goals_to_semantic_belief_extractor(self):
"""
Extract goals for the current phase and send them to the Semantic Belief Extractor Agent.
"""
goals = GoalList(goals=self._extract_current_goals())
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=goals.model_dump_json(),
thread="goals",
)
await self.send(message)
async def _send_achieved_goal_to_semantic_belief_extractor(self, achieved_goal_id: str):
"""
Inform the semantic belief extractor when a goal is marked achieved.
:param achieved_goal_id: The id of the achieved goal.
"""
goal = self._goal_mapping.get(achieved_goal_id)
if goal is None:
self.logger.debug(f"Goal with ID {achieved_goal_id} marked achieved but was not found.")
return
goals = self._extract_goals_from_goal(goal)
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
body=GoalList(goals=goals).model_dump_json(),
thread="achieved_goals",
)
await self.send(message)
async def _send_clear_llm_history(self):
"""
Clear the LLM Agent's conversation history.
Sends an empty history to the LLM Agent to reset its state.
"""
message = InternalMessage(
to=settings.agent_settings.llm_name,
body="clear_history",
)
await self.send(message)
self.logger.debug("Sent message to LLM agent to clear history.")
extractor_msg = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
thread="conversation_history",
body="reset",
)
await self.send(extractor_msg)
self.logger.debug("Sent message to extractor agent to clear history.")
async def _receive_programs(self): async def _receive_programs(self):
""" """
Continuous loop that receives program updates from the HTTP endpoint. Continuous loop that receives program updates from the HTTP endpoint.
It listens to the ``program`` topic on the internal ZMQ SUB socket. It listens to the ``program`` topic on the internal ZMQ SUB socket.
When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`. When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`.
Additionally, the LLM history is cleared via :meth:`_send_clear_llm_history`.
""" """
while True: while True:
topic, body = await self.sub_socket.recv_multipart() topic, body = await self.sub_socket.recv_multipart()
@@ -111,21 +255,43 @@ class BDIProgramManager(BaseAgent):
try: try:
program = Program.model_validate_json(body) program = Program.model_validate_json(body)
except ValidationError: except ValidationError:
self.logger.exception("Received an invalid program.") self.logger.warning("Received an invalid program.")
continue continue
self._initialize_internal_state(program)
await self._send_program_to_user_interrupt(program)
await self._send_clear_llm_history()
await asyncio.gather( await asyncio.gather(
self._create_agentspeak_and_send_to_bdi(program), self._create_agentspeak_and_send_to_bdi(program),
self._send_beliefs_to_semantic_belief_extractor(program), self._send_beliefs_to_semantic_belief_extractor(),
self._send_goals_to_semantic_belief_extractor(),
) )
async def _send_program_to_user_interrupt(self, program: Program):
"""
Send the received program to the User Interrupt Agent.
:param program: The program object received from the API.
"""
msg = InternalMessage(
sender=self.name,
to=settings.agent_settings.user_interrupt_name,
body=program.model_dump_json(),
thread="new_program",
)
await self.send(msg)
async def setup(self): async def setup(self):
""" """
Initialize the agent. Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'program' topic. Connects the internal ZMQ SUB socket and subscribes to the 'program' topic.
Starts the background behavior to receive programs. Starts the background behavior to receive programs. Initializes a default program.
""" """
await self._create_agentspeak_and_send_to_bdi(Program(phases=[]))
context = Context.instance() context = Context.instance()
self.sub_socket = context.socket(zmq.SUB) self.sub_socket = context.socket(zmq.SUB)

View File

@@ -101,7 +101,7 @@ class BDIBeliefCollectorAgent(BaseAgent):
:return: A Belief object if the input is valid or None. :return: A Belief object if the input is valid or None.
""" """
try: try:
return Belief(name=name, arguments=arguments, replace=name == "user_said") return Belief(name=name, arguments=arguments)
except ValidationError: except ValidationError:
return None return None

View File

@@ -1,5 +1,34 @@
norms(""). phase("end").
keyword_said(Keyword) :- (user_said(Message) & .substring(Keyword, Message, Pos)) & (Pos >= 0).
+user_said(Message) : norms(Norms) <-
-user_said(Message); +!reply_with_goal(Goal)
.reply(Message, Norms). : user_said(Message)
<- +responded_this_turn;
.findall(Norm, norm(Norm), Norms);
.reply_with_goal(Message, Norms, Goal).
+!say(Text)
<- +responded_this_turn;
.say(Text).
+!reply
: user_said(Message)
<- +responded_this_turn;
.findall(Norm, norm(Norm), Norms);
.reply(Message, Norms).
+!notify_cycle
<- .notify_ui;
.wait(1).
+user_said(Message)
: phase("end")
<- .notify_user_said(Message);
!reply.
+!check_triggers
<- true.
+!transition_phase
<- true.

View File

@@ -2,17 +2,45 @@ import asyncio
import json import json
import httpx import httpx
from pydantic import ValidationError from pydantic import BaseModel, ValidationError
from control_backend.agents.base import BaseAgent from control_backend.agents.base import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.agent_system import InternalMessage from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList from control_backend.schemas.belief_list import BeliefList, GoalList
from control_backend.schemas.belief_message import Belief as InternalBelief from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import SemanticBelief from control_backend.schemas.program import BaseGoal, SemanticBelief
type JSONLike = None | bool | int | float | str | list["JSONLike"] | dict[str, "JSONLike"]
class BeliefState(BaseModel):
true: set[InternalBelief] = set()
false: set[InternalBelief] = set()
def difference(self, other: "BeliefState") -> "BeliefState":
return BeliefState(
true=self.true - other.true,
false=self.false - other.false,
)
def union(self, other: "BeliefState") -> "BeliefState":
return BeliefState(
true=self.true | other.true,
false=self.false | other.false,
)
def __sub__(self, other):
return self.difference(other)
def __or__(self, other):
return self.union(other)
def __bool__(self):
return bool(self.true) or bool(self.false)
class TextBeliefExtractorAgent(BaseAgent): class TextBeliefExtractorAgent(BaseAgent):
@@ -27,12 +55,15 @@ class TextBeliefExtractorAgent(BaseAgent):
the message itself. the message itself.
""" """
def __init__(self, name: str, temperature: float = settings.llm_settings.code_temperature): def __init__(self, name: str):
super().__init__(name) super().__init__(name)
self.beliefs: dict[str, bool] = {} self._llm = self.LLM(self, settings.llm_settings.n_parallel)
self.available_beliefs: list[SemanticBelief] = [] self.belief_inferrer = SemanticBeliefInferrer(self._llm)
self.goal_inferrer = GoalAchievementInferrer(self._llm)
self._current_beliefs = BeliefState()
self._current_goal_completions: dict[str, bool] = {}
self._force_completed_goals: set[BaseGoal] = set()
self.conversation = ChatHistory(messages=[]) self.conversation = ChatHistory(messages=[])
self.temperature = temperature
async def setup(self): async def setup(self):
""" """
@@ -53,13 +84,14 @@ class TextBeliefExtractorAgent(BaseAgent):
case settings.agent_settings.transcription_name: case settings.agent_settings.transcription_name:
self.logger.debug("Received text from transcriber: %s", msg.body) self.logger.debug("Received text from transcriber: %s", msg.body)
self._apply_conversation_message(ChatMessage(role="user", content=msg.body)) self._apply_conversation_message(ChatMessage(role="user", content=msg.body))
await self._infer_new_beliefs()
await self._user_said(msg.body) await self._user_said(msg.body)
await self._infer_new_beliefs()
await self._infer_goal_completions()
case settings.agent_settings.llm_name: case settings.agent_settings.llm_name:
self.logger.debug("Received text from LLM: %s", msg.body) self.logger.debug("Received text from LLM: %s", msg.body)
self._apply_conversation_message(ChatMessage(role="assistant", content=msg.body)) self._apply_conversation_message(ChatMessage(role="assistant", content=msg.body))
case settings.agent_settings.bdi_program_manager_name: case settings.agent_settings.bdi_program_manager_name:
self._handle_program_manager_message(msg) await self._handle_program_manager_message(msg)
case _: case _:
self.logger.info("Discarding message from %s", sender) self.logger.info("Discarding message from %s", sender)
return return
@@ -74,12 +106,35 @@ class TextBeliefExtractorAgent(BaseAgent):
length_limit = settings.behaviour_settings.conversation_history_length_limit length_limit = settings.behaviour_settings.conversation_history_length_limit
self.conversation.messages = (self.conversation.messages + [message])[-length_limit:] self.conversation.messages = (self.conversation.messages + [message])[-length_limit:]
def _handle_program_manager_message(self, msg: InternalMessage): async def _handle_program_manager_message(self, msg: InternalMessage):
""" """
Handle a message from the program manager: extract available beliefs from it. Handle a message from the program manager: extract available beliefs and goals from it.
:param msg: The received message from the program manager. :param msg: The received message from the program manager.
""" """
match msg.thread:
case "beliefs":
self._handle_beliefs_message(msg)
await self._infer_new_beliefs()
case "goals":
self._handle_goals_message(msg)
await self._infer_goal_completions()
case "achieved_goals":
self._handle_goal_achieved_message(msg)
case "conversation_history":
if msg.body == "reset":
self._reset_phase()
case _:
self.logger.warning("Received unexpected message from %s", msg.sender)
def _reset_phase(self):
self.conversation = ChatHistory(messages=[])
self.belief_inferrer.available_beliefs.clear()
self._current_beliefs = BeliefState()
self.goal_inferrer.goals.clear()
self._current_goal_completions = {}
def _handle_beliefs_message(self, msg: InternalMessage):
try: try:
belief_list = BeliefList.model_validate_json(msg.body) belief_list = BeliefList.model_validate_json(msg.body)
except ValidationError: except ValidationError:
@@ -88,133 +143,262 @@ class TextBeliefExtractorAgent(BaseAgent):
) )
return return
self.available_beliefs = [b for b in belief_list.beliefs if isinstance(b, SemanticBelief)] available_beliefs = [b for b in belief_list.beliefs if isinstance(b, SemanticBelief)]
self.belief_inferrer.available_beliefs = available_beliefs
self.logger.debug( self.logger.debug(
"Received %d beliefs from the program manager.", "Received %d semantic beliefs from the program manager: %s",
len(self.available_beliefs), len(available_beliefs),
", ".join(b.name for b in available_beliefs),
) )
def _handle_goals_message(self, msg: InternalMessage):
try:
goals_list = GoalList.model_validate_json(msg.body)
except ValidationError:
self.logger.warning(
"Received message from program manager but it is not a valid list of goals."
)
return
# Use only goals that can fail, as the others are always assumed to be completed
available_goals = {g for g in goals_list.goals if g.can_fail}
available_goals -= self._force_completed_goals
self.goal_inferrer.goals = available_goals
self.logger.debug(
"Received %d failable goals from the program manager: %s",
len(available_goals),
", ".join(g.name for g in available_goals),
)
def _handle_goal_achieved_message(self, msg: InternalMessage):
# NOTE: When goals can be marked unachieved, remember to re-add them to the goal_inferrer
try:
goals_list = GoalList.model_validate_json(msg.body)
except ValidationError:
self.logger.warning(
"Received goal achieved message from the program manager, "
"but it is not a valid list of goals."
)
return
for goal in goals_list.goals:
self._force_completed_goals.add(goal)
self._current_goal_completions[f"achieved_{AgentSpeakGenerator.slugify(goal)}"] = True
self.goal_inferrer.goals -= self._force_completed_goals
async def _user_said(self, text: str): async def _user_said(self, text: str):
""" """
Create a belief for the user's full speech. Create a belief for the user's full speech.
:param text: User's transcribed text. :param text: User's transcribed text.
""" """
belief = {"beliefs": {"user_said": [text]}, "type": "belief_extraction_text"}
payload = json.dumps(belief)
belief_msg = InternalMessage( belief_msg = InternalMessage(
to=settings.agent_settings.bdi_belief_collector_name, to=settings.agent_settings.bdi_core_name,
sender=self.name, sender=self.name,
body=payload, body=BeliefMessage(
replace=[InternalBelief(name="user_said", arguments=[text])],
).model_dump_json(),
thread="beliefs", thread="beliefs",
) )
await self.send(belief_msg) await self.send(belief_msg)
async def _infer_new_beliefs(self): async def _infer_new_beliefs(self):
""" conversation_beliefs = await self.belief_inferrer.infer_from_conversation(self.conversation)
Process conversation history to extract beliefs, semantically. Any changed beliefs are sent
to the BDI core. new_beliefs = conversation_beliefs - self._current_beliefs
""" if not new_beliefs:
# Return instantly if there are no beliefs to infer self.logger.debug("No new beliefs detected.")
if not self.available_beliefs:
return return
candidate_beliefs = await self._infer_turn() self._current_beliefs |= new_beliefs
belief_changes = BeliefMessage()
for belief_key, belief_value in candidate_beliefs.items():
if belief_value is None:
continue
old_belief_value = self.beliefs.get(belief_key)
if belief_value == old_belief_value:
continue
self.beliefs[belief_key] = belief_value belief_changes = BeliefMessage(
create=list(new_beliefs.true),
delete=list(new_beliefs.false),
)
belief = InternalBelief(name=belief_key, arguments=None) message = InternalMessage(
if belief_value:
belief_changes.create.append(belief)
else:
belief_changes.delete.append(belief)
# Return if there were no changes in beliefs
if not belief_changes.has_values():
return
beliefs_message = InternalMessage(
to=settings.agent_settings.bdi_core_name, to=settings.agent_settings.bdi_core_name,
sender=self.name, sender=self.name,
body=belief_changes.model_dump_json(), body=belief_changes.model_dump_json(),
thread="beliefs", thread="beliefs",
) )
await self.send(beliefs_message) await self.send(message)
@staticmethod async def _infer_goal_completions(self):
def _split_into_chunks[T](items: list[T], n: int) -> list[list[T]]: goal_completions = await self.goal_inferrer.infer_from_conversation(self.conversation)
k, m = divmod(len(items), n)
return [items[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)]
async def _infer_turn(self) -> dict: new_achieved = [
InternalBelief(name=goal, arguments=None)
for goal, achieved in goal_completions.items()
if achieved and self._current_goal_completions.get(goal) != achieved
]
new_not_achieved = [
InternalBelief(name=goal, arguments=None)
for goal, achieved in goal_completions.items()
if not achieved and self._current_goal_completions.get(goal) != achieved
]
for goal, achieved in goal_completions.items():
self._current_goal_completions[goal] = achieved
if not new_achieved and not new_not_achieved:
self.logger.debug("No goal achievement changes detected.")
return
belief_changes = BeliefMessage(
create=new_achieved,
delete=new_not_achieved,
)
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=belief_changes.model_dump_json(),
thread="beliefs",
)
await self.send(message)
class LLM:
""" """
Process the stored conversation history to extract semantic beliefs. Returns a list of Class that handles sending structured generation requests to an LLM.
beliefs that have been set to ``True``, ``False`` or ``None``.
:return: A dict mapping belief names to a value ``True``, ``False`` or ``None``.
""" """
def __init__(self, agent: "TextBeliefExtractorAgent", n_parallel: int):
self._agent = agent
self._semaphore = asyncio.Semaphore(n_parallel)
async def query(self, prompt: str, schema: dict, tries: int = 3) -> JSONLike | None:
"""
Query the LLM with the given prompt and schema, return an instance of a dict conforming
to this schema. Try ``tries`` times, or return None.
:param prompt: Prompt to be queried.
:param schema: Schema to be queried.
:param tries: Number of times to try to query the LLM.
:return: An instance of a dict conforming to this schema, or None if failed.
"""
try_count = 0
while try_count < tries:
try_count += 1
try:
return await self._query_llm(prompt, schema)
except (httpx.HTTPError, json.JSONDecodeError, KeyError) as e:
if try_count < tries:
continue
self._agent.logger.exception(
"Failed to get LLM response after %d tries.",
try_count,
exc_info=e,
)
return None
async def _query_llm(self, prompt: str, schema: dict) -> JSONLike:
"""
Query an LLM with the given prompt and schema, return an instance of a dict conforming
to that schema.
:param prompt: The prompt to be queried.
:param schema: Schema to use during response.
:return: A dict conforming to this schema.
:raises httpx.HTTPStatusError: If the LLM server responded with an error.
:raises json.JSONDecodeError: If the LLM response was not valid JSON. May happen if the
response was cut off early due to length limitations.
:raises KeyError: If the LLM server responded with no error, but the response was
invalid.
"""
async with self._semaphore:
async with httpx.AsyncClient() as client:
response = await client.post(
settings.llm_settings.local_llm_url,
json={
"model": settings.llm_settings.local_llm_model,
"messages": [{"role": "user", "content": prompt}],
"response_format": {
"type": "json_schema",
"json_schema": {
"name": "Beliefs",
"strict": True,
"schema": schema,
},
},
"reasoning_effort": "low",
"temperature": settings.llm_settings.code_temperature,
"stream": False,
},
timeout=30.0,
)
response.raise_for_status()
response_json = response.json()
json_message = response_json["choices"][0]["message"]["content"]
return json.loads(json_message)
class SemanticBeliefInferrer:
"""
Class that handles only prompting an LLM for semantic beliefs.
"""
def __init__(
self,
llm: "TextBeliefExtractorAgent.LLM",
available_beliefs: list[SemanticBelief] | None = None,
):
self._llm = llm
self.available_beliefs: list[SemanticBelief] = available_beliefs or []
async def infer_from_conversation(self, conversation: ChatHistory) -> BeliefState:
"""
Process conversation history to extract beliefs, semantically. The result is an object that
describes all beliefs that hold or don't hold based on the full conversation.
:param conversation: The conversation history to be processed.
:return: An object that describes beliefs.
"""
# Return instantly if there are no beliefs to infer
if not self.available_beliefs:
return BeliefState()
n_parallel = max(1, min(settings.llm_settings.n_parallel - 1, len(self.available_beliefs))) n_parallel = max(1, min(settings.llm_settings.n_parallel - 1, len(self.available_beliefs)))
all_beliefs = await asyncio.gather( all_beliefs: list[dict[str, bool | None] | None] = await asyncio.gather(
*[ *[
self._infer_beliefs(self.conversation, beliefs) self._infer_beliefs(conversation, beliefs)
for beliefs in self._split_into_chunks(self.available_beliefs, n_parallel) for beliefs in self._split_into_chunks(self.available_beliefs, n_parallel)
] ]
) )
retval = {} retval = BeliefState()
for beliefs in all_beliefs: for beliefs in all_beliefs:
if beliefs is None: if beliefs is None:
continue continue
retval.update(beliefs) for belief_name, belief_holds in beliefs.items():
if belief_holds is None:
continue
belief = InternalBelief(name=belief_name, arguments=None)
if belief_holds:
retval.true.add(belief)
else:
retval.false.add(belief)
return retval return retval
@staticmethod @staticmethod
def _create_belief_schema(belief: SemanticBelief) -> tuple[str, dict]: def _split_into_chunks[T](items: list[T], n: int) -> list[list[T]]:
return AgentSpeakGenerator.slugify(belief), { """
"type": ["boolean", "null"], Split a list into ``n`` chunks, making each chunk approximately ``len(items) / n`` long.
"description": belief.description,
}
@staticmethod :param items: The list of items to split.
def _create_beliefs_schema(beliefs: list[SemanticBelief]) -> dict: :param n: The number of desired chunks.
belief_schemas = [ :return: A list of chunks each approximately ``len(items) / n`` long.
TextBeliefExtractorAgent._create_belief_schema(belief) for belief in beliefs """
] k, m = divmod(len(items), n)
return [items[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)]
return {
"type": "object",
"properties": dict(belief_schemas),
"required": [name for name, _ in belief_schemas],
}
@staticmethod
def _format_message(message: ChatMessage):
return f"{message.role.upper()}:\n{message.content}"
@staticmethod
def _format_conversation(conversation: ChatHistory):
return "\n\n".join(
[TextBeliefExtractorAgent._format_message(message) for message in conversation.messages]
)
@staticmethod
def _format_beliefs(beliefs: list[SemanticBelief]):
return "\n".join(
[f"- {AgentSpeakGenerator.slugify(belief)}: {belief.description}" for belief in beliefs]
)
async def _infer_beliefs( async def _infer_beliefs(
self, self,
conversation: ChatHistory, conversation: ChatHistory,
beliefs: list[SemanticBelief], beliefs: list[SemanticBelief],
) -> dict | None: ) -> dict[str, bool | None] | None:
""" """
Infer given beliefs based on the given conversation. Infer given beliefs based on the given conversation.
:param conversation: The conversation to infer beliefs from. :param conversation: The conversation to infer beliefs from.
@@ -241,70 +425,79 @@ Respond with a JSON similar to the following, but with the property names as giv
schema = self._create_beliefs_schema(beliefs) schema = self._create_beliefs_schema(beliefs)
return await self._retry_query_llm(prompt, schema) return await self._llm.query(prompt, schema)
async def _retry_query_llm(self, prompt: str, schema: dict, tries: int = 3) -> dict | None: @staticmethod
def _create_belief_schema(belief: SemanticBelief) -> tuple[str, dict]:
return AgentSpeakGenerator.slugify(belief), {
"type": ["boolean", "null"],
"description": belief.description,
}
@staticmethod
def _create_beliefs_schema(beliefs: list[SemanticBelief]) -> dict:
belief_schemas = [
SemanticBeliefInferrer._create_belief_schema(belief) for belief in beliefs
]
return {
"type": "object",
"properties": dict(belief_schemas),
"required": [name for name, _ in belief_schemas],
}
@staticmethod
def _format_message(message: ChatMessage):
return f"{message.role.upper()}:\n{message.content}"
@staticmethod
def _format_conversation(conversation: ChatHistory):
return "\n\n".join(
[SemanticBeliefInferrer._format_message(message) for message in conversation.messages]
)
@staticmethod
def _format_beliefs(beliefs: list[SemanticBelief]):
return "\n".join(
[f"- {AgentSpeakGenerator.slugify(belief)}: {belief.description}" for belief in beliefs]
)
class GoalAchievementInferrer(SemanticBeliefInferrer):
def __init__(self, llm: TextBeliefExtractorAgent.LLM):
super().__init__(llm)
self.goals: set[BaseGoal] = set()
async def infer_from_conversation(self, conversation: ChatHistory) -> dict[str, bool]:
""" """
Query the LLM with the given prompt and schema, return an instance of a dict conforming Determine which goals have been achieved based on the given conversation.
to this schema. Try ``tries`` times, or return None.
:param prompt: Prompt to be queried. :param conversation: The conversation to infer goal completion from.
:param schema: Schema to be queried. :return: A mapping of goals and a boolean whether they have been achieved.
:return: An instance of a dict conforming to this schema, or None if failed.
""" """
try_count = 0 if not self.goals:
while try_count < tries: return {}
try_count += 1
try: goals_achieved = await asyncio.gather(
return await self._query_llm(prompt, schema) *[self._infer_goal(conversation, g) for g in self.goals]
except (httpx.HTTPError, json.JSONDecodeError, KeyError) as e: )
if try_count < tries: return {
continue f"achieved_{AgentSpeakGenerator.slugify(goal)}": achieved
self.logger.exception( for goal, achieved in zip(self.goals, goals_achieved, strict=True)
"Failed to get LLM response after %d tries.", }
try_count,
exc_info=e,
)
return None async def _infer_goal(self, conversation: ChatHistory, goal: BaseGoal) -> bool:
prompt = f"""{self._format_conversation(conversation)}
async def _query_llm(self, prompt: str, schema: dict) -> dict: Given the above conversation, what has the following goal been achieved?
"""
Query an LLM with the given prompt and schema, return an instance of a dict conforming to
that schema.
:param prompt: The prompt to be queried. The name of the goal: {goal.name}
:param schema: Schema to use during response. Description of the goal: {goal.description}
:return: A dict conforming to this schema.
:raises httpx.HTTPStatusError: If the LLM server responded with an error.
:raises json.JSONDecodeError: If the LLM response was not valid JSON. May happen if the
response was cut off early due to length limitations.
:raises KeyError: If the LLM server responded with no error, but the response was invalid.
"""
async with httpx.AsyncClient() as client:
response = await client.post(
settings.llm_settings.local_llm_url,
json={
"model": settings.llm_settings.local_llm_model,
"messages": [{"role": "user", "content": prompt}],
"response_format": {
"type": "json_schema",
"json_schema": {
"name": "Beliefs",
"strict": True,
"schema": schema,
},
},
"reasoning_effort": "low",
"temperature": self.temperature,
"stream": False,
},
timeout=None,
)
response.raise_for_status()
response_json = response.json() Answer with literally only `true` or `false` (without backticks)."""
json_message = response_json["choices"][0]["message"]["content"]
beliefs = json.loads(json_message) schema = {
return beliefs "type": "boolean",
}
return await self._llm.query(prompt, schema)

View File

@@ -3,12 +3,14 @@ import json
import zmq import zmq
import zmq.asyncio as azmq import zmq.asyncio as azmq
from pydantic import ValidationError
from zmq.asyncio import Context from zmq.asyncio import Context
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage from control_backend.schemas.internal_message import InternalMessage
from control_backend.schemas.ri_message import PauseCommand
from ..actuation.robot_speech_agent import RobotSpeechAgent from ..actuation.robot_speech_agent import RobotSpeechAgent
from ..perception import VADAgent from ..perception import VADAgent
@@ -39,7 +41,7 @@ class RICommunicationAgent(BaseAgent):
def __init__( def __init__(
self, self,
name: str, name: str,
address=settings.zmq_settings.ri_command_address, address=settings.zmq_settings.ri_communication_address,
bind=False, bind=False,
): ):
super().__init__(name) super().__init__(name)
@@ -172,7 +174,7 @@ class RICommunicationAgent(BaseAgent):
bind = port_data["bind"] bind = port_data["bind"]
if not bind: if not bind:
addr = f"tcp://localhost:{port}" addr = f"tcp://{settings.ri_host}:{port}"
else: else:
addr = f"tcp://*:{port}" addr = f"tcp://*:{port}"
@@ -255,7 +257,8 @@ class RICommunicationAgent(BaseAgent):
self._req_socket.recv_json(), timeout=seconds_to_wait_total / 2 self._req_socket.recv_json(), timeout=seconds_to_wait_total / 2
) )
self.logger.debug(f'Received message "{message}" from RI.') if "endpoint" in message and message["endpoint"] != "ping":
self.logger.debug(f'Received message "{message}" from RI.')
if "endpoint" not in message: if "endpoint" not in message:
self.logger.warning("No received endpoint in message, expected ping endpoint.") self.logger.warning("No received endpoint in message, expected ping endpoint.")
continue continue
@@ -319,12 +322,9 @@ class RICommunicationAgent(BaseAgent):
self.connected = True self.connected = True
async def handle_message(self, msg: InternalMessage): async def handle_message(self, msg: InternalMessage):
""" try:
Handle an incoming message. pause_command = PauseCommand.model_validate_json(msg.body)
self._req_socket.send_json(pause_command.model_dump())
Currently not implemented for this agent. self.logger.debug(self._req_socket.recv_json())
except ValidationError:
:param msg: The received message. self.logger.warning("Incorrect message format for PauseCommand.")
:raises NotImplementedError: Always, since this method is not implemented.
"""
self.logger.warning("custom warning for handle msg in ri coms %s", self.name)

View File

@@ -46,14 +46,23 @@ class LLMAgent(BaseAgent):
:param msg: The received internal message. :param msg: The received internal message.
""" """
if msg.sender == settings.agent_settings.bdi_core_name: if msg.sender == settings.agent_settings.bdi_core_name:
self.logger.debug("Processing message from BDI core.") match msg.thread:
try: case "prompt_message":
prompt_message = LLMPromptMessage.model_validate_json(msg.body) try:
await self._process_bdi_message(prompt_message) prompt_message = LLMPromptMessage.model_validate_json(msg.body)
except ValidationError: await self._process_bdi_message(prompt_message)
self.logger.debug("Prompt message from BDI core is invalid.") except ValidationError:
self.logger.debug("Prompt message from BDI core is invalid.")
case "assistant_message":
self.history.append({"role": "assistant", "content": msg.body})
case "user_message":
self.history.append({"role": "user", "content": msg.body})
elif msg.sender == settings.agent_settings.bdi_program_manager_name:
if msg.body == "clear_history":
self.logger.debug("Clearing conversation history.")
self.history.clear()
else: else:
self.logger.debug("Message ignored (not from BDI core.") self.logger.debug("Message ignored.")
async def _process_bdi_message(self, message: LLMPromptMessage): async def _process_bdi_message(self, message: LLMPromptMessage):
""" """
@@ -114,13 +123,6 @@ class LLMAgent(BaseAgent):
:param goals: Goals the LLM should achieve. :param goals: Goals the LLM should achieve.
:yield: Fragments of the LLM-generated content (e.g., sentences/phrases). :yield: Fragments of the LLM-generated content (e.g., sentences/phrases).
""" """
self.history.append(
{
"role": "user",
"content": prompt,
}
)
instructions = LLMInstructions(norms if norms else None, goals if goals else None) instructions = LLMInstructions(norms if norms else None, goals if goals else None)
messages = [ messages = [
{ {

View File

@@ -110,12 +110,11 @@ class VADAgent(BaseAgent):
self._connect_audio_in_socket() self._connect_audio_in_socket()
audio_out_port = self._connect_audio_out_socket() audio_out_address = self._connect_audio_out_socket()
if audio_out_port is None: if audio_out_address is None:
self.logger.error("Could not bind output socket, stopping.") self.logger.error("Could not bind output socket, stopping.")
await self.stop() await self.stop()
return return
audio_out_address = f"tcp://localhost:{audio_out_port}"
# Connect to internal communication socket # Connect to internal communication socket
self.program_sub_socket = azmq.Context.instance().socket(zmq.SUB) self.program_sub_socket = azmq.Context.instance().socket(zmq.SUB)
@@ -168,13 +167,14 @@ class VADAgent(BaseAgent):
self.audio_in_socket.connect(self.audio_in_address) self.audio_in_socket.connect(self.audio_in_address)
self.audio_in_poller = SocketPoller[bytes](self.audio_in_socket) self.audio_in_poller = SocketPoller[bytes](self.audio_in_socket)
def _connect_audio_out_socket(self) -> int | None: def _connect_audio_out_socket(self) -> str | None:
""" """
Returns the port bound, or None if binding failed. Returns the address that was bound to, or None if binding failed.
""" """
try: try:
self.audio_out_socket = azmq.Context.instance().socket(zmq.PUB) self.audio_out_socket = azmq.Context.instance().socket(zmq.PUB)
return self.audio_out_socket.bind_to_random_port("tcp://localhost", max_tries=100) self.audio_out_socket.bind(settings.zmq_settings.vad_pub_address)
return settings.zmq_settings.vad_pub_address
except zmq.ZMQBindError: except zmq.ZMQBindError:
self.logger.error("Failed to bind an audio output socket after 100 tries.") self.logger.error("Failed to bind an audio output socket after 100 tries.")
self.audio_out_socket = None self.audio_out_socket = None
@@ -246,10 +246,11 @@ class VADAgent(BaseAgent):
assert self.model is not None assert self.model is not None
prob = self.model(torch.from_numpy(chunk), settings.vad_settings.sample_rate_hz).item() prob = self.model(torch.from_numpy(chunk), settings.vad_settings.sample_rate_hz).item()
non_speech_patience = settings.behaviour_settings.vad_non_speech_patience_chunks non_speech_patience = settings.behaviour_settings.vad_non_speech_patience_chunks
begin_silence_length = settings.behaviour_settings.vad_begin_silence_chunks
prob_threshold = settings.behaviour_settings.vad_prob_threshold prob_threshold = settings.behaviour_settings.vad_prob_threshold
if prob > prob_threshold: if prob > prob_threshold:
if self.i_since_speech > non_speech_patience: if self.i_since_speech > non_speech_patience + begin_silence_length:
self.logger.debug("Speech started.") self.logger.debug("Speech started.")
self.audio_buffer = np.append(self.audio_buffer, chunk) self.audio_buffer = np.append(self.audio_buffer, chunk)
self.i_since_speech = 0 self.i_since_speech = 0
@@ -263,7 +264,7 @@ class VADAgent(BaseAgent):
continue continue
# Speech probably ended. Make sure we have a usable amount of data. # Speech probably ended. Make sure we have a usable amount of data.
if len(self.audio_buffer) >= 3 * len(chunk): if len(self.audio_buffer) > begin_silence_length * len(chunk):
self.logger.debug("Speech ended.") self.logger.debug("Speech ended.")
assert self.audio_out_socket is not None assert self.audio_out_socket is not None
await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes()) await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes())

View File

@@ -4,8 +4,11 @@ import zmq
from zmq.asyncio import Context from zmq.asyncio import Context
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.agent_system import InternalMessage from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
from control_backend.schemas.program import ConditionalNorm, Program
from control_backend.schemas.ri_message import ( from control_backend.schemas.ri_message import (
GestureCommand, GestureCommand,
PauseCommand, PauseCommand,
@@ -23,18 +26,45 @@ class UserInterruptAgent(BaseAgent):
- Send a prioritized message to the `RobotSpeechAgent` - Send a prioritized message to the `RobotSpeechAgent`
- Send a prioritized gesture to the `RobotGestureAgent` - Send a prioritized gesture to the `RobotGestureAgent`
- Send a belief override to the `BDIProgramManager`in order to activate a - Send a belief override to the `BDI Core` in order to activate a
trigger/conditional norm or complete a goal. trigger/conditional norm or complete a goal.
Prioritized actions clear the current RI queue before inserting the new item, Prioritized actions clear the current RI queue before inserting the new item,
ensuring they are executed immediately after Pepper's current action has been fulfilled. ensuring they are executed immediately after Pepper's current action has been fulfilled.
:ivar sub_socket: The ZMQ SUB socket used to receive user intterupts. :ivar sub_socket: The ZMQ SUB socket used to receive user interrupts.
""" """
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.sub_socket = None self.sub_socket = None
self.pub_socket = None
self._trigger_map = {}
self._trigger_reverse_map = {}
self._goal_map = {} # id -> sluggified goal
self._goal_reverse_map = {} # sluggified goal -> id
self._cond_norm_map = {} # id -> sluggified cond norm
self._cond_norm_reverse_map = {} # sluggified cond norm -> id
async def setup(self):
"""
Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic.
Starts the background behavior to receive the user interrupts.
"""
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("button_pressed")
self.pub_socket = context.socket(zmq.PUB)
self.pub_socket.connect(settings.zmq_settings.internal_pub_address)
self.add_behavior(self._receive_button_event())
async def _receive_button_event(self): async def _receive_button_event(self):
""" """
@@ -45,7 +75,11 @@ class UserInterruptAgent(BaseAgent):
These are the different types and contexts: These are the different types and contexts:
- type: "speech", context: string that the robot has to say. - type: "speech", context: string that the robot has to say.
- type: "gesture", context: single gesture name that the robot has to perform. - type: "gesture", context: single gesture name that the robot has to perform.
- type: "override", context: belief_id that overrides the goal/trigger/conditional norm. - type: "override", context: id that belongs to the goal/trigger/conditional norm.
- type: "override_unachieve", context: id that belongs to the conditional norm to unachieve.
- type: "next_phase", context: None, indicates to the BDI Core to
- type: "pause", context: boolean indicating whether to pause
- type: "reset_phase", context: None, indicates to the BDI Core to
""" """
while True: while True:
topic, body = await self.sub_socket.recv_multipart() topic, body = await self.sub_socket.recv_multipart()
@@ -58,73 +92,200 @@ class UserInterruptAgent(BaseAgent):
self.logger.error("Received invalid JSON payload on topic %s", topic) self.logger.error("Received invalid JSON payload on topic %s", topic)
continue continue
if event_type == "speech": self.logger.debug("Received event type %s", event_type)
await self._send_to_speech_agent(event_context)
self.logger.info(
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
event_context,
)
elif event_type == "gesture":
await self._send_to_gesture_agent(event_context)
self.logger.info(
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
event_context,
)
elif event_type == "override":
await self._send_to_program_manager(event_context)
self.logger.info(
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
event_context,
)
elif event_type == "pause":
self.logger.debug(
"Received pause/resume button press with context '%s'.", event_context
)
await self._send_pause_command(event_context)
if event_context:
self.logger.info("Sent pause command.")
else:
self.logger.info("Sent resume command.")
elif event_type in ["next_phase", "reset_phase", "reset_experiment"]: match event_type:
await self._send_experiment_control_to_bdi_core(event_type) case "speech":
await self._send_to_speech_agent(event_context)
self.logger.info(
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
event_context,
)
case "gesture":
await self._send_to_gesture_agent(event_context)
self.logger.info(
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
event_context,
)
case "override":
ui_id = str(event_context)
if asl_trigger := self._trigger_map.get(ui_id):
await self._send_to_bdi("force_trigger", asl_trigger)
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
elif asl_cond_norm := self._cond_norm_map.get(ui_id):
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm")
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
elif asl_goal := self._goal_map.get(ui_id):
await self._send_to_bdi_belief(asl_goal, "goal")
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
# Send achieve_goal to program manager to update semantic belief extractor
goal_achieve_msg = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
thread="achieve_goal",
body=ui_id,
)
else: await self.send(goal_achieve_msg)
self.logger.warning( else:
"Received button press with unknown type '%s' (context: '%s').", self.logger.warning("Could not determine which element to override.")
event_type, case "override_unachieve":
event_context, ui_id = str(event_context)
) if asl_cond_norm := self._cond_norm_map.get(ui_id):
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm", True)
self.logger.info(
"Forwarded button press (override_unachieve)"
"with context '%s' to BDI Core.",
event_context,
)
else:
self.logger.warning(
"Could not determine which conditional norm to unachieve."
)
async def _send_experiment_control_to_bdi_core(self, type): case "pause":
self.logger.debug(
"Received pause/resume button press with context '%s'.", event_context
)
await self._send_pause_command(event_context)
if event_context:
self.logger.info("Sent pause command.")
else:
self.logger.info("Sent resume command.")
case "next_phase" | "reset_phase":
await self._send_experiment_control_to_bdi_core(event_type)
case _:
self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').",
event_type,
event_context,
)
async def handle_message(self, msg: InternalMessage):
""" """
method to send experiment control buttons to bdi core. Handle commands received from other internal Python agents.
:param type: the type of control button we should send to the bdi core.
""" """
# Switch which thread we should send to bdi core match msg.thread:
thread = "" case "new_program":
match type: self._create_mapping(msg.body)
case "next_phase": case "trigger_start":
thread = "force_next_phase" # msg.body is the sluggified trigger
case "reset_phase": asl_slug = msg.body
thread = "reset_current_phase" ui_id = self._trigger_reverse_map.get(asl_slug)
case "reset_experiment":
thread = "reset_experiment" if ui_id:
payload = {"type": "trigger_update", "id": ui_id, "achieved": True}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Trigger {asl_slug} started (ID: {ui_id})")
case "trigger_end":
asl_slug = msg.body
ui_id = self._trigger_reverse_map.get(asl_slug)
if ui_id:
payload = {"type": "trigger_update", "id": ui_id, "achieved": False}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Trigger {asl_slug} ended (ID: {ui_id})")
case "transition_phase":
new_phase_id = msg.body
self.logger.info(f"Phase transition detected: {new_phase_id}")
payload = {"type": "phase_update", "id": new_phase_id}
await self._send_experiment_update(payload)
case "goal_start":
goal_name = msg.body
ui_id = self._goal_reverse_map.get(goal_name)
if ui_id:
payload = {"type": "goal_update", "id": ui_id}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Goal {goal_name} started (ID: {ui_id})")
case "active_norms_update":
active_norms_asl = [
s.strip("() '\",") for s in msg.body.split(",") if s.strip("() '\",")
]
await self._broadcast_cond_norms(active_norms_asl)
case _: case _:
self.logger.warning( self.logger.debug(f"Received internal message on unhandled thread: {msg.thread}")
"Received unknown experiment control type '%s' to send to BDI Core.",
type,
)
out_msg = InternalMessage( async def _broadcast_cond_norms(self, active_slugs: list[str]):
to=settings.agent_settings.bdi_core_name, """
sender=self.name, Sends the current state of all conditional norms to the UI.
thread=thread, :param active_slugs: A list of slugs (strings) currently active in the BDI core.
body="", """
) updates = []
self.logger.debug("Sending experiment control '%s' to BDI Core.", thread) for asl_slug, ui_id in self._cond_norm_reverse_map.items():
await self.send(out_msg) is_active = asl_slug in active_slugs
updates.append({"id": ui_id, "active": is_active})
payload = {"type": "cond_norms_state_update", "norms": updates}
if self.pub_socket:
topic = b"status"
body = json.dumps(payload).encode("utf-8")
await self.pub_socket.send_multipart([topic, body])
# self.logger.info(f"UI Update: Active norms {updates}")
def _create_mapping(self, program_json: str):
"""
Create mappings between UI IDs and ASL slugs for triggers, goals, and conditional norms
"""
try:
program = Program.model_validate_json(program_json)
self._trigger_map = {}
self._trigger_reverse_map = {}
self._goal_map = {}
self._cond_norm_map = {}
self._cond_norm_reverse_map = {}
for phase in program.phases:
for trigger in phase.triggers:
slug = AgentSpeakGenerator.slugify(trigger)
self._trigger_map[str(trigger.id)] = slug
self._trigger_reverse_map[slug] = str(trigger.id)
for goal in phase.goals:
self._goal_map[str(goal.id)] = AgentSpeakGenerator.slugify(goal)
self._goal_reverse_map[AgentSpeakGenerator.slugify(goal)] = str(goal.id)
for goal, id in self._goal_reverse_map.items():
self.logger.debug(f"Goal mapping: UI ID {goal} -> {id}")
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
asl_slug = AgentSpeakGenerator.slugify(norm)
norm_id = str(norm.id)
self._cond_norm_map[norm_id] = asl_slug
self._cond_norm_reverse_map[norm.norm] = norm_id
self.logger.debug("Added conditional norm %s", asl_slug)
self.logger.info(
f"Mapped {len(self._trigger_map)} triggers and {len(self._goal_map)} goals "
f"and {len(self._cond_norm_map)} conditional norms for UserInterruptAgent."
)
except Exception as e:
self.logger.error(f"Mapping failed: {e}")
async def _send_experiment_update(self, data, should_log: bool = True):
"""
Sends an update to the 'experiment' topic.
The SSE endpoint will pick this up and push it to the UI.
"""
if self.pub_socket:
topic = b"experiment"
body = json.dumps(data).encode("utf-8")
await self.pub_socket.send_multipart([topic, body])
if should_log:
self.logger.debug(f"Sent experiment update: {data}")
async def _send_to_speech_agent(self, text_to_say: str): async def _send_to_speech_agent(self, text_to_say: str):
""" """
@@ -157,26 +318,60 @@ class UserInterruptAgent(BaseAgent):
) )
await self.send(out_msg) await self.send(out_msg)
async def _send_to_program_manager(self, belief_id: str): async def _send_to_bdi(self, thread: str, body: str):
""" """Send slug of trigger to BDI"""
Send a button_override belief to the BDIProgramManager. msg = InternalMessage(to=settings.agent_settings.bdi_core_name, thread=thread, body=body)
await self.send(msg)
self.logger.info(f"Directly forced {thread} in BDI: {body}")
:param belief_id: The belief_id that overrides the goal/trigger/conditional norm. async def _send_to_bdi_belief(self, asl: str, asl_type: str, unachieve: bool = False):
this id can belong to a basic belief or an inferred belief. """Send belief to BDI Core"""
See also: https://utrechtuniversity.youtrack.cloud/articles/N25B-A-27/UI-components if asl_type == "goal":
belief_name = f"achieved_{asl}"
elif asl_type == "cond_norm":
belief_name = f"force_{asl}"
else:
self.logger.warning("Tried to send belief with unknown type")
belief = Belief(name=belief_name, arguments=None)
self.logger.debug(f"Sending belief to BDI Core: {belief_name}")
# Conditional norms are unachieved by removing the belief
belief_message = (
BeliefMessage(delete=[belief]) if unachieve else BeliefMessage(create=[belief])
)
msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
thread="beliefs",
body=belief_message.model_dump_json(),
)
await self.send(msg)
async def _send_experiment_control_to_bdi_core(self, type):
""" """
data = {"belief": belief_id} method to send experiment control buttons to bdi core.
message = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name, :param type: the type of control button we should send to the bdi core.
"""
# Switch which thread we should send to bdi core
thread = ""
match type:
case "next_phase":
thread = "force_next_phase"
case "reset_phase":
thread = "reset_current_phase"
case _:
self.logger.warning(
"Received unknown experiment control type '%s' to send to BDI Core.",
type,
)
out_msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name, sender=self.name,
body=json.dumps(data), thread=thread,
thread="belief_override_id", body="",
)
await self.send(message)
self.logger.info(
"Sent button_override belief with id '%s' to Program manager.",
belief_id,
) )
self.logger.debug("Sending experiment control '%s' to BDI Core.", thread)
await self.send(out_msg)
async def _send_pause_command(self, pause): async def _send_pause_command(self, pause):
""" """
@@ -209,18 +404,3 @@ class UserInterruptAgent(BaseAgent):
) )
await self.send(vad_message) await self.send(vad_message)
self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.") self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.")
async def setup(self):
"""
Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic.
Starts the background behavior to receive the user interrupts.
"""
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("button_pressed")
self.add_behavior(self._receive_button_event())

View File

@@ -1,31 +0,0 @@
import logging
from fastapi import APIRouter, Request
from control_backend.schemas.events import ButtonPressedEvent
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/button_pressed", status_code=202)
async def receive_button_event(event: ButtonPressedEvent, request: Request):
"""
Endpoint to handle external button press events.
Validates the event payload and publishes it to the internal 'button_pressed' topic.
Subscribers (in this case user_interrupt_agent) will pick this up to trigger
specific behaviors or state changes.
:param event: The parsed ButtonPressedEvent object.
:param request: The FastAPI request object.
"""
logger.debug("Received button event: %s | %s", event.type, event.context)
topic = b"button_pressed"
body = event.model_dump_json().encode()
pub_socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, body])
return {"status": "Event received"}

View File

@@ -137,7 +137,6 @@ async def ping_stream(request: Request):
logger.info("Client disconnected from SSE") logger.info("Client disconnected from SSE")
break break
logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}")
connectedJson = json.dumps(connected) connectedJson = json.dumps(connected)
yield (f"data: {connectedJson}\n\n") yield (f"data: {connectedJson}\n\n")

View File

@@ -0,0 +1,94 @@
import asyncio
import logging
import zmq
import zmq.asyncio
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
from zmq.asyncio import Context
from control_backend.core.config import settings
from control_backend.schemas.events import ButtonPressedEvent
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/button_pressed", status_code=202)
async def receive_button_event(event: ButtonPressedEvent, request: Request):
"""
Endpoint to handle external button press events.
Validates the event payload and publishes it to the internal 'button_pressed' topic.
Subscribers (in this case user_interrupt_agent) will pick this up to trigger
specific behaviors or state changes.
:param event: The parsed ButtonPressedEvent object.
:param request: The FastAPI request object.
"""
logger.debug("Received button event: %s | %s", event.type, event.context)
topic = b"button_pressed"
body = event.model_dump_json().encode()
pub_socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, body])
return {"status": "Event received"}
@router.get("/experiment_stream")
async def experiment_stream(request: Request):
# Use the asyncio-compatible context
context = Context.instance()
socket = context.socket(zmq.SUB)
# Connect and subscribe
socket.connect(settings.zmq_settings.internal_sub_address)
socket.subscribe(b"experiment")
async def gen():
try:
while True:
# Check if client closed the tab
if await request.is_disconnected():
logger.error("Client disconnected from experiment stream.")
break
try:
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=10.0)
_, message = parts
yield f"data: {message.decode().strip()}\n\n"
except TimeoutError:
continue
finally:
socket.close()
return StreamingResponse(gen(), media_type="text/event-stream")
@router.get("/status_stream")
async def status_stream(request: Request):
context = Context.instance()
socket = context.socket(zmq.SUB)
socket.connect(settings.zmq_settings.internal_sub_address)
socket.subscribe(b"status")
async def gen():
try:
while True:
if await request.is_disconnected():
break
try:
# Shorter timeout since this is frequent
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=0.5)
_, message = parts
yield f"data: {message.decode().strip()}\n\n"
except TimeoutError:
yield ": ping\n\n" # Keep the connection alive
continue
finally:
socket.close()
return StreamingResponse(gen(), media_type="text/event-stream")

View File

@@ -1,6 +1,6 @@
from fastapi.routing import APIRouter from fastapi.routing import APIRouter
from control_backend.api.v1.endpoints import button_pressed, logs, message, program, robot, sse from control_backend.api.v1.endpoints import logs, message, program, robot, sse, user_interact
api_router = APIRouter() api_router = APIRouter()
@@ -14,4 +14,4 @@ api_router.include_router(logs.router, tags=["Logs"])
api_router.include_router(program.router, tags=["Program"]) api_router.include_router(program.router, tags=["Program"])
api_router.include_router(button_pressed.router, tags=["Button Pressed Events"]) api_router.include_router(user_interact.router, tags=["Button Pressed Events"])

View File

@@ -60,6 +60,9 @@ class BaseAgent(ABC):
self._tasks: set[asyncio.Task] = set() self._tasks: set[asyncio.Task] = set()
self._running = False self._running = False
self._internal_pub_socket: None | azmq.Socket = None
self._internal_sub_socket: None | azmq.Socket = None
# Register immediately # Register immediately
AgentDirectory.register(name, self) AgentDirectory.register(name, self)
@@ -117,7 +120,7 @@ class BaseAgent(ABC):
task.cancel() task.cancel()
self.logger.info(f"Agent {self.name} stopped") self.logger.info(f"Agent {self.name} stopped")
async def send(self, message: InternalMessage): async def send(self, message: InternalMessage, should_log: bool = True):
""" """
Send a message to another agent. Send a message to another agent.
@@ -130,16 +133,26 @@ class BaseAgent(ABC):
:param message: The message to send. :param message: The message to send.
""" """
target = AgentDirectory.get(message.to) message.sender = self.name
if target: to = message.to
await target.inbox.put(message) receivers = [to] if isinstance(to, str) else to
self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.")
else: for receiver in receivers:
# Apparently target agent is on a different process, send via ZMQ target = AgentDirectory.get(receiver)
topic = f"internal/{message.to}".encode()
body = message.model_dump_json().encode() if target:
await self._internal_pub_socket.send_multipart([topic, body]) await target.inbox.put(message)
self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.") if should_log:
self.logger.debug(
f"Sent message {message.body} to {message.to} via regular inbox."
)
else:
# Apparently target agent is on a different process, send via ZMQ
topic = f"internal/{receiver}".encode()
body = message.model_dump_json().encode()
await self._internal_pub_socket.send_multipart([topic, body])
if should_log:
self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.")
async def _process_inbox(self): async def _process_inbox(self):
""" """
@@ -149,7 +162,6 @@ class BaseAgent(ABC):
""" """
while self._running: while self._running:
msg = await self.inbox.get() msg = await self.inbox.get()
self.logger.debug(f"Received message from {msg.sender}.")
await self.handle_message(msg) await self.handle_message(msg)
async def _receive_internal_zmq_loop(self): async def _receive_internal_zmq_loop(self):
@@ -192,7 +204,16 @@ class BaseAgent(ABC):
:param coro: The coroutine to execute as a task. :param coro: The coroutine to execute as a task.
""" """
task = asyncio.create_task(coro)
async def try_coro(coro_: Coroutine):
try:
await coro_
except asyncio.CancelledError:
self.logger.debug("A behavior was canceled successfully: %s", coro_)
except Exception:
self.logger.warning("An exception occurred in a behavior.", exc_info=True)
task = asyncio.create_task(try_coro(coro))
self._tasks.add(task) self._tasks.add(task)
task.add_done_callback(self._tasks.discard) task.add_done_callback(self._tasks.discard)
return task return task

View File

@@ -1,3 +1,12 @@
"""
An exhaustive overview of configurable options. All of these can be set using environment variables
by nesting with double underscores (__). Start from the ``Settings`` class.
For example, ``settings.ri_host`` becomes ``RI_HOST``, and
``settings.zmq_settings.ri_communication_address`` becomes
``ZMQ_SETTINGS__RI_COMMUNICATION_ADDRESS``.
"""
from pydantic import BaseModel from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -8,16 +17,17 @@ class ZMQSettings(BaseModel):
:ivar internal_pub_address: Address for the internal PUB socket. :ivar internal_pub_address: Address for the internal PUB socket.
:ivar internal_sub_address: Address for the internal SUB socket. :ivar internal_sub_address: Address for the internal SUB socket.
:ivar ri_command_address: Address for sending commands to the Robot Interface. :ivar ri_communication_address: Address for the endpoint that the Robot Interface connects to.
:ivar ri_communication_address: Address for receiving communication from the Robot Interface. :ivar vad_pub_address: Address that the VAD agent binds to and publishes audio segments to.
:ivar vad_agent_address: Address for the Voice Activity Detection (VAD) agent.
""" """
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
internal_pub_address: str = "tcp://localhost:5560" internal_pub_address: str = "tcp://localhost:5560"
internal_sub_address: str = "tcp://localhost:5561" internal_sub_address: str = "tcp://localhost:5561"
ri_command_address: str = "tcp://localhost:0000"
ri_communication_address: str = "tcp://*:5555" ri_communication_address: str = "tcp://*:5555"
internal_gesture_rep_adress: str = "tcp://localhost:7788" internal_gesture_rep_adress: str = "tcp://localhost:7788"
vad_pub_address: str = "inproc://vad_stream"
class AgentSettings(BaseModel): class AgentSettings(BaseModel):
@@ -36,6 +46,8 @@ class AgentSettings(BaseModel):
:ivar robot_speech_name: Name of the Robot Speech Agent. :ivar robot_speech_name: Name of the Robot Speech Agent.
""" """
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
# agent names # agent names
bdi_core_name: str = "bdi_core_agent" bdi_core_name: str = "bdi_core_agent"
bdi_belief_collector_name: str = "belief_collector_agent" bdi_belief_collector_name: str = "belief_collector_agent"
@@ -61,6 +73,7 @@ class BehaviourSettings(BaseModel):
:ivar vad_prob_threshold: Probability threshold for Voice Activity Detection. :ivar vad_prob_threshold: Probability threshold for Voice Activity Detection.
:ivar vad_initial_since_speech: Initial value for 'since speech' counter in VAD. :ivar vad_initial_since_speech: Initial value for 'since speech' counter in VAD.
:ivar vad_non_speech_patience_chunks: Number of non-speech chunks to wait before speech ended. :ivar vad_non_speech_patience_chunks: Number of non-speech chunks to wait before speech ended.
:ivar vad_begin_silence_chunks: The number of chunks of silence to prepend to speech chunks.
:ivar transcription_max_concurrent_tasks: Maximum number of concurrent transcription tasks. :ivar transcription_max_concurrent_tasks: Maximum number of concurrent transcription tasks.
:ivar transcription_words_per_minute: Estimated words per minute for transcription timing. :ivar transcription_words_per_minute: Estimated words per minute for transcription timing.
:ivar transcription_words_per_token: Estimated words per token for transcription timing. :ivar transcription_words_per_token: Estimated words per token for transcription timing.
@@ -68,6 +81,8 @@ class BehaviourSettings(BaseModel):
:ivar conversation_history_length_limit: The maximum amount of messages to extract beliefs from. :ivar conversation_history_length_limit: The maximum amount of messages to extract beliefs from.
""" """
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
sleep_s: float = 1.0 sleep_s: float = 1.0
comm_setup_max_retries: int = 5 comm_setup_max_retries: int = 5
socket_poller_timeout_ms: int = 100 socket_poller_timeout_ms: int = 100
@@ -75,7 +90,8 @@ class BehaviourSettings(BaseModel):
# VAD settings # VAD settings
vad_prob_threshold: float = 0.5 vad_prob_threshold: float = 0.5
vad_initial_since_speech: int = 100 vad_initial_since_speech: int = 100
vad_non_speech_patience_chunks: int = 3 vad_non_speech_patience_chunks: int = 15
vad_begin_silence_chunks: int = 6
# transcription behaviour # transcription behaviour
transcription_max_concurrent_tasks: int = 3 transcription_max_concurrent_tasks: int = 3
@@ -99,6 +115,8 @@ class LLMSettings(BaseModel):
:ivar n_parallel: The number of parallel calls allowed to be made to the LLM. :ivar n_parallel: The number of parallel calls allowed to be made to the LLM.
""" """
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
local_llm_url: str = "http://localhost:1234/v1/chat/completions" local_llm_url: str = "http://localhost:1234/v1/chat/completions"
local_llm_model: str = "gpt-oss" local_llm_model: str = "gpt-oss"
chat_temperature: float = 1.0 chat_temperature: float = 1.0
@@ -115,6 +133,8 @@ class VADSettings(BaseModel):
:ivar sample_rate_hz: Sample rate in Hz for the VAD model. :ivar sample_rate_hz: Sample rate in Hz for the VAD model.
""" """
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
repo_or_dir: str = "snakers4/silero-vad" repo_or_dir: str = "snakers4/silero-vad"
model_name: str = "silero_vad" model_name: str = "silero_vad"
sample_rate_hz: int = 16000 sample_rate_hz: int = 16000
@@ -128,6 +148,8 @@ class SpeechModelSettings(BaseModel):
:ivar openai_model_name: Model name for OpenAI-based speech recognition. :ivar openai_model_name: Model name for OpenAI-based speech recognition.
""" """
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
# model identifiers for speech recognition # model identifiers for speech recognition
mlx_model_name: str = "mlx-community/whisper-small.en-mlx" mlx_model_name: str = "mlx-community/whisper-small.en-mlx"
openai_model_name: str = "small.en" openai_model_name: str = "small.en"
@@ -139,6 +161,7 @@ class Settings(BaseSettings):
:ivar app_title: Title of the application. :ivar app_title: Title of the application.
:ivar ui_url: URL of the frontend UI. :ivar ui_url: URL of the frontend UI.
:ivar ri_host: The hostname of the Robot Interface.
:ivar zmq_settings: ZMQ configuration. :ivar zmq_settings: ZMQ configuration.
:ivar agent_settings: Agent name configuration. :ivar agent_settings: Agent name configuration.
:ivar behaviour_settings: Behavior configuration. :ivar behaviour_settings: Behavior configuration.
@@ -151,6 +174,8 @@ class Settings(BaseSettings):
ui_url: str = "http://localhost:5173" ui_url: str = "http://localhost:5173"
ri_host: str = "localhost"
zmq_settings: ZMQSettings = ZMQSettings() zmq_settings: ZMQSettings = ZMQSettings()
agent_settings: AgentSettings = AgentSettings() agent_settings: AgentSettings = AgentSettings()

View File

@@ -1,5 +1,6 @@
from pydantic import BaseModel from pydantic import BaseModel
from control_backend.schemas.program import BaseGoal
from control_backend.schemas.program import Belief as ProgramBelief from control_backend.schemas.program import Belief as ProgramBelief
@@ -12,3 +13,7 @@ class BeliefList(BaseModel):
""" """
beliefs: list[ProgramBelief] beliefs: list[ProgramBelief]
class GoalList(BaseModel):
goals: list[BaseGoal]

View File

@@ -11,7 +11,10 @@ class Belief(BaseModel):
""" """
name: str name: str
arguments: list[str] | None arguments: list[str] | None = None
# To make it hashable
model_config = {"frozen": True}
class BeliefMessage(BaseModel): class BeliefMessage(BaseModel):

View File

@@ -1,3 +1,5 @@
from collections.abc import Iterable
from pydantic import BaseModel from pydantic import BaseModel
@@ -11,7 +13,7 @@ class InternalMessage(BaseModel):
:ivar thread: An optional thread identifier/topic to categorize the message (e.g., 'beliefs'). :ivar thread: An optional thread identifier/topic to categorize the message (e.g., 'beliefs').
""" """
to: str to: str | Iterable[str]
sender: str sender: str | None = None
body: str body: str
thread: str | None = None thread: str | None = None

View File

@@ -15,6 +15,9 @@ class ProgramElement(BaseModel):
name: str name: str
id: UUID4 id: UUID4
# To make program elements hashable
model_config = {"frozen": True}
class LogicalOperator(Enum): class LogicalOperator(Enum):
AND = "AND" AND = "AND"
@@ -105,23 +108,33 @@ class Plan(ProgramElement):
steps: list[PlanElement] steps: list[PlanElement]
class Goal(ProgramElement): class BaseGoal(ProgramElement):
""" """
Represents an objective to be achieved. To reach the goal, we should execute Represents an objective to be achieved. This base version does not include a plan to achieve
the corresponding plan. If we can fail to achieve a goal after executing the plan, this goal, and is used in semantic belief extraction.
for example when the achieving of the goal is dependent on the user's reply, this means
that the achieved status will be set from somewhere else in the program.
:ivar description: A description of the goal, used to determine if it has been achieved. :ivar description: A description of the goal, used to determine if it has been achieved.
:ivar plan: The plan to execute.
:ivar can_fail: Whether we can fail to achieve the goal after executing the plan. :ivar can_fail: Whether we can fail to achieve the goal after executing the plan.
""" """
description: str description: str = ""
plan: Plan
can_fail: bool = True can_fail: bool = True
class Goal(BaseGoal):
"""
Represents an objective to be achieved. To reach the goal, we should execute the corresponding
plan. It inherits from the BaseGoal a variable `can_fail`, which if true will cause the
completion to be determined based on the conversation.
Instances of this goal are not hashable because a plan is not hashable.
:ivar plan: The plan to execute.
"""
plan: Plan
type Action = SpeechAction | GestureAction | LLMAction type Action = SpeechAction | GestureAction | LLMAction
@@ -180,7 +193,6 @@ class Trigger(ProgramElement):
:ivar plan: The plan to execute. :ivar plan: The plan to execute.
""" """
name: str = ""
condition: Belief condition: Belief
plan: Plan plan: Plan

View File

@@ -91,7 +91,7 @@ def test_out_socket_creation(zmq_context):
assert per_vad_agent.audio_out_socket is not None assert per_vad_agent.audio_out_socket is not None
zmq_context.return_value.socket.assert_called_once_with(zmq.PUB) zmq_context.return_value.socket.assert_called_once_with(zmq.PUB)
zmq_context.return_value.socket.return_value.bind_to_random_port.assert_called_once() zmq_context.return_value.socket.return_value.bind.assert_called_once_with("inproc://vad_stream")
@pytest.mark.asyncio @pytest.mark.asyncio

View File

@@ -73,7 +73,7 @@ async def test_setup_connect(zmq_context, mocker):
async def test_handle_message_sends_valid_gesture_command(): async def test_handle_message_sends_valid_gesture_command():
"""Internal message with valid gesture tag is forwarded to robot pub socket.""" """Internal message with valid gesture tag is forwarded to robot pub socket."""
pubsocket = AsyncMock() pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.pubsocket = pubsocket agent.pubsocket = pubsocket
payload = { payload = {
@@ -91,7 +91,7 @@ async def test_handle_message_sends_valid_gesture_command():
async def test_handle_message_sends_non_gesture_command(): async def test_handle_message_sends_non_gesture_command():
"""Internal message with non-gesture endpoint is not forwarded by this agent.""" """Internal message with non-gesture endpoint is not forwarded by this agent."""
pubsocket = AsyncMock() pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.pubsocket = pubsocket agent.pubsocket = pubsocket
payload = {"endpoint": "some_other_endpoint", "data": "invalid_tag_not_in_list"} payload = {"endpoint": "some_other_endpoint", "data": "invalid_tag_not_in_list"}
@@ -107,7 +107,7 @@ async def test_handle_message_sends_non_gesture_command():
async def test_handle_message_rejects_invalid_gesture_tag(): async def test_handle_message_rejects_invalid_gesture_tag():
"""Internal message with invalid gesture tag is not forwarded.""" """Internal message with invalid gesture tag is not forwarded."""
pubsocket = AsyncMock() pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.pubsocket = pubsocket agent.pubsocket = pubsocket
# Use a tag that's not in gesture_data # Use a tag that's not in gesture_data
@@ -123,7 +123,7 @@ async def test_handle_message_rejects_invalid_gesture_tag():
async def test_handle_message_invalid_payload(): async def test_handle_message_invalid_payload():
"""Invalid payload is caught and does not send.""" """Invalid payload is caught and does not send."""
pubsocket = AsyncMock() pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.pubsocket = pubsocket agent.pubsocket = pubsocket
msg = InternalMessage(to="robot", sender="tester", body=json.dumps({"bad": "data"})) msg = InternalMessage(to="robot", sender="tester", body=json.dumps({"bad": "data"}))
@@ -142,12 +142,12 @@ async def test_zmq_command_loop_valid_gesture_payload():
async def recv_once(): async def recv_once():
# stop after first iteration # stop after first iteration
agent._running = False agent._running = False
return (b"command", json.dumps(command).encode("utf-8")) return b"command", json.dumps(command).encode("utf-8")
fake_socket.recv_multipart = recv_once fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock() fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.subsocket = fake_socket agent.subsocket = fake_socket
agent.pubsocket = fake_socket agent.pubsocket = fake_socket
agent._running = True agent._running = True
@@ -165,12 +165,12 @@ async def test_zmq_command_loop_valid_non_gesture_payload():
async def recv_once(): async def recv_once():
agent._running = False agent._running = False
return (b"command", json.dumps(command).encode("utf-8")) return b"command", json.dumps(command).encode("utf-8")
fake_socket.recv_multipart = recv_once fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock() fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.subsocket = fake_socket agent.subsocket = fake_socket
agent.pubsocket = fake_socket agent.pubsocket = fake_socket
agent._running = True agent._running = True
@@ -188,12 +188,12 @@ async def test_zmq_command_loop_invalid_gesture_tag():
async def recv_once(): async def recv_once():
agent._running = False agent._running = False
return (b"command", json.dumps(command).encode("utf-8")) return b"command", json.dumps(command).encode("utf-8")
fake_socket.recv_multipart = recv_once fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock() fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.subsocket = fake_socket agent.subsocket = fake_socket
agent.pubsocket = fake_socket agent.pubsocket = fake_socket
agent._running = True agent._running = True
@@ -210,12 +210,12 @@ async def test_zmq_command_loop_invalid_json():
async def recv_once(): async def recv_once():
agent._running = False agent._running = False
return (b"command", b"{not_json}") return b"command", b"{not_json}"
fake_socket.recv_multipart = recv_once fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock() fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.subsocket = fake_socket agent.subsocket = fake_socket
agent.pubsocket = fake_socket agent.pubsocket = fake_socket
agent._running = True agent._running = True
@@ -232,12 +232,12 @@ async def test_zmq_command_loop_ignores_send_gestures_topic():
async def recv_once(): async def recv_once():
agent._running = False agent._running = False
return (b"send_gestures", b"{}") return b"send_gestures", b"{}"
fake_socket.recv_multipart = recv_once fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock() fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.subsocket = fake_socket agent.subsocket = fake_socket
agent.pubsocket = fake_socket agent.pubsocket = fake_socket
agent._running = True agent._running = True
@@ -259,7 +259,9 @@ async def test_fetch_gestures_loop_without_amount():
fake_repsocket.recv = recv_once fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock() fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no", "wave", "point"]) agent = RobotGestureAgent(
"robot_gesture", gesture_data=["hello", "yes", "no", "wave", "point"], address=""
)
agent.repsocket = fake_repsocket agent.repsocket = fake_repsocket
agent._running = True agent._running = True
@@ -287,7 +289,9 @@ async def test_fetch_gestures_loop_with_amount():
fake_repsocket.recv = recv_once fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock() fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no", "wave", "point"]) agent = RobotGestureAgent(
"robot_gesture", gesture_data=["hello", "yes", "no", "wave", "point"], address=""
)
agent.repsocket = fake_repsocket agent.repsocket = fake_repsocket
agent._running = True agent._running = True
@@ -315,7 +319,7 @@ async def test_fetch_gestures_loop_with_integer_request():
fake_repsocket.recv = recv_once fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock() fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.repsocket = fake_repsocket agent.repsocket = fake_repsocket
agent._running = True agent._running = True
@@ -340,7 +344,7 @@ async def test_fetch_gestures_loop_with_invalid_json():
fake_repsocket.recv = recv_once fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock() fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.repsocket = fake_repsocket agent.repsocket = fake_repsocket
agent._running = True agent._running = True
@@ -365,7 +369,7 @@ async def test_fetch_gestures_loop_with_non_integer_json():
fake_repsocket.recv = recv_once fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock() fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.repsocket = fake_repsocket agent.repsocket = fake_repsocket
agent._running = True agent._running = True
@@ -381,7 +385,7 @@ async def test_fetch_gestures_loop_with_non_integer_json():
def test_gesture_data_attribute(): def test_gesture_data_attribute():
"""Test that gesture_data returns the expected list.""" """Test that gesture_data returns the expected list."""
gesture_data = ["hello", "yes", "no", "wave"] gesture_data = ["hello", "yes", "no", "wave"]
agent = RobotGestureAgent("robot_gesture", gesture_data=gesture_data) agent = RobotGestureAgent("robot_gesture", gesture_data=gesture_data, address="")
assert agent.gesture_data == gesture_data assert agent.gesture_data == gesture_data
assert isinstance(agent.gesture_data, list) assert isinstance(agent.gesture_data, list)
@@ -398,7 +402,7 @@ async def test_stop_closes_sockets():
pubsocket = MagicMock() pubsocket = MagicMock()
subsocket = MagicMock() subsocket = MagicMock()
repsocket = MagicMock() repsocket = MagicMock()
agent = RobotGestureAgent("robot_gesture") agent = RobotGestureAgent("robot_gesture", address="")
agent.pubsocket = pubsocket agent.pubsocket = pubsocket
agent.subsocket = subsocket agent.subsocket = subsocket
agent.repsocket = repsocket agent.repsocket = repsocket
@@ -415,7 +419,7 @@ async def test_stop_closes_sockets():
async def test_initialization_with_custom_gesture_data(): async def test_initialization_with_custom_gesture_data():
"""Agent can be initialized with custom gesture data.""" """Agent can be initialized with custom gesture data."""
custom_gestures = ["custom1", "custom2", "custom3"] custom_gestures = ["custom1", "custom2", "custom3"]
agent = RobotGestureAgent("robot_gesture", gesture_data=custom_gestures) agent = RobotGestureAgent("robot_gesture", gesture_data=custom_gestures, address="")
assert agent.gesture_data == custom_gestures assert agent.gesture_data == custom_gestures
@@ -432,7 +436,7 @@ async def test_fetch_gestures_loop_handles_exception():
fake_repsocket.recv = recv_once fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock() fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"]) agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"], address="")
agent.repsocket = fake_repsocket agent.repsocket = fake_repsocket
agent.logger = MagicMock() agent.logger = MagicMock()
agent._running = True agent._running = True

View File

@@ -80,6 +80,7 @@ async def test_receive_programs_valid_and_invalid():
manager._internal_pub_socket = AsyncMock() manager._internal_pub_socket = AsyncMock()
manager.sub_socket = sub manager.sub_socket = sub
manager._create_agentspeak_and_send_to_bdi = AsyncMock() manager._create_agentspeak_and_send_to_bdi = AsyncMock()
manager._send_clear_llm_history = AsyncMock()
try: try:
# Will give StopAsyncIteration when the predefined `sub.recv_multipart` side-effects run out # Will give StopAsyncIteration when the predefined `sub.recv_multipart` side-effects run out
@@ -92,3 +93,26 @@ async def test_receive_programs_valid_and_invalid():
forwarded: Program = manager._create_agentspeak_and_send_to_bdi.await_args[0][0] forwarded: Program = manager._create_agentspeak_and_send_to_bdi.await_args[0][0]
assert forwarded.phases[0].norms[0].name == "N1" assert forwarded.phases[0].norms[0].name == "N1"
assert forwarded.phases[0].goals[0].name == "G1" assert forwarded.phases[0].goals[0].name == "G1"
# Verify history clear was triggered
assert (
manager._send_clear_llm_history.await_count == 2
) # first sends program to UserInterrupt, then clears LLM
@pytest.mark.asyncio
async def test_send_clear_llm_history(mock_settings):
# Ensure the mock returns a string for the agent name (just like in your LLM tests)
mock_settings.agent_settings.llm_agent_name = "llm_agent"
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
await manager._send_clear_llm_history()
assert manager.send.await_count == 2
msg: InternalMessage = manager.send.await_args_list[0][0][0]
# Verify the content and recipient
assert msg.body == "clear_history"
assert msg.to == "llm_agent"

View File

@@ -6,10 +6,13 @@ import httpx
import pytest import pytest
from control_backend.agents.bdi import TextBeliefExtractorAgent from control_backend.agents.bdi import TextBeliefExtractorAgent
from control_backend.agents.bdi.text_belief_extractor_agent import BeliefState
from control_backend.core.agent_system import InternalMessage from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.belief_list import BeliefList from control_backend.schemas.belief_list import BeliefList
from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import ( from control_backend.schemas.program import (
ConditionalNorm, ConditionalNorm,
KeywordBelief, KeywordBelief,
@@ -23,11 +26,21 @@ from control_backend.schemas.program import (
@pytest.fixture @pytest.fixture
def agent(): def llm():
agent = TextBeliefExtractorAgent("text_belief_agent") llm = TextBeliefExtractorAgent.LLM(MagicMock(), 4)
agent.send = AsyncMock() llm._query_llm = AsyncMock()
agent._query_llm = AsyncMock() return llm
return agent
@pytest.fixture
def agent(llm):
with patch(
"control_backend.agents.bdi.text_belief_extractor_agent.TextBeliefExtractorAgent.LLM",
return_value=llm,
):
agent = TextBeliefExtractorAgent("text_belief_agent")
agent.send = AsyncMock()
return agent
@pytest.fixture @pytest.fixture
@@ -102,24 +115,12 @@ async def test_handle_message_from_transcriber(agent, mock_settings):
agent.send.assert_awaited_once() # noqa # `agent.send` has no such property, but we mock it. agent.send.assert_awaited_once() # noqa # `agent.send` has no such property, but we mock it.
sent: InternalMessage = agent.send.call_args.args[0] # noqa sent: InternalMessage = agent.send.call_args.args[0] # noqa
assert sent.to == mock_settings.agent_settings.bdi_belief_collector_name assert sent.to == mock_settings.agent_settings.bdi_core_name
assert sent.thread == "beliefs" assert sent.thread == "beliefs"
parsed = json.loads(sent.body) parsed = BeliefMessage.model_validate_json(sent.body)
assert parsed == {"beliefs": {"user_said": [transcription]}, "type": "belief_extraction_text"} replaced_last = parsed.replace.pop()
assert replaced_last.name == "user_said"
assert replaced_last.arguments == [transcription]
@pytest.mark.asyncio
async def test_process_user_said(agent, mock_settings):
transcription = "this is a test"
await agent._user_said(transcription)
agent.send.assert_awaited_once() # noqa # `agent.send` has no such property, but we mock it.
sent: InternalMessage = agent.send.call_args.args[0] # noqa
assert sent.to == mock_settings.agent_settings.bdi_belief_collector_name
assert sent.thread == "beliefs"
parsed = json.loads(sent.body)
assert parsed["beliefs"]["user_said"] == [transcription]
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -144,46 +145,46 @@ async def test_query_llm():
"control_backend.agents.bdi.text_belief_extractor_agent.httpx.AsyncClient", "control_backend.agents.bdi.text_belief_extractor_agent.httpx.AsyncClient",
return_value=mock_async_client, return_value=mock_async_client,
): ):
agent = TextBeliefExtractorAgent("text_belief_agent") llm = TextBeliefExtractorAgent.LLM(MagicMock(), 4)
res = await agent._query_llm("hello world", {"type": "null"}) res = await llm._query_llm("hello world", {"type": "null"})
# Response content was set as "null", so should be deserialized as None # Response content was set as "null", so should be deserialized as None
assert res is None assert res is None
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_retry_query_llm_success(agent): async def test_retry_query_llm_success(llm):
agent._query_llm.return_value = None llm._query_llm.return_value = None
res = await agent._retry_query_llm("hello world", {"type": "null"}) res = await llm.query("hello world", {"type": "null"})
agent._query_llm.assert_called_once() llm._query_llm.assert_called_once()
assert res is None assert res is None
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_retry_query_llm_success_after_failure(agent): async def test_retry_query_llm_success_after_failure(llm):
agent._query_llm.side_effect = [KeyError(), "real value"] llm._query_llm.side_effect = [KeyError(), "real value"]
res = await agent._retry_query_llm("hello world", {"type": "string"}) res = await llm.query("hello world", {"type": "string"})
assert agent._query_llm.call_count == 2 assert llm._query_llm.call_count == 2
assert res == "real value" assert res == "real value"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_retry_query_llm_failures(agent): async def test_retry_query_llm_failures(llm):
agent._query_llm.side_effect = [KeyError(), KeyError(), KeyError(), "real value"] llm._query_llm.side_effect = [KeyError(), KeyError(), KeyError(), "real value"]
res = await agent._retry_query_llm("hello world", {"type": "string"}) res = await llm.query("hello world", {"type": "string"})
assert agent._query_llm.call_count == 3 assert llm._query_llm.call_count == 3
assert res is None assert res is None
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_retry_query_llm_fail_immediately(agent): async def test_retry_query_llm_fail_immediately(llm):
agent._query_llm.side_effect = [KeyError(), "real value"] llm._query_llm.side_effect = [KeyError(), "real value"]
res = await agent._retry_query_llm("hello world", {"type": "string"}, tries=1) res = await llm.query("hello world", {"type": "string"}, tries=1)
assert agent._query_llm.call_count == 1 assert llm._query_llm.call_count == 1
assert res is None assert res is None
@@ -192,7 +193,7 @@ async def test_extracting_semantic_beliefs(agent):
""" """
The Program Manager sends beliefs to this agent. Test whether the agent handles them correctly. The Program Manager sends beliefs to this agent. Test whether the agent handles them correctly.
""" """
assert len(agent.available_beliefs) == 0 assert len(agent.belief_inferrer.available_beliefs) == 0
beliefs = BeliefList( beliefs = BeliefList(
beliefs=[ beliefs=[
KeywordBelief( KeywordBelief(
@@ -213,26 +214,28 @@ async def test_extracting_semantic_beliefs(agent):
to=settings.agent_settings.text_belief_extractor_name, to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.bdi_program_manager_name, sender=settings.agent_settings.bdi_program_manager_name,
body=beliefs.model_dump_json(), body=beliefs.model_dump_json(),
thread="beliefs",
), ),
) )
assert len(agent.available_beliefs) == 2 assert len(agent.belief_inferrer.available_beliefs) == 2
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_handle_invalid_program(agent, sample_program): async def test_handle_invalid_beliefs(agent, sample_program):
agent.available_beliefs.append(sample_program.phases[0].norms[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.available_beliefs.append(sample_program.phases[0].triggers[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
assert len(agent.available_beliefs) == 2 assert len(agent.belief_inferrer.available_beliefs) == 2
await agent.handle_message( await agent.handle_message(
InternalMessage( InternalMessage(
to=settings.agent_settings.text_belief_extractor_name, to=settings.agent_settings.text_belief_extractor_name,
sender=settings.agent_settings.bdi_program_manager_name, sender=settings.agent_settings.bdi_program_manager_name,
body=json.dumps({"phases": "Invalid"}), body=json.dumps({"phases": "Invalid"}),
thread="beliefs",
), ),
) )
assert len(agent.available_beliefs) == 2 assert len(agent.belief_inferrer.available_beliefs) == 2
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -254,13 +257,13 @@ async def test_handle_robot_response(agent):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simulated_real_turn_with_beliefs(agent, sample_program): async def test_simulated_real_turn_with_beliefs(agent, llm, sample_program):
"""Test sending user message to extract beliefs from.""" """Test sending user message to extract beliefs from."""
agent.available_beliefs.append(sample_program.phases[0].norms[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.available_beliefs.append(sample_program.phases[0].triggers[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
# Send a user message with the belief that there's no more booze # Send a user message with the belief that there's no more booze
agent._query_llm.return_value = {"is_pirate": None, "no_more_booze": True} llm._query_llm.return_value = {"is_pirate": None, "no_more_booze": True}
assert len(agent.conversation.messages) == 0 assert len(agent.conversation.messages) == 0
await agent.handle_message( await agent.handle_message(
InternalMessage( InternalMessage(
@@ -275,20 +278,20 @@ async def test_simulated_real_turn_with_beliefs(agent, sample_program):
assert agent.send.call_count == 2 assert agent.send.call_count == 2
# First should be the beliefs message # First should be the beliefs message
message: InternalMessage = agent.send.call_args_list[0].args[0] message: InternalMessage = agent.send.call_args_list[1].args[0]
beliefs = BeliefMessage.model_validate_json(message.body) beliefs = BeliefMessage.model_validate_json(message.body)
assert len(beliefs.create) == 1 assert len(beliefs.create) == 1
assert beliefs.create[0].name == "no_more_booze" assert beliefs.create[0].name == "no_more_booze"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simulated_real_turn_no_beliefs(agent, sample_program): async def test_simulated_real_turn_no_beliefs(agent, llm, sample_program):
"""Test a user message to extract beliefs from, but no beliefs are formed.""" """Test a user message to extract beliefs from, but no beliefs are formed."""
agent.available_beliefs.append(sample_program.phases[0].norms[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.available_beliefs.append(sample_program.phases[0].triggers[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
# Send a user message with no new beliefs # Send a user message with no new beliefs
agent._query_llm.return_value = {"is_pirate": None, "no_more_booze": None} llm._query_llm.return_value = {"is_pirate": None, "no_more_booze": None}
await agent.handle_message( await agent.handle_message(
InternalMessage( InternalMessage(
to=settings.agent_settings.text_belief_extractor_name, to=settings.agent_settings.text_belief_extractor_name,
@@ -302,17 +305,17 @@ async def test_simulated_real_turn_no_beliefs(agent, sample_program):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simulated_real_turn_no_new_beliefs(agent, sample_program): async def test_simulated_real_turn_no_new_beliefs(agent, llm, sample_program):
""" """
Test a user message to extract beliefs from, but no new beliefs are formed because they already Test a user message to extract beliefs from, but no new beliefs are formed because they already
existed. existed.
""" """
agent.available_beliefs.append(sample_program.phases[0].norms[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.available_beliefs.append(sample_program.phases[0].triggers[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
agent.beliefs["is_pirate"] = True agent._current_beliefs = BeliefState(true={InternalBelief(name="is_pirate", arguments=None)})
# Send a user message with the belief the user is a pirate, still # Send a user message with the belief the user is a pirate, still
agent._query_llm.return_value = {"is_pirate": True, "no_more_booze": None} llm._query_llm.return_value = {"is_pirate": True, "no_more_booze": None}
await agent.handle_message( await agent.handle_message(
InternalMessage( InternalMessage(
to=settings.agent_settings.text_belief_extractor_name, to=settings.agent_settings.text_belief_extractor_name,
@@ -326,17 +329,19 @@ async def test_simulated_real_turn_no_new_beliefs(agent, sample_program):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simulated_real_turn_remove_belief(agent, sample_program): async def test_simulated_real_turn_remove_belief(agent, llm, sample_program):
""" """
Test a user message to extract beliefs from, but an existing belief is determined no longer to Test a user message to extract beliefs from, but an existing belief is determined no longer to
hold. hold.
""" """
agent.available_beliefs.append(sample_program.phases[0].norms[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.available_beliefs.append(sample_program.phases[0].triggers[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
agent.beliefs["no_more_booze"] = True agent._current_beliefs = BeliefState(
true={InternalBelief(name="no_more_booze", arguments=None)},
)
# Send a user message with the belief the user is a pirate, still # Send a user message with the belief the user is a pirate, still
agent._query_llm.return_value = {"is_pirate": None, "no_more_booze": False} llm._query_llm.return_value = {"is_pirate": None, "no_more_booze": False}
await agent.handle_message( await agent.handle_message(
InternalMessage( InternalMessage(
to=settings.agent_settings.text_belief_extractor_name, to=settings.agent_settings.text_belief_extractor_name,
@@ -349,18 +354,23 @@ async def test_simulated_real_turn_remove_belief(agent, sample_program):
assert agent.send.call_count == 2 assert agent.send.call_count == 2
# Agent's current beliefs should've changed # Agent's current beliefs should've changed
assert not agent.beliefs["no_more_booze"] assert any(b.name == "no_more_booze" for b in agent._current_beliefs.false)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_llm_failure_handling(agent, sample_program): async def test_llm_failure_handling(agent, llm, sample_program):
""" """
Check that the agent handles failures gracefully without crashing. Check that the agent handles failures gracefully without crashing.
""" """
agent._query_llm.side_effect = httpx.HTTPError("") llm._query_llm.side_effect = httpx.HTTPError("")
agent.available_beliefs.append(sample_program.phases[0].norms[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].norms[0].condition)
agent.available_beliefs.append(sample_program.phases[0].triggers[0].condition) agent.belief_inferrer.available_beliefs.append(sample_program.phases[0].triggers[0].condition)
belief_changes = await agent._infer_turn() belief_changes = await agent.belief_inferrer.infer_from_conversation(
ChatHistory(
messages=[ChatMessage(role="user", content="Good day!")],
),
)
assert len(belief_changes) == 0 assert len(belief_changes.true) == 0
assert len(belief_changes.false) == 0

View File

@@ -265,3 +265,23 @@ async def test_stream_query_llm_skips_non_data_lines(mock_httpx_client, mock_set
# Only the valid 'data:' line should yield content # Only the valid 'data:' line should yield content
assert tokens == ["Hi"] assert tokens == ["Hi"]
@pytest.mark.asyncio
async def test_clear_history_command(mock_settings):
"""Test that the 'clear_history' message clears the agent's memory."""
# setup LLM to have some history
mock_settings.agent_settings.bdi_program_manager_name = "bdi_program_manager_agent"
agent = LLMAgent("llm_agent")
agent.history = [
{"role": "user", "content": "Old conversation context"},
{"role": "assistant", "content": "Old response"},
]
assert len(agent.history) == 2
msg = InternalMessage(
to="llm_agent",
sender=mock_settings.agent_settings.bdi_program_manager_name,
body="clear_history",
)
await agent.handle_message(msg)
assert len(agent.history) == 0

View File

@@ -7,6 +7,15 @@ import zmq
from control_backend.agents.perception.vad_agent import VADAgent from control_backend.agents.perception.vad_agent import VADAgent
# We don't want to use real ZMQ in unit tests, for example because it can give errors when sockets
# aren't closed properly.
@pytest.fixture(autouse=True)
def mock_zmq():
with patch("zmq.asyncio.Context") as mock:
mock.instance.return_value = MagicMock()
yield mock
@pytest.fixture @pytest.fixture
def audio_out_socket(): def audio_out_socket():
return AsyncMock() return AsyncMock()
@@ -140,12 +149,10 @@ async def test_vad_model_load_failure_stops_agent(vad_agent):
# Patch stop to an AsyncMock so we can check it was awaited # Patch stop to an AsyncMock so we can check it was awaited
vad_agent.stop = AsyncMock() vad_agent.stop = AsyncMock()
result = await vad_agent.setup() await vad_agent.setup()
# Assert stop was called # Assert stop was called
vad_agent.stop.assert_awaited_once() vad_agent.stop.assert_awaited_once()
# Assert setup returned None
assert result is None
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -155,7 +162,7 @@ async def test_audio_out_bind_failure_sets_none_and_logs(vad_agent, caplog):
audio_out_socket is set to None, None is returned, and an error is logged. audio_out_socket is set to None, None is returned, and an error is logged.
""" """
mock_socket = MagicMock() mock_socket = MagicMock()
mock_socket.bind_to_random_port.side_effect = zmq.ZMQBindError() mock_socket.bind.side_effect = zmq.ZMQBindError()
with patch("control_backend.agents.perception.vad_agent.azmq.Context.instance") as mock_ctx: with patch("control_backend.agents.perception.vad_agent.azmq.Context.instance") as mock_ctx:
mock_ctx.return_value.socket.return_value = mock_socket mock_ctx.return_value.socket.return_value = mock_socket

View File

@@ -99,12 +99,75 @@ async def test_send_to_local_agent(monkeypatch):
# Patch inbox.put # Patch inbox.put
target.inbox.put = AsyncMock() target.inbox.put = AsyncMock()
message = InternalMessage(to="receiver", sender="sender", body="hello") message = InternalMessage(to=target.name, sender=sender.name, body="hello")
await sender.send(message) await sender.send(message)
target.inbox.put.assert_awaited_once_with(message) target.inbox.put.assert_awaited_once_with(message)
sender.logger.debug.assert_called_once()
@pytest.mark.asyncio
async def test_send_to_zmq_agent(monkeypatch):
sender = DummyAgent("sender")
target = "remote_receiver"
# Fake logger
sender.logger = MagicMock()
# Fake zmq
sender._internal_pub_socket = AsyncMock()
message = InternalMessage(to=target, sender=sender.name, body="hello")
await sender.send(message)
zmq_calls = sender._internal_pub_socket.send_multipart.call_args[0][0]
assert zmq_calls[0] == f"internal/{target}".encode()
@pytest.mark.asyncio
async def test_send_to_multiple_local_agents(monkeypatch):
sender = DummyAgent("sender")
target1 = DummyAgent("receiver1")
target2 = DummyAgent("receiver2")
# Fake logger
sender.logger = MagicMock()
# Patch inbox.put
target1.inbox.put = AsyncMock()
target2.inbox.put = AsyncMock()
message = InternalMessage(to=[target1.name, target2.name], sender=sender.name, body="hello")
await sender.send(message)
target1.inbox.put.assert_awaited_once_with(message)
target2.inbox.put.assert_awaited_once_with(message)
@pytest.mark.asyncio
async def test_send_to_multiple_agents(monkeypatch):
sender = DummyAgent("sender")
target1 = DummyAgent("receiver1")
target2 = "remote_receiver"
# Fake logger
sender.logger = MagicMock()
# Fake zmq
sender._internal_pub_socket = AsyncMock()
# Patch inbox.put
target1.inbox.put = AsyncMock()
message = InternalMessage(to=[target1.name, target2], sender=sender.name, body="hello")
await sender.send(message)
target1.inbox.put.assert_awaited_once_with(message)
zmq_calls = sender._internal_pub_socket.send_multipart.call_args[0][0]
assert zmq_calls[0] == f"internal/{target2}".encode()
@pytest.mark.asyncio @pytest.mark.asyncio