176 lines
4.6 KiB
Python
176 lines
4.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
This program has been developed by students from the bachelor Computer Science at Utrecht
|
|
University within the Software Project course.
|
|
© Copyright Utrecht University (Department of Information and Computing Sciences)
|
|
"""
|
|
from __future__ import unicode_literals
|
|
|
|
import json
|
|
import mock
|
|
import pytest
|
|
|
|
from robot_interface.endpoints.face_detector import FaceDetectionSender
|
|
from robot_interface.state import state
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def initialized_state(monkeypatch):
    """
    Prepare the global ``state`` object before every test in this module.

    Three attributes are patched through ``monkeypatch`` (with
    ``raising=False``), so they are restored automatically after the test:

    * ``is_initialized`` is forced to True to bypass the initialization guard,
    * ``exit_event`` becomes a mock whose ``is_set()`` returns True by default,
    * ``qi_session`` is None unless a test overrides it.
    """
    # Controllable stand-in for the real exit event; individual tests may
    # reconfigure ``is_set`` (e.g. with a side_effect sequence).
    stop_signal = mock.Mock()
    stop_signal.is_set = mock.Mock(return_value=True)

    # A tuple (not a dict) keeps the patch order fixed: the initialization
    # flag goes first, before any other attribute of ``state`` is touched.
    overrides = (
        ("is_initialized", True),
        ("exit_event", stop_signal),
        ("qi_session", None),
    )
    for attr_name, attr_value in overrides:
        monkeypatch.setattr(state, attr_name, attr_value, raising=False)

    yield
|
|
|
|
|
|
def test_start_face_detection_no_qi_session():
    """
    start_face_detection() sets nothing up when state.qi_session is None.

    The autouse fixture leaves ``state.qi_session`` at None, so the sender is
    expected to keep its thread and service handles unset.
    """
    detector = FaceDetectionSender(mock.Mock())

    detector.start_face_detection()

    # Neither the worker thread nor either service handle may be populated.
    for attr_name in ("_face_thread", "_face_service", "_memory_service"):
        assert getattr(detector, attr_name) is None
|
|
|
|
|
|
def test_start_face_detection_happy_path(mocker):
    """
    start_face_detection() configures the face service and starts a thread.

    A mock qi session hands out mock ``ALFaceDetection`` and ``ALMemory``
    services, and ``threading.Thread`` is patched so that no real thread is
    ever started.
    """
    face_service = mock.Mock()
    memory_service = mock.Mock()
    services_by_name = {
        "ALFaceDetection": face_service,
        "ALMemory": memory_service,
    }

    def lookup_service(name):
        # Any service name other than the two above raises KeyError.
        return services_by_name[name]

    session = mock.Mock()
    session.service.side_effect = lookup_service
    state.qi_session = session

    worker = mock.Mock()
    mocker.patch("threading.Thread", return_value=worker)

    detector = FaceDetectionSender(mock.Mock())
    detector.start_face_detection()

    # Tracking and recognition are switched off, the service is subscribed
    # exactly once and the (fake) background thread is started exactly once.
    face_service.setTrackingEnabled.assert_called_with(False)
    face_service.setRecognitionEnabled.assert_called_with(False)
    assert face_service.subscribe.call_count == 1
    assert worker.start.call_count == 1
|
|
|
|
|
|
def test_face_loop_face_detected_true(mocker):
    """
    _face_loop() sends face_detected=True when the memory service has face data.

    ``time.sleep`` is patched out and the exit event answers False once and
    then True, which is meant to let the loop run a single iteration.
    """
    detector = FaceDetectionSender(mock.Mock())

    # Memory service that reports a non-empty face-detection value.
    memory = mock.Mock()
    memory.getData.return_value = [0, [[1]]]
    detector._memory_service = memory
    detector.socket = mock.Mock()

    mocker.patch("time.sleep")
    state.exit_event.is_set.side_effect = [False, True]

    detector._face_loop()

    # Decode the first positional argument of the most recent send() call.
    last_send_args = detector.socket.send.call_args[0]
    message = json.loads(last_send_args[0].decode("utf-8"))

    assert message["face_detected"] is True
|
|
|
|
|
|
def test_face_loop_face_detected_false(mocker):
    """
    _face_loop() sends a falsy face_detected when the memory service has no data.

    ``time.sleep`` is patched out and the exit event answers False once and
    then True, which is meant to let the loop run a single iteration.
    """
    detector = FaceDetectionSender(mock.Mock())

    # Memory service that reports an empty face-detection value.
    memory = mock.Mock()
    memory.getData.return_value = []
    detector._memory_service = memory
    detector.socket = mock.Mock()

    mocker.patch("time.sleep")
    state.exit_event.is_set.side_effect = [False, True]

    detector._face_loop()

    # Decode the first positional argument of the most recent send() call.
    last_send_args = detector.socket.send.call_args[0]
    message = json.loads(last_send_args[0].decode("utf-8"))

    # Only falsiness is checked here, not identity with False.
    assert not message["face_detected"]
|
|
|
|
|
|
def test_face_loop_handles_exception(mocker):
    """
    Exceptions raised by the memory service inside _face_loop() are swallowed.

    ``getData`` is configured to raise, ``time.sleep`` is patched out and the
    exit event answers False once and then True. The test passes when
    ``_face_loop()`` returns normally and the raising call was really made.
    """
    sender = FaceDetectionSender(mock.Mock())

    memory = mock.Mock()
    memory.getData.side_effect = Exception("boom")
    sender._memory_service = memory
    sender.socket = mock.Mock()

    mocker.patch("time.sleep")
    state.exit_event.is_set.side_effect = [False, True]

    # Must not raise
    sender._face_loop()

    # Guard against a vacuous pass: without this check the test would also
    # succeed if the loop never reached the raising getData() call.
    assert memory.getData.called
|
|
|
|
|
|
def test_stop_face_detection_happy_path():
    """
    stop_face_detection() unsubscribes from the face service and turns
    tracking off.
    """
    face_service = mock.Mock()
    detector = FaceDetectionSender(mock.Mock())
    detector._face_service = face_service

    detector.stop_face_detection()

    # Exactly one unsubscribe, and tracking was last set to False.
    assert face_service.unsubscribe.call_count == 1
    face_service.setTrackingEnabled.assert_called_with(False)
|
|
|
|
|
|
def test_stop_face_detection_exception():
    """
    stop_face_detection() swallows exceptions raised by the face service.

    ``unsubscribe`` is configured to raise. The test passes when
    ``stop_face_detection()`` returns normally and the raising call was
    really made.
    """
    sender = FaceDetectionSender(mock.Mock())

    mock_face = mock.Mock()
    mock_face.unsubscribe.side_effect = Exception("fail")
    sender._face_service = mock_face

    # Must not raise
    sender.stop_face_detection()

    # Guard against a vacuous pass: confirm the failing service call was
    # actually attempted, so the "swallowed" path is what got exercised.
    assert mock_face.unsubscribe.called
|
|
|
|
|
|
def test_close_calls_stop_face_detection(mocker):
    """
    close() calls the parent close and stop_face_detection().

    Both collaborators are replaced with mocks so that no real socket or
    service teardown happens; the test then checks that each was invoked.
    """
    sender = FaceDetectionSender(mock.Mock())

    stop_mock = mocker.patch.object(sender, "stop_face_detection")
    # Keep a handle on the patched parent method so the "calls parent close"
    # half of the contract is verified too, not just neutralised.
    parent_close = mocker.patch(
        "robot_interface.endpoints.face_detector.SocketBase.close"
    )

    sender.close()

    stop_mock.assert_called_once()
    assert parent_close.called
|