Merge remote-tracking branch 'origin/dev' into feat/environment-variables

# Conflicts:
#	src/control_backend/core/config.py
#	test/unit/agents/actuation/test_robot_speech_agent.py
This commit is contained in:
Twirre Meulenbelt
2025-12-29 12:35:39 +01:00
23 changed files with 1477 additions and 49 deletions

View File

@@ -1 +1,2 @@
from .robot_gesture_agent import RobotGestureAgent as RobotGestureAgent
from .robot_speech_agent import RobotSpeechAgent as RobotSpeechAgent

View File

@@ -0,0 +1,171 @@
import json
import zmq
import zmq.asyncio as azmq
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint
class RobotGestureAgent(BaseAgent):
"""
This agent acts as a bridge between the control backend and the Robot Interface (RI).
It receives gesture commands from other agents or from the UI,
and forwards them to the robot via a ZMQ PUB socket.
:ivar subsocket: ZMQ SUB socket for receiving external commands (e.g., from UI).
:ivar pubsocket: ZMQ PUB socket for sending commands to the Robot Interface.
:ivar address: Address to bind/connect the PUB socket.
:ivar bind: Whether to bind or connect the PUB socket.
:ivar gesture_data: A list of strings for available gestures
"""
subsocket: azmq.Socket
repsocket: azmq.Socket
pubsocket: azmq.Socket
address = ""
bind = False
gesture_data = []
single_gesture_data = []
def __init__(
self,
name: str,
address=settings.zmq_settings.ri_command_address,
bind=False,
gesture_data=None,
single_gesture_data=None,
):
self.gesture_data = gesture_data or []
self.single_gesture_data = single_gesture_data or []
super().__init__(name)
self.address = address
self.bind = bind
async def setup(self):
"""
Initialize the agent.
1. Sets up the PUB socket to talk to the robot.
2. Sets up the SUB socket to listen for "command" topics (from UI/External).
3. Starts the loop for handling ZMQ commands.
"""
self.logger.info("Setting up %s", self.name)
context = azmq.Context.instance()
# To the robot
self.pubsocket = context.socket(zmq.PUB)
if self.bind:
self.pubsocket.bind(self.address)
else:
self.pubsocket.connect(self.address)
# Receive internal topics regarding commands
self.subsocket = context.socket(zmq.SUB)
self.subsocket.connect(settings.zmq_settings.internal_sub_address)
self.subsocket.setsockopt(zmq.SUBSCRIBE, b"command")
self.subsocket.setsockopt(zmq.SUBSCRIBE, b"send_gestures")
# REP socket for replying to gesture requests
self.repsocket = context.socket(zmq.REP)
self.repsocket.bind(settings.zmq_settings.internal_gesture_rep_adress)
self.add_behavior(self._zmq_command_loop())
self.add_behavior(self._fetch_gestures_loop())
self.logger.info("Finished setting up %s", self.name)
async def stop(self):
if self.subsocket:
self.subsocket.close()
if self.pubsocket:
self.pubsocket.close()
await super().stop()
async def handle_message(self, msg: InternalMessage):
"""
Handle commands received from other internal Python agents.
Validates the message as a :class:`GestureCommand` and forwards it to the robot.
:param msg: The internal message containing the command.
"""
try:
gesture_command = GestureCommand.model_validate_json(msg.body)
if gesture_command.endpoint == RIEndpoint.GESTURE_TAG:
if gesture_command.data not in self.gesture_data:
self.logger.warning(
"Received gesture tag '%s' which is not in available tags. Early returning",
gesture_command.data,
)
return
elif gesture_command.endpoint == RIEndpoint.GESTURE_SINGLE:
if gesture_command.data not in self.single_gesture_data:
self.logger.warning(
"Received gesture '%s' which is not in available gestures. Early returning",
gesture_command.data,
)
return
await self.pubsocket.send_json(gesture_command.model_dump())
except Exception:
self.logger.exception("Error processing internal message.")
async def _zmq_command_loop(self):
"""
Loop to handle commands received via ZMQ (e.g., from the UI).
Listens on the 'command' topic, validates the JSON and forwards it to the robot.
"""
while self._running:
try:
topic, body = await self.subsocket.recv_multipart()
# Don't process send_gestures here
if topic != b"command":
continue
body = json.loads(body)
gesture_command = GestureCommand.model_validate(body)
if gesture_command.endpoint == RIEndpoint.GESTURE_TAG:
if gesture_command.data not in self.gesture_data:
self.logger.warning(
"Received gesture tag '%s' which is not in available tags.\
Early returning",
gesture_command.data,
)
continue
await self.pubsocket.send_json(gesture_command.model_dump())
except Exception:
self.logger.exception("Error processing ZMQ message.")
async def _fetch_gestures_loop(self):
"""
Loop to handle fetching gestures received via ZMQ (e.g., from the UI).
Listens on the 'send_gestures' topic, and returns a list on the get_gestures topic.
"""
while self._running:
try:
# Get a request
body = await self.repsocket.recv()
# Figure out amount, if specified
try:
body = json.loads(body)
except json.JSONDecodeError:
body = None
amount = None
if isinstance(body, int):
amount = body
# Fetch tags from gesture data and respond
tags = self.gesture_data[:amount] if amount else self.gesture_data
response = json.dumps({"tags": tags}).encode()
await self.repsocket.send(response)
except Exception:
self.logger.exception("Error fetching gesture tags.")

View File

@@ -6,6 +6,7 @@ import zmq.asyncio as azmq
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.core.config import settings
from ..actuation.robot_speech_agent import RobotSpeechAgent
@@ -180,12 +181,23 @@ class RICommunicationAgent(BaseAgent):
else:
self._req_socket.bind(addr)
case "actuation":
ri_commands_agent = RobotSpeechAgent(
gesture_data = port_data.get("gestures", [])
single_gesture_data = port_data.get("single_gestures", [])
robot_speech_agent = RobotSpeechAgent(
settings.agent_settings.robot_speech_name,
address=addr,
bind=bind,
)
await ri_commands_agent.start()
robot_gesture_agent = RobotGestureAgent(
settings.agent_settings.robot_gesture_name,
address=addr,
bind=bind,
gesture_data=gesture_data,
single_gesture_data=single_gesture_data,
)
await robot_speech_agent.start()
await asyncio.sleep(0.1) # Small delay
await robot_gesture_agent.start()
case "audio":
vad_agent = VADAgent(audio_in_address=addr, audio_in_bind=bind)
await vad_agent.start()

View File

@@ -125,7 +125,7 @@ class LLMAgent(BaseAgent):
full_message += token
current_chunk += token
self.logger.info(
self.logger.llm(
"Received token: %s",
full_message,
extra={"reference": message_id}, # Used in the UI to update old logs

View File

@@ -0,0 +1,146 @@
import json
import zmq
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
class UserInterruptAgent(BaseAgent):
"""
User Interrupt Agent.
This agent receives button_pressed events from the external HTTP API
(via ZMQ) and uses the associated context to trigger one of the following actions:
- Send a prioritized message to the `RobotSpeechAgent`
- Send a prioritized gesture to the `RobotGestureAgent`
- Send a belief override to the `BDIProgramManager`in order to activate a
trigger/conditional norm or complete a goal.
Prioritized actions clear the current RI queue before inserting the new item,
ensuring they are executed immediately after Pepper's current action has been fulfilled.
:ivar sub_socket: The ZMQ SUB socket used to receive user intterupts.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sub_socket = None
async def _receive_button_event(self):
"""
The behaviour of the UserInterruptAgent.
Continuous loop that receives button_pressed events from the button_pressed HTTP endpoint.
These events contain a type and a context.
These are the different types and contexts:
- type: "speech", context: string that the robot has to say.
- type: "gesture", context: single gesture name that the robot has to perform.
- type: "override", context: belief_id that overrides the goal/trigger/conditional norm.
"""
while True:
topic, body = await self.sub_socket.recv_multipart()
try:
event_data = json.loads(body)
event_type = event_data.get("type") # e.g., "speech", "gesture"
event_context = event_data.get("context") # e.g., "Hello, I am Pepper!"
except json.JSONDecodeError:
self.logger.error("Received invalid JSON payload on topic %s", topic)
continue
if event_type == "speech":
await self._send_to_speech_agent(event_context)
self.logger.info(
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
event_context,
)
elif event_type == "gesture":
await self._send_to_gesture_agent(event_context)
self.logger.info(
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
event_context,
)
elif event_type == "override":
await self._send_to_program_manager(event_context)
self.logger.info(
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
event_context,
)
else:
self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').",
event_type,
event_context,
)
async def _send_to_speech_agent(self, text_to_say: str):
"""
method to send prioritized speech command to RobotSpeechAgent.
:param text_to_say: The string that the robot has to say.
"""
cmd = SpeechCommand(data=text_to_say, is_priority=True)
out_msg = InternalMessage(
to=settings.agent_settings.robot_speech_name,
sender=self.name,
body=cmd.model_dump_json(),
)
await self.send(out_msg)
async def _send_to_gesture_agent(self, single_gesture_name: str):
"""
method to send prioritized gesture command to RobotGestureAgent.
:param single_gesture_name: The gesture tag that the robot has to perform.
"""
# the endpoint is set to always be GESTURE_SINGLE for user interrupts
cmd = GestureCommand(
endpoint=RIEndpoint.GESTURE_SINGLE, data=single_gesture_name, is_priority=True
)
out_msg = InternalMessage(
to=settings.agent_settings.robot_gesture_name,
sender=self.name,
body=cmd.model_dump_json(),
)
await self.send(out_msg)
async def _send_to_program_manager(self, belief_id: str):
"""
Send a button_override belief to the BDIProgramManager.
:param belief_id: The belief_id that overrides the goal/trigger/conditional norm.
this id can belong to a basic belief or an inferred belief.
See also: https://utrechtuniversity.youtrack.cloud/articles/N25B-A-27/UI-components
"""
data = {"belief": belief_id}
message = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
sender=self.name,
body=json.dumps(data),
thread="belief_override_id",
)
await self.send(message)
self.logger.info(
"Sent button_override belief with id '%s' to Program manager.",
belief_id,
)
async def setup(self):
"""
Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic.
Starts the background behavior to receive the user interrupts.
"""
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("button_pressed")
self.add_behavior(self._receive_button_event())