Merge branch 'feat/reset-experiment-and-phase' into feat/visual-emotion-recognition

This commit is contained in:
Storm
2026-01-19 11:45:31 +01:00
48 changed files with 2108 additions and 564 deletions

View File

@@ -1 +1,5 @@
"""
This package contains all agent implementations for the PepperPlus Control Backend.
"""
from .base import BaseAgent as BaseAgent

View File

@@ -1,2 +1,6 @@
"""
Agents responsible for controlling the robot's physical actions, such as speech and gestures.
"""
from .robot_gesture_agent import RobotGestureAgent as RobotGestureAgent
from .robot_speech_agent import RobotSpeechAgent as RobotSpeechAgent

View File

@@ -1,8 +1,10 @@
"""
Agents and utilities for the BDI (Belief-Desire-Intention) reasoning system,
implementing AgentSpeak(L) logic.
"""
from control_backend.agents.bdi.bdi_core_agent import BDICoreAgent as BDICoreAgent
from .belief_collector_agent import (
BDIBeliefCollectorAgent as BDIBeliefCollectorAgent,
)
from .text_belief_extractor_agent import (
TextBeliefExtractorAgent as TextBeliefExtractorAgent,
)

View File

@@ -77,10 +77,10 @@ class AstTerm(AstExpression, ABC):
return AstBinaryOp(self, BinaryOperatorType.NOT_EQUALS, _coalesce_expr(other))
@dataclass
@dataclass(eq=False)
class AstAtom(AstTerm):
"""
Grounded expression in all lowercase.
Represents a grounded atom in AgentSpeak (e.g., lowercase constants).
"""
value: str
@@ -89,10 +89,10 @@ class AstAtom(AstTerm):
return self.value.lower()
@dataclass
@dataclass(eq=False)
class AstVar(AstTerm):
"""
Ungrounded variable expression. First letter capitalized.
Represents an ungrounded variable in AgentSpeak (e.g., capitalized names).
"""
name: str
@@ -101,24 +101,36 @@ class AstVar(AstTerm):
return self.name.capitalize()
@dataclass
@dataclass(eq=False)
class AstNumber(AstTerm):
"""
Represents a numeric constant in AgentSpeak.
"""
value: int | float
def _to_agentspeak(self) -> str:
return str(self.value)
@dataclass
@dataclass(eq=False)
class AstString(AstTerm):
"""
Represents a string literal in AgentSpeak.
"""
value: str
def _to_agentspeak(self) -> str:
return f'"{self.value}"'
@dataclass
@dataclass(eq=False)
class AstLiteral(AstTerm):
"""
Represents a literal (functor and terms) in AgentSpeak.
"""
functor: str
terms: list[AstTerm] = field(default_factory=list)
@@ -142,6 +154,10 @@ class BinaryOperatorType(StrEnum):
@dataclass
class AstBinaryOp(AstExpression):
"""
Represents a binary logical or relational operation in AgentSpeak.
"""
left: AstExpression
operator: BinaryOperatorType
right: AstExpression
@@ -167,6 +183,10 @@ class AstBinaryOp(AstExpression):
@dataclass
class AstLogicalExpression(AstExpression):
"""
Represents a logical expression, potentially negated, in AgentSpeak.
"""
expression: AstExpression
negated: bool = False
@@ -208,6 +228,10 @@ class AstStatement(AstNode):
@dataclass
class AstRule(AstNode):
"""
Represents an inference rule in AgentSpeak. If there is no condition, it always holds.
"""
result: AstExpression
condition: AstExpression | None = None
@@ -231,6 +255,10 @@ class TriggerType(StrEnum):
@dataclass
class AstPlan(AstNode):
"""
Represents a plan in AgentSpeak, consisting of a trigger, context, and body.
"""
type: TriggerType
trigger_literal: AstExpression
context: list[AstExpression]
@@ -260,6 +288,10 @@ class AstPlan(AstNode):
@dataclass
class AstProgram(AstNode):
"""
Represents a full AgentSpeak program, consisting of rules and plans.
"""
rules: list[AstRule] = field(default_factory=list)
plans: list[AstPlan] = field(default_factory=list)

View File

@@ -40,9 +40,23 @@ from control_backend.schemas.program import (
class AgentSpeakGenerator:
"""
Generator class that translates a high-level :class:`~control_backend.schemas.program.Program`
into AgentSpeak(L) source code.
It handles the conversion of phases, norms, goals, and triggers into AgentSpeak rules and plans,
ensuring the robot follows the defined behavioral logic.
"""
_asp: AstProgram
def generate(self, program: Program) -> str:
"""
Translates a Program object into an AgentSpeak source string.
:param program: The behavior program to translate.
:return: The generated AgentSpeak code as a string.
"""
self._asp = AstProgram()
if program.phases:
@@ -424,6 +438,16 @@ class AgentSpeakGenerator:
)
)
# Force phase transition fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("force_transition_phase"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
@singledispatchmethod
def _astify(self, element: ProgramElement) -> AstExpression:
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")

View File

@@ -167,7 +167,7 @@ class BDICoreAgent(BaseAgent):
case "force_next_phase":
self._force_next_phase()
case _:
self.logger.warning("Received unknow user interruption: %s", msg)
self.logger.warning("Received unknown user interruption: %s", msg)
def _apply_belief_changes(self, belief_changes: BeliefMessage):
"""

View File

@@ -1,152 +0,0 @@
import json
from pydantic import ValidationError
from control_backend.agents.base import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
class BDIBeliefCollectorAgent(BaseAgent):
"""
BDI Belief Collector Agent.
This agent acts as a central aggregator for beliefs derived from various sources (e.g., text,
emotion, vision). It receives raw extracted data from other agents,
normalizes them into valid :class:`Belief` objects, and forwards them as a unified packet to the
BDI Core Agent.
It serves as a funnel to ensure the BDI agent receives a consistent stream of beliefs.
"""
async def setup(self):
"""
Initialize the agent.
"""
self.logger.info("Setting up %s", self.name)
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages from other extractor agents.
Routes the message to specific handlers based on the 'type' field in the JSON body.
Supported types:
- ``belief_extraction_text``: Handled by :meth:`_handle_belief_text`
- ``emotion_extraction_text``: Handled by :meth:`_handle_emo_text`
:param msg: The received internal message.
"""
sender_node = msg.sender
# Parse JSON payload
try:
payload = json.loads(msg.body)
except Exception as e:
self.logger.warning(
"BeliefCollector: failed to parse JSON from %s. Body=%r Error=%s",
sender_node,
msg.body,
e,
)
return
msg_type = payload.get("type")
# Prefer explicit 'type' field
if msg_type == "belief_extraction_text":
self.logger.debug("Message routed to _handle_belief_text (sender=%s)", sender_node)
await self._handle_belief_text(payload, sender_node)
# This is not implemented yet, but we keep the structure for future use
elif msg_type == "emotion_extraction_text":
self.logger.debug("Message routed to _handle_emo_text (sender=%s)", sender_node)
await self._handle_emo_text(payload, sender_node)
else:
self.logger.warning(
"Unrecognized message (sender=%s, type=%r). Ignoring.", sender_node, msg_type
)
async def _handle_belief_text(self, payload: dict, origin: str):
"""
Process text-based belief extraction payloads.
Expected payload format::
{
"type": "belief_extraction_text",
"beliefs": {
"user_said": ["Can you help me?"],
"intention": ["ask_help"]
}
}
Validates and converts the dictionary items into :class:`Belief` objects.
:param payload: The dictionary payload containing belief data.
:param origin: The name of the sender agent.
"""
beliefs = payload.get("beliefs", {})
if not beliefs:
self.logger.debug("Received empty beliefs set.")
return
def try_create_belief(name, arguments) -> Belief | None:
"""
Create a belief object from name and arguments, or return None silently if the input is
not correct.
:param name: The name of the belief.
:param arguments: The arguments of the belief.
:return: A Belief object if the input is valid or None.
"""
try:
return Belief(name=name, arguments=arguments)
except ValidationError:
return None
beliefs = [
belief
for name, arguments in beliefs.items()
if (belief := try_create_belief(name, arguments)) is not None
]
self.logger.debug("Forwarding %d beliefs.", len(beliefs))
for belief in beliefs:
for argument in belief.arguments:
self.logger.debug(" - %s %s", belief.name, argument)
await self._send_beliefs_to_bdi(beliefs, origin=origin)
async def _handle_emo_text(self, payload: dict, origin: str):
"""
Process emotion extraction payloads.
**TODO**: Implement this method once emotion recognition is integrated.
:param payload: The dictionary payload containing emotion data.
:param origin: The name of the sender agent.
"""
pass
async def _send_beliefs_to_bdi(self, beliefs: list[Belief], origin: str | None = None):
"""
Send a list of aggregated beliefs to the BDI Core Agent.
Wraps the beliefs in a :class:`BeliefMessage` and sends it via the 'beliefs' thread.
:param beliefs: The list of Belief objects to send.
:param origin: (Optional) The original source of the beliefs (unused currently).
"""
if not beliefs:
return
msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=BeliefMessage(create=beliefs).model_dump_json(),
thread="beliefs",
)
await self.send(msg)
self.logger.info("Sent %d belief(s) to BDI core.", len(beliefs))

View File

@@ -18,6 +18,12 @@ type JSONLike = None | bool | int | float | str | list["JSONLike"] | dict[str, "
class BeliefState(BaseModel):
"""
Represents the state of inferred semantic beliefs.
Maintains sets of beliefs that are currently considered true or false.
"""
true: set[InternalBelief] = set()
false: set[InternalBelief] = set()
@@ -338,7 +344,7 @@ class TextBeliefExtractorAgent(BaseAgent):
class SemanticBeliefInferrer:
"""
Class that handles only prompting an LLM for semantic beliefs.
Infers semantic beliefs from conversation history using an LLM.
"""
def __init__(
@@ -464,6 +470,10 @@ Respond with a JSON similar to the following, but with the property names as giv
class GoalAchievementInferrer(SemanticBeliefInferrer):
"""
Infers whether specific conversational goals have been achieved using an LLM.
"""
def __init__(self, llm: TextBeliefExtractorAgent.LLM):
super().__init__(llm)
self.goals: set[BaseGoal] = set()

View File

@@ -1 +1,5 @@
"""
Agents responsible for external communication and service discovery.
"""
from .ri_communication_agent import RICommunicationAgent as RICommunicationAgent

View File

@@ -334,7 +334,7 @@ class RICommunicationAgent(BaseAgent):
async def handle_message(self, msg: InternalMessage):
try:
pause_command = PauseCommand.model_validate_json(msg.body)
self._req_socket.send_json(pause_command.model_dump())
self.logger.debug(self._req_socket.recv_json())
await self._req_socket.send_json(pause_command.model_dump())
self.logger.debug(await self._req_socket.recv_json())
except ValidationError:
self.logger.warning("Incorrect message format for PauseCommand.")

View File

@@ -1 +1,5 @@
"""
Agents that interface with Large Language Models for natural language processing and generation.
"""
from .llm_agent import LLMAgent as LLMAgent

View File

@@ -1,3 +1,8 @@
"""
Agents responsible for processing sensory input, such as audio transcription and voice activity
detection.
"""
from .transcription_agent.transcription_agent import (
TranscriptionAgent as TranscriptionAgent,
)

View File

@@ -74,7 +74,7 @@ class TranscriptionAgent(BaseAgent):
def _connect_audio_in_socket(self):
"""
Helper to connect the ZMQ SUB socket for audio input.
Connects the ZMQ SUB socket for receiving audio data.
"""
self.audio_in_socket = azmq.Context.instance().socket(zmq.SUB)
self.audio_in_socket.setsockopt_string(zmq.SUBSCRIBE, "")

View File

@@ -26,7 +26,7 @@ class UserInterruptAgent(BaseAgent):
- Send a prioritized message to the `RobotSpeechAgent`
- Send a prioritized gesture to the `RobotGestureAgent`
- Send a belief override to the `BDIProgramManager`in order to activate a
- Send a belief override to the `BDI Core` in order to activate a
trigger/conditional norm or complete a goal.
Prioritized actions clear the current RI queue before inserting the new item,
@@ -50,10 +50,8 @@ class UserInterruptAgent(BaseAgent):
async def setup(self):
"""
Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic.
Starts the background behavior to receive the user interrupts.
Initialize the agent by setting up ZMQ sockets for receiving button events and
publishing updates.
"""
context = Context.instance()
@@ -68,17 +66,15 @@ class UserInterruptAgent(BaseAgent):
async def _receive_button_event(self):
"""
The behaviour of the UserInterruptAgent.
Continuous loop that receives button_pressed events from the button_pressed HTTP endpoint.
These events contain a type and a context.
Main loop to receive and process button press events from the UI.
These are the different types and contexts:
- type: "speech", context: string that the robot has to say.
- type: "gesture", context: single gesture name that the robot has to perform.
- type: "override", context: belief_id that overrides the goal/trigger/conditional norm.
- type: "pause", context: boolean indicating whether to pause
- type: "reset_phase", context: None, indicates to the BDI Core to
- type: "reset_experiment", context: None, indicates to the BDI Core to
Handles different event types:
- `speech`: Triggers immediate robot speech.
- `gesture`: Triggers an immediate robot gesture.
- `override`: Forces a belief, trigger, or goal completion in the BDI core.
- `override_unachieve`: Removes a belief from the BDI core.
- `pause`: Toggles the system's pause state.
- `next_phase` / `reset_phase`: Controls experiment flow.
"""
while True:
topic, body = await self.sub_socket.recv_multipart()
@@ -93,71 +89,88 @@ class UserInterruptAgent(BaseAgent):
self.logger.debug("Received event type %s", event_type)
if event_type == "speech":
await self._send_to_speech_agent(event_context)
self.logger.info(
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
event_context,
)
elif event_type == "gesture":
await self._send_to_gesture_agent(event_context)
self.logger.info(
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
event_context,
)
elif event_type == "override":
ui_id = str(event_context)
if asl_trigger := self._trigger_map.get(ui_id):
await self._send_to_bdi("force_trigger", asl_trigger)
match event_type:
case "speech":
await self._send_to_speech_agent(event_context)
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
event_context,
)
elif asl_cond_norm := self._cond_norm_map.get(ui_id):
await self._send_to_bdi("force_norm", asl_cond_norm)
case "gesture":
await self._send_to_gesture_agent(event_context)
self.logger.info(
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
event_context,
)
elif asl_goal := self._goal_map.get(ui_id):
await self._send_to_bdi_belief(asl_goal)
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
case "override":
ui_id = str(event_context)
if asl_trigger := self._trigger_map.get(ui_id):
await self._send_to_bdi("force_trigger", asl_trigger)
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
elif asl_cond_norm := self._cond_norm_map.get(ui_id):
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm")
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
elif asl_goal := self._goal_map.get(ui_id):
await self._send_to_bdi_belief(asl_goal, "goal")
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
# Send achieve_goal to program manager to update semantic belief extractor
goal_achieve_msg = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
thread="achieve_goal",
body=ui_id,
)
await self.send(goal_achieve_msg)
else:
self.logger.warning("Could not determine which element to override.")
case "override_unachieve":
ui_id = str(event_context)
if asl_cond_norm := self._cond_norm_map.get(ui_id):
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm", True)
self.logger.info(
"Forwarded button press (override_unachieve)"
"with context '%s' to BDI Core.",
event_context,
)
else:
self.logger.warning(
"Could not determine which conditional norm to unachieve."
)
case "pause":
self.logger.debug(
"Received pause/resume button press with context '%s'.", event_context
)
await self._send_pause_command(event_context)
if event_context:
self.logger.info("Sent pause command.")
else:
self.logger.info("Sent resume command.")
case "next_phase" | "reset_phase":
await self._send_experiment_control_to_bdi_core(event_type)
case _:
self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').",
event_type,
event_context,
)
goal_achieve_msg = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
thread="achieve_goal",
body=ui_id,
)
await self.send(goal_achieve_msg)
else:
self.logger.warning("Could not determine which element to override.")
elif event_type == "pause":
self.logger.debug(
"Received pause/resume button press with context '%s'.", event_context
)
await self._send_pause_command(event_context)
if event_context:
self.logger.info("Sent pause command.")
else:
self.logger.info("Sent resume command.")
elif event_type in ["next_phase", "reset_phase", "reset_experiment"]:
await self._send_experiment_control_to_bdi_core(event_type)
else:
self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').",
event_type,
event_context,
)
async def handle_message(self, msg: InternalMessage):
"""
Handle commands received from other internal Python agents.
Handles internal messages from other agents, such as program updates or trigger
notifications.
:param msg: The incoming :class:`~control_backend.core.agent_system.InternalMessage`.
"""
match msg.thread:
case "new_program":
@@ -171,11 +184,9 @@ class UserInterruptAgent(BaseAgent):
payload = {"type": "trigger_update", "id": ui_id, "achieved": True}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Trigger {asl_slug} started (ID: {ui_id})")
case "trigger_end":
asl_slug = msg.body
ui_id = self._trigger_reverse_map.get(asl_slug)
if ui_id:
payload = {"type": "trigger_update", "id": ui_id, "achieved": False}
await self._send_experiment_update(payload)
@@ -195,31 +206,37 @@ class UserInterruptAgent(BaseAgent):
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Goal {goal_name} started (ID: {ui_id})")
case "active_norms_update":
norm_list = [s.strip("() '\",") for s in msg.body.split(",") if s.strip("() '\",")]
await self._broadcast_cond_norms(norm_list)
active_norms_asl = [
s.strip("() '\",") for s in msg.body.split(",") if s.strip("() '\",")
]
await self._broadcast_cond_norms(active_norms_asl)
case _:
self.logger.debug(f"Received internal message on unhandled thread: {msg.thread}")
async def _broadcast_cond_norms(self, active_slugs: list[str]):
"""
Sends the current state of all conditional norms to the UI.
:param active_slugs: A list of slugs (strings) currently active in the BDI core.
Broadcasts the current activation state of all conditional norms to the UI.
:param active_slugs: A list of sluggified norm names currently active in the BDI core.
"""
updates = []
for asl_slug, ui_id in self._cond_norm_reverse_map.items():
is_active = asl_slug in active_slugs
updates.append({"id": ui_id, "name": asl_slug, "active": is_active})
updates.append({"id": ui_id, "active": is_active})
payload = {"type": "cond_norms_state_update", "norms": updates}
await self._send_experiment_update(payload, should_log=False)
# self.logger.debug(f"Broadcasted state for {len(updates)} conditional norms.")
if self.pub_socket:
topic = b"status"
body = json.dumps(payload).encode("utf-8")
await self.pub_socket.send_multipart([topic, body])
# self.logger.info(f"UI Update: Active norms {updates}")
def _create_mapping(self, program_json: str):
"""
Create mappings between UI IDs and ASL slugs for triggers, goals, and conditional norms
Creates a bidirectional mapping between UI identifiers and AgentSpeak slugs.
:param program_json: The JSON representation of the behavioral program.
"""
try:
program = Program.model_validate_json(program_json)
@@ -261,8 +278,10 @@ class UserInterruptAgent(BaseAgent):
async def _send_experiment_update(self, data, should_log: bool = True):
"""
Sends an update to the 'experiment' topic.
The SSE endpoint will pick this up and push it to the UI.
Publishes an experiment state update to the internal ZMQ bus for the UI.
:param data: The update payload.
:param should_log: Whether to log the update.
"""
if self.pub_socket:
topic = b"experiment"
@@ -308,12 +327,20 @@ class UserInterruptAgent(BaseAgent):
await self.send(msg)
self.logger.info(f"Directly forced {thread} in BDI: {body}")
async def _send_to_bdi_belief(self, asl_goal: str):
async def _send_to_bdi_belief(self, asl: str, asl_type: str, unachieve: bool = False):
"""Send belief to BDI Core"""
belief_name = f"achieved_{asl_goal}"
if asl_type == "goal":
belief_name = f"achieved_{asl}"
elif asl_type == "cond_norm":
belief_name = f"force_{asl}"
else:
self.logger.warning("Tried to send belief with unknown type")
belief = Belief(name=belief_name, arguments=None)
self.logger.debug(f"Sending belief to BDI Core: {belief_name}")
belief_message = BeliefMessage(create=[belief])
# Conditional norms are unachieved by removing the belief
belief_message = (
BeliefMessage(delete=[belief]) if unachieve else BeliefMessage(create=[belief])
)
msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
thread="beliefs",