# -*- coding: utf-8 -*- from __future__ import unicode_literals import json import mock import pytest from robot_interface.endpoints.face_detector import FaceDetectionSender from robot_interface.state import state @pytest.fixture(autouse=True) def initialized_state(monkeypatch): """ Fully initialize global state so __getattribute__ allows access. """ # Bypass the initialization guard monkeypatch.setattr(state, "is_initialized", True, raising=False) # Install a controllable exit_event exit_event = mock.Mock() exit_event.is_set = mock.Mock(return_value=True) monkeypatch.setattr(state, "exit_event", exit_event, raising=False) # Default qi_session is None unless overridden monkeypatch.setattr(state, "qi_session", None, raising=False) yield def test_start_face_detection_no_qi_session(): """ Returns early when qi_session is None. """ sender = FaceDetectionSender(mock.Mock()) sender.start_face_detection() assert sender._face_thread is None assert sender._face_service is None assert sender._memory_service is None def test_start_face_detection_happy_path(mocker): """ Initializes services and starts background thread. """ mock_face = mock.Mock() mock_memory = mock.Mock() mock_qi = mock.Mock() mock_qi.service.side_effect = lambda name: { "ALFaceDetection": mock_face, "ALMemory": mock_memory, }[name] state.qi_session = mock_qi fake_thread = mock.Mock() mocker.patch("threading.Thread", return_value=fake_thread) sender = FaceDetectionSender(mock.Mock()) sender.start_face_detection() mock_face.setTrackingEnabled.assert_called_with(False) mock_face.setRecognitionEnabled.assert_called_with(False) mock_face.subscribe.assert_called_once() fake_thread.start.assert_called_once() def test_face_loop_face_detected_true(mocker): """ Sends face_detected=True when face data exists. """ sender = FaceDetectionSender(mock.Mock()) sender._memory_service = mock.Mock() sender._memory_service.getData.return_value = [0, [[1]]] sender.socket = mock.Mock() mocker.patch("time.sleep") state.exit_event.is_set.side_effect = [False, True] sender._face_loop() sent = sender.socket.send.call_args[0][0] payload = json.loads(sent.decode("utf-8")) assert payload["face_detected"] is True def test_face_loop_face_detected_false(mocker): """ Sends face_detected=False when no face data exists. """ sender = FaceDetectionSender(mock.Mock()) sender._memory_service = mock.Mock() sender._memory_service.getData.return_value = [] sender.socket = mock.Mock() mocker.patch("time.sleep") state.exit_event.is_set.side_effect = [False, True] sender._face_loop() sent = sender.socket.send.call_args[0][0] payload = json.loads(sent.decode("utf-8")) assert not payload["face_detected"] def test_face_loop_handles_exception(mocker): """ Exceptions inside loop are swallowed. """ sender = FaceDetectionSender(mock.Mock()) sender._memory_service = mock.Mock() sender._memory_service.getData.side_effect = Exception("boom") sender.socket = mock.Mock() mocker.patch("time.sleep") state.exit_event.is_set.side_effect = [False, True] # Must not raise sender._face_loop() def test_stop_face_detection_happy_path(): """ Unsubscribes and disables tracking. """ sender = FaceDetectionSender(mock.Mock()) mock_face = mock.Mock() sender._face_service = mock_face sender.stop_face_detection() mock_face.unsubscribe.assert_called_once() mock_face.setTrackingEnabled.assert_called_with(False) def test_stop_face_detection_exception(): """ stop_face_detection swallows service exceptions. """ sender = FaceDetectionSender(mock.Mock()) mock_face = mock.Mock() mock_face.unsubscribe.side_effect = Exception("fail") sender._face_service = mock_face sender.stop_face_detection() def test_close_calls_stop_face_detection(mocker): """ close() calls parent close and stop_face_detection(). """ sender = FaceDetectionSender(mock.Mock()) mocker.patch.object(sender, "stop_face_detection") mocker.patch( "robot_interface.endpoints.face_detector.SocketBase.close" ) sender.close() sender.stop_face_detection.assert_called_once()