Merge remote-tracking branch 'origin/dev' into feat/agentspeak-generation

This commit is contained in:
Twirre Meulenbelt
2025-12-17 13:20:14 +01:00
34 changed files with 2172 additions and 81 deletions

View File

@@ -1 +1,2 @@
from .robot_gesture_agent import RobotGestureAgent as RobotGestureAgent
from .robot_speech_agent import RobotSpeechAgent as RobotSpeechAgent

View File

@@ -0,0 +1,162 @@
import json
import zmq
import zmq.asyncio as azmq
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint
class RobotGestureAgent(BaseAgent):
"""
This agent acts as a bridge between the control backend and the Robot Interface (RI).
It receives gesture commands from other agents or from the UI,
and forwards them to the robot via a ZMQ PUB socket.
:ivar subsocket: ZMQ SUB socket for receiving external commands (e.g., from UI).
:ivar pubsocket: ZMQ PUB socket for sending commands to the Robot Interface.
:ivar address: Address to bind/connect the PUB socket.
:ivar bind: Whether to bind or connect the PUB socket.
:ivar gesture_data: A list of strings for available gestures
"""
subsocket: azmq.Socket
repsocket: azmq.Socket
pubsocket: azmq.Socket
address = ""
bind = False
gesture_data = []
def __init__(
self,
name: str,
address=settings.zmq_settings.ri_command_address,
bind=False,
gesture_data=None,
):
self.gesture_data = gesture_data or []
super().__init__(name)
self.address = address
self.bind = bind
async def setup(self):
"""
Initialize the agent.
1. Sets up the PUB socket to talk to the robot.
2. Sets up the SUB socket to listen for "command" topics (from UI/External).
3. Starts the loop for handling ZMQ commands.
"""
self.logger.info("Setting up %s", self.name)
context = azmq.Context.instance()
# To the robot
self.pubsocket = context.socket(zmq.PUB)
if self.bind:
self.pubsocket.bind(self.address)
else:
self.pubsocket.connect(self.address)
# Receive internal topics regarding commands
self.subsocket = context.socket(zmq.SUB)
self.subsocket.connect(settings.zmq_settings.internal_sub_address)
self.subsocket.setsockopt(zmq.SUBSCRIBE, b"command")
self.subsocket.setsockopt(zmq.SUBSCRIBE, b"send_gestures")
# REP socket for replying to gesture requests
self.repsocket = context.socket(zmq.REP)
self.repsocket.bind(settings.zmq_settings.internal_gesture_rep_adress)
self.add_behavior(self._zmq_command_loop())
self.add_behavior(self._fetch_gestures_loop())
self.logger.info("Finished setting up %s", self.name)
async def stop(self):
if self.subsocket:
self.subsocket.close()
if self.pubsocket:
self.pubsocket.close()
await super().stop()
async def handle_message(self, msg: InternalMessage):
"""
Handle commands received from other internal Python agents.
Validates the message as a :class:`GestureCommand` and forwards it to the robot.
:param msg: The internal message containing the command.
"""
try:
gesture_command = GestureCommand.model_validate_json(msg.body)
if gesture_command.endpoint == RIEndpoint.GESTURE_TAG:
if gesture_command.data not in self.gesture_data:
self.logger.warning(
"Received gesture tag '%s' which is not in available tags. Early returning",
gesture_command.data,
)
return
await self.pubsocket.send_json(gesture_command.model_dump())
except Exception:
self.logger.exception("Error processing internal message.")
async def _zmq_command_loop(self):
"""
Loop to handle commands received via ZMQ (e.g., from the UI).
Listens on the 'command' topic, validates the JSON and forwards it to the robot.
"""
while self._running:
try:
topic, body = await self.subsocket.recv_multipart()
# Don't process send_gestures here
if topic != b"command":
continue
body = json.loads(body)
gesture_command = GestureCommand.model_validate(body)
if gesture_command.endpoint == RIEndpoint.GESTURE_TAG:
if gesture_command.data not in self.gesture_data:
self.logger.warning(
"Received gesture tag '%s' which is not in available tags.\
Early returning",
gesture_command.data,
)
continue
await self.pubsocket.send_json(gesture_command.model_dump())
except Exception:
self.logger.exception("Error processing ZMQ message.")
async def _fetch_gestures_loop(self):
"""
Loop to handle fetching gestures received via ZMQ (e.g., from the UI).
Listens on the 'send_gestures' topic, and returns a list on the get_gestures topic.
"""
while self._running:
try:
# Get a request
body = await self.repsocket.recv()
# Figure out amount, if specified
try:
body = json.loads(body)
except json.JSONDecodeError:
body = None
amount = None
if isinstance(body, int):
amount = body
# Fetch tags from gesture data and respond
tags = self.gesture_data[:amount] if amount else self.gesture_data
response = json.dumps({"tags": tags}).encode()
await self.repsocket.send(response)
except Exception:
self.logger.exception("Error fetching gesture tags.")

View File

@@ -29,7 +29,7 @@ class RobotSpeechAgent(BaseAgent):
def __init__(
self,
name: str,
address=settings.zmq_settings.ri_command_address,
address: str,
bind=False,
):
super().__init__(name)

View File

@@ -6,9 +6,11 @@ import zmq.asyncio as azmq
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.core.config import settings
from ..actuation.robot_speech_agent import RobotSpeechAgent
from ..perception import VADAgent
class RICommunicationAgent(BaseAgent):
@@ -179,12 +181,24 @@ class RICommunicationAgent(BaseAgent):
else:
self._req_socket.bind(addr)
case "actuation":
ri_commands_agent = RobotSpeechAgent(
gesture_data = port_data.get("gestures", [])
robot_speech_agent = RobotSpeechAgent(
settings.agent_settings.robot_speech_name,
address=addr,
bind=bind,
)
await ri_commands_agent.start()
robot_gesture_agent = RobotGestureAgent(
settings.agent_settings.robot_gesture_name,
address=addr,
bind=bind,
gesture_data=gesture_data,
)
await robot_speech_agent.start()
await asyncio.sleep(0.1) # Small delay
await robot_gesture_agent.start()
case "audio":
vad_agent = VADAgent(audio_in_address=addr, audio_in_bind=bind)
await vad_agent.start()
case _:
self.logger.warning("Unhandled negotiation id: %s", id)

View File

@@ -125,7 +125,7 @@ class LLMAgent(BaseAgent):
full_message += token
current_chunk += token
self.logger.info(
self.logger.llm(
"Received token: %s",
full_message,
extra={"reference": message_id}, # Used in the UI to update old logs

View File

@@ -8,6 +8,7 @@ import zmq.asyncio as azmq
from control_backend.agents import BaseAgent
from control_backend.core.config import settings
from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
from .transcription_agent.transcription_agent import TranscriptionAgent
@@ -61,6 +62,7 @@ class VADAgent(BaseAgent):
:ivar audio_in_address: Address of the input audio stream.
:ivar audio_in_bind: Whether to bind or connect to the input address.
:ivar audio_out_socket: ZMQ PUB socket for sending speech fragments.
:ivar program_sub_socket: ZMQ SUB socket for receiving program status updates.
"""
def __init__(self, audio_in_address: str, audio_in_bind: bool):
@@ -79,6 +81,8 @@ class VADAgent(BaseAgent):
self.audio_out_socket: azmq.Socket | None = None
self.audio_in_poller: SocketPoller | None = None
self.program_sub_socket: azmq.Socket | None = None
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._ready = asyncio.Event()
@@ -90,9 +94,10 @@ class VADAgent(BaseAgent):
1. Connects audio input socket.
2. Binds audio output socket (random port).
3. Loads VAD model from Torch Hub.
4. Starts the streaming loop.
5. Instantiates and starts the :class:`TranscriptionAgent` with the output address.
3. Connects to program communication socket.
4. Loads VAD model from Torch Hub.
5. Starts the streaming loop.
6. Instantiates and starts the :class:`TranscriptionAgent` with the output address.
"""
self.logger.info("Setting up %s", self.name)
@@ -105,6 +110,11 @@ class VADAgent(BaseAgent):
return
audio_out_address = f"tcp://localhost:{audio_out_port}"
# Connect to internal communication socket
self.program_sub_socket = azmq.Context.instance().socket(zmq.SUB)
self.program_sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.program_sub_socket.subscribe(PROGRAM_STATUS)
# Initialize VAD model
try:
self.model, _ = torch.hub.load(
@@ -117,10 +127,8 @@ class VADAgent(BaseAgent):
await self.stop()
return
# Warmup/reset
await self.reset_stream()
self.add_behavior(self._streaming_loop())
self.add_behavior(self._status_loop())
# Start agents dependent on the output audio fragments here
transcriber = TranscriptionAgent(audio_out_address)
@@ -165,7 +173,7 @@ class VADAgent(BaseAgent):
self.audio_out_socket = None
return None
async def reset_stream(self):
async def _reset_stream(self):
"""
Clears the ZeroMQ queue and sets ready state.
"""
@@ -176,6 +184,23 @@ class VADAgent(BaseAgent):
self.logger.info(f"Discarded {discarded} audio packets before starting.")
self._ready.set()
async def _status_loop(self):
"""Loop for checking program status. Only start listening if program is RUNNING."""
while self._running:
topic, body = await self.program_sub_socket.recv_multipart()
if topic != PROGRAM_STATUS:
continue
if body != ProgramStatus.RUNNING.value:
continue
# Program is now running, we can start our stream
await self._reset_stream()
# We don't care about further status updates
self.program_sub_socket.close()
break
async def _streaming_loop(self):
"""
Main loop for processing audio stream.

View File

@@ -8,15 +8,15 @@ from fastapi.responses import StreamingResponse
from zmq.asyncio import Context, Socket
from control_backend.core.config import settings
from control_backend.schemas.ri_message import SpeechCommand
from control_backend.schemas.ri_message import GestureCommand, SpeechCommand
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/command", status_code=202)
async def receive_command(command: SpeechCommand, request: Request):
@router.post("/command/speech", status_code=202)
async def receive_command_speech(command: SpeechCommand, request: Request):
"""
Send a direct speech command to the robot.
@@ -27,14 +27,32 @@ async def receive_command(command: SpeechCommand, request: Request):
:param command: The speech command payload.
:param request: The FastAPI request object.
"""
# Validate and retrieve data.
SpeechCommand.model_validate(command)
topic = b"command"
pub_socket: Socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, command.model_dump_json().encode()])
return {"status": "Command received"}
return {"status": "Speech command received"}
@router.post("/command/gesture", status_code=202)
async def receive_command_gesture(command: GestureCommand, request: Request):
"""
Send a direct gesture command to the robot.
Publishes the command to the internal 'command' topic. The
:class:`~control_backend.agents.actuation.robot_speech_agent.RobotGestureAgent`
will forward this to the robot.
:param command: The speech command payload.
:param request: The FastAPI request object.
"""
topic = b"command"
pub_socket: Socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, command.model_dump_json().encode()])
return {"status": "Gesture command received"}
@router.get("/ping_check")
@@ -45,6 +63,41 @@ async def ping(request: Request):
pass
@router.get("/commands/gesture/tags")
async def get_available_gesture_tags(request: Request, count=0):
"""
Endpoint to retrieve the available gesture tags for the robot.
:param request: The FastAPI request object.
:return: A list of available gesture tags.
"""
req_socket = Context.instance().socket(zmq.REQ)
req_socket.connect(settings.zmq_settings.internal_gesture_rep_adress)
# Check to see if we've got any count given in the query parameter
amount = count or None
timeout = 5 # seconds
await req_socket.send(f"{amount}".encode() if amount else b"None")
try:
body = await asyncio.wait_for(req_socket.recv(), timeout=timeout)
except TimeoutError:
body = '{"tags": []}'
logger.debug("Got timeout error fetching gestures.")
# Handle empty response and JSON decode errors
available_tags = []
if body:
try:
available_tags = json.loads(body).get("tags", [])
except json.JSONDecodeError as e:
logger.error(f"Failed to parse gesture tags JSON: {e}, body: {body}")
# Return empty list on JSON error
available_tags = []
return {"available_gesture_tags": available_tags}
@router.get("/ping_stream")
async def ping_stream(request: Request):
"""

View File

@@ -17,7 +17,7 @@ class ZMQSettings(BaseModel):
internal_sub_address: str = "tcp://localhost:5561"
ri_command_address: str = "tcp://localhost:0000"
ri_communication_address: str = "tcp://*:5555"
vad_agent_address: str = "tcp://localhost:5558"
internal_gesture_rep_adress: str = "tcp://localhost:7788"
class AgentSettings(BaseModel):
@@ -47,6 +47,7 @@ class AgentSettings(BaseModel):
transcription_name: str = "transcription_agent"
ri_communication_name: str = "ri_communication_agent"
robot_speech_name: str = "robot_speech_agent"
robot_gesture_name: str = "robot_gesture_agent"
class BehaviourSettings(BaseModel):

View File

@@ -4,6 +4,7 @@ import os
import yaml
import zmq
from zmq.log.handlers import PUBHandler
from control_backend.core.config import settings
@@ -51,15 +52,27 @@ def setup_logging(path: str = ".logging_config.yaml") -> None:
logging.warning(f"Could not load logging configuration: {e}")
config = {}
if "custom_levels" in config:
for level_name, level_num in config["custom_levels"].items():
add_logging_level(level_name, level_num)
custom_levels = config.get("custom_levels", {}) or {}
for level_name, level_num in custom_levels.items():
add_logging_level(level_name, level_num)
if config.get("handlers") is not None and config.get("handlers").get("ui"):
pub_socket = zmq.Context.instance().socket(zmq.PUB)
pub_socket.connect(settings.zmq_settings.internal_pub_address)
config["handlers"]["ui"]["interface_or_socket"] = pub_socket
logging.config.dictConfig(config)
# Patch ZMQ PUBHandler to know about custom levels
if custom_levels:
for logger_name in ("control_backend",):
logger = logging.getLogger(logger_name)
for handler in logger.handlers:
if isinstance(handler, PUBHandler):
# Use the INFO formatter as the default template
default_fmt = handler.formatters[logging.INFO]
for level_num in custom_levels.values():
handler.setFormatter(default_fmt, level=level_num)
else:
logging.warning("Logging config file not found. Using default logging configuration.")

View File

@@ -39,13 +39,11 @@ from control_backend.agents.communication import RICommunicationAgent
# LLM Agents
from control_backend.agents.llm import LLMAgent
# Perceive agents
from control_backend.agents.perception import VADAgent
# Other backend imports
from control_backend.api.v1.router import api_router
from control_backend.core.config import settings
from control_backend.logging import setup_logging
from control_backend.schemas.program_status import PROGRAM_STATUS, ProgramStatus
logger = logging.getLogger(__name__)
@@ -95,6 +93,8 @@ async def lifespan(app: FastAPI):
endpoints_pub_socket.connect(settings.zmq_settings.internal_pub_address)
app.state.endpoints_pub_socket = endpoints_pub_socket
await endpoints_pub_socket.send_multipart([PROGRAM_STATUS, ProgramStatus.STARTING.value])
# --- Initialize Agents ---
logger.info("Initializing and starting agents.")
@@ -132,10 +132,6 @@ async def lifespan(app: FastAPI):
"name": settings.agent_settings.text_belief_extractor_name,
},
),
"VADAgent": (
VADAgent,
{"audio_in_address": settings.zmq_settings.vad_agent_address, "audio_in_bind": False},
),
"ProgramManagerAgent": (
BDIProgramManager,
{
@@ -146,32 +142,28 @@ async def lifespan(app: FastAPI):
agents = []
vad_agent = None
for name, (agent_class, kwargs) in agents_to_start.items():
try:
logger.debug("Starting agent: %s", name)
agent_instance = agent_class(**kwargs)
await agent_instance.start()
if isinstance(agent_instance, VADAgent):
vad_agent = agent_instance
agents.append(agent_instance)
logger.info("Agent '%s' started successfully.", name)
except Exception as e:
logger.error("Failed to start agent '%s': %s", name, e, exc_info=True)
raise
assert vad_agent is not None
await vad_agent.reset_stream()
logger.info("Application startup complete.")
await endpoints_pub_socket.send_multipart([PROGRAM_STATUS, ProgramStatus.RUNNING.value])
yield
# --- APPLICATION SHUTDOWN ---
logger.info("%s is shutting down.", app.title)
# Potential shutdown logic goes here
await endpoints_pub_socket.send_multipart([PROGRAM_STATUS, ProgramStatus.STOPPING.value])
# Additional shutdown logic goes here
logger.info("Application shutdown complete.")

View File

@@ -0,0 +1,16 @@
from enum import Enum
PROGRAM_STATUS = b"internal/program_status"
"""A topic key for the program status."""
class ProgramStatus(Enum):
"""
Used in internal communication, to tell agents what the status of the program is.
For example, the VAD agent only starts listening when the program is RUNNING.
"""
STARTING = b"starting"
RUNNING = b"running"
STOPPING = b"stopping"

View File

@@ -1,7 +1,7 @@
from enum import Enum
from typing import Any
from typing import Any, Literal
from pydantic import BaseModel
from pydantic import BaseModel, model_validator
class RIEndpoint(str, Enum):
@@ -10,6 +10,8 @@ class RIEndpoint(str, Enum):
"""
SPEECH = "actuate/speech"
GESTURE_SINGLE = "actuate/gesture/single"
GESTURE_TAG = "actuate/gesture/tag"
PING = "ping"
NEGOTIATE_PORTS = "negotiate/ports"
@@ -36,3 +38,27 @@ class SpeechCommand(RIMessage):
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
data: str
class GestureCommand(RIMessage):
"""
A specific command to make the robot do a gesture.
:ivar endpoint: Should be ``RIEndpoint.GESTURE_SINGLE`` or ``RIEndpoint.GESTURE_TAG``.
:ivar data: The id of the gesture to be executed.
"""
endpoint: Literal[ # pyright: ignore[reportIncompatibleVariableOverride] - We validate this stricter rule ourselves
RIEndpoint.GESTURE_SINGLE, RIEndpoint.GESTURE_TAG
]
data: str
@model_validator(mode="after")
def check_endpoint(self):
allowed = {
RIEndpoint.GESTURE_SINGLE,
RIEndpoint.GESTURE_TAG,
}
if self.endpoint not in allowed:
raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG")
return self