feat: implement visual emotion recognition agent

ref: N25B-393
This commit is contained in:
Storm
2026-01-16 09:50:59 +01:00
parent 1c88ae6078
commit 0771b0d607
3 changed files with 99 additions and 15 deletions

View File

@@ -1,13 +1,17 @@
import asyncio
import zmq
import zmq.asyncio as azmq
import numpy as np
import cv2
from collections import defaultdict, Counter
import time
from control_backend.agents import BaseAgent
from control_backend.agents.perception.visual_emotion_detection_agent.visual_emotion_recognizer import DeepFaceEmotionRecognizer
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
# START FROM RI?
# START FROM RI COMMUNICATION AGENT?
class VisualEmotionRecognitionAgent(BaseAgent):
def __init__(self, socket_address: str, socket_bind: bool = False, timeout_ms: int = 1000):
@@ -32,19 +36,76 @@ class VisualEmotionRecognitionAgent(BaseAgent):
self.video_in_socket.setsockopt(zmq.RCVTIMEO, self.timeout_ms)
self.video_in_socket.setsockopt(zmq.CONFLATE, 1)
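# CONFLATE keeps only the most recent frame in the receive queue, so the agent never works through a stale backlog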
self.add_behavior(self.retrieve_frame())
self.add_behavior(self.emotion_update_loop())
async def retrieve_frame(self):
async def emotion_update_loop(self):
"""
Retrieve a video frame from the input socket.
:return: The received video frame, or None if timeout occurs.
"""
await asyncio.sleep(1) # Yield control to the event loop
try:
frame = await self.video_in_socket.recv()
# detected_emotions is a list of per-face result dictionaries
detected_emotions = self.emotion_recognizer.detect(frame)
except zmq.Again:
self.logger.debug("No video frame received within timeout.")
return None
window_duration = 1 # seconds
next_window_time = time.time() + window_duration
# Guard against false positives: a face must be detected in at least
# min_frames_required frames per window to be counted. Ignoring faces that
# are too small would also help but is not implemented; the recognizer
# additionally applies face-confidence thresholding.
min_frames_required = 2
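# face_stats maps face index (left-to-right position in the frame) to a Counter of emotions seen in the current window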
face_stats = defaultdict(Counter)
prev_dominant_emotions = set()
while self._running:
try:
frame_bytes = await self.video_in_socket.recv()
# Convert bytes to a numpy buffer
nparr = np.frombuffer(frame_bytes, np.uint8)
# Decode image into the generic Numpy Array DeepFace expects
frame_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame_image is None:
# Could not decode image, skip this frame
continue
# Get the dominant emotion from each face
current_emotions = self.emotion_recognizer.sorted_dominant_emotions(frame_image)
# Update emotion counts for each detected face
for i, emotion in enumerate(current_emotions):
face_stats[i][emotion] += 1
# If window duration has passed, process the collected stats
if time.time() >= next_window_time:
window_dominant_emotions = set()
# Determine dominant emotion for each face in the window
for _, counter in face_stats.items():
total_detections = sum(counter.values())
if total_detections >= min_frames_required:
dominant_emotion = counter.most_common(1)[0][0]
window_dominant_emotions.add(dominant_emotion)
await self.update_emotions(prev_dominant_emotions, window_dominant_emotions)
prev_dominant_emotions = window_dominant_emotions
face_stats.clear()
next_window_time = time.time() + window_duration
except zmq.Again:
self.logger.warning("No video frame received within timeout.")
async def update_emotions(self, prev_emotions, emotions):
# Remove emotions that are no longer present
emotions_to_remove = prev_emotions - emotions
for emotion in emotions_to_remove:
self.logger.info(f"Emotion '{emotion}' has disappeared.")
# Add new emotions that have appeared
new_emotions = emotions - prev_emotions
for emotion in new_emotions:
self.logger.info(f"New emotion detected: '{emotion}'")

View File

@@ -1,6 +1,7 @@
import abc
from deepface import DeepFace
import numpy as np
from collections import Counter
class VisualEmotionRecognizer(abc.ABC):
@abc.abstractmethod
@@ -9,7 +10,7 @@ class VisualEmotionRecognizer(abc.ABC):
pass
@abc.abstractmethod
def detect(self, image):
def sorted_dominant_emotions(self, image):
"""Recognize emotion from the given image.
:param image: The input image for emotion recognition.
@@ -29,7 +30,21 @@ class DeepFaceEmotionRecognizer(VisualEmotionRecognizer):
# the model
DeepFace.analyze(dummy_img, actions=['emotion'], enforce_detection=False)
print("Deepface Emotion Model loaded.")
def sorted_dominant_emotions(self, image):
analysis = DeepFace.analyze(image,
actions=['emotion'],
enforce_detection=False
)
# Sort faces by x coordinate to maintain left-to-right order
analysis.sort(key=lambda face: face['region']['x'])
def detect(self, image):
analysis = DeepFace.analyze(image, actions=['emotion'], enforce_detection=False)
return analysis['dominant_emotion']
analysis = [face for face in analysis if face['face_confidence'] >= 0.90]
# Return the dominant emotion of each remaining face, in left-to-right order
dominant_emotions = [face['dominant_emotion'] for face in analysis]
return dominant_emotions
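
A quick sketch of the sort-then-filter step on a mocked analysis result; the dictionary layout (region, face_confidence, dominant_emotion) follows DeepFace.analyze's documented output, though the exact schema of the installed deepface version is an assumption:

# Mocked DeepFace.analyze(..., actions=['emotion']) result: one dict per face.
mock_analysis = [
    {"dominant_emotion": "happy",   "face_confidence": 0.97, "region": {"x": 310}},
    {"dominant_emotion": "neutral", "face_confidence": 0.55, "region": {"x": 20}},
    {"dominant_emotion": "sad",     "face_confidence": 0.93, "region": {"x": 150}},
]

mock_analysis.sort(key=lambda face: face["region"]["x"])           # left-to-right
kept = [f for f in mock_analysis if f["face_confidence"] >= 0.90]  # drop weak detections
print([f["dominant_emotion"] for f in kept])                       # ['sad', 'happy']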

View File

@@ -40,6 +40,7 @@ from control_backend.agents.communication import RICommunicationAgent
from control_backend.agents.llm import LLMAgent
# User Interrupt Agent
from control_backend.agents.perception.visual_emotion_detection_agent.visual_emotion_recognition_agent import VisualEmotionRecognitionAgent
from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
# Other backend imports
@@ -147,6 +148,13 @@ async def lifespan(app: FastAPI):
"name": settings.agent_settings.user_interrupt_name,
},
),
# TODO: Spawn agent from RI Communication Agent
"VisualEmotionRecognitionAgent": (
VisualEmotionRecognitionAgent,
{
"socket_address": "tcp://localhost:5556", # TODO: move to settings
},
),
}
agents = []
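
For local testing, a hypothetical feeder script could supply frames to the address registered above. The PUSH/PULL pairing and JPEG encoding are assumptions, since this diff only shows the receiving side; the payload must be a cv2-decodable image because the agent reconstructs it with cv2.imdecode:

import cv2
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.PUSH)  # socket type assumed; the agent connects (socket_bind=False), so bind here
sock.bind("tcp://*:5556")

cap = cv2.VideoCapture(0)  # default webcam
try:
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        ok, buf = cv2.imencode(".jpg", frame)  # encode so the agent's cv2.imdecode can rebuild it
        if ok:
            sock.send(buf.tobytes())
finally:
    cap.release()
    sock.close()
    ctx.term()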