Add documentation #31

Merged
k.marinus merged 6 commits from docs/docs-cb into dev 2025-11-27 12:16:12 +00:00
30 changed files with 778 additions and 83 deletions

View File

@@ -10,6 +10,17 @@ from control_backend.schemas.ri_message import SpeechCommand
class RobotSpeechAgent(BaseAgent):
"""
This agent acts as a bridge between the control backend and the Robot Interface (RI).
It receives speech commands from other agents or from the UI,
and forwards them to the robot via a ZMQ PUB socket.
:ivar subsocket: ZMQ SUB socket for receiving external commands (e.g., from UI).
:ivar pubsocket: ZMQ PUB socket for sending commands to the Robot Interface.
:ivar address: Address to bind/connect the PUB socket.
:ivar bind: Whether to bind or connect the PUB socket.
"""
subsocket: zmq.Socket
pubsocket: zmq.Socket
address = ""
@@ -27,7 +38,11 @@ class RobotSpeechAgent(BaseAgent):
async def setup(self):
"""
Setup the robot speech command agent
Initialize the agent.
1. Sets up the PUB socket to talk to the robot.
2. Sets up the SUB socket to listen for "command" topics (from UI/External).
3. Starts the loop for handling ZMQ commands.
"""
self.logger.info("Setting up %s", self.name)
@@ -58,7 +73,11 @@ class RobotSpeechAgent(BaseAgent):
async def handle_message(self, msg: InternalMessage):
"""
Handle commands received from other Python agents.
Handle commands received from other internal Python agents.
Validates the message as a :class:`SpeechCommand` and forwards it to the robot.
:param msg: The internal message containing the command.
"""
try:
speech_command = SpeechCommand.model_validate_json(msg.body)
@@ -68,7 +87,9 @@ class RobotSpeechAgent(BaseAgent):
async def _zmq_command_loop(self):
"""
Handle commands from the UI.
Loop to handle commands received via ZMQ (e.g., from the UI).
Listens on the 'command' topic, validates the JSON, and forwards it to the robot.
"""
while self._running:
try:

View File

@@ -5,14 +5,22 @@ from control_backend.core.agent_system import BaseAgent as CoreBaseAgent
class BaseAgent(CoreBaseAgent):
"""
Base agent class for our agents to inherit from. This just ensures
all agents have a logger.
The primary base class for all implementation agents.
Inherits from :class:`control_backend.core.agent_system.BaseAgent`.
This class ensures that every agent instance is automatically equipped with a
properly configured ``logger``.
:ivar logger: A logger instance named after the agent's package and class.
"""
logger: logging.Logger
# Whenever a subclass is initiated, give it the correct logger
def __init_subclass__(cls, **kwargs) -> None:
"""
Whenever a subclass is initiated, give it the correct logger.
:param kwargs: Keyword arguments for the subclass.
"""
super().__init_subclass__(**kwargs)
cls.logger = logging.getLogger(__package__).getChild(cls.__name__)

View File

@@ -1,4 +1,5 @@
from .bdi_core_agent.bdi_core_agent import BDICoreAgent as BDICoreAgent
from control_backend.agents.bdi.bdi_core_agent import BDICoreAgent as BDICoreAgent
from .belief_collector_agent import (
BDIBeliefCollectorAgent as BDIBeliefCollectorAgent,
)

View File

@@ -19,6 +19,27 @@ DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak
class BDICoreAgent(BaseAgent):
"""
BDI Core Agent.
This is the central reasoning agent of the system, powered by the **AgentSpeak(L)** language.
It maintains a belief base (representing the state of the world) and a set of plans (rules).
It runs an internal BDI (Belief-Desire-Intention) cycle using the ``agentspeak`` library.
When beliefs change (e.g., via :meth:`_apply_beliefs`), the agent evaluates its plans to
determine the best course of action.
**Custom Actions:**
It defines custom actions (like ``.reply``) that allow the AgentSpeak code to interact with
external Python agents (e.g., querying the LLM).
:ivar bdi_agent: The internal AgentSpeak agent instance.
:ivar asl_file: Path to the AgentSpeak source file (.asl).
:ivar env: The AgentSpeak environment.
:ivar actions: A registry of custom actions available to the AgentSpeak code.
:ivar _wake_bdi_loop: Event used to wake up the reasoning loop when new beliefs arrive.
"""
bdi_agent: agentspeak.runtime.Agent
def __init__(self, name: str, asl: str):
@@ -30,6 +51,13 @@ class BDICoreAgent(BaseAgent):
self._wake_bdi_loop = asyncio.Event()
async def setup(self) -> None:
"""
Initialize the BDI agent.
1. Registers custom actions (like ``.reply``).
2. Loads the .asl source file.
3. Starts the reasoning loop (:meth:`_bdi_loop`) in the background.
"""
self.logger.debug("Setup started.")
self._add_custom_actions()
@@ -42,6 +70,9 @@ class BDICoreAgent(BaseAgent):
self.logger.debug("Setup complete.")
async def _load_asl(self):
"""
Load and parse the AgentSpeak source file.
"""
try:
with open(self.asl_file) as source:
self.bdi_agent = self.env.build_agent(source, self.actions)
@@ -51,7 +82,11 @@ class BDICoreAgent(BaseAgent):
async def _bdi_loop(self):
"""
Runs the AgentSpeak BDI loop. Efficiently checks for when the next expected work will be.
The main BDI reasoning loop.
It waits for the ``_wake_bdi_loop`` event (set when beliefs change or actions complete).
When awake, it steps through the AgentSpeak interpreter. It also handles sleeping if
the agent has deferred intentions (deadlines).
"""
while self._running:
await (
@@ -78,7 +113,12 @@ class BDICoreAgent(BaseAgent):
async def handle_message(self, msg: InternalMessage):
"""
Route incoming messages (Beliefs or LLM responses).
Handle incoming messages.
- **Beliefs**: Updates the internal belief base.
- **LLM Responses**: Forwards the generated text to the Robot Speech Agent (actuation).
:param msg: The received internal message.
"""
self.logger.debug("Processing message from %s.", msg.sender)
@@ -106,6 +146,12 @@ class BDICoreAgent(BaseAgent):
await self.send(out_msg)
def _apply_beliefs(self, beliefs: list[Belief]):
"""
Update the belief base with a list of new beliefs.
If ``replace=True`` is set on a belief, it removes all existing beliefs with that name
before adding the new one.
"""
if not beliefs:
return
@@ -115,6 +161,12 @@ class BDICoreAgent(BaseAgent):
self._add_belief(belief.name, belief.arguments)
def _add_belief(self, name: str, args: Iterable[str] = []):
"""
Add a single belief to the BDI agent.
:param name: The functor/name of the belief (e.g., "user_said").
:param args: Arguments for the belief.
"""
# new_args = (agentspeak.Literal(arg) for arg in args) # TODO: Eventually support multiple
merged_args = DELIMITER.join(arg for arg in args)
new_args = (agentspeak.Literal(merged_args),)

View File

@@ -11,8 +11,14 @@ from control_backend.schemas.program import Program
class BDIProgramManager(BaseAgent):
"""
Will interpret programs received from the HTTP endpoint. Extracts norms, goals, triggers and
forwards them to the BDI as beliefs.
BDI Program Manager Agent.
This agent is responsible for receiving high-level programs (sequences of instructions/goals)
from the external HTTP API (via ZMQ) and translating them into core beliefs (norms and goals)
for the BDI Core Agent. In the future, it will be responsible for determining when goals are
met, and passing on new norms and goals accordingly.
:ivar sub_socket: The ZMQ SUB socket used to receive program updates.
"""
def __init__(self, **kwargs):
@@ -20,6 +26,18 @@ class BDIProgramManager(BaseAgent):
self.sub_socket = None
async def _send_to_bdi(self, program: Program):
"""
Convert a received program into BDI beliefs and send them to the BDI Core Agent.
Currently, it takes the **first phase** of the program and extracts:
- **Norms**: Constraints or rules the agent must follow.
- **Goals**: Objectives the agent must achieve.
These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will
overwrite any existing norms/goals of the same name in the BDI agent.
:param program: The program object received from the API.
"""
first_phase = program.phases[0]
norms_belief = Belief(
name="norms",
@@ -44,7 +62,10 @@ class BDIProgramManager(BaseAgent):
async def _receive_programs(self):
"""
Continuously receive programs from the HTTP endpoint, sent to us over ZMQ.
Continuous loop that receives program updates from the HTTP endpoint.
It listens to the ``program`` topic on the internal ZMQ SUB socket.
When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`.
"""
while True:
topic, body = await self.sub_socket.recv_multipart()
@@ -58,6 +79,12 @@ class BDIProgramManager(BaseAgent):
await self._send_to_bdi(program)
async def setup(self):
"""
Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'program' topic.
Starts the background behavior to receive programs.
"""
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)

View File

@@ -10,14 +10,33 @@ from control_backend.schemas.belief_message import Belief, BeliefMessage
class BDIBeliefCollectorAgent(BaseAgent):
"""
Continuously collects beliefs/emotions from extractor agents and forwards a
unified belief packet to the BDI agent.
BDI Belief Collector Agent.
This agent acts as a central aggregator for beliefs derived from various sources (e.g., text,
emotion, vision). It receives raw extracted data from other agents,
normalizes them into valid :class:`Belief` objects, and forwards them as a unified packet to the
BDI Core Agent.
It serves as a funnel to ensure the BDI agent receives a consistent stream of beliefs.
"""
async def setup(self):
"""
Initialize the agent.
"""
self.logger.info("Setting up %s", self.name)
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages from other extractor agents.
Routes the message to specific handlers based on the 'type' field in the JSON body.
Supported types:
- ``belief_extraction_text``: Handled by :meth:`_handle_belief_text`
- ``emotion_extraction_text``: Handled by :meth:`_handle_emo_text`
:param msg: The received internal message.
"""
sender_node = msg.sender
# Parse JSON payload
@@ -49,12 +68,22 @@ class BDIBeliefCollectorAgent(BaseAgent):
async def _handle_belief_text(self, payload: dict, origin: str):
"""
Expected payload:
Process text-based belief extraction payloads.
Expected payload format::
{
"type": "belief_extraction_text",
"beliefs": {"user_said": ["Can you help me?"]}
"beliefs": {
"user_said": ["Can you help me?"],
"intention": ["ask_help"]
}
}
Validates and converts the dictionary items into :class:`Belief` objects.
:param payload: The dictionary payload containing belief data.
:param origin: The name of the sender agent.
"""
beliefs = payload.get("beliefs", {})
@@ -90,12 +119,24 @@ class BDIBeliefCollectorAgent(BaseAgent):
await self._send_beliefs_to_bdi(beliefs, origin=origin)
async def _handle_emo_text(self, payload: dict, origin: str):
"""TODO: implement (after we have emotional recognition)"""
"""
Process emotion extraction payloads.
**TODO**: Implement this method once emotion recognition is integrated.
:param payload: The dictionary payload containing emotion data.
:param origin: The name of the sender agent.
"""
pass
async def _send_beliefs_to_bdi(self, beliefs: list[Belief], origin: str | None = None):
"""
Sends a unified belief packet to the BDI agent.
Send a list of aggregated beliefs to the BDI Core Agent.
Wraps the beliefs in a :class:`BeliefMessage` and sends it via the 'beliefs' thread.
:param beliefs: The list of Belief objects to send.
:param origin: (Optional) The original source of the beliefs (unused currently).
"""
if not beliefs:
return

View File

@@ -6,12 +6,31 @@ from control_backend.core.config import settings
class TextBeliefExtractorAgent(BaseAgent):
"""
Text Belief Extractor Agent.
This agent is responsible for processing raw text (e.g., from speech transcription) and
extracting semantic beliefs from it.
In the current demonstration version, it performs a simple wrapping of the user's input
into a ``user_said`` belief. In a full implementation, this agent would likely interact
with an LLM or NLU engine to extract intent, entities, and other structured information.
"""
async def setup(self):
"""
Initialize the agent and its resources.
"""
self.logger.info("Settting up %s.", self.name)
# Setup LLM belief context if needed (currently demo is just passthrough)
self.beliefs = {"mood": ["X"], "car": ["Y"]}
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages, primarily from the Transcription Agent.
:param msg: The received message containing transcribed text.
"""
sender = msg.sender
if sender == settings.agent_settings.transcription_name:
self.logger.debug("Received text from transcriber: %s", msg.body)
@@ -21,7 +40,15 @@ class TextBeliefExtractorAgent(BaseAgent):
async def _process_transcription_demo(self, txt: str):
"""
Demo version to process the transcription input to beliefs.
Process the transcribed text and generate beliefs.
**Demo Implementation:**
Currently, this method takes the raw text ``txt`` and wraps it into a belief structure:
``user_said("txt")``.
This belief is then sent to the :class:`BDIBeliefCollectorAgent`.
:param txt: The raw transcribed text string.
"""
# For demo, just wrapping user text as user_said belief
belief = {"beliefs": {"user_said": [txt]}, "type": "belief_extraction_text"}

View File

@@ -12,6 +12,27 @@ from ..actuation.robot_speech_agent import RobotSpeechAgent
class RICommunicationAgent(BaseAgent):
"""
Robot Interface (RI) Communication Agent.
This agent manages the high-level connection negotiation and health checking (heartbeat)
between the Control Backend and the Robot Interface (or UI).
It acts as a service discovery mechanism:
1. It initiates a handshake (negotiation) to discover where other services (like the robot
command listener) are listening.
2. It spawns specific agents
(like :class:`~control_backend.agents.actuation.robot_speech_agent.RobotSpeechAgent`)
once the connection details are established.
3. It maintains a "ping" loop to ensure the connection remains active.
:ivar _address: The ZMQ address to attempt the initial connection negotiation.
:ivar _bind: Whether to bind or connect the negotiation socket.
:ivar _req_socket: ZMQ REQ socket for negotiation and pings.
:ivar pub_socket: ZMQ PUB socket for internal notifications (e.g., ping status).
:ivar connected: Boolean flag indicating active connection status.
"""
def __init__(
self,
name: str,
@@ -27,8 +48,10 @@ class RICommunicationAgent(BaseAgent):
async def setup(self):
"""
Try to set up the communication agent, we have `behaviour_settings.comm_setup_max_retries`
retries in case we don't have a response yet.
Initialize the agent and attempt connection.
Tries to negotiate connection up to ``behaviour_settings.comm_setup_max_retries`` times.
If successful, starts the :meth:`_listen_loop`.
"""
self.logger.info("Setting up %s", self.name)
@@ -45,7 +68,7 @@ class RICommunicationAgent(BaseAgent):
async def _setup_sockets(self, force=False):
"""
Sets up request socket for communication agent.
Initialize ZMQ sockets (REQ for negotiation, PUB for internal updates).
"""
# Bind request socket
if self._req_socket is None or force:
@@ -62,6 +85,15 @@ class RICommunicationAgent(BaseAgent):
async def _negotiate_connection(
self, max_retries: int = settings.behaviour_settings.comm_setup_max_retries
):
"""
Perform the handshake protocol with the Robot Interface.
Sends a ``negotiate/ports`` request and expects a configuration response containing
port assignments for various services (e.g., actuation).
:param max_retries: Number of attempts before giving up.
:return: True if negotiation succeeded, False otherwise.
"""
retries = 0
while retries < max_retries:
if self._req_socket is None:
@@ -122,6 +154,12 @@ class RICommunicationAgent(BaseAgent):
return False
async def _handle_negotiation_response(self, received_message):
"""
Parse the negotiation response and initialize services.
Based on the response, it might re-connect the main socket or spawn new agents
(e.g., for robot actuation).
"""
for port_data in received_message["data"]:
id = port_data["id"]
port = port_data["port"]
@@ -151,6 +189,10 @@ class RICommunicationAgent(BaseAgent):
self.logger.warning("Unhandled negotiation id: %s", id)
async def stop(self):
"""
Closes all sockets.
:return:
"""
if self._req_socket:
self._req_socket.close()
if self.pub_socket:
@@ -159,7 +201,10 @@ class RICommunicationAgent(BaseAgent):
async def _listen_loop(self):
"""
Run the listening (ping) loop indefinitely.
Maintain the connection via a heartbeat (ping) loop.
Sends a ``ping`` request periodically and waits for a reply.
If pings fail repeatedly, it triggers a disconnection handler to restart negotiation.
"""
while self._running:
if not self.connected:
@@ -217,6 +262,11 @@ class RICommunicationAgent(BaseAgent):
raise
async def _handle_disconnection(self):
"""
Handle connection loss.
Notifies the UI of disconnection (via internal PUB) and attempts to restart negotiation.
"""
self.connected = False
# Tell UI we're disconnected.

View File

@@ -16,9 +16,17 @@ from .llm_instructions import LLMInstructions
class LLMAgent(BaseAgent):
"""
Agent responsible for processing user text input and querying a locally
hosted LLM for text generation. Receives messages from the BDI Core Agent
and responds with processed LLM output.
LLM Agent.
This agent is responsible for processing user text input and querying a locally
hosted LLM for text generation. It acts as the conversational brain of the system.
It receives :class:`~control_backend.schemas.llm_prompt_message.LLMPromptMessage`
payloads from the BDI Core Agent, constructs a conversation history, queries the
LLM via HTTP, and streams the response back to the BDI agent in natural chunks
(e.g., sentence by sentence).
:ivar history: A list of dictionaries representing the conversation history (Role/Content).
"""
def __init__(self, name: str):
@@ -29,6 +37,14 @@ class LLMAgent(BaseAgent):
self.logger.info("Setting up %s.", self.name)
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages.
Expects messages from :attr:`settings.agent_settings.bdi_core_name` containing
an :class:`LLMPromptMessage` in the body.
:param msg: The received internal message.
"""
if msg.sender == settings.agent_settings.bdi_core_name:
self.logger.debug("Processing message from BDI core.")
try:
@@ -40,6 +56,14 @@ class LLMAgent(BaseAgent):
self.logger.debug("Message ignored (not from BDI core.")
async def _process_bdi_message(self, message: LLMPromptMessage):
"""
Orchestrate the LLM query and response streaming.
Iterates over the chunks yielded by :meth:`_query_llm` and forwards them
individually to the BDI agent via :meth:`_send_reply`.
:param message: The parsed prompt message containing text, norms, and goals.
"""
async for chunk in self._query_llm(message.text, message.norms, message.goals):
await self._send_reply(chunk)
self.logger.debug(
@@ -48,7 +72,9 @@ class LLMAgent(BaseAgent):
async def _send_reply(self, msg: str):
"""
Sends a response message back to the BDI Core Agent.
Sends a response message (chunk) back to the BDI Core Agent.
:param msg: The text content of the chunk.
"""
reply = InternalMessage(
to=settings.agent_settings.bdi_core_name,
@@ -61,13 +87,18 @@ class LLMAgent(BaseAgent):
self, prompt: str, norms: list[str], goals: list[str]
) -> AsyncGenerator[str]:
"""
Sends a chat completion request to the local LLM service and streams the response by
yielding fragments separated by punctuation like.
Send a chat completion request to the local LLM service and stream the response.
It constructs the full prompt using
:class:`~control_backend.agents.llm.llm_instructions.LLMInstructions`.
It streams the response from the LLM and buffers tokens until a natural break (punctuation)
is reached, then yields the chunk. This ensures that the robot speaks in complete phrases
rather than individual tokens.
:param prompt: Input text prompt to pass to the LLM.
:param norms: Norms the LLM should hold itself to.
:param goals: Goals the LLM should achieve.
:yield: Fragments of the LLM-generated content.
:yield: Fragments of the LLM-generated content (e.g., sentences/phrases).
"""
self.history.append(
{
@@ -85,7 +116,7 @@ class LLMAgent(BaseAgent):
*self.history,
]
message_id = str(uuid.uuid4())
message_id = str(uuid.uuid4()) # noqa
try:
full_message = ""
@@ -127,8 +158,14 @@ class LLMAgent(BaseAgent):
yield "Error processing the request."
async def _stream_query_llm(self, messages) -> AsyncGenerator[str]:
"""Raises httpx.HTTPError when the API gives an error."""
async with httpx.AsyncClient(timeout=None) as client:
"""
Perform the raw HTTP streaming request to the LLM API.
:param messages: The list of message dictionaries (role/content).
:yield: Raw text tokens (deltas) from the SSE stream.
:raises httpx.HTTPError: If the API returns a non-200 status.
"""
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
settings.llm_settings.local_llm_url,

View File

@@ -1,7 +1,14 @@
class LLMInstructions:
"""
Defines structured instructions that are sent along with each request
to the LLM to guide its behavior (norms, goals, etc.).
Helper class to construct the system instructions for the LLM.
It combines the base persona (Pepper robot) with dynamic norms and goals
provided by the BDI system.
If no norms/goals are given it assumes empty lists.
:ivar norms: A list of behavioral norms.
:ivar goals: A list of specific conversational goals.
"""
@staticmethod
@@ -23,8 +30,16 @@ class LLMInstructions:
def build_developer_instruction(self) -> str:
"""
Builds a multi-line formatted instruction string for the LLM.
Includes only non-empty structured fields.
Builds the final system prompt string.
The prompt includes:
1. Persona definition.
2. Constraint on response length.
3. Instructions on how to handle goals (reach them in order, but prioritize natural flow).
4. The specific list of norms.
5. The specific list of goals.
:return: The formatted system prompt string.
"""
sections = [
"You are a Pepper robot engaging in natural human conversation.",

View File

@@ -14,15 +14,28 @@ from control_backend.core.config import settings
class SpeechRecognizer(abc.ABC):
"""
Abstract base class for speech recognition backends.
Provides a common interface for loading models and transcribing audio,
as well as heuristics for estimating token counts to optimize decoding.
:ivar limit_output_length: If True, limits the generated text length based on audio duration.
"""
def __init__(self, limit_output_length=True):
"""
:param limit_output_length: When `True`, the length of the generated speech will be limited
by the length of the input audio and some heuristics.
:param limit_output_length: When ``True``, the length of the generated speech will be
limited by the length of the input audio and some heuristics.
"""
self.limit_output_length = limit_output_length
@abc.abstractmethod
def load_model(self): ...
def load_model(self):
"""
Load the speech recognition model into memory.
"""
...
@abc.abstractmethod
def recognize_speech(self, audio: np.ndarray) -> str:
@@ -31,14 +44,16 @@ class SpeechRecognizer(abc.ABC):
:param audio: A full utterance sample. Audio must be 16 kHz, mono, np.float32, values in the
range [-1.0, 1.0].
:return: Recognized speech.
:return: The recognized speech text.
"""
@staticmethod
def _estimate_max_tokens(audio: np.ndarray) -> int:
"""
Estimate the maximum length of a given audio sample in tokens. Assumes a maximum speaking
rate of 450 words per minute (3x average), and assumes that 3 words is 4 tokens.
Estimate the maximum length of a given audio sample in tokens.
Assumes a maximum speaking rate of 450 words per minute (3x average), and assumes that
3 words is approx. 4 tokens.
:param audio: The audio sample (16 kHz) to use for length estimation.
:return: The estimated length of the transcribed audio in tokens.
@@ -51,8 +66,10 @@ class SpeechRecognizer(abc.ABC):
def _get_decode_options(self, audio: np.ndarray) -> dict:
"""
Construct decoding options for the Whisper model.
:param audio: The audio sample (16 kHz) to use to determine options like max decode length.
:return: A dict that can be used to construct `whisper.DecodingOptions`.
:return: A dict that can be used to construct ``whisper.DecodingOptions`` (or equivalent).
"""
options = {}
if self.limit_output_length:
@@ -61,7 +78,12 @@ class SpeechRecognizer(abc.ABC):
@staticmethod
def best_type():
"""Get the best type of SpeechRecognizer based on system capabilities."""
"""
Factory method to get the best available `SpeechRecognizer`.
:return: An instance of :class:`MLXWhisperSpeechRecognizer` if on macOS with Apple Silicon,
otherwise :class:`OpenAIWhisperSpeechRecognizer`.
"""
if torch.mps.is_available():
print("Choosing MLX Whisper model.")
return MLXWhisperSpeechRecognizer()
@@ -71,12 +93,20 @@ class SpeechRecognizer(abc.ABC):
class MLXWhisperSpeechRecognizer(SpeechRecognizer):
"""
Speech recognizer using the MLX framework (optimized for Apple Silicon).
"""
def __init__(self, limit_output_length=True):
super().__init__(limit_output_length)
self.was_loaded = False
self.model_name = settings.speech_model_settings.mlx_model_name
def load_model(self):
"""
Ensures the model is downloaded and cached. MLX loads dynamically, so this
pre-fetches the model.
"""
if self.was_loaded:
return
# There appears to be no dedicated mechanism to preload a model, but this `get_model` does
@@ -94,11 +124,18 @@ class MLXWhisperSpeechRecognizer(SpeechRecognizer):
class OpenAIWhisperSpeechRecognizer(SpeechRecognizer):
"""
Speech recognizer using the standard OpenAI Whisper library (PyTorch).
"""
def __init__(self, limit_output_length=True):
super().__init__(limit_output_length)
self.model = None
def load_model(self):
"""
Loads the OpenAI Whisper model onto the available device (CUDA or CPU).
"""
if self.model is not None:
return
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

View File

@@ -13,11 +13,26 @@ from .speech_recognizer import SpeechRecognizer
class TranscriptionAgent(BaseAgent):
"""
An agent which listens to audio fragments with voice, transcribes them, and sends the
transcription to other agents.
Transcription Agent.
This agent listens to audio fragments (containing speech) on a ZMQ SUB socket,
transcribes them using the configured :class:`SpeechRecognizer`, and sends the
resulting text to other agents (e.g., the Text Belief Extractor).
It uses an internal semaphore to limit the number of concurrent transcription tasks.
:ivar audio_in_address: The ZMQ address to receive audio from (usually from VAD Agent).
:ivar audio_in_socket: The ZMQ SUB socket instance.
:ivar speech_recognizer: The speech recognition engine instance.
:ivar _concurrency: Semaphore to limit concurrent transcriptions.
"""
def __init__(self, audio_in_address: str):
"""
Initialize the Transcription Agent.
:param audio_in_address: The ZMQ address of the audio source (e.g., VAD output).
"""
super().__init__(settings.agent_settings.transcription_name)
self.audio_in_address = audio_in_address
@@ -26,6 +41,13 @@ class TranscriptionAgent(BaseAgent):
self._concurrency = None
async def setup(self):
"""
Initialize the agent resources.
1. Connects to the audio input ZMQ socket.
2. Initializes the :class:`SpeechRecognizer` (choosing the best available backend).
3. Starts the background transcription loop.
"""
self.logger.info("Setting up %s", self.name)
self._connect_audio_in_socket()
@@ -42,23 +64,45 @@ class TranscriptionAgent(BaseAgent):
self.logger.info("Finished setting up %s", self.name)
async def stop(self):
"""
Stop the agent and close sockets.
"""
assert self.audio_in_socket is not None
self.audio_in_socket.close()
self.audio_in_socket = None
return await super().stop()
def _connect_audio_in_socket(self):
"""
Helper to connect the ZMQ SUB socket for audio input.
"""
self.audio_in_socket = azmq.Context.instance().socket(zmq.SUB)
self.audio_in_socket.setsockopt_string(zmq.SUBSCRIBE, "")
self.audio_in_socket.connect(self.audio_in_address)
async def _transcribe(self, audio: np.ndarray) -> str:
"""
Run the speech recognition on the audio data.
This runs in a separate thread (via `asyncio.to_thread`) to avoid blocking the event loop,
constrained by the concurrency semaphore.
:param audio: The audio data as a numpy array.
:return: The transcribed text string.
"""
assert self._concurrency is not None and self.speech_recognizer is not None
async with self._concurrency:
return await asyncio.to_thread(self.speech_recognizer.recognize_speech, audio)
async def _share_transcription(self, transcription: str):
"""Share a transcription to the other agents that depend on it."""
"""
Share a transcription to the other agents that depend on it.
Currently sends to:
- :attr:`settings.agent_settings.text_belief_extractor_name`
:param transcription: The transcribed text.
"""
receiver_names = [
settings.agent_settings.text_belief_extractor_name,
]
@@ -72,6 +116,12 @@ class TranscriptionAgent(BaseAgent):
await self.send(message)
async def _transcribing_loop(self) -> None:
"""
The main loop for receiving audio and triggering transcription.
Receives audio chunks from ZMQ, decodes them to float32, and calls :meth:`_transcribe`.
If speech is found, it calls :meth:`_share_transcription`.
"""
while self._running:
try:
assert self.audio_in_socket is not None

View File

@@ -15,6 +15,8 @@ class SocketPoller[T]:
"""
Convenience class for polling a socket for data with a timeout, persisting a zmq.Poller for
multiple usages.
:param T: The type of data returned by the socket.
"""
def __init__(
@@ -35,7 +37,7 @@ class SocketPoller[T]:
"""
Get data from the socket, or None if the timeout is reached.
:param timeout_ms: If given, the timeout. Otherwise, `self.timeout_ms` is used.
:param timeout_ms: If given, the timeout. Otherwise, ``self.timeout_ms`` is used.
:return: Data from the socket or None.
"""
timeout_ms = timeout_ms or self.timeout_ms
@@ -47,11 +49,27 @@ class SocketPoller[T]:
class VADAgent(BaseAgent):
"""
An agent which listens to an audio stream, does Voice Activity Detection (VAD), and sends
fragments with detected speech to other agents over ZeroMQ.
Voice Activity Detection (VAD) Agent.
This agent:
1. Receives an audio stream (via ZMQ).
2. Processes the audio using the Silero VAD model to detect speech.
3. Buffers potential speech segments.
4. Publishes valid speech fragments (containing speech plus small buffer) to a ZMQ PUB socket.
5. Instantiates and starts agents (like :class:`TranscriptionAgent`) that use this output.
:ivar audio_in_address: Address of the input audio stream.
:ivar audio_in_bind: Whether to bind or connect to the input address.
:ivar audio_out_socket: ZMQ PUB socket for sending speech fragments.
"""
def __init__(self, audio_in_address: str, audio_in_bind: bool):
"""
Initialize the VAD Agent.
:param audio_in_address: ZMQ address for input audio.
:param audio_in_bind: True if this agent should bind to the input address, False to connect.
"""
super().__init__(settings.agent_settings.vad_name)
self.audio_in_address = audio_in_address
@@ -67,6 +85,15 @@ class VADAgent(BaseAgent):
self.model = None
async def setup(self):
"""
Initialize resources.
1. Connects audio input socket.
2. Binds audio output socket (random port).
3. Loads VAD model from Torch Hub.
4. Starts the streaming loop.
5. Instantiates and starts the :class:`TranscriptionAgent` with the output address.
"""
self.logger.info("Setting up %s", self.name)
self._connect_audio_in_socket()
@@ -114,6 +141,10 @@ class VADAgent(BaseAgent):
await super().stop()
def _connect_audio_in_socket(self):
"""
Connects (or binds) the socket for listening to audio from RI.
:return:
"""
self.audio_in_socket = azmq.Context.instance().socket(zmq.SUB)
self.audio_in_socket.setsockopt_string(zmq.SUBSCRIBE, "")
if self.audio_in_bind:
@@ -123,7 +154,9 @@ class VADAgent(BaseAgent):
self.audio_in_poller = SocketPoller[bytes](self.audio_in_socket)
def _connect_audio_out_socket(self) -> int | None:
"""Returns the port bound, or None if binding failed."""
"""
Returns the port bound, or None if binding failed.
"""
try:
self.audio_out_socket = azmq.Context.instance().socket(zmq.PUB)
return self.audio_out_socket.bind_to_random_port("tcp://localhost", max_tries=100)
@@ -144,6 +177,15 @@ class VADAgent(BaseAgent):
self._ready.set()
async def _streaming_loop(self):
"""
Main loop for processing audio stream.
1. Polls for new audio chunks.
2. Passes chunk to VAD model.
3. Manages `i_since_speech` counter to determine start/end of speech.
4. Buffers speech + context.
5. Sends complete speech segment to output socket when silence is detected.
"""
await self._ready.wait()
while self._running:
assert self.audio_in_poller is not None

View File

@@ -15,6 +15,14 @@ router = APIRouter()
# DO NOT LOG INSIDE THIS FUNCTION
@router.get("/logs/stream")
async def log_stream():
"""
Server-Sent Events (SSE) endpoint for real-time log streaming.
Subscribes to the internal ZMQ logging topic and forwards log records to the client.
Allows the frontend to display live logs from the backend.
:return: A StreamingResponse yielding SSE data.
"""
context = Context.instance()
socket = context.socket(zmq.SUB)

View File

@@ -11,6 +11,14 @@ router = APIRouter()
@router.post("/message", status_code=202)
async def receive_message(message: Message, request: Request):
"""
Generic endpoint to receive text messages.
Publishes the message to the internal 'message' topic via ZMQ.
:param message: The message payload.
:param request: The FastAPI request object (used to access app state).
"""
logger.info("Received message: %s", message.message)
topic = b"message"

View File

@@ -11,8 +11,14 @@ router = APIRouter()
@router.post("/program", status_code=202)
async def receive_message(program: Program, request: Request):
"""
Receives a BehaviorProgram, pydantic checks it.
Converts it into real Phase objects.
Endpoint to upload a new Behavior Program.
Validates the program structure (phases, norms, goals) and publishes it to the internal
'program' topic. The :class:`~control_backend.agents.bdi.bdi_program_manager.BDIProgramManager`
will pick this up and update the BDI agent.
:param program: The parsed Program object.
:param request: The FastAPI request object.
"""
logger.debug("Received raw program: %s", program)

View File

@@ -17,6 +17,16 @@ router = APIRouter()
@router.post("/command", status_code=202)
async def receive_command(command: SpeechCommand, request: Request):
"""
Send a direct speech command to the robot.
Publishes the command to the internal 'command' topic. The
:class:`~control_backend.agents.actuation.robot_speech_agent.RobotSpeechAgent`
will forward this to the robot.
:param command: The speech command payload.
:param request: The FastAPI request object.
"""
# Validate and retrieve data.
SpeechCommand.model_validate(command)
topic = b"command"
@@ -29,12 +39,22 @@ async def receive_command(command: SpeechCommand, request: Request):
@router.get("/ping_check")
async def ping(request: Request):
"""
Simple HTTP ping endpoint to check if the backend is reachable.
"""
pass
@router.get("/ping_stream")
async def ping_stream(request: Request):
"""Stream live updates whenever the device state changes."""
"""
SSE endpoint for monitoring the Robot Interface connection status.
Subscribes to the internal 'ping' topic (published by the RI Communication Agent)
and yields status updates to the client.
:return: A StreamingResponse of connection status events.
"""
async def event_stream():
# Set up internal socket to receive ping updates

View File

@@ -6,4 +6,7 @@ router = APIRouter()
# TODO: implement
@router.get("/sse")
async def sse(request: Request):
"""
Placeholder for future Server-Sent Events endpoint.
"""
pass

View File

@@ -31,19 +31,30 @@ class AgentDirectory:
class BaseAgent(ABC):
"""
Abstract base class for all agents. To make a new agent, inherit from
`control_backend.agents.BaseAgent`, not this class. That ensures that a
logger is present with the correct name pattern.
Abstract base class for all agents in the system.
When subclassing, the `setup()` method needs to be overwritten. To handle
messages from other agents, overwrite the `handle_message()` method. To
send messages to other agents, use the `send()` method. To add custom
behaviors/tasks to the agent, use the `add_background_task()` method.
This class provides the foundational infrastructure for agent lifecycle management, messaging
(both intra-process and inter-process via ZMQ), and asynchronous behavior execution.
.. warning::
Do not inherit from this class directly for creating new agents. Instead, inherit from
:class:`control_backend.agents.base.BaseAgent`, which ensures proper logger configuration.
:ivar name: The unique name of the agent.
:ivar inbox: The queue for receiving internal messages.
:ivar _tasks: A set of currently running asynchronous tasks/behaviors.
:ivar _running: A boolean flag indicating if the agent is currently running.
:ivar logger: The logger instance for the agent.
"""
logger: logging.Logger
def __init__(self, name: str):
"""
Initialize the BaseAgent.
:param name: The unique identifier for this agent.
"""
self.name = name
self.inbox: asyncio.Queue[InternalMessage] = asyncio.Queue()
self._tasks: set[asyncio.Task] = set()
@@ -54,11 +65,27 @@ class BaseAgent(ABC):
@abstractmethod
async def setup(self):
"""Overwrite this to initialize resources."""
"""
Initialize agent-specific resources.
This method must be overridden by subclasses. It is called after the agent has started
and the ZMQ sockets have been initialized. Use this method to:
* Initialize connections (databases, APIs, etc.)
* Add initial behaviors using :meth:`add_behavior`
"""
pass
async def start(self):
"""Starts the agent and its loops."""
"""
Start the agent and its internal loops.
This method:
1. Sets the running state to True.
2. Initializes ZeroMQ PUB/SUB sockets for inter-process communication.
3. Calls the user-defined :meth:`setup` method.
4. Starts the inbox processing loop and the ZMQ receiver loop.
"""
self.logger.info(f"Starting agent {self.name}")
self._running = True
@@ -80,7 +107,11 @@ class BaseAgent(ABC):
self.add_behavior(self._receive_internal_zmq_loop())
async def stop(self):
"""Stops the agent."""
"""
Stop the agent.
Sets the running state to False and cancels all running background tasks.
"""
self._running = False
for task in self._tasks:
task.cancel()
@@ -88,7 +119,16 @@ class BaseAgent(ABC):
async def send(self, message: InternalMessage):
"""
Sends a message to another agent.
Send a message to another agent.
This method intelligently routes the message:
* If the target agent is in the same process (found in :class:`AgentDirectory`),
the message is put directly into its inbox.
* If the target agent is not found locally, the message is serialized and sent
via ZeroMQ to the internal publication address.
:param message: The message to send.
"""
target = AgentDirectory.get(message.to)
if target:
@@ -102,7 +142,11 @@ class BaseAgent(ABC):
self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.")
async def _process_inbox(self):
"""Default loop: equivalent to a CyclicBehaviour receiving messages."""
"""
Internal loop that processes messages from the inbox.
Reads messages from ``self.inbox`` and passes them to :meth:`handle_message`.
"""
while self._running:
msg = await self.inbox.get()
self.logger.debug(f"Received message from {msg.sender}.")
@@ -110,8 +154,11 @@ class BaseAgent(ABC):
async def _receive_internal_zmq_loop(self):
"""
Listens for internal messages sent from agents on another process via ZMQ
and puts them into the normal inbox.
Internal loop that listens for ZMQ messages.
Subscribes to ``internal/<agent_name>`` topics. When a message is received,
it is deserialized into an :class:`InternalMessage` and put into the local inbox.
This bridges the gap between inter-process ZMQ communication and the intra-process inbox.
"""
while self._running:
try:
@@ -126,15 +173,24 @@ class BaseAgent(ABC):
self.logger.exception("Could not process ZMQ message.")
async def handle_message(self, msg: InternalMessage):
"""Override this to handle incoming messages."""
"""
Handle an incoming message.
This method must be overridden by subclasses to define how the agent reacts to messages.
:param msg: The received message.
:raises NotImplementedError: If not overridden by the subclass.
"""
raise NotImplementedError
def add_behavior(self, coro: Coroutine) -> Task:
"""
Helper to add a behavior to the agent. To add asynchronous behavior to an agent, define
an `async` function and add it to the task list by calling :func:`add_behavior`
with it. This should happen in the :func:`setup` method of the agent. For an example, see:
:func:`~control_backend.agents.bdi.BDICoreAgent`.
Add a background behavior (task) to the agent.
This is the preferred way to run continuous loops or long-running tasks within an agent.
The task is tracked and will be automatically cancelled when :meth:`stop` is called.
:param coro: The coroutine to execute as a task.
"""
task = asyncio.create_task(coro)
self._tasks.add(task)

View File

@@ -3,6 +3,16 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
class ZMQSettings(BaseModel):
"""
Configuration for ZeroMQ (ZMQ) addresses used for inter-process communication.
:ivar internal_pub_address: Address for the internal PUB socket.
:ivar internal_sub_address: Address for the internal SUB socket.
:ivar ri_command_address: Address for sending commands to the Robot Interface.
:ivar ri_communication_address: Address for receiving communication from the Robot Interface.
:ivar vad_agent_address: Address for the Voice Activity Detection (VAD) agent.
"""
internal_pub_address: str = "tcp://localhost:5560"
internal_sub_address: str = "tcp://localhost:5561"
ri_command_address: str = "tcp://localhost:0000"
@@ -11,6 +21,21 @@ class ZMQSettings(BaseModel):
class AgentSettings(BaseModel):
"""
Names of the various agents in the system. These names are used for routing messages.
:ivar bdi_core_name: Name of the BDI Core Agent.
:ivar bdi_belief_collector_name: Name of the Belief Collector Agent.
:ivar bdi_program_manager_name: Name of the BDI Program Manager Agent.
:ivar text_belief_extractor_name: Name of the Text Belief Extractor Agent.
:ivar vad_name: Name of the Voice Activity Detection (VAD) Agent.
:ivar llm_name: Name of the Large Language Model (LLM) Agent.
:ivar test_name: Name of the Test Agent.
:ivar transcription_name: Name of the Transcription Agent.
:ivar ri_communication_name: Name of the RI Communication Agent.
:ivar robot_speech_name: Name of the Robot Speech Agent.
"""
# agent names
bdi_core_name: str = "bdi_core_agent"
bdi_belief_collector_name: str = "belief_collector_agent"
@@ -25,6 +50,21 @@ class AgentSettings(BaseModel):
class BehaviourSettings(BaseModel):
"""
Configuration for agent behaviors and parameters.
:ivar sleep_s: Default sleep time in seconds for loops.
:ivar comm_setup_max_retries: Maximum number of retries for setting up communication.
:ivar socket_poller_timeout_ms: Timeout in milliseconds for socket polling.
:ivar vad_prob_threshold: Probability threshold for Voice Activity Detection.
:ivar vad_initial_since_speech: Initial value for 'since speech' counter in VAD.
:ivar vad_non_speech_patience_chunks: Number of non-speech chunks to wait before speech ended.
:ivar transcription_max_concurrent_tasks: Maximum number of concurrent transcription tasks.
:ivar transcription_words_per_minute: Estimated words per minute for transcription timing.
:ivar transcription_words_per_token: Estimated words per token for transcription timing.
:ivar transcription_token_buffer: Buffer for transcription tokens.
"""
sleep_s: float = 1.0
comm_setup_max_retries: int = 5
socket_poller_timeout_ms: int = 100
@@ -42,24 +82,58 @@ class BehaviourSettings(BaseModel):
class LLMSettings(BaseModel):
"""
Configuration for the Large Language Model (LLM).
:ivar local_llm_url: URL for the local LLM API.
:ivar local_llm_model: Name of the local LLM model to use.
"""
local_llm_url: str = "http://localhost:1234/v1/chat/completions"
local_llm_model: str = "openai/gpt-oss-20b"
request_timeout_s: int = 120
local_llm_model: str = "gpt-oss"
class VADSettings(BaseModel):
"""
Configuration for Voice Activity Detection (VAD) model.
:ivar repo_or_dir: Repository or directory for the VAD model.
:ivar model_name: Name of the VAD model.
:ivar sample_rate_hz: Sample rate in Hz for the VAD model.
"""
repo_or_dir: str = "snakers4/silero-vad"
model_name: str = "silero_vad"
sample_rate_hz: int = 16000
class SpeechModelSettings(BaseModel):
"""
Configuration for speech recognition models.
:ivar mlx_model_name: Model name for MLX-based speech recognition.
:ivar openai_model_name: Model name for OpenAI-based speech recognition.
"""
# model identifiers for speech recognition
mlx_model_name: str = "mlx-community/whisper-small.en-mlx"
openai_model_name: str = "small.en"
class Settings(BaseSettings):
"""
Global application settings.
:ivar app_title: Title of the application.
:ivar ui_url: URL of the frontend UI.
:ivar zmq_settings: ZMQ configuration.
:ivar agent_settings: Agent name configuration.
:ivar behaviour_settings: Behavior configuration.
:ivar vad_settings: VAD model configuration.
:ivar speech_model_settings: Speech model configuration.
:ivar llm_settings: LLM configuration.
"""
app_title: str = "PepperPlus"
ui_url: str = "http://localhost:5173"

View File

@@ -37,6 +37,12 @@ def add_logging_level(level_name: str, level_num: int, method_name: str | None =
def setup_logging(path: str = ".logging_config.yaml") -> None:
"""
Setup logging configuration of the CB. Tries to load the logging configuration from a file,
in which we specify custom loggers, formatters, handlers, etc.
:param path:
:return:
"""
if os.path.exists(path):
with open(path) as f:
try:

View File

@@ -1,3 +1,20 @@
"""
Control Backend Main Application.
This module defines the FastAPI application that serves as the entry point for the
Control Backend. It manages the lifecycle of the entire system, including:
1. **Socket Initialization**: Setting up the internal ZeroMQ PUB/SUB proxy for agent communication.
2. **Agent Management**: Instantiating and starting all agents.
3. **API Routing**: Exposing REST endpoints for external interaction.
Lifecycle Manager
-----------------
The :func:`lifespan` context manager handles the startup and shutdown sequences:
- **Startup**: Configures logging, starts the ZMQ proxy, connects sockets, and launches agents.
- **Shutdown**: Handles graceful cleanup (though currently minimal).
"""
import contextlib
import logging
import threading
@@ -34,6 +51,12 @@ logger = logging.getLogger(__name__)
def setup_sockets():
"""
Initialize and run the internal ZeroMQ Proxy (XPUB/XSUB).
This proxy acts as the central message bus, forwarding messages published on the
internal PUB address to all subscribers on the internal SUB address.
"""
context = Context.instance()
internal_pub_socket = context.socket(zmq.XPUB)
@@ -94,7 +117,7 @@ async def lifespan(app: FastAPI):
BDICoreAgent,
{
"name": settings.agent_settings.bdi_core_name,
"asl": "src/control_backend/agents/bdi/bdi_core_agent/rules.asl",
"asl": "src/control_backend/agents/bdi/rules.asl",
},
),
"BeliefCollectorAgent": (

View File

@@ -2,10 +2,22 @@ from pydantic import BaseModel
class Belief(BaseModel):
"""
Represents a single belief in the BDI system.
:ivar name: The functor or name of the belief (e.g., 'user_said').
:ivar arguments: A list of string arguments for the belief.
:ivar replace: If True, existing beliefs with this name should be replaced by this one.
"""
name: str
arguments: list[str]
replace: bool = False
class BeliefMessage(BaseModel):
"""
A container for transporting a list of beliefs between agents.
"""
beliefs: list[Belief]

View File

@@ -3,7 +3,12 @@ from pydantic import BaseModel
class InternalMessage(BaseModel):
"""
Represents a message to an agent.
Standard message envelope for communication between agents within the Control Backend.
:ivar to: The name of the destination agent.
:ivar sender: The name of the sending agent.
:ivar body: The string payload (often a JSON-serialized model).
:ivar thread: An optional thread identifier/topic to categorize the message (e.g., 'beliefs').
"""
to: str

View File

@@ -2,6 +2,17 @@ from pydantic import BaseModel
class LLMPromptMessage(BaseModel):
"""
Payload sent from the BDI agent to the LLM agent.
Contains the user's text input along with the dynamic context (norms and goals)
that the LLM should use to generate a response.
:ivar text: The user's input text.
:ivar norms: A list of active behavioral norms.
:ivar goals: A list of active goals to pursue.
"""
text: str
norms: list[str]
goals: list[str]

View File

@@ -2,4 +2,8 @@ from pydantic import BaseModel
class Message(BaseModel):
"""
A simple generic message wrapper, typically used for simple API responses.
"""
message: str

View File

@@ -2,12 +2,29 @@ from pydantic import BaseModel
class Norm(BaseModel):
"""
Represents a behavioral norm.
:ivar id: Unique identifier.
:ivar label: Human-readable label.
:ivar norm: The actual norm text describing the behavior.
"""
id: str
label: str
norm: str
class Goal(BaseModel):
"""
Represents an objective to be achieved.
:ivar id: Unique identifier.
:ivar label: Human-readable label.
:ivar description: Detailed description of the goal.
:ivar achieved: Status flag indicating if the goal has been met.
"""
id: str
label: str
description: str
@@ -27,6 +44,16 @@ class KeywordTrigger(BaseModel):
class Phase(BaseModel):
"""
A distinct phase within a program, containing norms, goals, and triggers.
:ivar id: Unique identifier.
:ivar label: Human-readable label.
:ivar norms: List of norms active in this phase.
:ivar goals: List of goals to pursue in this phase.
:ivar triggers: List of triggers that define transitions out of this phase.
"""
id: str
label: str
norms: list[Norm]
@@ -35,4 +62,10 @@ class Phase(BaseModel):
class Program(BaseModel):
"""
Represents a complete interaction program, consisting of a sequence or set of phases.
:ivar phases: The list of phases that make up the program.
"""
phases: list[Phase]

View File

@@ -5,16 +5,34 @@ from pydantic import BaseModel
class RIEndpoint(str, Enum):
"""
Enumeration of valid endpoints for the Robot Interface (RI).
"""
SPEECH = "actuate/speech"
PING = "ping"
NEGOTIATE_PORTS = "negotiate/ports"
class RIMessage(BaseModel):
"""
Base schema for messages sent to the Robot Interface.
:ivar endpoint: The target endpoint/action on the RI.
:ivar data: The payload associated with the action.
"""
endpoint: RIEndpoint
data: Any
class SpeechCommand(RIMessage):
"""
A specific command to make the robot speak.
:ivar endpoint: Fixed to ``RIEndpoint.SPEECH``.
:ivar data: The text string to be spoken.
"""
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
data: str

View File

@@ -4,7 +4,7 @@ from unittest.mock import AsyncMock, MagicMock, mock_open, patch
import agentspeak
import pytest
from control_backend.agents.bdi.bdi_core_agent.bdi_core_agent import BDICoreAgent
from control_backend.agents.bdi.bdi_core_agent import BDICoreAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage