# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

import struct

import mock
import pytest
import zmq

from robot_interface.endpoints.video_sender import VideoSender
from robot_interface.state import state
from robot_interface.core.config import settings


@pytest.fixture
def zmq_context():
    """Provide a ZMQ context and release it once the test has finished."""
    context = zmq.Context()
    yield context
    # destroy() closes any sockets still attached to the context before
    # terminating it; linger=0 drops unsent messages so teardown does not
    # wait on them.
    context.destroy(linger=0)


def _patch_basics(mocker):
    """Common patches: prevent real threads, port binds, and state errors.

    Patches ``zmq.Socket.bind`` (as seen from ``socket_base``) and
    ``threading.Thread`` (as seen from ``video_sender``) with mocks, and
    forces ``state.is_initialized`` to ``True``.
    """
    mocker.patch("robot_interface.endpoints.socket_base.zmq.Socket.bind")
    mocker.patch("robot_interface.endpoints.video_sender.threading.Thread")
    mocker.patch.object(state, "is_initialized", True)


def _patch_exit_event(mocker):
    """Make exit_event stop the loop after one iteration.

    ``state.exit_event`` is replaced by a mock whose ``is_set()`` returns
    ``False`` on the first call and ``True`` on the second.
    """
    fake_event = mock.Mock()
    fake_event.is_set.side_effect = [False, True]
    mocker.patch.object(state, "exit_event", fake_event)


def test_no_qi_session(zmq_context, mocker):
    """Video loop should not start without a qi_session."""
    _patch_basics(mocker)
    mocker.patch.object(state, "qi_session", None)

    sender = VideoSender(zmq_context)
    sender.start_video_rcv()

    # With no session available, the sender must not have stored a thread.
    assert not hasattr(sender, "thread")


# NOTE(review): test_video_streaming continues past the end of the visible
# source, so it is left untouched. Its visible prefix is kept verbatim on the
# next line (the original text was collapsed onto a single '#' line as well).
# def test_video_streaming(zmq_context, mocker): """VideoSender should send retrieved image data.""" _patch_basics(mocker) _patch_exit_event(mocker) # Pepper's image buffer lives at index 6 mocker.patch.object(settings.video_config, "image_buffer", 6) test_width = 320 test_height = 240 mock_video_service = mock.Mock() mock_video_service.getImageRemote.return_value = [test_width, test_height, None, None, None, None, b"fake_img"] fake_session = mock.Mock() fake_session.service.return_value = mock_video_service mocker.patch.object(state, "qi_session", fake_session) mocker.patch.object( fake_session.service("ALVideoDevice"), "subscribeCamera", return_value="stream_name" ) sender = 
VideoSender(zmq_context) send_socket = mock.Mock() sender.socket.send_multipart = send_socket sender.start_video_rcv() sender.video_rcv_loop(mock_video_service, "stream_name") send_socket.assert_called_with([ struct.pack('