# coding=utf-8
import mock
import pytest
import zmq

from robot_interface.endpoints.video_sender import VideoSender
from robot_interface.state import state
from robot_interface.core.config import settings


@pytest.fixture
def zmq_context():
    """Provide a ZMQ context and tear it down once the test has finished."""
    context = zmq.Context()
    yield context
    # linger=0 closes any socket still attached to the context without
    # waiting for unsent messages, so teardown cannot block on a socket
    # that a test left open.
    context.destroy(linger=0)


def _patch_basics(mocker):
    """Common patches: prevent real threads, port binds, and state errors."""
    mocker.patch("robot_interface.endpoints.socket_base.zmq.Socket.bind")
    mocker.patch("robot_interface.endpoints.video_sender.threading.Thread")
    mocker.patch.object(state, "is_initialized", True)


def _patch_exit_event(mocker):
    """Make exit_event stop the loop after one iteration."""
    fake_event = mock.Mock()
    # First is_set() call reports "keep running", the second one "stop".
    fake_event.is_set.side_effect = [False, True]
    mocker.patch.object(state, "exit_event", fake_event)


def _patch_qi_session(mocker, video_service):
    """Install a fake qi_session whose service() returns *video_service*.

    subscribeCamera on the returned service is patched to hand back the
    stream name "stream_name".
    """
    fake_session = mock.Mock()
    fake_session.service.return_value = video_service
    mocker.patch.object(state, "qi_session", fake_session)
    mocker.patch.object(
        fake_session.service("ALVideoDevice"),
        "subscribeCamera",
        return_value="stream_name"
    )


def test_no_qi_session(zmq_context, mocker):
    """Video loop should not start without a qi_session."""
    _patch_basics(mocker)
    mocker.patch.object(state, "qi_session", None)

    sender = VideoSender(zmq_context)
    sender.start_video_rcv()

    assert not hasattr(sender, "thread")


def test_video_streaming(zmq_context, mocker):
    """VideoSender should send retrieved image data."""
    _patch_basics(mocker)
    _patch_exit_event(mocker)

    # Pepper's image buffer lives at index 6
    mocker.patch.object(settings.video_config, "image_buffer", 6)

    mock_video_service = mock.Mock()
    mock_video_service.getImageRemote.return_value = [None]*6 + ["fake_img"]
    _patch_qi_session(mocker, mock_video_service)

    sender = VideoSender(zmq_context)
    send_socket = mock.Mock()
    sender.socket.send = send_socket

    sender.start_video_rcv()
    sender.video_rcv_loop(mock_video_service, "stream_name")

    send_socket.assert_called_with("fake_img")


def test_video_receive_error(zmq_context, mocker):
    """Errors retrieving images should not call send()."""
    _patch_basics(mocker)
    _patch_exit_event(mocker)

    mock_video_service = mock.Mock()
    mock_video_service.getImageRemote.side_effect = Exception("boom")
    _patch_qi_session(mocker, mock_video_service)

    sender = VideoSender(zmq_context)
    send_socket = mock.Mock()
    sender.socket.send = send_socket

    sender.start_video_rcv()
    sender.video_rcv_loop(mock_video_service, "stream_name")

    send_socket.assert_not_called()


def test_video_stream_camera_fail(zmq_context, mocker):
    """
    Test that test_video_stream returns early, without reading frames or
    releasing the capture, if the webcam cannot be opened.
    """
    _patch_basics(mocker)

    # Mock cv2 and logging (logging is replaced only to keep output quiet)
    mock_cv2 = mocker.patch("robot_interface.endpoints.video_sender.cv2")
    mocker.patch("robot_interface.endpoints.video_sender.logging")

    # Setup the mock capture to fail isOpened()
    mock_cap = mock.Mock()
    mock_cap.isOpened.return_value = False
    mock_cv2.VideoCapture.return_value = mock_cap

    sender = VideoSender(zmq_context)
    sender.test_video_stream()

    # Assertions
    mock_cv2.VideoCapture.assert_called_with(0)
    # Ensure the loop was never entered and cleanup didn't happen
    assert not mock_cap.read.called
    assert not mock_cap.release.called


def test_video_stream_read_fail(zmq_context, mocker):
    """
    Test that a failed frame read skips resizing and sending, and that the
    capture is still released when the loop ends.
    """
    _patch_basics(mocker)
    _patch_exit_event(mocker)  # Run loop exactly once

    # Mock cv2 and logging (logging is replaced only to keep output quiet)
    mock_cv2 = mocker.patch("robot_interface.endpoints.video_sender.cv2")
    mocker.patch("robot_interface.endpoints.video_sender.logging")

    # Setup capture to open successfully, but fail the read()
    mock_cap = mock.Mock()
    mock_cap.isOpened.return_value = True
    # Return (False, None) simulating a failed frame read
    mock_cap.read.return_value = (False, None)
    mock_cv2.VideoCapture.return_value = mock_cap

    sender = VideoSender(zmq_context)
    # Mock the socket to ensure nothing is sent
    sender.socket = mock.Mock()

    sender.test_video_stream()

    # Ensure we skipped the processing steps
    assert not mock_cv2.resize.called
    assert not sender.socket.send.called
    # Ensure cleanup happened at the end
    mock_cap.release.assert_called_once()


def test_video_stream_success(zmq_context, mocker):
    """
    Test the happy path: Frame read -> Resize -> Encode -> Send.
    """
    _patch_basics(mocker)
    _patch_exit_event(mocker)  # Run loop exactly once

    mock_cv2 = mocker.patch("robot_interface.endpoints.video_sender.cv2")
    # Setup constants usually found in cv2
    mock_cv2.IMWRITE_JPEG_QUALITY = 1
    mock_cv2.INTER_AREA = 2

    # Setup capture to work perfectly
    mock_cap = mock.Mock()
    mock_cap.isOpened.return_value = True
    fake_frame = "original_frame_data"
    mock_cap.read.return_value = (True, fake_frame)
    mock_cv2.VideoCapture.return_value = mock_cap

    # Setup Resize and Encode
    mock_cv2.resize.return_value = "small_frame_data"
    # Mock buffer behavior
    mock_buffer = mock.Mock()
    mock_buffer.tobytes.return_value = b"encoded_bytes"
    # imencode returns (retval, buffer)
    mock_cv2.imencode.return_value = (True, mock_buffer)

    sender = VideoSender(zmq_context)
    sender.socket = mock.Mock()

    sender.test_video_stream()

    # Assertions
    # 1. Check waitKey (the 1ms delay)
    mock_cv2.waitKey.assert_called_with(1)

    # 2. Check Resize logic
    mock_cv2.resize.assert_called_with(
        fake_frame, (320, 240), interpolation=mock_cv2.INTER_AREA
    )

    # 3. Check Encode logic
    mock_cv2.imencode.assert_called_with(
        '.jpg', "small_frame_data", [mock_cv2.IMWRITE_JPEG_QUALITY, 70]
    )

    # 4. Check Socket Send
    sender.socket.send.assert_called_with(b"encoded_bytes")

    # 5. Check Cleanup
    mock_cap.release.assert_called_once()