feat: (hopefully) face detection
Simplified implementation, relying on the already-present VED Agent. ref: N25B-395
This commit is contained in:
@@ -30,6 +30,7 @@ from control_backend.schemas.program import (
|
|||||||
BasicNorm,
|
BasicNorm,
|
||||||
ConditionalNorm,
|
ConditionalNorm,
|
||||||
EmotionBelief,
|
EmotionBelief,
|
||||||
|
FaceBelief,
|
||||||
GestureAction,
|
GestureAction,
|
||||||
Goal,
|
Goal,
|
||||||
InferredBelief,
|
InferredBelief,
|
||||||
@@ -687,6 +688,10 @@ class AgentSpeakGenerator:
|
|||||||
def _(self, eb: EmotionBelief) -> AstExpression:
|
def _(self, eb: EmotionBelief) -> AstExpression:
|
||||||
return AstLiteral("emotion_detected", [AstAtom(eb.emotion)])
|
return AstLiteral("emotion_detected", [AstAtom(eb.emotion)])
|
||||||
|
|
||||||
|
@_astify.register
|
||||||
|
def _(self, fb: FaceBelief) -> AstExpression:
|
||||||
|
return AstLiteral("face_present")
|
||||||
|
|
||||||
@_astify.register
|
@_astify.register
|
||||||
def _(self, ib: InferredBelief) -> AstExpression:
|
def _(self, ib: InferredBelief) -> AstExpression:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ from control_backend.agents.perception.visual_emotion_recognition_agent.visual_e
|
|||||||
)
|
)
|
||||||
from control_backend.core.agent_system import InternalMessage
|
from control_backend.core.agent_system import InternalMessage
|
||||||
from control_backend.core.config import settings
|
from control_backend.core.config import settings
|
||||||
from control_backend.schemas.belief_message import Belief
|
from control_backend.schemas.belief_message import Belief, BeliefMessage
|
||||||
|
|
||||||
|
|
||||||
class VisualEmotionRecognitionAgent(BaseAgent):
|
class VisualEmotionRecognitionAgent(BaseAgent):
|
||||||
@@ -44,6 +44,7 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
self.timeout_ms = timeout_ms
|
self.timeout_ms = timeout_ms
|
||||||
self.window_duration = window_duration
|
self.window_duration = window_duration
|
||||||
self.min_frames_required = min_frames_required
|
self.min_frames_required = min_frames_required
|
||||||
|
self._face_detected = False
|
||||||
|
|
||||||
# Pause functionality
|
# Pause functionality
|
||||||
# NOTE: flag is set when running, cleared when paused
|
# NOTE: flag is set when running, cleared when paused
|
||||||
@@ -97,8 +98,8 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
|
|
||||||
width, height, image_bytes = await self.video_in_socket.recv_multipart()
|
width, height, image_bytes = await self.video_in_socket.recv_multipart()
|
||||||
|
|
||||||
width = int.from_bytes(width, 'little')
|
width = int.from_bytes(width, "little")
|
||||||
height = int.from_bytes(height, 'little')
|
height = int.from_bytes(height, "little")
|
||||||
|
|
||||||
# Convert bytes to a numpy buffer
|
# Convert bytes to a numpy buffer
|
||||||
image_array = np.frombuffer(image_bytes, np.uint8)
|
image_array = np.frombuffer(image_bytes, np.uint8)
|
||||||
@@ -107,6 +108,15 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
|
|
||||||
# Get the dominant emotion from each face
|
# Get the dominant emotion from each face
|
||||||
current_emotions = self.emotion_recognizer.sorted_dominant_emotions(frame)
|
current_emotions = self.emotion_recognizer.sorted_dominant_emotions(frame)
|
||||||
|
|
||||||
|
# Form (or unform) face_detected belief
|
||||||
|
if len(current_emotions) == 0 and self._face_detected:
|
||||||
|
self._face_detected = False
|
||||||
|
await self._inform_face_detected()
|
||||||
|
elif len(current_emotions) > 0 and not self._face_detected:
|
||||||
|
self._face_detected = True
|
||||||
|
await self._inform_face_detected()
|
||||||
|
|
||||||
# Update emotion counts for each detected face
|
# Update emotion counts for each detected face
|
||||||
for i, emotion in enumerate(current_emotions):
|
for i, emotion in enumerate(current_emotions):
|
||||||
face_stats[i][emotion] += 1
|
face_stats[i][emotion] += 1
|
||||||
@@ -133,7 +143,6 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f"Error in emotion recognition loop: {e}")
|
self.logger.error(f"Error in emotion recognition loop: {e}")
|
||||||
|
|
||||||
|
|
||||||
async def update_emotions(self, prev_emotions: set[str], emotions: set[str]):
|
async def update_emotions(self, prev_emotions: set[str], emotions: set[str]):
|
||||||
"""
|
"""
|
||||||
Compare emotions from previous window and current emotions,
|
Compare emotions from previous window and current emotions,
|
||||||
@@ -149,9 +158,7 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
for emotion in emotions_to_remove:
|
for emotion in emotions_to_remove:
|
||||||
self.logger.info(f"Emotion '{emotion}' has disappeared.")
|
self.logger.info(f"Emotion '{emotion}' has disappeared.")
|
||||||
try:
|
try:
|
||||||
emotion_beliefs_remove.append(
|
emotion_beliefs_remove.append(Belief(name="emotion_detected", arguments=[emotion]))
|
||||||
Belief(name="emotion_detected", arguments=[emotion], remove=True)
|
|
||||||
)
|
|
||||||
except ValidationError:
|
except ValidationError:
|
||||||
self.logger.warning("Invalid belief for emotion removal: %s", emotion)
|
self.logger.warning("Invalid belief for emotion removal: %s", emotion)
|
||||||
|
|
||||||
@@ -175,6 +182,20 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
)
|
)
|
||||||
await self.send(message)
|
await self.send(message)
|
||||||
|
|
||||||
|
async def _inform_face_detected(self):
|
||||||
|
if self._face_detected:
|
||||||
|
belief_message = BeliefMessage(create=[Belief(name="face_present")])
|
||||||
|
else:
|
||||||
|
belief_message = BeliefMessage(delete=[Belief(name="face_present")])
|
||||||
|
|
||||||
|
msg = InternalMessage(
|
||||||
|
to=settings.agent_settings.bdi_core_name,
|
||||||
|
thread="beliefs",
|
||||||
|
body=belief_message.model_dump_json(),
|
||||||
|
)
|
||||||
|
|
||||||
|
await self.send(msg)
|
||||||
|
|
||||||
async def handle_message(self, msg: InternalMessage):
|
async def handle_message(self, msg: InternalMessage):
|
||||||
"""
|
"""
|
||||||
Handle incoming messages.
|
Handle incoming messages.
|
||||||
@@ -204,4 +225,3 @@ class VisualEmotionRecognitionAgent(BaseAgent):
|
|||||||
"""
|
"""
|
||||||
self.video_in_socket.close()
|
self.video_in_socket.close()
|
||||||
await super().stop()
|
await super().stop()
|
||||||
|
|
||||||
@@ -41,8 +41,8 @@ class LogicalOperator(Enum):
|
|||||||
OR = "OR"
|
OR = "OR"
|
||||||
|
|
||||||
|
|
||||||
type Belief = KeywordBelief | SemanticBelief | InferredBelief | EmotionBelief
|
type Belief = KeywordBelief | SemanticBelief | InferredBelief | EmotionBelief | FaceBelief
|
||||||
type BasicBelief = KeywordBelief | SemanticBelief | EmotionBelief
|
type BasicBelief = KeywordBelief | SemanticBelief | EmotionBelief | FaceBelief
|
||||||
|
|
||||||
|
|
||||||
class KeywordBelief(ProgramElement):
|
class KeywordBelief(ProgramElement):
|
||||||
@@ -105,6 +105,7 @@ class InferredBelief(ProgramElement):
|
|||||||
left: Belief
|
left: Belief
|
||||||
right: Belief
|
right: Belief
|
||||||
|
|
||||||
|
|
||||||
class EmotionBelief(ProgramElement):
|
class EmotionBelief(ProgramElement):
|
||||||
"""
|
"""
|
||||||
Represents a belief that is set when a certain emotion is detected.
|
Represents a belief that is set when a certain emotion is detected.
|
||||||
@@ -115,6 +116,16 @@ class EmotionBelief(ProgramElement):
|
|||||||
name: str = ""
|
name: str = ""
|
||||||
emotion: str
|
emotion: str
|
||||||
|
|
||||||
|
|
||||||
|
class FaceBelief(ProgramElement):
|
||||||
|
"""
|
||||||
|
Represents the belief that at least one face is currently in view.
|
||||||
|
"""
|
||||||
|
|
||||||
|
name: str = ""
|
||||||
|
face_present: bool
|
||||||
|
|
||||||
|
|
||||||
class Norm(ProgramElement):
|
class Norm(ProgramElement):
|
||||||
"""
|
"""
|
||||||
Base class for behavioral norms that guide the robot's interactions.
|
Base class for behavioral norms that guide the robot's interactions.
|
||||||
|
|||||||
Reference in New Issue
Block a user