feat: visual emotion recognition agent
ref: N25B-393
@@ -0,0 +1,50 @@
import asyncio

import zmq
import zmq.asyncio as azmq

from control_backend.agents import BaseAgent
from control_backend.agents.perception.visual_emotion_detection_agent.visual_emotion_recognizer import DeepFaceEmotionRecognizer
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings

# START FROM RI?


class VisualEmotionRecognitionAgent(BaseAgent):
    def __init__(self, socket_address: str, socket_bind: bool = False, timeout_ms: int = 1000):
        super().__init__(settings.agent_settings.visual_emotion_recognition_name)
        self.socket_address = socket_address
        self.socket_bind = socket_bind
        self.timeout_ms = timeout_ms

    async def setup(self):
        self.logger.info("Setting up %s.", self.name)

        self.emotion_recognizer = DeepFaceEmotionRecognizer()

        self.video_in_socket = azmq.Context.instance().socket(zmq.SUB)

        if self.socket_bind:
            self.video_in_socket.bind(self.socket_address)
        else:
            self.video_in_socket.connect(self.socket_address)

        # Subscribe to all messages, time out receives after `timeout_ms`, and keep
        # only the most recent frame so the agent never works through a stale backlog.
        self.video_in_socket.setsockopt_string(zmq.SUBSCRIBE, "")
        self.video_in_socket.setsockopt(zmq.RCVTIMEO, self.timeout_ms)
        self.video_in_socket.setsockopt(zmq.CONFLATE, 1)

        self.add_behavior(self.retrieve_frame())

    async def retrieve_frame(self):
        """
        Retrieve a video frame from the input socket and run emotion recognition on it.

        :return: The received video frame, or None if a timeout occurs.
        """
        await asyncio.sleep(1)  # Yield control to the event loop
        try:
            frame = await self.video_in_socket.recv()
            # The recognizer returns the dominant emotion label for the received frame.
            detected_emotions = self.emotion_recognizer.detect(frame)
        except zmq.Again:
            self.logger.debug("No video frame received within timeout.")
            return None

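Note that retrieve_frame hands the raw received bytes to the recognizer; if frames are published in an encoded form (e.g. JPEG), they would presumably need to be decoded back into an image array (cv2.imdecode) before detection. A minimal sketch of a matching test publisher, assuming a tcp://*:5555 address and OpenCV JPEG encoding (both assumptions, not part of this commit):

# --- hypothetical test publisher, not part of this commit ---
import time

import cv2
import zmq

ctx = zmq.Context.instance()
pub = ctx.socket(zmq.PUB)
pub.bind("tcp://*:5555")  # assumed address; must match the agent's socket_address

cap = cv2.VideoCapture(0)
while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break
    ok, buf = cv2.imencode(".jpg", frame)  # JPEG bytes; subscriber decodes with cv2.imdecode
    if ok:
        pub.send(buf.tobytes())
    time.sleep(1 / 15)  # roughly 15 fps
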
@@ -0,0 +1,35 @@
import abc

import numpy as np
from deepface import DeepFace


class VisualEmotionRecognizer(abc.ABC):
    @abc.abstractmethod
    def load_model(self):
        """Load the visual emotion recognition model into memory."""
        pass

    @abc.abstractmethod
    def detect(self, image):
        """Recognize emotion from the given image.

        :param image: The input image for emotion recognition.
        :return: Detected emotion label.
        """
        pass


class DeepFaceEmotionRecognizer(VisualEmotionRecognizer):
    def __init__(self):
        self.load_model()

    def load_model(self):
        # Initialize the DeepFace emotion model. analyze() does not accept a preloaded
        # model as an argument, so call it once on a dummy image to load the model.
        print("Loading DeepFace emotion model...")
        dummy_img = np.zeros((224, 224, 3), dtype=np.uint8)
        DeepFace.analyze(dummy_img, actions=['emotion'], enforce_detection=False)
        print("DeepFace emotion model loaded.")

    def detect(self, image):
        analysis = DeepFace.analyze(image, actions=['emotion'], enforce_detection=False)
        # Newer DeepFace versions return a list with one entry per detected face.
        if isinstance(analysis, list):
            analysis = analysis[0]
        return analysis['dominant_emotion']
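For a quick standalone check of the recognizer, a minimal usage sketch; the OpenCV webcam capture is an assumption for illustration and not part of this commit:

# --- hypothetical standalone usage, not part of this commit ---
import cv2

from control_backend.agents.perception.visual_emotion_detection_agent.visual_emotion_recognizer import DeepFaceEmotionRecognizer

recognizer = DeepFaceEmotionRecognizer()  # triggers the one-time DeepFace model load

cap = cv2.VideoCapture(0)
ok, frame = cap.read()  # BGR image as a numpy array
cap.release()

if ok:
    print("Dominant emotion:", recognizer.detect(frame))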