Compare commits

...

11 Commits

Author SHA1 Message Date
Pim Hutting
02063a73b2 feat: added recursive mapping
ref: N25B-400
2026-01-22 11:15:13 +01:00
Storm
b9a47eeb0c Merge branch 'feat/visual-emotion-recognition' into demo 2026-01-20 12:48:27 +01:00
8575ddcbcf feat: add experiment log for phase transition
ref: N25B-453
2026-01-20 12:30:47 +01:00
59b35b31b2 feat: add UI log statement for triggers
ref: N25B-453
2026-01-20 12:08:31 +01:00
Twirre Meulenbelt
7516667545 feat: add useful experiment logs to various agents
ref: N25B-401
2026-01-20 11:58:30 +01:00
651f1b74a6 chore: long timeout for non-local LLM 2026-01-20 11:55:00 +01:00
5ed751de8c chore: add logs to .gitignore 2026-01-20 11:05:32 +01:00
89ebe45724 Merge remote-tracking branch 'origin/feat/experiment-logging' into demo 2026-01-20 11:04:31 +01:00
Twirre Meulenbelt
58881b5914 test: add test cases
ref: N25B-401
2026-01-19 12:47:59 +01:00
Twirre Meulenbelt
ba79d09c5d feat: log download endpoints
ref: N25B-401
2026-01-16 16:32:51 +01:00
Twirre Meulenbelt
4cda4e5e70 feat: experiment log stream, to file and UI
Adds a few new logging utility classes: one to save to files with a date, one to support optional fields in format strings, and one to filter partial log messages.

ref: N25B-401
2026-01-15 17:07:49 +01:00
21 changed files with 690 additions and 39 deletions

.gitignore vendored (2 changes)
View File

@@ -224,7 +224,7 @@ docs/*
# Generated files
agentspeak.asl
experiment-*.log

View File

@@ -1,36 +1,57 @@
 version: 1
 custom_levels:
-  OBSERVATION: 25
-  ACTION: 26
+  OBSERVATION: 24
+  ACTION: 25
+  CHAT: 26
   LLM: 9
 formatters:
   # Console output
   colored:
-    (): "colorlog.ColoredFormatter"
+    class: colorlog.ColoredFormatter
     format: "{log_color}{asctime}.{msecs:03.0f} | {levelname:11} | {name:70} | {message}"
     style: "{"
     datefmt: "%H:%M:%S"
   # User-facing UI (structured JSON)
-  json_experiment:
-    (): "pythonjsonlogger.jsonlogger.JsonFormatter"
+  json:
+    class: pythonjsonlogger.jsonlogger.JsonFormatter
     format: "{name} {levelname} {levelno} {message} {created} {relativeCreated}"
     style: "{"
+  # Experiment stream for console and file output, with optional `role` field
+  experiment:
+    class: control_backend.logging.OptionalFieldFormatter
+    format: "%(asctime)s %(levelname)s %(role?)s %(message)s"
+    defaults:
+      role: "-"
+filters:
+  # Filter out any log records that have the extra field "partial" set to True, indicating that
+  # they will be replaced later.
+  partial:
+    (): control_backend.logging.PartialFilter
 handlers:
   console:
     class: logging.StreamHandler
     level: DEBUG
     formatter: colored
+    filters: [partial]
     stream: ext://sys.stdout
   ui:
     class: zmq.log.handlers.PUBHandler
     level: LLM
-    formatter: json_experiment
+    formatter: json
+  file:
+    class: control_backend.logging.DatedFileHandler
+    formatter: experiment
+    filters: [partial]
+    # Directory must match config.logging_settings.experiment_log_directory
+    file_prefix: experiment_logs/experiment
-# Level of external libraries
+# Level for external libraries
 root:
   level: WARN
   handlers: [console]
@@ -39,3 +60,6 @@ loggers:
   control_backend:
     level: LLM
     handlers: [ui]
+  experiment: # This name must match config.logging_settings.experiment_logger_name
+    level: DEBUG
+    handlers: [ui, file]

View File

@@ -1,4 +1,5 @@
 import json
+import logging

 import zmq
 import zmq.asyncio as azmq
@@ -8,6 +9,8 @@ from control_backend.core.agent_system import InternalMessage
 from control_backend.core.config import settings
 from control_backend.schemas.ri_message import GestureCommand, RIEndpoint

+experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
+
 class RobotGestureAgent(BaseAgent):
     """
@@ -111,6 +114,7 @@ class RobotGestureAgent(BaseAgent):
                     gesture_command.data,
                 )
                 return
+            experiment_logger.action("Gesture: %s", gesture_command.data)
             await self.pubsocket.send_json(gesture_command.model_dump())
         except Exception:
             self.logger.exception("Error processing internal message.")

View File

@@ -1,9 +1,10 @@
 import logging
+from abc import ABC

 from control_backend.core.agent_system import BaseAgent as CoreBaseAgent

-class BaseAgent(CoreBaseAgent):
+class BaseAgent(CoreBaseAgent, ABC):
     """
     The primary base class for all implementation agents.

View File

@@ -1,6 +1,7 @@
 import asyncio
 import copy
 import json
+import logging
 import time
 from collections.abc import Iterable
@@ -19,6 +20,9 @@ from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, Speec
 DELIMITER = ";\n"  # TODO: temporary until we support lists in AgentSpeak

+experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
+
 class BDICoreAgent(BaseAgent):
     """
     BDI Core Agent.
@@ -207,6 +211,9 @@ class BDICoreAgent(BaseAgent):
         else:
             term = agentspeak.Literal(name)

+        if name != "user_said":
+            experiment_logger.observation(f"Formed new belief: {name}{f'={args}' if args else ''}")
+
         self.bdi_agent.call(
             agentspeak.Trigger.addition,
             agentspeak.GoalType.belief,
@@ -244,6 +251,9 @@ class BDICoreAgent(BaseAgent):
         new_args = (agentspeak.Literal(arg) for arg in args)
         term = agentspeak.Literal(name, new_args)

+        if name != "user_said":
+            experiment_logger.observation(f"Removed belief: {name}{f'={args}' if args else ''}")
+
         result = self.bdi_agent.call(
             agentspeak.Trigger.removal,
             agentspeak.GoalType.belief,
@@ -386,6 +396,8 @@ class BDICoreAgent(BaseAgent):
             body=str(message_text),
         )

+        experiment_logger.chat(str(message_text), extra={"role": "assistant"})
+
         self.add_behavior(self.send(chat_history_message))
         yield
@@ -441,6 +453,7 @@ class BDICoreAgent(BaseAgent):
         trigger_name = agentspeak.grounded(term.args[0], intention.scope)
         self.logger.debug("Started trigger %s", trigger_name)
+        experiment_logger.observation("Triggered: %s", trigger_name)
         msg = InternalMessage(
             to=settings.agent_settings.user_interrupt_name,
View File

@@ -1,5 +1,6 @@
 import asyncio
 import json
+import logging
 import re
 import uuid
 from collections.abc import AsyncGenerator
@@ -14,6 +15,8 @@ from control_backend.core.config import settings
 from ...schemas.llm_prompt_message import LLMPromptMessage
 from .llm_instructions import LLMInstructions

+experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
+
 class LLMAgent(BaseAgent):
     """
@@ -170,7 +173,7 @@ class LLMAgent(BaseAgent):
             *self.history,
         ]

-        message_id = str(uuid.uuid4())  # noqa
+        message_id = str(uuid.uuid4())

         try:
             full_message = ""
@@ -179,10 +182,9 @@ class LLMAgent(BaseAgent):
                 full_message += token
                 current_chunk += token

-                self.logger.llm(
-                    "Received token: %s",
+                experiment_logger.chat(
                     full_message,
-                    extra={"reference": message_id},  # Used in the UI to update old logs
+                    extra={"role": "assistant", "reference": message_id, "partial": True},
                 )

                 # Stream the message in chunks separated by punctuation.
@@ -197,6 +199,11 @@ class LLMAgent(BaseAgent):
             # Yield any remaining tail
             if current_chunk:
                 yield current_chunk
+
+            experiment_logger.chat(
+                full_message,
+                extra={"role": "assistant", "reference": message_id, "partial": False},
+            )
         except httpx.HTTPError as err:
             self.logger.error("HTTP error.", exc_info=err)
             yield "LLM service unavailable."
@@ -212,7 +219,7 @@ class LLMAgent(BaseAgent):
         :yield: Raw text tokens (deltas) from the SSE stream.
         :raises httpx.HTTPError: If the API returns a non-200 status.
         """
-        async with httpx.AsyncClient() as client:
+        async with httpx.AsyncClient(timeout=httpx.Timeout(20.0)) as client:
             async with client.stream(
                 "POST",
                 settings.llm_settings.local_llm_url,
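The reference/partial pair above implements a replace-by-reference pattern: every interim record carries the same reference so the UI can overwrite its previous entry, and only the final record (partial=False) survives the partial filter on persistent handlers. A minimal sketch of the same idea, assuming a plain logger with no handlers configured:

import logging
import uuid

logger = logging.getLogger("experiment")

message_id = str(uuid.uuid4())  # one reference for the whole streamed message
full_message = ""
for token in ["Hel", "lo", " there!"]:
    full_message += token
    # Interim records: same reference, partial=True, dropped from files by PartialFilter
    logger.info(full_message, extra={"reference": message_id, "partial": True})
# The final record replaces the partials in the UI and is the only one written to file
logger.info(full_message, extra={"reference": message_id, "partial": False})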

View File

@@ -1,4 +1,5 @@
 import asyncio
+import logging

 import numpy as np
 import zmq
@@ -10,6 +11,8 @@ from control_backend.core.config import settings
 from .speech_recognizer import SpeechRecognizer

+experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
+
 class TranscriptionAgent(BaseAgent):
     """
@@ -25,6 +28,8 @@ class TranscriptionAgent(BaseAgent):
     :ivar audio_in_socket: The ZMQ SUB socket instance.
     :ivar speech_recognizer: The speech recognition engine instance.
     :ivar _concurrency: Semaphore to limit concurrent transcriptions.
+    :ivar _current_speech_reference: The reference of the current user utterance, for synchronising
+        experiment logs.
     """

     def __init__(self, audio_in_address: str):
@@ -39,6 +44,7 @@ class TranscriptionAgent(BaseAgent):
         self.audio_in_socket: azmq.Socket | None = None
         self.speech_recognizer = None
         self._concurrency = None
+        self._current_speech_reference: str | None = None

     async def setup(self):
         """
@@ -63,6 +69,10 @@ class TranscriptionAgent(BaseAgent):
         self.logger.info("Finished setting up %s", self.name)

+    async def handle_message(self, msg: InternalMessage):
+        if msg.thread == "voice_activity":
+            self._current_speech_reference = msg.body
+
     async def stop(self):
         """
         Stop the agent and close sockets.
@@ -96,20 +106,21 @@ class TranscriptionAgent(BaseAgent):
     async def _share_transcription(self, transcription: str):
         """
-        Share a transcription to the other agents that depend on it.
+        Share a transcription to the other agents that depend on it, and to experiment logs.

         Currently sends to:
         - :attr:`settings.agent_settings.text_belief_extractor_name`
+        - The UI via the experiment logger

         :param transcription: The transcribed text.
         """
-        receiver_names = [
-            settings.agent_settings.text_belief_extractor_name,
-        ]
+        experiment_logger.chat(
+            transcription,
+            extra={"role": "user", "reference": self._current_speech_reference, "partial": False},
+        )
-        for receiver_name in receiver_names:
         message = InternalMessage(
-            to=receiver_name,
+            to=settings.agent_settings.text_belief_extractor_name,
             sender=self.name,
             body=transcription,
         )
@@ -129,10 +140,9 @@ class TranscriptionAgent(BaseAgent):
                 audio = np.frombuffer(audio_data, dtype=np.float32)
                 speech = await self._transcribe(audio)
                 if not speech:
-                    self.logger.info("Nothing transcribed.")
+                    self.logger.debug("Nothing transcribed.")
                     continue
-                self.logger.info("Transcribed speech: %s", speech)
                 await self._share_transcription(speech)
             except Exception as e:
                 self.logger.error(f"Error in transcription loop: {e}")

View File

@@ -1,4 +1,6 @@
 import asyncio
+import logging
+import uuid

 import numpy as np
 import torch
@@ -12,6 +14,8 @@ from control_backend.schemas.internal_message import InternalMessage
 from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
 from .transcription_agent.transcription_agent import TranscriptionAgent

+experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
+
 class SocketPoller[T]:
     """
@@ -252,6 +256,18 @@ class VADAgent(BaseAgent):
                 if prob > prob_threshold:
                     if self.i_since_speech > non_speech_patience + begin_silence_length:
                         self.logger.debug("Speech started.")
+                        reference = str(uuid.uuid4())
+                        experiment_logger.chat(
+                            "...",
+                            extra={"role": "user", "reference": reference, "partial": True},
+                        )
+                        await self.send(
+                            InternalMessage(
+                                to=settings.agent_settings.transcription_name,
+                                body=reference,
+                                thread="voice_activity",
+                            )
+                        )
                     self.audio_buffer = np.append(self.audio_buffer, chunk)
                     self.i_since_speech = 0
                     continue

View File

@@ -7,7 +7,6 @@ import numpy as np
import zmq
import zmq.asyncio as azmq
from pydantic_core import ValidationError
import struct
from control_backend.agents import BaseAgent
from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognizer import ( # noqa
@@ -126,7 +125,6 @@ class VisualEmotionRecognitionAgent(BaseAgent):
except zmq.Again:
self.logger.warning("No video frame received within timeout.")
async def update_emotions(self, prev_emotions: set[str], emotions: set[str]):
"""
Compare emotions from previous window and current emotions,

View File

@@ -1,4 +1,5 @@
 import json
+import logging

 import zmq
 from zmq.asyncio import Context
@@ -8,7 +9,7 @@ from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
 from control_backend.core.agent_system import InternalMessage
 from control_backend.core.config import settings
 from control_backend.schemas.belief_message import Belief, BeliefMessage
-from control_backend.schemas.program import ConditionalNorm, Program
+from control_backend.schemas.program import ConditionalNorm, Goal, Program
 from control_backend.schemas.ri_message import (
     GestureCommand,
     PauseCommand,
@@ -16,6 +17,8 @@ from control_backend.schemas.ri_message import (
     SpeechCommand,
 )

+experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
+
 class UserInterruptAgent(BaseAgent):
     """
@@ -194,6 +197,7 @@ class UserInterruptAgent(BaseAgent):
             case "transition_phase":
                 new_phase_id = msg.body
                 self.logger.info(f"Phase transition detected: {new_phase_id}")
+                experiment_logger.observation("Transitioned to next phase.")

                 payload = {"type": "phase_update", "id": new_phase_id}
@@ -246,6 +250,18 @@ class UserInterruptAgent(BaseAgent):
         self._cond_norm_map = {}
         self._cond_norm_reverse_map = {}

+        def _register_goal(goal: Goal):
+            """Recursively register goals and their subgoals."""
+            slug = AgentSpeakGenerator.slugify(goal)
+            self._goal_map[str(goal.id)] = slug
+            self._goal_reverse_map[slug] = str(goal.id)
+
+            # Recursively check steps for subgoals
+            if goal.plan and goal.plan.steps:
+                for step in goal.plan.steps:
+                    if isinstance(step, Goal):
+                        _register_goal(step)
+
         for phase in program.phases:
             for trigger in phase.triggers:
                 slug = AgentSpeakGenerator.slugify(trigger)
@@ -253,8 +269,7 @@ class UserInterruptAgent(BaseAgent):
                 self._trigger_reverse_map[slug] = str(trigger.id)
             for goal in phase.goals:
-                self._goal_map[str(goal.id)] = AgentSpeakGenerator.slugify(goal)
-                self._goal_reverse_map[AgentSpeakGenerator.slugify(goal)] = str(goal.id)
+                _register_goal(goal)

         for goal, id in self._goal_reverse_map.items():
             self.logger.debug(f"Goal mapping: UI ID {goal} -> {id}")
@@ -296,6 +311,7 @@ class UserInterruptAgent(BaseAgent):
         :param text_to_say: The string that the robot has to say.
         """
+        experiment_logger.chat(text_to_say, extra={"role": "user"})
         cmd = SpeechCommand(data=text_to_say, is_priority=True)
         out_msg = InternalMessage(
             to=settings.agent_settings.robot_speech_name,

View File

@@ -1,8 +1,9 @@
import logging
from pathlib import Path
import zmq
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from fastapi import APIRouter, HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from zmq.asyncio import Context
from control_backend.core.config import settings
@@ -38,3 +39,29 @@ async def log_stream():
yield f"data: {message}\n\n"
return StreamingResponse(gen(), media_type="text/event-stream")
LOGGING_DIR = Path(settings.logging_settings.experiment_log_directory).resolve()
@router.get("/logs/files")
@router.get("/api/logs/files")
async def log_directory():
"""
Get a list of all log files stored in the experiment log file directory.
"""
return [f.name for f in LOGGING_DIR.glob("*.log")]
@router.get("/logs/files/{filename}")
@router.get("/api/logs/files/{filename}")
async def log_file(filename: str):
# Prevent path-traversal
file_path = (LOGGING_DIR / filename).resolve() # This .resolve() is important
if not file_path.is_relative_to(LOGGING_DIR):
raise HTTPException(status_code=400, detail="Invalid filename.")
if not file_path.is_file():
raise HTTPException(status_code=404, detail="File not found.")
return FileResponse(file_path, filename=file_path.name)
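For reference, a hypothetical client-side use of the two new endpoints; the base URL is an assumption, and the double route registration above exposes each endpoint both with and without the /api prefix:

import httpx

base = "http://localhost:8000/api/logs"  # host and port are assumptions
files = httpx.get(f"{base}/files").json()  # e.g. ["experiment-20260115-170749.log"]
if files:
    # Download one of the listed files; traversal attempts like "../x" get a 400
    text = httpx.get(f"{base}/files/{files[0]}").text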

View File

@@ -162,6 +162,20 @@ class SpeechModelSettings(BaseModel):
     openai_model_name: str = "small.en"

+class LoggingSettings(BaseModel):
+    """
+    Configuration for logging.
+
+    :ivar logging_config_file: Path to the logging configuration file.
+    :ivar experiment_log_directory: Location of the experiment logs. Must match the logging config.
+    :ivar experiment_logger_name: Name of the experiment logger. Must match the logging config.
+    """
+
+    logging_config_file: str = ".logging_config.yaml"
+    experiment_log_directory: str = "experiment_logs"
+    experiment_logger_name: str = "experiment"
+
 class Settings(BaseSettings):
     """
     Global application settings.
@@ -183,6 +197,8 @@ class Settings(BaseSettings):
     ri_host: str = "localhost"

+    logging_settings: LoggingSettings = LoggingSettings()
+
     zmq_settings: ZMQSettings = ZMQSettings()
     agent_settings: AgentSettings = AgentSettings()
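Since these defaults must agree with .logging_config.yaml and with each other, an illustrative sanity check (not part of the diff) could look like:

from control_backend.core.config import settings

# Values shown are the defaults introduced above; adjust if the YAML changes.
assert settings.logging_settings.logging_config_file == ".logging_config.yaml"
assert settings.logging_settings.experiment_log_directory == "experiment_logs"
assert settings.logging_settings.experiment_logger_name == "experiment"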

View File

@@ -1 +1,4 @@
+from .dated_file_handler import DatedFileHandler as DatedFileHandler
+from .optional_field_formatter import OptionalFieldFormatter as OptionalFieldFormatter
+from .partial_filter import PartialFilter as PartialFilter
 from .setup_logging import setup_logging as setup_logging

View File

@@ -0,0 +1,29 @@
from datetime import datetime
from logging import FileHandler
from pathlib import Path

class DatedFileHandler(FileHandler):
    """FileHandler that writes to `<file_prefix>-<timestamp>.log` and can roll over to a new dated file."""

    def __init__(self, file_prefix: str, **kwargs):
        if not file_prefix:
            raise ValueError("`file_prefix` argument cannot be empty.")
        self._file_prefix = file_prefix
        kwargs["filename"] = self._make_filename()
        super().__init__(**kwargs)

    def _make_filename(self) -> str:
        filepath = Path(f"{self._file_prefix}-{datetime.now():%Y%m%d-%H%M%S}.log")
        if not filepath.parent.is_dir():
            filepath.parent.mkdir(parents=True, exist_ok=True)
        return str(filepath)

    def do_rollover(self):
        # Close the current stream and switch to a freshly dated file.
        self.acquire()
        try:
            if self.stream:
                self.stream.close()
            self.baseFilename = self._make_filename()
            self.stream = self._open()
        finally:
            self.release()
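A short usage sketch for the new handler (paths illustrative): each instance opens a fresh timestamped file, and do_rollover() switches to a new one, e.g. between experiment runs:

import logging

from control_backend.logging import DatedFileHandler

logger = logging.getLogger("experiment")
handler = DatedFileHandler(file_prefix="experiment_logs/experiment")
logger.addHandler(handler)

logger.warning("first run")   # written to experiment_logs/experiment-<YYYYmmdd-HHMMSS>.log
handler.do_rollover()         # close the current file and open a newly dated one
logger.warning("second run")  # written to the new file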

View File

@@ -0,0 +1,67 @@
import logging
import re

class OptionalFieldFormatter(logging.Formatter):
    """
    Logging formatter that supports optional fields marked by `?`.

    Optional fields are denoted by placing a `?` after the field name inside
    the parentheses, e.g., `%(role?)s`. If the field is not provided in the
    log call's `extra` dict, it will use the default value from `defaults`
    or `None` if no default is specified.

    :param fmt: Format string with optional `%(name?)s` style fields.
    :type fmt: str or None
    :param datefmt: Date format string, passed to parent :class:`logging.Formatter`.
    :type datefmt: str or None
    :param style: Formatting style, must be '%'. Passed to parent.
    :type style: str
    :param defaults: Default values for optional fields when not provided.
    :type defaults: dict or None

    :example:

    >>> formatter = OptionalFieldFormatter(
    ...     fmt="%(asctime)s %(levelname)s %(role?)s %(message)s",
    ...     defaults={"role": "-"}
    ... )
    >>> handler = logging.StreamHandler()
    >>> handler.setFormatter(formatter)
    >>> logger = logging.getLogger(__name__)
    >>> logger.addHandler(handler)
    >>>
    >>> logger.chat("Hello there!", extra={"role": "USER"})
    2025-01-15 10:30:00 CHAT USER Hello there!
    >>>
    >>> logger.info("A logging message")
    2025-01-15 10:30:01 INFO - A logging message

    .. note::
        Only `%`-style formatting is supported. The `{` and `$` styles are not
        compatible with this formatter.

    .. seealso::
        :class:`logging.Formatter` for base formatter documentation.
    """

    # Match %(name?)s or %(name?)d etc.
    OPTIONAL_PATTERN = re.compile(r"%\((\w+)\?\)([sdifFeEgGxXocrba%])")

    def __init__(self, fmt=None, datefmt=None, style="%", defaults=None):
        self.defaults = defaults or {}
        self.optional_fields = set(self.OPTIONAL_PATTERN.findall(fmt or ""))

        # Convert %(name?)s to %(name)s for standard formatting
        normalized_fmt = self.OPTIONAL_PATTERN.sub(r"%(\1)\2", fmt or "")
        super().__init__(normalized_fmt, datefmt, style)

    def format(self, record):
        for field, _ in self.optional_fields:
            if not hasattr(record, field):
                default = self.defaults.get(field, None)
                setattr(record, field, default)
        return super().format(record)

View File

@@ -0,0 +1,10 @@
import logging

class PartialFilter(logging.Filter):
    """
    Class to filter any log records that have the "partial" attribute set to ``True``.
    """

    def filter(self, record):
        return getattr(record, "partial", False) is not True
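A minimal demonstration of the filter on a plain stdlib handler:

import logging
import sys

from control_backend.logging import PartialFilter

handler = logging.StreamHandler(sys.stdout)
handler.addFilter(PartialFilter())
logger = logging.getLogger("experiment")
logger.addHandler(handler)

logger.warning("typing...", extra={"partial": True})        # suppressed by the filter
logger.warning("final message", extra={"partial": False})   # emitted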

View File

@@ -37,7 +37,7 @@ def add_logging_level(level_name: str, level_num: int, method_name: str | None =
     setattr(logging, method_name, log_to_root)

-def setup_logging(path: str = ".logging_config.yaml") -> None:
+def setup_logging(path: str = settings.logging_settings.logging_config_file) -> None:
     """
     Setup logging configuration of the CB. Tries to load the logging configuration from a file,
     in which we specify custom loggers, formatters, handlers, etc.
@@ -65,7 +65,7 @@ def setup_logging(path: str = ".logging_config.yaml") -> None:
     # Patch ZMQ PUBHandler to know about custom levels
     if custom_levels:
-        for logger_name in ("control_backend",):
+        for logger_name in config.get("loggers", {}):
             logger = logging.getLogger(logger_name)
             for handler in logger.handlers:
                 if isinstance(handler, PUBHandler):
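The loop above generalises the old hard-coded patch to every configured logger, which is what lets the new `experiment` logger's PUBHandler see the custom levels too. For context, a sketch of what the patch amounts to, assuming pyzmq's PUBHandler keeps a per-level `formatters` dict (which is how it selects formatting), with the fallback format an assumption:

import logging

from zmq.log.handlers import PUBHandler

custom_levels = {"OBSERVATION": 24, "ACTION": 25, "CHAT": 26, "LLM": 9}
for logger_name in ("control_backend", "experiment"):  # i.e. every logger in the config
    for handler in logging.getLogger(logger_name).handlers:
        if isinstance(handler, PUBHandler):
            for level_num in custom_levels.values():
                # Register a formatter for each custom level so PUBHandler can emit it
                handler.formatters.setdefault(level_num, logging.Formatter("%(message)s"))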

View File

@@ -1,7 +1,7 @@
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch

 import pytest
-from fastapi import FastAPI
+from fastapi import FastAPI, HTTPException
 from fastapi.testclient import TestClient
 from starlette.responses import StreamingResponse
@@ -61,3 +61,67 @@ async def test_log_stream_endpoint_lines(client):
     # Optional: assert subscribe/connect were called
     assert dummy_socket.subscribed  # at least some log levels subscribed
     assert dummy_socket.connected  # connect was called
+
+@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
+def test_files_endpoint(LOGGING_DIR, client):
+    file_1, file_2 = MagicMock(), MagicMock()
+    file_1.name = "file_1"
+    file_2.name = "file_2"
+    LOGGING_DIR.glob.return_value = [file_1, file_2]
+
+    result = client.get("/api/logs/files")
+
+    assert result.status_code == 200
+    assert result.json() == ["file_1", "file_2"]
+
+@patch("control_backend.api.v1.endpoints.logs.FileResponse")
+@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
+def test_log_file_endpoint_success(LOGGING_DIR, MockFileResponse, client):
+    mock_file_path = MagicMock()
+    mock_file_path.is_relative_to.return_value = True
+    mock_file_path.is_file.return_value = True
+    mock_file_path.name = "test.log"
+    LOGGING_DIR.__truediv__ = MagicMock(return_value=mock_file_path)
+    mock_file_path.resolve.return_value = mock_file_path
+    MockFileResponse.return_value = MagicMock()
+
+    result = client.get("/api/logs/files/test.log")
+
+    assert result.status_code == 200
+    MockFileResponse.assert_called_once_with(mock_file_path, filename="test.log")
+
+@pytest.mark.asyncio
+@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
+async def test_log_file_endpoint_path_traversal(LOGGING_DIR):
+    from control_backend.api.v1.endpoints.logs import log_file
+
+    mock_file_path = MagicMock()
+    mock_file_path.is_relative_to.return_value = False
+    LOGGING_DIR.__truediv__ = MagicMock(return_value=mock_file_path)
+    mock_file_path.resolve.return_value = mock_file_path
+
+    with pytest.raises(HTTPException) as exc_info:
+        await log_file("../secret.txt")
+
+    assert exc_info.value.status_code == 400
+    assert exc_info.value.detail == "Invalid filename."
+
+@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
+def test_log_file_endpoint_file_not_found(LOGGING_DIR, client):
+    mock_file_path = MagicMock()
+    mock_file_path.is_relative_to.return_value = True
+    mock_file_path.is_file.return_value = False
+    LOGGING_DIR.__truediv__ = MagicMock(return_value=mock_file_path)
+    mock_file_path.resolve.return_value = mock_file_path
+
+    result = client.get("/api/logs/files/nonexistent.log")
+
+    assert result.status_code == 404
+    assert result.json()["detail"] == "File not found."

View File

@@ -0,0 +1,45 @@
from unittest.mock import MagicMock, patch

import pytest

from control_backend.logging.dated_file_handler import DatedFileHandler

@patch("control_backend.logging.dated_file_handler.DatedFileHandler._open")
def test_reset(open_):
    stream = MagicMock()
    open_.return_value = stream

    # A file should be opened when the logger is created
    handler = DatedFileHandler(file_prefix="anything")
    assert open_.call_count == 1

    # Upon reset, the current file should be closed, and a new one should be opened
    handler.do_rollover()
    assert stream.close.call_count == 1
    assert open_.call_count == 2

@patch("control_backend.logging.dated_file_handler.Path")
@patch("control_backend.logging.dated_file_handler.DatedFileHandler._open")
def test_creates_dir(open_, Path_):
    stream = MagicMock()
    open_.return_value = stream
    test_path = MagicMock()
    test_path.parent.is_dir.return_value = False
    Path_.return_value = test_path

    DatedFileHandler(file_prefix="anything")

    # The directory should've been created
    test_path.parent.mkdir.assert_called_once()

@patch("control_backend.logging.dated_file_handler.DatedFileHandler._open")
def test_invalid_constructor(_):
    with pytest.raises(ValueError):
        DatedFileHandler(file_prefix=None)
    with pytest.raises(ValueError):
        DatedFileHandler(file_prefix="")

View File

@@ -0,0 +1,218 @@
import logging

import pytest

from control_backend.logging.optional_field_formatter import OptionalFieldFormatter

@pytest.fixture
def logger():
    """Create a fresh logger for each test."""
    logger = logging.getLogger(f"test_{id(object())}")
    logger.setLevel(logging.DEBUG)
    logger.handlers = []
    return logger

@pytest.fixture
def log_output(logger):
    """Capture log output and return a function to get it."""

    class ListHandler(logging.Handler):
        def __init__(self):
            super().__init__()
            self.records = []

        def emit(self, record):
            self.records.append(self.format(record))

    handler = ListHandler()
    logger.addHandler(handler)

    def get_output():
        return handler.records

    return get_output

def test_optional_field_present(logger, log_output):
    """Optional field should appear when provided in extra."""
    formatter = OptionalFieldFormatter("%(levelname)s - %(role?)s - %(message)s")
    logger.handlers[0].setFormatter(formatter)

    logger.info("test message", extra={"role": "user"})

    assert log_output() == ["INFO - user - test message"]

def test_optional_field_missing_no_default(logger, log_output):
    """Missing optional field with no default should be None."""
    formatter = OptionalFieldFormatter("%(levelname)s - %(role?)s - %(message)s")
    logger.handlers[0].setFormatter(formatter)

    logger.info("test message")

    assert log_output() == ["INFO - None - test message"]

def test_optional_field_missing_with_default(logger, log_output):
    """Missing optional field should use the provided default."""
    formatter = OptionalFieldFormatter(
        "%(levelname)s - %(role?)s - %(message)s", defaults={"role": "assistant"}
    )
    logger.handlers[0].setFormatter(formatter)

    logger.info("test message")

    assert log_output() == ["INFO - assistant - test message"]

def test_optional_field_overrides_default(logger, log_output):
    """Provided extra value should override the default."""
    formatter = OptionalFieldFormatter(
        "%(levelname)s - %(role?)s - %(message)s", defaults={"role": "assistant"}
    )
    logger.handlers[0].setFormatter(formatter)

    logger.info("test message", extra={"role": "user"})

    assert log_output() == ["INFO - user - test message"]

def test_multiple_optional_fields(logger, log_output):
    """Multiple optional fields should work independently."""
    formatter = OptionalFieldFormatter(
        "%(levelname)s - %(role?)s - %(request_id?)s - %(message)s", defaults={"role": "assistant"}
    )
    logger.handlers[0].setFormatter(formatter)

    logger.info("test", extra={"request_id": "123"})

    assert log_output() == ["INFO - assistant - 123 - test"]

def test_mixed_optional_and_required_fields(logger, log_output):
    """Standard fields should work alongside optional fields."""
    formatter = OptionalFieldFormatter("%(levelname)s %(name)s %(role?)s %(message)s")
    logger.handlers[0].setFormatter(formatter)

    logger.info("test", extra={"role": "user"})

    output = log_output()[0]
    assert "INFO" in output
    assert "user" in output
    assert "test" in output

def test_no_optional_fields(logger, log_output):
    """Formatter should work normally with no optional fields."""
    formatter = OptionalFieldFormatter("%(levelname)s %(message)s")
    logger.handlers[0].setFormatter(formatter)

    logger.info("test message")

    assert log_output() == ["INFO test message"]

def test_integer_format_specifier(logger, log_output):
    """Optional fields with a %d specifier should work."""
    formatter = OptionalFieldFormatter(
        "%(levelname)s %(count?)d %(message)s", defaults={"count": 0}
    )
    logger.handlers[0].setFormatter(formatter)

    logger.info("test", extra={"count": 42})

    assert log_output() == ["INFO 42 test"]

def test_float_format_specifier(logger, log_output):
    """Optional fields with a %f specifier should work."""
    formatter = OptionalFieldFormatter(
        "%(levelname)s %(duration?)f %(message)s", defaults={"duration": 0.0}
    )
    logger.handlers[0].setFormatter(formatter)

    logger.info("test", extra={"duration": 1.5})

    assert "1.5" in log_output()[0]

def test_empty_string_default(logger, log_output):
    """Empty string default should work."""
    formatter = OptionalFieldFormatter("%(levelname)s %(role?)s %(message)s", defaults={"role": ""})
    logger.handlers[0].setFormatter(formatter)

    logger.info("test")

    # Note the double space: the empty default still occupies its slot in the format string.
    assert log_output() == ["INFO  test"]

def test_none_format_string():
    """None format string should not raise."""
    formatter = OptionalFieldFormatter(fmt=None)
    assert formatter.optional_fields == set()

def test_optional_fields_parsed_correctly():
    """Check that optional fields are correctly identified."""
    formatter = OptionalFieldFormatter("%(asctime)s %(role?)s %(level?)d %(name)s")
    assert formatter.optional_fields == {("role", "s"), ("level", "d")}

def test_format_string_normalized():
    """Check that ? is removed from the format string."""
    formatter = OptionalFieldFormatter("%(role?)s %(message)s")
    assert "?" not in formatter._style._fmt
    assert "%(role)s" in formatter._style._fmt

def test_field_with_underscore(logger, log_output):
    """Field names with underscores should work."""
    formatter = OptionalFieldFormatter("%(levelname)s %(user_id?)s %(message)s")
    logger.handlers[0].setFormatter(formatter)

    logger.info("test", extra={"user_id": "abc123"})

    assert log_output() == ["INFO abc123 test"]

def test_field_with_numbers(logger, log_output):
    """Field names with numbers should work."""
    formatter = OptionalFieldFormatter("%(levelname)s %(field2?)s %(message)s")
    logger.handlers[0].setFormatter(formatter)

    logger.info("test", extra={"field2": "value"})

    assert log_output() == ["INFO value test"]

def test_multiple_log_calls(logger, log_output):
    """Formatter should work correctly across multiple log calls."""
    formatter = OptionalFieldFormatter(
        "%(levelname)s %(role?)s %(message)s", defaults={"role": "other"}
    )
    logger.handlers[0].setFormatter(formatter)

    logger.info("first", extra={"role": "assistant"})
    logger.info("second")
    logger.info("third", extra={"role": "user"})

    assert log_output() == [
        "INFO assistant first",
        "INFO other second",
        "INFO user third",
    ]

def test_default_not_mutated(logger, log_output):
    """The original defaults dict should not be mutated."""
    defaults = {"role": "other"}
    formatter = OptionalFieldFormatter("%(levelname)s %(role?)s %(message)s", defaults=defaults)
    logger.handlers[0].setFormatter(formatter)

    logger.info("test")

    assert defaults == {"role": "other"}

View File

@@ -0,0 +1,83 @@
import logging

import pytest

from control_backend.logging import PartialFilter

@pytest.fixture
def logger():
    """Create a fresh logger for each test."""
    logger = logging.getLogger(f"test_{id(object())}")
    logger.setLevel(logging.DEBUG)
    logger.handlers = []
    return logger

@pytest.fixture
def log_output(logger):
    """Capture log output and return a function to get it."""

    class ListHandler(logging.Handler):
        def __init__(self):
            super().__init__()
            self.records = []

        def emit(self, record):
            self.records.append(self.format(record))

    handler = ListHandler()
    handler.addFilter(PartialFilter())
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)
    return lambda: handler.records

def test_no_partial_attribute(logger, log_output):
    """Records without a partial attribute should pass through."""
    logger.info("normal message")
    assert log_output() == ["normal message"]

def test_partial_true_filtered(logger, log_output):
    """Records with partial=True should be filtered out."""
    logger.info("partial message", extra={"partial": True})
    assert log_output() == []

def test_partial_false_passes(logger, log_output):
    """Records with partial=False should pass through."""
    logger.info("complete message", extra={"partial": False})
    assert log_output() == ["complete message"]

def test_partial_none_passes(logger, log_output):
    """Records with partial=None should pass through."""
    logger.info("message", extra={"partial": None})
    assert log_output() == ["message"]

def test_partial_truthy_value_passes(logger, log_output):
    """
    Records with a truthy but non-True partial value should pass through; only records where
    partial is exactly ``True`` are filtered out.
    """
    logger.info("message", extra={"partial": "yes"})
    assert log_output() == ["message"]

def test_multiple_records_mixed(logger, log_output):
    """Filter should handle mixed records correctly."""
    logger.info("first")
    logger.info("second", extra={"partial": True})
    logger.info("third", extra={"partial": False})
    logger.info("fourth", extra={"partial": True})
    logger.info("fifth")
    assert log_output() == ["first", "third", "fifth"]