feat: implemented pausing, implemented graceful stopping, removed old RI pausing code

ref: N25B-393
This commit is contained in:
Storm
2026-01-20 18:53:24 +01:00
parent f9b807fc97
commit 0b1c2ce20a
3 changed files with 52 additions and 30 deletions

View File

@@ -3,7 +3,6 @@ import json
import zmq
import zmq.asyncio as azmq
from pydantic import ValidationError
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
@@ -12,8 +11,6 @@ from control_backend.agents.perception.visual_emotion_recognition_agent.visual_e
VisualEmotionRecognitionAgent,
)
from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage
from control_backend.schemas.ri_message import PauseCommand
from ..actuation.robot_speech_agent import RobotSpeechAgent
from ..perception import VADAgent
@@ -335,12 +332,4 @@ class RICommunicationAgent(BaseAgent):
self.logger.debug("Restarting communication negotiation.")
if await self._negotiate_connection(max_retries=2):
self.connected = True
async def handle_message(self, msg: InternalMessage):
return
try:
pause_command = PauseCommand.model_validate_json(msg.body)
await self._req_socket.send_json(pause_command.model_dump())
self.logger.debug(await self._req_socket.recv_json())
except ValidationError:
self.logger.warning("Incorrect message format for PauseCommand.")

View File

@@ -1,3 +1,4 @@
import asyncio
import json
import time
from collections import Counter, defaultdict
@@ -7,7 +8,6 @@ import numpy as np
import zmq
import zmq.asyncio as azmq
from pydantic_core import ValidationError
import struct
from control_backend.agents import BaseAgent
from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognizer import ( # noqa
@@ -46,6 +46,11 @@ class VisualEmotionRecognitionAgent(BaseAgent):
self.window_duration = window_duration
self.min_frames_required = min_frames_required
# Pause functionality
# NOTE: flag is set when running, cleared when paused
self._paused = asyncio.Event()
self._paused.set()
async def setup(self):
"""
Initialize the agent resources.
@@ -88,6 +93,8 @@ class VisualEmotionRecognitionAgent(BaseAgent):
while self._running:
try:
await self._paused.wait()
frame_bytes = await self.video_in_socket.recv()
# Convert bytes to a numpy buffer
@@ -167,3 +174,34 @@ class VisualEmotionRecognitionAgent(BaseAgent):
thread="beliefs",
)
await self.send(message)
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages.
Expects messages to pause or resume the Visual Emotion Recognition
processing from User Interrupt Agent.
:param msg: The received internal message.
"""
sender = msg.sender
if sender == settings.agent_settings.user_interrupt_name:
if msg.body == "PAUSE":
self.logger.info("Pausing Visual Emotion Recognition processing.")
self._paused.clear()
elif msg.body == "RESUME":
self.logger.info("Resuming Visual Emotion Recognition processing.")
self._paused.set()
else:
self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}")
else:
self.logger.debug(f"Ignoring message from unknown sender: {sender}")
async def stop(self):
"""
Clean up resources used by the agent.
"""
self.video_in_socket.close()
await super().stop()

View File

@@ -378,34 +378,29 @@ class UserInterruptAgent(BaseAgent):
self.logger.debug("Sending experiment control '%s' to BDI Core.", thread)
await self.send(out_msg)
async def _send_pause_command(self, pause):
async def _send_pause_command(self, pause: str):
"""
Send a pause command to the Robot Interface via the RI Communication Agent.
Send a pause command to the other internal agents; for now just VAD agent.
Send a pause command to the other internal agents; for now just VAD and VED agent.
"""
cmd = PauseCommand(data=pause)
message = InternalMessage(
to=settings.agent_settings.ri_communication_name,
sender=self.name,
body=cmd.model_dump_json(),
)
await self.send(message)
if pause == "true":
# Send pause to VAD agent
# Send pause to VAD and VED agent
vad_message = InternalMessage(
to=settings.agent_settings.vad_name,
to=[settings.agent_settings.vad_name,
settings.agent_settings.visual_emotion_recognition_name],
sender=self.name,
body="PAUSE",
)
await self.send(vad_message)
self.logger.info("Sent pause command to VAD Agent and RI Communication Agent.")
# Voice Activity Detection and Visual Emotion Recognition agents
self.logger.info("Sent pause command to VAD and VED agents.")
else:
# Send resume to VAD agent
# Send resume to VAD and VED agents
vad_message = InternalMessage(
to=settings.agent_settings.vad_name,
to=[settings.agent_settings.vad_name,
settings.agent_settings.visual_emotion_recognition_name],
sender=self.name,
body="RESUME",
)
await self.send(vad_message)
self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.")
# Voice Activity Detection and Visual Emotion Recognition agents
self.logger.info("Sent resume command to VAD and VED agents.")