Compare commits
7 Commits
feat/seman
...
feat/reset
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c91b999104 | ||
|
|
1360567820 | ||
|
|
cc0d5af28c | ||
|
|
be88323cf7 | ||
|
|
76dfcb23ef | ||
|
|
34afca6652 | ||
|
|
324a63e5cc |
@@ -83,6 +83,8 @@ class RobotGestureAgent(BaseAgent):
|
||||
self.subsocket.close()
|
||||
if self.pubsocket:
|
||||
self.pubsocket.close()
|
||||
if self.repsocket:
|
||||
self.repsocket.close()
|
||||
await super().stop()
|
||||
|
||||
async def handle_message(self, msg: InternalMessage):
|
||||
|
||||
@@ -8,6 +8,7 @@ from zmq.asyncio import Context
|
||||
from control_backend.agents import BaseAgent
|
||||
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
|
||||
from control_backend.core.config import settings
|
||||
from control_backend.schemas.internal_message import InternalMessage
|
||||
|
||||
from ..actuation.robot_speech_agent import RobotSpeechAgent
|
||||
from ..perception import VADAgent
|
||||
@@ -47,6 +48,8 @@ class RICommunicationAgent(BaseAgent):
|
||||
self._req_socket: azmq.Socket | None = None
|
||||
self.pub_socket: azmq.Socket | None = None
|
||||
self.connected = False
|
||||
self.gesture_agent: RobotGestureAgent | None = None
|
||||
self.speech_agent: RobotSpeechAgent | None = None
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
@@ -140,6 +143,7 @@ class RICommunicationAgent(BaseAgent):
|
||||
|
||||
# At this point, we have a valid response
|
||||
try:
|
||||
self.logger.debug("Negotiation successful. Handling rn")
|
||||
await self._handle_negotiation_response(received_message)
|
||||
# Let UI know that we're connected
|
||||
topic = b"ping"
|
||||
@@ -188,6 +192,7 @@ class RICommunicationAgent(BaseAgent):
|
||||
address=addr,
|
||||
bind=bind,
|
||||
)
|
||||
self.speech_agent = robot_speech_agent
|
||||
robot_gesture_agent = RobotGestureAgent(
|
||||
settings.agent_settings.robot_gesture_name,
|
||||
address=addr,
|
||||
@@ -195,6 +200,7 @@ class RICommunicationAgent(BaseAgent):
|
||||
gesture_data=gesture_data,
|
||||
single_gesture_data=single_gesture_data,
|
||||
)
|
||||
self.gesture_agent = robot_gesture_agent
|
||||
await robot_speech_agent.start()
|
||||
await asyncio.sleep(0.1) # Small delay
|
||||
await robot_gesture_agent.start()
|
||||
@@ -225,6 +231,7 @@ class RICommunicationAgent(BaseAgent):
|
||||
while self._running:
|
||||
if not self.connected:
|
||||
await asyncio.sleep(settings.behaviour_settings.sleep_s)
|
||||
self.logger.debug("Not connected, skipping ping loop iteration.")
|
||||
continue
|
||||
|
||||
# We need to listen and send pings.
|
||||
@@ -288,13 +295,36 @@ class RICommunicationAgent(BaseAgent):
|
||||
# Tell UI we're disconnected.
|
||||
topic = b"ping"
|
||||
data = json.dumps(False).encode()
|
||||
self.logger.debug("1")
|
||||
if self.pub_socket:
|
||||
try:
|
||||
self.logger.debug("2")
|
||||
await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5)
|
||||
except TimeoutError:
|
||||
self.logger.debug("3")
|
||||
self.logger.warning("Connection ping for router timed out.")
|
||||
|
||||
# Try to reboot/renegotiate
|
||||
if self.gesture_agent is not None:
|
||||
await self.gesture_agent.stop()
|
||||
|
||||
if self.speech_agent is not None:
|
||||
await self.speech_agent.stop()
|
||||
|
||||
if self.pub_socket is not None:
|
||||
self.pub_socket.close()
|
||||
|
||||
self.logger.debug("Restarting communication negotiation.")
|
||||
if await self._negotiate_connection(max_retries=1):
|
||||
if await self._negotiate_connection(max_retries=2):
|
||||
self.connected = True
|
||||
|
||||
async def handle_message(self, msg: InternalMessage):
|
||||
"""
|
||||
Handle an incoming message.
|
||||
|
||||
Currently not implemented for this agent.
|
||||
|
||||
:param msg: The received message.
|
||||
:raises NotImplementedError: Always, since this method is not implemented.
|
||||
"""
|
||||
self.logger.warning("custom warning for handle msg in ri coms %s", self.name)
|
||||
|
||||
@@ -7,6 +7,7 @@ import zmq.asyncio as azmq
|
||||
|
||||
from control_backend.agents import BaseAgent
|
||||
from control_backend.core.config import settings
|
||||
from control_backend.schemas.internal_message import InternalMessage
|
||||
|
||||
from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
|
||||
from .transcription_agent.transcription_agent import TranscriptionAgent
|
||||
@@ -86,6 +87,12 @@ class VADAgent(BaseAgent):
|
||||
self.audio_buffer = np.array([], dtype=np.float32)
|
||||
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
|
||||
self._ready = asyncio.Event()
|
||||
|
||||
# Pause control
|
||||
self._reset_needed = False
|
||||
self._paused = asyncio.Event()
|
||||
self._paused.set() # Not paused at start
|
||||
|
||||
self.model = None
|
||||
|
||||
async def setup(self):
|
||||
@@ -213,6 +220,16 @@ class VADAgent(BaseAgent):
|
||||
"""
|
||||
await self._ready.wait()
|
||||
while self._running:
|
||||
await self._paused.wait()
|
||||
|
||||
# After being unpaused, reset stream and buffers
|
||||
if self._reset_needed:
|
||||
self.logger.debug("Resuming: resetting stream and buffers.")
|
||||
await self._reset_stream()
|
||||
self.audio_buffer = np.array([], dtype=np.float32)
|
||||
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
|
||||
self._reset_needed = False
|
||||
|
||||
assert self.audio_in_poller is not None
|
||||
data = await self.audio_in_poller.poll()
|
||||
if data is None:
|
||||
@@ -254,3 +271,27 @@ class VADAgent(BaseAgent):
|
||||
# At this point, we know that the speech has ended.
|
||||
# Prepend the last chunk that had no speech, for a more fluent boundary
|
||||
self.audio_buffer = chunk
|
||||
|
||||
async def handle_message(self, msg: InternalMessage):
|
||||
"""
|
||||
Handle incoming messages.
|
||||
|
||||
Expects messages to pause or resume the VAD processing from User Interrupt Agent.
|
||||
|
||||
:param msg: The received internal message.
|
||||
"""
|
||||
sender = msg.sender
|
||||
|
||||
if sender == settings.agent_settings.user_interrupt_name:
|
||||
if msg.body == "PAUSE":
|
||||
self.logger.info("Pausing VAD processing.")
|
||||
self._paused.clear()
|
||||
# If the robot needs to pick up speaking where it left off, do not set _reset_needed
|
||||
self._reset_needed = True
|
||||
elif msg.body == "RESUME":
|
||||
self.logger.info("Resuming VAD processing.")
|
||||
self._paused.set()
|
||||
else:
|
||||
self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}")
|
||||
else:
|
||||
self.logger.debug(f"Ignoring message from unknown sender: {sender}")
|
||||
|
||||
@@ -6,7 +6,12 @@ from zmq.asyncio import Context
|
||||
from control_backend.agents import BaseAgent
|
||||
from control_backend.core.agent_system import InternalMessage
|
||||
from control_backend.core.config import settings
|
||||
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
|
||||
from control_backend.schemas.ri_message import (
|
||||
GestureCommand,
|
||||
PauseCommand,
|
||||
RIEndpoint,
|
||||
SpeechCommand,
|
||||
)
|
||||
|
||||
|
||||
class UserInterruptAgent(BaseAgent):
|
||||
@@ -71,6 +76,19 @@ class UserInterruptAgent(BaseAgent):
|
||||
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
|
||||
event_context,
|
||||
)
|
||||
elif event_type == "pause":
|
||||
self.logger.debug(
|
||||
"Received pause/resume button press with context '%s'.", event_context
|
||||
)
|
||||
await self._send_pause_command(event_context)
|
||||
if event_context:
|
||||
self.logger.info("Sent pause command.")
|
||||
else:
|
||||
self.logger.info("Sent resume command.")
|
||||
|
||||
elif event_type in ["next_phase", "reset_phase", "reset_experiment"]:
|
||||
await self._send_experiment_control_to_bdi_core(event_type)
|
||||
|
||||
else:
|
||||
self.logger.warning(
|
||||
"Received button press with unknown type '%s' (context: '%s').",
|
||||
@@ -78,6 +96,36 @@ class UserInterruptAgent(BaseAgent):
|
||||
event_context,
|
||||
)
|
||||
|
||||
async def _send_experiment_control_to_bdi_core(self, type):
|
||||
"""
|
||||
method to send experiment control buttons to bdi core.
|
||||
|
||||
:param type: the type of control button we should send to the bdi core.
|
||||
"""
|
||||
# Switch which thread we should send to bdi core
|
||||
thread = ""
|
||||
match type:
|
||||
case "next_phase":
|
||||
thread = "force_next_phase"
|
||||
case "reset_phase":
|
||||
thread = "reset_current_phase"
|
||||
case "reset_experiment":
|
||||
thread = "reset_experiment"
|
||||
case _:
|
||||
self.logger.warning(
|
||||
"Received unknown experiment control type '%s' to send to BDI Core.",
|
||||
type,
|
||||
)
|
||||
|
||||
out_msg = InternalMessage(
|
||||
to=settings.agent_settings.bdi_core_name,
|
||||
sender=self.name,
|
||||
thread=thread,
|
||||
body="",
|
||||
)
|
||||
self.logger.debug("Sending experiment control '%s' to BDI Core.", thread)
|
||||
await self.send(out_msg)
|
||||
|
||||
async def _send_to_speech_agent(self, text_to_say: str):
|
||||
"""
|
||||
method to send prioritized speech command to RobotSpeechAgent.
|
||||
@@ -130,6 +178,38 @@ class UserInterruptAgent(BaseAgent):
|
||||
belief_id,
|
||||
)
|
||||
|
||||
async def _send_pause_command(self, pause):
|
||||
"""
|
||||
Send a pause command to the Robot Interface via the RI Communication Agent.
|
||||
Send a pause command to the other internal agents; for now just VAD agent.
|
||||
"""
|
||||
cmd = PauseCommand(data=pause)
|
||||
message = InternalMessage(
|
||||
to=settings.agent_settings.ri_communication_name,
|
||||
sender=self.name,
|
||||
body=cmd.model_dump_json(),
|
||||
)
|
||||
await self.send(message)
|
||||
|
||||
if pause == "true":
|
||||
# Send pause to VAD agent
|
||||
vad_message = InternalMessage(
|
||||
to=settings.agent_settings.vad_name,
|
||||
sender=self.name,
|
||||
body="PAUSE",
|
||||
)
|
||||
await self.send(vad_message)
|
||||
self.logger.info("Sent pause command to VAD Agent and RI Communication Agent.")
|
||||
else:
|
||||
# Send resume to VAD agent
|
||||
vad_message = InternalMessage(
|
||||
to=settings.agent_settings.vad_name,
|
||||
sender=self.name,
|
||||
body="RESUME",
|
||||
)
|
||||
await self.send(vad_message)
|
||||
self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.")
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Initialize the agent.
|
||||
|
||||
@@ -14,6 +14,7 @@ class RIEndpoint(str, Enum):
|
||||
GESTURE_TAG = "actuate/gesture/tag"
|
||||
PING = "ping"
|
||||
NEGOTIATE_PORTS = "negotiate/ports"
|
||||
PAUSE = ""
|
||||
|
||||
|
||||
class RIMessage(BaseModel):
|
||||
@@ -64,3 +65,15 @@ class GestureCommand(RIMessage):
|
||||
if self.endpoint not in allowed:
|
||||
raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG")
|
||||
return self
|
||||
|
||||
|
||||
class PauseCommand(RIMessage):
|
||||
"""
|
||||
A specific command to pause or unpause the robot's actions.
|
||||
|
||||
:ivar endpoint: Fixed to ``RIEndpoint.PAUSE``.
|
||||
:ivar data: A boolean indicating whether to pause (True) or unpause (False).
|
||||
"""
|
||||
|
||||
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE)
|
||||
data: bool
|
||||
|
||||
Reference in New Issue
Block a user