Compare commits

...

14 Commits

Author SHA1 Message Date
Storm
cdb7fac53f Merge branch 'dev' into feat/pause-functionality 2026-01-07 15:50:45 +01:00
Storm
d1ad2c1549 feat: implement pausing functionality in CB
ref: N25B-350
2026-01-06 18:08:43 +01:00
Björn Otgaar
612a96940d Merge branch 'feat/environment-variables' into 'dev'
Docs for environment variables, parameterize some constants

See merge request ics/sp/2025/n25b/pepperplus-cb!38
2026-01-06 09:02:49 +00:00
Pim Hutting
4c20656c75 Merge branch 'feat/program-reset-llm' into 'dev'
feat: made program reset LLM

See merge request ics/sp/2025/n25b/pepperplus-cb!39
2026-01-02 15:13:05 +00:00
Pim Hutting
6ca86e4b81 feat: made program reset LLM 2026-01-02 15:13:04 +00:00
Storm
867837dcc4 feat: implemented pause functionality in VAD agent
Functionality is implemented by pausing the _streaming_loop function.

ref: N25B-350
2025-12-30 15:58:18 +02:00
Storm
9adeb1efff Merge branch 'feat/semantic-beliefs' into feat/pause-functionality 2025-12-30 15:52:12 +02:00
Twirre Meulenbelt
7d798f2e77 Merge remote-tracking branch 'origin/dev' into feat/environment-variables
# Conflicts:
#	src/control_backend/core/config.py
#	test/unit/agents/actuation/test_robot_speech_agent.py
2025-12-29 12:40:16 +01:00
Twirre Meulenbelt
5282c2471f Merge remote-tracking branch 'origin/dev' into feat/environment-variables
# Conflicts:
#	src/control_backend/core/config.py
#	test/unit/agents/actuation/test_robot_speech_agent.py
2025-12-29 12:35:39 +01:00
Storm
200bd27d9b Merge branch 'dev' into feat/pause-functionality 2025-12-29 12:45:14 +02:00
Storm
539e814c5a feat: functionality implemented for RI pausing functionality
Currently, no CB pausing functionality has been implemented yet. This commit only includes necessary changes to use RI pausing.

ref: N25B-350
2025-12-22 14:02:18 +01:00
Twirre Meulenbelt
0c682d6440 feat: introduce .env.example, docs
The example includes options that are expected to be changed. It also includes a reference to where in the docs you can find a full list of options.

ref: N25B-352
2025-12-11 13:35:19 +01:00
Twirre Meulenbelt
32d8f20dc9 feat: parameterize RI host
Was "localhost" in RI Communication Agent, now uses configurable setting. Secretly also removing "localhost" from VAD agent, as its socket should be something that's "inproc".

ref: N25B-352
2025-12-11 12:12:15 +01:00
Twirre Meulenbelt
9cc0e39955 fix: failures main tests since VAD agent initialization was changed
The test still expects the VAD agent to be started in main, rather than in the RI Communication Agent.

ref: N25B-356
2025-12-11 12:04:24 +01:00
6 changed files with 185 additions and 3 deletions

View File

@@ -3,11 +3,14 @@ import json
import zmq
import zmq.asyncio as azmq
from pydantic import ValidationError
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import PauseCommand
from ..actuation.robot_speech_agent import RobotSpeechAgent
from ..perception import VADAgent
@@ -298,3 +301,11 @@ class RICommunicationAgent(BaseAgent):
self.logger.debug("Restarting communication negotiation.")
if await self._negotiate_connection(max_retries=1):
self.connected = True
async def handle_message(self, msg : InternalMessage):
try:
pause_command = PauseCommand.model_validate_json(msg.body)
self._req_socket.send_json(pause_command.model_dump())
self.logger.debug(self._req_socket.recv_json())
except ValidationError:
self.logger.warning("Incorrect message format for PauseCommand.")

View File

@@ -0,0 +1,68 @@
import asyncio
import json
import zmq
from zmq.asyncio import Context
from control_backend.agents.base import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
class TestPauseAgent(BaseAgent):
def __init__(self, name: str):
super().__init__(name)
async def setup(self):
context = Context.instance()
self.pub_socket = context.socket(zmq.PUB)
self.pub_socket.connect(settings.zmq_settings.internal_pub_address)
self.add_behavior(self._pause_command_loop())
self.logger.debug("TestPauseAgent setup complete.")
async def _pause_command_loop(self):
print("Starting Pause command test loop.")
while True:
pause_command = {
"endpoint": "pause",
"data": True,
}
message = InternalMessage(
to="ri_communication_agent",
sender=self.name,
body=json.dumps(pause_command),
)
await self.send(message)
# User interrupt message
data = {
"type": "pause",
"context": True,
}
await self.pub_socket.send_multipart([b"button_pressed", json.dumps(data).encode()])
self.logger.info("Pausing robot actions.")
await asyncio.sleep(15) # Simulate delay between messages
pause_command = {
"endpoint": "pause",
"data": False,
}
message = InternalMessage(
to="ri_communication_agent",
sender=self.name,
body=json.dumps(pause_command),
)
await self.send(message)
# User interrupt message
data = {
"type": "pause",
"context": False,
}
await self.pub_socket.send_multipart([b"button_pressed", json.dumps(data).encode()])
self.logger.info("Resuming robot actions.")
await asyncio.sleep(15) # Simulate delay between messages

View File

@@ -7,6 +7,7 @@ import zmq.asyncio as azmq
from control_backend.agents import BaseAgent
from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage
from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
from .transcription_agent.transcription_agent import TranscriptionAgent
@@ -86,6 +87,12 @@ class VADAgent(BaseAgent):
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._ready = asyncio.Event()
# Pause control
self._reset_needed = False
self._paused = asyncio.Event()
self._paused.set() # Not paused at start
self.model = None
async def setup(self):
@@ -213,6 +220,16 @@ class VADAgent(BaseAgent):
"""
await self._ready.wait()
while self._running:
await self._paused.wait()
# After being unpaused, reset stream and buffers
if self._reset_needed:
self.logger.debug("Resuming: resetting stream and buffers.")
await self._reset_stream()
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._reset_needed = False
assert self.audio_in_poller is not None
data = await self.audio_in_poller.poll()
if data is None:
@@ -254,3 +271,27 @@ class VADAgent(BaseAgent):
# At this point, we know that the speech has ended.
# Prepend the last chunk that had no speech, for a more fluent boundary
self.audio_buffer = chunk
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages.
Expects messages to pause or resume the VAD processing from User Interrupt Agent.
:param msg: The received internal message.
"""
sender = msg.sender
if sender == settings.agent_settings.user_interrupt_name:
if msg.body == "PAUSE":
self.logger.info("Pausing VAD processing.")
self._paused.clear()
# If the robot needs to pick up speaking where it left off, do not set _reset_needed
self._reset_needed = True
elif msg.body == "RESUME":
self.logger.info("Resuming VAD processing.")
self._paused.set()
else:
self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}")
else:
self.logger.debug(f"Ignoring message from unknown sender: {sender}")

View File

@@ -6,7 +6,12 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
from control_backend.schemas.ri_message import (
GestureCommand,
PauseCommand,
RIEndpoint,
SpeechCommand,
)
class UserInterruptAgent(BaseAgent):
@@ -71,6 +76,12 @@ class UserInterruptAgent(BaseAgent):
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
event_context,
)
elif event_type == "pause":
await self._send_pause_command(event_context)
if event_context:
self.logger.info("Sent pause command.")
else:
self.logger.info("Sent resume command.")
else:
self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').",
@@ -130,6 +141,38 @@ class UserInterruptAgent(BaseAgent):
belief_id,
)
async def _send_pause_command(self, pause : bool):
"""
Send a pause command to the Robot Interface via the RI Communication Agent.
Send a pause command to the other internal agents; for now just VAD agent.
"""
cmd = PauseCommand(data=pause)
message = InternalMessage(
to=settings.agent_settings.ri_communication_name,
sender=self.name,
body=cmd.model_dump_json(),
)
await self.send(message)
if pause:
# Send pause to VAD agent
vad_message = InternalMessage(
to=settings.agent_settings.vad_name,
sender=self.name,
body="PAUSE",
)
await self.send(vad_message)
self.logger.info("Sent pause command to VAD Agent and RI Communication Agent.")
else:
# Send resume to VAD agent
vad_message = InternalMessage(
to=settings.agent_settings.vad_name,
sender=self.name,
body="RESUME",
)
await self.send(vad_message)
self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.")
async def setup(self):
"""
Initialize the agent.

View File

@@ -39,10 +39,11 @@ from control_backend.agents.communication import RICommunicationAgent
# LLM Agents
from control_backend.agents.llm import LLMAgent
# Other backend imports
from control_backend.agents.mock_agents.test_pause_ri import TestPauseAgent
# User Interrupt Agent
from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
# Other backend imports
from control_backend.api.v1.router import api_router
from control_backend.core.config import settings
from control_backend.logging import setup_logging
@@ -141,6 +142,12 @@ async def lifespan(app: FastAPI):
"name": settings.agent_settings.bdi_program_manager_name,
},
),
"TestPauseAgent": (
TestPauseAgent,
{
"name": "pause_test_agent",
},
),
"UserInterruptAgent": (
UserInterruptAgent,
{

View File

@@ -14,6 +14,7 @@ class RIEndpoint(str, Enum):
GESTURE_TAG = "actuate/gesture/tag"
PING = "ping"
NEGOTIATE_PORTS = "negotiate/ports"
PAUSE = "pause"
class RIMessage(BaseModel):
@@ -64,3 +65,14 @@ class GestureCommand(RIMessage):
if self.endpoint not in allowed:
raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG")
return self
class PauseCommand(RIMessage):
"""
A specific command to pause or unpause the robot's actions.
:ivar endpoint: Fixed to ``RIEndpoint.PAUSE``.
:ivar data: A boolean indicating whether to pause (True) or unpause (False).
"""
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE)
data: bool