diff --git a/src/control_backend/agents/communication/ri_communication_agent.py b/src/control_backend/agents/communication/ri_communication_agent.py index 2056c0d..746705c 100644 --- a/src/control_backend/agents/communication/ri_communication_agent.py +++ b/src/control_backend/agents/communication/ri_communication_agent.py @@ -3,7 +3,6 @@ import json import zmq import zmq.asyncio as azmq -from pydantic import ValidationError from zmq.asyncio import Context from control_backend.agents import BaseAgent @@ -12,8 +11,6 @@ from control_backend.agents.perception.visual_emotion_recognition_agent.visual_e VisualEmotionRecognitionAgent, ) from control_backend.core.config import settings -from control_backend.schemas.internal_message import InternalMessage -from control_backend.schemas.ri_message import PauseCommand from ..actuation.robot_speech_agent import RobotSpeechAgent from ..perception import VADAgent @@ -335,12 +332,4 @@ class RICommunicationAgent(BaseAgent): self.logger.debug("Restarting communication negotiation.") if await self._negotiate_connection(max_retries=2): self.connected = True - - async def handle_message(self, msg: InternalMessage): - return - try: - pause_command = PauseCommand.model_validate_json(msg.body) - await self._req_socket.send_json(pause_command.model_dump()) - self.logger.debug(await self._req_socket.recv_json()) - except ValidationError: - self.logger.warning("Incorrect message format for PauseCommand.") + \ No newline at end of file diff --git a/src/control_backend/agents/perception/visual_emotion_recognition_agent/visual_emotion_recognition_agent.py b/src/control_backend/agents/perception/visual_emotion_recognition_agent/visual_emotion_recognition_agent.py index 5344b9b..52f97a2 100644 --- a/src/control_backend/agents/perception/visual_emotion_recognition_agent/visual_emotion_recognition_agent.py +++ b/src/control_backend/agents/perception/visual_emotion_recognition_agent/visual_emotion_recognition_agent.py @@ -1,3 +1,4 @@ +import asyncio import json import time from collections import Counter, defaultdict @@ -7,7 +8,6 @@ import numpy as np import zmq import zmq.asyncio as azmq from pydantic_core import ValidationError -import struct from control_backend.agents import BaseAgent from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognizer import ( # noqa @@ -46,6 +46,11 @@ class VisualEmotionRecognitionAgent(BaseAgent): self.window_duration = window_duration self.min_frames_required = min_frames_required + # Pause functionality + # NOTE: flag is set when running, cleared when paused + self._paused = asyncio.Event() + self._paused.set() + async def setup(self): """ Initialize the agent resources. @@ -88,6 +93,8 @@ class VisualEmotionRecognitionAgent(BaseAgent): while self._running: try: + await self._paused.wait() + frame_bytes = await self.video_in_socket.recv() # Convert bytes to a numpy buffer @@ -167,3 +174,34 @@ class VisualEmotionRecognitionAgent(BaseAgent): thread="beliefs", ) await self.send(message) + + async def handle_message(self, msg: InternalMessage): + """ + Handle incoming messages. + + Expects messages to pause or resume the Visual Emotion Recognition + processing from User Interrupt Agent. + + :param msg: The received internal message. + """ + sender = msg.sender + + if sender == settings.agent_settings.user_interrupt_name: + if msg.body == "PAUSE": + self.logger.info("Pausing Visual Emotion Recognition processing.") + self._paused.clear() + elif msg.body == "RESUME": + self.logger.info("Resuming Visual Emotion Recognition processing.") + self._paused.set() + else: + self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}") + else: + self.logger.debug(f"Ignoring message from unknown sender: {sender}") + + + async def stop(self): + """ + Clean up resources used by the agent. + """ + self.video_in_socket.close() + await super().stop() \ No newline at end of file diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index a42861a..1454c44 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -378,34 +378,29 @@ class UserInterruptAgent(BaseAgent): self.logger.debug("Sending experiment control '%s' to BDI Core.", thread) await self.send(out_msg) - async def _send_pause_command(self, pause): + async def _send_pause_command(self, pause: str): """ - Send a pause command to the Robot Interface via the RI Communication Agent. - Send a pause command to the other internal agents; for now just VAD agent. + Send a pause command to the other internal agents; for now just VAD and VED agent. """ - cmd = PauseCommand(data=pause) - message = InternalMessage( - to=settings.agent_settings.ri_communication_name, - sender=self.name, - body=cmd.model_dump_json(), - ) - await self.send(message) - if pause == "true": - # Send pause to VAD agent + # Send pause to VAD and VED agent vad_message = InternalMessage( - to=settings.agent_settings.vad_name, + to=[settings.agent_settings.vad_name, + settings.agent_settings.visual_emotion_recognition_name], sender=self.name, body="PAUSE", ) await self.send(vad_message) - self.logger.info("Sent pause command to VAD Agent and RI Communication Agent.") + # Voice Activity Detection and Visual Emotion Recognition agents + self.logger.info("Sent pause command to VAD and VED agents.") else: - # Send resume to VAD agent + # Send resume to VAD and VED agents vad_message = InternalMessage( - to=settings.agent_settings.vad_name, + to=[settings.agent_settings.vad_name, + settings.agent_settings.visual_emotion_recognition_name], sender=self.name, body="RESUME", ) await self.send(vad_message) - self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.") + # Voice Activity Detection and Visual Emotion Recognition agents + self.logger.info("Sent resume command to VAD and VED agents.") \ No newline at end of file