diff --git a/src/control_backend/agents/actuation/robot_gesture_agent.py b/src/control_backend/agents/actuation/robot_gesture_agent.py index 3b264d2..997b684 100644 --- a/src/control_backend/agents/actuation/robot_gesture_agent.py +++ b/src/control_backend/agents/actuation/robot_gesture_agent.py @@ -83,6 +83,8 @@ class RobotGestureAgent(BaseAgent): self.subsocket.close() if self.pubsocket: self.pubsocket.close() + if self.repsocket: + self.repsocket.close() await super().stop() async def handle_message(self, msg: InternalMessage): diff --git a/src/control_backend/agents/communication/ri_communication_agent.py b/src/control_backend/agents/communication/ri_communication_agent.py index b12bac6..2377421 100644 --- a/src/control_backend/agents/communication/ri_communication_agent.py +++ b/src/control_backend/agents/communication/ri_communication_agent.py @@ -8,6 +8,7 @@ from zmq.asyncio import Context from control_backend.agents import BaseAgent from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent from control_backend.core.config import settings +from control_backend.schemas.internal_message import InternalMessage from ..actuation.robot_speech_agent import RobotSpeechAgent from ..perception import VADAgent @@ -47,6 +48,8 @@ class RICommunicationAgent(BaseAgent): self._req_socket: azmq.Socket | None = None self.pub_socket: azmq.Socket | None = None self.connected = False + self.gesture_agent: RobotGestureAgent | None = None + self.speech_agent: RobotSpeechAgent | None = None async def setup(self): """ @@ -140,6 +143,7 @@ class RICommunicationAgent(BaseAgent): # At this point, we have a valid response try: + self.logger.debug("Negotiation successful. Handling rn") await self._handle_negotiation_response(received_message) # Let UI know that we're connected topic = b"ping" @@ -188,6 +192,7 @@ class RICommunicationAgent(BaseAgent): address=addr, bind=bind, ) + self.speech_agent = robot_speech_agent robot_gesture_agent = RobotGestureAgent( settings.agent_settings.robot_gesture_name, address=addr, @@ -195,6 +200,7 @@ class RICommunicationAgent(BaseAgent): gesture_data=gesture_data, single_gesture_data=single_gesture_data, ) + self.gesture_agent = robot_gesture_agent await robot_speech_agent.start() await asyncio.sleep(0.1) # Small delay await robot_gesture_agent.start() @@ -225,6 +231,7 @@ class RICommunicationAgent(BaseAgent): while self._running: if not self.connected: await asyncio.sleep(settings.behaviour_settings.sleep_s) + self.logger.debug("Not connected, skipping ping loop iteration.") continue # We need to listen and send pings. @@ -289,13 +296,36 @@ class RICommunicationAgent(BaseAgent): # Tell UI we're disconnected. topic = b"ping" data = json.dumps(False).encode() + self.logger.debug("1") if self.pub_socket: try: + self.logger.debug("2") await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5) except TimeoutError: + self.logger.debug("3") self.logger.warning("Connection ping for router timed out.") # Try to reboot/renegotiate + if self.gesture_agent is not None: + await self.gesture_agent.stop() + + if self.speech_agent is not None: + await self.speech_agent.stop() + + if self.pub_socket is not None: + self.pub_socket.close() + self.logger.debug("Restarting communication negotiation.") - if await self._negotiate_connection(max_retries=1): + if await self._negotiate_connection(max_retries=2): self.connected = True + + async def handle_message(self, msg: InternalMessage): + """ + Handle an incoming message. + + Currently not implemented for this agent. + + :param msg: The received message. + :raises NotImplementedError: Always, since this method is not implemented. + """ + self.logger.warning("custom warning for handle msg in ri coms %s", self.name) diff --git a/src/control_backend/agents/perception/vad_agent.py b/src/control_backend/agents/perception/vad_agent.py index e47b27a..2b333f5 100644 --- a/src/control_backend/agents/perception/vad_agent.py +++ b/src/control_backend/agents/perception/vad_agent.py @@ -7,6 +7,7 @@ import zmq.asyncio as azmq from control_backend.agents import BaseAgent from control_backend.core.config import settings +from control_backend.schemas.internal_message import InternalMessage from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus from .transcription_agent.transcription_agent import TranscriptionAgent @@ -86,6 +87,12 @@ class VADAgent(BaseAgent): self.audio_buffer = np.array([], dtype=np.float32) self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech self._ready = asyncio.Event() + + # Pause control + self._reset_needed = False + self._paused = asyncio.Event() + self._paused.set() # Not paused at start + self.model = None async def setup(self): @@ -213,6 +220,16 @@ class VADAgent(BaseAgent): """ await self._ready.wait() while self._running: + await self._paused.wait() + + # After being unpaused, reset stream and buffers + if self._reset_needed: + self.logger.debug("Resuming: resetting stream and buffers.") + await self._reset_stream() + self.audio_buffer = np.array([], dtype=np.float32) + self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech + self._reset_needed = False + assert self.audio_in_poller is not None data = await self.audio_in_poller.poll() if data is None: @@ -252,7 +269,30 @@ class VADAgent(BaseAgent): assert self.audio_out_socket is not None await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes()) - # At this point, we know that there is no speech. - # Prepend the last few chunks that had no speech, for a more fluent boundary. - self.audio_buffer = np.append(self.audio_buffer, chunk) - self.audio_buffer = self.audio_buffer[-begin_silence_length * len(chunk) :] + # At this point, we know that the speech has ended. + # Prepend the last chunk that had no speech, for a more fluent boundary + self.audio_buffer = chunk + + async def handle_message(self, msg: InternalMessage): + """ + Handle incoming messages. + + Expects messages to pause or resume the VAD processing from User Interrupt Agent. + + :param msg: The received internal message. + """ + sender = msg.sender + + if sender == settings.agent_settings.user_interrupt_name: + if msg.body == "PAUSE": + self.logger.info("Pausing VAD processing.") + self._paused.clear() + # If the robot needs to pick up speaking where it left off, do not set _reset_needed + self._reset_needed = True + elif msg.body == "RESUME": + self.logger.info("Resuming VAD processing.") + self._paused.set() + else: + self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}") + else: + self.logger.debug(f"Ignoring message from unknown sender: {sender}") diff --git a/src/control_backend/schemas/ri_message.py b/src/control_backend/schemas/ri_message.py index a48dec6..e6eafa3 100644 --- a/src/control_backend/schemas/ri_message.py +++ b/src/control_backend/schemas/ri_message.py @@ -14,6 +14,7 @@ class RIEndpoint(str, Enum): GESTURE_TAG = "actuate/gesture/tag" PING = "ping" NEGOTIATE_PORTS = "negotiate/ports" + PAUSE = "" class RIMessage(BaseModel): @@ -64,3 +65,15 @@ class GestureCommand(RIMessage): if self.endpoint not in allowed: raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG") return self + + +class PauseCommand(RIMessage): + """ + A specific command to pause or unpause the robot's actions. + + :ivar endpoint: Fixed to ``RIEndpoint.PAUSE``. + :ivar data: A boolean indicating whether to pause (True) or unpause (False). + """ + + endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE) + data: bool