Merge branch 'dev' into feat/reset-experiment-and-phase
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from fastapi import FastAPI
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.testclient import TestClient
|
||||
from starlette.responses import StreamingResponse
|
||||
|
||||
@@ -61,3 +61,67 @@ async def test_log_stream_endpoint_lines(client):
|
||||
# Optional: assert subscribe/connect were called
|
||||
assert dummy_socket.subscribed # at least some log levels subscribed
|
||||
assert dummy_socket.connected # connect was called
|
||||
|
||||
|
||||
@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
|
||||
def test_files_endpoint(LOGGING_DIR, client):
|
||||
file_1, file_2 = MagicMock(), MagicMock()
|
||||
file_1.name = "file_1"
|
||||
file_2.name = "file_2"
|
||||
LOGGING_DIR.glob.return_value = [file_1, file_2]
|
||||
result = client.get("/api/logs/files")
|
||||
|
||||
assert result.status_code == 200
|
||||
assert result.json() == ["file_1", "file_2"]
|
||||
|
||||
|
||||
@patch("control_backend.api.v1.endpoints.logs.FileResponse")
|
||||
@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
|
||||
def test_log_file_endpoint_success(LOGGING_DIR, MockFileResponse, client):
|
||||
mock_file_path = MagicMock()
|
||||
mock_file_path.is_relative_to.return_value = True
|
||||
mock_file_path.is_file.return_value = True
|
||||
mock_file_path.name = "test.log"
|
||||
|
||||
LOGGING_DIR.__truediv__ = MagicMock(return_value=mock_file_path)
|
||||
mock_file_path.resolve.return_value = mock_file_path
|
||||
|
||||
MockFileResponse.return_value = MagicMock()
|
||||
|
||||
result = client.get("/api/logs/files/test.log")
|
||||
|
||||
assert result.status_code == 200
|
||||
MockFileResponse.assert_called_once_with(mock_file_path, filename="test.log")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
|
||||
async def test_log_file_endpoint_path_traversal(LOGGING_DIR):
|
||||
from control_backend.api.v1.endpoints.logs import log_file
|
||||
|
||||
mock_file_path = MagicMock()
|
||||
mock_file_path.is_relative_to.return_value = False
|
||||
|
||||
LOGGING_DIR.__truediv__ = MagicMock(return_value=mock_file_path)
|
||||
mock_file_path.resolve.return_value = mock_file_path
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await log_file("../secret.txt")
|
||||
|
||||
assert exc_info.value.status_code == 400
|
||||
assert exc_info.value.detail == "Invalid filename."
|
||||
|
||||
|
||||
@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
|
||||
def test_log_file_endpoint_file_not_found(LOGGING_DIR, client):
|
||||
mock_file_path = MagicMock()
|
||||
mock_file_path.is_relative_to.return_value = True
|
||||
mock_file_path.is_file.return_value = False
|
||||
|
||||
LOGGING_DIR.__truediv__ = MagicMock(return_value=mock_file_path)
|
||||
mock_file_path.resolve.return_value = mock_file_path
|
||||
|
||||
result = client.get("/api/logs/files/nonexistent.log")
|
||||
|
||||
assert result.status_code == 404
|
||||
assert result.json()["detail"] == "File not found."
|
||||
|
||||
45
test/unit/logging/test_dated_file_handler.py
Normal file
45
test/unit/logging/test_dated_file_handler.py
Normal file
@@ -0,0 +1,45 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from control_backend.logging.dated_file_handler import DatedFileHandler
|
||||
|
||||
|
||||
@patch("control_backend.logging.dated_file_handler.DatedFileHandler._open")
|
||||
def test_reset(open_):
|
||||
stream = MagicMock()
|
||||
open_.return_value = stream
|
||||
|
||||
# A file should be opened when the logger is created
|
||||
handler = DatedFileHandler(file_prefix="anything")
|
||||
assert open_.call_count == 1
|
||||
|
||||
# Upon reset, the current file should be closed, and a new one should be opened
|
||||
handler.do_rollover()
|
||||
assert stream.close.call_count == 1
|
||||
assert open_.call_count == 2
|
||||
|
||||
|
||||
@patch("control_backend.logging.dated_file_handler.Path")
|
||||
@patch("control_backend.logging.dated_file_handler.DatedFileHandler._open")
|
||||
def test_creates_dir(open_, Path_):
|
||||
stream = MagicMock()
|
||||
open_.return_value = stream
|
||||
|
||||
test_path = MagicMock()
|
||||
test_path.parent.is_dir.return_value = False
|
||||
Path_.return_value = test_path
|
||||
|
||||
DatedFileHandler(file_prefix="anything")
|
||||
|
||||
# The directory should've been created
|
||||
test_path.parent.mkdir.assert_called_once()
|
||||
|
||||
|
||||
@patch("control_backend.logging.dated_file_handler.DatedFileHandler._open")
|
||||
def test_invalid_constructor(_):
|
||||
with pytest.raises(ValueError):
|
||||
DatedFileHandler(file_prefix=None)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
DatedFileHandler(file_prefix="")
|
||||
218
test/unit/logging/test_optional_field_formatter.py
Normal file
218
test/unit/logging/test_optional_field_formatter.py
Normal file
@@ -0,0 +1,218 @@
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
from control_backend.logging.optional_field_formatter import OptionalFieldFormatter
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def logger():
|
||||
"""Create a fresh logger for each test."""
|
||||
logger = logging.getLogger(f"test_{id(object())}")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.handlers = []
|
||||
return logger
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def log_output(logger):
|
||||
"""Capture log output and return a function to get it."""
|
||||
|
||||
class ListHandler(logging.Handler):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.records = []
|
||||
|
||||
def emit(self, record):
|
||||
self.records.append(self.format(record))
|
||||
|
||||
handler = ListHandler()
|
||||
logger.addHandler(handler)
|
||||
|
||||
def get_output():
|
||||
return handler.records
|
||||
|
||||
return get_output
|
||||
|
||||
|
||||
def test_optional_field_present(logger, log_output):
|
||||
"""Optional field should appear when provided in extra."""
|
||||
formatter = OptionalFieldFormatter("%(levelname)s - %(role?)s - %(message)s")
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test message", extra={"role": "user"})
|
||||
|
||||
assert log_output() == ["INFO - user - test message"]
|
||||
|
||||
|
||||
def test_optional_field_missing_no_default(logger, log_output):
|
||||
"""Missing optional field with no default should be None."""
|
||||
formatter = OptionalFieldFormatter("%(levelname)s - %(role?)s - %(message)s")
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test message")
|
||||
|
||||
assert log_output() == ["INFO - None - test message"]
|
||||
|
||||
|
||||
def test_optional_field_missing_with_default(logger, log_output):
|
||||
"""Missing optional field should use provided default."""
|
||||
formatter = OptionalFieldFormatter(
|
||||
"%(levelname)s - %(role?)s - %(message)s", defaults={"role": "assistant"}
|
||||
)
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test message")
|
||||
|
||||
assert log_output() == ["INFO - assistant - test message"]
|
||||
|
||||
|
||||
def test_optional_field_overrides_default(logger, log_output):
|
||||
"""Provided extra value should override default."""
|
||||
formatter = OptionalFieldFormatter(
|
||||
"%(levelname)s - %(role?)s - %(message)s", defaults={"role": "assistant"}
|
||||
)
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test message", extra={"role": "user"})
|
||||
|
||||
assert log_output() == ["INFO - user - test message"]
|
||||
|
||||
|
||||
def test_multiple_optional_fields(logger, log_output):
|
||||
"""Multiple optional fields should work independently."""
|
||||
formatter = OptionalFieldFormatter(
|
||||
"%(levelname)s - %(role?)s - %(request_id?)s - %(message)s", defaults={"role": "assistant"}
|
||||
)
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test", extra={"request_id": "123"})
|
||||
|
||||
assert log_output() == ["INFO - assistant - 123 - test"]
|
||||
|
||||
|
||||
def test_mixed_optional_and_required_fields(logger, log_output):
|
||||
"""Standard fields should work alongside optional fields."""
|
||||
formatter = OptionalFieldFormatter("%(levelname)s %(name)s %(role?)s %(message)s")
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test", extra={"role": "user"})
|
||||
|
||||
output = log_output()[0]
|
||||
assert "INFO" in output
|
||||
assert "user" in output
|
||||
assert "test" in output
|
||||
|
||||
|
||||
def test_no_optional_fields(logger, log_output):
|
||||
"""Formatter should work normally with no optional fields."""
|
||||
formatter = OptionalFieldFormatter("%(levelname)s %(message)s")
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test message")
|
||||
|
||||
assert log_output() == ["INFO test message"]
|
||||
|
||||
|
||||
def test_integer_format_specifier(logger, log_output):
|
||||
"""Optional fields with %d specifier should work."""
|
||||
formatter = OptionalFieldFormatter(
|
||||
"%(levelname)s %(count?)d %(message)s", defaults={"count": 0}
|
||||
)
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test", extra={"count": 42})
|
||||
|
||||
assert log_output() == ["INFO 42 test"]
|
||||
|
||||
|
||||
def test_float_format_specifier(logger, log_output):
|
||||
"""Optional fields with %f specifier should work."""
|
||||
formatter = OptionalFieldFormatter(
|
||||
"%(levelname)s %(duration?)f %(message)s", defaults={"duration": 0.0}
|
||||
)
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test", extra={"duration": 1.5})
|
||||
|
||||
assert "1.5" in log_output()[0]
|
||||
|
||||
|
||||
def test_empty_string_default(logger, log_output):
|
||||
"""Empty string default should work."""
|
||||
formatter = OptionalFieldFormatter("%(levelname)s %(role?)s %(message)s", defaults={"role": ""})
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test")
|
||||
|
||||
assert log_output() == ["INFO test"]
|
||||
|
||||
|
||||
def test_none_format_string():
|
||||
"""None format string should not raise."""
|
||||
formatter = OptionalFieldFormatter(fmt=None)
|
||||
assert formatter.optional_fields == set()
|
||||
|
||||
|
||||
def test_optional_fields_parsed_correctly():
|
||||
"""Check that optional fields are correctly identified."""
|
||||
formatter = OptionalFieldFormatter("%(asctime)s %(role?)s %(level?)d %(name)s")
|
||||
|
||||
assert formatter.optional_fields == {("role", "s"), ("level", "d")}
|
||||
|
||||
|
||||
def test_format_string_normalized():
|
||||
"""Check that ? is removed from format string."""
|
||||
formatter = OptionalFieldFormatter("%(role?)s %(message)s")
|
||||
|
||||
assert "?" not in formatter._style._fmt
|
||||
assert "%(role)s" in formatter._style._fmt
|
||||
|
||||
|
||||
def test_field_with_underscore(logger, log_output):
|
||||
"""Field names with underscores should work."""
|
||||
formatter = OptionalFieldFormatter("%(levelname)s %(user_id?)s %(message)s")
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test", extra={"user_id": "abc123"})
|
||||
|
||||
assert log_output() == ["INFO abc123 test"]
|
||||
|
||||
|
||||
def test_field_with_numbers(logger, log_output):
|
||||
"""Field names with numbers should work."""
|
||||
formatter = OptionalFieldFormatter("%(levelname)s %(field2?)s %(message)s")
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test", extra={"field2": "value"})
|
||||
|
||||
assert log_output() == ["INFO value test"]
|
||||
|
||||
|
||||
def test_multiple_log_calls(logger, log_output):
|
||||
"""Formatter should work correctly across multiple log calls."""
|
||||
formatter = OptionalFieldFormatter(
|
||||
"%(levelname)s %(role?)s %(message)s", defaults={"role": "other"}
|
||||
)
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("first", extra={"role": "assistant"})
|
||||
logger.info("second")
|
||||
logger.info("third", extra={"role": "user"})
|
||||
|
||||
assert log_output() == [
|
||||
"INFO assistant first",
|
||||
"INFO other second",
|
||||
"INFO user third",
|
||||
]
|
||||
|
||||
|
||||
def test_default_not_mutated(logger, log_output):
|
||||
"""Original defaults dict should not be mutated."""
|
||||
defaults = {"role": "other"}
|
||||
formatter = OptionalFieldFormatter("%(levelname)s %(role?)s %(message)s", defaults=defaults)
|
||||
logger.handlers[0].setFormatter(formatter)
|
||||
|
||||
logger.info("test")
|
||||
|
||||
assert defaults == {"role": "other"}
|
||||
83
test/unit/logging/test_partial_filter.py
Normal file
83
test/unit/logging/test_partial_filter.py
Normal file
@@ -0,0 +1,83 @@
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
from control_backend.logging import PartialFilter
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def logger():
|
||||
"""Create a fresh logger for each test."""
|
||||
logger = logging.getLogger(f"test_{id(object())}")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.handlers = []
|
||||
return logger
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def log_output(logger):
|
||||
"""Capture log output and return a function to get it."""
|
||||
|
||||
class ListHandler(logging.Handler):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.records = []
|
||||
|
||||
def emit(self, record):
|
||||
self.records.append(self.format(record))
|
||||
|
||||
handler = ListHandler()
|
||||
handler.addFilter(PartialFilter())
|
||||
handler.setFormatter(logging.Formatter("%(message)s"))
|
||||
logger.addHandler(handler)
|
||||
|
||||
return lambda: handler.records
|
||||
|
||||
|
||||
def test_no_partial_attribute(logger, log_output):
|
||||
"""Records without partial attribute should pass through."""
|
||||
logger.info("normal message")
|
||||
|
||||
assert log_output() == ["normal message"]
|
||||
|
||||
|
||||
def test_partial_true_filtered(logger, log_output):
|
||||
"""Records with partial=True should be filtered out."""
|
||||
logger.info("partial message", extra={"partial": True})
|
||||
|
||||
assert log_output() == []
|
||||
|
||||
|
||||
def test_partial_false_passes(logger, log_output):
|
||||
"""Records with partial=False should pass through."""
|
||||
logger.info("complete message", extra={"partial": False})
|
||||
|
||||
assert log_output() == ["complete message"]
|
||||
|
||||
|
||||
def test_partial_none_passes(logger, log_output):
|
||||
"""Records with partial=None should pass through."""
|
||||
logger.info("message", extra={"partial": None})
|
||||
|
||||
assert log_output() == ["message"]
|
||||
|
||||
|
||||
def test_partial_truthy_value_passes(logger, log_output):
|
||||
"""
|
||||
Records with truthy but non-True partial should pass through, that is, only when it's exactly
|
||||
``True`` should it pass.
|
||||
"""
|
||||
logger.info("message", extra={"partial": "yes"})
|
||||
|
||||
assert log_output() == ["message"]
|
||||
|
||||
|
||||
def test_multiple_records_mixed(logger, log_output):
|
||||
"""Filter should handle mixed records correctly."""
|
||||
logger.info("first")
|
||||
logger.info("second", extra={"partial": True})
|
||||
logger.info("third", extra={"partial": False})
|
||||
logger.info("fourth", extra={"partial": True})
|
||||
logger.info("fifth")
|
||||
|
||||
assert log_output() == ["first", "third", "fifth"]
|
||||
Reference in New Issue
Block a user