Compare commits

..

43 Commits

Author SHA1 Message Date
Pim Hutting
7c10c50336 chore: removed resetExperiment from backened
now it happens in UI

ref: N25B-400
2026-01-16 14:29:46 +01:00
Pim Hutting
6d03ba8a41 feat: added extra endpoint for norm pings
also made sure that you cannot skip phase on end phase

ref: N25B-400
2026-01-16 14:28:27 +01:00
Pim Hutting
041fc4ab6e chore: cond_norms unachieve and via belief msg 2026-01-15 09:02:52 +01:00
39e1bb1ead fix: sync issues
ref: N25B-447
2026-01-14 15:28:29 +01:00
8f6662e64a feat: phase transitions
ref: N25B-446
2026-01-14 13:22:51 +01:00
0794c549a8 chore: remove agentspeak file from tracking 2026-01-14 11:27:29 +01:00
ff24ab7a27 fix: default behavior and end phase
ref: N25B-448
2026-01-14 11:24:19 +01:00
43ac8ad69f chore: delete outdated files
ref: N25B-446
2026-01-14 10:58:41 +01:00
Twirre Meulenbelt
f7669c021b feat: support force completed goals in semantic belief agent
ref: N25B-427
2026-01-13 17:04:44 +01:00
Björn Otgaar
8f52f8bf0c Merge branch 'feat/monitoringpage-cb' of git.science.uu.nl:ics/sp/2025/n25b/pepperplus-cb into feat/monitoringpage-cb 2026-01-13 14:03:40 +01:00
Björn Otgaar
2a94a45b34 chore: adjust 'phase_id' to 'id' for correct payload 2026-01-13 14:03:37 +01:00
f87651f691 fix: achieved goal in bdi core
ref: N25B-400
2026-01-13 12:26:18 +01:00
Pim Hutting
65e0b2d250 feat: added correct message
ref: N25B-400
2026-01-13 12:05:38 +01:00
177e844349 feat: send achieved goal from interrupt->manager->semantic
ref: N25B-400
2026-01-13 11:46:17 +01:00
Pim Hutting
0df6040444 feat: added sending goal overwrites in Userinter.
ref: N25B-400
2026-01-13 11:26:03 +01:00
Twirre Meulenbelt
af81bd8620 Merge branch 'feat/multiple-receivers' into feat/monitoringpage-cb
# Conflicts:
#	src/control_backend/core/agent_system.py
#	src/control_backend/schemas/internal_message.py
2026-01-13 11:14:18 +01:00
Twirre Meulenbelt
70e05b6c92 test: sending to multiple agents, including remote
ref: N25B-441
2026-01-13 11:10:35 +01:00
c0b8fb8612 feat: able to send to multiple receivers
ref: N25B-441
2026-01-13 11:06:42 +01:00
Pim Hutting
d499111ea4 feat: added pause functionality
Storms code wasnt fully included in Bjorns branch

ref: N25B-400
2026-01-13 00:52:04 +01:00
Pim Hutting
72c2c57f26 chore: merged button functionality and fix bug
merged björns branch that has the following button functionality
-Pause/resume
-Next phase
-Restart phase
-reset experiment
fix bug where norms where not properly sent to the user interrupt agent

ref: N25B-400
2026-01-12 19:31:50 +01:00
Pim Hutting
4a014b577a Merge remote-tracking branch 'origin/feat/reset-skip-buttons' into feat/monitoringpage-cb 2026-01-12 19:19:31 +01:00
Pim Hutting
c45a258b22 fix: fixed a bug where norms where not updated
Now in UserInterruptAgent we store the norm.norm and not the slugified norm

ref: N25B-400
2026-01-12 19:07:05 +01:00
0f09276477 fix: send norms back to UI
ref: N25B-400
2026-01-12 17:02:39 +01:00
4e113c2d5c fix: default plan and norm force
ref: N25B-400
2026-01-12 16:20:24 +01:00
Pim Hutting
54c835cc0f feat: added force_norm handling in BDI core agent
ref: N25B-400
2026-01-12 15:37:04 +01:00
Pim Hutting
c4ccbcd354 Merge remote-tracking branch 'origin/feat/extra-agentspeak-functionality' into feat/monitoringpage-cb 2026-01-12 15:24:48 +01:00
Pim Hutting
d202abcd1b fix: phases update correctly
there was a bug where phases would not update without restarting cb

ref: N25B-400
2026-01-12 12:51:24 +01:00
Björn Otgaar
c91b999104 chore: fix bugs and make sure connected robots work 2026-01-08 15:31:44 +01:00
Pim Hutting
5e2126fc21 chore: code cleanup
ref: N25B-400
2026-01-08 15:05:43 +01:00
Pim Hutting
500bbc2d82 feat: added goal start sending functionality
ref: N25B-400
2026-01-08 14:52:55 +01:00
Björn Otgaar
1360567820 chore: indenting 2026-01-08 13:01:38 +01:00
Björn Otgaar
cc0d5af28c chore: fixing bugs 2026-01-08 12:56:22 +01:00
Pim Hutting
3a8d1730a1 fix: made mapping for conditional norms only
ref: N25B-400
2026-01-08 12:29:16 +01:00
Pim Hutting
b27e5180c4 feat: small implementation change
ref: N25B-400
2026-01-08 11:25:53 +01:00
Pim Hutting
6b34f4b82c fix: small bugfix
ref: N25B-400
2026-01-08 10:59:24 +01:00
Pim Hutting
4bf2be6359 feat: added a functionality for monitoring page
ref: N25B-400
2026-01-08 09:56:10 +01:00
Pim Hutting
20e5e46639 Merge remote-tracking branch 'origin/feat/extra-agentspeak-functionality' into feat/monitoringpage-cb 2026-01-07 22:42:40 +01:00
Pim Hutting
365d449666 feat: commit before I can merge new changes
ref: N25B-400
2026-01-07 22:41:59 +01:00
Björn Otgaar
be88323cf7 chore: add one endpoint fo avoid errors 2026-01-07 18:24:35 +01:00
Pim Hutting
be6bbbb849 feat: added endpoint userinterrupt to userinterrupt
ref: N25B-400
2026-01-07 17:42:54 +01:00
Storm
76dfcb23ef feat: added pause functionality
ref: N25B-350
2026-01-07 16:03:49 +01:00
Björn Otgaar
34afca6652 chore: automatically send the experiment controls to the bdi core in the user interupt agent. 2026-01-07 15:07:33 +01:00
Björn Otgaar
324a63e5cc chore: add styles to user_interrupt_agent 2026-01-07 14:45:42 +01:00
24 changed files with 885 additions and 810 deletions

2
.gitignore vendored
View File

@@ -222,6 +222,8 @@ __marimo__/
docs/* docs/*
!docs/conf.py !docs/conf.py
# Generated files
agentspeak.asl

View File

@@ -83,6 +83,8 @@ class RobotGestureAgent(BaseAgent):
self.subsocket.close() self.subsocket.close()
if self.pubsocket: if self.pubsocket:
self.pubsocket.close() self.pubsocket.close()
if self.repsocket:
self.repsocket.close()
await super().stop() await super().stop()
async def handle_message(self, msg: InternalMessage): async def handle_message(self, msg: InternalMessage):

View File

@@ -3,9 +3,11 @@ from functools import singledispatchmethod
from slugify import slugify from slugify import slugify
from control_backend.agents.bdi.agentspeak_ast import ( from control_backend.agents.bdi.agentspeak_ast import (
AstAtom,
AstBinaryOp, AstBinaryOp,
AstExpression, AstExpression,
AstLiteral, AstLiteral,
AstNumber,
AstPlan, AstPlan,
AstProgram, AstProgram,
AstRule, AstRule,
@@ -17,6 +19,7 @@ from control_backend.agents.bdi.agentspeak_ast import (
TriggerType, TriggerType,
) )
from control_backend.schemas.program import ( from control_backend.schemas.program import (
BaseGoal,
BasicNorm, BasicNorm,
ConditionalNorm, ConditionalNorm,
GestureAction, GestureAction,
@@ -42,7 +45,13 @@ class AgentSpeakGenerator:
def generate(self, program: Program) -> str: def generate(self, program: Program) -> str:
self._asp = AstProgram() self._asp = AstProgram()
if program.phases:
self._asp.rules.append(AstRule(self._astify(program.phases[0]))) self._asp.rules.append(AstRule(self._astify(program.phases[0])))
else:
self._asp.rules.append(AstRule(AstLiteral("phase", [AstString("end")])))
self._asp.rules.append(AstRule(AstLiteral("!notify_cycle")))
self._add_keyword_inference() self._add_keyword_inference()
self._add_default_plans() self._add_default_plans()
@@ -70,6 +79,7 @@ class AgentSpeakGenerator:
self._add_reply_with_goal_plan() self._add_reply_with_goal_plan()
self._add_say_plan() self._add_say_plan()
self._add_reply_plan() self._add_reply_plan()
self._add_notify_cycle_plan()
def _add_reply_with_goal_plan(self): def _add_reply_with_goal_plan(self):
self._asp.plans.append( self._asp.plans.append(
@@ -132,6 +142,29 @@ class AgentSpeakGenerator:
) )
) )
def _add_notify_cycle_plan(self):
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("notify_cycle"),
[],
[
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"findall",
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
),
),
AstStatement(
StatementType.DO_ACTION, AstLiteral("notify_norms", [AstVar("Norms")])
),
AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(100)])),
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("notify_cycle")),
],
)
)
def _process_phases(self, phases: list[Phase]) -> None: def _process_phases(self, phases: list[Phase]) -> None:
for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True): for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True):
if curr_phase: if curr_phase:
@@ -146,7 +179,9 @@ class AgentSpeakGenerator:
trigger_literal=AstLiteral("user_said", [AstVar("Message")]), trigger_literal=AstLiteral("user_said", [AstVar("Message")]),
context=[AstLiteral("phase", [AstString("end")])], context=[AstLiteral("phase", [AstString("end")])],
body=[ body=[
AstStatement(StatementType.DO_ACTION, AstLiteral("notify_user_said")), AstStatement(
StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")])
),
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply")), AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply")),
], ],
) )
@@ -174,12 +209,24 @@ class AgentSpeakGenerator:
self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")]) self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")])
) )
context = [from_phase_ast] check_context = [from_phase_ast]
if from_phase: if from_phase:
for goal in from_phase.goals: for goal in from_phase.goals:
context.append(self._astify(goal, achieved=True)) check_context.append(self._astify(goal, achieved=True))
force_context = [from_phase_ast]
body = [ body = [
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"notify_transition_phase",
[
AstString(str(from_phase.id)),
AstString(str(to_phase.id) if to_phase else "end"),
],
),
),
AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast), AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast),
AstStatement(StatementType.ADD_BELIEF, to_phase_ast), AstStatement(StatementType.ADD_BELIEF, to_phase_ast),
] ]
@@ -196,22 +243,23 @@ class AgentSpeakGenerator:
# ] # ]
# ) # )
# Notify outside world about transition # Check
body.append( self._asp.plans.append(
AstStatement( AstPlan(
StatementType.DO_ACTION, TriggerType.ADDED_GOAL,
AstLiteral( AstLiteral("transition_phase"),
"notify_transition_phase", check_context,
[ [
AstString(str(from_phase.id)), AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("force_transition_phase")),
AstString(str(to_phase.id) if to_phase else "end"),
], ],
),
) )
) )
# Force
self._asp.plans.append( self._asp.plans.append(
AstPlan(TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), context, body) AstPlan(
TriggerType.ADDED_GOAL, AstLiteral("force_transition_phase"), force_context, body
)
) )
def _process_norm(self, norm: Norm, phase: Phase) -> None: def _process_norm(self, norm: Norm, phase: Phase) -> None:
@@ -219,7 +267,11 @@ class AgentSpeakGenerator:
match norm: match norm:
case ConditionalNorm(condition=cond): case ConditionalNorm(condition=cond):
rule = AstRule(self._astify(norm), self._astify(phase) & self._astify(cond)) rule = AstRule(
self._astify(norm),
self._astify(phase) & self._astify(cond)
| AstAtom(f"force_{self.slugify(norm)}"),
)
case BasicNorm(): case BasicNorm():
rule = AstRule(self._astify(norm), self._astify(phase)) rule = AstRule(self._astify(norm), self._astify(phase))
@@ -325,6 +377,10 @@ class AgentSpeakGenerator:
if isinstance(step, Goal): if isinstance(step, Goal):
step.can_fail = False # triggers are continuous sequence step.can_fail = False # triggers are continuous sequence
subgoals.append(step) subgoals.append(step)
# Arbitrary wait for UI to display nicely
body.append(AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(2000)])))
body.append( body.append(
AstStatement( AstStatement(
StatementType.DO_ACTION, StatementType.DO_ACTION,
@@ -368,6 +424,16 @@ class AgentSpeakGenerator:
) )
) )
# Force phase transition fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("force_transition_phase"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
@singledispatchmethod @singledispatchmethod
def _astify(self, element: ProgramElement) -> AstExpression: def _astify(self, element: ProgramElement) -> AstExpression:
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.") raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")
@@ -423,6 +489,11 @@ class AgentSpeakGenerator:
def slugify(element: ProgramElement) -> str: def slugify(element: ProgramElement) -> str:
raise NotImplementedError(f"Cannot convert element {element} to a slug.") raise NotImplementedError(f"Cannot convert element {element} to a slug.")
@slugify.register
@staticmethod
def _(n: Norm) -> str:
return f"norm_{AgentSpeakGenerator._slugify_str(n.norm)}"
@slugify.register @slugify.register
@staticmethod @staticmethod
def _(sb: SemanticBelief) -> str: def _(sb: SemanticBelief) -> str:
@@ -430,7 +501,7 @@ class AgentSpeakGenerator:
@slugify.register @slugify.register
@staticmethod @staticmethod
def _(g: Goal) -> str: def _(g: BaseGoal) -> str:
return AgentSpeakGenerator._slugify_str(g.name) return AgentSpeakGenerator._slugify_str(g.name)
@slugify.register @slugify.register

View File

@@ -1,203 +0,0 @@
import typing
from dataclasses import dataclass, field
# --- Types ---
@dataclass
class BeliefLiteral:
"""
Represents a literal or atom.
Example: phase(1), user_said("hello"), ~started
"""
functor: str
args: list[str] = field(default_factory=list)
negated: bool = False
def __str__(self):
# In ASL, 'not' is usually for closed-world assumption (prolog style),
# '~' is for explicit negation in beliefs.
# For simplicity in behavior trees, we often use 'not' for conditions.
prefix = "not " if self.negated else ""
if not self.args:
return f"{prefix}{self.functor}"
# Clean args to ensure strings are quoted if they look like strings,
# but usually the converter handles the quoting of string literals.
args_str = ", ".join(self.args)
return f"{prefix}{self.functor}({args_str})"
@dataclass
class GoalLiteral:
name: str
def __str__(self):
return f"!{self.name}"
@dataclass
class ActionLiteral:
"""
Represents a step in a plan body.
Example: .say("Hello") or !achieve_goal
"""
code: str
def __str__(self):
return self.code
@dataclass
class BinaryOp:
"""
Represents logical operations.
Example: (A & B) | C
"""
left: "Expression | str"
operator: typing.Literal["&", "|"]
right: "Expression | str"
def __str__(self):
l_str = str(self.left)
r_str = str(self.right)
if isinstance(self.left, BinaryOp):
l_str = f"({l_str})"
if isinstance(self.right, BinaryOp):
r_str = f"({r_str})"
return f"{l_str} {self.operator} {r_str}"
Literal = BeliefLiteral | GoalLiteral | ActionLiteral
Expression = Literal | BinaryOp | str
@dataclass
class Rule:
"""
Represents an inference rule.
Example: head :- body.
"""
head: Expression
body: Expression | None = None
def __str__(self):
if not self.body:
return f"{self.head}."
return f"{self.head} :- {self.body}."
@dataclass
class PersistentRule:
"""
Represents an inference rule, where the inferred belief is persistent when formed.
"""
head: Expression
body: Expression
def __str__(self):
if not self.body:
raise Exception("Rule without body should not be persistent.")
lines = []
if isinstance(self.body, BinaryOp):
lines.append(f"+{self.body.left}")
if self.body.operator == "&":
lines.append(f" : {self.body.right}")
lines.append(f" <- +{self.head}.")
if self.body.operator == "|":
lines.append(f"+{self.body.right}")
lines.append(f" <- +{self.head}.")
return "\n".join(lines)
@dataclass
class Plan:
"""
Represents a plan.
Syntax: +trigger : context <- body.
"""
trigger: BeliefLiteral | GoalLiteral
context: list[Expression] = field(default_factory=list)
body: list[ActionLiteral] = field(default_factory=list)
def __str__(self):
# Indentation settings
INDENT = " "
ARROW = "\n <- "
COLON = "\n : "
# Build Header
header = f"+{self.trigger}"
if self.context:
ctx_str = f" &\n{INDENT}".join(str(c) for c in self.context)
header += f"{COLON}{ctx_str}"
# Case 1: Empty body
if not self.body:
return f"{header}."
# Case 2: Short body (optional optimization, keeping it uniform usually better)
header += ARROW
lines = []
# We start the first action on the same line or next line.
# Let's put it on the next line for readability if there are multiple.
if len(self.body) == 1:
return f"{header}{self.body[0]}."
# First item
lines.append(f"{header}{self.body[0]};")
# Middle items
for item in self.body[1:-1]:
lines.append(f"{INDENT}{item};")
# Last item
lines.append(f"{INDENT}{self.body[-1]}.")
return "\n".join(lines)
@dataclass
class AgentSpeakFile:
"""
Root element representing the entire generated file.
"""
initial_beliefs: list[Rule] = field(default_factory=list)
inference_rules: list[Rule | PersistentRule] = field(default_factory=list)
plans: list[Plan] = field(default_factory=list)
def __str__(self):
sections = []
if self.initial_beliefs:
sections.append("// --- Initial Beliefs & Facts ---")
sections.extend(str(rule) for rule in self.initial_beliefs)
sections.append("")
if self.inference_rules:
sections.append("// --- Inference Rules ---")
sections.extend(str(rule) for rule in self.inference_rules if isinstance(rule, Rule))
sections.append("")
sections.extend(
str(rule) for rule in self.inference_rules if isinstance(rule, PersistentRule)
)
sections.append("")
if self.plans:
sections.append("// --- Plans ---")
# Separate plans by a newline for readability
sections.extend(str(plan) + "\n" for plan in self.plans)
return "\n".join(sections)

View File

@@ -1,425 +0,0 @@
import asyncio
import time
from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi import BDICoreAgent
from control_backend.agents.bdi.asl_ast import (
ActionLiteral,
AgentSpeakFile,
BeliefLiteral,
BinaryOp,
Expression,
GoalLiteral,
PersistentRule,
Plan,
Rule,
)
from control_backend.agents.bdi.bdi_program_manager import test_program
from control_backend.schemas.program import (
BasicBelief,
Belief,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Phase,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
)
async def do_things():
res = input("Wanna generate")
if res == "y":
program = AgentSpeakGenerator().generate(test_program)
filename = f"{int(time.time())}.asl"
with open(filename, "w") as f:
f.write(program)
else:
# filename = "0test.asl"
filename = "1766062491.asl"
bdi_agent = BDICoreAgent("BDICoreAgent", filename)
flag = asyncio.Event()
await bdi_agent.start()
await flag.wait()
def do_other_things():
print(AgentSpeakGenerator().generate(test_program))
class AgentSpeakGenerator:
"""
Converts a Pydantic Program behavior model into an AgentSpeak(L) AST,
then renders it to a string.
"""
def generate(self, program: Program) -> str:
asl = AgentSpeakFile()
self._generate_startup(program, asl)
for i, phase in enumerate(program.phases):
next_phase = program.phases[i + 1] if i < len(program.phases) - 1 else None
self._generate_phase_flow(phase, next_phase, asl)
self._generate_norms(phase, asl)
self._generate_goals(phase, asl)
self._generate_triggers(phase, asl)
self._generate_fallbacks(program, asl)
return str(asl)
# --- Section: Startup & Phase Management ---
def _generate_startup(self, program: Program, asl: AgentSpeakFile):
if not program.phases:
return
# Initial belief: phase(start).
asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ['"start"'])))
# Startup plan: +started : phase(start) <- -phase(start); +phase(first_id).
asl.plans.append(
Plan(
trigger=BeliefLiteral("started"),
context=[BeliefLiteral("phase", ['"start"'])],
body=[
ActionLiteral('-phase("start")'),
ActionLiteral(f'+phase("{program.phases[0].id}")'),
],
)
)
# Initial plans:
asl.plans.append(
Plan(
trigger=GoalLiteral("generate_response_with_goal(Goal)"),
context=[BeliefLiteral("user_said", ["Message"])],
body=[
ActionLiteral("+responded_this_turn"),
ActionLiteral(".findall(Norm, norm(Norm), Norms)"),
ActionLiteral(".reply_with_goal(Message, Norms, Goal)"),
],
)
)
def _generate_phase_flow(self, phase: Phase, next_phase: Phase | None, asl: AgentSpeakFile):
"""Generates the main loop listener and the transition logic for this phase."""
# +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase.
goal_actions = [ActionLiteral("-responded_this_turn")]
goal_actions += [
ActionLiteral(f"!check_{self._slugify_str(keyword)}")
for keyword in self._get_keyword_conditionals(phase)
]
goal_actions += [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
goal_actions.append(ActionLiteral("!transition_phase"))
asl.plans.append(
Plan(
trigger=BeliefLiteral("user_said", ["Message"]),
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=goal_actions,
)
)
# +!transition_phase : phase(ID) <- -phase(ID); +(NEXT_ID).
next_id = str(next_phase.id) if next_phase else "end"
transition_context = [BeliefLiteral("phase", [f'"{phase.id}"'])]
if phase.goals:
transition_context.append(BeliefLiteral(f"achieved_{self._slugify(phase.goals[-1])}"))
asl.plans.append(
Plan(
trigger=GoalLiteral("transition_phase"),
context=transition_context,
body=[
ActionLiteral(f'-phase("{phase.id}")'),
ActionLiteral(f'+phase("{next_id}")'),
ActionLiteral("user_said(Anything)"),
ActionLiteral("-+user_said(Anything)"),
],
)
)
def _get_keyword_conditionals(self, phase: Phase) -> list[str]:
res = []
for belief in self._extract_basic_beliefs_from_phase(phase):
if isinstance(belief, KeywordBelief):
res.append(belief.keyword)
return res
# --- Section: Norms & Beliefs ---
def _generate_norms(self, phase: Phase, asl: AgentSpeakFile):
for norm in phase.norms:
norm_slug = f'"{norm.norm}"'
head = BeliefLiteral("norm", [norm_slug])
# Base context is the phase
phase_lit = BeliefLiteral("phase", [f'"{phase.id}"'])
if isinstance(norm, ConditionalNorm):
self._ensure_belief_inference(norm.condition, asl)
condition_expr = self._belief_to_expr(norm.condition)
body = BinaryOp(phase_lit, "&", condition_expr)
else:
body = phase_lit
asl.inference_rules.append(Rule(head=head, body=body))
def _ensure_belief_inference(self, belief: Belief, asl: AgentSpeakFile):
"""
Recursively adds rules to infer beliefs.
Checks strictly to avoid duplicates if necessary,
though ASL engines often handle redefinition or we can use a set to track processed IDs.
"""
if isinstance(belief, KeywordBelief):
pass
# # Rule: keyword_said("word") :- user_said(M) & .substring("word", M, P) & P >= 0.
# kwd_slug = f'"{belief.keyword}"'
# head = BeliefLiteral("keyword_said", [kwd_slug])
#
# # Avoid duplicates
# if any(str(r.head) == str(head) for r in asl.inference_rules):
# return
#
# body = BinaryOp(
# BeliefLiteral("user_said", ["Message"]),
# "&",
# BinaryOp(f".substring({kwd_slug}, Message, Pos)", "&", "Pos >= 0"),
# )
#
# asl.inference_rules.append(Rule(head=head, body=body))
elif isinstance(belief, InferredBelief):
self._ensure_belief_inference(belief.left, asl)
self._ensure_belief_inference(belief.right, asl)
slug = self._slugify(belief)
head = BeliefLiteral(slug)
if any(str(r.head) == str(head) for r in asl.inference_rules):
return
op_char = "&" if belief.operator == LogicalOperator.AND else "|"
body = BinaryOp(
self._belief_to_expr(belief.left), op_char, self._belief_to_expr(belief.right)
)
asl.inference_rules.append(PersistentRule(head=head, body=body))
def _belief_to_expr(self, belief: Belief) -> Expression:
if isinstance(belief, KeywordBelief):
return BeliefLiteral("keyword_said", [f'"{belief.keyword}"'])
else:
return BeliefLiteral(self._slugify(belief))
# --- Section: Goals ---
def _generate_goals(self, phase: Phase, asl: AgentSpeakFile):
previous_goal: Goal | None = None
for goal in phase.goals:
self._generate_goal_plan_recursive(goal, str(phase.id), previous_goal, asl)
previous_goal = goal
def _generate_goal_plan_recursive(
self,
goal: Goal,
phase_id: str,
previous_goal: Goal | None,
asl: AgentSpeakFile,
responded_needed: bool = True,
can_fail: bool = True,
):
goal_slug = self._slugify(goal)
# phase(ID) & not responded_this_turn & not achieved_goal
context = [
BeliefLiteral("phase", [f'"{phase_id}"']),
]
if responded_needed:
context.append(BeliefLiteral("responded_this_turn", negated=True))
if can_fail:
context.append(BeliefLiteral(f"achieved_{goal_slug}", negated=True))
if previous_goal:
prev_slug = self._slugify(previous_goal)
context.append(BeliefLiteral(f"achieved_{prev_slug}"))
body_actions = []
sub_goals_to_process = []
for step in goal.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals_to_process.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(ActionLiteral(f'.gesture("{step.gesture}")'))
elif isinstance(step, LLMAction):
body_actions.append(ActionLiteral(f'!generate_response_with_goal("{step.goal}")'))
# Mark achievement
if not goal.can_fail:
body_actions.append(ActionLiteral(f"+achieved_{goal_slug}"))
asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions))
asl.plans.append(
Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
)
prev_sub = None
for sub_goal in sub_goals_to_process:
self._generate_goal_plan_recursive(sub_goal, phase_id, prev_sub, asl)
prev_sub = sub_goal
# --- Section: Triggers ---
def _generate_triggers(self, phase: Phase, asl: AgentSpeakFile):
for keyword in self._get_keyword_conditionals(phase):
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
context=[
ActionLiteral(
f'user_said(Message) & .substring("{keyword}", Message, Pos) & Pos >= 0'
)
],
body=[
ActionLiteral(f'+keyword_said("{keyword}")'),
ActionLiteral(f'-keyword_said("{keyword}")'),
],
)
)
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
body=[ActionLiteral("true")],
)
)
for trigger in phase.triggers:
self._ensure_belief_inference(trigger.condition, asl)
trigger_belief_slug = self._belief_to_expr(trigger.condition)
body_actions = []
sub_goals = []
for step in trigger.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(
ActionLiteral(f'.gesture("{step.gesture.type}", "{step.gesture.name}")')
)
elif isinstance(step, LLMAction):
body_actions.append(
ActionLiteral(f'!generate_response_with_goal("{step.goal}")')
)
asl.plans.append(
Plan(
trigger=BeliefLiteral(trigger_belief_slug),
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=body_actions,
)
)
# Recurse for triggered goals
prev_sub = None
for sub_goal in sub_goals:
self._generate_goal_plan_recursive(
sub_goal, str(phase.id), prev_sub, asl, False, False
)
prev_sub = sub_goal
# --- Section: Fallbacks ---
def _generate_fallbacks(self, program: Program, asl: AgentSpeakFile):
asl.plans.append(
Plan(trigger=GoalLiteral("transition_phase"), context=[], body=[ActionLiteral("true")])
)
# --- Helpers ---
@singledispatchmethod
def _slugify(self, element: ProgramElement) -> str:
if element.name:
raise NotImplementedError("Cannot slugify this element.")
return self._slugify_str(element.name)
@_slugify.register
def _(self, goal: Goal) -> str:
if goal.name:
return self._slugify_str(goal.name)
return f"goal_{goal.id.hex}"
@_slugify.register
def _(self, kwb: KeywordBelief) -> str:
return f"keyword_said({kwb.keyword})"
@_slugify.register
def _(self, sb: SemanticBelief) -> str:
return self._slugify_str(sb.description)
@_slugify.register
def _(self, ib: InferredBelief) -> str:
return self._slugify_str(ib.name)
def _slugify_str(self, text: str) -> str:
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])
def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]:
beliefs = []
for phase in program.phases:
beliefs.extend(self._extract_basic_beliefs_from_phase(phase))
return beliefs
def _extract_basic_beliefs_from_phase(self, phase: Phase) -> list[BasicBelief]:
beliefs = []
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_basic_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += self._extract_basic_beliefs_from_belief(trigger.condition)
return beliefs
def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]:
if isinstance(belief, InferredBelief):
return self._extract_basic_beliefs_from_belief(
belief.left
) + self._extract_basic_beliefs_from_belief(belief.right)
return [belief]
if __name__ == "__main__":
asyncio.run(do_things())
# do_other_things()y

View File

@@ -107,7 +107,6 @@ class BDICoreAgent(BaseAgent):
if not maybe_more_work: if not maybe_more_work:
deadline = self.bdi_agent.shortest_deadline() deadline = self.bdi_agent.shortest_deadline()
if deadline: if deadline:
self.logger.debug("Sleeping until %s", deadline)
await asyncio.sleep(deadline - time.time()) await asyncio.sleep(deadline - time.time())
maybe_more_work = True maybe_more_work = True
else: else:
@@ -156,14 +155,17 @@ class BDICoreAgent(BaseAgent):
) )
await self.send(out_msg) await self.send(out_msg)
case settings.agent_settings.user_interrupt_name: case settings.agent_settings.user_interrupt_name:
content = msg.body self.logger.debug("Received user interruption: %s", msg)
self.logger.debug("Received user interruption: %s", content)
match msg.thread: match msg.thread:
case "force_phase_transition": case "force_phase_transition":
self._set_goal("transition_phase") self._set_goal("transition_phase")
case "force_trigger": case "force_trigger":
self._force_trigger(msg.body) self._force_trigger(msg.body)
case "force_norm":
self._force_norm(msg.body)
case "force_next_phase":
self._force_next_phase()
case _: case _:
self.logger.warning("Received unknow user interruption: %s", msg) self.logger.warning("Received unknow user interruption: %s", msg)
@@ -302,15 +304,21 @@ class BDICoreAgent(BaseAgent):
self.logger.debug(f"Set goal !{self.format_belief_string(name, args)}.") self.logger.debug(f"Set goal !{self.format_belief_string(name, args)}.")
def _force_trigger(self, name: str): def _force_trigger(self, name: str):
self.bdi_agent.call( self._set_goal(name)
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
agentspeak.Literal(name),
agentspeak.runtime.Intention(),
)
self.logger.info("Manually forced trigger %s.", name) self.logger.info("Manually forced trigger %s.", name)
# TODO: make this compatible for critical norms
def _force_norm(self, name: str):
self._add_belief(f"force_{name}")
self.logger.info("Manually forced norm %s.", name)
def _force_next_phase(self):
self._set_goal("force_transition_phase")
self.logger.info("Manually forced phase transition.")
def _add_custom_actions(self) -> None: def _add_custom_actions(self) -> None:
""" """
Add any custom actions here. Inside `@self.actions.add()`, the first argument is Add any custom actions here. Inside `@self.actions.add()`, the first argument is
@@ -326,9 +334,6 @@ class BDICoreAgent(BaseAgent):
message_text = agentspeak.grounded(term.args[0], intention.scope) message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope) norms = agentspeak.grounded(term.args[1], intention.scope)
self.logger.debug("Norms: %s", norms)
self.logger.debug("User text: %s", message_text)
self.add_behavior(self._send_to_llm(str(message_text), str(norms), "")) self.add_behavior(self._send_to_llm(str(message_text), str(norms), ""))
yield yield
@@ -341,16 +346,22 @@ class BDICoreAgent(BaseAgent):
message_text = agentspeak.grounded(term.args[0], intention.scope) message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope) norms = agentspeak.grounded(term.args[1], intention.scope)
goal = agentspeak.grounded(term.args[2], intention.scope) goal = agentspeak.grounded(term.args[2], intention.scope)
self.logger.debug(
'"reply_with_goal" action called with message=%s, norms=%s, goal=%s',
message_text,
norms,
goal,
)
self.add_behavior(self._send_to_llm(str(message_text), str(norms), str(goal))) self.add_behavior(self._send_to_llm(str(message_text), str(norms), str(goal)))
yield yield
@self.actions.add(".notify_norms", 1)
def _notify_norms(agent, term, intention):
norms = agentspeak.grounded(term.args[0], intention.scope)
norm_update_message = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
thread="active_norms_update",
body=str(norms),
)
self.add_behavior(self.send(norm_update_message, should_log=False))
yield
@self.actions.add(".say", 1) @self.actions.add(".say", 1)
def _say(agent, term, intention): def _say(agent, term, intention):
""" """
@@ -459,7 +470,6 @@ class BDICoreAgent(BaseAgent):
body=str(trigger_name), body=str(trigger_name),
) )
# TODO: check with Pim
self.add_behavior(self.send(msg)) self.add_behavior(self.send(msg))
yield yield
@@ -502,6 +512,10 @@ class BDICoreAgent(BaseAgent):
yield yield
@self.actions.add(".notify_ui", 0)
def _notify_ui(agent, term, intention):
pass
async def _send_to_llm(self, text: str, norms: str, goals: str): async def _send_to_llm(self, text: str, norms: str, goals: str):
""" """
Sends a text query to the LLM agent asynchronously. Sends a text query to the LLM agent asynchronously.

View File

@@ -42,6 +42,16 @@ class BDIProgramManager(BaseAgent):
def _initialize_internal_state(self, program: Program): def _initialize_internal_state(self, program: Program):
self._program = program self._program = program
self._phase = program.phases[0] # start in first phase self._phase = program.phases[0] # start in first phase
self._goal_mapping: dict[str, Goal] = {}
for phase in program.phases:
for goal in phase.goals:
self._populate_goal_mapping_with_goal(goal)
def _populate_goal_mapping_with_goal(self, goal: Goal):
self._goal_mapping[str(goal.id)] = goal
for step in goal.plan.steps:
if isinstance(step, Goal):
self._populate_goal_mapping_with_goal(step)
async def _create_agentspeak_and_send_to_bdi(self, program: Program): async def _create_agentspeak_and_send_to_bdi(self, program: Program):
""" """
@@ -73,12 +83,29 @@ class BDIProgramManager(BaseAgent):
phases = json.loads(msg.body) phases = json.loads(msg.body)
await self._transition_phase(phases["old"], phases["new"]) await self._transition_phase(phases["old"], phases["new"])
case "achieve_goal":
goal_id = msg.body
await self._send_achieved_goal_to_semantic_belief_extractor(goal_id)
async def _transition_phase(self, old: str, new: str): async def _transition_phase(self, old: str, new: str):
assert old == str(self._phase.id) if old != str(self._phase.id):
self.logger.warning(
f"Phase transition desync detected! ASL requested move from '{old}', "
f"but Python is currently in '{self._phase.id}'. Request ignored."
)
return
if new == "end": if new == "end":
self._phase = None self._phase = None
# Notify user interaction agent
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
thread="transition_phase",
body="end",
)
self.logger.info("Transitioned to end phase, notifying UserInterruptAgent.")
self.add_behavior(self.send(msg))
return return
for phase in self._program.phases: for phase in self._program.phases:
@@ -94,6 +121,7 @@ class BDIProgramManager(BaseAgent):
thread="transition_phase", thread="transition_phase",
body=str(self._phase.id), body=str(self._phase.id),
) )
self.logger.info(f"Transitioned to phase {new}, notifying UserInterruptAgent.")
self.add_behavior(self.send(msg)) self.add_behavior(self.send(msg))
@@ -132,6 +160,19 @@ class BDIProgramManager(BaseAgent):
await self.send(message) await self.send(message)
@staticmethod
def _extract_goals_from_goal(goal: Goal) -> list[Goal]:
"""
Extract all goals from a given goal, that is: the goal itself and any subgoals.
:return: All goals within and including the given goal.
"""
goals: list[Goal] = [goal]
for plan in goal.plan:
if isinstance(plan, Goal):
goals.extend(BDIProgramManager._extract_goals_from_goal(plan))
return goals
def _extract_current_goals(self) -> list[Goal]: def _extract_current_goals(self) -> list[Goal]:
""" """
Extract all goals from the program, including subgoals. Extract all goals from the program, including subgoals.
@@ -140,15 +181,8 @@ class BDIProgramManager(BaseAgent):
""" """
goals: list[Goal] = [] goals: list[Goal] = []
def extract_goals_from_goal(goal_: Goal) -> list[Goal]:
goals_: list[Goal] = [goal]
for plan in goal_.plan:
if isinstance(plan, Goal):
goals_.extend(extract_goals_from_goal(plan))
return goals_
for goal in self._phase.goals: for goal in self._phase.goals:
goals.extend(extract_goals_from_goal(goal)) goals.extend(self._extract_goals_from_goal(goal))
return goals return goals
@@ -167,6 +201,25 @@ class BDIProgramManager(BaseAgent):
await self.send(message) await self.send(message)
async def _send_achieved_goal_to_semantic_belief_extractor(self, achieved_goal_id: str):
"""
Inform the semantic belief extractor when a goal is marked achieved.
:param achieved_goal_id: The id of the achieved goal.
"""
goal = self._goal_mapping.get(achieved_goal_id)
if goal is None:
self.logger.debug(f"Goal with ID {achieved_goal_id} marked achieved but was not found.")
return
goals = self._extract_goals_from_goal(goal)
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
body=GoalList(goals=goals).model_dump_json(),
thread="achieved_goals",
)
await self.send(message)
async def _send_clear_llm_history(self): async def _send_clear_llm_history(self):
""" """
Clear the LLM Agent's conversation history. Clear the LLM Agent's conversation history.
@@ -206,7 +259,7 @@ class BDIProgramManager(BaseAgent):
continue continue
self._initialize_internal_state(program) self._initialize_internal_state(program)
await self._send_program_to_user_interrupt(program)
await self._send_clear_llm_history() await self._send_clear_llm_history()
await asyncio.gather( await asyncio.gather(
@@ -215,13 +268,30 @@ class BDIProgramManager(BaseAgent):
self._send_goals_to_semantic_belief_extractor(), self._send_goals_to_semantic_belief_extractor(),
) )
async def _send_program_to_user_interrupt(self, program: Program):
"""
Send the received program to the User Interrupt Agent.
:param program: The program object received from the API.
"""
msg = InternalMessage(
sender=self.name,
to=settings.agent_settings.user_interrupt_name,
body=program.model_dump_json(),
thread="new_program",
)
await self.send(msg)
async def setup(self): async def setup(self):
""" """
Initialize the agent. Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'program' topic. Connects the internal ZMQ SUB socket and subscribes to the 'program' topic.
Starts the background behavior to receive programs. Starts the background behavior to receive programs. Initializes a default program.
""" """
await self._create_agentspeak_and_send_to_bdi(Program(phases=[]))
context = Context.instance() context = Context.instance()
self.sub_socket = context.socket(zmq.SUB) self.sub_socket = context.socket(zmq.SUB)

View File

@@ -1,6 +1,34 @@
norms(""). phase("end").
keyword_said(Keyword) :- (user_said(Message) & .substring(Keyword, Message, Pos)) & (Pos >= 0).
+user_said(Message) : norms(Norms) <-
.notify_user_said(Message); +!reply_with_goal(Goal)
-user_said(Message); : user_said(Message)
<- +responded_this_turn;
.findall(Norm, norm(Norm), Norms);
.reply_with_goal(Message, Norms, Goal).
+!say(Text)
<- +responded_this_turn;
.say(Text).
+!reply
: user_said(Message)
<- +responded_this_turn;
.findall(Norm, norm(Norm), Norms);
.reply(Message, Norms). .reply(Message, Norms).
+!notify_cycle
<- .notify_ui;
.wait(1).
+user_said(Message)
: phase("end")
<- .notify_user_said(Message);
!reply.
+!check_triggers
<- true.
+!transition_phase
<- true.

View File

@@ -12,7 +12,7 @@ from control_backend.schemas.belief_list import BeliefList, GoalList
from control_backend.schemas.belief_message import Belief as InternalBelief from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import Goal, SemanticBelief from control_backend.schemas.program import BaseGoal, SemanticBelief
type JSONLike = None | bool | int | float | str | list["JSONLike"] | dict[str, "JSONLike"] type JSONLike = None | bool | int | float | str | list["JSONLike"] | dict[str, "JSONLike"]
@@ -62,6 +62,7 @@ class TextBeliefExtractorAgent(BaseAgent):
self.goal_inferrer = GoalAchievementInferrer(self._llm) self.goal_inferrer = GoalAchievementInferrer(self._llm)
self._current_beliefs = BeliefState() self._current_beliefs = BeliefState()
self._current_goal_completions: dict[str, bool] = {} self._current_goal_completions: dict[str, bool] = {}
self._force_completed_goals: set[BaseGoal] = set()
self.conversation = ChatHistory(messages=[]) self.conversation = ChatHistory(messages=[])
async def setup(self): async def setup(self):
@@ -118,13 +119,15 @@ class TextBeliefExtractorAgent(BaseAgent):
case "goals": case "goals":
self._handle_goals_message(msg) self._handle_goals_message(msg)
await self._infer_goal_completions() await self._infer_goal_completions()
case "achieved_goals":
self._handle_goal_achieved_message(msg)
case "conversation_history": case "conversation_history":
if msg.body == "reset": if msg.body == "reset":
self._reset() self._reset_phase()
case _: case _:
self.logger.warning("Received unexpected message from %s", msg.sender) self.logger.warning("Received unexpected message from %s", msg.sender)
def _reset(self): def _reset_phase(self):
self.conversation = ChatHistory(messages=[]) self.conversation = ChatHistory(messages=[])
self.belief_inferrer.available_beliefs.clear() self.belief_inferrer.available_beliefs.clear()
self._current_beliefs = BeliefState() self._current_beliefs = BeliefState()
@@ -158,7 +161,8 @@ class TextBeliefExtractorAgent(BaseAgent):
return return
# Use only goals that can fail, as the others are always assumed to be completed # Use only goals that can fail, as the others are always assumed to be completed
available_goals = [g for g in goals_list.goals if g.can_fail] available_goals = {g for g in goals_list.goals if g.can_fail}
available_goals -= self._force_completed_goals
self.goal_inferrer.goals = available_goals self.goal_inferrer.goals = available_goals
self.logger.debug( self.logger.debug(
"Received %d failable goals from the program manager: %s", "Received %d failable goals from the program manager: %s",
@@ -166,6 +170,23 @@ class TextBeliefExtractorAgent(BaseAgent):
", ".join(g.name for g in available_goals), ", ".join(g.name for g in available_goals),
) )
def _handle_goal_achieved_message(self, msg: InternalMessage):
# NOTE: When goals can be marked unachieved, remember to re-add them to the goal_inferrer
try:
goals_list = GoalList.model_validate_json(msg.body)
except ValidationError:
self.logger.warning(
"Received goal achieved message from the program manager, "
"but it is not a valid list of goals."
)
return
for goal in goals_list.goals:
self._force_completed_goals.add(goal)
self._current_goal_completions[f"achieved_{AgentSpeakGenerator.slugify(goal)}"] = True
self.goal_inferrer.goals -= self._force_completed_goals
async def _user_said(self, text: str): async def _user_said(self, text: str):
""" """
Create a belief for the user's full speech. Create a belief for the user's full speech.
@@ -445,7 +466,7 @@ Respond with a JSON similar to the following, but with the property names as giv
class GoalAchievementInferrer(SemanticBeliefInferrer): class GoalAchievementInferrer(SemanticBeliefInferrer):
def __init__(self, llm: TextBeliefExtractorAgent.LLM): def __init__(self, llm: TextBeliefExtractorAgent.LLM):
super().__init__(llm) super().__init__(llm)
self.goals = [] self.goals: set[BaseGoal] = set()
async def infer_from_conversation(self, conversation: ChatHistory) -> dict[str, bool]: async def infer_from_conversation(self, conversation: ChatHistory) -> dict[str, bool]:
""" """
@@ -465,7 +486,7 @@ class GoalAchievementInferrer(SemanticBeliefInferrer):
for goal, achieved in zip(self.goals, goals_achieved, strict=True) for goal, achieved in zip(self.goals, goals_achieved, strict=True)
} }
async def _infer_goal(self, conversation: ChatHistory, goal: Goal) -> bool: async def _infer_goal(self, conversation: ChatHistory, goal: BaseGoal) -> bool:
prompt = f"""{self._format_conversation(conversation)} prompt = f"""{self._format_conversation(conversation)}
Given the above conversation, what has the following goal been achieved? Given the above conversation, what has the following goal been achieved?

View File

@@ -3,11 +3,14 @@ import json
import zmq import zmq
import zmq.asyncio as azmq import zmq.asyncio as azmq
from pydantic import ValidationError
from zmq.asyncio import Context from zmq.asyncio import Context
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage
from control_backend.schemas.ri_message import PauseCommand
from ..actuation.robot_speech_agent import RobotSpeechAgent from ..actuation.robot_speech_agent import RobotSpeechAgent
from ..perception import VADAgent from ..perception import VADAgent
@@ -47,6 +50,8 @@ class RICommunicationAgent(BaseAgent):
self._req_socket: azmq.Socket | None = None self._req_socket: azmq.Socket | None = None
self.pub_socket: azmq.Socket | None = None self.pub_socket: azmq.Socket | None = None
self.connected = False self.connected = False
self.gesture_agent: RobotGestureAgent | None = None
self.speech_agent: RobotSpeechAgent | None = None
async def setup(self): async def setup(self):
""" """
@@ -140,6 +145,7 @@ class RICommunicationAgent(BaseAgent):
# At this point, we have a valid response # At this point, we have a valid response
try: try:
self.logger.debug("Negotiation successful. Handling rn")
await self._handle_negotiation_response(received_message) await self._handle_negotiation_response(received_message)
# Let UI know that we're connected # Let UI know that we're connected
topic = b"ping" topic = b"ping"
@@ -188,6 +194,7 @@ class RICommunicationAgent(BaseAgent):
address=addr, address=addr,
bind=bind, bind=bind,
) )
self.speech_agent = robot_speech_agent
robot_gesture_agent = RobotGestureAgent( robot_gesture_agent = RobotGestureAgent(
settings.agent_settings.robot_gesture_name, settings.agent_settings.robot_gesture_name,
address=addr, address=addr,
@@ -195,6 +202,7 @@ class RICommunicationAgent(BaseAgent):
gesture_data=gesture_data, gesture_data=gesture_data,
single_gesture_data=single_gesture_data, single_gesture_data=single_gesture_data,
) )
self.gesture_agent = robot_gesture_agent
await robot_speech_agent.start() await robot_speech_agent.start()
await asyncio.sleep(0.1) # Small delay await asyncio.sleep(0.1) # Small delay
await robot_gesture_agent.start() await robot_gesture_agent.start()
@@ -225,6 +233,7 @@ class RICommunicationAgent(BaseAgent):
while self._running: while self._running:
if not self.connected: if not self.connected:
await asyncio.sleep(settings.behaviour_settings.sleep_s) await asyncio.sleep(settings.behaviour_settings.sleep_s)
self.logger.debug("Not connected, skipping ping loop iteration.")
continue continue
# We need to listen and send pings. # We need to listen and send pings.
@@ -289,13 +298,33 @@ class RICommunicationAgent(BaseAgent):
# Tell UI we're disconnected. # Tell UI we're disconnected.
topic = b"ping" topic = b"ping"
data = json.dumps(False).encode() data = json.dumps(False).encode()
self.logger.debug("1")
if self.pub_socket: if self.pub_socket:
try: try:
self.logger.debug("2")
await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5) await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5)
except TimeoutError: except TimeoutError:
self.logger.debug("3")
self.logger.warning("Connection ping for router timed out.") self.logger.warning("Connection ping for router timed out.")
# Try to reboot/renegotiate # Try to reboot/renegotiate
if self.gesture_agent is not None:
await self.gesture_agent.stop()
if self.speech_agent is not None:
await self.speech_agent.stop()
if self.pub_socket is not None:
self.pub_socket.close()
self.logger.debug("Restarting communication negotiation.") self.logger.debug("Restarting communication negotiation.")
if await self._negotiate_connection(max_retries=1): if await self._negotiate_connection(max_retries=2):
self.connected = True self.connected = True
async def handle_message(self, msg: InternalMessage):
try:
pause_command = PauseCommand.model_validate_json(msg.body)
self._req_socket.send_json(pause_command.model_dump())
self.logger.debug(self._req_socket.recv_json())
except ValidationError:
self.logger.warning("Incorrect message format for PauseCommand.")

View File

@@ -7,6 +7,7 @@ import zmq.asyncio as azmq
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage
from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
from .transcription_agent.transcription_agent import TranscriptionAgent from .transcription_agent.transcription_agent import TranscriptionAgent
@@ -86,6 +87,12 @@ class VADAgent(BaseAgent):
self.audio_buffer = np.array([], dtype=np.float32) self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._ready = asyncio.Event() self._ready = asyncio.Event()
# Pause control
self._reset_needed = False
self._paused = asyncio.Event()
self._paused.set() # Not paused at start
self.model = None self.model = None
async def setup(self): async def setup(self):
@@ -213,6 +220,16 @@ class VADAgent(BaseAgent):
""" """
await self._ready.wait() await self._ready.wait()
while self._running: while self._running:
await self._paused.wait()
# After being unpaused, reset stream and buffers
if self._reset_needed:
self.logger.debug("Resuming: resetting stream and buffers.")
await self._reset_stream()
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._reset_needed = False
assert self.audio_in_poller is not None assert self.audio_in_poller is not None
data = await self.audio_in_poller.poll() data = await self.audio_in_poller.poll()
if data is None: if data is None:
@@ -252,7 +269,30 @@ class VADAgent(BaseAgent):
assert self.audio_out_socket is not None assert self.audio_out_socket is not None
await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes()) await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes())
# At this point, we know that there is no speech. # At this point, we know that the speech has ended.
# Prepend the last few chunks that had no speech, for a more fluent boundary. # Prepend the last chunk that had no speech, for a more fluent boundary
self.audio_buffer = np.append(self.audio_buffer, chunk) self.audio_buffer = chunk
self.audio_buffer = self.audio_buffer[-begin_silence_length * len(chunk) :]
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages.
Expects messages to pause or resume the VAD processing from User Interrupt Agent.
:param msg: The received internal message.
"""
sender = msg.sender
if sender == settings.agent_settings.user_interrupt_name:
if msg.body == "PAUSE":
self.logger.info("Pausing VAD processing.")
self._paused.clear()
# If the robot needs to pick up speaking where it left off, do not set _reset_needed
self._reset_needed = True
elif msg.body == "RESUME":
self.logger.info("Resuming VAD processing.")
self._paused.set()
else:
self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}")
else:
self.logger.debug(f"Ignoring message from unknown sender: {sender}")

View File

@@ -4,9 +4,17 @@ import zmq
from zmq.asyncio import Context from zmq.asyncio import Context
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.agent_system import InternalMessage from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand from control_backend.schemas.belief_message import Belief, BeliefMessage
from control_backend.schemas.program import ConditionalNorm, Program
from control_backend.schemas.ri_message import (
GestureCommand,
PauseCommand,
RIEndpoint,
SpeechCommand,
)
class UserInterruptAgent(BaseAgent): class UserInterruptAgent(BaseAgent):
@@ -18,18 +26,45 @@ class UserInterruptAgent(BaseAgent):
- Send a prioritized message to the `RobotSpeechAgent` - Send a prioritized message to the `RobotSpeechAgent`
- Send a prioritized gesture to the `RobotGestureAgent` - Send a prioritized gesture to the `RobotGestureAgent`
- Send a belief override to the `BDIProgramManager`in order to activate a - Send a belief override to the `BDI Core` in order to activate a
trigger/conditional norm or complete a goal. trigger/conditional norm or complete a goal.
Prioritized actions clear the current RI queue before inserting the new item, Prioritized actions clear the current RI queue before inserting the new item,
ensuring they are executed immediately after Pepper's current action has been fulfilled. ensuring they are executed immediately after Pepper's current action has been fulfilled.
:ivar sub_socket: The ZMQ SUB socket used to receive user intterupts. :ivar sub_socket: The ZMQ SUB socket used to receive user interrupts.
""" """
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.sub_socket = None self.sub_socket = None
self.pub_socket = None
self._trigger_map = {}
self._trigger_reverse_map = {}
self._goal_map = {} # id -> sluggified goal
self._goal_reverse_map = {} # sluggified goal -> id
self._cond_norm_map = {} # id -> sluggified cond norm
self._cond_norm_reverse_map = {} # sluggified cond norm -> id
async def setup(self):
"""
Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic.
Starts the background behavior to receive the user interrupts.
"""
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("button_pressed")
self.pub_socket = context.socket(zmq.PUB)
self.pub_socket.connect(settings.zmq_settings.internal_pub_address)
self.add_behavior(self._receive_button_event())
async def _receive_button_event(self): async def _receive_button_event(self):
""" """
@@ -40,7 +75,11 @@ class UserInterruptAgent(BaseAgent):
These are the different types and contexts: These are the different types and contexts:
- type: "speech", context: string that the robot has to say. - type: "speech", context: string that the robot has to say.
- type: "gesture", context: single gesture name that the robot has to perform. - type: "gesture", context: single gesture name that the robot has to perform.
- type: "override", context: belief_id that overrides the goal/trigger/conditional norm. - type: "override", context: id that belongs to the goal/trigger/conditional norm.
- type: "override_unachieve", context: id that belongs to the conditional norm to unachieve.
- type: "next_phase", context: None, indicates to the BDI Core to
- type: "pause", context: boolean indicating whether to pause
- type: "reset_phase", context: None, indicates to the BDI Core to
""" """
while True: while True:
topic, body = await self.sub_socket.recv_multipart() topic, body = await self.sub_socket.recv_multipart()
@@ -53,31 +92,201 @@ class UserInterruptAgent(BaseAgent):
self.logger.error("Received invalid JSON payload on topic %s", topic) self.logger.error("Received invalid JSON payload on topic %s", topic)
continue continue
if event_type == "speech": self.logger.debug("Received event type %s", event_type)
match event_type:
case "speech":
await self._send_to_speech_agent(event_context) await self._send_to_speech_agent(event_context)
self.logger.info( self.logger.info(
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.", "Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
event_context, event_context,
) )
elif event_type == "gesture": case "gesture":
await self._send_to_gesture_agent(event_context) await self._send_to_gesture_agent(event_context)
self.logger.info( self.logger.info(
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.", "Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
event_context, event_context,
) )
elif event_type == "override": case "override":
await self._send_to_program_manager(event_context) ui_id = str(event_context)
if asl_trigger := self._trigger_map.get(ui_id):
await self._send_to_bdi("force_trigger", asl_trigger)
self.logger.info( self.logger.info(
"Forwarded button press (override) with context '%s' to BDIProgramManager.", "Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
elif asl_cond_norm := self._cond_norm_map.get(ui_id):
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm")
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
elif asl_goal := self._goal_map.get(ui_id):
await self._send_to_bdi_belief(asl_goal, "goal")
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
# Send achieve_goal to program manager to update semantic belief extractor
goal_achieve_msg = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
thread="achieve_goal",
body=ui_id,
)
await self.send(goal_achieve_msg)
else:
self.logger.warning("Could not determine which element to override.")
case "override_unachieve":
ui_id = str(event_context)
if asl_cond_norm := self._cond_norm_map.get(ui_id):
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm", True)
self.logger.info(
"Forwarded button press (override_unachieve)"
"with context '%s' to BDI Core.",
event_context, event_context,
) )
else: else:
self.logger.warning(
"Could not determine which conditional norm to unachieve."
)
case "pause":
self.logger.debug(
"Received pause/resume button press with context '%s'.", event_context
)
await self._send_pause_command(event_context)
if event_context:
self.logger.info("Sent pause command.")
else:
self.logger.info("Sent resume command.")
case "next_phase" | "reset_phase":
await self._send_experiment_control_to_bdi_core(event_type)
case _:
self.logger.warning( self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').", "Received button press with unknown type '%s' (context: '%s').",
event_type, event_type,
event_context, event_context,
) )
async def handle_message(self, msg: InternalMessage):
"""
Handle commands received from other internal Python agents.
"""
match msg.thread:
case "new_program":
self._create_mapping(msg.body)
case "trigger_start":
# msg.body is the sluggified trigger
asl_slug = msg.body
ui_id = self._trigger_reverse_map.get(asl_slug)
if ui_id:
payload = {"type": "trigger_update", "id": ui_id, "achieved": True}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Trigger {asl_slug} started (ID: {ui_id})")
case "trigger_end":
asl_slug = msg.body
ui_id = self._trigger_reverse_map.get(asl_slug)
if ui_id:
payload = {"type": "trigger_update", "id": ui_id, "achieved": False}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Trigger {asl_slug} ended (ID: {ui_id})")
case "transition_phase":
new_phase_id = msg.body
self.logger.info(f"Phase transition detected: {new_phase_id}")
payload = {"type": "phase_update", "id": new_phase_id}
await self._send_experiment_update(payload)
case "goal_start":
goal_name = msg.body
ui_id = self._goal_reverse_map.get(goal_name)
if ui_id:
payload = {"type": "goal_update", "id": ui_id}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Goal {goal_name} started (ID: {ui_id})")
case "active_norms_update":
active_norms_asl = [
s.strip("() '\",") for s in msg.body.split(",") if s.strip("() '\",")
]
await self._broadcast_cond_norms(active_norms_asl)
case _:
self.logger.debug(f"Received internal message on unhandled thread: {msg.thread}")
async def _broadcast_cond_norms(self, active_slugs: list[str]):
"""
Sends the current state of all conditional norms to the UI.
:param active_slugs: A list of slugs (strings) currently active in the BDI core.
"""
updates = []
for asl_slug, ui_id in self._cond_norm_reverse_map.items():
is_active = asl_slug in active_slugs
updates.append({"id": ui_id, "active": is_active})
payload = {"type": "cond_norms_state_update", "norms": updates}
if self.pub_socket:
topic = b"status"
body = json.dumps(payload).encode("utf-8")
await self.pub_socket.send_multipart([topic, body])
# self.logger.info(f"UI Update: Active norms {updates}")
def _create_mapping(self, program_json: str):
"""
Create mappings between UI IDs and ASL slugs for triggers, goals, and conditional norms
"""
try:
program = Program.model_validate_json(program_json)
self._trigger_map = {}
self._trigger_reverse_map = {}
self._goal_map = {}
self._cond_norm_map = {}
self._cond_norm_reverse_map = {}
for phase in program.phases:
for trigger in phase.triggers:
slug = AgentSpeakGenerator.slugify(trigger)
self._trigger_map[str(trigger.id)] = slug
self._trigger_reverse_map[slug] = str(trigger.id)
for goal in phase.goals:
self._goal_map[str(goal.id)] = AgentSpeakGenerator.slugify(goal)
self._goal_reverse_map[AgentSpeakGenerator.slugify(goal)] = str(goal.id)
for goal, id in self._goal_reverse_map.items():
self.logger.debug(f"Goal mapping: UI ID {goal} -> {id}")
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
asl_slug = AgentSpeakGenerator.slugify(norm)
norm_id = str(norm.id)
self._cond_norm_map[norm_id] = asl_slug
self._cond_norm_reverse_map[norm.norm] = norm_id
self.logger.debug("Added conditional norm %s", asl_slug)
self.logger.info(
f"Mapped {len(self._trigger_map)} triggers and {len(self._goal_map)} goals "
f"and {len(self._cond_norm_map)} conditional norms for UserInterruptAgent."
)
except Exception as e:
self.logger.error(f"Mapping failed: {e}")
async def _send_experiment_update(self, data, should_log: bool = True):
"""
Sends an update to the 'experiment' topic.
The SSE endpoint will pick this up and push it to the UI.
"""
if self.pub_socket:
topic = b"experiment"
body = json.dumps(data).encode("utf-8")
await self.pub_socket.send_multipart([topic, body])
if should_log:
self.logger.debug(f"Sent experiment update: {data}")
async def _send_to_speech_agent(self, text_to_say: str): async def _send_to_speech_agent(self, text_to_say: str):
""" """
method to send prioritized speech command to RobotSpeechAgent. method to send prioritized speech command to RobotSpeechAgent.
@@ -109,38 +318,89 @@ class UserInterruptAgent(BaseAgent):
) )
await self.send(out_msg) await self.send(out_msg)
async def _send_to_program_manager(self, belief_id: str): async def _send_to_bdi(self, thread: str, body: str):
""" """Send slug of trigger to BDI"""
Send a button_override belief to the BDIProgramManager. msg = InternalMessage(to=settings.agent_settings.bdi_core_name, thread=thread, body=body)
await self.send(msg)
self.logger.info(f"Directly forced {thread} in BDI: {body}")
:param belief_id: The belief_id that overrides the goal/trigger/conditional norm. async def _send_to_bdi_belief(self, asl: str, asl_type: str, unachieve: bool = False):
this id can belong to a basic belief or an inferred belief. """Send belief to BDI Core"""
See also: https://utrechtuniversity.youtrack.cloud/articles/N25B-A-27/UI-components if asl_type == "goal":
belief_name = f"achieved_{asl}"
elif asl_type == "cond_norm":
belief_name = f"force_{asl}"
else:
self.logger.warning("Tried to send belief with unknown type")
belief = Belief(name=belief_name, arguments=None)
self.logger.debug(f"Sending belief to BDI Core: {belief_name}")
# Conditional norms are unachieved by removing the belief
belief_message = (
BeliefMessage(delete=[belief]) if unachieve else BeliefMessage(create=[belief])
)
msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
thread="beliefs",
body=belief_message.model_dump_json(),
)
await self.send(msg)
async def _send_experiment_control_to_bdi_core(self, type):
""" """
data = {"belief": belief_id} method to send experiment control buttons to bdi core.
message = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name, :param type: the type of control button we should send to the bdi core.
"""
# Switch which thread we should send to bdi core
thread = ""
match type:
case "next_phase":
thread = "force_next_phase"
case "reset_phase":
thread = "reset_current_phase"
case _:
self.logger.warning(
"Received unknown experiment control type '%s' to send to BDI Core.",
type,
)
out_msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name, sender=self.name,
body=json.dumps(data), thread=thread,
thread="belief_override_id", body="",
)
self.logger.debug("Sending experiment control '%s' to BDI Core.", thread)
await self.send(out_msg)
async def _send_pause_command(self, pause):
"""
Send a pause command to the Robot Interface via the RI Communication Agent.
Send a pause command to the other internal agents; for now just VAD agent.
"""
cmd = PauseCommand(data=pause)
message = InternalMessage(
to=settings.agent_settings.ri_communication_name,
sender=self.name,
body=cmd.model_dump_json(),
) )
await self.send(message) await self.send(message)
self.logger.info(
"Sent button_override belief with id '%s' to Program manager.", if pause == "true":
belief_id, # Send pause to VAD agent
vad_message = InternalMessage(
to=settings.agent_settings.vad_name,
sender=self.name,
body="PAUSE",
) )
await self.send(vad_message)
async def setup(self): self.logger.info("Sent pause command to VAD Agent and RI Communication Agent.")
""" else:
Initialize the agent. # Send resume to VAD agent
vad_message = InternalMessage(
Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic. to=settings.agent_settings.vad_name,
Starts the background behavior to receive the user interrupts. sender=self.name,
""" body="RESUME",
context = Context.instance() )
await self.send(vad_message)
self.sub_socket = context.socket(zmq.SUB) self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.")
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("button_pressed")
self.add_behavior(self._receive_button_event())

View File

@@ -1,31 +0,0 @@
import logging
from fastapi import APIRouter, Request
from control_backend.schemas.events import ButtonPressedEvent
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/button_pressed", status_code=202)
async def receive_button_event(event: ButtonPressedEvent, request: Request):
"""
Endpoint to handle external button press events.
Validates the event payload and publishes it to the internal 'button_pressed' topic.
Subscribers (in this case user_interrupt_agent) will pick this up to trigger
specific behaviors or state changes.
:param event: The parsed ButtonPressedEvent object.
:param request: The FastAPI request object.
"""
logger.debug("Received button event: %s | %s", event.type, event.context)
topic = b"button_pressed"
body = event.model_dump_json().encode()
pub_socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, body])
return {"status": "Event received"}

View File

@@ -137,7 +137,6 @@ async def ping_stream(request: Request):
logger.info("Client disconnected from SSE") logger.info("Client disconnected from SSE")
break break
logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}")
connectedJson = json.dumps(connected) connectedJson = json.dumps(connected)
yield (f"data: {connectedJson}\n\n") yield (f"data: {connectedJson}\n\n")

View File

@@ -0,0 +1,94 @@
import asyncio
import logging
import zmq
import zmq.asyncio
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
from zmq.asyncio import Context
from control_backend.core.config import settings
from control_backend.schemas.events import ButtonPressedEvent
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/button_pressed", status_code=202)
async def receive_button_event(event: ButtonPressedEvent, request: Request):
"""
Endpoint to handle external button press events.
Validates the event payload and publishes it to the internal 'button_pressed' topic.
Subscribers (in this case user_interrupt_agent) will pick this up to trigger
specific behaviors or state changes.
:param event: The parsed ButtonPressedEvent object.
:param request: The FastAPI request object.
"""
logger.debug("Received button event: %s | %s", event.type, event.context)
topic = b"button_pressed"
body = event.model_dump_json().encode()
pub_socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, body])
return {"status": "Event received"}
@router.get("/experiment_stream")
async def experiment_stream(request: Request):
# Use the asyncio-compatible context
context = Context.instance()
socket = context.socket(zmq.SUB)
# Connect and subscribe
socket.connect(settings.zmq_settings.internal_sub_address)
socket.subscribe(b"experiment")
async def gen():
try:
while True:
# Check if client closed the tab
if await request.is_disconnected():
logger.error("Client disconnected from experiment stream.")
break
try:
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=10.0)
_, message = parts
yield f"data: {message.decode().strip()}\n\n"
except TimeoutError:
continue
finally:
socket.close()
return StreamingResponse(gen(), media_type="text/event-stream")
@router.get("/status_stream")
async def status_stream(request: Request):
context = Context.instance()
socket = context.socket(zmq.SUB)
socket.connect(settings.zmq_settings.internal_sub_address)
socket.subscribe(b"status")
async def gen():
try:
while True:
if await request.is_disconnected():
break
try:
# Shorter timeout since this is frequent
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=0.5)
_, message = parts
yield f"data: {message.decode().strip()}\n\n"
except TimeoutError:
yield ": ping\n\n" # Keep the connection alive
continue
finally:
socket.close()
return StreamingResponse(gen(), media_type="text/event-stream")

View File

@@ -1,6 +1,6 @@
from fastapi.routing import APIRouter from fastapi.routing import APIRouter
from control_backend.api.v1.endpoints import button_pressed, logs, message, program, robot, sse from control_backend.api.v1.endpoints import logs, message, program, robot, sse, user_interact
api_router = APIRouter() api_router = APIRouter()
@@ -14,4 +14,4 @@ api_router.include_router(logs.router, tags=["Logs"])
api_router.include_router(program.router, tags=["Program"]) api_router.include_router(program.router, tags=["Program"])
api_router.include_router(button_pressed.router, tags=["Button Pressed Events"]) api_router.include_router(user_interact.router, tags=["Button Pressed Events"])

View File

@@ -60,6 +60,9 @@ class BaseAgent(ABC):
self._tasks: set[asyncio.Task] = set() self._tasks: set[asyncio.Task] = set()
self._running = False self._running = False
self._internal_pub_socket: None | azmq.Socket = None
self._internal_sub_socket: None | azmq.Socket = None
# Register immediately # Register immediately
AgentDirectory.register(name, self) AgentDirectory.register(name, self)
@@ -117,7 +120,7 @@ class BaseAgent(ABC):
task.cancel() task.cancel()
self.logger.info(f"Agent {self.name} stopped") self.logger.info(f"Agent {self.name} stopped")
async def send(self, message: InternalMessage): async def send(self, message: InternalMessage, should_log: bool = True):
""" """
Send a message to another agent. Send a message to another agent.
@@ -130,16 +133,25 @@ class BaseAgent(ABC):
:param message: The message to send. :param message: The message to send.
""" """
target = AgentDirectory.get(message.to)
message.sender = self.name message.sender = self.name
to = message.to
receivers = [to] if isinstance(to, str) else to
for receiver in receivers:
target = AgentDirectory.get(receiver)
if target: if target:
await target.inbox.put(message) await target.inbox.put(message)
self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.") if should_log:
self.logger.debug(
f"Sent message {message.body} to {message.to} via regular inbox."
)
else: else:
# Apparently target agent is on a different process, send via ZMQ # Apparently target agent is on a different process, send via ZMQ
topic = f"internal/{message.to}".encode() topic = f"internal/{receiver}".encode()
body = message.model_dump_json().encode() body = message.model_dump_json().encode()
await self._internal_pub_socket.send_multipart([topic, body]) await self._internal_pub_socket.send_multipart([topic, body])
if should_log:
self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.") self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.")
async def _process_inbox(self): async def _process_inbox(self):
@@ -150,7 +162,6 @@ class BaseAgent(ABC):
""" """
while self._running: while self._running:
msg = await self.inbox.get() msg = await self.inbox.get()
self.logger.debug(f"Received message from {msg.sender}.")
await self.handle_message(msg) await self.handle_message(msg)
async def _receive_internal_zmq_loop(self): async def _receive_internal_zmq_loop(self):

View File

@@ -1,7 +1,7 @@
from pydantic import BaseModel from pydantic import BaseModel
from control_backend.schemas.program import BaseGoal
from control_backend.schemas.program import Belief as ProgramBelief from control_backend.schemas.program import Belief as ProgramBelief
from control_backend.schemas.program import Goal
class BeliefList(BaseModel): class BeliefList(BaseModel):
@@ -16,4 +16,4 @@ class BeliefList(BaseModel):
class GoalList(BaseModel): class GoalList(BaseModel):
goals: list[Goal] goals: list[BaseGoal]

View File

@@ -11,7 +11,7 @@ class Belief(BaseModel):
""" """
name: str name: str
arguments: list[str] | None arguments: list[str] | None = None
# To make it hashable # To make it hashable
model_config = {"frozen": True} model_config = {"frozen": True}

View File

@@ -1,3 +1,5 @@
from collections.abc import Iterable
from pydantic import BaseModel from pydantic import BaseModel
@@ -11,7 +13,7 @@ class InternalMessage(BaseModel):
:ivar thread: An optional thread identifier/topic to categorize the message (e.g., 'beliefs'). :ivar thread: An optional thread identifier/topic to categorize the message (e.g., 'beliefs').
""" """
to: str to: str | Iterable[str]
sender: str | None = None sender: str | None = None
body: str body: str
thread: str | None = None thread: str | None = None

View File

@@ -15,6 +15,9 @@ class ProgramElement(BaseModel):
name: str name: str
id: UUID4 id: UUID4
# To make program elements hashable
model_config = {"frozen": True}
class LogicalOperator(Enum): class LogicalOperator(Enum):
AND = "AND" AND = "AND"
@@ -105,23 +108,33 @@ class Plan(ProgramElement):
steps: list[PlanElement] steps: list[PlanElement]
class Goal(ProgramElement): class BaseGoal(ProgramElement):
""" """
Represents an objective to be achieved. To reach the goal, we should execute Represents an objective to be achieved. This base version does not include a plan to achieve
the corresponding plan. If we can fail to achieve a goal after executing the plan, this goal, and is used in semantic belief extraction.
for example when the achieving of the goal is dependent on the user's reply, this means
that the achieved status will be set from somewhere else in the program.
:ivar description: A description of the goal, used to determine if it has been achieved. :ivar description: A description of the goal, used to determine if it has been achieved.
:ivar plan: The plan to execute.
:ivar can_fail: Whether we can fail to achieve the goal after executing the plan. :ivar can_fail: Whether we can fail to achieve the goal after executing the plan.
""" """
description: str = "" description: str = ""
plan: Plan
can_fail: bool = True can_fail: bool = True
class Goal(BaseGoal):
"""
Represents an objective to be achieved. To reach the goal, we should execute the corresponding
plan. It inherits from the BaseGoal a variable `can_fail`, which if true will cause the
completion to be determined based on the conversation.
Instances of this goal are not hashable because a plan is not hashable.
:ivar plan: The plan to execute.
"""
plan: Plan
type Action = SpeechAction | GestureAction | LLMAction type Action = SpeechAction | GestureAction | LLMAction

View File

@@ -14,6 +14,7 @@ class RIEndpoint(str, Enum):
GESTURE_TAG = "actuate/gesture/tag" GESTURE_TAG = "actuate/gesture/tag"
PING = "ping" PING = "ping"
NEGOTIATE_PORTS = "negotiate/ports" NEGOTIATE_PORTS = "negotiate/ports"
PAUSE = ""
class RIMessage(BaseModel): class RIMessage(BaseModel):
@@ -64,3 +65,15 @@ class GestureCommand(RIMessage):
if self.endpoint not in allowed: if self.endpoint not in allowed:
raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG") raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG")
return self return self
class PauseCommand(RIMessage):
"""
A specific command to pause or unpause the robot's actions.
:ivar endpoint: Fixed to ``RIEndpoint.PAUSE``.
:ivar data: A boolean indicating whether to pause (True) or unpause (False).
"""
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE)
data: bool

View File

@@ -95,7 +95,9 @@ async def test_receive_programs_valid_and_invalid():
assert forwarded.phases[0].goals[0].name == "G1" assert forwarded.phases[0].goals[0].name == "G1"
# Verify history clear was triggered # Verify history clear was triggered
assert manager._send_clear_llm_history.await_count == 1 assert (
manager._send_clear_llm_history.await_count == 2
) # first sends program to UserInterrupt, then clears LLM
@pytest.mark.asyncio @pytest.mark.asyncio

View File

@@ -99,12 +99,75 @@ async def test_send_to_local_agent(monkeypatch):
# Patch inbox.put # Patch inbox.put
target.inbox.put = AsyncMock() target.inbox.put = AsyncMock()
message = InternalMessage(to="receiver", sender="sender", body="hello") message = InternalMessage(to=target.name, sender=sender.name, body="hello")
await sender.send(message) await sender.send(message)
target.inbox.put.assert_awaited_once_with(message) target.inbox.put.assert_awaited_once_with(message)
sender.logger.debug.assert_called_once()
@pytest.mark.asyncio
async def test_send_to_zmq_agent(monkeypatch):
sender = DummyAgent("sender")
target = "remote_receiver"
# Fake logger
sender.logger = MagicMock()
# Fake zmq
sender._internal_pub_socket = AsyncMock()
message = InternalMessage(to=target, sender=sender.name, body="hello")
await sender.send(message)
zmq_calls = sender._internal_pub_socket.send_multipart.call_args[0][0]
assert zmq_calls[0] == f"internal/{target}".encode()
@pytest.mark.asyncio
async def test_send_to_multiple_local_agents(monkeypatch):
sender = DummyAgent("sender")
target1 = DummyAgent("receiver1")
target2 = DummyAgent("receiver2")
# Fake logger
sender.logger = MagicMock()
# Patch inbox.put
target1.inbox.put = AsyncMock()
target2.inbox.put = AsyncMock()
message = InternalMessage(to=[target1.name, target2.name], sender=sender.name, body="hello")
await sender.send(message)
target1.inbox.put.assert_awaited_once_with(message)
target2.inbox.put.assert_awaited_once_with(message)
@pytest.mark.asyncio
async def test_send_to_multiple_agents(monkeypatch):
sender = DummyAgent("sender")
target1 = DummyAgent("receiver1")
target2 = "remote_receiver"
# Fake logger
sender.logger = MagicMock()
# Fake zmq
sender._internal_pub_socket = AsyncMock()
# Patch inbox.put
target1.inbox.put = AsyncMock()
message = InternalMessage(to=[target1.name, target2], sender=sender.name, body="hello")
await sender.send(message)
target1.inbox.put.assert_awaited_once_with(message)
zmq_calls = sender._internal_pub_socket.send_multipart.call_args[0][0]
assert zmq_calls[0] == f"internal/{target2}".encode()
@pytest.mark.asyncio @pytest.mark.asyncio