from unittest.mock import patch

import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from starlette.responses import StreamingResponse

from control_backend.api.v1.endpoints import logs


@pytest.fixture
def client():
    """Build a TestClient around a fresh FastAPI app that mounts the logs router."""
    application = FastAPI()
    application.include_router(logs.router)
    return TestClient(application)


@pytest.mark.asyncio
async def test_log_stream_endpoint_lines(client):
    """Exercise ``logs.log_stream()`` against a stubbed ZMQ socket.

    ``Context.instance`` is patched so that ``.socket()`` hands back a stub;
    the endpoint coroutine is then awaited directly and the first chunk of
    the resulting ``StreamingResponse`` body is inspected.

    NOTE(review): the ``client`` fixture is requested but never used in the
    body; the endpoint is called as a plain coroutine instead.
    """

    class DummySocket:
        """Minimal stand-in that records subscribe/connect calls."""

        def __init__(self):
            self.subscribed = []
            self.connected = False
            self.recv_count = 0

        def subscribe(self, topic):
            self.subscribed.append(topic)

        def connect(self, addr):
            self.connected = True

        async def recv_multipart(self):
            # The first call yields a single (topic, payload) pair; any later
            # call raises.
            # NOTE(review): this test only pulls one chunk, so the raise is
            # presumably never reached here -- confirm against the endpoint.
            if self.recv_count != 0:
                raise StopAsyncIteration
            self.recv_count += 1
            return (b"INFO", b"test message")

    fake_socket = DummySocket()
    patch_target = "control_backend.api.v1.endpoints.logs.Context.instance"

    # Route Context.instance().socket() to the stub for the duration of the call.
    with patch(patch_target) as instance_mock:
        instance_mock.return_value.socket.return_value = fake_socket

        # Await the endpoint function itself rather than going through HTTP.
        response = await logs.log_stream()
        assert isinstance(response, StreamingResponse)

        # Pull exactly one chunk from the streaming body.
        body_iter = response.body_iterator
        first_chunk = await body_iter.__anext__()
        text = (
            first_chunk.decode("utf-8")
            if isinstance(first_chunk, bytes)
            else first_chunk
        )

        assert "data:" in text

    # The stub must have seen at least one subscription and a connect call.
    assert fake_socket.subscribed
    assert fake_socket.connected