From 324a63e5cc60437aa7cd9f868bfe30a381070614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Wed, 7 Jan 2026 14:45:42 +0100 Subject: [PATCH 1/7] chore: add styles to user_interrupt_agent --- .../user_interrupt/user_interrupt_agent.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index b2efc41..8a4d2a2 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -71,6 +71,16 @@ class UserInterruptAgent(BaseAgent): "Forwarded button press (override) with context '%s' to BDIProgramManager.", event_context, ) + + elif event_type == "next_phase": + _ = 1 + + elif event_type == "reset_phase": + _ = 1 + + elif event_type == " reset_experiment": + _ = 1 + else: self.logger.warning( "Received button press with unknown type '%s' (context: '%s').", @@ -78,6 +88,15 @@ class UserInterruptAgent(BaseAgent): event_context, ) + async def _send_experiment_control_to_bdi_core(self, type): + out_msg = InternalMessage( + to=settings.agent_settings.bdi_core_name, + sender=self.name, + thread=type, + body="", + ) + await self.send(out_msg) + async def _send_to_speech_agent(self, text_to_say: str): """ method to send prioritized speech command to RobotSpeechAgent. From 34afca6652dde4fbb4aa993d97454f11225ef5a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Wed, 7 Jan 2026 15:07:33 +0100 Subject: [PATCH 2/7] chore: automatically send the experiment controls to the bdi core in the user interupt agent. --- .../user_interrupt/user_interrupt_agent.py | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index 8a4d2a2..e58a42b 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -72,14 +72,8 @@ class UserInterruptAgent(BaseAgent): event_context, ) - elif event_type == "next_phase": - _ = 1 - - elif event_type == "reset_phase": - _ = 1 - - elif event_type == " reset_experiment": - _ = 1 + elif event_type in ["next_phase", "reset_phase", "reset_experiment"]: + await self._send_experiment_control_to_bdi_core(event_type) else: self.logger.warning( @@ -89,12 +83,33 @@ class UserInterruptAgent(BaseAgent): ) async def _send_experiment_control_to_bdi_core(self, type): + """ + method to send experiment control buttons to bdi core. + + :param type: the type of control button we should send to the bdi core. + """ + # Switch which thread we should send to bdi core + thread = "" + match type: + case "next_phase": + thread = "force_next_phase" + case "reset_phase": + thread = "reset_current_phase" + case "reset_experiment": + thread = "reset_experiment" + case _: + self.logger.warning( + "Received unknown experiment control type '%s' to send to BDI Core.", + type, + ) + out_msg = InternalMessage( to=settings.agent_settings.bdi_core_name, sender=self.name, - thread=type, + thread=thread, body="", ) + self.logger.debug("Sending experiment control '%s' to BDI Core.", thread) await self.send(out_msg) async def _send_to_speech_agent(self, text_to_say: str): From 76dfcb23ef3f5777877eeb2bcaf4a73a0858297d Mon Sep 17 00:00:00 2001 From: Storm Date: Wed, 7 Jan 2026 16:03:49 +0100 Subject: [PATCH 3/7] feat: added pause functionality ref: N25B-350 --- .../agents/perception/vad_agent.py | 41 ++++++++++++++++ .../user_interrupt/user_interrupt_agent.py | 47 ++++++++++++++++++- src/control_backend/schemas/ri_message.py | 11 +++++ 3 files changed, 97 insertions(+), 2 deletions(-) diff --git a/src/control_backend/agents/perception/vad_agent.py b/src/control_backend/agents/perception/vad_agent.py index 8ccff0a..320a849 100644 --- a/src/control_backend/agents/perception/vad_agent.py +++ b/src/control_backend/agents/perception/vad_agent.py @@ -7,6 +7,7 @@ import zmq.asyncio as azmq from control_backend.agents import BaseAgent from control_backend.core.config import settings +from control_backend.schemas.internal_message import InternalMessage from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus from .transcription_agent.transcription_agent import TranscriptionAgent @@ -86,6 +87,12 @@ class VADAgent(BaseAgent): self.audio_buffer = np.array([], dtype=np.float32) self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech self._ready = asyncio.Event() + + # Pause control + self._reset_needed = False + self._paused = asyncio.Event() + self._paused.set() # Not paused at start + self.model = None async def setup(self): @@ -213,6 +220,16 @@ class VADAgent(BaseAgent): """ await self._ready.wait() while self._running: + await self._paused.wait() + + # After being unpaused, reset stream and buffers + if self._reset_needed: + self.logger.debug("Resuming: resetting stream and buffers.") + await self._reset_stream() + self.audio_buffer = np.array([], dtype=np.float32) + self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech + self._reset_needed = False + assert self.audio_in_poller is not None data = await self.audio_in_poller.poll() if data is None: @@ -254,3 +271,27 @@ class VADAgent(BaseAgent): # At this point, we know that the speech has ended. # Prepend the last chunk that had no speech, for a more fluent boundary self.audio_buffer = chunk + +async def handle_message(self, msg: InternalMessage): + """ + Handle incoming messages. + + Expects messages to pause or resume the VAD processing from User Interrupt Agent. + + :param msg: The received internal message. + """ + sender = msg.sender + + if sender == settings.agent_settings.user_interrupt_name: + if msg.body == "PAUSE": + self.logger.info("Pausing VAD processing.") + self._paused.clear() + # If the robot needs to pick up speaking where it left off, do not set _reset_needed + self._reset_needed = True + elif msg.body == "RESUME": + self.logger.info("Resuming VAD processing.") + self._paused.set() + else: + self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}") + else: + self.logger.debug(f"Ignoring message from unknown sender: {sender}") diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index e58a42b..842231a 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -6,7 +6,12 @@ from zmq.asyncio import Context from control_backend.agents import BaseAgent from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings -from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand +from control_backend.schemas.ri_message import ( + GestureCommand, + PauseCommand, + RIEndpoint, + SpeechCommand, +) class UserInterruptAgent(BaseAgent): @@ -71,7 +76,12 @@ class UserInterruptAgent(BaseAgent): "Forwarded button press (override) with context '%s' to BDIProgramManager.", event_context, ) - + elif event_type == "pause": + await self._send_pause_command(event_context) + if event_context: + self.logger.info("Sent pause command.") + else: + self.logger.info("Sent resume command.") elif event_type in ["next_phase", "reset_phase", "reset_experiment"]: await self._send_experiment_control_to_bdi_core(event_type) @@ -163,6 +173,39 @@ class UserInterruptAgent(BaseAgent): "Sent button_override belief with id '%s' to Program manager.", belief_id, ) + + async def _send_pause_command(self, pause : bool): + """ + Send a pause command to the Robot Interface via the RI Communication Agent. + Send a pause command to the other internal agents; for now just VAD agent. + """ + cmd = PauseCommand(data=pause) + message = InternalMessage( + to=settings.agent_settings.ri_communication_name, + sender=self.name, + body=cmd.model_dump_json(), + ) + await self.send(message) + + if pause: + # Send pause to VAD agent + vad_message = InternalMessage( + to=settings.agent_settings.vad_name, + sender=self.name, + body="PAUSE", + ) + await self.send(vad_message) + self.logger.info("Sent pause command to VAD Agent and RI Communication Agent.") + else: + # Send resume to VAD agent + vad_message = InternalMessage( + to=settings.agent_settings.vad_name, + sender=self.name, + body="RESUME", + ) + await self.send(vad_message) + self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.") + async def setup(self): """ diff --git a/src/control_backend/schemas/ri_message.py b/src/control_backend/schemas/ri_message.py index a48dec6..7c1ef22 100644 --- a/src/control_backend/schemas/ri_message.py +++ b/src/control_backend/schemas/ri_message.py @@ -64,3 +64,14 @@ class GestureCommand(RIMessage): if self.endpoint not in allowed: raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG") return self + +class PauseCommand(RIMessage): + """ + A specific command to pause or unpause the robot's actions. + + :ivar endpoint: Fixed to ``RIEndpoint.PAUSE``. + :ivar data: A boolean indicating whether to pause (True) or unpause (False). + """ + + endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE) + data: bool From be88323cf76333f0b2dfcb98c2d93b1723977a19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Wed, 7 Jan 2026 18:24:35 +0100 Subject: [PATCH 4/7] chore: add one endpoint fo avoid errors --- .../agents/user_interrupt/user_interrupt_agent.py | 8 ++++---- src/control_backend/schemas/ri_message.py | 2 ++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index 842231a..50df979 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -82,6 +82,7 @@ class UserInterruptAgent(BaseAgent): self.logger.info("Sent pause command.") else: self.logger.info("Sent resume command.") + elif event_type in ["next_phase", "reset_phase", "reset_experiment"]: await self._send_experiment_control_to_bdi_core(event_type) @@ -173,11 +174,11 @@ class UserInterruptAgent(BaseAgent): "Sent button_override belief with id '%s' to Program manager.", belief_id, ) - - async def _send_pause_command(self, pause : bool): + + async def _send_pause_command(self, pause: bool): """ Send a pause command to the Robot Interface via the RI Communication Agent. - Send a pause command to the other internal agents; for now just VAD agent. + Send a pause command to the other internal agents; for now just VAD agent. """ cmd = PauseCommand(data=pause) message = InternalMessage( @@ -206,7 +207,6 @@ class UserInterruptAgent(BaseAgent): await self.send(vad_message) self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.") - async def setup(self): """ Initialize the agent. diff --git a/src/control_backend/schemas/ri_message.py b/src/control_backend/schemas/ri_message.py index 7c1ef22..e6eafa3 100644 --- a/src/control_backend/schemas/ri_message.py +++ b/src/control_backend/schemas/ri_message.py @@ -14,6 +14,7 @@ class RIEndpoint(str, Enum): GESTURE_TAG = "actuate/gesture/tag" PING = "ping" NEGOTIATE_PORTS = "negotiate/ports" + PAUSE = "" class RIMessage(BaseModel): @@ -65,6 +66,7 @@ class GestureCommand(RIMessage): raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG") return self + class PauseCommand(RIMessage): """ A specific command to pause or unpause the robot's actions. From cc0d5af28c200eb7feec05a2bcb3c6b3a71b5dd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Thu, 8 Jan 2026 12:56:22 +0100 Subject: [PATCH 5/7] chore: fixing bugs --- .../agents/communication/ri_communication_agent.py | 12 ++++++++++++ .../agents/user_interrupt/user_interrupt_agent.py | 7 +++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/control_backend/agents/communication/ri_communication_agent.py b/src/control_backend/agents/communication/ri_communication_agent.py index 34e5b25..45e3511 100644 --- a/src/control_backend/agents/communication/ri_communication_agent.py +++ b/src/control_backend/agents/communication/ri_communication_agent.py @@ -8,6 +8,7 @@ from zmq.asyncio import Context from control_backend.agents import BaseAgent from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent from control_backend.core.config import settings +from control_backend.schemas.internal_message import InternalMessage from ..actuation.robot_speech_agent import RobotSpeechAgent from ..perception import VADAgent @@ -298,3 +299,14 @@ class RICommunicationAgent(BaseAgent): self.logger.debug("Restarting communication negotiation.") if await self._negotiate_connection(max_retries=1): self.connected = True + + async def handle_message(self, msg: InternalMessage): + """ + Handle an incoming message. + + Currently not implemented for this agent. + + :param msg: The received message. + :raises NotImplementedError: Always, since this method is not implemented. + """ + self.logger.warning("custom warning for handle msg in ri coms %s", self.name) diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index 50df979..f48e14b 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -77,6 +77,9 @@ class UserInterruptAgent(BaseAgent): event_context, ) elif event_type == "pause": + self.logger.debug( + "Received pause/resume button press with context '%s'.", event_context + ) await self._send_pause_command(event_context) if event_context: self.logger.info("Sent pause command.") @@ -175,7 +178,7 @@ class UserInterruptAgent(BaseAgent): belief_id, ) - async def _send_pause_command(self, pause: bool): + async def _send_pause_command(self, pause): """ Send a pause command to the Robot Interface via the RI Communication Agent. Send a pause command to the other internal agents; for now just VAD agent. @@ -188,7 +191,7 @@ class UserInterruptAgent(BaseAgent): ) await self.send(message) - if pause: + if pause == "true": # Send pause to VAD agent vad_message = InternalMessage( to=settings.agent_settings.vad_name, From 13605678209b23c46ff026bc56f3cefc63cba0d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Thu, 8 Jan 2026 13:01:38 +0100 Subject: [PATCH 6/7] chore: indenting --- src/control_backend/agents/perception/vad_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/control_backend/agents/perception/vad_agent.py b/src/control_backend/agents/perception/vad_agent.py index 320a849..9c5a3ef 100644 --- a/src/control_backend/agents/perception/vad_agent.py +++ b/src/control_backend/agents/perception/vad_agent.py @@ -272,7 +272,7 @@ class VADAgent(BaseAgent): # Prepend the last chunk that had no speech, for a more fluent boundary self.audio_buffer = chunk -async def handle_message(self, msg: InternalMessage): + async def handle_message(self, msg: InternalMessage): """ Handle incoming messages. From c91b9991048ae0840f54e5dbb847333295958d14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Thu, 8 Jan 2026 15:31:44 +0100 Subject: [PATCH 7/7] chore: fix bugs and make sure connected robots work --- .../agents/actuation/robot_gesture_agent.py | 2 ++ .../communication/ri_communication_agent.py | 20 ++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/control_backend/agents/actuation/robot_gesture_agent.py b/src/control_backend/agents/actuation/robot_gesture_agent.py index 4f5dd79..5e40c04 100644 --- a/src/control_backend/agents/actuation/robot_gesture_agent.py +++ b/src/control_backend/agents/actuation/robot_gesture_agent.py @@ -83,6 +83,8 @@ class RobotGestureAgent(BaseAgent): self.subsocket.close() if self.pubsocket: self.pubsocket.close() + if self.repsocket: + self.repsocket.close() await super().stop() async def handle_message(self, msg: InternalMessage): diff --git a/src/control_backend/agents/communication/ri_communication_agent.py b/src/control_backend/agents/communication/ri_communication_agent.py index 45e3511..7fcd07b 100644 --- a/src/control_backend/agents/communication/ri_communication_agent.py +++ b/src/control_backend/agents/communication/ri_communication_agent.py @@ -48,6 +48,8 @@ class RICommunicationAgent(BaseAgent): self._req_socket: azmq.Socket | None = None self.pub_socket: azmq.Socket | None = None self.connected = False + self.gesture_agent: RobotGestureAgent | None = None + self.speech_agent: RobotSpeechAgent | None = None async def setup(self): """ @@ -141,6 +143,7 @@ class RICommunicationAgent(BaseAgent): # At this point, we have a valid response try: + self.logger.debug("Negotiation successful. Handling rn") await self._handle_negotiation_response(received_message) # Let UI know that we're connected topic = b"ping" @@ -189,6 +192,7 @@ class RICommunicationAgent(BaseAgent): address=addr, bind=bind, ) + self.speech_agent = robot_speech_agent robot_gesture_agent = RobotGestureAgent( settings.agent_settings.robot_gesture_name, address=addr, @@ -196,6 +200,7 @@ class RICommunicationAgent(BaseAgent): gesture_data=gesture_data, single_gesture_data=single_gesture_data, ) + self.gesture_agent = robot_gesture_agent await robot_speech_agent.start() await asyncio.sleep(0.1) # Small delay await robot_gesture_agent.start() @@ -226,6 +231,7 @@ class RICommunicationAgent(BaseAgent): while self._running: if not self.connected: await asyncio.sleep(settings.behaviour_settings.sleep_s) + self.logger.debug("Not connected, skipping ping loop iteration.") continue # We need to listen and send pings. @@ -289,15 +295,27 @@ class RICommunicationAgent(BaseAgent): # Tell UI we're disconnected. topic = b"ping" data = json.dumps(False).encode() + self.logger.debug("1") if self.pub_socket: try: + self.logger.debug("2") await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5) except TimeoutError: + self.logger.debug("3") self.logger.warning("Connection ping for router timed out.") # Try to reboot/renegotiate + if self.gesture_agent is not None: + await self.gesture_agent.stop() + + if self.speech_agent is not None: + await self.speech_agent.stop() + + if self.pub_socket is not None: + self.pub_socket.close() + self.logger.debug("Restarting communication negotiation.") - if await self._negotiate_connection(max_retries=1): + if await self._negotiate_connection(max_retries=2): self.connected = True async def handle_message(self, msg: InternalMessage):