Add documentation #31
@@ -10,6 +10,17 @@ from control_backend.schemas.ri_message import SpeechCommand
|
||||
|
||||
|
||||
class RobotSpeechAgent(BaseAgent):
|
||||
"""
|
||||
This agent acts as a bridge between the control backend and the Robot Interface (RI).
|
||||
It receives speech commands from other agents or from the UI,
|
||||
and forwards them to the robot via a ZMQ PUB socket.
|
||||
|
||||
:ivar subsocket: ZMQ SUB socket for receiving external commands (e.g., from UI).
|
||||
:ivar pubsocket: ZMQ PUB socket for sending commands to the Robot Interface.
|
||||
:ivar address: Address to bind/connect the PUB socket.
|
||||
:ivar bind: Whether to bind or connect the PUB socket.
|
||||
"""
|
||||
|
||||
subsocket: zmq.Socket
|
||||
pubsocket: zmq.Socket
|
||||
address = ""
|
||||
@@ -27,7 +38,11 @@ class RobotSpeechAgent(BaseAgent):
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Setup the robot speech command agent
|
||||
Initialize the agent.
|
||||
|
||||
1. Sets up the PUB socket to talk to the robot.
|
||||
2. Sets up the SUB socket to listen for "command" topics (from UI/External).
|
||||
3. Starts the loop for handling ZMQ commands.
|
||||
"""
|
||||
self.logger.info("Setting up %s", self.name)
|
||||
|
||||
@@ -58,7 +73,11 @@ class RobotSpeechAgent(BaseAgent):
|
||||
|
||||
async def handle_message(self, msg: InternalMessage):
|
||||
"""
|
||||
Handle commands received from other Python agents.
|
||||
Handle commands received from other internal Python agents.
|
||||
|
||||
Validates the message as a :class:`SpeechCommand` and forwards it to the robot.
|
||||
|
||||
:param msg: The internal message containing the command.
|
||||
"""
|
||||
try:
|
||||
speech_command = SpeechCommand.model_validate_json(msg.body)
|
||||
@@ -68,7 +87,9 @@ class RobotSpeechAgent(BaseAgent):
|
||||
|
||||
async def _zmq_command_loop(self):
|
||||
"""
|
||||
Handle commands from the UI.
|
||||
Loop to handle commands received via ZMQ (e.g., from the UI).
|
||||
|
||||
Listens on the 'command' topic, validates the JSON, and forwards it to the robot.
|
||||
"""
|
||||
while self._running:
|
||||
try:
|
||||
|
||||
@@ -5,14 +5,22 @@ from control_backend.core.agent_system import BaseAgent as CoreBaseAgent
|
||||
|
||||
class BaseAgent(CoreBaseAgent):
|
||||
"""
|
||||
Base agent class for our agents to inherit from. This just ensures
|
||||
all agents have a logger.
|
||||
The primary base class for all implementation agents.
|
||||
|
||||
Inherits from :class:`control_backend.core.agent_system.BaseAgent`.
|
||||
This class ensures that every agent instance is automatically equipped with a
|
||||
properly configured ``logger``.
|
||||
|
||||
:ivar logger: A logger instance named after the agent's package and class.
|
||||
"""
|
||||
|
||||
logger: logging.Logger
|
||||
|
||||
# Whenever a subclass is initiated, give it the correct logger
|
||||
def __init_subclass__(cls, **kwargs) -> None:
|
||||
"""
|
||||
Whenever a subclass is initiated, give it the correct logger.
|
||||
:param kwargs: Keyword arguments for the subclass.
|
||||
"""
|
||||
super().__init_subclass__(**kwargs)
|
||||
|
||||
cls.logger = logging.getLogger(__package__).getChild(cls.__name__)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from .bdi_core_agent.bdi_core_agent import BDICoreAgent as BDICoreAgent
|
||||
from control_backend.agents.bdi.bdi_core_agent import BDICoreAgent as BDICoreAgent
|
||||
|
||||
from .belief_collector_agent import (
|
||||
BDIBeliefCollectorAgent as BDIBeliefCollectorAgent,
|
||||
)
|
||||
|
||||
@@ -19,6 +19,27 @@ DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak
|
||||
|
||||
|
||||
class BDICoreAgent(BaseAgent):
|
||||
"""
|
||||
BDI Core Agent.
|
||||
|
||||
This is the central reasoning agent of the system, powered by the **AgentSpeak(L)** language.
|
||||
It maintains a belief base (representing the state of the world) and a set of plans (rules).
|
||||
|
||||
It runs an internal BDI (Belief-Desire-Intention) cycle using the ``agentspeak`` library.
|
||||
When beliefs change (e.g., via :meth:`_apply_beliefs`), the agent evaluates its plans to
|
||||
determine the best course of action.
|
||||
|
||||
**Custom Actions:**
|
||||
It defines custom actions (like ``.reply``) that allow the AgentSpeak code to interact with
|
||||
external Python agents (e.g., querying the LLM).
|
||||
|
||||
:ivar bdi_agent: The internal AgentSpeak agent instance.
|
||||
:ivar asl_file: Path to the AgentSpeak source file (.asl).
|
||||
:ivar env: The AgentSpeak environment.
|
||||
:ivar actions: A registry of custom actions available to the AgentSpeak code.
|
||||
:ivar _wake_bdi_loop: Event used to wake up the reasoning loop when new beliefs arrive.
|
||||
"""
|
||||
|
||||
bdi_agent: agentspeak.runtime.Agent
|
||||
|
||||
def __init__(self, name: str, asl: str):
|
||||
@@ -30,6 +51,13 @@ class BDICoreAgent(BaseAgent):
|
||||
self._wake_bdi_loop = asyncio.Event()
|
||||
|
||||
async def setup(self) -> None:
|
||||
"""
|
||||
Initialize the BDI agent.
|
||||
|
||||
1. Registers custom actions (like ``.reply``).
|
||||
2. Loads the .asl source file.
|
||||
3. Starts the reasoning loop (:meth:`_bdi_loop`) in the background.
|
||||
"""
|
||||
self.logger.debug("Setup started.")
|
||||
|
||||
self._add_custom_actions()
|
||||
@@ -42,6 +70,9 @@ class BDICoreAgent(BaseAgent):
|
||||
self.logger.debug("Setup complete.")
|
||||
|
||||
async def _load_asl(self):
|
||||
"""
|
||||
Load and parse the AgentSpeak source file.
|
||||
"""
|
||||
try:
|
||||
with open(self.asl_file) as source:
|
||||
self.bdi_agent = self.env.build_agent(source, self.actions)
|
||||
@@ -51,7 +82,11 @@ class BDICoreAgent(BaseAgent):
|
||||
|
||||
async def _bdi_loop(self):
|
||||
"""
|
||||
Runs the AgentSpeak BDI loop. Efficiently checks for when the next expected work will be.
|
||||
The main BDI reasoning loop.
|
||||
|
||||
It waits for the ``_wake_bdi_loop`` event (set when beliefs change or actions complete).
|
||||
When awake, it steps through the AgentSpeak interpreter. It also handles sleeping if
|
||||
the agent has deferred intentions (deadlines).
|
||||
"""
|
||||
while self._running:
|
||||
await (
|
||||
@@ -78,7 +113,12 @@ class BDICoreAgent(BaseAgent):
|
||||
|
||||
async def handle_message(self, msg: InternalMessage):
|
||||
"""
|
||||
Route incoming messages (Beliefs or LLM responses).
|
||||
Handle incoming messages.
|
||||
|
||||
- **Beliefs**: Updates the internal belief base.
|
||||
- **LLM Responses**: Forwards the generated text to the Robot Speech Agent (actuation).
|
||||
|
||||
:param msg: The received internal message.
|
||||
"""
|
||||
self.logger.debug("Processing message from %s.", msg.sender)
|
||||
|
||||
@@ -106,6 +146,12 @@ class BDICoreAgent(BaseAgent):
|
||||
await self.send(out_msg)
|
||||
|
||||
def _apply_beliefs(self, beliefs: list[Belief]):
|
||||
"""
|
||||
Update the belief base with a list of new beliefs.
|
||||
|
||||
If ``replace=True`` is set on a belief, it removes all existing beliefs with that name
|
||||
before adding the new one.
|
||||
"""
|
||||
if not beliefs:
|
||||
return
|
||||
|
||||
@@ -115,6 +161,12 @@ class BDICoreAgent(BaseAgent):
|
||||
self._add_belief(belief.name, belief.arguments)
|
||||
|
||||
def _add_belief(self, name: str, args: Iterable[str] = []):
|
||||
"""
|
||||
Add a single belief to the BDI agent.
|
||||
|
||||
:param name: The functor/name of the belief (e.g., "user_said").
|
||||
:param args: Arguments for the belief.
|
||||
"""
|
||||
# new_args = (agentspeak.Literal(arg) for arg in args) # TODO: Eventually support multiple
|
||||
merged_args = DELIMITER.join(arg for arg in args)
|
||||
new_args = (agentspeak.Literal(merged_args),)
|
||||
@@ -11,8 +11,14 @@ from control_backend.schemas.program import Program
|
||||
|
||||
class BDIProgramManager(BaseAgent):
|
||||
"""
|
||||
Will interpret programs received from the HTTP endpoint. Extracts norms, goals, triggers and
|
||||
forwards them to the BDI as beliefs.
|
||||
BDI Program Manager Agent.
|
||||
|
||||
This agent is responsible for receiving high-level programs (sequences of instructions/goals)
|
||||
from the external HTTP API (via ZMQ) and translating them into core beliefs (norms and goals)
|
||||
for the BDI Core Agent. In the future, it will be responsible for determining when goals are
|
||||
met, and passing on new norms and goals accordingly.
|
||||
|
||||
:ivar sub_socket: The ZMQ SUB socket used to receive program updates.
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
@@ -20,6 +26,18 @@ class BDIProgramManager(BaseAgent):
|
||||
self.sub_socket = None
|
||||
|
||||
async def _send_to_bdi(self, program: Program):
|
||||
"""
|
||||
Convert a received program into BDI beliefs and send them to the BDI Core Agent.
|
||||
|
||||
Currently, it takes the **first phase** of the program and extracts:
|
||||
- **Norms**: Constraints or rules the agent must follow.
|
||||
- **Goals**: Objectives the agent must achieve.
|
||||
|
||||
These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will
|
||||
overwrite any existing norms/goals of the same name in the BDI agent.
|
||||
|
||||
:param program: The program object received from the API.
|
||||
"""
|
||||
first_phase = program.phases[0]
|
||||
norms_belief = Belief(
|
||||
name="norms",
|
||||
@@ -44,7 +62,10 @@ class BDIProgramManager(BaseAgent):
|
||||
|
||||
async def _receive_programs(self):
|
||||
"""
|
||||
Continuously receive programs from the HTTP endpoint, sent to us over ZMQ.
|
||||
Continuous loop that receives program updates from the HTTP endpoint.
|
||||
|
||||
It listens to the ``program`` topic on the internal ZMQ SUB socket.
|
||||
When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`.
|
||||
"""
|
||||
while True:
|
||||
topic, body = await self.sub_socket.recv_multipart()
|
||||
@@ -58,6 +79,12 @@ class BDIProgramManager(BaseAgent):
|
||||
await self._send_to_bdi(program)
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Initialize the agent.
|
||||
|
||||
Connects the internal ZMQ SUB socket and subscribes to the 'program' topic.
|
||||
Starts the background behavior to receive programs.
|
||||
"""
|
||||
context = Context.instance()
|
||||
|
||||
self.sub_socket = context.socket(zmq.SUB)
|
||||
|
||||
@@ -10,14 +10,33 @@ from control_backend.schemas.belief_message import Belief, BeliefMessage
|
||||
|
||||
class BDIBeliefCollectorAgent(BaseAgent):
|
||||
"""
|
||||
Continuously collects beliefs/emotions from extractor agents and forwards a
|
||||
unified belief packet to the BDI agent.
|
||||
BDI Belief Collector Agent.
|
||||
|
||||
This agent acts as a central aggregator for beliefs derived from various sources (e.g., text,
|
||||
emotion, vision). It receives raw extracted data from other agents,
|
||||
normalizes them into valid :class:`Belief` objects, and forwards them as a unified packet to the
|
||||
BDI Core Agent.
|
||||
|
||||
It serves as a funnel to ensure the BDI agent receives a consistent stream of beliefs.
|
||||
"""
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Initialize the agent.
|
||||
"""
|
||||
self.logger.info("Setting up %s", self.name)
|
||||
|
||||
async def handle_message(self, msg: InternalMessage):
|
||||
"""
|
||||
Handle incoming messages from other extractor agents.
|
||||
|
||||
Routes the message to specific handlers based on the 'type' field in the JSON body.
|
||||
Supported types:
|
||||
- ``belief_extraction_text``: Handled by :meth:`_handle_belief_text`
|
||||
- ``emotion_extraction_text``: Handled by :meth:`_handle_emo_text`
|
||||
|
||||
:param msg: The received internal message.
|
||||
"""
|
||||
sender_node = msg.sender
|
||||
|
||||
# Parse JSON payload
|
||||
@@ -49,12 +68,22 @@ class BDIBeliefCollectorAgent(BaseAgent):
|
||||
|
||||
async def _handle_belief_text(self, payload: dict, origin: str):
|
||||
"""
|
||||
Expected payload:
|
||||
{
|
||||
"type": "belief_extraction_text",
|
||||
"beliefs": {"user_said": ["Can you help me?"]}
|
||||
Process text-based belief extraction payloads.
|
||||
|
||||
}
|
||||
Expected payload format::
|
||||
|
||||
{
|
||||
"type": "belief_extraction_text",
|
||||
"beliefs": {
|
||||
"user_said": ["Can you help me?"],
|
||||
"intention": ["ask_help"]
|
||||
}
|
||||
}
|
||||
|
||||
Validates and converts the dictionary items into :class:`Belief` objects.
|
||||
|
||||
:param payload: The dictionary payload containing belief data.
|
||||
:param origin: The name of the sender agent.
|
||||
"""
|
||||
beliefs = payload.get("beliefs", {})
|
||||
|
||||
@@ -90,12 +119,24 @@ class BDIBeliefCollectorAgent(BaseAgent):
|
||||
await self._send_beliefs_to_bdi(beliefs, origin=origin)
|
||||
|
||||
async def _handle_emo_text(self, payload: dict, origin: str):
|
||||
"""TODO: implement (after we have emotional recognition)"""
|
||||
"""
|
||||
Process emotion extraction payloads.
|
||||
|
||||
**TODO**: Implement this method once emotion recognition is integrated.
|
||||
|
||||
:param payload: The dictionary payload containing emotion data.
|
||||
:param origin: The name of the sender agent.
|
||||
"""
|
||||
pass
|
||||
|
||||
async def _send_beliefs_to_bdi(self, beliefs: list[Belief], origin: str | None = None):
|
||||
"""
|
||||
Sends a unified belief packet to the BDI agent.
|
||||
Send a list of aggregated beliefs to the BDI Core Agent.
|
||||
|
||||
Wraps the beliefs in a :class:`BeliefMessage` and sends it via the 'beliefs' thread.
|
||||
|
||||
:param beliefs: The list of Belief objects to send.
|
||||
:param origin: (Optional) The original source of the beliefs (unused currently).
|
||||
"""
|
||||
if not beliefs:
|
||||
return
|
||||
|
||||
@@ -6,12 +6,31 @@ from control_backend.core.config import settings
|
||||
|
||||
|
||||
class TextBeliefExtractorAgent(BaseAgent):
|
||||
"""
|
||||
Text Belief Extractor Agent.
|
||||
|
||||
This agent is responsible for processing raw text (e.g., from speech transcription) and
|
||||
extracting semantic beliefs from it.
|
||||
|
||||
In the current demonstration version, it performs a simple wrapping of the user's input
|
||||
into a ``user_said`` belief. In a full implementation, this agent would likely interact
|
||||
with an LLM or NLU engine to extract intent, entities, and other structured information.
|
||||
"""
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Initialize the agent and its resources.
|
||||
"""
|
||||
self.logger.info("Settting up %s.", self.name)
|
||||
# Setup LLM belief context if needed (currently demo is just passthrough)
|
||||
self.beliefs = {"mood": ["X"], "car": ["Y"]}
|
||||
|
||||
async def handle_message(self, msg: InternalMessage):
|
||||
"""
|
||||
Handle incoming messages, primarily from the Transcription Agent.
|
||||
|
||||
:param msg: The received message containing transcribed text.
|
||||
"""
|
||||
sender = msg.sender
|
||||
if sender == settings.agent_settings.transcription_name:
|
||||
self.logger.debug("Received text from transcriber: %s", msg.body)
|
||||
@@ -21,7 +40,15 @@ class TextBeliefExtractorAgent(BaseAgent):
|
||||
|
||||
async def _process_transcription_demo(self, txt: str):
|
||||
"""
|
||||
Demo version to process the transcription input to beliefs.
|
||||
Process the transcribed text and generate beliefs.
|
||||
|
||||
**Demo Implementation:**
|
||||
Currently, this method takes the raw text ``txt`` and wraps it into a belief structure:
|
||||
``user_said("txt")``.
|
||||
|
||||
This belief is then sent to the :class:`BDIBeliefCollectorAgent`.
|
||||
|
||||
:param txt: The raw transcribed text string.
|
||||
"""
|
||||
# For demo, just wrapping user text as user_said belief
|
||||
belief = {"beliefs": {"user_said": [txt]}, "type": "belief_extraction_text"}
|
||||
|
||||
@@ -12,6 +12,27 @@ from ..actuation.robot_speech_agent import RobotSpeechAgent
|
||||
|
||||
|
||||
class RICommunicationAgent(BaseAgent):
|
||||
"""
|
||||
Robot Interface (RI) Communication Agent.
|
||||
|
||||
This agent manages the high-level connection negotiation and health checking (heartbeat)
|
||||
between the Control Backend and the Robot Interface (or UI).
|
||||
|
||||
It acts as a service discovery mechanism:
|
||||
1. It initiates a handshake (negotiation) to discover where other services (like the robot
|
||||
command listener) are listening.
|
||||
2. It spawns specific agents
|
||||
(like :class:`~control_backend.agents.actuation.robot_speech_agent.RobotSpeechAgent`)
|
||||
once the connection details are established.
|
||||
3. It maintains a "ping" loop to ensure the connection remains active.
|
||||
|
||||
:ivar _address: The ZMQ address to attempt the initial connection negotiation.
|
||||
:ivar _bind: Whether to bind or connect the negotiation socket.
|
||||
:ivar _req_socket: ZMQ REQ socket for negotiation and pings.
|
||||
:ivar pub_socket: ZMQ PUB socket for internal notifications (e.g., ping status).
|
||||
:ivar connected: Boolean flag indicating active connection status.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
@@ -27,8 +48,10 @@ class RICommunicationAgent(BaseAgent):
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Try to set up the communication agent, we have `behaviour_settings.comm_setup_max_retries`
|
||||
retries in case we don't have a response yet.
|
||||
Initialize the agent and attempt connection.
|
||||
|
||||
Tries to negotiate connection up to ``behaviour_settings.comm_setup_max_retries`` times.
|
||||
If successful, starts the :meth:`_listen_loop`.
|
||||
"""
|
||||
self.logger.info("Setting up %s", self.name)
|
||||
|
||||
@@ -45,7 +68,7 @@ class RICommunicationAgent(BaseAgent):
|
||||
|
||||
async def _setup_sockets(self, force=False):
|
||||
"""
|
||||
Sets up request socket for communication agent.
|
||||
Initialize ZMQ sockets (REQ for negotiation, PUB for internal updates).
|
||||
"""
|
||||
# Bind request socket
|
||||
if self._req_socket is None or force:
|
||||
@@ -62,6 +85,15 @@ class RICommunicationAgent(BaseAgent):
|
||||
async def _negotiate_connection(
|
||||
self, max_retries: int = settings.behaviour_settings.comm_setup_max_retries
|
||||
):
|
||||
"""
|
||||
Perform the handshake protocol with the Robot Interface.
|
||||
|
||||
Sends a ``negotiate/ports`` request and expects a configuration response containing
|
||||
port assignments for various services (e.g., actuation).
|
||||
|
||||
:param max_retries: Number of attempts before giving up.
|
||||
:return: True if negotiation succeeded, False otherwise.
|
||||
"""
|
||||
retries = 0
|
||||
while retries < max_retries:
|
||||
if self._req_socket is None:
|
||||
@@ -122,6 +154,12 @@ class RICommunicationAgent(BaseAgent):
|
||||
return False
|
||||
|
||||
async def _handle_negotiation_response(self, received_message):
|
||||
"""
|
||||
Parse the negotiation response and initialize services.
|
||||
|
||||
Based on the response, it might re-connect the main socket or spawn new agents
|
||||
(e.g., for robot actuation).
|
||||
"""
|
||||
for port_data in received_message["data"]:
|
||||
id = port_data["id"]
|
||||
port = port_data["port"]
|
||||
@@ -151,6 +189,10 @@ class RICommunicationAgent(BaseAgent):
|
||||
self.logger.warning("Unhandled negotiation id: %s", id)
|
||||
|
||||
async def stop(self):
|
||||
"""
|
||||
Closes all sockets.
|
||||
:return:
|
||||
"""
|
||||
if self._req_socket:
|
||||
self._req_socket.close()
|
||||
if self.pub_socket:
|
||||
@@ -159,7 +201,10 @@ class RICommunicationAgent(BaseAgent):
|
||||
|
||||
async def _listen_loop(self):
|
||||
"""
|
||||
Run the listening (ping) loop indefinitely.
|
||||
Maintain the connection via a heartbeat (ping) loop.
|
||||
|
||||
Sends a ``ping`` request periodically and waits for a reply.
|
||||
If pings fail repeatedly, it triggers a disconnection handler to restart negotiation.
|
||||
"""
|
||||
while self._running:
|
||||
if not self.connected:
|
||||
@@ -217,6 +262,11 @@ class RICommunicationAgent(BaseAgent):
|
||||
raise
|
||||
|
||||
async def _handle_disconnection(self):
|
||||
"""
|
||||
Handle connection loss.
|
||||
|
||||
Notifies the UI of disconnection (via internal PUB) and attempts to restart negotiation.
|
||||
"""
|
||||
self.connected = False
|
||||
|
||||
# Tell UI we're disconnected.
|
||||
|
||||
@@ -16,9 +16,17 @@ from .llm_instructions import LLMInstructions
|
||||
|
||||
class LLMAgent(BaseAgent):
|
||||
"""
|
||||
Agent responsible for processing user text input and querying a locally
|
||||
hosted LLM for text generation. Receives messages from the BDI Core Agent
|
||||
and responds with processed LLM output.
|
||||
LLM Agent.
|
||||
|
||||
This agent is responsible for processing user text input and querying a locally
|
||||
hosted LLM for text generation. It acts as the conversational brain of the system.
|
||||
|
||||
It receives :class:`~control_backend.schemas.llm_prompt_message.LLMPromptMessage`
|
||||
payloads from the BDI Core Agent, constructs a conversation history, queries the
|
||||
LLM via HTTP, and streams the response back to the BDI agent in natural chunks
|
||||
(e.g., sentence by sentence).
|
||||
|
||||
:ivar history: A list of dictionaries representing the conversation history (Role/Content).
|
||||
"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
@@ -29,6 +37,14 @@ class LLMAgent(BaseAgent):
|
||||
self.logger.info("Setting up %s.", self.name)
|
||||
|
||||
async def handle_message(self, msg: InternalMessage):
|
||||
"""
|
||||
Handle incoming messages.
|
||||
|
||||
Expects messages from :attr:`settings.agent_settings.bdi_core_name` containing
|
||||
an :class:`LLMPromptMessage` in the body.
|
||||
|
||||
:param msg: The received internal message.
|
||||
"""
|
||||
if msg.sender == settings.agent_settings.bdi_core_name:
|
||||
self.logger.debug("Processing message from BDI core.")
|
||||
try:
|
||||
@@ -40,6 +56,14 @@ class LLMAgent(BaseAgent):
|
||||
self.logger.debug("Message ignored (not from BDI core.")
|
||||
|
||||
async def _process_bdi_message(self, message: LLMPromptMessage):
|
||||
"""
|
||||
Orchestrate the LLM query and response streaming.
|
||||
|
||||
Iterates over the chunks yielded by :meth:`_query_llm` and forwards them
|
||||
individually to the BDI agent via :meth:`_send_reply`.
|
||||
|
||||
:param message: The parsed prompt message containing text, norms, and goals.
|
||||
"""
|
||||
async for chunk in self._query_llm(message.text, message.norms, message.goals):
|
||||
await self._send_reply(chunk)
|
||||
self.logger.debug(
|
||||
@@ -48,7 +72,9 @@ class LLMAgent(BaseAgent):
|
||||
|
||||
async def _send_reply(self, msg: str):
|
||||
"""
|
||||
Sends a response message back to the BDI Core Agent.
|
||||
Sends a response message (chunk) back to the BDI Core Agent.
|
||||
|
||||
:param msg: The text content of the chunk.
|
||||
"""
|
||||
reply = InternalMessage(
|
||||
to=settings.agent_settings.bdi_core_name,
|
||||
@@ -61,13 +87,18 @@ class LLMAgent(BaseAgent):
|
||||
self, prompt: str, norms: list[str], goals: list[str]
|
||||
) -> AsyncGenerator[str]:
|
||||
"""
|
||||
Sends a chat completion request to the local LLM service and streams the response by
|
||||
yielding fragments separated by punctuation like.
|
||||
Send a chat completion request to the local LLM service and stream the response.
|
||||
|
||||
It constructs the full prompt using
|
||||
:class:`~control_backend.agents.llm.llm_instructions.LLMInstructions`.
|
||||
It streams the response from the LLM and buffers tokens until a natural break (punctuation)
|
||||
is reached, then yields the chunk. This ensures that the robot speaks in complete phrases
|
||||
rather than individual tokens.
|
||||
|
||||
:param prompt: Input text prompt to pass to the LLM.
|
||||
:param norms: Norms the LLM should hold itself to.
|
||||
:param goals: Goals the LLM should achieve.
|
||||
:yield: Fragments of the LLM-generated content.
|
||||
:yield: Fragments of the LLM-generated content (e.g., sentences/phrases).
|
||||
"""
|
||||
self.history.append(
|
||||
{
|
||||
@@ -85,7 +116,7 @@ class LLMAgent(BaseAgent):
|
||||
*self.history,
|
||||
]
|
||||
|
||||
message_id = str(uuid.uuid4())
|
||||
message_id = str(uuid.uuid4()) # noqa
|
||||
|
||||
try:
|
||||
full_message = ""
|
||||
@@ -127,8 +158,14 @@ class LLMAgent(BaseAgent):
|
||||
yield "Error processing the request."
|
||||
|
||||
async def _stream_query_llm(self, messages) -> AsyncGenerator[str]:
|
||||
"""Raises httpx.HTTPError when the API gives an error."""
|
||||
async with httpx.AsyncClient(timeout=None) as client:
|
||||
"""
|
||||
Perform the raw HTTP streaming request to the LLM API.
|
||||
|
||||
:param messages: The list of message dictionaries (role/content).
|
||||
:yield: Raw text tokens (deltas) from the SSE stream.
|
||||
:raises httpx.HTTPError: If the API returns a non-200 status.
|
||||
"""
|
||||
async with httpx.AsyncClient() as client:
|
||||
async with client.stream(
|
||||
"POST",
|
||||
settings.llm_settings.local_llm_url,
|
||||
|
||||
@@ -1,7 +1,14 @@
|
||||
class LLMInstructions:
|
||||
"""
|
||||
Defines structured instructions that are sent along with each request
|
||||
to the LLM to guide its behavior (norms, goals, etc.).
|
||||
Helper class to construct the system instructions for the LLM.
|
||||
|
||||
It combines the base persona (Pepper robot) with dynamic norms and goals
|
||||
provided by the BDI system.
|
||||
|
||||
If no norms/goals are given it assumes empty lists.
|
||||
|
||||
:ivar norms: A list of behavioral norms.
|
||||
:ivar goals: A list of specific conversational goals.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
@@ -23,8 +30,16 @@ class LLMInstructions:
|
||||
|
||||
def build_developer_instruction(self) -> str:
|
||||
"""
|
||||
Builds a multi-line formatted instruction string for the LLM.
|
||||
Includes only non-empty structured fields.
|
||||
Builds the final system prompt string.
|
||||
|
||||
The prompt includes:
|
||||
1. Persona definition.
|
||||
2. Constraint on response length.
|
||||
3. Instructions on how to handle goals (reach them in order, but prioritize natural flow).
|
||||
4. The specific list of norms.
|
||||
5. The specific list of goals.
|
||||
|
||||
:return: The formatted system prompt string.
|
||||
"""
|
||||
sections = [
|
||||
"You are a Pepper robot engaging in natural human conversation.",
|
||||
|
||||
@@ -14,15 +14,28 @@ from control_backend.core.config import settings
|
||||
|
||||
|
||||
class SpeechRecognizer(abc.ABC):
|
||||
"""
|
||||
Abstract base class for speech recognition backends.
|
||||
|
||||
Provides a common interface for loading models and transcribing audio,
|
||||
as well as heuristics for estimating token counts to optimize decoding.
|
||||
|
||||
:ivar limit_output_length: If True, limits the generated text length based on audio duration.
|
||||
"""
|
||||
|
||||
def __init__(self, limit_output_length=True):
|
||||
"""
|
||||
:param limit_output_length: When `True`, the length of the generated speech will be limited
|
||||
by the length of the input audio and some heuristics.
|
||||
:param limit_output_length: When ``True``, the length of the generated speech will be
|
||||
limited by the length of the input audio and some heuristics.
|
||||
"""
|
||||
self.limit_output_length = limit_output_length
|
||||
|
||||
@abc.abstractmethod
|
||||
def load_model(self): ...
|
||||
def load_model(self):
|
||||
"""
|
||||
Load the speech recognition model into memory.
|
||||
"""
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def recognize_speech(self, audio: np.ndarray) -> str:
|
||||
@@ -30,15 +43,17 @@ class SpeechRecognizer(abc.ABC):
|
||||
Recognize speech from the given audio sample.
|
||||
|
||||
:param audio: A full utterance sample. Audio must be 16 kHz, mono, np.float32, values in the
|
||||
range [-1.0, 1.0].
|
||||
:return: Recognized speech.
|
||||
range [-1.0, 1.0].
|
||||
:return: The recognized speech text.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _estimate_max_tokens(audio: np.ndarray) -> int:
|
||||
"""
|
||||
Estimate the maximum length of a given audio sample in tokens. Assumes a maximum speaking
|
||||
rate of 450 words per minute (3x average), and assumes that 3 words is 4 tokens.
|
||||
Estimate the maximum length of a given audio sample in tokens.
|
||||
|
||||
Assumes a maximum speaking rate of 450 words per minute (3x average), and assumes that
|
||||
3 words is approx. 4 tokens.
|
||||
|
||||
:param audio: The audio sample (16 kHz) to use for length estimation.
|
||||
:return: The estimated length of the transcribed audio in tokens.
|
||||
@@ -51,8 +66,10 @@ class SpeechRecognizer(abc.ABC):
|
||||
|
||||
def _get_decode_options(self, audio: np.ndarray) -> dict:
|
||||
"""
|
||||
Construct decoding options for the Whisper model.
|
||||
|
||||
:param audio: The audio sample (16 kHz) to use to determine options like max decode length.
|
||||
:return: A dict that can be used to construct `whisper.DecodingOptions`.
|
||||
:return: A dict that can be used to construct ``whisper.DecodingOptions`` (or equivalent).
|
||||
"""
|
||||
options = {}
|
||||
if self.limit_output_length:
|
||||
@@ -61,7 +78,12 @@ class SpeechRecognizer(abc.ABC):
|
||||
|
||||
@staticmethod
|
||||
def best_type():
|
||||
"""Get the best type of SpeechRecognizer based on system capabilities."""
|
||||
"""
|
||||
Factory method to get the best available `SpeechRecognizer`.
|
||||
|
||||
:return: An instance of :class:`MLXWhisperSpeechRecognizer` if on macOS with Apple Silicon,
|
||||
otherwise :class:`OpenAIWhisperSpeechRecognizer`.
|
||||
"""
|
||||
if torch.mps.is_available():
|
||||
print("Choosing MLX Whisper model.")
|
||||
return MLXWhisperSpeechRecognizer()
|
||||
@@ -71,12 +93,20 @@ class SpeechRecognizer(abc.ABC):
|
||||
|
||||
|
||||
class MLXWhisperSpeechRecognizer(SpeechRecognizer):
|
||||
"""
|
||||
Speech recognizer using the MLX framework (optimized for Apple Silicon).
|
||||
"""
|
||||
|
||||
def __init__(self, limit_output_length=True):
|
||||
super().__init__(limit_output_length)
|
||||
self.was_loaded = False
|
||||
self.model_name = settings.speech_model_settings.mlx_model_name
|
||||
|
||||
def load_model(self):
|
||||
"""
|
||||
Ensures the model is downloaded and cached. MLX loads dynamically, so this
|
||||
pre-fetches the model.
|
||||
"""
|
||||
if self.was_loaded:
|
||||
return
|
||||
# There appears to be no dedicated mechanism to preload a model, but this `get_model` does
|
||||
@@ -94,11 +124,18 @@ class MLXWhisperSpeechRecognizer(SpeechRecognizer):
|
||||
|
||||
|
||||
class OpenAIWhisperSpeechRecognizer(SpeechRecognizer):
|
||||
"""
|
||||
Speech recognizer using the standard OpenAI Whisper library (PyTorch).
|
||||
"""
|
||||
|
||||
def __init__(self, limit_output_length=True):
|
||||
super().__init__(limit_output_length)
|
||||
self.model = None
|
||||
|
||||
def load_model(self):
|
||||
"""
|
||||
Loads the OpenAI Whisper model onto the available device (CUDA or CPU).
|
||||
"""
|
||||
if self.model is not None:
|
||||
return
|
||||
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
|
||||
|
||||
@@ -13,11 +13,26 @@ from .speech_recognizer import SpeechRecognizer
|
||||
|
||||
class TranscriptionAgent(BaseAgent):
|
||||
"""
|
||||
An agent which listens to audio fragments with voice, transcribes them, and sends the
|
||||
transcription to other agents.
|
||||
Transcription Agent.
|
||||
|
||||
This agent listens to audio fragments (containing speech) on a ZMQ SUB socket,
|
||||
transcribes them using the configured :class:`SpeechRecognizer`, and sends the
|
||||
resulting text to other agents (e.g., the Text Belief Extractor).
|
||||
|
||||
It uses an internal semaphore to limit the number of concurrent transcription tasks.
|
||||
|
||||
:ivar audio_in_address: The ZMQ address to receive audio from (usually from VAD Agent).
|
||||
:ivar audio_in_socket: The ZMQ SUB socket instance.
|
||||
:ivar speech_recognizer: The speech recognition engine instance.
|
||||
:ivar _concurrency: Semaphore to limit concurrent transcriptions.
|
||||
"""
|
||||
|
||||
def __init__(self, audio_in_address: str):
|
||||
"""
|
||||
Initialize the Transcription Agent.
|
||||
|
||||
:param audio_in_address: The ZMQ address of the audio source (e.g., VAD output).
|
||||
"""
|
||||
super().__init__(settings.agent_settings.transcription_name)
|
||||
|
||||
self.audio_in_address = audio_in_address
|
||||
@@ -26,6 +41,13 @@ class TranscriptionAgent(BaseAgent):
|
||||
self._concurrency = None
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Initialize the agent resources.
|
||||
|
||||
1. Connects to the audio input ZMQ socket.
|
||||
2. Initializes the :class:`SpeechRecognizer` (choosing the best available backend).
|
||||
3. Starts the background transcription loop.
|
||||
"""
|
||||
self.logger.info("Setting up %s", self.name)
|
||||
|
||||
self._connect_audio_in_socket()
|
||||
@@ -42,23 +64,45 @@ class TranscriptionAgent(BaseAgent):
|
||||
self.logger.info("Finished setting up %s", self.name)
|
||||
|
||||
async def stop(self):
|
||||
"""
|
||||
Stop the agent and close sockets.
|
||||
"""
|
||||
assert self.audio_in_socket is not None
|
||||
self.audio_in_socket.close()
|
||||
self.audio_in_socket = None
|
||||
return await super().stop()
|
||||
|
||||
def _connect_audio_in_socket(self):
|
||||
"""
|
||||
Helper to connect the ZMQ SUB socket for audio input.
|
||||
"""
|
||||
self.audio_in_socket = azmq.Context.instance().socket(zmq.SUB)
|
||||
self.audio_in_socket.setsockopt_string(zmq.SUBSCRIBE, "")
|
||||
self.audio_in_socket.connect(self.audio_in_address)
|
||||
|
||||
async def _transcribe(self, audio: np.ndarray) -> str:
|
||||
"""
|
||||
Run the speech recognition on the audio data.
|
||||
|
||||
This runs in a separate thread (via `asyncio.to_thread`) to avoid blocking the event loop,
|
||||
constrained by the concurrency semaphore.
|
||||
|
||||
:param audio: The audio data as a numpy array.
|
||||
:return: The transcribed text string.
|
||||
"""
|
||||
assert self._concurrency is not None and self.speech_recognizer is not None
|
||||
async with self._concurrency:
|
||||
return await asyncio.to_thread(self.speech_recognizer.recognize_speech, audio)
|
||||
|
||||
async def _share_transcription(self, transcription: str):
|
||||
"""Share a transcription to the other agents that depend on it."""
|
||||
"""
|
||||
Share a transcription to the other agents that depend on it.
|
||||
|
||||
Currently sends to:
|
||||
- :attr:`settings.agent_settings.text_belief_extractor_name`
|
||||
|
||||
:param transcription: The transcribed text.
|
||||
"""
|
||||
receiver_names = [
|
||||
settings.agent_settings.text_belief_extractor_name,
|
||||
]
|
||||
@@ -72,6 +116,12 @@ class TranscriptionAgent(BaseAgent):
|
||||
await self.send(message)
|
||||
|
||||
async def _transcribing_loop(self) -> None:
|
||||
"""
|
||||
The main loop for receiving audio and triggering transcription.
|
||||
|
||||
Receives audio chunks from ZMQ, decodes them to float32, and calls :meth:`_transcribe`.
|
||||
If speech is found, it calls :meth:`_share_transcription`.
|
||||
"""
|
||||
while self._running:
|
||||
try:
|
||||
assert self.audio_in_socket is not None
|
||||
|
||||
@@ -15,6 +15,8 @@ class SocketPoller[T]:
|
||||
"""
|
||||
Convenience class for polling a socket for data with a timeout, persisting a zmq.Poller for
|
||||
multiple usages.
|
||||
|
||||
:param T: The type of data returned by the socket.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -35,7 +37,7 @@ class SocketPoller[T]:
|
||||
"""
|
||||
Get data from the socket, or None if the timeout is reached.
|
||||
|
||||
:param timeout_ms: If given, the timeout. Otherwise, `self.timeout_ms` is used.
|
||||
:param timeout_ms: If given, the timeout. Otherwise, ``self.timeout_ms`` is used.
|
||||
:return: Data from the socket or None.
|
||||
"""
|
||||
timeout_ms = timeout_ms or self.timeout_ms
|
||||
@@ -47,11 +49,27 @@ class SocketPoller[T]:
|
||||
|
||||
class VADAgent(BaseAgent):
|
||||
"""
|
||||
An agent which listens to an audio stream, does Voice Activity Detection (VAD), and sends
|
||||
fragments with detected speech to other agents over ZeroMQ.
|
||||
Voice Activity Detection (VAD) Agent.
|
||||
|
||||
This agent:
|
||||
1. Receives an audio stream (via ZMQ).
|
||||
2. Processes the audio using the Silero VAD model to detect speech.
|
||||
3. Buffers potential speech segments.
|
||||
4. Publishes valid speech fragments (containing speech plus small buffer) to a ZMQ PUB socket.
|
||||
5. Instantiates and starts agents (like :class:`TranscriptionAgent`) that use this output.
|
||||
|
||||
:ivar audio_in_address: Address of the input audio stream.
|
||||
:ivar audio_in_bind: Whether to bind or connect to the input address.
|
||||
:ivar audio_out_socket: ZMQ PUB socket for sending speech fragments.
|
||||
"""
|
||||
|
||||
def __init__(self, audio_in_address: str, audio_in_bind: bool):
|
||||
"""
|
||||
Initialize the VAD Agent.
|
||||
|
||||
:param audio_in_address: ZMQ address for input audio.
|
||||
:param audio_in_bind: True if this agent should bind to the input address, False to connect.
|
||||
"""
|
||||
super().__init__(settings.agent_settings.vad_name)
|
||||
|
||||
self.audio_in_address = audio_in_address
|
||||
@@ -67,6 +85,15 @@ class VADAgent(BaseAgent):
|
||||
self.model = None
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Initialize resources.
|
||||
|
||||
1. Connects audio input socket.
|
||||
2. Binds audio output socket (random port).
|
||||
3. Loads VAD model from Torch Hub.
|
||||
4. Starts the streaming loop.
|
||||
5. Instantiates and starts the :class:`TranscriptionAgent` with the output address.
|
||||
"""
|
||||
self.logger.info("Setting up %s", self.name)
|
||||
|
||||
self._connect_audio_in_socket()
|
||||
@@ -114,6 +141,10 @@ class VADAgent(BaseAgent):
|
||||
await super().stop()
|
||||
|
||||
def _connect_audio_in_socket(self):
|
||||
"""
|
||||
Connects (or binds) the socket for listening to audio from RI.
|
||||
:return:
|
||||
"""
|
||||
self.audio_in_socket = azmq.Context.instance().socket(zmq.SUB)
|
||||
self.audio_in_socket.setsockopt_string(zmq.SUBSCRIBE, "")
|
||||
if self.audio_in_bind:
|
||||
@@ -123,7 +154,9 @@ class VADAgent(BaseAgent):
|
||||
self.audio_in_poller = SocketPoller[bytes](self.audio_in_socket)
|
||||
|
||||
def _connect_audio_out_socket(self) -> int | None:
|
||||
"""Returns the port bound, or None if binding failed."""
|
||||
"""
|
||||
Returns the port bound, or None if binding failed.
|
||||
"""
|
||||
try:
|
||||
self.audio_out_socket = azmq.Context.instance().socket(zmq.PUB)
|
||||
return self.audio_out_socket.bind_to_random_port("tcp://localhost", max_tries=100)
|
||||
@@ -144,6 +177,15 @@ class VADAgent(BaseAgent):
|
||||
self._ready.set()
|
||||
|
||||
async def _streaming_loop(self):
|
||||
"""
|
||||
Main loop for processing audio stream.
|
||||
|
||||
1. Polls for new audio chunks.
|
||||
2. Passes chunk to VAD model.
|
||||
3. Manages `i_since_speech` counter to determine start/end of speech.
|
||||
4. Buffers speech + context.
|
||||
5. Sends complete speech segment to output socket when silence is detected.
|
||||
"""
|
||||
await self._ready.wait()
|
||||
while self._running:
|
||||
assert self.audio_in_poller is not None
|
||||
|
||||
@@ -15,6 +15,14 @@ router = APIRouter()
|
||||
# DO NOT LOG INSIDE THIS FUNCTION
|
||||
@router.get("/logs/stream")
|
||||
async def log_stream():
|
||||
"""
|
||||
Server-Sent Events (SSE) endpoint for real-time log streaming.
|
||||
|
||||
Subscribes to the internal ZMQ logging topic and forwards log records to the client.
|
||||
Allows the frontend to display live logs from the backend.
|
||||
|
||||
:return: A StreamingResponse yielding SSE data.
|
||||
"""
|
||||
context = Context.instance()
|
||||
socket = context.socket(zmq.SUB)
|
||||
|
||||
|
||||
@@ -11,6 +11,14 @@ router = APIRouter()
|
||||
|
||||
@router.post("/message", status_code=202)
|
||||
async def receive_message(message: Message, request: Request):
|
||||
"""
|
||||
Generic endpoint to receive text messages.
|
||||
|
||||
Publishes the message to the internal 'message' topic via ZMQ.
|
||||
|
||||
:param message: The message payload.
|
||||
:param request: The FastAPI request object (used to access app state).
|
||||
"""
|
||||
logger.info("Received message: %s", message.message)
|
||||
|
||||
topic = b"message"
|
||||
|
||||
@@ -11,8 +11,14 @@ router = APIRouter()
|
||||
@router.post("/program", status_code=202)
|
||||
async def receive_message(program: Program, request: Request):
|
||||
"""
|
||||
Receives a BehaviorProgram, pydantic checks it.
|
||||
Converts it into real Phase objects.
|
||||
Endpoint to upload a new Behavior Program.
|
||||
|
||||
Validates the program structure (phases, norms, goals) and publishes it to the internal
|
||||
'program' topic. The :class:`~control_backend.agents.bdi.bdi_program_manager.BDIProgramManager`
|
||||
will pick this up and update the BDI agent.
|
||||
|
||||
:param program: The parsed Program object.
|
||||
:param request: The FastAPI request object.
|
||||
"""
|
||||
logger.debug("Received raw program: %s", program)
|
||||
|
||||
|
||||
@@ -17,6 +17,16 @@ router = APIRouter()
|
||||
|
||||
@router.post("/command", status_code=202)
|
||||
async def receive_command(command: SpeechCommand, request: Request):
|
||||
"""
|
||||
Send a direct speech command to the robot.
|
||||
|
||||
Publishes the command to the internal 'command' topic. The
|
||||
:class:`~control_backend.agents.actuation.robot_speech_agent.RobotSpeechAgent`
|
||||
will forward this to the robot.
|
||||
|
||||
:param command: The speech command payload.
|
||||
:param request: The FastAPI request object.
|
||||
"""
|
||||
# Validate and retrieve data.
|
||||
SpeechCommand.model_validate(command)
|
||||
topic = b"command"
|
||||
@@ -29,12 +39,22 @@ async def receive_command(command: SpeechCommand, request: Request):
|
||||
|
||||
@router.get("/ping_check")
|
||||
async def ping(request: Request):
|
||||
"""
|
||||
Simple HTTP ping endpoint to check if the backend is reachable.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@router.get("/ping_stream")
|
||||
async def ping_stream(request: Request):
|
||||
"""Stream live updates whenever the device state changes."""
|
||||
"""
|
||||
SSE endpoint for monitoring the Robot Interface connection status.
|
||||
|
||||
Subscribes to the internal 'ping' topic (published by the RI Communication Agent)
|
||||
and yields status updates to the client.
|
||||
|
||||
:return: A StreamingResponse of connection status events.
|
||||
"""
|
||||
|
||||
async def event_stream():
|
||||
# Set up internal socket to receive ping updates
|
||||
|
||||
@@ -6,4 +6,7 @@ router = APIRouter()
|
||||
# TODO: implement
|
||||
@router.get("/sse")
|
||||
async def sse(request: Request):
|
||||
"""
|
||||
Placeholder for future Server-Sent Events endpoint.
|
||||
"""
|
||||
pass
|
||||
|
||||
@@ -31,19 +31,30 @@ class AgentDirectory:
|
||||
|
||||
class BaseAgent(ABC):
|
||||
"""
|
||||
Abstract base class for all agents. To make a new agent, inherit from
|
||||
`control_backend.agents.BaseAgent`, not this class. That ensures that a
|
||||
logger is present with the correct name pattern.
|
||||
Abstract base class for all agents in the system.
|
||||
|
||||
When subclassing, the `setup()` method needs to be overwritten. To handle
|
||||
messages from other agents, overwrite the `handle_message()` method. To
|
||||
send messages to other agents, use the `send()` method. To add custom
|
||||
behaviors/tasks to the agent, use the `add_background_task()` method.
|
||||
This class provides the foundational infrastructure for agent lifecycle management, messaging
|
||||
(both intra-process and inter-process via ZMQ), and asynchronous behavior execution.
|
||||
|
||||
.. warning::
|
||||
Do not inherit from this class directly for creating new agents. Instead, inherit from
|
||||
:class:`control_backend.agents.base.BaseAgent`, which ensures proper logger configuration.
|
||||
|
||||
:ivar name: The unique name of the agent.
|
||||
:ivar inbox: The queue for receiving internal messages.
|
||||
:ivar _tasks: A set of currently running asynchronous tasks/behaviors.
|
||||
:ivar _running: A boolean flag indicating if the agent is currently running.
|
||||
:ivar logger: The logger instance for the agent.
|
||||
"""
|
||||
|
||||
logger: logging.Logger
|
||||
|
||||
def __init__(self, name: str):
|
||||
"""
|
||||
Initialize the BaseAgent.
|
||||
|
||||
:param name: The unique identifier for this agent.
|
||||
"""
|
||||
self.name = name
|
||||
self.inbox: asyncio.Queue[InternalMessage] = asyncio.Queue()
|
||||
self._tasks: set[asyncio.Task] = set()
|
||||
@@ -54,11 +65,27 @@ class BaseAgent(ABC):
|
||||
|
||||
@abstractmethod
|
||||
async def setup(self):
|
||||
"""Overwrite this to initialize resources."""
|
||||
"""
|
||||
Initialize agent-specific resources.
|
||||
|
||||
This method must be overridden by subclasses. It is called after the agent has started
|
||||
and the ZMQ sockets have been initialized. Use this method to:
|
||||
|
||||
* Initialize connections (databases, APIs, etc.)
|
||||
* Add initial behaviors using :meth:`add_behavior`
|
||||
"""
|
||||
pass
|
||||
|
||||
async def start(self):
|
||||
"""Starts the agent and its loops."""
|
||||
"""
|
||||
Start the agent and its internal loops.
|
||||
|
||||
This method:
|
||||
1. Sets the running state to True.
|
||||
2. Initializes ZeroMQ PUB/SUB sockets for inter-process communication.
|
||||
3. Calls the user-defined :meth:`setup` method.
|
||||
4. Starts the inbox processing loop and the ZMQ receiver loop.
|
||||
"""
|
||||
self.logger.info(f"Starting agent {self.name}")
|
||||
self._running = True
|
||||
|
||||
@@ -80,7 +107,11 @@ class BaseAgent(ABC):
|
||||
self.add_behavior(self._receive_internal_zmq_loop())
|
||||
|
||||
async def stop(self):
|
||||
"""Stops the agent."""
|
||||
"""
|
||||
Stop the agent.
|
||||
|
||||
Sets the running state to False and cancels all running background tasks.
|
||||
"""
|
||||
self._running = False
|
||||
for task in self._tasks:
|
||||
task.cancel()
|
||||
@@ -88,7 +119,16 @@ class BaseAgent(ABC):
|
||||
|
||||
async def send(self, message: InternalMessage):
|
||||
"""
|
||||
Sends a message to another agent.
|
||||
Send a message to another agent.
|
||||
|
||||
This method intelligently routes the message:
|
||||
|
||||
* If the target agent is in the same process (found in :class:`AgentDirectory`),
|
||||
the message is put directly into its inbox.
|
||||
* If the target agent is not found locally, the message is serialized and sent
|
||||
via ZeroMQ to the internal publication address.
|
||||
|
||||
:param message: The message to send.
|
||||
"""
|
||||
target = AgentDirectory.get(message.to)
|
||||
if target:
|
||||
@@ -102,7 +142,11 @@ class BaseAgent(ABC):
|
||||
self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.")
|
||||
|
||||
async def _process_inbox(self):
|
||||
"""Default loop: equivalent to a CyclicBehaviour receiving messages."""
|
||||
"""
|
||||
Internal loop that processes messages from the inbox.
|
||||
|
||||
Reads messages from ``self.inbox`` and passes them to :meth:`handle_message`.
|
||||
"""
|
||||
while self._running:
|
||||
msg = await self.inbox.get()
|
||||
self.logger.debug(f"Received message from {msg.sender}.")
|
||||
@@ -110,8 +154,11 @@ class BaseAgent(ABC):
|
||||
|
||||
async def _receive_internal_zmq_loop(self):
|
||||
"""
|
||||
Listens for internal messages sent from agents on another process via ZMQ
|
||||
and puts them into the normal inbox.
|
||||
Internal loop that listens for ZMQ messages.
|
||||
|
||||
Subscribes to ``internal/<agent_name>`` topics. When a message is received,
|
||||
it is deserialized into an :class:`InternalMessage` and put into the local inbox.
|
||||
This bridges the gap between inter-process ZMQ communication and the intra-process inbox.
|
||||
"""
|
||||
while self._running:
|
||||
try:
|
||||
@@ -126,15 +173,24 @@ class BaseAgent(ABC):
|
||||
self.logger.exception("Could not process ZMQ message.")
|
||||
|
||||
async def handle_message(self, msg: InternalMessage):
|
||||
"""Override this to handle incoming messages."""
|
||||
"""
|
||||
Handle an incoming message.
|
||||
|
||||
This method must be overridden by subclasses to define how the agent reacts to messages.
|
||||
|
||||
:param msg: The received message.
|
||||
:raises NotImplementedError: If not overridden by the subclass.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def add_behavior(self, coro: Coroutine) -> Task:
|
||||
"""
|
||||
Helper to add a behavior to the agent. To add asynchronous behavior to an agent, define
|
||||
an `async` function and add it to the task list by calling :func:`add_behavior`
|
||||
with it. This should happen in the :func:`setup` method of the agent. For an example, see:
|
||||
:func:`~control_backend.agents.bdi.BDICoreAgent`.
|
||||
Add a background behavior (task) to the agent.
|
||||
|
||||
This is the preferred way to run continuous loops or long-running tasks within an agent.
|
||||
The task is tracked and will be automatically cancelled when :meth:`stop` is called.
|
||||
|
||||
:param coro: The coroutine to execute as a task.
|
||||
"""
|
||||
task = asyncio.create_task(coro)
|
||||
self._tasks.add(task)
|
||||
|
||||
@@ -3,6 +3,16 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
class ZMQSettings(BaseModel):
|
||||
"""
|
||||
Configuration for ZeroMQ (ZMQ) addresses used for inter-process communication.
|
||||
|
||||
:ivar internal_pub_address: Address for the internal PUB socket.
|
||||
:ivar internal_sub_address: Address for the internal SUB socket.
|
||||
:ivar ri_command_address: Address for sending commands to the Robot Interface.
|
||||
:ivar ri_communication_address: Address for receiving communication from the Robot Interface.
|
||||
:ivar vad_agent_address: Address for the Voice Activity Detection (VAD) agent.
|
||||
"""
|
||||
|
||||
internal_pub_address: str = "tcp://localhost:5560"
|
||||
internal_sub_address: str = "tcp://localhost:5561"
|
||||
ri_command_address: str = "tcp://localhost:0000"
|
||||
@@ -11,6 +21,21 @@ class ZMQSettings(BaseModel):
|
||||
|
||||
|
||||
class AgentSettings(BaseModel):
|
||||
"""
|
||||
Names of the various agents in the system. These names are used for routing messages.
|
||||
|
||||
:ivar bdi_core_name: Name of the BDI Core Agent.
|
||||
:ivar bdi_belief_collector_name: Name of the Belief Collector Agent.
|
||||
:ivar bdi_program_manager_name: Name of the BDI Program Manager Agent.
|
||||
:ivar text_belief_extractor_name: Name of the Text Belief Extractor Agent.
|
||||
:ivar vad_name: Name of the Voice Activity Detection (VAD) Agent.
|
||||
:ivar llm_name: Name of the Large Language Model (LLM) Agent.
|
||||
:ivar test_name: Name of the Test Agent.
|
||||
:ivar transcription_name: Name of the Transcription Agent.
|
||||
:ivar ri_communication_name: Name of the RI Communication Agent.
|
||||
:ivar robot_speech_name: Name of the Robot Speech Agent.
|
||||
"""
|
||||
|
||||
# agent names
|
||||
bdi_core_name: str = "bdi_core_agent"
|
||||
bdi_belief_collector_name: str = "belief_collector_agent"
|
||||
@@ -25,6 +50,21 @@ class AgentSettings(BaseModel):
|
||||
|
||||
|
||||
class BehaviourSettings(BaseModel):
|
||||
"""
|
||||
Configuration for agent behaviors and parameters.
|
||||
|
||||
:ivar sleep_s: Default sleep time in seconds for loops.
|
||||
:ivar comm_setup_max_retries: Maximum number of retries for setting up communication.
|
||||
:ivar socket_poller_timeout_ms: Timeout in milliseconds for socket polling.
|
||||
:ivar vad_prob_threshold: Probability threshold for Voice Activity Detection.
|
||||
:ivar vad_initial_since_speech: Initial value for 'since speech' counter in VAD.
|
||||
:ivar vad_non_speech_patience_chunks: Number of non-speech chunks to wait before speech ended.
|
||||
:ivar transcription_max_concurrent_tasks: Maximum number of concurrent transcription tasks.
|
||||
:ivar transcription_words_per_minute: Estimated words per minute for transcription timing.
|
||||
:ivar transcription_words_per_token: Estimated words per token for transcription timing.
|
||||
:ivar transcription_token_buffer: Buffer for transcription tokens.
|
||||
"""
|
||||
|
||||
sleep_s: float = 1.0
|
||||
comm_setup_max_retries: int = 5
|
||||
socket_poller_timeout_ms: int = 100
|
||||
@@ -42,24 +82,58 @@ class BehaviourSettings(BaseModel):
|
||||
|
||||
|
||||
class LLMSettings(BaseModel):
|
||||
"""
|
||||
Configuration for the Large Language Model (LLM).
|
||||
|
||||
:ivar local_llm_url: URL for the local LLM API.
|
||||
:ivar local_llm_model: Name of the local LLM model to use.
|
||||
"""
|
||||
|
||||
local_llm_url: str = "http://localhost:1234/v1/chat/completions"
|
||||
local_llm_model: str = "openai/gpt-oss-20b"
|
||||
request_timeout_s: int = 120
|
||||
local_llm_model: str = "gpt-oss"
|
||||
|
||||
|
||||
class VADSettings(BaseModel):
|
||||
"""
|
||||
Configuration for Voice Activity Detection (VAD) model.
|
||||
|
||||
:ivar repo_or_dir: Repository or directory for the VAD model.
|
||||
:ivar model_name: Name of the VAD model.
|
||||
:ivar sample_rate_hz: Sample rate in Hz for the VAD model.
|
||||
"""
|
||||
|
||||
repo_or_dir: str = "snakers4/silero-vad"
|
||||
model_name: str = "silero_vad"
|
||||
sample_rate_hz: int = 16000
|
||||
|
||||
|
||||
class SpeechModelSettings(BaseModel):
|
||||
"""
|
||||
Configuration for speech recognition models.
|
||||
|
||||
:ivar mlx_model_name: Model name for MLX-based speech recognition.
|
||||
:ivar openai_model_name: Model name for OpenAI-based speech recognition.
|
||||
"""
|
||||
|
||||
# model identifiers for speech recognition
|
||||
mlx_model_name: str = "mlx-community/whisper-small.en-mlx"
|
||||
openai_model_name: str = "small.en"
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
"""
|
||||
Global application settings.
|
||||
|
||||
:ivar app_title: Title of the application.
|
||||
:ivar ui_url: URL of the frontend UI.
|
||||
:ivar zmq_settings: ZMQ configuration.
|
||||
:ivar agent_settings: Agent name configuration.
|
||||
:ivar behaviour_settings: Behavior configuration.
|
||||
:ivar vad_settings: VAD model configuration.
|
||||
:ivar speech_model_settings: Speech model configuration.
|
||||
:ivar llm_settings: LLM configuration.
|
||||
"""
|
||||
|
||||
app_title: str = "PepperPlus"
|
||||
|
||||
ui_url: str = "http://localhost:5173"
|
||||
|
||||
@@ -37,6 +37,12 @@ def add_logging_level(level_name: str, level_num: int, method_name: str | None =
|
||||
|
||||
|
||||
def setup_logging(path: str = ".logging_config.yaml") -> None:
|
||||
"""
|
||||
Setup logging configuration of the CB. Tries to load the logging configuration from a file,
|
||||
in which we specify custom loggers, formatters, handlers, etc.
|
||||
:param path:
|
||||
:return:
|
||||
"""
|
||||
if os.path.exists(path):
|
||||
with open(path) as f:
|
||||
try:
|
||||
|
||||
@@ -1,3 +1,20 @@
|
||||
"""
|
||||
Control Backend Main Application.
|
||||
|
||||
This module defines the FastAPI application that serves as the entry point for the
|
||||
Control Backend. It manages the lifecycle of the entire system, including:
|
||||
|
||||
1. **Socket Initialization**: Setting up the internal ZeroMQ PUB/SUB proxy for agent communication.
|
||||
2. **Agent Management**: Instantiating and starting all agents.
|
||||
3. **API Routing**: Exposing REST endpoints for external interaction.
|
||||
|
||||
Lifecycle Manager
|
||||
-----------------
|
||||
The :func:`lifespan` context manager handles the startup and shutdown sequences:
|
||||
- **Startup**: Configures logging, starts the ZMQ proxy, connects sockets, and launches agents.
|
||||
- **Shutdown**: Handles graceful cleanup (though currently minimal).
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
import threading
|
||||
@@ -34,6 +51,12 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def setup_sockets():
|
||||
"""
|
||||
Initialize and run the internal ZeroMQ Proxy (XPUB/XSUB).
|
||||
|
||||
This proxy acts as the central message bus, forwarding messages published on the
|
||||
internal PUB address to all subscribers on the internal SUB address.
|
||||
"""
|
||||
context = Context.instance()
|
||||
|
||||
internal_pub_socket = context.socket(zmq.XPUB)
|
||||
@@ -94,7 +117,7 @@ async def lifespan(app: FastAPI):
|
||||
BDICoreAgent,
|
||||
{
|
||||
"name": settings.agent_settings.bdi_core_name,
|
||||
"asl": "src/control_backend/agents/bdi/bdi_core_agent/rules.asl",
|
||||
"asl": "src/control_backend/agents/bdi/rules.asl",
|
||||
},
|
||||
),
|
||||
"BeliefCollectorAgent": (
|
||||
|
||||
@@ -2,10 +2,22 @@ from pydantic import BaseModel
|
||||
|
||||
|
||||
class Belief(BaseModel):
|
||||
"""
|
||||
Represents a single belief in the BDI system.
|
||||
|
||||
:ivar name: The functor or name of the belief (e.g., 'user_said').
|
||||
:ivar arguments: A list of string arguments for the belief.
|
||||
:ivar replace: If True, existing beliefs with this name should be replaced by this one.
|
||||
"""
|
||||
|
||||
name: str
|
||||
arguments: list[str]
|
||||
replace: bool = False
|
||||
|
||||
|
||||
class BeliefMessage(BaseModel):
|
||||
"""
|
||||
A container for transporting a list of beliefs between agents.
|
||||
"""
|
||||
|
||||
beliefs: list[Belief]
|
||||
|
||||
@@ -3,7 +3,12 @@ from pydantic import BaseModel
|
||||
|
||||
class InternalMessage(BaseModel):
|
||||
"""
|
||||
Represents a message to an agent.
|
||||
Standard message envelope for communication between agents within the Control Backend.
|
||||
|
||||
:ivar to: The name of the destination agent.
|
||||
:ivar sender: The name of the sending agent.
|
||||
:ivar body: The string payload (often a JSON-serialized model).
|
||||
:ivar thread: An optional thread identifier/topic to categorize the message (e.g., 'beliefs').
|
||||
"""
|
||||
|
||||
to: str
|
||||
|
||||
@@ -2,6 +2,17 @@ from pydantic import BaseModel
|
||||
|
||||
|
||||
class LLMPromptMessage(BaseModel):
|
||||
"""
|
||||
Payload sent from the BDI agent to the LLM agent.
|
||||
|
||||
Contains the user's text input along with the dynamic context (norms and goals)
|
||||
that the LLM should use to generate a response.
|
||||
|
||||
:ivar text: The user's input text.
|
||||
:ivar norms: A list of active behavioral norms.
|
||||
:ivar goals: A list of active goals to pursue.
|
||||
"""
|
||||
|
||||
text: str
|
||||
norms: list[str]
|
||||
goals: list[str]
|
||||
|
||||
@@ -2,4 +2,8 @@ from pydantic import BaseModel
|
||||
|
||||
|
||||
class Message(BaseModel):
|
||||
"""
|
||||
A simple generic message wrapper, typically used for simple API responses.
|
||||
"""
|
||||
|
||||
message: str
|
||||
|
||||
@@ -2,12 +2,29 @@ from pydantic import BaseModel
|
||||
|
||||
|
||||
class Norm(BaseModel):
|
||||
"""
|
||||
Represents a behavioral norm.
|
||||
|
||||
:ivar id: Unique identifier.
|
||||
:ivar label: Human-readable label.
|
||||
:ivar norm: The actual norm text describing the behavior.
|
||||
"""
|
||||
|
||||
id: str
|
||||
label: str
|
||||
norm: str
|
||||
|
||||
|
||||
class Goal(BaseModel):
|
||||
"""
|
||||
Represents an objective to be achieved.
|
||||
|
||||
:ivar id: Unique identifier.
|
||||
:ivar label: Human-readable label.
|
||||
:ivar description: Detailed description of the goal.
|
||||
:ivar achieved: Status flag indicating if the goal has been met.
|
||||
"""
|
||||
|
||||
id: str
|
||||
label: str
|
||||
description: str
|
||||
@@ -27,6 +44,16 @@ class KeywordTrigger(BaseModel):
|
||||
|
||||
|
||||
class Phase(BaseModel):
|
||||
"""
|
||||
A distinct phase within a program, containing norms, goals, and triggers.
|
||||
|
||||
:ivar id: Unique identifier.
|
||||
:ivar label: Human-readable label.
|
||||
:ivar norms: List of norms active in this phase.
|
||||
:ivar goals: List of goals to pursue in this phase.
|
||||
:ivar triggers: List of triggers that define transitions out of this phase.
|
||||
"""
|
||||
|
||||
id: str
|
||||
label: str
|
||||
norms: list[Norm]
|
||||
@@ -35,4 +62,10 @@ class Phase(BaseModel):
|
||||
|
||||
|
||||
class Program(BaseModel):
|
||||
"""
|
||||
Represents a complete interaction program, consisting of a sequence or set of phases.
|
||||
|
||||
:ivar phases: The list of phases that make up the program.
|
||||
"""
|
||||
|
||||
phases: list[Phase]
|
||||
|
||||
@@ -5,16 +5,34 @@ from pydantic import BaseModel
|
||||
|
||||
|
||||
class RIEndpoint(str, Enum):
|
||||
"""
|
||||
Enumeration of valid endpoints for the Robot Interface (RI).
|
||||
"""
|
||||
|
||||
SPEECH = "actuate/speech"
|
||||
PING = "ping"
|
||||
NEGOTIATE_PORTS = "negotiate/ports"
|
||||
|
||||
|
||||
class RIMessage(BaseModel):
|
||||
"""
|
||||
Base schema for messages sent to the Robot Interface.
|
||||
|
||||
:ivar endpoint: The target endpoint/action on the RI.
|
||||
:ivar data: The payload associated with the action.
|
||||
"""
|
||||
|
||||
endpoint: RIEndpoint
|
||||
data: Any
|
||||
|
||||
|
||||
class SpeechCommand(RIMessage):
|
||||
"""
|
||||
A specific command to make the robot speak.
|
||||
|
||||
:ivar endpoint: Fixed to ``RIEndpoint.SPEECH``.
|
||||
:ivar data: The text string to be spoken.
|
||||
"""
|
||||
|
||||
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
|
||||
data: str
|
||||
|
||||
@@ -4,7 +4,7 @@ from unittest.mock import AsyncMock, MagicMock, mock_open, patch
|
||||
import agentspeak
|
||||
import pytest
|
||||
|
||||
from control_backend.agents.bdi.bdi_core_agent.bdi_core_agent import BDICoreAgent
|
||||
from control_backend.agents.bdi.bdi_core_agent import BDICoreAgent
|
||||
from control_backend.core.agent_system import InternalMessage
|
||||
from control_backend.core.config import settings
|
||||
from control_backend.schemas.belief_message import Belief, BeliefMessage
|
||||
|
||||
Reference in New Issue
Block a user