64 lines
1.9 KiB
Python
64 lines
1.9 KiB
Python
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
from starlette.responses import StreamingResponse
|
|
|
|
from control_backend.api.v1.endpoints import logs
|
|
|
|
|
|
@pytest.fixture
def client():
    """Provide a TestClient for a fresh FastAPI app with the logs router mounted."""
    application = FastAPI()
    # Only the logs router is registered; nothing else is attached to the app.
    application.include_router(logs.router)
    test_client = TestClient(application)
    return test_client
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_log_stream_endpoint_lines(client):
    """Drive ``logs.log_stream`` directly with a stand-in ZMQ socket.

    The handler coroutine is awaited directly (no HTTP request is issued), its
    result is checked to be a ``StreamingResponse``, and a single chunk is
    pulled from the response's body iterator. The ``client`` fixture is
    requested but not otherwise used in this test body.
    """

    class FakeSubSocket:
        """Minimal replacement for the socket the endpoint obtains from Context."""

        def __init__(self):
            self.recv_count = 0
            self.connected = False
            self.subscribed = []

        def subscribe(self, topic):
            # Record every topic so the test can check that subscribing happened.
            self.subscribed.append(topic)

        def connect(self, addr):
            # The address itself is ignored; only the fact of the call is kept.
            self.connected = True

        async def recv_multipart(self):
            # The first call hands out one two-frame message; any later call
            # raises StopAsyncIteration. This test reads only one chunk.
            if self.recv_count != 0:
                raise StopAsyncIteration
            self.recv_count += 1
            return (b"INFO", b"test message")

    fake_socket = FakeSubSocket()
    context_target = "control_backend.api.v1.endpoints.logs.Context.instance"

    # Make Context.instance().socket() hand back the fake socket.
    with patch(context_target) as instance_mock:
        instance_mock.return_value.socket.return_value = fake_socket

        # Invoke the endpoint coroutine itself rather than going through HTTP.
        response = await logs.log_stream()
        assert isinstance(response, StreamingResponse)

        # Pull exactly one chunk out of the streaming body.
        body_iter = response.body_iterator
        first_chunk = await body_iter.__anext__()
        text = (
            first_chunk.decode("utf-8")
            if isinstance(first_chunk, bytes)
            else first_chunk
        )
        assert "data:" in text

        # The endpoint must have subscribed to at least one topic and connected.
        assert fake_socket.subscribed
        assert fake_socket.connected
|