Merge branch 'feat/visual-emotion-recognition' into demo

This commit is contained in:
Storm
2026-01-20 12:48:27 +01:00
8 changed files with 1123 additions and 32 deletions

View File

@@ -7,6 +7,7 @@ requires-python = ">=3.13"
dependencies = [
"agentspeak>=0.2.2",
"colorlog>=6.10.1",
"deepface>=0.0.96",
"fastapi[all]>=0.115.6",
"mlx-whisper>=0.4.3 ; sys_platform == 'darwin'",
"numpy>=2.3.3",
@@ -21,6 +22,7 @@ dependencies = [
"silero-vad>=6.0.0",
"sphinx>=7.3.7",
"sphinx-rtd-theme>=3.0.2",
"tf-keras>=2.20.1",
"torch>=2.8.0",
"uvicorn>=0.37.0",
]

View File

@@ -22,6 +22,7 @@ from control_backend.schemas.program import (
BaseGoal,
BasicNorm,
ConditionalNorm,
EmotionBelief,
GestureAction,
Goal,
InferredBelief,
@@ -459,6 +460,10 @@ class AgentSpeakGenerator:
@_astify.register
def _(self, sb: SemanticBelief) -> AstExpression:
return AstLiteral(self.slugify(sb))
@_astify.register
def _(self, eb: EmotionBelief) -> AstExpression:
return AstLiteral("emotion_detected", [AstAtom(eb.emotion)])
@_astify.register
def _(self, ib: InferredBelief) -> AstExpression:

View File

@@ -8,6 +8,9 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognition_agent import ( # noqa
VisualEmotionRecognitionAgent,
)
from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage
from control_backend.schemas.ri_message import PauseCommand
@@ -52,6 +55,7 @@ class RICommunicationAgent(BaseAgent):
self.connected = False
self.gesture_agent: RobotGestureAgent | None = None
self.speech_agent: RobotSpeechAgent | None = None
self.visual_emotion_recognition_agent: VisualEmotionRecognitionAgent | None = None
async def setup(self):
"""
@@ -209,6 +213,14 @@ class RICommunicationAgent(BaseAgent):
case "audio":
vad_agent = VADAgent(audio_in_address=addr, audio_in_bind=bind)
await vad_agent.start()
case "video":
visual_emotion_agent = VisualEmotionRecognitionAgent(
settings.agent_settings.visual_emotion_recognition_name,
socket_address=addr,
bind=bind,
)
self.visual_emotion_recognition_agent = visual_emotion_agent
await visual_emotion_agent.start()
case _:
self.logger.warning("Unhandled negotiation id: %s", id)
@@ -313,6 +325,9 @@ class RICommunicationAgent(BaseAgent):
if self.speech_agent is not None:
await self.speech_agent.stop()
if self.visual_emotion_recognition_agent is not None:
await self.visual_emotion_recognition_agent.stop()
if self.pub_socket is not None:
self.pub_socket.close()
@@ -322,6 +337,7 @@ class RICommunicationAgent(BaseAgent):
self.connected = True
async def handle_message(self, msg: InternalMessage):
return
try:
pause_command = PauseCommand.model_validate_json(msg.body)
await self._req_socket.send_json(pause_command.model_dump())

View File

@@ -0,0 +1,169 @@
import json
import time
from collections import Counter, defaultdict
import cv2
import numpy as np
import zmq
import zmq.asyncio as azmq
from pydantic_core import ValidationError
import struct
from control_backend.agents import BaseAgent
from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognizer import ( # noqa
DeepFaceEmotionRecognizer,
)
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief
class VisualEmotionRecognitionAgent(BaseAgent):
def __init__(
self,
name: str,
socket_address: str,
bind: bool = False,
timeout_ms: int = 1000,
window_duration: int = settings.behaviour_settings.visual_emotion_recognition_window_duration_s, # noqa
min_frames_required: int = settings.behaviour_settings.visual_emotion_recognition_min_frames_per_face, # noqa
):
"""
Initialize the Visual Emotion Recognition Agent.
:param name: Name of the agent
:param socket_address: Address of the socket to connect or bind to
:param bind: Whether to bind to the socket address (True) or connect (False)
:param timeout_ms: Timeout for socket receive operations in milliseconds
:param window_duration: Duration in seconds over which to aggregate emotions
:param min_frames_required: Minimum number of frames per face required to consider a face
valid
"""
super().__init__(name)
self.socket_address = socket_address
self.socket_bind = bind
self.timeout_ms = timeout_ms
self.window_duration = window_duration
self.min_frames_required = min_frames_required
async def setup(self):
"""
Initialize the agent resources.
1. Initializes the :class:`VisualEmotionRecognizer`.
2. Connects to the video input ZMQ socket.
3. Starts the background emotion recognition loop.
"""
self.logger.info("Setting up %s.", self.name)
self.emotion_recognizer = DeepFaceEmotionRecognizer()
self.video_in_socket = azmq.Context.instance().socket(zmq.SUB)
if self.socket_bind:
self.video_in_socket.bind(self.socket_address)
else:
self.video_in_socket.connect(self.socket_address)
self.video_in_socket.setsockopt_string(zmq.SUBSCRIBE, "")
self.video_in_socket.setsockopt(zmq.RCVTIMEO, self.timeout_ms)
self.video_in_socket.setsockopt(zmq.CONFLATE, 1)
self.add_behavior(self.emotion_update_loop())
async def emotion_update_loop(self):
"""
Background loop to receive video frames, recognize emotions, and update beliefs.
1. Receives video frames from the ZMQ socket.
2. Uses the :class:`VisualEmotionRecognizer` to detect emotions.
3. Aggregates emotions over a time window.
4. Sends updates to the BDI Core Agent about detected emotions.
"""
# Next time to process the window and update emotions
next_window_time = time.time() + self.window_duration
# Tracks counts of detected emotions per face index
face_stats = defaultdict(Counter)
prev_dominant_emotions = set()
while self._running:
try:
frame_bytes = await self.video_in_socket.recv()
# Convert bytes to a numpy buffer
nparr = np.frombuffer(frame_bytes, np.uint8)
# Decode image into the generic Numpy Array DeepFace expects
frame_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame_image is None:
# Could not decode image, skip this frame
self.logger.warning("Received invalid video frame, skipping.")
continue
# Get the dominant emotion from each face
current_emotions = self.emotion_recognizer.sorted_dominant_emotions(frame_image)
# Update emotion counts for each detected face
for i, emotion in enumerate(current_emotions):
face_stats[i][emotion] += 1
# If window duration has passed, process the collected stats
if time.time() >= next_window_time:
window_dominant_emotions = set()
# Determine dominant emotion for each face in the window
for _, counter in face_stats.items():
total_detections = sum(counter.values())
if total_detections >= self.min_frames_required:
dominant_emotion = counter.most_common(1)[0][0]
window_dominant_emotions.add(dominant_emotion)
await self.update_emotions(prev_dominant_emotions, window_dominant_emotions)
prev_dominant_emotions = window_dominant_emotions
face_stats.clear()
next_window_time = time.time() + self.window_duration
except zmq.Again:
self.logger.warning("No video frame received within timeout.")
async def update_emotions(self, prev_emotions: set[str], emotions: set[str]):
"""
Compare emotions from previous window and current emotions,
send updates to BDI Core Agent.
"""
emotions_to_remove = prev_emotions - emotions
emotions_to_add = emotions - prev_emotions
if not emotions_to_add and not emotions_to_remove:
return
emotion_beliefs_remove = []
for emotion in emotions_to_remove:
self.logger.info(f"Emotion '{emotion}' has disappeared.")
try:
emotion_beliefs_remove.append(
Belief(name="emotion_detected", arguments=[emotion], remove=True)
)
except ValidationError:
self.logger.warning("Invalid belief for emotion removal: %s", emotion)
emotion_beliefs_add = []
for emotion in emotions_to_add:
self.logger.info(f"New emotion detected: '{emotion}'")
try:
emotion_beliefs_add.append(Belief(name="emotion_detected", arguments=[emotion]))
except ValidationError:
self.logger.warning("Invalid belief for new emotion: %s", emotion)
beliefs_list_add = [b.model_dump() for b in emotion_beliefs_add]
beliefs_list_remove = [b.model_dump() for b in emotion_beliefs_remove]
payload = {"create": beliefs_list_add, "delete": beliefs_list_remove}
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=json.dumps(payload),
thread="beliefs",
)
await self.send(message)

View File

@@ -0,0 +1,55 @@
import abc
import numpy as np
from deepface import DeepFace
class VisualEmotionRecognizer(abc.ABC):
@abc.abstractmethod
def load_model(self):
"""Load the visual emotion recognition model into memory."""
pass
@abc.abstractmethod
def sorted_dominant_emotions(self, image) -> list[str]:
"""
Recognize dominant emotions from faces in the given image.
Emotions can be one of ['angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral'].
To minimize false positives, consider filtering faces with low confidence.
:param image: The input image for emotion recognition.
:return: List of dominant emotion detected for each face in the image,
sorted per face.
"""
pass
class DeepFaceEmotionRecognizer(VisualEmotionRecognizer):
"""
DeepFace-based implementation of VisualEmotionRecognizer.
DeepFape has proven to be quite a pessimistic model, so expect sad, fear and neutral
emotions to be over-represented.
"""
def __init__(self):
self.load_model()
def load_model(self):
print("Loading Deepface Emotion Model...")
dummy_img = np.zeros((224, 224, 3), dtype=np.uint8)
# analyze does not take a model as an argument, calling it once on a dummy image to load
# the model
DeepFace.analyze(dummy_img, actions=['emotion'], enforce_detection=False)
print("Deepface Emotion Model loaded.")
def sorted_dominant_emotions(self, image) -> list[str]:
analysis = DeepFace.analyze(image,
actions=['emotion'],
enforce_detection=False
)
# Sort faces by x coordinate to maintain left-to-right order
analysis.sort(key=lambda face: face['region']['x'])
analysis = [face for face in analysis if face['face_confidence'] >= 0.90]
dominant_emotions = [face['dominant_emotion'] for face in analysis]
return dominant_emotions

View File

@@ -50,6 +50,7 @@ class AgentSettings(BaseModel):
# agent names
bdi_core_name: str = "bdi_core_agent"
bdi_program_manager_name: str = "bdi_program_manager_agent"
visual_emotion_recognition_name: str = "visual_emotion_recognition_agent"
text_belief_extractor_name: str = "text_belief_extractor_agent"
vad_name: str = "vad_agent"
llm_name: str = "llm_agent"
@@ -77,6 +78,10 @@ class BehaviourSettings(BaseModel):
:ivar transcription_words_per_token: Estimated words per token for transcription timing.
:ivar transcription_token_buffer: Buffer for transcription tokens.
:ivar conversation_history_length_limit: The maximum amount of messages to extract beliefs from.
:ivar visual_emotion_recognition_window_duration_s: Duration in seconds over which to aggregate
emotions and update emotion beliefs.
:ivar visual_emotion_recognition_min_frames_per_face: Minimum number of frames per face required
to consider a face valid.
"""
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
@@ -100,6 +105,9 @@ class BehaviourSettings(BaseModel):
# Text belief extractor settings
conversation_history_length_limit: int = 10
# Visual Emotion Recognition settings
visual_emotion_recognition_window_duration_s: int = 5
visual_emotion_recognition_min_frames_per_face: int = 3
class LLMSettings(BaseModel):
"""

View File

@@ -28,8 +28,8 @@ class LogicalOperator(Enum):
OR = "OR"
type Belief = KeywordBelief | SemanticBelief | InferredBelief
type BasicBelief = KeywordBelief | SemanticBelief
type Belief = KeywordBelief | SemanticBelief | InferredBelief | EmotionBelief
type BasicBelief = KeywordBelief | SemanticBelief | EmotionBelief
class KeywordBelief(ProgramElement):
@@ -69,6 +69,15 @@ class InferredBelief(ProgramElement):
left: Belief
right: Belief
class EmotionBelief(ProgramElement):
"""
Represents a belief that is set when a certain emotion is detected.
:ivar emotion: The emotion on which this belief gets set.
"""
name: str = ""
emotion: str
class Norm(ProgramElement):
"""
@@ -226,3 +235,9 @@ class Program(BaseModel):
"""
phases: list[Phase]
if __name__ == "__main__":
input = input("Enter program JSON: ")
program = Program.model_validate_json(input)
print(program)

881
uv.lock generated

File diff suppressed because it is too large Load Diff