feat: functionality implemented for RI pausing functionality

Currently, no CB pausing functionality has been implemented yet. This commit only includes necessary changes to use RI pausing.

ref: N25B-350
This commit is contained in:
Storm
2025-12-22 14:02:18 +01:00
parent 3e7f2ef574
commit 539e814c5a
5 changed files with 75 additions and 1 deletions

View File

@@ -1,13 +1,16 @@
import asyncio
import json
from pydantic import ValidationError
import zmq
import zmq.asyncio as azmq
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.core.config import settings
from control_backend.schemas.ri_message import PauseCommand
from ..actuation.robot_speech_agent import RobotSpeechAgent
from ..perception import VADAgent
@@ -296,3 +299,11 @@ class RICommunicationAgent(BaseAgent):
self.logger.debug("Restarting communication negotiation.")
if await self._negotiate_connection(max_retries=1):
self.connected = True
async def handle_message(self, msg : InternalMessage):
try:
pause_command = PauseCommand.model_validate_json(msg.body)
self._req_socket.send_json(pause_command.model_dump())
self.logger.debug(self._req_socket.recv_json())
except ValidationError:
self.logger.warning("Incorrect message format for PauseCommand.")

View File

@@ -0,0 +1,44 @@
import asyncio
import json
from control_backend.agents.base import BaseAgent
from control_backend.core.agent_system import InternalMessage
class TestPauseAgent(BaseAgent):
def __init__(self, name: str):
super().__init__(name)
async def setup(self):
self.logger.debug("TestPauseAgent setup complete.")
self.add_behavior(self._pause_command_loop())
async def _pause_command_loop(self):
print("Starting Pause command test loop.")
while True:
pause_command = {
"endpoint": "pause",
"data": True,
}
message = InternalMessage(
to="ri_communication_agent",
sender=self.name,
body=json.dumps(pause_command),
)
await self.send(message)
self.logger.info("Pausing robot actions.")
await asyncio.sleep(15) # Simulate delay between messages
pause_command = {
"endpoint": "pause",
"data": False,
}
message = InternalMessage(
to="ri_communication_agent",
sender=self.name,
body=json.dumps(pause_command),
)
await self.send(message)
self.logger.info("Resuming robot actions.")
await asyncio.sleep(15) # Simulate delay between messages

View File

@@ -91,7 +91,7 @@ class LLMSettings(BaseModel):
"""
local_llm_url: str = "http://localhost:1234/v1/chat/completions"
local_llm_model: str = "gpt-oss"
local_llm_model: str = "google/gemma-3-1b"
class VADSettings(BaseModel):

View File

@@ -40,6 +40,7 @@ from control_backend.agents.communication import RICommunicationAgent
from control_backend.agents.llm import LLMAgent
# Other backend imports
from control_backend.agents.mock_agents.test_pause_ri import TestPauseAgent
from control_backend.api.v1.router import api_router
from control_backend.core.config import settings
from control_backend.logging import setup_logging
@@ -138,6 +139,12 @@ async def lifespan(app: FastAPI):
"name": settings.agent_settings.bdi_program_manager_name,
},
),
"TestPauseAgent": (
TestPauseAgent,
{
"name": "pause_test_agent",
},
),
}
agents = []

View File

@@ -14,6 +14,7 @@ class RIEndpoint(str, Enum):
GESTURE_TAG = "actuate/gesture/tag"
PING = "ping"
NEGOTIATE_PORTS = "negotiate/ports"
PAUSE = "pause"
class RIMessage(BaseModel):
@@ -62,3 +63,14 @@ class GestureCommand(RIMessage):
if self.endpoint not in allowed:
raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG")
return self
class PauseCommand(RIMessage):
"""
A specific command to pause or unpause the robot's actions.
:ivar endpoint: Fixed to ``RIEndpoint.PAUSE``.
:ivar data: A boolean indicating whether to pause (True) or unpause (False).
"""
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE)
data: bool