The Big One #43
@@ -83,6 +83,8 @@ class RobotGestureAgent(BaseAgent):
|
|||||||
self.subsocket.close()
|
self.subsocket.close()
|
||||||
if self.pubsocket:
|
if self.pubsocket:
|
||||||
self.pubsocket.close()
|
self.pubsocket.close()
|
||||||
|
if self.repsocket:
|
||||||
|
self.repsocket.close()
|
||||||
await super().stop()
|
await super().stop()
|
||||||
|
|
||||||
async def handle_message(self, msg: InternalMessage):
|
async def handle_message(self, msg: InternalMessage):
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ from zmq.asyncio import Context
|
|||||||
from control_backend.agents import BaseAgent
|
from control_backend.agents import BaseAgent
|
||||||
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
|
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
|
||||||
from control_backend.core.config import settings
|
from control_backend.core.config import settings
|
||||||
|
from control_backend.schemas.internal_message import InternalMessage
|
||||||
|
|
||||||
from ..actuation.robot_speech_agent import RobotSpeechAgent
|
from ..actuation.robot_speech_agent import RobotSpeechAgent
|
||||||
from ..perception import VADAgent
|
from ..perception import VADAgent
|
||||||
@@ -47,6 +48,8 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
self._req_socket: azmq.Socket | None = None
|
self._req_socket: azmq.Socket | None = None
|
||||||
self.pub_socket: azmq.Socket | None = None
|
self.pub_socket: azmq.Socket | None = None
|
||||||
self.connected = False
|
self.connected = False
|
||||||
|
self.gesture_agent: RobotGestureAgent | None = None
|
||||||
|
self.speech_agent: RobotSpeechAgent | None = None
|
||||||
|
|
||||||
async def setup(self):
|
async def setup(self):
|
||||||
"""
|
"""
|
||||||
@@ -140,6 +143,7 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
|
|
||||||
# At this point, we have a valid response
|
# At this point, we have a valid response
|
||||||
try:
|
try:
|
||||||
|
self.logger.debug("Negotiation successful. Handling rn")
|
||||||
await self._handle_negotiation_response(received_message)
|
await self._handle_negotiation_response(received_message)
|
||||||
# Let UI know that we're connected
|
# Let UI know that we're connected
|
||||||
topic = b"ping"
|
topic = b"ping"
|
||||||
@@ -188,6 +192,7 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
address=addr,
|
address=addr,
|
||||||
bind=bind,
|
bind=bind,
|
||||||
)
|
)
|
||||||
|
self.speech_agent = robot_speech_agent
|
||||||
robot_gesture_agent = RobotGestureAgent(
|
robot_gesture_agent = RobotGestureAgent(
|
||||||
settings.agent_settings.robot_gesture_name,
|
settings.agent_settings.robot_gesture_name,
|
||||||
address=addr,
|
address=addr,
|
||||||
@@ -195,6 +200,7 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
gesture_data=gesture_data,
|
gesture_data=gesture_data,
|
||||||
single_gesture_data=single_gesture_data,
|
single_gesture_data=single_gesture_data,
|
||||||
)
|
)
|
||||||
|
self.gesture_agent = robot_gesture_agent
|
||||||
await robot_speech_agent.start()
|
await robot_speech_agent.start()
|
||||||
await asyncio.sleep(0.1) # Small delay
|
await asyncio.sleep(0.1) # Small delay
|
||||||
await robot_gesture_agent.start()
|
await robot_gesture_agent.start()
|
||||||
@@ -225,6 +231,7 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
while self._running:
|
while self._running:
|
||||||
if not self.connected:
|
if not self.connected:
|
||||||
await asyncio.sleep(settings.behaviour_settings.sleep_s)
|
await asyncio.sleep(settings.behaviour_settings.sleep_s)
|
||||||
|
self.logger.debug("Not connected, skipping ping loop iteration.")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# We need to listen and send pings.
|
# We need to listen and send pings.
|
||||||
@@ -289,13 +296,36 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
# Tell UI we're disconnected.
|
# Tell UI we're disconnected.
|
||||||
topic = b"ping"
|
topic = b"ping"
|
||||||
data = json.dumps(False).encode()
|
data = json.dumps(False).encode()
|
||||||
|
self.logger.debug("1")
|
||||||
if self.pub_socket:
|
if self.pub_socket:
|
||||||
try:
|
try:
|
||||||
|
self.logger.debug("2")
|
||||||
await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5)
|
await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5)
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
|
self.logger.debug("3")
|
||||||
self.logger.warning("Connection ping for router timed out.")
|
self.logger.warning("Connection ping for router timed out.")
|
||||||
|
|
||||||
# Try to reboot/renegotiate
|
# Try to reboot/renegotiate
|
||||||
|
if self.gesture_agent is not None:
|
||||||
|
await self.gesture_agent.stop()
|
||||||
|
|
||||||
|
if self.speech_agent is not None:
|
||||||
|
await self.speech_agent.stop()
|
||||||
|
|
||||||
|
if self.pub_socket is not None:
|
||||||
|
self.pub_socket.close()
|
||||||
|
|
||||||
self.logger.debug("Restarting communication negotiation.")
|
self.logger.debug("Restarting communication negotiation.")
|
||||||
if await self._negotiate_connection(max_retries=1):
|
if await self._negotiate_connection(max_retries=2):
|
||||||
self.connected = True
|
self.connected = True
|
||||||
|
|
||||||
|
async def handle_message(self, msg: InternalMessage):
|
||||||
|
"""
|
||||||
|
Handle an incoming message.
|
||||||
|
|
||||||
|
Currently not implemented for this agent.
|
||||||
|
|
||||||
|
:param msg: The received message.
|
||||||
|
:raises NotImplementedError: Always, since this method is not implemented.
|
||||||
|
"""
|
||||||
|
self.logger.warning("custom warning for handle msg in ri coms %s", self.name)
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import zmq.asyncio as azmq
|
|||||||
|
|
||||||
from control_backend.agents import BaseAgent
|
from control_backend.agents import BaseAgent
|
||||||
from control_backend.core.config import settings
|
from control_backend.core.config import settings
|
||||||
|
from control_backend.schemas.internal_message import InternalMessage
|
||||||
|
|
||||||
from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
|
from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
|
||||||
from .transcription_agent.transcription_agent import TranscriptionAgent
|
from .transcription_agent.transcription_agent import TranscriptionAgent
|
||||||
@@ -86,6 +87,12 @@ class VADAgent(BaseAgent):
|
|||||||
self.audio_buffer = np.array([], dtype=np.float32)
|
self.audio_buffer = np.array([], dtype=np.float32)
|
||||||
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
|
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
|
||||||
self._ready = asyncio.Event()
|
self._ready = asyncio.Event()
|
||||||
|
|
||||||
|
# Pause control
|
||||||
|
self._reset_needed = False
|
||||||
|
self._paused = asyncio.Event()
|
||||||
|
self._paused.set() # Not paused at start
|
||||||
|
|
||||||
self.model = None
|
self.model = None
|
||||||
|
|
||||||
async def setup(self):
|
async def setup(self):
|
||||||
@@ -213,6 +220,16 @@ class VADAgent(BaseAgent):
|
|||||||
"""
|
"""
|
||||||
await self._ready.wait()
|
await self._ready.wait()
|
||||||
while self._running:
|
while self._running:
|
||||||
|
await self._paused.wait()
|
||||||
|
|
||||||
|
# After being unpaused, reset stream and buffers
|
||||||
|
if self._reset_needed:
|
||||||
|
self.logger.debug("Resuming: resetting stream and buffers.")
|
||||||
|
await self._reset_stream()
|
||||||
|
self.audio_buffer = np.array([], dtype=np.float32)
|
||||||
|
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
|
||||||
|
self._reset_needed = False
|
||||||
|
|
||||||
assert self.audio_in_poller is not None
|
assert self.audio_in_poller is not None
|
||||||
data = await self.audio_in_poller.poll()
|
data = await self.audio_in_poller.poll()
|
||||||
if data is None:
|
if data is None:
|
||||||
@@ -252,7 +269,30 @@ class VADAgent(BaseAgent):
|
|||||||
assert self.audio_out_socket is not None
|
assert self.audio_out_socket is not None
|
||||||
await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes())
|
await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes())
|
||||||
|
|
||||||
# At this point, we know that there is no speech.
|
# At this point, we know that the speech has ended.
|
||||||
# Prepend the last few chunks that had no speech, for a more fluent boundary.
|
# Prepend the last chunk that had no speech, for a more fluent boundary
|
||||||
self.audio_buffer = np.append(self.audio_buffer, chunk)
|
self.audio_buffer = chunk
|
||||||
self.audio_buffer = self.audio_buffer[-begin_silence_length * len(chunk) :]
|
|
||||||
|
async def handle_message(self, msg: InternalMessage):
|
||||||
|
"""
|
||||||
|
Handle incoming messages.
|
||||||
|
|
||||||
|
Expects messages to pause or resume the VAD processing from User Interrupt Agent.
|
||||||
|
|
||||||
|
:param msg: The received internal message.
|
||||||
|
"""
|
||||||
|
sender = msg.sender
|
||||||
|
|
||||||
|
if sender == settings.agent_settings.user_interrupt_name:
|
||||||
|
if msg.body == "PAUSE":
|
||||||
|
self.logger.info("Pausing VAD processing.")
|
||||||
|
self._paused.clear()
|
||||||
|
# If the robot needs to pick up speaking where it left off, do not set _reset_needed
|
||||||
|
self._reset_needed = True
|
||||||
|
elif msg.body == "RESUME":
|
||||||
|
self.logger.info("Resuming VAD processing.")
|
||||||
|
self._paused.set()
|
||||||
|
else:
|
||||||
|
self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}")
|
||||||
|
else:
|
||||||
|
self.logger.debug(f"Ignoring message from unknown sender: {sender}")
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ class RIEndpoint(str, Enum):
|
|||||||
GESTURE_TAG = "actuate/gesture/tag"
|
GESTURE_TAG = "actuate/gesture/tag"
|
||||||
PING = "ping"
|
PING = "ping"
|
||||||
NEGOTIATE_PORTS = "negotiate/ports"
|
NEGOTIATE_PORTS = "negotiate/ports"
|
||||||
|
PAUSE = ""
|
||||||
|
|
||||||
|
|
||||||
class RIMessage(BaseModel):
|
class RIMessage(BaseModel):
|
||||||
@@ -64,3 +65,15 @@ class GestureCommand(RIMessage):
|
|||||||
if self.endpoint not in allowed:
|
if self.endpoint not in allowed:
|
||||||
raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG")
|
raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG")
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
|
||||||
|
class PauseCommand(RIMessage):
|
||||||
|
"""
|
||||||
|
A specific command to pause or unpause the robot's actions.
|
||||||
|
|
||||||
|
:ivar endpoint: Fixed to ``RIEndpoint.PAUSE``.
|
||||||
|
:ivar data: A boolean indicating whether to pause (True) or unpause (False).
|
||||||
|
"""
|
||||||
|
|
||||||
|
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE)
|
||||||
|
data: bool
|
||||||
|
|||||||
Reference in New Issue
Block a user