From 0501a9fba375ed92551fee60b8b0831d917499fd Mon Sep 17 00:00:00 2001
From: Pim Hutting
Date: Mon, 22 Dec 2025 13:56:02 +0000
Subject: [PATCH] create UserInterruptAgent with connection to UI
---
.gitlab/merge_request_templates/default.md | 9 ++
.../agents/actuation/robot_gesture_agent.py | 11 +-
.../communication/ri_communication_agent.py | 2 +
.../agents/user_interrupt/__init__.py | 0
.../user_interrupt/user_interrupt_agent.py | 146 ++++++++++++++++++
.../api/v1/endpoints/button_pressed.py | 31 ++++
src/control_backend/api/v1/router.py | 4 +-
src/control_backend/core/config.py | 1 +
src/control_backend/main.py | 9 ++
src/control_backend/schemas/events.py | 6 +
src/control_backend/schemas/ri_message.py | 2 +
.../actuation/test_robot_speech_agent.py | 4 +-
.../test_ri_communication_agent.py | 1 +
test/unit/agents/llm/test_llm_agent.py | 3 +
.../user_interrupt/test_user_interrupt.py | 146 ++++++++++++++++++
15 files changed, 371 insertions(+), 4 deletions(-)
create mode 100644 .gitlab/merge_request_templates/default.md
create mode 100644 src/control_backend/agents/user_interrupt/__init__.py
create mode 100644 src/control_backend/agents/user_interrupt/user_interrupt_agent.py
create mode 100644 src/control_backend/api/v1/endpoints/button_pressed.py
create mode 100644 src/control_backend/schemas/events.py
create mode 100644 test/unit/agents/user_interrupt/test_user_interrupt.py
diff --git a/.gitlab/merge_request_templates/default.md b/.gitlab/merge_request_templates/default.md
new file mode 100644
index 0000000..7a76ac5
--- /dev/null
+++ b/.gitlab/merge_request_templates/default.md
@@ -0,0 +1,9 @@
+%{first_multiline_commit_description}
+
+To verify:
+
+- [ ] Style checks pass
+- [ ] Pipeline (tests) passes
+- [ ] Documentation is up to date
+- [ ] Tests are up to date (new code is covered)
+- [ ] ...
diff --git a/src/control_backend/agents/actuation/robot_gesture_agent.py b/src/control_backend/agents/actuation/robot_gesture_agent.py
index e641eba..4f5dd79 100644
--- a/src/control_backend/agents/actuation/robot_gesture_agent.py
+++ b/src/control_backend/agents/actuation/robot_gesture_agent.py
@@ -28,6 +28,7 @@ class RobotGestureAgent(BaseAgent):
address = ""
bind = False
gesture_data = []
+ single_gesture_data = []
def __init__(
self,
@@ -35,8 +36,10 @@ class RobotGestureAgent(BaseAgent):
address=settings.zmq_settings.ri_command_address,
bind=False,
gesture_data=None,
+ single_gesture_data=None,
):
self.gesture_data = gesture_data or []
+ self.single_gesture_data = single_gesture_data or []
super().__init__(name)
self.address = address
self.bind = bind
@@ -99,7 +102,13 @@ class RobotGestureAgent(BaseAgent):
gesture_command.data,
)
return
-
+ elif gesture_command.endpoint == RIEndpoint.GESTURE_SINGLE:
+ if gesture_command.data not in self.single_gesture_data:
+ self.logger.warning(
+ "Received gesture '%s' which is not in available gestures. Early returning",
+ gesture_command.data,
+ )
+ return
await self.pubsocket.send_json(gesture_command.model_dump())
except Exception:
self.logger.exception("Error processing internal message.")
diff --git a/src/control_backend/agents/communication/ri_communication_agent.py b/src/control_backend/agents/communication/ri_communication_agent.py
index a50892c..34e5b25 100644
--- a/src/control_backend/agents/communication/ri_communication_agent.py
+++ b/src/control_backend/agents/communication/ri_communication_agent.py
@@ -182,6 +182,7 @@ class RICommunicationAgent(BaseAgent):
self._req_socket.bind(addr)
case "actuation":
gesture_data = port_data.get("gestures", [])
+ single_gesture_data = port_data.get("single_gestures", [])
robot_speech_agent = RobotSpeechAgent(
settings.agent_settings.robot_speech_name,
address=addr,
@@ -192,6 +193,7 @@ class RICommunicationAgent(BaseAgent):
address=addr,
bind=bind,
gesture_data=gesture_data,
+ single_gesture_data=single_gesture_data,
)
await robot_speech_agent.start()
await asyncio.sleep(0.1) # Small delay
diff --git a/src/control_backend/agents/user_interrupt/__init__.py b/src/control_backend/agents/user_interrupt/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py
new file mode 100644
index 0000000..b2efc41
--- /dev/null
+++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py
@@ -0,0 +1,146 @@
+import json
+
+import zmq
+from zmq.asyncio import Context
+
+from control_backend.agents import BaseAgent
+from control_backend.core.agent_system import InternalMessage
+from control_backend.core.config import settings
+from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
+
+
+class UserInterruptAgent(BaseAgent):
+    """
+    User Interrupt Agent.
+
+    This agent receives button_pressed events from the external HTTP API
+    (via ZMQ) and uses the associated context to trigger one of the following actions:
+
+    - Send a prioritized message to the `RobotSpeechAgent`
+    - Send a prioritized gesture to the `RobotGestureAgent`
+    - Send a belief override to the `BDIProgramManager` in order to activate a
+      trigger/conditional norm or complete a goal.
+
+    Prioritized actions clear the current RI queue before inserting the new item,
+    ensuring they are executed immediately after Pepper's current action has been fulfilled.
+
+    :ivar sub_socket: The ZMQ SUB socket used to receive user interrupts.
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.sub_socket = None  # created and connected in setup()
+
+    async def _receive_button_event(self):
+        """
+        The behaviour of the UserInterruptAgent.
+        Continuous loop that receives button_pressed events from the button_pressed HTTP endpoint.
+        Each event is a JSON object with a type and a context; a payload that cannot be decoded
+        into such an object is logged as an error and skipped, so the loop keeps running.
+
+        - type: "speech", context: string that the robot has to say.
+        - type: "gesture", context: single gesture name that the robot has to perform.
+        - type: "override", context: belief_id that overrides the goal/trigger/conditional norm.
+        """
+        while True:
+            topic, body = await self.sub_socket.recv_multipart()
+
+            try:
+                event_data = json.loads(body)
+                event_type = event_data.get("type")  # e.g., "speech", "gesture"
+                event_context = event_data.get("context")  # e.g., "Hello, I am Pepper!"
+            except (ValueError, AttributeError):  # bad UTF-8/JSON, or JSON that is not an object
+                self.logger.error("Received invalid JSON payload on topic %s", topic)
+                continue
+
+            if event_type == "speech":
+                await self._send_to_speech_agent(event_context)
+                self.logger.info(
+                    "Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
+                    event_context,
+                )
+            elif event_type == "gesture":
+                await self._send_to_gesture_agent(event_context)
+                self.logger.info(
+                    "Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
+                    event_context,
+                )
+            elif event_type == "override":
+                await self._send_to_program_manager(event_context)
+                self.logger.info(
+                    "Forwarded button press (override) with context '%s' to BDIProgramManager.",
+                    event_context,
+                )
+            else:
+                self.logger.warning(
+                    "Received button press with unknown type '%s' (context: '%s').",
+                    event_type,
+                    event_context,
+                )
+
+    async def _send_to_speech_agent(self, text_to_say: str):
+        """
+        Send a prioritized speech command to the RobotSpeechAgent.
+
+        :param text_to_say: The string that the robot has to say.
+        """
+        cmd = SpeechCommand(data=text_to_say, is_priority=True)
+        out_msg = InternalMessage(
+            to=settings.agent_settings.robot_speech_name,
+            sender=self.name,
+            body=cmd.model_dump_json(),
+        )
+        await self.send(out_msg)
+
+    async def _send_to_gesture_agent(self, single_gesture_name: str):
+        """
+        Send a prioritized gesture command to the RobotGestureAgent.
+
+        :param single_gesture_name: The gesture tag that the robot has to perform.
+        """
+        # the endpoint is set to always be GESTURE_SINGLE for user interrupts
+        cmd = GestureCommand(
+            endpoint=RIEndpoint.GESTURE_SINGLE, data=single_gesture_name, is_priority=True
+        )
+        out_msg = InternalMessage(
+            to=settings.agent_settings.robot_gesture_name,
+            sender=self.name,
+            body=cmd.model_dump_json(),
+        )
+        await self.send(out_msg)
+
+    async def _send_to_program_manager(self, belief_id: str):
+        """
+        Send a button_override belief to the BDIProgramManager.
+
+        :param belief_id: The belief_id that overrides the goal/trigger/conditional norm.
+            This id can belong to a basic belief or an inferred belief.
+            See also: https://utrechtuniversity.youtrack.cloud/articles/N25B-A-27/UI-components
+        """
+        data = {"belief": belief_id}
+        message = InternalMessage(
+            to=settings.agent_settings.bdi_program_manager_name,
+            sender=self.name,
+            body=json.dumps(data),
+            thread="belief_override_id",
+        )
+        await self.send(message)
+        self.logger.info(
+            "Sent button_override belief with id '%s' to Program manager.",
+            belief_id,
+        )
+
+    async def setup(self):
+        """
+        Initialize the agent.
+
+        Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic.
+        Starts the background behavior to receive the user interrupts.
+        """
+        context = Context.instance()
+
+        self.sub_socket = context.socket(zmq.SUB)
+        self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
+        self.sub_socket.subscribe("button_pressed")
+
+        self.add_behavior(self._receive_button_event())
diff --git a/src/control_backend/api/v1/endpoints/button_pressed.py b/src/control_backend/api/v1/endpoints/button_pressed.py
new file mode 100644
index 0000000..5a94a53
--- /dev/null
+++ b/src/control_backend/api/v1/endpoints/button_pressed.py
@@ -0,0 +1,31 @@
+import logging
+
+from fastapi import APIRouter, Request
+
+from control_backend.schemas.events import ButtonPressedEvent
+
+logger = logging.getLogger(__name__)
+router = APIRouter()
+
+
+@router.post("/button_pressed", status_code=202)
+async def receive_button_event(event: ButtonPressedEvent, request: Request):
+    """
+    Accept a button press from an external client and hand it to the agents.
+
+    The validated event is serialized to JSON and published on the internal
+    'button_pressed' topic, where subscribers (in this case user_interrupt_agent)
+    pick it up to trigger specific behaviors or state changes.
+
+    :param event: The parsed ButtonPressedEvent object.
+    :param request: The FastAPI request; its app state holds ``endpoints_pub_socket``.
+    """
+    logger.debug("Received button event: %s | %s", event.type, event.context)
+
+    # Frame 0 is the topic, frame 1 the JSON-encoded event.
+    frames = [b"button_pressed", event.model_dump_json().encode()]
+
+    publisher = request.app.state.endpoints_pub_socket
+    await publisher.send_multipart(frames)
+
+    return {"status": "Event received"}
diff --git a/src/control_backend/api/v1/router.py b/src/control_backend/api/v1/router.py
index ce5a70b..ebba0db 100644
--- a/src/control_backend/api/v1/router.py
+++ b/src/control_backend/api/v1/router.py
@@ -1,6 +1,6 @@
from fastapi.routing import APIRouter
-from control_backend.api.v1.endpoints import logs, message, program, robot, sse
+from control_backend.api.v1.endpoints import button_pressed, logs, message, program, robot, sse
api_router = APIRouter()
@@ -13,3 +13,5 @@ api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Command
api_router.include_router(logs.router, tags=["Logs"])
api_router.include_router(program.router, tags=["Program"])
+
+api_router.include_router(button_pressed.router, tags=["Button Pressed Events"])
diff --git a/src/control_backend/core/config.py b/src/control_backend/core/config.py
index 2712d8a..927985b 100644
--- a/src/control_backend/core/config.py
+++ b/src/control_backend/core/config.py
@@ -48,6 +48,7 @@ class AgentSettings(BaseModel):
ri_communication_name: str = "ri_communication_agent"
robot_speech_name: str = "robot_speech_agent"
robot_gesture_name: str = "robot_gesture_agent"
+ user_interrupt_name: str = "user_interrupt_agent"
class BehaviourSettings(BaseModel):
diff --git a/src/control_backend/main.py b/src/control_backend/main.py
index 2c8b766..3509cbc 100644
--- a/src/control_backend/main.py
+++ b/src/control_backend/main.py
@@ -39,6 +39,9 @@ from control_backend.agents.communication import RICommunicationAgent
# LLM Agents
from control_backend.agents.llm import LLMAgent
+# User Interrupt Agent
+from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
+
# Other backend imports
from control_backend.api.v1.router import api_router
from control_backend.core.config import settings
@@ -138,6 +141,12 @@ async def lifespan(app: FastAPI):
"name": settings.agent_settings.bdi_program_manager_name,
},
),
+ "UserInterruptAgent": (
+ UserInterruptAgent,
+ {
+ "name": settings.agent_settings.user_interrupt_name,
+ },
+ ),
}
agents = []
diff --git a/src/control_backend/schemas/events.py b/src/control_backend/schemas/events.py
new file mode 100644
index 0000000..46967f7
--- /dev/null
+++ b/src/control_backend/schemas/events.py
@@ -0,0 +1,6 @@
+from pydantic import BaseModel
+
+
+class ButtonPressedEvent(BaseModel):
+    type: str  # selects the action; UserInterruptAgent handles "speech", "gesture", "override"
+    context: str  # payload for that action: text to say, gesture name, or belief id
diff --git a/src/control_backend/schemas/ri_message.py b/src/control_backend/schemas/ri_message.py
index 3f3abea..a48dec6 100644
--- a/src/control_backend/schemas/ri_message.py
+++ b/src/control_backend/schemas/ri_message.py
@@ -38,6 +38,7 @@ class SpeechCommand(RIMessage):
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
data: str
+ is_priority: bool = False
class GestureCommand(RIMessage):
@@ -52,6 +53,7 @@ class GestureCommand(RIMessage):
RIEndpoint.GESTURE_SINGLE, RIEndpoint.GESTURE_TAG
]
data: str
+ is_priority: bool = False
@model_validator(mode="after")
def check_endpoint(self):
diff --git a/test/unit/agents/actuation/test_robot_speech_agent.py b/test/unit/agents/actuation/test_robot_speech_agent.py
index 3cd8fbf..d95f66a 100644
--- a/test/unit/agents/actuation/test_robot_speech_agent.py
+++ b/test/unit/agents/actuation/test_robot_speech_agent.py
@@ -64,7 +64,7 @@ async def test_handle_message_sends_command():
agent = mock_speech_agent()
agent.pubsocket = pubsocket
- payload = {"endpoint": "actuate/speech", "data": "hello"}
+ payload = {"endpoint": "actuate/speech", "data": "hello", "is_priority": False}
msg = InternalMessage(to="robot", sender="tester", body=json.dumps(payload))
await agent.handle_message(msg)
@@ -75,7 +75,7 @@ async def test_handle_message_sends_command():
@pytest.mark.asyncio
async def test_zmq_command_loop_valid_payload(zmq_context):
"""UI command is read from SUB and published."""
- command = {"endpoint": "actuate/speech", "data": "hello"}
+ command = {"endpoint": "actuate/speech", "data": "hello", "is_priority": False}
fake_socket = AsyncMock()
async def recv_once():
diff --git a/test/unit/agents/communication/test_ri_communication_agent.py b/test/unit/agents/communication/test_ri_communication_agent.py
index 018b19d..06d8766 100644
--- a/test/unit/agents/communication/test_ri_communication_agent.py
+++ b/test/unit/agents/communication/test_ri_communication_agent.py
@@ -67,6 +67,7 @@ async def test_setup_success_connects_and_starts_robot(zmq_context):
address="tcp://localhost:5556",
bind=False,
gesture_data=[],
+ single_gesture_data=[],
)
agent.add_behavior.assert_called_once()
diff --git a/test/unit/agents/llm/test_llm_agent.py b/test/unit/agents/llm/test_llm_agent.py
index 62c189e..5e84d8d 100644
--- a/test/unit/agents/llm/test_llm_agent.py
+++ b/test/unit/agents/llm/test_llm_agent.py
@@ -197,6 +197,9 @@ async def test_query_llm_yields_final_tail_chunk(mock_settings):
agent = LLMAgent("llm_agent")
agent.send = AsyncMock()
+ agent.logger = MagicMock()
+ agent.logger.llm = MagicMock()
+
# Patch _stream_query_llm to yield tokens that do NOT end with punctuation
async def fake_stream(messages):
yield "Hello"
diff --git a/test/unit/agents/user_interrupt/test_user_interrupt.py b/test/unit/agents/user_interrupt/test_user_interrupt.py
new file mode 100644
index 0000000..7e3e700
--- /dev/null
+++ b/test/unit/agents/user_interrupt/test_user_interrupt.py
@@ -0,0 +1,146 @@
+import asyncio
+import json
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
+from control_backend.core.agent_system import InternalMessage
+from control_backend.core.config import settings
+from control_backend.schemas.ri_message import RIEndpoint
+
+
+@pytest.fixture
+def agent():
+    instance = UserInterruptAgent(name="user_interrupt_agent")
+    instance.sub_socket = AsyncMock()  # stands in for the ZMQ SUB socket
+    instance.logger = MagicMock()
+    instance.send = AsyncMock()  # records outgoing messages instead of sending them
+    return instance
+
+
+@pytest.mark.asyncio
+async def test_send_to_speech_agent(agent):
+    """A speech interrupt is sent to the speech agent as a priority command."""
+    await agent._send_to_speech_agent("Hello World")
+
+    agent.send.assert_awaited_once()
+    outgoing: InternalMessage = agent.send.call_args.args[0]
+    command = json.loads(outgoing.body)
+
+    assert outgoing.to == settings.agent_settings.robot_speech_name
+    assert command["data"] == "Hello World"
+    assert command["is_priority"] is True
+
+
+@pytest.mark.asyncio
+async def test_send_to_gesture_agent(agent):
+    """A gesture interrupt is sent as a priority single-gesture command."""
+    await agent._send_to_gesture_agent("wave_hand")
+
+    agent.send.assert_awaited_once()
+    outgoing: InternalMessage = agent.send.call_args.args[0]
+    command = json.loads(outgoing.body)
+
+    assert outgoing.to == settings.agent_settings.robot_gesture_name
+    assert command["endpoint"] == RIEndpoint.GESTURE_SINGLE.value
+    assert command["data"] == "wave_hand"
+    assert command["is_priority"] is True
+
+
+@pytest.mark.asyncio
+async def test_send_to_program_manager(agent):
+    """A belief override reaches the program manager on the override thread."""
+    belief_id = "2"
+
+    await agent._send_to_program_manager(belief_id)
+
+    agent.send.assert_awaited_once()
+    outgoing: InternalMessage = agent.send.call_args.args[0]
+
+    # Addressing: recipient and thread identify this as a belief override.
+    assert outgoing.to == settings.agent_settings.bdi_program_manager_name
+    assert outgoing.thread == "belief_override_id"
+
+    # Payload: the belief id is carried unchanged under the "belief" key.
+    assert json.loads(outgoing.body)["belief"] == belief_id
+
+
+@pytest.mark.asyncio
+async def test_receive_loop_routing_success(agent):
+    """
+    Feed one event of every supported type through the receive loop.
+
+    Each 'button_pressed' frame carries a JSON object with 'type' and 'context';
+    the loop must pass the context to the handler matching the type,
+    exactly once per event.
+    """
+    # One (type, context) pair per handler under test.
+    events = [
+        ("speech", "Hello Speech"),
+        ("gesture", "Hello Gesture"),
+        ("override", "Hello Override"),
+    ]
+    frames = [
+        (b"button_pressed", json.dumps({"type": kind, "context": text}).encode())
+        for kind, text in events
+    ]
+
+    # The trailing CancelledError stops the otherwise infinite loop.
+    agent.sub_socket.recv_multipart.side_effect = [*frames, asyncio.CancelledError]
+
+    speech_handler = agent._send_to_speech_agent = AsyncMock()
+    gesture_handler = agent._send_to_gesture_agent = AsyncMock()
+    override_handler = agent._send_to_program_manager = AsyncMock()
+
+    try:
+        await agent._receive_button_event()
+    except asyncio.CancelledError:
+        pass
+
+    await asyncio.sleep(0)
+
+    # Every handler got exactly its own context, exactly once.
+    expectations = [
+        (speech_handler, "Hello Speech"),
+        (gesture_handler, "Hello Gesture"),
+        (override_handler, "Hello Override"),
+    ]
+    for handler, context in expectations:
+        handler.assert_awaited_once_with(context)
+        assert handler.await_count == 1
+
+
+@pytest.mark.asyncio
+async def test_receive_loop_unknown_type(agent):
+    """Test that unknown 'type' values in the JSON log a warning and do not crash."""
+
+    # Prepare a payload with an unknown type
+    payload_unknown = json.dumps({"type": "unknown_thing", "context": "some_data"}).encode()
+
+    agent.sub_socket.recv_multipart.side_effect = [
+        (b"button_pressed", payload_unknown),
+        asyncio.CancelledError,
+    ]
+
+    agent._send_to_speech_agent = AsyncMock()
+    agent._send_to_gesture_agent = AsyncMock()
+    agent._send_to_program_manager = AsyncMock()
+
+    try:
+        await agent._receive_button_event()
+    except asyncio.CancelledError:
+        pass
+
+    await asyncio.sleep(0)
+
+    # Ensure none of the agent's three real handler methods were called
+    agent._send_to_speech_agent.assert_not_called()
+    agent._send_to_gesture_agent.assert_not_called()
+    agent._send_to_program_manager.assert_not_called()
+
+    agent.logger.warning.assert_called_with(
+        "Received button press with unknown type '%s' (context: '%s').",
+        "unknown_thing",
+        "some_data",
+    )