feat: visual emotion recognition agent #54
@@ -6,23 +6,27 @@ import cv2
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
import zmq
|
import zmq
|
||||||
import zmq.asyncio as azmq
|
import zmq.asyncio as azmq
|
||||||
from control_backend.agents.perception.visual_emotion_recognition_agentvisual_emotion_recognizer import ( # noqa
|
|
||||||
DeepFaceEmotionRecognizer,
|
|
||||||
)
|
|
||||||
from pydantic_core import ValidationError
|
from pydantic_core import ValidationError
|
||||||
|
|
||||||
from control_backend.agents import BaseAgent
|
from control_backend.agents import BaseAgent
|
||||||
|
from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognizer import ( # noqa
|
||||||
|
DeepFaceEmotionRecognizer,
|
||||||
|
)
|
||||||
from control_backend.core.agent_system import InternalMessage
|
from control_backend.core.agent_system import InternalMessage
|
||||||
from control_backend.core.config import settings
|
from control_backend.core.config import settings
|
||||||
from control_backend.schemas.belief_message import Belief
|
from control_backend.schemas.belief_message import Belief
|
||||||
|
|
||||||
|
|
||||||
class VisualEmotionRecognitionAgent(BaseAgent):
|
class VisualEmotionRecognitionAgent(BaseAgent):
|
||||||
def __init__(self, name: str, socket_address: str, bind: bool = False, timeout_ms: int = 1000,
|
def __init__(
|
||||||
window_duration:
|
self,
|
||||||
int = settings.behaviour_settings.visual_emotion_recognition_window_duration_s
|
name: str,
|
||||||
, min_frames_required: int =
|
socket_address: str,
|
||||||
settings.behaviour_settings.visual_emotion_recognition_min_frames_per_face):
|
bind: bool = False,
|
||||||
|
timeout_ms: int = 1000,
|
||||||
|
window_duration: int = settings.behaviour_settings.visual_emotion_recognition_window_duration_s, # noqa
|
||||||
|
min_frames_required: int = settings.behaviour_settings.visual_emotion_recognition_min_frames_per_face, # noqa
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Initialize the Visual Emotion Recognition Agent.
|
Initialize the Visual Emotion Recognition Agent.
|
||||||
|
|
||||||
@@ -31,7 +35,7 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
:param bind: Whether to bind to the socket address (True) or connect (False)
|
:param bind: Whether to bind to the socket address (True) or connect (False)
|
||||||
:param timeout_ms: Timeout for socket receive operations in milliseconds
|
:param timeout_ms: Timeout for socket receive operations in milliseconds
|
||||||
:param window_duration: Duration in seconds over which to aggregate emotions
|
:param window_duration: Duration in seconds over which to aggregate emotions
|
||||||
:param min_frames_required: Minimum number of frames per face required to consider a face
|
:param min_frames_required: Minimum number of frames per face required to consider a face
|
||||||
valid
|
valid
|
||||||
"""
|
"""
|
||||||
super().__init__(name)
|
super().__init__(name)
|
||||||
@@ -78,29 +82,29 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
|
|
||||||
# Tracks counts of detected emotions per face index
|
# Tracks counts of detected emotions per face index
|
||||||
face_stats = defaultdict(Counter)
|
face_stats = defaultdict(Counter)
|
||||||
|
|
||||||
prev_dominant_emotions = set()
|
prev_dominant_emotions = set()
|
||||||
|
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
frame_bytes = await self.video_in_socket.recv()
|
frame_bytes = await self.video_in_socket.recv()
|
||||||
|
|
||||||
# Convert bytes to a numpy buffer
|
# Convert bytes to a numpy buffer
|
||||||
nparr = np.frombuffer(frame_bytes, np.uint8)
|
nparr = np.frombuffer(frame_bytes, np.uint8)
|
||||||
|
|
||||||
# Decode image into the generic Numpy Array DeepFace expects
|
# Decode image into the generic Numpy Array DeepFace expects
|
||||||
frame_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
frame_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
||||||
|
|
||||||
if frame_image is None:
|
if frame_image is None:
|
||||||
# Could not decode image, skip this frame
|
# Could not decode image, skip this frame
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Get the dominant emotion from each face
|
# Get the dominant emotion from each face
|
||||||
current_emotions = self.emotion_recognizer.sorted_dominant_emotions(frame_image)
|
current_emotions = self.emotion_recognizer.sorted_dominant_emotions(frame_image)
|
||||||
# Update emotion counts for each detected face
|
# Update emotion counts for each detected face
|
||||||
for i, emotion in enumerate(current_emotions):
|
for i, emotion in enumerate(current_emotions):
|
||||||
face_stats[i][emotion] += 1
|
face_stats[i][emotion] += 1
|
||||||
|
|
||||||
# If window duration has passed, process the collected stats
|
# If window duration has passed, process the collected stats
|
||||||
if time.time() >= next_window_time:
|
if time.time() >= next_window_time:
|
||||||
window_dominant_emotions = set()
|
window_dominant_emotions = set()
|
||||||
@@ -111,35 +115,36 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
if total_detections >= self.min_frames_required:
|
if total_detections >= self.min_frames_required:
|
||||||
dominant_emotion = counter.most_common(1)[0][0]
|
dominant_emotion = counter.most_common(1)[0][0]
|
||||||
window_dominant_emotions.add(dominant_emotion)
|
window_dominant_emotions.add(dominant_emotion)
|
||||||
|
|
||||||
await self.update_emotions(prev_dominant_emotions, window_dominant_emotions)
|
await self.update_emotions(prev_dominant_emotions, window_dominant_emotions)
|
||||||
prev_dominant_emotions = window_dominant_emotions
|
prev_dominant_emotions = window_dominant_emotions
|
||||||
face_stats.clear()
|
face_stats.clear()
|
||||||
next_window_time = time.time() + self.window_duration
|
next_window_time = time.time() + self.window_duration
|
||||||
|
|
||||||
except zmq.Again:
|
except zmq.Again:
|
||||||
self.logger.warning("No video frame received within timeout.")
|
self.logger.warning("No video frame received within timeout.")
|
||||||
|
|
||||||
async def update_emotions(self, prev_emotions: set[str], emotions: set[str]):
|
async def update_emotions(self, prev_emotions: set[str], emotions: set[str]):
|
||||||
"""
|
"""
|
||||||
Compare emotions from previous window and current emotions,
|
Compare emotions from previous window and current emotions,
|
||||||
send updates to BDI Core Agent.
|
send updates to BDI Core Agent.
|
||||||
"""
|
"""
|
||||||
emotions_to_remove = prev_emotions - emotions
|
emotions_to_remove = prev_emotions - emotions
|
||||||
emotions_to_add = emotions - prev_emotions
|
emotions_to_add = emotions - prev_emotions
|
||||||
|
|
||||||
if not emotions_to_add and not emotions_to_remove:
|
if not emotions_to_add and not emotions_to_remove:
|
||||||
return
|
return
|
||||||
|
|
||||||
emotion_beliefs_remove = []
|
emotion_beliefs_remove = []
|
||||||
for emotion in emotions_to_remove:
|
for emotion in emotions_to_remove:
|
||||||
self.logger.info(f"Emotion '{emotion}' has disappeared.")
|
self.logger.info(f"Emotion '{emotion}' has disappeared.")
|
||||||
try:
|
try:
|
||||||
emotion_beliefs_remove.append(Belief(name="emotion_detected", arguments=[emotion],
|
emotion_beliefs_remove.append(
|
||||||
remove=True))
|
Belief(name="emotion_detected", arguments=[emotion], remove=True)
|
||||||
|
)
|
||||||
except ValidationError:
|
except ValidationError:
|
||||||
self.logger.warning("Invalid belief for emotion removal: %s", emotion)
|
self.logger.warning("Invalid belief for emotion removal: %s", emotion)
|
||||||
|
|
||||||
emotion_beliefs_add = []
|
emotion_beliefs_add = []
|
||||||
for emotion in emotions_to_add:
|
for emotion in emotions_to_add:
|
||||||
self.logger.info(f"New emotion detected: '{emotion}'")
|
self.logger.info(f"New emotion detected: '{emotion}'")
|
||||||
@@ -147,7 +152,7 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
emotion_beliefs_add.append(Belief(name="emotion_detected", arguments=[emotion]))
|
emotion_beliefs_add.append(Belief(name="emotion_detected", arguments=[emotion]))
|
||||||
except ValidationError:
|
except ValidationError:
|
||||||
self.logger.warning("Invalid belief for new emotion: %s", emotion)
|
self.logger.warning("Invalid belief for new emotion: %s", emotion)
|
||||||
|
|
||||||
beliefs_list_add = [b.model_dump() for b in emotion_beliefs_add]
|
beliefs_list_add = [b.model_dump() for b in emotion_beliefs_add]
|
||||||
beliefs_list_remove = [b.model_dump() for b in emotion_beliefs_remove]
|
beliefs_list_remove = [b.model_dump() for b in emotion_beliefs_remove]
|
||||||
payload = {"create": beliefs_list_add, "delete": beliefs_list_remove}
|
payload = {"create": beliefs_list_add, "delete": beliefs_list_remove}
|
||||||
@@ -158,4 +163,4 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
body=json.dumps(payload),
|
body=json.dumps(payload),
|
||||||
thread="beliefs",
|
thread="beliefs",
|
||||||
)
|
)
|
||||||
await self.send(message)
|
await self.send(message)
|
||||||
|
|||||||
Reference in New Issue
Block a user