From 19b7efec05e60f92d5678518075ef030babf180a Mon Sep 17 00:00:00 2001 From: Storm Date: Thu, 22 Jan 2026 11:36:34 +0100 Subject: [PATCH] chore: implemented test video function If there is no qi session, the webcam of the device is used to send video. --- requirements.txt | 1 + src/robot_interface/endpoints/video_sender.py | 31 ++++- test/unit/test_video_sender.py | 117 ++++++++++++++++++ 3 files changed, 148 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index bc679f4..eb896d5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ sphinx sphinx_rtd_theme pre-commit python-dotenv +opencv-python==4.1.2.30 \ No newline at end of file diff --git a/src/robot_interface/endpoints/video_sender.py b/src/robot_interface/endpoints/video_sender.py index d822352..d935fca 100644 --- a/src/robot_interface/endpoints/video_sender.py +++ b/src/robot_interface/endpoints/video_sender.py @@ -1,6 +1,7 @@ import zmq import threading import logging +import cv2 from robot_interface.endpoints.socket_base import SocketBase from robot_interface.state import state @@ -28,7 +29,9 @@ class VideoSender(SocketBase): Will not start if no qi session is available. """ if not state.qi_session: - logging.info("No Qi session available. Not starting video loop.") + logging.info("No Qi session available. Starting video from webcam.") + thread = threading.Thread(target=self.test_video_stream) + thread.start() return video = state.qi_session.service("ALVideoDevice") @@ -59,3 +62,29 @@ class VideoSender(SocketBase): self.socket.send(img[settings.video_config.image_buffer]) except: logging.warn("Failed to retrieve video image from robot.") + + def test_video_stream(self): + """ + Test function to send video from local webcam instead of the robot. + """ + cap = cv2.VideoCapture(0) + if not cap.isOpened(): + logging.error("Could not open webcam for video stream test.") + return + + while not state.exit_event.is_set(): + + ret, frame = cap.read() + if not ret: + logging.warning("Failed to read frame from webcam.") + continue + + cv2.waitKey(1) + + small_frame = cv2.resize(frame, (320, 240), interpolation=cv2.INTER_AREA) + encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 70] + _, buffer = cv2.imencode('.jpg', small_frame, encode_param) + + self.socket.send(buffer.tobytes()) + + cap.release() diff --git a/test/unit/test_video_sender.py b/test/unit/test_video_sender.py index 430f658..928b503 100644 --- a/test/unit/test_video_sender.py +++ b/test/unit/test_video_sender.py @@ -97,3 +97,120 @@ def test_video_receive_error(zmq_context, mocker): sender.video_rcv_loop(mock_video_service, "stream_name") send_socket.assert_not_called() + +def test_video_stream_camera_fail(zmq_context, mocker): + """ + Test that the function logs an error and returns early if + the webcam cannot be opened. + """ + _patch_basics(mocker) + + # Mock cv2 and logging + mock_cv2 = mocker.patch("robot_interface.endpoints.video_sender.cv2") + mock_logging = mocker.patch("robot_interface.endpoints.video_sender.logging") + + # Setup the mock capture to fail isOpened() + mock_cap = mock.Mock() + mock_cap.isOpened.return_value = False + mock_cv2.VideoCapture.return_value = mock_cap + + sender = VideoSender(zmq_context) + sender.test_video_stream() + + # Assertions + mock_cv2.VideoCapture.assert_called_with(0) + + # Ensure the loop was never entered and cleanup didn't happen + assert not mock_cap.read.called + assert not mock_cap.release.called + + +def test_video_stream_read_fail(zmq_context, mocker): + """ + Test that the function logs a warning and continues the loop + if a specific frame fails to read. + """ + _patch_basics(mocker) + _patch_exit_event(mocker) # Run loop exactly once + + mock_cv2 = mocker.patch("robot_interface.endpoints.video_sender.cv2") + mock_logging = mocker.patch("robot_interface.endpoints.video_sender.logging") + + # Setup capture to open successfully, but fail the read() + mock_cap = mock.Mock() + mock_cap.isOpened.return_value = True + # Return (False, None) simulating a failed frame read + mock_cap.read.return_value = (False, None) + mock_cv2.VideoCapture.return_value = mock_cap + + sender = VideoSender(zmq_context) + # Mock the socket to ensure nothing is sent + sender.socket = mock.Mock() + + sender.test_video_stream() + + # Ensure we skipped the processing steps + assert not mock_cv2.resize.called + assert not sender.socket.send.called + + # Ensure cleanup happened at the end + mock_cap.release.assert_called_once() + + +def test_video_stream_success(zmq_context, mocker): + """ + Test the happy path: Frame read -> Resize -> Encode -> Send. + """ + _patch_basics(mocker) + _patch_exit_event(mocker) # Run loop exactly once + + mock_cv2 = mocker.patch("robot_interface.endpoints.video_sender.cv2") + + # Setup constants usually found in cv2 + mock_cv2.IMWRITE_JPEG_QUALITY = 1 + mock_cv2.INTER_AREA = 2 + + # Setup capture to work perfectly + mock_cap = mock.Mock() + mock_cap.isOpened.return_value = True + fake_frame = "original_frame_data" + mock_cap.read.return_value = (True, fake_frame) + mock_cv2.VideoCapture.return_value = mock_cap + + # Setup Resize and Encode + mock_cv2.resize.return_value = "small_frame_data" + + # Mock buffer behavior + mock_buffer = mock.Mock() + mock_buffer.tobytes.return_value = b"encoded_bytes" + # imencode returns (retval, buffer) + mock_cv2.imencode.return_value = (True, mock_buffer) + + sender = VideoSender(zmq_context) + sender.socket = mock.Mock() + + sender.test_video_stream() + + # Assertions + # 1. Check waitKey (the 1ms delay) + mock_cv2.waitKey.assert_called_with(1) + + # 2. Check Resize logic + mock_cv2.resize.assert_called_with( + fake_frame, + (320, 240), + interpolation=mock_cv2.INTER_AREA + ) + + # 3. Check Encode logic + mock_cv2.imencode.assert_called_with( + '.jpg', + "small_frame_data", + [mock_cv2.IMWRITE_JPEG_QUALITY, 70] + ) + + # 4. Check Socket Send + sender.socket.send.assert_called_with(b"encoded_bytes") + + # 5. Check Cleanup + mock_cap.release.assert_called_once() \ No newline at end of file