Compare commits

...

8 Commits

Author SHA1 Message Date
Pim Hutting
08812371fd chore: applied feedback and merged dev into branch
ref: N25B-355
2026-01-02 16:08:13 +01:00
Pim Hutting
6cf25cc587 Merge remote-tracking branch 'origin/dev' into feat/program-reset-llm 2026-01-02 16:06:14 +01:00
Luijkx,S.O.H. (Storm)
adbb7ffd5c Merge branch 'feat/user-interrupt-agent' into 'dev'
create UserInterruptAgent with connection to UI

See merge request ics/sp/2025/n25b/pepperplus-cb!40
2025-12-22 13:56:03 +00:00
Pim Hutting
0501a9fba3 create UserInterruptAgent with connection to UI 2025-12-22 13:56:02 +00:00
Pim Hutting
fb276133d9 chore: removed comment that was no longer needed
ref: N25B-355
2025-12-15 18:43:23 +01:00
Pim Hutting
35548f6864 chore: fixed test that wasn't passing
this was not my test
I stole the fix from Björn's cb2ri-gestures

ref: N25B-355
2025-12-15 18:40:24 +01:00
Pim Hutting
06503d568f Merge branch 'dev' of git.science.uu.nl:ics/sp/2025/n25b/pepperplus-cb into feat/program-reset-llm 2025-12-15 18:02:24 +01:00
Pim Hutting
cd0ca77af9 feat: made program reset LLM
also added test_bdi_program_manager back cause
it was somehow missing in my files

ref: N25B-355
2025-12-15 17:57:38 +01:00
18 changed files with 436 additions and 6 deletions

View File

@@ -0,0 +1,9 @@
%{first_multiline_commit_description}
To verify:
- [ ] Style checks pass
- [ ] Pipeline (tests) pass
- [ ] Documentation is up to date
- [ ] Tests are up to date (new code is covered)
- [ ] ...

View File

@@ -28,6 +28,7 @@ class RobotGestureAgent(BaseAgent):
address = "" address = ""
bind = False bind = False
gesture_data = [] gesture_data = []
single_gesture_data = []
def __init__( def __init__(
self, self,
@@ -35,8 +36,10 @@ class RobotGestureAgent(BaseAgent):
address=settings.zmq_settings.ri_command_address, address=settings.zmq_settings.ri_command_address,
bind=False, bind=False,
gesture_data=None, gesture_data=None,
single_gesture_data=None,
): ):
self.gesture_data = gesture_data or [] self.gesture_data = gesture_data or []
self.single_gesture_data = single_gesture_data or []
super().__init__(name) super().__init__(name)
self.address = address self.address = address
self.bind = bind self.bind = bind
@@ -99,7 +102,13 @@ class RobotGestureAgent(BaseAgent):
gesture_command.data, gesture_command.data,
) )
return return
elif gesture_command.endpoint == RIEndpoint.GESTURE_SINGLE:
if gesture_command.data not in self.single_gesture_data:
self.logger.warning(
"Received gesture '%s' which is not in available gestures. Early returning",
gesture_command.data,
)
return
await self.pubsocket.send_json(gesture_command.model_dump()) await self.pubsocket.send_json(gesture_command.model_dump())
except Exception: except Exception:
self.logger.exception("Error processing internal message.") self.logger.exception("Error processing internal message.")

View File

@@ -60,24 +60,41 @@ class BDIProgramManager(BaseAgent):
await self.send(message) await self.send(message)
self.logger.debug("Sent new norms and goals to the BDI agent.") self.logger.debug("Sent new norms and goals to the BDI agent.")
async def _send_clear_llm_history(self):
"""
Clear the LLM Agent's conversation history.
Sends a message to the LLM Agent instructing it to clear its history.
"""
message = InternalMessage(
to=settings.agent_settings.llm_name,
sender=self.name,
body="clear_history",
threads="clear history message",
)
await self.send(message)
self.logger.debug("Sent message to LLM agent to clear history.")
async def _receive_programs(self): async def _receive_programs(self):
""" """
Continuous loop that receives program updates from the HTTP endpoint. Continuous loop that receives program updates from the HTTP endpoint.
It listens to the ``program`` topic on the internal ZMQ SUB socket. It listens to the ``program`` topic on the internal ZMQ SUB socket.
When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`. When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`.
Additionally, the LLM history is cleared via :meth:`_send_clear_llm_history`.
""" """
while True: while True:
topic, body = await self.sub_socket.recv_multipart() topic, body = await self.sub_socket.recv_multipart()
try: try:
program = Program.model_validate_json(body) program = Program.model_validate_json(body)
await self._send_to_bdi(program)
await self._send_clear_llm_history()
except ValidationError: except ValidationError:
self.logger.exception("Received an invalid program.") self.logger.exception("Received an invalid program.")
continue continue
await self._send_to_bdi(program)
async def setup(self): async def setup(self):
""" """
Initialize the agent. Initialize the agent.

View File

@@ -182,6 +182,7 @@ class RICommunicationAgent(BaseAgent):
self._req_socket.bind(addr) self._req_socket.bind(addr)
case "actuation": case "actuation":
gesture_data = port_data.get("gestures", []) gesture_data = port_data.get("gestures", [])
single_gesture_data = port_data.get("single_gestures", [])
robot_speech_agent = RobotSpeechAgent( robot_speech_agent = RobotSpeechAgent(
settings.agent_settings.robot_speech_name, settings.agent_settings.robot_speech_name,
address=addr, address=addr,
@@ -192,6 +193,7 @@ class RICommunicationAgent(BaseAgent):
address=addr, address=addr,
bind=bind, bind=bind,
gesture_data=gesture_data, gesture_data=gesture_data,
single_gesture_data=single_gesture_data,
) )
await robot_speech_agent.start() await robot_speech_agent.start()
await asyncio.sleep(0.1) # Small delay await asyncio.sleep(0.1) # Small delay

View File

@@ -52,6 +52,10 @@ class LLMAgent(BaseAgent):
await self._process_bdi_message(prompt_message) await self._process_bdi_message(prompt_message)
except ValidationError: except ValidationError:
self.logger.debug("Prompt message from BDI core is invalid.") self.logger.debug("Prompt message from BDI core is invalid.")
elif msg.sender == settings.agent_settings.bdi_program_manager_name:
if msg.body == "clear_history":
self.logger.debug("Clearing conversation history.")
self.history.clear()
else: else:
self.logger.debug("Message ignored (not from BDI core.") self.logger.debug("Message ignored (not from BDI core.")

View File

@@ -0,0 +1,146 @@
import json
import zmq
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
class UserInterruptAgent(BaseAgent):
"""
User Interrupt Agent.
This agent receives button_pressed events from the external HTTP API
(via ZMQ) and uses the associated context to trigger one of the following actions:
- Send a prioritized message to the `RobotSpeechAgent`
- Send a prioritized gesture to the `RobotGestureAgent`
- Send a belief override to the `BDIProgramManager`in order to activate a
trigger/conditional norm or complete a goal.
Prioritized actions clear the current RI queue before inserting the new item,
ensuring they are executed immediately after Pepper's current action has been fulfilled.
:ivar sub_socket: The ZMQ SUB socket used to receive user intterupts.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sub_socket = None
async def _receive_button_event(self):
"""
The behaviour of the UserInterruptAgent.
Continuous loop that receives button_pressed events from the button_pressed HTTP endpoint.
These events contain a type and a context.
These are the different types and contexts:
- type: "speech", context: string that the robot has to say.
- type: "gesture", context: single gesture name that the robot has to perform.
- type: "override", context: belief_id that overrides the goal/trigger/conditional norm.
"""
while True:
topic, body = await self.sub_socket.recv_multipart()
try:
event_data = json.loads(body)
event_type = event_data.get("type") # e.g., "speech", "gesture"
event_context = event_data.get("context") # e.g., "Hello, I am Pepper!"
except json.JSONDecodeError:
self.logger.error("Received invalid JSON payload on topic %s", topic)
continue
if event_type == "speech":
await self._send_to_speech_agent(event_context)
self.logger.info(
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
event_context,
)
elif event_type == "gesture":
await self._send_to_gesture_agent(event_context)
self.logger.info(
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
event_context,
)
elif event_type == "override":
await self._send_to_program_manager(event_context)
self.logger.info(
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
event_context,
)
else:
self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').",
event_type,
event_context,
)
async def _send_to_speech_agent(self, text_to_say: str):
"""
method to send prioritized speech command to RobotSpeechAgent.
:param text_to_say: The string that the robot has to say.
"""
cmd = SpeechCommand(data=text_to_say, is_priority=True)
out_msg = InternalMessage(
to=settings.agent_settings.robot_speech_name,
sender=self.name,
body=cmd.model_dump_json(),
)
await self.send(out_msg)
async def _send_to_gesture_agent(self, single_gesture_name: str):
"""
method to send prioritized gesture command to RobotGestureAgent.
:param single_gesture_name: The gesture tag that the robot has to perform.
"""
# the endpoint is set to always be GESTURE_SINGLE for user interrupts
cmd = GestureCommand(
endpoint=RIEndpoint.GESTURE_SINGLE, data=single_gesture_name, is_priority=True
)
out_msg = InternalMessage(
to=settings.agent_settings.robot_gesture_name,
sender=self.name,
body=cmd.model_dump_json(),
)
await self.send(out_msg)
async def _send_to_program_manager(self, belief_id: str):
"""
Send a button_override belief to the BDIProgramManager.
:param belief_id: The belief_id that overrides the goal/trigger/conditional norm.
this id can belong to a basic belief or an inferred belief.
See also: https://utrechtuniversity.youtrack.cloud/articles/N25B-A-27/UI-components
"""
data = {"belief": belief_id}
message = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
sender=self.name,
body=json.dumps(data),
thread="belief_override_id",
)
await self.send(message)
self.logger.info(
"Sent button_override belief with id '%s' to Program manager.",
belief_id,
)
async def setup(self):
"""
Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic.
Starts the background behavior to receive the user interrupts.
"""
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("button_pressed")
self.add_behavior(self._receive_button_event())

View File

@@ -0,0 +1,31 @@
import logging
from fastapi import APIRouter, Request
from control_backend.schemas.events import ButtonPressedEvent
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/button_pressed", status_code=202)
async def receive_button_event(event: ButtonPressedEvent, request: Request):
"""
Endpoint to handle external button press events.
Validates the event payload and publishes it to the internal 'button_pressed' topic.
Subscribers (in this case user_interrupt_agent) will pick this up to trigger
specific behaviors or state changes.
:param event: The parsed ButtonPressedEvent object.
:param request: The FastAPI request object.
"""
logger.debug("Received button event: %s | %s", event.type, event.context)
topic = b"button_pressed"
body = event.model_dump_json().encode()
pub_socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, body])
return {"status": "Event received"}

View File

@@ -1,6 +1,6 @@
from fastapi.routing import APIRouter from fastapi.routing import APIRouter
from control_backend.api.v1.endpoints import logs, message, program, robot, sse from control_backend.api.v1.endpoints import button_pressed, logs, message, program, robot, sse
api_router = APIRouter() api_router = APIRouter()
@@ -13,3 +13,5 @@ api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Command
api_router.include_router(logs.router, tags=["Logs"]) api_router.include_router(logs.router, tags=["Logs"])
api_router.include_router(program.router, tags=["Program"]) api_router.include_router(program.router, tags=["Program"])
api_router.include_router(button_pressed.router, tags=["Button Pressed Events"])

View File

@@ -48,6 +48,7 @@ class AgentSettings(BaseModel):
ri_communication_name: str = "ri_communication_agent" ri_communication_name: str = "ri_communication_agent"
robot_speech_name: str = "robot_speech_agent" robot_speech_name: str = "robot_speech_agent"
robot_gesture_name: str = "robot_gesture_agent" robot_gesture_name: str = "robot_gesture_agent"
user_interrupt_name: str = "user_interrupt_agent"
class BehaviourSettings(BaseModel): class BehaviourSettings(BaseModel):

View File

@@ -39,6 +39,9 @@ from control_backend.agents.communication import RICommunicationAgent
# LLM Agents # LLM Agents
from control_backend.agents.llm import LLMAgent from control_backend.agents.llm import LLMAgent
# User Interrupt Agent
from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
# Other backend imports # Other backend imports
from control_backend.api.v1.router import api_router from control_backend.api.v1.router import api_router
from control_backend.core.config import settings from control_backend.core.config import settings
@@ -138,6 +141,12 @@ async def lifespan(app: FastAPI):
"name": settings.agent_settings.bdi_program_manager_name, "name": settings.agent_settings.bdi_program_manager_name,
}, },
), ),
"UserInterruptAgent": (
UserInterruptAgent,
{
"name": settings.agent_settings.user_interrupt_name,
},
),
} }
agents = [] agents = []

View File

@@ -0,0 +1,6 @@
from pydantic import BaseModel
class ButtonPressedEvent(BaseModel):
type: str
context: str

View File

@@ -38,6 +38,7 @@ class SpeechCommand(RIMessage):
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH) endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
data: str data: str
is_priority: bool = False
class GestureCommand(RIMessage): class GestureCommand(RIMessage):
@@ -52,6 +53,7 @@ class GestureCommand(RIMessage):
RIEndpoint.GESTURE_SINGLE, RIEndpoint.GESTURE_TAG RIEndpoint.GESTURE_SINGLE, RIEndpoint.GESTURE_TAG
] ]
data: str data: str
is_priority: bool = False
@model_validator(mode="after") @model_validator(mode="after")
def check_endpoint(self): def check_endpoint(self):

View File

@@ -64,7 +64,7 @@ async def test_handle_message_sends_command():
agent = mock_speech_agent() agent = mock_speech_agent()
agent.pubsocket = pubsocket agent.pubsocket = pubsocket
payload = {"endpoint": "actuate/speech", "data": "hello"} payload = {"endpoint": "actuate/speech", "data": "hello", "is_priority": False}
msg = InternalMessage(to="robot", sender="tester", body=json.dumps(payload)) msg = InternalMessage(to="robot", sender="tester", body=json.dumps(payload))
await agent.handle_message(msg) await agent.handle_message(msg)
@@ -75,7 +75,7 @@ async def test_handle_message_sends_command():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_zmq_command_loop_valid_payload(zmq_context): async def test_zmq_command_loop_valid_payload(zmq_context):
"""UI command is read from SUB and published.""" """UI command is read from SUB and published."""
command = {"endpoint": "actuate/speech", "data": "hello"} command = {"endpoint": "actuate/speech", "data": "hello", "is_priority": False}
fake_socket = AsyncMock() fake_socket = AsyncMock()
async def recv_once(): async def recv_once():

View File

@@ -63,6 +63,7 @@ async def test_receive_programs_valid_and_invalid():
manager = BDIProgramManager(name="program_manager_test") manager = BDIProgramManager(name="program_manager_test")
manager.sub_socket = sub manager.sub_socket = sub
manager._send_to_bdi = AsyncMock() manager._send_to_bdi = AsyncMock()
manager._send_clear_llm_history = AsyncMock()
try: try:
# Will give StopAsyncIteration when the predefined `sub.recv_multipart` side-effects run out # Will give StopAsyncIteration when the predefined `sub.recv_multipart` side-effects run out
@@ -75,3 +76,24 @@ async def test_receive_programs_valid_and_invalid():
forwarded: Program = manager._send_to_bdi.await_args[0][0] forwarded: Program = manager._send_to_bdi.await_args[0][0]
assert forwarded.phases[0].norms[0].norm == "N1" assert forwarded.phases[0].norms[0].norm == "N1"
assert forwarded.phases[0].goals[0].description == "G1" assert forwarded.phases[0].goals[0].description == "G1"
# Verify history clear was triggered
assert manager._send_clear_llm_history.await_count == 1
@pytest.mark.asyncio
async def test_send_clear_llm_history(mock_settings):
# Ensure the mock returns a string for the agent name (just like in your LLM tests)
mock_settings.agent_settings.llm_agent_name = "llm_agent"
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
await manager._send_clear_llm_history()
assert manager.send.await_count == 1
msg: InternalMessage = manager.send.await_args[0][0]
# Verify the content and recipient
assert msg.body == "clear_history"
assert msg.to == "llm_agent"

View File

@@ -67,6 +67,7 @@ async def test_setup_success_connects_and_starts_robot(zmq_context):
address="tcp://localhost:5556", address="tcp://localhost:5556",
bind=False, bind=False,
gesture_data=[], gesture_data=[],
single_gesture_data=[],
) )
agent.add_behavior.assert_called_once() agent.add_behavior.assert_called_once()

View File

@@ -197,6 +197,9 @@ async def test_query_llm_yields_final_tail_chunk(mock_settings):
agent = LLMAgent("llm_agent") agent = LLMAgent("llm_agent")
agent.send = AsyncMock() agent.send = AsyncMock()
agent.logger = MagicMock()
agent.logger.llm = MagicMock()
# Patch _stream_query_llm to yield tokens that do NOT end with punctuation # Patch _stream_query_llm to yield tokens that do NOT end with punctuation
async def fake_stream(messages): async def fake_stream(messages):
yield "Hello" yield "Hello"
@@ -262,3 +265,23 @@ async def test_stream_query_llm_skips_non_data_lines(mock_httpx_client, mock_set
# Only the valid 'data:' line should yield content # Only the valid 'data:' line should yield content
assert tokens == ["Hi"] assert tokens == ["Hi"]
@pytest.mark.asyncio
async def test_clear_history_command(mock_settings):
"""Test that the 'clear_history' message clears the agent's memory."""
# setup LLM to have some history
mock_settings.agent_settings.bdi_program_manager_name = "bdi_program_manager_agent"
agent = LLMAgent("llm_agent")
agent.history = [
{"role": "user", "content": "Old conversation context"},
{"role": "assistant", "content": "Old response"},
]
assert len(agent.history) == 2
msg = InternalMessage(
to="llm_agent",
sender=mock_settings.agent_settings.bdi_program_manager_name,
body="clear_history",
)
await agent.handle_message(msg)
assert len(agent.history) == 0

View File

@@ -0,0 +1,146 @@
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock
import pytest
from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import RIEndpoint
@pytest.fixture
def agent():
agent = UserInterruptAgent(name="user_interrupt_agent")
agent.send = AsyncMock()
agent.logger = MagicMock()
agent.sub_socket = AsyncMock()
return agent
@pytest.mark.asyncio
async def test_send_to_speech_agent(agent):
"""Verify speech command format."""
await agent._send_to_speech_agent("Hello World")
agent.send.assert_awaited_once()
sent_msg: InternalMessage = agent.send.call_args.args[0]
assert sent_msg.to == settings.agent_settings.robot_speech_name
body = json.loads(sent_msg.body)
assert body["data"] == "Hello World"
assert body["is_priority"] is True
@pytest.mark.asyncio
async def test_send_to_gesture_agent(agent):
"""Verify gesture command format."""
await agent._send_to_gesture_agent("wave_hand")
agent.send.assert_awaited_once()
sent_msg: InternalMessage = agent.send.call_args.args[0]
assert sent_msg.to == settings.agent_settings.robot_gesture_name
body = json.loads(sent_msg.body)
assert body["data"] == "wave_hand"
assert body["is_priority"] is True
assert body["endpoint"] == RIEndpoint.GESTURE_SINGLE.value
@pytest.mark.asyncio
async def test_send_to_program_manager(agent):
"""Verify belief update format."""
context_str = "2"
await agent._send_to_program_manager(context_str)
agent.send.assert_awaited_once()
sent_msg: InternalMessage = agent.send.call_args.args[0]
assert sent_msg.to == settings.agent_settings.bdi_program_manager_name
assert sent_msg.thread == "belief_override_id"
body = json.loads(sent_msg.body)
assert body["belief"] == context_str
@pytest.mark.asyncio
async def test_receive_loop_routing_success(agent):
"""
Test that the loop correctly:
1. Receives 'button_pressed' topic from ZMQ
2. Parses the JSON payload to find 'type' and 'context'
3. Calls the correct handler method based on 'type'
"""
# Prepare JSON payloads as bytes
payload_speech = json.dumps({"type": "speech", "context": "Hello Speech"}).encode()
payload_gesture = json.dumps({"type": "gesture", "context": "Hello Gesture"}).encode()
payload_override = json.dumps({"type": "override", "context": "Hello Override"}).encode()
agent.sub_socket.recv_multipart.side_effect = [
(b"button_pressed", payload_speech),
(b"button_pressed", payload_gesture),
(b"button_pressed", payload_override),
asyncio.CancelledError, # Stop the infinite loop
]
agent._send_to_speech_agent = AsyncMock()
agent._send_to_gesture_agent = AsyncMock()
agent._send_to_program_manager = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
await asyncio.sleep(0)
# Speech
agent._send_to_speech_agent.assert_awaited_once_with("Hello Speech")
# Gesture
agent._send_to_gesture_agent.assert_awaited_once_with("Hello Gesture")
# Override
agent._send_to_program_manager.assert_awaited_once_with("Hello Override")
assert agent._send_to_speech_agent.await_count == 1
assert agent._send_to_gesture_agent.await_count == 1
assert agent._send_to_program_manager.await_count == 1
@pytest.mark.asyncio
async def test_receive_loop_unknown_type(agent):
"""Test that unknown 'type' values in the JSON log a warning and do not crash."""
# Prepare a payload with an unknown type
payload_unknown = json.dumps({"type": "unknown_thing", "context": "some_data"}).encode()
agent.sub_socket.recv_multipart.side_effect = [
(b"button_pressed", payload_unknown),
asyncio.CancelledError,
]
agent._send_to_speech_agent = AsyncMock()
agent._send_to_gesture_agent = AsyncMock()
agent._send_to_belief_collector = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
await asyncio.sleep(0)
# Ensure no handlers were called
agent._send_to_speech_agent.assert_not_called()
agent._send_to_gesture_agent.assert_not_called()
agent._send_to_belief_collector.assert_not_called()
agent.logger.warning.assert_called_with(
"Received button press with unknown type '%s' (context: '%s').",
"unknown_thing",
"some_data",
)