Compare commits

...

7 Commits

Author SHA1 Message Date
Björn Otgaar
c91b999104 chore: fix bugs and make sure connected robots work 2026-01-08 15:31:44 +01:00
Björn Otgaar
1360567820 chore: indenting 2026-01-08 13:01:38 +01:00
Björn Otgaar
cc0d5af28c chore: fixing bugs 2026-01-08 12:56:22 +01:00
Björn Otgaar
be88323cf7 chore: add one endpoint fo avoid errors 2026-01-07 18:24:35 +01:00
Storm
76dfcb23ef feat: added pause functionality
ref: N25B-350
2026-01-07 16:03:49 +01:00
Björn Otgaar
34afca6652 chore: automatically send the experiment controls to the bdi core in the user interupt agent. 2026-01-07 15:07:33 +01:00
Björn Otgaar
324a63e5cc chore: add styles to user_interrupt_agent 2026-01-07 14:45:42 +01:00
5 changed files with 168 additions and 2 deletions

View File

@@ -83,6 +83,8 @@ class RobotGestureAgent(BaseAgent):
self.subsocket.close()
if self.pubsocket:
self.pubsocket.close()
if self.repsocket:
self.repsocket.close()
await super().stop()
async def handle_message(self, msg: InternalMessage):

View File

@@ -8,6 +8,7 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage
from ..actuation.robot_speech_agent import RobotSpeechAgent
from ..perception import VADAgent
@@ -47,6 +48,8 @@ class RICommunicationAgent(BaseAgent):
self._req_socket: azmq.Socket | None = None
self.pub_socket: azmq.Socket | None = None
self.connected = False
self.gesture_agent: RobotGestureAgent | None = None
self.speech_agent: RobotSpeechAgent | None = None
async def setup(self):
"""
@@ -140,6 +143,7 @@ class RICommunicationAgent(BaseAgent):
# At this point, we have a valid response
try:
self.logger.debug("Negotiation successful. Handling rn")
await self._handle_negotiation_response(received_message)
# Let UI know that we're connected
topic = b"ping"
@@ -188,6 +192,7 @@ class RICommunicationAgent(BaseAgent):
address=addr,
bind=bind,
)
self.speech_agent = robot_speech_agent
robot_gesture_agent = RobotGestureAgent(
settings.agent_settings.robot_gesture_name,
address=addr,
@@ -195,6 +200,7 @@ class RICommunicationAgent(BaseAgent):
gesture_data=gesture_data,
single_gesture_data=single_gesture_data,
)
self.gesture_agent = robot_gesture_agent
await robot_speech_agent.start()
await asyncio.sleep(0.1) # Small delay
await robot_gesture_agent.start()
@@ -225,6 +231,7 @@ class RICommunicationAgent(BaseAgent):
while self._running:
if not self.connected:
await asyncio.sleep(settings.behaviour_settings.sleep_s)
self.logger.debug("Not connected, skipping ping loop iteration.")
continue
# We need to listen and send pings.
@@ -288,13 +295,36 @@ class RICommunicationAgent(BaseAgent):
# Tell UI we're disconnected.
topic = b"ping"
data = json.dumps(False).encode()
self.logger.debug("1")
if self.pub_socket:
try:
self.logger.debug("2")
await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5)
except TimeoutError:
self.logger.debug("3")
self.logger.warning("Connection ping for router timed out.")
# Try to reboot/renegotiate
if self.gesture_agent is not None:
await self.gesture_agent.stop()
if self.speech_agent is not None:
await self.speech_agent.stop()
if self.pub_socket is not None:
self.pub_socket.close()
self.logger.debug("Restarting communication negotiation.")
if await self._negotiate_connection(max_retries=1):
if await self._negotiate_connection(max_retries=2):
self.connected = True
async def handle_message(self, msg: InternalMessage):
"""
Handle an incoming message.
Currently not implemented for this agent.
:param msg: The received message.
:raises NotImplementedError: Always, since this method is not implemented.
"""
self.logger.warning("custom warning for handle msg in ri coms %s", self.name)

View File

@@ -7,6 +7,7 @@ import zmq.asyncio as azmq
from control_backend.agents import BaseAgent
from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage
from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
from .transcription_agent.transcription_agent import TranscriptionAgent
@@ -86,6 +87,12 @@ class VADAgent(BaseAgent):
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._ready = asyncio.Event()
# Pause control
self._reset_needed = False
self._paused = asyncio.Event()
self._paused.set() # Not paused at start
self.model = None
async def setup(self):
@@ -213,6 +220,16 @@ class VADAgent(BaseAgent):
"""
await self._ready.wait()
while self._running:
await self._paused.wait()
# After being unpaused, reset stream and buffers
if self._reset_needed:
self.logger.debug("Resuming: resetting stream and buffers.")
await self._reset_stream()
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._reset_needed = False
assert self.audio_in_poller is not None
data = await self.audio_in_poller.poll()
if data is None:
@@ -254,3 +271,27 @@ class VADAgent(BaseAgent):
# At this point, we know that the speech has ended.
# Prepend the last chunk that had no speech, for a more fluent boundary
self.audio_buffer = chunk
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages.
Expects messages to pause or resume the VAD processing from User Interrupt Agent.
:param msg: The received internal message.
"""
sender = msg.sender
if sender == settings.agent_settings.user_interrupt_name:
if msg.body == "PAUSE":
self.logger.info("Pausing VAD processing.")
self._paused.clear()
# If the robot needs to pick up speaking where it left off, do not set _reset_needed
self._reset_needed = True
elif msg.body == "RESUME":
self.logger.info("Resuming VAD processing.")
self._paused.set()
else:
self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}")
else:
self.logger.debug(f"Ignoring message from unknown sender: {sender}")

View File

@@ -6,7 +6,12 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
from control_backend.schemas.ri_message import (
GestureCommand,
PauseCommand,
RIEndpoint,
SpeechCommand,
)
class UserInterruptAgent(BaseAgent):
@@ -71,6 +76,19 @@ class UserInterruptAgent(BaseAgent):
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
event_context,
)
elif event_type == "pause":
self.logger.debug(
"Received pause/resume button press with context '%s'.", event_context
)
await self._send_pause_command(event_context)
if event_context:
self.logger.info("Sent pause command.")
else:
self.logger.info("Sent resume command.")
elif event_type in ["next_phase", "reset_phase", "reset_experiment"]:
await self._send_experiment_control_to_bdi_core(event_type)
else:
self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').",
@@ -78,6 +96,36 @@ class UserInterruptAgent(BaseAgent):
event_context,
)
async def _send_experiment_control_to_bdi_core(self, type):
"""
method to send experiment control buttons to bdi core.
:param type: the type of control button we should send to the bdi core.
"""
# Switch which thread we should send to bdi core
thread = ""
match type:
case "next_phase":
thread = "force_next_phase"
case "reset_phase":
thread = "reset_current_phase"
case "reset_experiment":
thread = "reset_experiment"
case _:
self.logger.warning(
"Received unknown experiment control type '%s' to send to BDI Core.",
type,
)
out_msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
thread=thread,
body="",
)
self.logger.debug("Sending experiment control '%s' to BDI Core.", thread)
await self.send(out_msg)
async def _send_to_speech_agent(self, text_to_say: str):
"""
method to send prioritized speech command to RobotSpeechAgent.
@@ -130,6 +178,38 @@ class UserInterruptAgent(BaseAgent):
belief_id,
)
async def _send_pause_command(self, pause):
"""
Send a pause command to the Robot Interface via the RI Communication Agent.
Send a pause command to the other internal agents; for now just VAD agent.
"""
cmd = PauseCommand(data=pause)
message = InternalMessage(
to=settings.agent_settings.ri_communication_name,
sender=self.name,
body=cmd.model_dump_json(),
)
await self.send(message)
if pause == "true":
# Send pause to VAD agent
vad_message = InternalMessage(
to=settings.agent_settings.vad_name,
sender=self.name,
body="PAUSE",
)
await self.send(vad_message)
self.logger.info("Sent pause command to VAD Agent and RI Communication Agent.")
else:
# Send resume to VAD agent
vad_message = InternalMessage(
to=settings.agent_settings.vad_name,
sender=self.name,
body="RESUME",
)
await self.send(vad_message)
self.logger.info("Sent resume command to VAD Agent and RI Communication Agent.")
async def setup(self):
"""
Initialize the agent.

View File

@@ -14,6 +14,7 @@ class RIEndpoint(str, Enum):
GESTURE_TAG = "actuate/gesture/tag"
PING = "ping"
NEGOTIATE_PORTS = "negotiate/ports"
PAUSE = ""
class RIMessage(BaseModel):
@@ -64,3 +65,15 @@ class GestureCommand(RIMessage):
if self.endpoint not in allowed:
raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG")
return self
class PauseCommand(RIMessage):
"""
A specific command to pause or unpause the robot's actions.
:ivar endpoint: Fixed to ``RIEndpoint.PAUSE``.
:ivar data: A boolean indicating whether to pause (True) or unpause (False).
"""
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE)
data: bool