feat: face recognition agent #53
@@ -8,7 +8,7 @@ from zmq.asyncio import Context
 
 from control_backend.agents import BaseAgent
 from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
-from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognition_agent import (
+from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognition_agent import ( # noqa
     VisualEmotionRecognitionAgent,
 )
 from control_backend.core.config import settings
@@ -4,28 +4,50 @@ from collections import Counter, defaultdict
 
 import cv2
 import numpy as np
-from pydantic_core import ValidationError
 import zmq
 import zmq.asyncio as azmq
-from control_backend.agents import BaseAgent
-from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognizer import (
+from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognizer import ( # noqa
     DeepFaceEmotionRecognizer,
 )
+from pydantic_core import ValidationError
 
+from control_backend.agents import BaseAgent
 from control_backend.core.agent_system import InternalMessage
 from control_backend.core.config import settings
 from control_backend.schemas.belief_message import Belief
 
-# START FROM RI COMMUNICATION AGENT?
 
 class VisualEmotionRecognitionAgent(BaseAgent):
-    def __init__(self, name, socket_address: str, bind: bool = False, timeout_ms: int = 1000):
+    def __init__(self, name: str, socket_address: str, bind: bool = False, timeout_ms: int = 1000,
+                 window_duration:
+                 int = settings.behaviour_settings.visual_emotion_recognition_window_duration_s
+                 , min_frames_required: int =
+                 settings.behaviour_settings.visual_emotion_recognition_min_frames_per_face):
+        """
+        Initialize the Visual Emotion Recognition Agent.
+
+        :param name: Name of the agent
+        :param socket_address: Address of the socket to connect or bind to
+        :param bind: Whether to bind to the socket address (True) or connect (False)
+        :param timeout_ms: Timeout for socket receive operations in milliseconds
+        :param window_duration: Duration in seconds over which to aggregate emotions
+        :param min_frames_required: Minimum number of frames per face required to consider a face
+            valid
+        """
         super().__init__(name)
         self.socket_address = socket_address
         self.socket_bind = bind
         self.timeout_ms = timeout_ms
+        self.window_duration = window_duration
+        self.min_frames_required = min_frames_required
 
     async def setup(self):
+        """
+        Initialize the agent resources.
+        1. Initializes the :class:`VisualEmotionRecognizer`.
+        2. Connects to the video input ZMQ socket.
+        3. Starts the background emotion recognition loop.
+        """
        self.logger.info("Setting up %s.", self.name)
 
        self.emotion_recognizer = DeepFaceEmotionRecognizer()
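For context, a minimal construction sketch of the new signature. The agent name and socket address below are placeholders rather than values from this PR; only the parameter names and settings paths come from the hunk above.

from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognition_agent import (
    VisualEmotionRecognitionAgent,
)

# Placeholder values; the real name and video socket address come from the
# application's bootstrap code, which this diff does not show.
agent = VisualEmotionRecognitionAgent(
    name="visual_emotion_recognition",
    socket_address="tcp://127.0.0.1:5555",
    bind=False,          # connect to an existing video publisher
    timeout_ms=1000,
)
# window_duration and min_frames_required fall back to the two new
# settings.behaviour_settings.visual_emotion_recognition_* values.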
@@ -45,17 +67,16 @@ class VisualEmotionRecognitionAgent(BaseAgent):
 
     async def emotion_update_loop(self):
         """
-        Retrieve a video frame from the input socket.
+        Background loop to receive video frames, recognize emotions, and update beliefs.
+        1. Receives video frames from the ZMQ socket.
+        2. Uses the :class:`VisualEmotionRecognizer` to detect emotions.
+        3. Aggregates emotions over a time window.
+        4. Sends updates to the BDI Core Agent about detected emotions.
         """
-        window_duration = 5  # seconds
-        next_window_time = time.time() + window_duration
+        # Next time to process the window and update emotions
+        next_window_time = time.time() + self.window_duration
 
-        # To detect false positives
-        # Minimal number of frames a face has to be detected to consider it valid
-        # Can also reduce false positives by ignoring faces that are too small; not implemented
-        # Also use face confidence thresholding in recognizer
-        min_frames_required = 2
-
+        # Tracks counts of detected emotions per face index
         face_stats = defaultdict(Counter)
 
         prev_dominant_emotions = set()
@@ -82,20 +103,19 @@ class VisualEmotionRecognitionAgent(BaseAgent):
 
                 # If window duration has passed, process the collected stats
                 if time.time() >= next_window_time:
-                    print(face_stats)
                     window_dominant_emotions = set()
                     # Determine dominant emotion for each face in the window
                     for _, counter in face_stats.items():
                         total_detections = sum(counter.values())
 
-                        if total_detections >= min_frames_required:
+                        if total_detections >= self.min_frames_required:
                             dominant_emotion = counter.most_common(1)[0][0]
                             window_dominant_emotions.add(dominant_emotion)
 
                     await self.update_emotions(prev_dominant_emotions, window_dominant_emotions)
                     prev_dominant_emotions = window_dominant_emotions
                     face_stats.clear()
-                    next_window_time = time.time() + window_duration
+                    next_window_time = time.time() + self.window_duration
 
             except zmq.Again:
                 self.logger.warning("No video frame received within timeout.")
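The windowing logic above can be read in isolation as the sketch below; this is a hedged, standalone restatement of the same aggregation idea, not code from the PR, and the per-face index keying is an assumption based on the recognizer's left-to-right sort.

from collections import Counter, defaultdict

def dominant_emotions_for_window(frames: list[list[str]], min_frames_required: int = 3) -> set[str]:
    # frames: one list of per-face emotion labels per video frame, with faces
    # assumed to keep the same left-to-right index across frames.
    face_stats: defaultdict[int, Counter] = defaultdict(Counter)
    for emotions in frames:
        for face_idx, emotion in enumerate(emotions):
            face_stats[face_idx][emotion] += 1

    dominant = set()
    for counter in face_stats.values():
        # A face only contributes if it was seen in enough frames; this filters
        # one-off false detections, mirroring min_frames_required above.
        if sum(counter.values()) >= min_frames_required:
            dominant.add(counter.most_common(1)[0][0])
    return dominant

# Face 0 appears in three frames ('happy'), face 1 in only two, so with the
# default threshold of 3 the second face is ignored:
print(dominant_emotions_for_window([["happy", "sad"], ["happy"], ["happy", "sad"]]))  # {'happy'}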
@@ -112,16 +132,15 @@ class VisualEmotionRecognitionAgent(BaseAgent):
             return
 
         emotion_beliefs_remove = []
-        # Remove emotions that have disappeared
         for emotion in emotions_to_remove:
             self.logger.info(f"Emotion '{emotion}' has disappeared.")
             try:
-                emotion_beliefs_remove.append(Belief(name="emotion_detected", arguments=[emotion], remove=True))
+                emotion_beliefs_remove.append(Belief(name="emotion_detected", arguments=[emotion],
+                                                     remove=True))
             except ValidationError:
                 self.logger.warning("Invalid belief for emotion removal: %s", emotion)
 
         emotion_beliefs_add = []
-        # Add new emotions that have appeared
         for emotion in emotions_to_add:
             self.logger.info(f"New emotion detected: '{emotion}'")
             try:
@@ -131,7 +150,7 @@ class VisualEmotionRecognitionAgent(BaseAgent):
 
         beliefs_list_add = [b.model_dump() for b in emotion_beliefs_add]
         beliefs_list_remove = [b.model_dump() for b in emotion_beliefs_remove]
-        payload = {"create": beliefs_list_add, "delete": beliefs_list_remove, "replace": []}
+        payload = {"create": beliefs_list_add, "delete": beliefs_list_remove}
 
         message = InternalMessage(
             to=settings.agent_settings.bdi_core_name,
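For reference, the payload sent to the BDI Core Agent after this change has roughly the shape sketched below. The dictionaries stand in for Belief.model_dump() output; only the name/arguments/remove fields visible in this diff are shown, so the real dumps may carry additional fields.

payload = {
    "create": [
        {"name": "emotion_detected", "arguments": ["happy"], "remove": False},
    ],
    "delete": [
        {"name": "emotion_detected", "arguments": ["sad"], "remove": True},
    ],
}
# The previously sent empty "replace" list is no longer included.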
@@ -11,20 +11,28 @@ class VisualEmotionRecognizer(abc.ABC):
         pass
 
     @abc.abstractmethod
-    def sorted_dominant_emotions(self, image):
-        """Recognize emotion from the given image.
+    def sorted_dominant_emotions(self, image) -> list[str]:
+        """
+        Recognize dominant emotions from faces in the given image.
+        Emotions can be one of ['angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral'].
+        To minimize false positives, consider filtering faces with low confidence.
 
         :param image: The input image for emotion recognition.
-        :return: Detected emotion label.
+        :return: List of the dominant emotion detected for each face in the image,
+            sorted left to right by face position.
         """
         pass
 
 class DeepFaceEmotionRecognizer(VisualEmotionRecognizer):
+    """
+    DeepFace-based implementation of VisualEmotionRecognizer.
+    DeepFace has proven to be quite a pessimistic model, so expect sad, fear and neutral
+    emotions to be over-represented.
+    """
     def __init__(self):
         self.load_model()
 
     def load_model(self):
-        # Initialize DeepFace model for emotion recognition
         print("Loading Deepface Emotion Model...")
         dummy_img = np.zeros((224, 224, 3), dtype=np.uint8)
         # analyze does not take a model as an argument, calling it once on a dummy image to load
@@ -32,7 +40,7 @@ class DeepFaceEmotionRecognizer(VisualEmotionRecognizer):
         DeepFace.analyze(dummy_img, actions=['emotion'], enforce_detection=False)
         print("Deepface Emotion Model loaded.")
 
-    def sorted_dominant_emotions(self, image):
+    def sorted_dominant_emotions(self, image) -> list[str]:
         analysis = DeepFace.analyze(image,
                                     actions=['emotion'],
                                     enforce_detection=False
@@ -41,12 +49,7 @@ class DeepFaceEmotionRecognizer(VisualEmotionRecognizer):
         # Sort faces by x coordinate to maintain left-to-right order
         analysis.sort(key=lambda face: face['region']['x'])
 
-        # Fear op 0, boost 0.2 aan happy, sad -0.1, neutral +0.1
-
         analysis = [face for face in analysis if face['face_confidence'] >= 0.90]
 
         dominant_emotions = [face['dominant_emotion'] for face in analysis]
         return dominant_emotions
-
-
-
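As a standalone illustration of the recognizer's post-processing, the sketch below runs DeepFace on a placeholder frame and applies the same sort and confidence filter. The result keys ('region', 'face_confidence', 'dominant_emotion') are the ones used in the code above; the note about no-face results describes typical DeepFace behaviour and is not asserted by this PR.

import numpy as np
from deepface import DeepFace

frame = np.zeros((480, 640, 3), dtype=np.uint8)  # placeholder; normally a decoded video frame

# With enforce_detection=False, analyze() still returns a result when no face is
# found, typically with a very low face_confidence, which is why the 0.90
# threshold above helps suppress false positives.
analysis = DeepFace.analyze(frame, actions=['emotion'], enforce_detection=False)

analysis.sort(key=lambda face: face['region']['x'])               # left-to-right order
analysis = [f for f in analysis if f['face_confidence'] >= 0.90]  # drop low-confidence faces
print([f['dominant_emotion'] for f in analysis])                  # e.g. ['happy', 'neutral'] or []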
@@ -78,6 +78,10 @@ class BehaviourSettings(BaseModel):
     :ivar transcription_words_per_token: Estimated words per token for transcription timing.
     :ivar transcription_token_buffer: Buffer for transcription tokens.
     :ivar conversation_history_length_limit: The maximum amount of messages to extract beliefs from.
+    :ivar visual_emotion_recognition_window_duration_s: Duration in seconds over which to aggregate
+        emotions and update emotion beliefs.
+    :ivar visual_emotion_recognition_min_frames_per_face: Minimum number of frames per face required
+        to consider a face valid.
     """
 
     # ATTENTION: When adding/removing settings, make sure to update the .env.example file
@@ -101,6 +105,9 @@ class BehaviourSettings(BaseModel):
     # Text belief extractor settings
     conversation_history_length_limit: int = 10
 
+    # Visual Emotion Recognition settings
+    visual_emotion_recognition_window_duration_s: int = 5
+    visual_emotion_recognition_min_frames_per_face: int = 3
 
 class LLMSettings(BaseModel):
     """
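A quick sketch of how the two new settings are consumed; the attribute paths and defaults come from this PR, while the exact environment-variable names to add to .env.example depend on the project's pydantic settings prefixes and are not shown here.

from control_backend.core.config import settings

behaviour = settings.behaviour_settings
print(behaviour.visual_emotion_recognition_window_duration_s)    # 5 unless overridden
print(behaviour.visual_emotion_recognition_min_frames_per_face)  # 3 unless overridden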