7 Commits

Author SHA1 Message Date
06e3dad25d Merge branch 'fix/send-video' into 'main'
fix: send video

See merge request ics/sp/2025/n25b/pepperplus-ri!28
2026-01-30 19:19:03 +00:00
Storm
fe8bad1f8c Merge branch 'main' into fix/send-video 2026-01-30 17:28:13 +01:00
Storm
8333f2fc2a chore: removed numpy import 2026-01-29 13:09:25 +01:00
Storm
24c7fa216f test: 100% coverage
ref: N25B-393
2026-01-29 12:28:34 +01:00
Storm
56becd84ac test: fixed video_sender tests
ref: N25B-393
2026-01-29 12:16:48 +01:00
Storm
4a2cace1cf chore: changed socket option to set HWM to 3 (max 3 packets in queue 2026-01-29 12:02:28 +01:00
Storm
891ebf5e3f chore: changed video sending to work without cv2 2026-01-27 17:58:06 +01:00
8 changed files with 119 additions and 443 deletions

View File

@@ -68,7 +68,7 @@ class VideoConfig(object):
): ):
self.camera_index = get_config(camera_index, "VIDEO__CAMERA_INDEX", 0, int) self.camera_index = get_config(camera_index, "VIDEO__CAMERA_INDEX", 0, int)
self.resolution = get_config(resolution, "VIDEO__RESOLUTION", 2, int) self.resolution = get_config(resolution, "VIDEO__RESOLUTION", 2, int)
self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 11, int) self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 13, int)
self.fps = get_config(fps, "VIDEO__FPS", 15, int) self.fps = get_config(fps, "VIDEO__FPS", 15, int)
self.stream_name = get_config(stream_name, "VIDEO__STREAM_NAME", "Pepper Video") self.stream_name = get_config(stream_name, "VIDEO__STREAM_NAME", "Pepper Video")
self.image_buffer = get_config(image_buffer, "VIDEO__IMAGE_BUFFER", 6, int) self.image_buffer = get_config(image_buffer, "VIDEO__IMAGE_BUFFER", 6, int)

View File

@@ -84,23 +84,16 @@ class AudioSender(SocketBase):
chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
# Docs say this only raises an error if neither `input` nor `output` is True # Docs say this only raises an error if neither `input` nor `output` is True
def open_stream(): stream = self.audio.open(
return self.audio.open( format=pyaudio.paFloat32,
format=pyaudio.paFloat32, channels=audio_settings.channels,
channels=audio_settings.channels, rate=audio_settings.sample_rate,
rate=audio_settings.sample_rate, input=True,
input=True, input_device_index=self.microphone["index"],
input_device_index=self.microphone["index"], frames_per_buffer=chunk,
frames_per_buffer=chunk, )
)
stream = None
try: try:
# Test in case exit_event was set while waiting
if not state.exit_event.is_set():
stream = open_stream()
while not state.exit_event.is_set(): while not state.exit_event.is_set():
data = stream.read(chunk) data = stream.read(chunk)
if (state.is_speaking): continue # Do not send audio while the robot is speaking if (state.is_speaking): continue # Do not send audio while the robot is speaking
@@ -108,9 +101,5 @@ class AudioSender(SocketBase):
except IOError as e: except IOError as e:
logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e) logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e)
finally: finally:
if stream: stream.stop_stream()
try: stream.close()
stream.stop_stream()
stream.close()
except IOError:
pass # Ignore errors on closing

View File

@@ -79,30 +79,6 @@ class MainReceiver(ReceiverBase):
return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."} return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."}
@staticmethod
def _handle_pause(message):
"""
Handle a pause request. Pauses or resumes the video and audio streams.
:param message: The pause request message.
:type message: dict
:return: A response dictionary indicating success.
:rtype: dict[str, str]
"""
if message.get("data"):
if state.active_event.is_set():
state.active_event.clear()
return {"endpoint": "pause", "data": "Streams paused."}
else:
return {"endpoint": "pause", "data": "Streams are already paused."}
else:
if not state.active_event.is_set():
state.active_event.set()
return {"endpoint": "pause", "data": "Streams resumed."}
else:
return {"endpoint": "pause", "data": "Streams are already running."}
def handle_message(self, message): def handle_message(self, message):
""" """
Main entry point for handling incoming messages. Main entry point for handling incoming messages.
@@ -119,7 +95,5 @@ class MainReceiver(ReceiverBase):
return self._handle_ping(message) return self._handle_ping(message)
elif message["endpoint"].startswith("negotiate"): elif message["endpoint"].startswith("negotiate"):
return self._handle_negotiation(message) return self._handle_negotiation(message)
elif message["endpoint"] == "pause":
return self._handle_pause(message)
return {"endpoint": "error", "data": "The requested endpoint is not supported."} return {"endpoint": "error", "data": "The requested endpoint is not supported."}

View File

@@ -4,13 +4,12 @@ This program has been developed by students from the bachelor Computer Science a
University within the Software Project course. University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences) © Copyright Utrecht University (Department of Information and Computing Sciences)
""" """
import struct
import zmq import zmq
import threading import threading
import logging import logging
import numpy as np
import cv2 import struct
from robot_interface.endpoints.socket_base import SocketBase from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state from robot_interface.state import state
@@ -39,9 +38,6 @@ class VideoSender(SocketBase):
""" """
if not state.qi_session: if not state.qi_session:
logging.info("No Qi session available. Not starting video loop.") logging.info("No Qi session available. Not starting video loop.")
logging.info("Starting test video stream from local webcam.")
thread = threading.Thread(target=self.test_video_stream)
thread.start()
return return
video = state.qi_session.service("ALVideoDevice") video = state.qi_session.service("ALVideoDevice")
@@ -55,40 +51,9 @@ class VideoSender(SocketBase):
thread = threading.Thread(target=self.video_rcv_loop, args=(video, vid_stream_name)) thread = threading.Thread(target=self.video_rcv_loop, args=(video, vid_stream_name))
thread.start() thread.start()
def test_video_stream(self):
"""
Test function to send video from a local webcam instead of the robot.
"""
cap = cv2.VideoCapture(0)
if not cap.isOpened():
logging.error("Could not open webcam for video stream test.")
return
while not state.exit_event.is_set():
ret, frame = cap.read()
if not ret:
logging.warning("Failed to read frame from webcam.")
continue
if cv2.waitKey(1) & 0xFF == ord('q'): # << Add this: Updates the window
break
height, width, channels = frame.shape
pixel_data = frame.tobytes()
width_bytes = struct.pack('<I', width)
height_bytes = struct.pack('<I', height)
self.socket.send_multipart([width_bytes, height_bytes, pixel_data])
cap.release()
def video_rcv_loop(self, vid_service, vid_stream_name): def video_rcv_loop(self, vid_service, vid_stream_name):
""" """
The main loop of retrieving video images from the robot. The main loop of retrieving video images from the robot.
Sends the image data over the ZMQ socket in 3 parts: image width, image height and raw image bytes.
:param vid_service: The video service object that the active Qi session is connected to. :param vid_service: The video service object that the active Qi session is connected to.
:type vid_service: Object (Qi service object) :type vid_service: Object (Qi service object)
@@ -101,18 +66,18 @@ class VideoSender(SocketBase):
try: try:
img = vid_service.getImageRemote(vid_stream_name) img = vid_service.getImageRemote(vid_stream_name)
if img is not None: if img is not None:
image_bytes = img[6] raw_data = img[6]
width = img[0] width = img[0]
height = img[1] height = img[1]
width_bytes = struct.pack('<I', width) width_bytes = struct.pack('<I', width)
height_bytes = struct.pack('<I', height) height_bytes = struct.pack('<I', height)
self.socket.send_multipart([width_bytes, height_bytes, image_bytes]) self.socket.send_multipart([width_bytes, height_bytes, raw_data])
except KeyboardInterrupt:
logging.info("Video receiving loop interrupted by user.")
except: except:
logging.warn("Failed to retrieve video image from robot.") logging.warn("Failed to retrieve video image from robot.")
except KeyboardInterrupt:
logging.info("Video receiving loop interrupted by user.")
finally: finally:
vid_service.unsubscribe(vid_stream_name) vid_service.unsubscribe(vid_stream_name)
logging.info("Unsubscribed from video stream.") logging.info("Unsubscribed from video stream.")

View File

@@ -91,7 +91,6 @@ def main():
context = zmq.Context() context = zmq.Context()
state.initialize() state.initialize()
state.active_event.set()
try: try:
main_loop(context) main_loop(context)

View File

@@ -56,8 +56,6 @@ class State(object):
signal.signal(signal.SIGINT, handle_exit) signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit) signal.signal(signal.SIGTERM, handle_exit)
self.active_event = threading.Event()
self.qi_session = get_qi_session() self.qi_session = get_qi_session()
self.is_initialized = True self.is_initialized = True

View File

@@ -39,6 +39,7 @@ def test_no_microphone(zmq_context, mocker):
sender.start() sender.start()
assert sender.thread is None assert sender.thread is None
mock_info_logger.assert_called()
sender.wait_until_done() # Should return early because we didn't start a thread sender.wait_until_done() # Should return early because we didn't start a thread
@@ -78,7 +79,7 @@ def test_sending_audio(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.exit_event.is_set.side_effect = [False, False, True] mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock() mock_zmq_context = mock.Mock()
send_socket = mock.Mock() send_socket = mock.Mock()
@@ -98,217 +99,6 @@ def test_sending_audio(mocker):
send_socket.assert_called() send_socket.assert_called()
# SENDING PAUSE RESUME?
def test_stream_initial_wait_exit(mocker):
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.exit_event.is_set.return_value = True
mock_state.active_event.is_set.return_value = False
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender._stream()
mock_pyaudio_instance.open.assert_not_called()
def test_stream_pause_and_resume(mocker):
mock_stream = mock.Mock()
mock_stream.read.return_value = b"data"
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.return_value = mock_stream
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = False
mock_state.exit_event.is_set.side_effect = [False, False, False, True]
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.socket = mock.Mock()
sender._stream()
assert mock_pyaudio_instance.open.call_count == 2
assert mock_stream.close.call_count == 2
assert mock_stream.stop_stream.call_count == 2
assert mock_state.active_event.wait.called
def test_stream_exit_during_pause(mocker):
mock_stream = mock.Mock()
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.return_value = mock_stream
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = False
mock_state.exit_event.is_set.side_effect = [False, False, True]
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender._stream()
assert mock_pyaudio_instance.open.call_count == 1
assert mock_stream.close.call_count == 1
def test_stream_read_error_recovery(mocker):
stream_fail = mock.Mock()
stream_fail.read.side_effect = IOError("Overflow")
stream_ok = mock.Mock()
stream_ok.read.return_value = b"data"
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.side_effect = [stream_fail, stream_ok]
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = True
mock_state.exit_event.is_set.side_effect = [False, False, False, True]
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.socket = mock.Mock()
sender._stream()
stream_fail.close.assert_called()
assert mock_pyaudio_instance.open.call_count == 2
sender.socket.send.assert_called_with(b"data")
def test_stream_fatal_error(mocker):
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.side_effect = IOError("Fatal error")
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = True
mock_state.exit_event.is_set.return_value = False
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender._stream()
def test_wait_until_done(mocker):
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.wait_until_done()
mock_thread = mocker.Mock()
sender.thread = mock_thread
sender.wait_until_done()
mock_thread.join.assert_called_once()
assert sender.thread is None
def test_stream_pause_close_error(mocker):
"""
Tests that an IOError during stream closure (when pausing) is ignored,
covering the 'pass' statement in the pause logic.
"""
mock_stream = mock.Mock()
mock_stream.read.return_value = b"data"
# Raise IOError when stopping the stream during pause
mock_stream.stop_stream.side_effect = IOError("Failed to stop")
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.return_value = mock_stream
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
# 1. First False triggers the pause block
# 2. Second True resumes the loop
mock_state.active_event.is_set.side_effect = [False, True]
mock_state.exit_event.is_set.side_effect = [False, False, False, True]
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.socket = mock.Mock()
sender._stream()
# Verification: The error should be swallowed, and the stream should re-open
assert mock_stream.stop_stream.called
assert mock_pyaudio_instance.open.call_count == 2
def test_stream_finally_close_error(mocker):
"""
Tests that an IOError during stream closure in the finally block is ignored,
covering the 'pass' statement in the finally logic.
"""
mock_stream = mock.Mock()
mock_stream.read.return_value = b"data"
# Raise IOError when stopping the stream at exit
mock_stream.stop_stream.side_effect = IOError("Cleanup failed")
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
mock_pyaudio_instance.open.return_value = mock_stream
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = True
mock_state.exit_event.is_set.side_effect = [False, False, True]
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.socket = mock.Mock()
# Run
sender._stream()
# Assert: Should finish without raising exception despite the IOError in finally
assert mock_stream.stop_stream.called
def test_stream_recovery_failure(mocker):
"""
Tests the case where recovering from a read error (re-opening stream) also fails.
This ensures the outer try-except catches exceptions from the inner except block.
"""
mock_stream_initial = mock.Mock()
# Trigger the read error logic
mock_stream_initial.read.side_effect = IOError("Read failed")
mock_pyaudio_cls = mocker.patch("robot_interface.endpoints.audio_sender.pyaudio.PyAudio")
mock_pyaudio_instance = mock_pyaudio_cls.return_value
# First open works, Second open (recovery) fails fatally
mock_pyaudio_instance.open.side_effect = [
mock_stream_initial,
IOError("Recovery failed")
]
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.active_event.is_set.return_value = True
mock_state.exit_event.is_set.return_value = False
mock_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger")
mock_zmq_context = mock.Mock()
sender = AudioSender(mock_zmq_context)
sender.socket = mock.Mock()
sender._stream()
# Assert we hit the outer error log
mock_logger.error.assert_called()
def test_no_sending_if_speaking(mocker): def test_no_sending_if_speaking(mocker):
""" """

View File

@@ -5,167 +5,128 @@ University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences) © Copyright Utrecht University (Department of Information and Computing Sciences)
""" """
import struct
import mock import mock
import pytest import pytest
import zmq import zmq
from robot_interface.endpoints.video_sender import VideoSender from robot_interface.endpoints.video_sender import VideoSender
from robot_interface.state import state
from robot_interface.core.config import settings
@pytest.fixture @pytest.fixture
def zmq_context(): def zmq_context():
""" """Provide a ZMQ context."""
Yields a real ZMQ context for socket creation. yield zmq.Context()
"""
context = zmq.Context()
yield context
context.term()
def test_init_defaults(zmq_context, mocker):
"""
Test initialization of the VideoSender.
"""
# We patch settings to ensure valid port access inside the class logic,
# although the default arg is evaluated at import time.
mocker.patch("robot_interface.endpoints.video_sender.settings")
mock_zmq = mock.Mock() def _patch_basics(mocker):
sender = VideoSender(mock_zmq) """Common patches: prevent real threads, port binds, and state errors."""
mocker.patch("robot_interface.endpoints.socket_base.zmq.Socket.bind")
mocker.patch("robot_interface.endpoints.video_sender.threading.Thread")
mocker.patch.object(state, "is_initialized", True)
# Verify socket type is PUB
assert sender.identifier == "video"
def test_start_no_qi_session(mocker): def _patch_exit_event(mocker):
""" """Make exit_event stop the loop after one iteration."""
Test that the loop does not start if no Qi session is available. fake_event = mock.Mock()
""" fake_event.is_set.side_effect = [False, True]
# Mock state to return None for qi_session mocker.patch.object(state, "exit_event", fake_event)
mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
mock_state.qi_session = None
mock_threading = mocker.patch("robot_interface.endpoints.video_sender.threading")
mock_zmq = mock.Mock() def test_no_qi_session(zmq_context, mocker):
sender = VideoSender(mock_zmq) """Video loop should not start without a qi_session."""
_patch_basics(mocker)
mocker.patch.object(state, "qi_session", None)
sender = VideoSender(zmq_context)
sender.start_video_rcv() sender.start_video_rcv()
# Assertions assert not hasattr(sender, "thread")
mock_threading.Thread.assert_not_called()
def test_start_success(mocker):
""" def test_video_streaming(zmq_context, mocker):
Test successful startup of the video receiver thread. """VideoSender should send retrieved image data."""
""" _patch_basics(mocker)
# Mock the Qi Session and Service _patch_exit_event(mocker)
mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
mock_session = mock.Mock() # Pepper's image buffer lives at index 6
mock_state.qi_session = mock_session mocker.patch.object(settings.video_config, "image_buffer", 6)
test_width = 320
test_height = 240
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.return_value = [test_width, test_height, None, None, None, None, b"fake_img"]
fake_session = mock.Mock()
fake_session.service.return_value = mock_video_service
mocker.patch.object(state, "qi_session", fake_session)
mocker.patch.object(
fake_session.service("ALVideoDevice"),
"subscribeCamera",
return_value="stream_name"
)
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send_multipart = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_called_with([
struct.pack('<I', 320),
struct.pack('<I', 240),
b"fake_img"
])
def test_video_receive_error(zmq_context, mocker):
"""Errors retrieving images should not call send()."""
_patch_basics(mocker)
_patch_exit_event(mocker)
mock_video_service = mock.Mock() mock_video_service = mock.Mock()
mock_session.service.return_value = mock_video_service mock_video_service.getImageRemote.side_effect = Exception("boom")
mock_video_service.subscribeCamera.return_value = "test_subscriber_id"
# Mock Settings fake_session = mock.Mock()
mock_settings = mocker.patch("robot_interface.endpoints.video_sender.settings") fake_session.service.return_value = mock_video_service
mock_settings.video_config.camera_index = 0 mocker.patch.object(state, "qi_session", fake_session)
mock_settings.video_config.resolution = 2
mock_settings.video_config.color_space = 11
mock_settings.video_config.fps = 30
mock_settings.video_config.stream_name = "test_stream"
mock_threading = mocker.patch("robot_interface.endpoints.video_sender.threading") mocker.patch.object(
fake_session.service("ALVideoDevice"),
"subscribeCamera",
return_value="stream_name"
)
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send_multipart = send_socket
# Run
mock_zmq = mock.Mock()
sender = VideoSender(mock_zmq)
sender.start_video_rcv() sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
# Assertions send_socket.assert_not_called()
mock_session.service.assert_called_with("ALVideoDevice")
mock_video_service.subscribeCamera.assert_called_with("test_stream", 0, 2, 11, 30)
mock_threading.Thread.assert_called_once() def test_video_loop_keyboard_interrupt(zmq_context, mocker):
# Verify arguments passed to the thread target """Video loop should handle KeyboardInterrupt gracefully and unsubscribe."""
call_args = mock_threading.Thread.call_args[1] _patch_basics(mocker)
assert call_args["target"] == sender.video_rcv_loop _patch_exit_event(mocker)
assert call_args["args"] == (mock_video_service, "test_subscriber_id")
# Ensure thread was started # We mock the video service to raise KeyboardInterrupt when accessed
mock_threading.Thread.return_value.start.assert_called_once() mock_video_service = mock.Mock()
mock_video_service.getImageRemote.side_effect = KeyboardInterrupt
def test_video_loop_happy_path(mocker): # Mock logging to verify the specific interrupt message is logged
""" mock_logger = mocker.patch("robot_interface.endpoints.video_sender.logging")
Test the main loop: Wait -> Get Image -> Send -> Repeat/Exit.
"""
# Mock settings for image buffer index
mock_settings = mocker.patch("robot_interface.endpoints.video_sender.settings")
mock_settings.video_config.image_buffer = 6
# Mock Video Service to return a fake image structure sender = VideoSender(zmq_context)
# Standard NaoQi image is a list, binary data is usually at index 6
fake_image_data = b"binary_jpeg_data"
fake_image_list = [0] * 7
fake_image_list[6] = fake_image_data
mock_service = mock.Mock() # Execute the loop
mock_service.getImageRemote.return_value = fake_image_list sender.video_rcv_loop(mock_video_service, "stream_name")
# Mock Events: # Verify the 'finally' block executed (unsubscribe)
# exit_event: False (start), False (loop once), True (break) mock_video_service.unsubscribe.assert_called_with("stream_name")
mock_state = mocker.patch("robot_interface.endpoints.video_sender.state") mock_logger.info.assert_any_call("Unsubscribed from video stream.")
mock_state.exit_event.is_set.side_effect = [False, False, True]
# Run
mock_zmq = mock.Mock()
sender = VideoSender(mock_zmq)
sender.socket = mock.Mock() # Mock the socket to verify send
sender.video_rcv_loop(mock_service, "sub_id")
# Assertions
mock_state.active_event.wait.assert_called()
mock_service.getImageRemote.assert_called_with("sub_id")
sender.socket.send.assert_called_with(fake_image_data)
def test_video_loop_exit_during_wait(zmq_context, mocker):
"""
Test that the loop breaks immediately if exit_event is set while waiting.
"""
mock_service = mock.Mock()
mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
# 1. Loop check: False (enter loop)
# 2. Wait happens (mock returns instantly)
# 3. Post-wait check: True (break)
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zqm = mock.Mock()
sender = VideoSender(mock_zqm)
sender.video_rcv_loop(mock_service, "sub_id")
# Assert we never tried to get an image
mock_service.getImageRemote.assert_not_called()
def test_video_loop_exception_handling(zmq_context, mocker):
"""
Test that exceptions during image retrieval are caught and logged,
and do not crash the thread.
"""
mock_settings = mocker.patch("robot_interface.endpoints.video_sender.settings")
mock_service = mock.Mock()
# First call raises Exception, Second call works (if we allowed it, but we exit)
mock_service.getImageRemote.side_effect = Exception("Camera disconnected")
mock_state = mocker.patch("robot_interface.endpoints.video_sender.state")
# Loop runs once then exits
mock_state.exit_event.is_set.side_effect = [False, False, True]
mock_zmq = mock.Mock()
sender = VideoSender(mock_zmq)
sender.socket = mock.Mock()
sender.video_rcv_loop(mock_service, "sub_id")
# Assertions
# Ensure loop didn't crash; it should have completed the iteration and checked exit_event
assert mock_state.exit_event.is_set.call_count >= 2