From 76dfcb23ef3f5777877eeb2bcaf4a73a0858297d Mon Sep 17 00:00:00 2001 From: Storm Date: Wed, 7 Jan 2026 16:03:49 +0100 Subject: [PATCH] feat: added pause functionality ref: N25B-350 --- .../agents/perception/vad_agent.py | 41 ++++++++++++++++ .../user_interrupt/user_interrupt_agent.py | 47 ++++++++++++++++++- src/control_backend/schemas/ri_message.py | 11 +++++ 3 files changed, 97 insertions(+), 2 deletions(-) diff --git a/src/control_backend/agents/perception/vad_agent.py b/src/control_backend/agents/perception/vad_agent.py index 8ccff0a..320a849 100644 --- a/src/control_backend/agents/perception/vad_agent.py +++ b/src/control_backend/agents/perception/vad_agent.py @@ -7,6 +7,7 @@ import zmq.asyncio as azmq from control_backend.agents import BaseAgent from control_backend.core.config import settings +from control_backend.schemas.internal_message import InternalMessage from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus from .transcription_agent.transcription_agent import TranscriptionAgent @@ -86,6 +87,12 @@ class VADAgent(BaseAgent): self.audio_buffer = np.array([], dtype=np.float32) self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech self._ready = asyncio.Event() + + # Pause control + self._reset_needed = False + self._paused = asyncio.Event() + self._paused.set() # Not paused at start + self.model = None async def setup(self): @@ -213,6 +220,16 @@ class VADAgent(BaseAgent): """ await self._ready.wait() while self._running: + await self._paused.wait() + + # After being unpaused, reset stream and buffers + if self._reset_needed: + self.logger.debug("Resuming: resetting stream and buffers.") + await self._reset_stream() + self.audio_buffer = np.array([], dtype=np.float32) + self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech + self._reset_needed = False + assert self.audio_in_poller is not None data = await self.audio_in_poller.poll() if data is None: @@ -254,3 +271,27 @@ class VADAgent(BaseAgent): # At this point, we know that the speech has ended. # Prepend the last chunk that had no speech, for a more fluent boundary self.audio_buffer = chunk + +async def handle_message(self, msg: InternalMessage): + """ + Handle incoming messages. + + Expects messages to pause or resume the VAD processing from User Interrupt Agent. + + :param msg: The received internal message. + """ + sender = msg.sender + + if sender == settings.agent_settings.user_interrupt_name: + if msg.body == "PAUSE": + self.logger.info("Pausing VAD processing.") + self._paused.clear() + # If the robot needs to pick up speaking where it left off, do not set _reset_needed + self._reset_needed = True + elif msg.body == "RESUME": + self.logger.info("Resuming VAD processing.") + self._paused.set() + else: + self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}") + else: + self.logger.debug(f"Ignoring message from unknown sender: {sender}") diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index e58a42b..842231a 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -6,7 +6,12 @@ from zmq.asyncio import Context from control_backend.agents import BaseAgent from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings -from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand +from control_backend.schemas.ri_message import ( + GestureCommand, + PauseCommand, + RIEndpoint, + SpeechCommand, +) class UserInterruptAgent(BaseAgent): @@ -71,7 +76,12 @@ class UserInterruptAgent(BaseAgent): "Forwarded button press (override) with context '%s' to BDIProgramManager.", event_context, ) - + elif event_type == "pause": + await self._send_pause_command(event_context) + if event_context: + self.logger.info("Sent pause command.") + else: + self.logger.info("Sent resume command.") elif event_type in ["next_phase", "reset_phase", "reset_experiment"]: await self._send_experiment_control_to_bdi_core(event_type) @@ -163,6 +173,39 @@ class UserInterruptAgent(BaseAgent): "Sent button_override belief with id '%s' to Program manager.", belief_id, ) + + async def _send_pause_command(self, pause : bool): + """ + Send a pause command to the Robot Interface via the RI Communication Agent. + Send a pause command to the other internal agents; for now just VAD agent. + """ + cmd = PauseCommand(data=pause) + message = InternalMessage( + to=settings.agent_settings.ri_communication_name, + sender=self.name, + body=cmd.model_dump_json(), + ) + await self.send(message) + + if pause: + # Send pause to VAD agent + vad_message = InternalMessage( + to=settings.agent_settings.vad_name, + sender=self.name, + body="PAUSE", + ) + await self.send(vad_message) + self.logger.info("Sent pause command to VAD Agent and RI Communication Agent.") + else: + # Send resume to VAD agent + vad_message = InternalMessage( + to=settings.agent_settings.vad_name, + sender=self.name, + body="RESUME", + ) + await self.send(vad_message) + self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.") + async def setup(self): """ diff --git a/src/control_backend/schemas/ri_message.py b/src/control_backend/schemas/ri_message.py index a48dec6..7c1ef22 100644 --- a/src/control_backend/schemas/ri_message.py +++ b/src/control_backend/schemas/ri_message.py @@ -64,3 +64,14 @@ class GestureCommand(RIMessage): if self.endpoint not in allowed: raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG") return self + +class PauseCommand(RIMessage): + """ + A specific command to pause or unpause the robot's actions. + + :ivar endpoint: Fixed to ``RIEndpoint.PAUSE``. + :ivar data: A boolean indicating whether to pause (True) or unpause (False). + """ + + endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE) + data: bool