# coding=utf-8
import mock
import pytest
import zmq

from robot_interface.endpoints.video_sender import VideoSender


@pytest.fixture
def zmq_context():
    """
    Yields a real ZMQ context for socket creation.

    The context is terminated after the requesting test has finished.
    """
    context = zmq.Context()
    yield context
    context.term()


def test_init_defaults(zmq_context, mocker):
    """
    Test initialization of the VideoSender.
    """
    # We patch settings to ensure valid port access inside the class logic,
    # although the default arg is evaluated at import time.
    mocker.patch("robot_interface.endpoints.video_sender.settings")

    mock_zmq = mock.Mock()
    sender = VideoSender(mock_zmq)

    # Verify the sender reports the expected identifier
    assert sender.identifier == "video"


def test_start_no_qi_session(mocker):
    """
    Test that the loop does not start if no Qi session is available.
    """
    # Mock state to return None for qi_session
    mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
    mock_state.qi_session = None

    # Patch threading so any attempt to spawn the receiver thread is observable
    mock_threading = mocker.patch("robot_interface.endpoints.video_sender.threading")

    mock_zmq = mock.Mock()
    sender = VideoSender(mock_zmq)
    sender.start_video_rcv()

    # Assertions
    mock_threading.Thread.assert_not_called()


def test_start_success(mocker):
    """
    Test successful startup of the video receiver thread.
    """
    # Mock the Qi Session and Service
    mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
    mock_session = mock.Mock()
    mock_state.qi_session = mock_session

    mock_video_service = mock.Mock()
    mock_session.service.return_value = mock_video_service
    mock_video_service.subscribeCamera.return_value = "test_subscriber_id"

    # Mock Settings
    mock_settings = mocker.patch("robot_interface.endpoints.video_sender.settings")
    mock_settings.video_config.camera_index = 0
    mock_settings.video_config.resolution = 2
    mock_settings.video_config.color_space = 11
    mock_settings.video_config.fps = 30
    mock_settings.video_config.stream_name = "test_stream"

    mock_threading = mocker.patch("robot_interface.endpoints.video_sender.threading")

    # Run
    mock_zmq = mock.Mock()
    sender = VideoSender(mock_zmq)
    sender.start_video_rcv()

    # Assertions
    mock_session.service.assert_called_with("ALVideoDevice")
    mock_video_service.subscribeCamera.assert_called_with("test_stream", 0, 2, 11, 30)
    mock_threading.Thread.assert_called_once()

    # Verify arguments passed to the thread target
    # (call_args[1] holds the keyword arguments of the Thread(...) call)
    thread_kwargs = mock_threading.Thread.call_args[1]
    assert thread_kwargs["target"] == sender.video_rcv_loop
    assert thread_kwargs["args"] == (mock_video_service, "test_subscriber_id")

    # Ensure thread was started
    mock_threading.Thread.return_value.start.assert_called_once()


def test_video_loop_happy_path(mocker):
    """
    Test the main loop: Wait -> Get Image -> Send -> Repeat/Exit.
    """
    # Mock settings for image buffer index
    mock_settings = mocker.patch("robot_interface.endpoints.video_sender.settings")
    mock_settings.video_config.image_buffer = 6

    # Mock Video Service to return a fake image structure
    # Standard NaoQi image is a list, binary data is usually at index 6
    fake_image_data = b"binary_jpeg_data"
    fake_image_list = [0] * 7
    fake_image_list[6] = fake_image_data

    mock_service = mock.Mock()
    mock_service.getImageRemote.return_value = fake_image_list

    # Mock Events:
    # exit_event: False (start), False (loop once), True (break)
    mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
    mock_state.exit_event.is_set.side_effect = [False, False, True]

    # Run
    mock_zmq = mock.Mock()
    sender = VideoSender(mock_zmq)
    sender.socket = mock.Mock()  # Mock the socket to verify send
    sender.video_rcv_loop(mock_service, "sub_id")

    # Assertions
    mock_state.active_event.wait.assert_called()
    mock_service.getImageRemote.assert_called_with("sub_id")
    sender.socket.send.assert_called_with(fake_image_data)


def test_video_loop_exit_during_wait(zmq_context, mocker):
    """
    Test that the loop breaks immediately if exit_event is set while waiting.
    """
    mock_service = mock.Mock()

    mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
    # 1. Loop check: False (enter loop)
    # 2. Wait happens (mock returns instantly)
    # 3. Post-wait check: True (break)
    mock_state.exit_event.is_set.side_effect = [False, True]

    mock_zmq = mock.Mock()
    sender = VideoSender(mock_zmq)
    sender.video_rcv_loop(mock_service, "sub_id")

    # Assert we never tried to get an image
    mock_service.getImageRemote.assert_not_called()


def test_video_loop_exception_handling(zmq_context, mocker):
    """
    Test that an exception raised during image retrieval does not propagate
    out of the loop and crash the thread.
    """
    # Only the patch itself is needed; no settings attributes are configured
    mocker.patch("robot_interface.endpoints.video_sender.settings")

    mock_service = mock.Mock()
    # Every call to getImageRemote raises; the loop is ended via exit_event
    mock_service.getImageRemote.side_effect = Exception("Camera disconnected")

    mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
    # Loop runs once then exits
    mock_state.exit_event.is_set.side_effect = [False, False, True]

    mock_zmq = mock.Mock()
    sender = VideoSender(mock_zmq)
    sender.socket = mock.Mock()
    sender.video_rcv_loop(mock_service, "sub_id")

    # Assertions
    # Ensure loop didn't crash; it should have completed the iteration and
    # checked exit_event
    assert mock_state.exit_event.is_set.call_count >= 2