Compare commits


8 Commits

Author SHA1 Message Date
2e717ec277 Merge branch 'fix/trigger-subgoal-error' into 'main'
Allow subgoals in triggers and empty plan

See merge request ics/sp/2025/n25b/pepperplus-cb!50
2026-01-30 19:53:51 +00:00
b53bf872a5 chore: better name checks for ProgramElement 2026-01-30 19:30:40 +01:00
1337b1f06b Merge branch 'main' into fix/trigger-subgoal-error 2026-01-30 18:19:31 +01:00
51015dbbfe Merge branch 'feat/longer-pauses-possible' into 'main'
Add interruption to stream query

See merge request ics/sp/2025/n25b/pepperplus-cb!52
2026-01-30 17:01:11 +00:00
21690da679 Merge branch 'feat/visual-emotion-recognition' into 'main'
feat: visual emotion recognition agent

See merge request ics/sp/2025/n25b/pepperplus-cb!54
2026-01-30 16:53:15 +00:00
Luijkx,S.O.H. (Storm) 45b8597f15 feat: visual emotion recognition agent 2026-01-30 16:53:15 +00:00
3579aee114 fix: allow interrupting on each token stream
ref: N25B-452
2026-01-29 15:35:42 +01:00
f79b65a6fa fix: allow subgoals in triggers and empty plan
Copies the goal and changes can_fail to false. Also add a warning for empty plans in goals.

ref: N25B-460
2026-01-29 15:18:09 +01:00
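For context on the fix in f79b65a6fa: ProgramElement (and therefore Goal) is declared with model_config = {"frozen": True} so program elements stay hashable, which means assigning step.can_fail = False on an existing instance raises. A minimal sketch of the Pydantic pattern the fix switches to, with a stripped-down Goal standing in for the real model:

from pydantic import BaseModel

class Goal(BaseModel):
    model_config = {"frozen": True}  # frozen: instances are hashable but immutable
    name: str
    can_fail: bool = True

goal = Goal(name="greet_user")
# goal.can_fail = False  # would raise a pydantic ValidationError: instance is frozen
relaxed = goal.model_copy(update={"can_fail": False})  # copy with one field overridden
assert goal.can_fail and not relaxed.can_fail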
16 changed files with 442 additions and 381 deletions

View File

@@ -40,6 +40,7 @@ dev = [
 ]
 test = [
     "agentspeak>=0.2.2",
+    "deepface>=0.0.97",
     "fastapi>=0.115.6",
     "httpx>=0.28.1",
     "mlx-whisper>=0.4.3 ; sys_platform == 'darwin'",
@@ -54,6 +55,7 @@ test = [
     "pyyaml>=6.0.3",
     "pyzmq>=27.1.0",
     "soundfile>=0.13.1",
+    "tf-keras>=2.20.1",
 ]
 [tool.pytest.ini_options]
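The two added test dependencies travel together: deepface pulls in TensorFlow, and recent TensorFlow releases ship Keras 3 while DeepFace still targets the Keras 2 API, which the tf-keras compatibility package supplies — presumably why both land in the same change.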

View File

@@ -4,6 +4,7 @@ University within the Software Project course.
 © Copyright Utrecht University (Department of Information and Computing Sciences)
 """
+import logging
 from functools import singledispatchmethod
 from slugify import slugify
@@ -30,7 +31,6 @@ from control_backend.schemas.program import (
     BasicNorm,
     ConditionalNorm,
     EmotionBelief,
-    FaceBelief,
     GestureAction,
     Goal,
     InferredBelief,
@@ -67,6 +67,7 @@ class AgentSpeakGenerator:
     """
     _asp: AstProgram
+    logger = logging.getLogger(__name__)
     def generate(self, program: Program) -> str:
         """
@@ -480,7 +481,8 @@
         :param main_goal: Whether this is a main goal (for UI notification purposes).
         """
         context: list[AstExpression] = [self._astify(phase)]
-        context.append(~self._astify(goal, achieved=True))
+        if goal.can_fail:
+            context.append(~self._astify(goal, achieved=True))
         if previous_goal and previous_goal.can_fail:
             context.append(self._astify(previous_goal, achieved=True))
         if not continues_response:
@@ -504,6 +506,10 @@
         if not goal.can_fail and not continues_response:
             body.append(AstStatement(StatementType.ADD_BELIEF, self._astify(goal, achieved=True)))
+        if len(body) == 0:
+            self.logger.warning("Goal with no plan detected: %s", goal.name)
+            body.append(AstStatement(StatementType.EMPTY, AstLiteral("true")))
         self._asp.plans.append(AstPlan(TriggerType.ADDED_GOAL, self._astify(goal), context, body))
         self._asp.plans.append(
@@ -564,10 +570,10 @@
             )
         )
         for step in trigger.plan.steps:
+            body.append(self._step_to_statement(step))
             if isinstance(step, Goal):
-                step.can_fail = False  # triggers are continuous sequence
-                subgoals.append(step)
-            body.append(self._step_to_statement(step))
+                new_step = step.model_copy(update={"can_fail": False})  # triggers are sequence
+                subgoals.append(new_step)
         # Arbitrary wait for UI to display nicely
         body.append(
@@ -688,10 +694,6 @@
     def _(self, eb: EmotionBelief) -> AstExpression:
         return AstLiteral("emotion_detected", [AstAtom(eb.emotion)])
-    @_astify.register
-    def _(self, eb: FaceBelief) -> AstExpression:
-        return AstLiteral("face_present")
     @_astify.register
     def _(self, ib: InferredBelief) -> AstExpression:
         """

View File

@@ -19,7 +19,7 @@ from control_backend.agents.perception.visual_emotion_recognition_agent.visual_e
 from control_backend.core.config import settings
 from ..actuation.robot_speech_agent import RobotSpeechAgent
-from ..perception import FacePerceptionAgent, VADAgent
+from ..perception import VADAgent
 class RICommunicationAgent(BaseAgent):
@@ -181,7 +181,7 @@ class RICommunicationAgent(BaseAgent):
         bind = port_data["bind"]
         if not bind:
-            addr = f"tcp://localhost:{port}"
+            addr = f"tcp://{settings.ri_host}:{port}"
         else:
             addr = f"tcp://*:{port}"
@@ -224,13 +224,6 @@
                 )
                 self.visual_emotion_recognition_agent = visual_emotion_agent
                 await visual_emotion_agent.start()
-            case "face":
-                face_agent = FacePerceptionAgent(
-                    settings.agent_settings.face_agent_name,
-                    zmq_address=addr,
-                    zmq_bind=bind,
-                )
-                await face_agent.start()
             case _:
                 self.logger.warning("Unhandled negotiation id: %s", id)
@@ -345,3 +338,4 @@ class RICommunicationAgent(BaseAgent):
         self.logger.debug("Restarting communication negotiation.")
         if await self._negotiate_connection(max_retries=2):
             self.connected = True
+

View File

@@ -185,6 +185,9 @@ class LLMAgent(BaseAgent):
         full_message = ""
         current_chunk = ""
         async for token in self._stream_query_llm(messages):
+            if self._interrupted:
+                return
             full_message += token
             current_chunk += token
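The change above checks an interrupt flag on every streamed token, so a user interrupt lands mid-response instead of waiting for the stream to finish. A self-contained sketch of the pattern — fake_stream, Responder and _interrupted are illustrative stand-ins for the agent's internals, not names from the diff:

import asyncio

async def fake_stream():
    # stand-in for _stream_query_llm: yields tokens with a delay
    for token in ["Hello", " ", "world", "!"]:
        await asyncio.sleep(0.2)
        yield token

class Responder:
    def __init__(self):
        self._interrupted = False

    async def respond(self) -> str | None:
        full_message = ""
        async for token in fake_stream():
            if self._interrupted:  # checked per token, so the abort is near-immediate
                return None
            full_message += token
        return full_message

async def main():
    responder = Responder()
    task = asyncio.create_task(responder.respond())
    await asyncio.sleep(0.3)
    responder._interrupted = True  # simulate the user interrupting mid-stream
    print(await task)  # None: the response was abandoned partway through

asyncio.run(main())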

View File

@@ -7,7 +7,6 @@ Agents responsible for processing sensory input, such as audio transcription and
 detection.
 """
-from .face_rec_agent import FacePerceptionAgent as FacePerceptionAgent
 from .transcription_agent.transcription_agent import (
     TranscriptionAgent as TranscriptionAgent,
 )

View File

@@ -1,144 +0,0 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import zmq
import zmq.asyncio as azmq
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
class FacePerceptionAgent(BaseAgent):
"""
Receives face presence updates from the RICommunicationAgent
via the internal PUB/SUB bus.
"""
def __init__(self, name: str, zmq_address: str, zmq_bind: bool):
"""
:param name: The name of the agent.
:param zmq_address: The ZMQ address to subscribe to, an endpoint which sends face presence
updates.
:param zmq_bind: Whether to connect to the ZMQ endpoint, or to bind.
"""
super().__init__(name)
self._zmq_address = zmq_address
self._zmq_bind = zmq_bind
self._socket: azmq.Socket | None = None
self._last_face_state: bool | None = None
# Pause functionality
# NOTE: flag is set when running, cleared when paused
self._paused = asyncio.Event()
self._paused.set()
async def setup(self):
self.logger.info("Starting FacePerceptionAgent")
if self._socket is None:
self._connect_socket()
self.add_behavior(self._poll_loop())
self.logger.info("Finished setting up %s", self.name)
def _connect_socket(self):
if self._socket is not None:
self.logger.warning("ZMQ socket already initialized. Did you call setup() twice?")
return
self._socket = azmq.Context.instance().socket(zmq.SUB)
self._socket.setsockopt_string(zmq.SUBSCRIBE, "")
if self._zmq_bind:
self._socket.bind(self._zmq_address)
else:
self._socket.connect(self._zmq_address)
async def _poll_loop(self):
if self._socket is None:
self.logger.warning("Connection not initialized before poll loop. Call setup() first.")
return
while self._running:
try:
await self._paused.wait()
response = await asyncio.wait_for(
self._socket.recv_json(), timeout=settings.behaviour_settings.sleep_s
)
face_present = response.get("face_detected", False)
if self._last_face_state is None:
self._last_face_state = face_present
continue
if face_present != self._last_face_state:
self._last_face_state = face_present
self.logger.debug("Face detected" if face_present else "Face lost")
await self._update_face_belief(face_present)
except TimeoutError:
pass
except Exception as e:
self.logger.error("Face polling failed", exc_info=e)
async def _post_face_belief(self, present: bool):
"""
Send a face_present belief update to the BDI Core Agent.
"""
if present:
belief_msg = BeliefMessage(create=[{"name": "face_present", "arguments": []}])
else:
belief_msg = BeliefMessage(delete=[{"name": "face_present", "arguments": []}])
msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
thread="beliefs",
body=belief_msg.model_dump_json(),
)
await self.send(msg)
async def _update_face_belief(self, present: bool):
"""
Add or remove the `face_present` belief in the BDI Core Agent.
"""
if present:
payload = BeliefMessage(create=[Belief(name="face_present").model_dump()])
else:
payload = BeliefMessage(delete=[Belief(name="face_present").model_dump()])
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
thread="beliefs",
body=payload.model_dump_json(),
)
await self.send(message)
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming pause/resume commands from User Interrupt Agent.
"""
sender = msg.sender
if sender == settings.agent_settings.user_interrupt_name:
if msg.body == "PAUSE":
self.logger.info("Pausing Face Perception processing.")
self._paused.clear()
self._last_face_state = None
elif msg.body == "RESUME":
self.logger.info("Resuming Face Perception processing.")
self._paused.set()
else:
self.logger.warning("Unknown command from User Interrupt Agent: %s", msg.body)
else:
self.logger.debug("Ignoring message from unknown sender: %s", sender)

View File

@@ -3,7 +3,6 @@ import json
 import time
 from collections import Counter, defaultdict
-import cv2
 import numpy as np
 import zmq
 import zmq.asyncio as azmq
@@ -64,6 +63,8 @@ class VisualEmotionRecognitionAgent(BaseAgent):
         self.video_in_socket = azmq.Context.instance().socket(zmq.SUB)
+        self.video_in_socket.setsockopt(zmq.RCVHWM, 3)
         if self.socket_bind:
             self.video_in_socket.bind(self.socket_address)
         else:
@@ -71,12 +72,9 @@ class VisualEmotionRecognitionAgent(BaseAgent):
         self.video_in_socket.setsockopt_string(zmq.SUBSCRIBE, "")
         self.video_in_socket.setsockopt(zmq.RCVTIMEO, self.timeout_ms)
-        self.video_in_socket.setsockopt(zmq.CONFLATE, 1)
         self.add_behavior(self.emotion_update_loop())
-        self.logger.info("Finished setting up %s", self.name)
     async def emotion_update_loop(self):
         """
         Background loop to receive video frames, recognize emotions, and update beliefs.
@@ -97,21 +95,18 @@ class VisualEmotionRecognitionAgent(BaseAgent):
             try:
                 await self._paused.wait()
-                frame_bytes = await self.video_in_socket.recv()
+                width, height, image_bytes = await self.video_in_socket.recv_multipart()
+                width = int.from_bytes(width, 'little')
+                height = int.from_bytes(height, 'little')
                 # Convert bytes to a numpy buffer
-                nparr = np.frombuffer(frame_bytes, np.uint8)
-                # Decode image into the generic Numpy Array DeepFace expects
-                frame_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
-                if frame_image is None:
-                    # Could not decode image, skip this frame
-                    self.logger.warning("Received invalid video frame, skipping.")
-                    continue
+                image_array = np.frombuffer(image_bytes, np.uint8)
+                frame = image_array.reshape((height, width, 3))
                 # Get the dominant emotion from each face
-                current_emotions = self.emotion_recognizer.sorted_dominant_emotions(frame_image)
+                current_emotions = self.emotion_recognizer.sorted_dominant_emotions(frame)
                 # Update emotion counts for each detected face
                 for i, emotion in enumerate(current_emotions):
                     face_stats[i][emotion] += 1
@@ -135,6 +130,10 @@ class VisualEmotionRecognitionAgent(BaseAgent):
             except zmq.Again:
                 self.logger.warning("No video frame received within timeout.")
+            except Exception as e:
+                self.logger.error(f"Error in emotion recognition loop: {e}")
     async def update_emotions(self, prev_emotions: set[str], emotions: set[str]):
         """
         Compare emotions from previous window and current emotions,
@@ -205,3 +204,4 @@ class VisualEmotionRecognitionAgent(BaseAgent):
         """
         self.video_in_socket.close()
         await super().stop()
+
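Two things change in the frame transport here: frames now arrive as a three-part ZMQ message (width and height as little-endian integers, then the raw BGR pixel buffer) instead of an encoded image, and CONFLATE gives way to a small RCVHWM — ZMQ's conflate option only keeps whole single-part messages, so it cannot be combined with multipart frames. A sketch of what a matching publisher could look like; the endpoint and frame source are illustrative, only the message layout comes from the diff and tests:

import numpy as np
import zmq

ctx = zmq.Context.instance()
pub = ctx.socket(zmq.PUB)
pub.bind("tcp://*:5555")  # illustrative endpoint

frame = np.zeros((480, 640, 3), dtype=np.uint8)  # one BGR frame, e.g. from a camera
height, width = frame.shape[:2]

pub.send_multipart([
    width.to_bytes(4, "little"),   # matches int.from_bytes(width, 'little') on receive
    height.to_bytes(4, "little"),
    frame.tobytes(),               # raw pixels; the receiver reshapes to (height, width, 3)
])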

View File

@@ -23,32 +23,31 @@
         """
         pass
 class DeepFaceEmotionRecognizer(VisualEmotionRecognizer):
     """
     DeepFace-based implementation of VisualEmotionRecognizer.
     DeepFape has proven to be quite a pessimistic model, so expect sad, fear and neutral
     emotions to be over-represented.
     """
     def __init__(self):
         self.load_model()
     def load_model(self):
-        print("Loading Deepface Emotion Model...")
         dummy_img = np.zeros((224, 224, 3), dtype=np.uint8)
         # analyze does not take a model as an argument, calling it once on a dummy image to load
         # the model
-        DeepFace.analyze(dummy_img, actions=["emotion"], enforce_detection=False)
-        print("Deepface Emotion Model loaded.")
+        DeepFace.analyze(dummy_img, actions=['emotion'], enforce_detection=False)
     def sorted_dominant_emotions(self, image) -> list[str]:
-        analysis = DeepFace.analyze(image, actions=["emotion"], enforce_detection=False)
+        analysis = DeepFace.analyze(image,
+                                    actions=['emotion'],
+                                    enforce_detection=False
+                                    )
         # Sort faces by x coordinate to maintain left-to-right order
-        analysis.sort(key=lambda face: face["region"]["x"])
-        analysis = [face for face in analysis if face["face_confidence"] >= 0.90]
-        dominant_emotions = [face["dominant_emotion"] for face in analysis]
+        analysis.sort(key=lambda face: face['region']['x'])
+        analysis = [face for face in analysis if face['face_confidence'] >= 0.90]
+        dominant_emotions = [face['dominant_emotion'] for face in analysis]
         return dominant_emotions
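Worth noting about the call shape kept by this change: with enforce_detection=False, DeepFace.analyze returns an entry per candidate face instead of raising when none is found, so the face_confidence >= 0.90 filter is what actually discards non-faces, and sorting on the region's x coordinate gives each face a stable left-to-right index for the per-face counters in the agent.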

View File

@@ -404,28 +404,22 @@ class UserInterruptAgent(BaseAgent):
         if pause == "true":
             # Send pause to VAD and VED agent
             vad_message = InternalMessage(
-                to=[
-                    settings.agent_settings.vad_name,
-                    settings.agent_settings.visual_emotion_recognition_name,
-                    settings.agent_settings.face_agent_name,
-                ],
+                to=[settings.agent_settings.vad_name,
+                    settings.agent_settings.visual_emotion_recognition_name],
                 sender=self.name,
                 body="PAUSE",
             )
             await self.send(vad_message)
             # Voice Activity Detection and Visual Emotion Recognition agents
-            self.logger.info("Sent pause command to perception agents.")
+            self.logger.info("Sent pause command to VAD and VED agents.")
         else:
             # Send resume to VAD and VED agents
             vad_message = InternalMessage(
-                to=[
-                    settings.agent_settings.vad_name,
-                    settings.agent_settings.visual_emotion_recognition_name,
-                    settings.agent_settings.face_agent_name,
-                ],
+                to=[settings.agent_settings.vad_name,
+                    settings.agent_settings.visual_emotion_recognition_name],
                 sender=self.name,
                 body="RESUME",
             )
             await self.send(vad_message)
             # Voice Activity Detection and Visual Emotion Recognition agents
-            self.logger.info("Sent resume command to perception agents.")
+            self.logger.info("Sent resume command to VAD and VED agents.")

View File

@@ -12,7 +12,7 @@ api_router = APIRouter()
 api_router.include_router(message.router, tags=["Messages"])
-api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Commands", "Face"])
+api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Commands"])
 api_router.include_router(logs.router, tags=["Logs"])

View File

@@ -64,7 +64,6 @@ class AgentSettings(BaseModel):
     robot_speech_name: str = "robot_speech_agent"
     robot_gesture_name: str = "robot_gesture_agent"
     user_interrupt_name: str = "user_interrupt_agent"
-    face_agent_name: str = "face_detection_agent"
 class BehaviourSettings(BaseModel):
@@ -83,12 +82,12 @@ class BehaviourSettings(BaseModel):
     :ivar transcription_words_per_token: Estimated words per token for transcription timing.
     :ivar transcription_token_buffer: Buffer for transcription tokens.
     :ivar conversation_history_length_limit: The maximum amount of messages to extract beliefs from.
-    :ivar trigger_time_to_wait: Amount of milliseconds to wait before informing the UI about trigger
-        completion.
     :ivar visual_emotion_recognition_window_duration_s: Duration in seconds over which to aggregate
         emotions and update emotion beliefs.
     :ivar visual_emotion_recognition_min_frames_per_face: Minimum number of frames per face required
         to consider a face valid.
+    :ivar trigger_time_to_wait: Amount of milliseconds to wait before informing the UI about trigger
+        completion.
     """
     # ATTENTION: When adding/removing settings, make sure to update the .env.example file
@@ -112,13 +111,12 @@ class BehaviourSettings(BaseModel):
     # Text belief extractor settings
     conversation_history_length_limit: int = 10
-    # AgentSpeak related settings
-    trigger_time_to_wait: int = 2000
-    agentspeak_file: str = "src/control_backend/agents/bdi/agentspeak.asl"
     # Visual Emotion Recognition settings
     visual_emotion_recognition_window_duration_s: int = 5
     visual_emotion_recognition_min_frames_per_face: int = 3
+    # AgentSpeak related settings
+    trigger_time_to_wait: int = 2000
+    agentspeak_file: str = "src/control_backend/agents/bdi/agentspeak.asl"
 class LLMSettings(BaseModel):

View File

@@ -7,7 +7,7 @@ University within the Software Project course.
 from enum import Enum
 from typing import Literal
-from pydantic import UUID4, BaseModel
+from pydantic import UUID4, BaseModel, field_validator
 class ProgramElement(BaseModel):
@@ -24,6 +24,13 @@ class ProgramElement(BaseModel):
     # To make program elements hashable
     model_config = {"frozen": True}
+    @field_validator("name")
+    @classmethod
+    def name_must_not_start_with_number(cls, v: str) -> str:
+        if v and v[0].isdigit():
+            raise ValueError('Field "name" must not start with a number.')
+        return v
 class LogicalOperator(Enum):
     """
@@ -41,8 +48,8 @@ class LogicalOperator(Enum):
     OR = "OR"
-type Belief = KeywordBelief | SemanticBelief | InferredBelief | EmotionBelief | FaceBelief
-type BasicBelief = KeywordBelief | SemanticBelief | EmotionBelief | FaceBelief
+type Belief = KeywordBelief | SemanticBelief | InferredBelief | EmotionBelief
+type BasicBelief = KeywordBelief | SemanticBelief | EmotionBelief
 class KeywordBelief(ProgramElement):
@@ -117,16 +124,6 @@ class EmotionBelief(ProgramElement):
     emotion: str
-class FaceBelief(ProgramElement):
-    """
-    Represents the belief that at least one face is currently detected.
-    This belief is maintained by a perception agent (not inferred).
-    """
-    face_present: bool
-    name: str = ""
 class Norm(ProgramElement):
     """
     Base class for behavioral norms that guide the robot's interactions.
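A quick illustration of what the new validator rejects — a minimal model mirroring just the fields shown in this hunk, presumably because generated AgentSpeak atoms cannot begin with a digit:

from pydantic import BaseModel, ValidationError, field_validator

class ProgramElement(BaseModel):
    name: str
    model_config = {"frozen": True}

    @field_validator("name")
    @classmethod
    def name_must_not_start_with_number(cls, v: str) -> str:
        if v and v[0].isdigit():
            raise ValueError('Field "name" must not start with a number.')
        return v

ProgramElement(name="greet_user")        # accepted
try:
    ProgramElement(name="2nd_greeting")  # rejected by the validator
except ValidationError as err:
    print(err)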

View File

@@ -1,152 +0,0 @@
from unittest.mock import AsyncMock, MagicMock

import pytest
import zmq

import control_backend.agents.perception.face_rec_agent as face_module
from control_backend.agents.perception.face_rec_agent import FacePerceptionAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.schemas.belief_message import BeliefMessage


@pytest.fixture
def agent():
    """Return a FacePerceptionAgent instance for testing."""
    return FacePerceptionAgent(
        name="face_agent",
        zmq_address="inproc://test",
        zmq_bind=False,
    )


@pytest.fixture
def socket():
    """Return a mocked ZMQ socket."""
    sock = AsyncMock()
    sock.setsockopt_string = MagicMock()
    sock.connect = MagicMock()
    sock.bind = MagicMock()
    return sock


def test_connect_socket_connect(agent, socket, monkeypatch):
    """Test that _connect_socket properly connects when zmq_bind=False."""
    ctx = MagicMock()
    ctx.socket.return_value = socket
    monkeypatch.setattr(face_module.azmq, "Context", MagicMock(instance=lambda: ctx))

    agent._connect_socket()

    socket.setsockopt_string.assert_called_once_with(zmq.SUBSCRIBE, "")
    socket.connect.assert_called_once_with(agent._zmq_address)
    socket.bind.assert_not_called()


def test_connect_socket_bind(agent, socket, monkeypatch):
    """Test that _connect_socket properly binds when zmq_bind=True."""
    agent._zmq_bind = True
    ctx = MagicMock()
    ctx.socket.return_value = socket
    monkeypatch.setattr(face_module.azmq, "Context", MagicMock(instance=lambda: ctx))

    agent._connect_socket()

    socket.bind.assert_called_once_with(agent._zmq_address)
    socket.connect.assert_not_called()


def test_connect_socket_twice_is_noop(agent, socket):
    """Test that calling _connect_socket twice does not overwrite an existing socket."""
    agent._socket = socket
    agent._connect_socket()
    assert agent._socket is socket


@pytest.mark.asyncio
async def test_update_face_belief_present(agent):
    """Test that _update_face_belief(True) creates the 'face_present' belief."""
    agent.send = AsyncMock()
    await agent._update_face_belief(True)
    msg = agent.send.await_args.args[0]
    payload = BeliefMessage.model_validate_json(msg.body)
    assert payload.create[0].name == "face_present"


@pytest.mark.asyncio
async def test_update_face_belief_absent(agent):
    """Test that _update_face_belief(False) deletes the 'face_present' belief."""
    agent.send = AsyncMock()
    await agent._update_face_belief(False)
    msg = agent.send.await_args.args[0]
    payload = BeliefMessage.model_validate_json(msg.body)
    assert payload.delete[0].name == "face_present"


@pytest.mark.asyncio
async def test_post_face_belief_present(agent):
    """Test that _post_face_belief(True) sends a belief creation message."""
    agent.send = AsyncMock()
    await agent._post_face_belief(True)
    msg = agent.send.await_args.args[0]
    assert '"create"' in msg.body and '"face_present"' in msg.body


@pytest.mark.asyncio
async def test_post_face_belief_absent(agent):
    """Test that _post_face_belief(False) sends a belief deletion message."""
    agent.send = AsyncMock()
    await agent._post_face_belief(False)
    msg = agent.send.await_args.args[0]
    assert '"delete"' in msg.body and '"face_present"' in msg.body


@pytest.mark.asyncio
async def test_handle_pause(agent):
    """Test that a 'PAUSE' message clears _paused and resets _last_face_state."""
    agent._paused.set()
    agent._last_face_state = True
    msg = InternalMessage(
        to=agent.name,
        sender=face_module.settings.agent_settings.user_interrupt_name,
        thread="cmd",
        body="PAUSE",
    )
    await agent.handle_message(msg)
    assert not agent._paused.is_set()
    assert agent._last_face_state is None


@pytest.mark.asyncio
async def test_handle_resume(agent):
    """Test that a 'RESUME' message sets _paused."""
    agent._paused.clear()
    msg = InternalMessage(
        to=agent.name,
        sender=face_module.settings.agent_settings.user_interrupt_name,
        thread="cmd",
        body="RESUME",
    )
    await agent.handle_message(msg)
    assert agent._paused.is_set()


@pytest.mark.asyncio
async def test_handle_unknown_command(agent):
    """Test that an unknown command from UserInterruptAgent is ignored (logs a warning)."""
    msg = InternalMessage(
        to=agent.name,
        sender=face_module.settings.agent_settings.user_interrupt_name,
        thread="cmd",
        body="???",
    )
    await agent.handle_message(msg)


@pytest.mark.asyncio
async def test_handle_unknown_sender(agent):
    """Test that messages from unknown senders are ignored."""
    msg = InternalMessage(
        to=agent.name,
        sender="someone_else",
        thread="cmd",
        body="PAUSE",
    )
    await agent.handle_message(msg)

View File

@@ -0,0 +1,338 @@
import asyncio
import json
import time
from unittest.mock import AsyncMock, MagicMock, patch

import numpy as np
import pytest
import zmq
from pydantic_core import ValidationError

# Adjust the import path to match your project structure
from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognition_agent import (  # noqa
    VisualEmotionRecognitionAgent,
)
from control_backend.core.agent_system import InternalMessage

# -----------------------------------------------------------------------------
# Fixtures
# -----------------------------------------------------------------------------


@pytest.fixture
def mock_settings():
    with patch("control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognition_agent.settings") as mock:  # noqa
        # Set default values required by the agent
        mock.behaviour_settings.visual_emotion_recognition_window_duration_s = 5
        mock.behaviour_settings.visual_emotion_recognition_min_frames_per_face = 3
        mock.agent_settings.bdi_core_name = "bdi_core_agent"
        mock.agent_settings.user_interrupt_name = "user_interrupt_agent"
        yield mock


@pytest.fixture
def mock_deepface():
    with patch("control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognition_agent.DeepFaceEmotionRecognizer") as mock:  # noqa
        instance = mock.return_value
        instance.sorted_dominant_emotions.return_value = []
        yield instance


@pytest.fixture
def mock_zmq_context():
    with patch("zmq.asyncio.Context.instance") as mock_ctx:
        mock_socket = MagicMock()
        # Mock socket methods to return None or AsyncMock for async methods
        mock_socket.bind = MagicMock()
        mock_socket.connect = MagicMock()
        mock_socket.setsockopt = MagicMock()
        mock_socket.setsockopt_string = MagicMock()
        mock_socket.recv_multipart = AsyncMock()
        mock_socket.close = MagicMock()
        mock_ctx.return_value.socket.return_value = mock_socket
        yield mock_ctx


@pytest.fixture
def agent(mock_settings, mock_deepface, mock_zmq_context):
    # Initialize agent with specific params to control testing
    agent = VisualEmotionRecognitionAgent(
        name="test_agent",
        socket_address="tcp://localhost:5555",
        bind=False,
        timeout_ms=100,
        window_duration=2,
        min_frames_required=2
    )
    # Mock the internal send method from BaseAgent
    agent.send = AsyncMock()
    # Mock the add_behavior method from BaseAgent
    agent.add_behavior = MagicMock()
    # Mock the logger
    agent.logger = MagicMock()
    return agent


# -----------------------------------------------------------------------------
# Tests
# -----------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_initialization(agent):
    """Test that the agent initializes with correct attributes."""
    assert agent.name == "test_agent"
    assert agent.socket_address == "tcp://localhost:5555"
    assert agent.socket_bind is False
    assert agent.timeout_ms == 100
    assert agent._paused.is_set()


@pytest.mark.asyncio
async def test_setup_connect(agent, mock_zmq_context, mock_deepface):
    """Test setup routine when binding is False (connect)."""
    agent.socket_bind = False
    await agent.setup()
    socket = agent.video_in_socket
    socket.connect.assert_called_with("tcp://localhost:5555")
    socket.bind.assert_not_called()
    socket.setsockopt.assert_any_call(zmq.RCVHWM, 3)
    socket.setsockopt.assert_any_call(zmq.RCVTIMEO, 100)
    agent.add_behavior.assert_called_once()
    assert agent.emotion_recognizer == mock_deepface


@pytest.mark.asyncio
async def test_setup_bind(agent, mock_zmq_context):
    """Test setup routine when binding is True."""
    agent.socket_bind = True
    await agent.setup()
    socket = agent.video_in_socket
    socket.bind.assert_called_with("tcp://localhost:5555")
    socket.connect.assert_not_called()


@pytest.mark.asyncio
async def test_emotion_update_loop_normal_flow(agent, mock_deepface):
    """
    Test the main loop logic:
    1. Receive frames
    2. Aggregate stats
    3. Trigger window update
    4. Call update_emotions
    """
    # Setup dependencies
    await agent.setup()
    agent._running = True

    # Create fake image data (10x10 pixels)
    width, height = 10, 10
    image_bytes = np.zeros((10, 10, 3), dtype=np.uint8).tobytes()
    w_bytes = width.to_bytes(4, 'little')
    h_bytes = height.to_bytes(4, 'little')

    # Mock ZMQ receive to return data 3 times, then stop the loop
    # We use a side_effect on recv_multipart to simulate frames and then stop the loop
    async def recv_side_effect():
        if agent._running:
            return w_bytes, h_bytes, image_bytes
        raise asyncio.CancelledError()

    agent.video_in_socket.recv_multipart.side_effect = recv_side_effect

    # Mock DeepFace to return emotions
    # Frame 1: Happy
    # Frame 2: Happy
    # Frame 3: Happy (Trigger window)
    mock_deepface.sorted_dominant_emotions.side_effect = [
        ["happy"],
        ["happy"],
        ["happy"]
    ]

    # Mock update_emotions to verify it's called
    agent.update_emotions = AsyncMock()

    # Mock time.time to simulate window passage
    # We need time to advance significantly after the frames are collected
    start_time = time.time()
    with patch("time.time") as mock_time:
        # Sequence of time calls:
        # 1. Init next_window_time calculation
        # 2. Loop 1 check
        # 3. Loop 2 check
        # 4. Loop 3 check (Make this one pass the window threshold)
        mock_time.side_effect = [
            start_time,         # init
            start_time + 0.1,   # frame 1 check
            start_time + 0.2,   # frame 2 check
            start_time + 10.0,  # frame 3 check (triggers window reset)
            start_time + 10.1,  # next init
            start_time + 10.2   # break loop
        ]

        # We need to manually break the infinite loop after the update
        # We can do this by wrapping update_emotions to set _running = False
        async def stop_loop(*args, **kwargs):
            agent._running = False

        agent.update_emotions.side_effect = stop_loop

        # Run the loop
        await agent.emotion_update_loop()

    # Verifications
    assert agent.update_emotions.called
    # Check that it detected 'happy' as dominant (2 required, 3 found)
    call_args = agent.update_emotions.call_args
    assert call_args is not None
    # args: (prev_emotions, window_dominant_emotions)
    assert call_args[0][1] == {"happy"}


@pytest.mark.asyncio
async def test_emotion_update_loop_insufficient_frames(agent, mock_deepface):
    """Test that emotions are NOT updated if min_frames_required is not met."""
    await agent.setup()
    agent._running = True
    agent.min_frames_required = 5  # Set high requirement

    width, height = 10, 10
    image_bytes = np.zeros((10, 10, 3), dtype=np.uint8).tobytes()
    w_bytes = width.to_bytes(4, 'little')
    h_bytes = height.to_bytes(4, 'little')
    agent.video_in_socket.recv_multipart.return_value = (w_bytes, h_bytes, image_bytes)

    mock_deepface.sorted_dominant_emotions.return_value = ["sad"]
    agent.update_emotions = AsyncMock()

    with patch("time.time") as mock_time:
        # Time setup to trigger window processing immediately
        mock_time.side_effect = [0, 100, 101]

        # Stop loop after first pass
        async def stop_loop(*args, **kwargs):
            agent._running = False

        agent.update_emotions.side_effect = stop_loop

        await agent.emotion_update_loop()

    # It should call update_emotions with EMPTY set because min frames (5) > detected (1)
    call_args = agent.update_emotions.call_args
    assert call_args[0][1] == set()


@pytest.mark.asyncio
async def test_emotion_update_loop_zmq_again_and_exception(agent):
    """Test that the loop handles ZMQ timeouts (Again) and generic exceptions."""
    await agent.setup()
    agent._running = True

    # Side effect:
    # 1. Raise ZMQ Again (Timeout) -> should log warning
    # 2. Raise Generic Exception -> should log error
    # 3. Raise CancelledError -> stop loop (simulating stop)
    agent.video_in_socket.recv_multipart.side_effect = [
        zmq.Again(),
        RuntimeError("Random Failure"),
        asyncio.CancelledError()  # To break loop cleanly
    ]

    # We need to ensure the loop doesn't block on _paused
    agent._paused.set()

    # Run loop
    try:
        await agent.emotion_update_loop()
    except asyncio.CancelledError:
        pass


@pytest.mark.asyncio
async def test_update_emotions_logic(agent, mock_settings):
    """Test the logic for calculating diffs and sending messages."""
    agent.name = "viz_agent"

    # Case 1: No change
    await agent.update_emotions({"happy"}, {"happy"})
    agent.send.assert_not_called()

    # Case 2: Remove 'happy', Add 'sad'
    await agent.update_emotions({"happy"}, {"sad"})
    assert agent.send.called
    call_args = agent.send.call_args
    msg = call_args[0][0]  # InternalMessage object

    assert msg.to == mock_settings.agent_settings.bdi_core_name
    assert msg.sender == "viz_agent"
    assert msg.thread == "beliefs"

    payload = json.loads(msg.body)
    # Check Created Beliefs
    assert len(payload["create"]) == 1
    assert payload["create"][0]["name"] == "emotion_detected"
    assert payload["create"][0]["arguments"] == ["sad"]
    # Check Deleted Beliefs
    assert len(payload["delete"]) == 1
    assert payload["delete"][0]["name"] == "emotion_detected"
    assert payload["delete"][0]["arguments"] == ["happy"]


@pytest.mark.asyncio
async def test_update_emotions_validation_error(agent):
    """Test that ValidationErrors during Belief creation are caught."""
    # We patch Belief to raise ValidationError
    with patch("control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognition_agent.Belief") as MockBelief:  # noqa
        MockBelief.side_effect = ValidationError.from_exception_data("Simulated Error", [])

        # Try to update emotions
        await agent.update_emotions(prev_emotions={"happy"}, emotions={"sad"})

        # Verify empty payload is sent (or payload with valid ones if mixed)
        # In this case both failed, so payload lists should be empty
        assert agent.send.called
        msg = agent.send.call_args[0][0]
        payload = json.loads(msg.body)
        assert payload["create"] == []
        assert payload["delete"] == []


@pytest.mark.asyncio
async def test_handle_message(agent, mock_settings):
    """Test message handling for Pause/Resume."""
    # Setup
    ui_name = mock_settings.agent_settings.user_interrupt_name

    # 1. PAUSE message
    msg_pause = InternalMessage(to="me", sender=ui_name, body="PAUSE")
    await agent.handle_message(msg_pause)
    assert not agent._paused.is_set()  # Should be cleared (paused)
    agent.logger.info.assert_called_with("Pausing Visual Emotion Recognition processing.")

    # 2. RESUME message
    msg_resume = InternalMessage(to="me", sender=ui_name, body="RESUME")
    await agent.handle_message(msg_resume)
    assert agent._paused.is_set()  # Should be set (running)

    # 3. Unknown command
    msg_unknown = InternalMessage(to="me", sender=ui_name, body="DANCE")
    await agent.handle_message(msg_unknown)

    # 4. Unknown sender
    msg_random = InternalMessage(to="me", sender="random_guy", body="PAUSE")
    await agent.handle_message(msg_random)


@pytest.mark.asyncio
async def test_stop(agent, mock_zmq_context):
    """Test the stop method cleans up resources."""
    # We need to mock super().stop(). Since we can't easily patch super(),
    # and the provided BaseAgent code shows stop() just sets _running and cancels tasks,
    # we can rely on the fact that VisualEmotionRecognitionAgent calls it.
    # However, since we provided a 'agent' fixture that mocks things, we should verify specific cleanups.  # noqa
    await agent.setup()

    with patch("control_backend.agents.BaseAgent.stop", new_callable=AsyncMock) as mock_super_stop:
        await agent.stop()

        # Verify socket closed
        agent.video_in_socket.close.assert_called_once()
        # Verify parent stop called
        mock_super_stop.assert_called_once()

View File

@@ -303,6 +303,33 @@ async def test_send_experiment_control(agent):
     assert msg.thread == "reset_experiment"
+@pytest.mark.asyncio
+async def test_send_pause_command(agent):
+    # --- Test PAUSE ---
+    await agent._send_pause_command("true")
+
+    # Should send exactly 1 message
+    assert agent.send.await_count == 1
+
+    # Extract the message object from the mock call
+    # call_args[0] are positional args, and [0] is the first arg (the message)
+    msg = agent.send.call_args[0][0]
+
+    # Verify Body
+    assert msg.body == "PAUSE"
+
+    # --- Test RESUME ---
+    agent.send.reset_mock()
+    await agent._send_pause_command("false")
+
+    # Should send exactly 1 message
+    assert agent.send.await_count == 1
+
+    msg = agent.send.call_args[0][0]
+
+    # Verify Body
+    assert msg.body == "RESUME"
 @pytest.mark.asyncio
 async def test_setup(agent):
     """Test the setup method initializes sockets correctly."""

uv.lock generated
View File

@@ -1,5 +1,5 @@
 version = 1
-revision = 2
+revision = 3
 requires-python = ">=3.13"
 resolution-markers = [
     "python_full_version >= '3.14' and sys_platform == 'darwin'",
@@ -1540,6 +1540,7 @@ dev = [
 ]
 test = [
     { name = "agentspeak" },
+    { name = "deepface" },
     { name = "fastapi" },
     { name = "httpx" },
     { name = "mlx-whisper", marker = "sys_platform == 'darwin'" },
@@ -1554,6 +1555,7 @@ test = [
     { name = "pyyaml" },
     { name = "pyzmq" },
     { name = "soundfile" },
+    { name = "tf-keras" },
 ]
 [package.metadata]
@@ -1593,6 +1595,7 @@ dev = [
 ]
 test = [
     { name = "agentspeak", specifier = ">=0.2.2" },
+    { name = "deepface", specifier = ">=0.0.97" },
     { name = "fastapi", specifier = ">=0.115.6" },
     { name = "httpx", specifier = ">=0.28.1" },
     { name = "mlx-whisper", marker = "sys_platform == 'darwin'", specifier = ">=0.4.3" },
@@ -1607,6 +1610,7 @@ test = [
     { name = "pyyaml", specifier = ">=6.0.3" },
     { name = "pyzmq", specifier = ">=27.1.0" },
     { name = "soundfile", specifier = ">=0.13.1" },
+    { name = "tf-keras", specifier = ">=2.20.1" },
 ]
 [[package]]