test: 100% coverage
ref: N25B-393
This commit is contained in:
@@ -104,3 +104,24 @@ def test_video_receive_error(zmq_context, mocker):
|
||||
sender.video_rcv_loop(mock_video_service, "stream_name")
|
||||
|
||||
send_socket.assert_not_called()
|
||||
|
||||
def test_video_loop_keyboard_interrupt(zmq_context, mocker):
|
||||
"""Video loop should handle KeyboardInterrupt gracefully and unsubscribe."""
|
||||
_patch_basics(mocker)
|
||||
_patch_exit_event(mocker)
|
||||
|
||||
# We mock the video service to raise KeyboardInterrupt when accessed
|
||||
mock_video_service = mock.Mock()
|
||||
mock_video_service.getImageRemote.side_effect = KeyboardInterrupt
|
||||
|
||||
# Mock logging to verify the specific interrupt message is logged
|
||||
mock_logger = mocker.patch("robot_interface.endpoints.video_sender.logging")
|
||||
|
||||
sender = VideoSender(zmq_context)
|
||||
|
||||
# Execute the loop
|
||||
sender.video_rcv_loop(mock_video_service, "stream_name")
|
||||
|
||||
# Verify the 'finally' block executed (unsubscribe)
|
||||
mock_video_service.unsubscribe.assert_called_with("stream_name")
|
||||
mock_logger.info.assert_any_call("Unsubscribed from video stream.")
|
||||
|
||||
Reference in New Issue
Block a user