feat: implement pausing functionality in CB

ref: N25B-350
This commit is contained in:
Storm
2026-01-06 18:08:43 +01:00
parent 867837dcc4
commit d1ad2c1549
5 changed files with 75 additions and 7 deletions

View File

@@ -1,14 +1,14 @@
import asyncio import asyncio
import json import json
from pydantic import ValidationError
import zmq import zmq
import zmq.asyncio as azmq import zmq.asyncio as azmq
from pydantic import ValidationError
from zmq.asyncio import Context from zmq.asyncio import Context
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.ri_message import PauseCommand from control_backend.schemas.ri_message import PauseCommand

View File

@@ -1,8 +1,12 @@
import asyncio import asyncio
import json import json
import zmq
from zmq.asyncio import Context
from control_backend.agents.base import BaseAgent from control_backend.agents.base import BaseAgent
from control_backend.core.agent_system import InternalMessage from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
class TestPauseAgent(BaseAgent): class TestPauseAgent(BaseAgent):
@@ -10,8 +14,12 @@ class TestPauseAgent(BaseAgent):
super().__init__(name) super().__init__(name)
async def setup(self): async def setup(self):
self.logger.debug("TestPauseAgent setup complete.") context = Context.instance()
self.pub_socket = context.socket(zmq.PUB)
self.pub_socket.connect(settings.zmq_settings.internal_pub_address)
self.add_behavior(self._pause_command_loop()) self.add_behavior(self._pause_command_loop())
self.logger.debug("TestPauseAgent setup complete.")
async def _pause_command_loop(self): async def _pause_command_loop(self):
print("Starting Pause command test loop.") print("Starting Pause command test loop.")
@@ -27,6 +35,14 @@ class TestPauseAgent(BaseAgent):
body=json.dumps(pause_command), body=json.dumps(pause_command),
) )
await self.send(message) await self.send(message)
# User interrupt message
data = {
"type": "pause",
"context": True,
}
await self.pub_socket.send_multipart([b"button_pressed", json.dumps(data).encode()])
self.logger.info("Pausing robot actions.") self.logger.info("Pausing robot actions.")
await asyncio.sleep(15) # Simulate delay between messages await asyncio.sleep(15) # Simulate delay between messages
@@ -40,5 +56,13 @@ class TestPauseAgent(BaseAgent):
body=json.dumps(pause_command), body=json.dumps(pause_command),
) )
await self.send(message) await self.send(message)
# User interrupt message
data = {
"type": "pause",
"context": False,
}
await self.pub_socket.send_multipart([b"button_pressed", json.dumps(data).encode()])
self.logger.info("Resuming robot actions.") self.logger.info("Resuming robot actions.")
await asyncio.sleep(15) # Simulate delay between messages await asyncio.sleep(15) # Simulate delay between messages

View File

@@ -286,6 +286,7 @@ class VADAgent(BaseAgent):
if msg.body == "PAUSE": if msg.body == "PAUSE":
self.logger.info("Pausing VAD processing.") self.logger.info("Pausing VAD processing.")
self._paused.clear() self._paused.clear()
# If the robot needs to pick up speaking where it left off, do not set _reset_needed
self._reset_needed = True self._reset_needed = True
elif msg.body == "RESUME": elif msg.body == "RESUME":
self.logger.info("Resuming VAD processing.") self.logger.info("Resuming VAD processing.")

View File

@@ -6,7 +6,12 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand from control_backend.schemas.ri_message import (
GestureCommand,
PauseCommand,
RIEndpoint,
SpeechCommand,
)
class UserInterruptAgent(BaseAgent): class UserInterruptAgent(BaseAgent):
@@ -71,6 +76,12 @@ class UserInterruptAgent(BaseAgent):
"Forwarded button press (override) with context '%s' to BDIProgramManager.", "Forwarded button press (override) with context '%s' to BDIProgramManager.",
event_context, event_context,
) )
elif event_type == "pause":
await self._send_pause_command(event_context)
if event_context:
self.logger.info("Sent pause command.")
else:
self.logger.info("Sent resume command.")
else: else:
self.logger.warning( self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').", "Received button press with unknown type '%s' (context: '%s').",
@@ -130,6 +141,38 @@ class UserInterruptAgent(BaseAgent):
belief_id, belief_id,
) )
async def _send_pause_command(self, pause : bool):
"""
Send a pause command to the Robot Interface via the RI Communication Agent.
Send a pause command to the other internal agents; for now just VAD agent.
"""
cmd = PauseCommand(data=pause)
message = InternalMessage(
to=settings.agent_settings.ri_communication_name,
sender=self.name,
body=cmd.model_dump_json(),
)
await self.send(message)
if pause:
# Send pause to VAD agent
vad_message = InternalMessage(
to=settings.agent_settings.vad_name,
sender=self.name,
body="PAUSE",
)
await self.send(vad_message)
self.logger.info("Sent pause command to VAD Agent and RI Communication Agent.")
else:
# Send resume to VAD agent
vad_message = InternalMessage(
to=settings.agent_settings.vad_name,
sender=self.name,
body="RESUME",
)
await self.send(vad_message)
self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.")
async def setup(self): async def setup(self):
""" """
Initialize the agent. Initialize the agent.

View File

@@ -39,11 +39,11 @@ from control_backend.agents.communication import RICommunicationAgent
# LLM Agents # LLM Agents
from control_backend.agents.llm import LLMAgent from control_backend.agents.llm import LLMAgent
# User Interrupt Agent
from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
# Other backend imports # Other backend imports
from control_backend.agents.mock_agents.test_pause_ri import TestPauseAgent from control_backend.agents.mock_agents.test_pause_ri import TestPauseAgent
# User Interrupt Agent
from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
from control_backend.api.v1.router import api_router from control_backend.api.v1.router import api_router
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.logging import setup_logging from control_backend.logging import setup_logging