Add useful experiment logs #48

Merged
0950726 merged 4 commits from feat/use-experiment-logs into dev 2026-01-27 17:45:42 +00:00
13 changed files with 143 additions and 111 deletions

View File

@@ -1,4 +1,5 @@
import json import json
import logging
import zmq import zmq
import zmq.asyncio as azmq import zmq.asyncio as azmq
@@ -8,6 +9,8 @@ from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint from control_backend.schemas.ri_message import GestureCommand, RIEndpoint
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class RobotGestureAgent(BaseAgent): class RobotGestureAgent(BaseAgent):
""" """
@@ -111,6 +114,7 @@ class RobotGestureAgent(BaseAgent):
gesture_command.data, gesture_command.data,
) )
return return
experiment_logger.action("Gesture: %s", gesture_command.data)
await self.pubsocket.send_json(gesture_command.model_dump()) await self.pubsocket.send_json(gesture_command.model_dump())
except Exception: except Exception:
self.logger.exception("Error processing internal message.") self.logger.exception("Error processing internal message.")

View File

@@ -1,6 +1,7 @@
import asyncio import asyncio
import copy import copy
import json import json
import logging
import time import time
from collections.abc import Iterable from collections.abc import Iterable
@@ -19,6 +20,9 @@ from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, Speec
DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class BDICoreAgent(BaseAgent): class BDICoreAgent(BaseAgent):
""" """
BDI Core Agent. BDI Core Agent.
@@ -207,6 +211,9 @@ class BDICoreAgent(BaseAgent):
else: else:
term = agentspeak.Literal(name) term = agentspeak.Literal(name)
if name != "user_said":
experiment_logger.observation(f"Formed new belief: {name}{f'={args}' if args else ''}")
self.bdi_agent.call( self.bdi_agent.call(
agentspeak.Trigger.addition, agentspeak.Trigger.addition,
agentspeak.GoalType.belief, agentspeak.GoalType.belief,
@@ -244,6 +251,9 @@ class BDICoreAgent(BaseAgent):
new_args = (agentspeak.Literal(arg) for arg in args) new_args = (agentspeak.Literal(arg) for arg in args)
term = agentspeak.Literal(name, new_args) term = agentspeak.Literal(name, new_args)
if name != "user_said":
experiment_logger.observation(f"Removed belief: {name}{f'={args}' if args else ''}")
result = self.bdi_agent.call( result = self.bdi_agent.call(
agentspeak.Trigger.removal, agentspeak.Trigger.removal,
agentspeak.GoalType.belief, agentspeak.GoalType.belief,
@@ -386,6 +396,8 @@ class BDICoreAgent(BaseAgent):
body=str(message_text), body=str(message_text),
) )
experiment_logger.chat(str(message_text), extra={"role": "assistant"})
self.add_behavior(self.send(chat_history_message)) self.add_behavior(self.send(chat_history_message))
yield yield

View File

@@ -1,10 +1,12 @@
import asyncio import asyncio
import json import json
import logging
import zmq import zmq
from pydantic import ValidationError from pydantic import ValidationError
from zmq.asyncio import Context from zmq.asyncio import Context
import control_backend
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.config import settings from control_backend.core.config import settings
@@ -19,6 +21,8 @@ from control_backend.schemas.program import (
Program, Program,
) )
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class BDIProgramManager(BaseAgent): class BDIProgramManager(BaseAgent):
""" """
@@ -277,6 +281,18 @@ class BDIProgramManager(BaseAgent):
await self.send(extractor_msg) await self.send(extractor_msg)
self.logger.debug("Sent message to extractor agent to clear history.") self.logger.debug("Sent message to extractor agent to clear history.")
@staticmethod
def _rollover_experiment_logs():
    """
    A new experiment program started; make a new experiment log file.

    Looks up the experiment logger's handlers and rolls over any dated
    file handler so subsequent experiment records go to a fresh file.
    """
    # Re-fetch the logger by its configured name; this is the same object
    # as the module-level `experiment_logger`.
    handlers = logging.getLogger(settings.logging_settings.experiment_logger_name).handlers
    for handler in handlers:
        # Only dated file handlers support rollover; any other attached
        # handlers (e.g. stream handlers) are left untouched.
        if isinstance(handler, control_backend.logging.DatedFileHandler):
            # Emitted before do_rollover(), so this record lands in the OLD file.
            experiment_logger.action("Doing rollover...")
            handler.do_rollover()
            # Emitted after, so this record confirms the NEW file receives logs.
            experiment_logger.debug("Finished rollover.")
async def _receive_programs(self): async def _receive_programs(self):
""" """
Continuous loop that receives program updates from the HTTP endpoint. Continuous loop that receives program updates from the HTTP endpoint.
@@ -297,6 +313,7 @@ class BDIProgramManager(BaseAgent):
self._initialize_internal_state(program) self._initialize_internal_state(program)
await self._send_program_to_user_interrupt(program) await self._send_program_to_user_interrupt(program)
await self._send_clear_llm_history() await self._send_clear_llm_history()
self._rollover_experiment_logs()
await asyncio.gather( await asyncio.gather(
self._create_agentspeak_and_send_to_bdi(program), self._create_agentspeak_and_send_to_bdi(program),

View File

@@ -1,5 +1,6 @@
import asyncio import asyncio
import json import json
import logging
import re import re
import uuid import uuid
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
@@ -14,6 +15,8 @@ from control_backend.core.config import settings
from ...schemas.llm_prompt_message import LLMPromptMessage from ...schemas.llm_prompt_message import LLMPromptMessage
from .llm_instructions import LLMInstructions from .llm_instructions import LLMInstructions
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class LLMAgent(BaseAgent): class LLMAgent(BaseAgent):
""" """
@@ -170,7 +173,7 @@ class LLMAgent(BaseAgent):
*self.history, *self.history,
] ]
message_id = str(uuid.uuid4()) # noqa message_id = str(uuid.uuid4())
try: try:
full_message = "" full_message = ""
@@ -179,10 +182,9 @@ class LLMAgent(BaseAgent):
full_message += token full_message += token
current_chunk += token current_chunk += token
self.logger.llm( experiment_logger.chat(
"Received token: %s",
full_message, full_message,
extra={"reference": message_id}, # Used in the UI to update old logs extra={"role": "assistant", "reference": message_id, "partial": True},
) )
# Stream the message in chunks separated by punctuation. # Stream the message in chunks separated by punctuation.
@@ -197,6 +199,11 @@ class LLMAgent(BaseAgent):
# Yield any remaining tail # Yield any remaining tail
if current_chunk: if current_chunk:
yield current_chunk yield current_chunk
experiment_logger.chat(
full_message,
extra={"role": "assistant", "reference": message_id, "partial": False},
)
except httpx.HTTPError as err: except httpx.HTTPError as err:
self.logger.error("HTTP error.", exc_info=err) self.logger.error("HTTP error.", exc_info=err)
yield "LLM service unavailable." yield "LLM service unavailable."

View File

@@ -1,4 +1,5 @@
import asyncio import asyncio
import logging
import numpy as np import numpy as np
import zmq import zmq
@@ -10,6 +11,8 @@ from control_backend.core.config import settings
from .speech_recognizer import SpeechRecognizer from .speech_recognizer import SpeechRecognizer
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class TranscriptionAgent(BaseAgent): class TranscriptionAgent(BaseAgent):
""" """
@@ -25,6 +28,8 @@ class TranscriptionAgent(BaseAgent):
:ivar audio_in_socket: The ZMQ SUB socket instance. :ivar audio_in_socket: The ZMQ SUB socket instance.
:ivar speech_recognizer: The speech recognition engine instance. :ivar speech_recognizer: The speech recognition engine instance.
:ivar _concurrency: Semaphore to limit concurrent transcriptions. :ivar _concurrency: Semaphore to limit concurrent transcriptions.
:ivar _current_speech_reference: The reference of the current user utterance, for synchronising
experiment logs.
""" """
def __init__(self, audio_in_address: str): def __init__(self, audio_in_address: str):
@@ -39,6 +44,7 @@ class TranscriptionAgent(BaseAgent):
self.audio_in_socket: azmq.Socket | None = None self.audio_in_socket: azmq.Socket | None = None
self.speech_recognizer = None self.speech_recognizer = None
self._concurrency = None self._concurrency = None
self._current_speech_reference: str | None = None
async def setup(self): async def setup(self):
""" """
@@ -63,6 +69,10 @@ class TranscriptionAgent(BaseAgent):
self.logger.info("Finished setting up %s", self.name) self.logger.info("Finished setting up %s", self.name)
async def handle_message(self, msg: InternalMessage):
if msg.thread == "voice_activity":
self._current_speech_reference = msg.body
async def stop(self): async def stop(self):
""" """
Stop the agent and close sockets. Stop the agent and close sockets.
@@ -96,24 +106,25 @@ class TranscriptionAgent(BaseAgent):
async def _share_transcription(self, transcription: str): async def _share_transcription(self, transcription: str):
""" """
Share a transcription to the other agents that depend on it. Share a transcription to the other agents that depend on it, and to experiment logs.
Currently sends to: Currently sends to:
- :attr:`settings.agent_settings.text_belief_extractor_name` - :attr:`settings.agent_settings.text_belief_extractor_name`
- The UI via the experiment logger
:param transcription: The transcribed text. :param transcription: The transcribed text.
""" """
receiver_names = [ experiment_logger.chat(
settings.agent_settings.text_belief_extractor_name, transcription,
] extra={"role": "user", "reference": self._current_speech_reference, "partial": False},
)
for receiver_name in receiver_names: message = InternalMessage(
message = InternalMessage( to=settings.agent_settings.text_belief_extractor_name,
to=receiver_name, sender=self.name,
sender=self.name, body=transcription,
body=transcription, )
) await self.send(message)
await self.send(message)
async def _transcribing_loop(self) -> None: async def _transcribing_loop(self) -> None:
""" """
@@ -129,10 +140,9 @@ class TranscriptionAgent(BaseAgent):
audio = np.frombuffer(audio_data, dtype=np.float32) audio = np.frombuffer(audio_data, dtype=np.float32)
speech = await self._transcribe(audio) speech = await self._transcribe(audio)
if not speech: if not speech:
self.logger.info("Nothing transcribed.") self.logger.debug("Nothing transcribed.")
continue continue
self.logger.info("Transcribed speech: %s", speech)
await self._share_transcription(speech) await self._share_transcription(speech)
except Exception as e: except Exception as e:
self.logger.error(f"Error in transcription loop: {e}") self.logger.error(f"Error in transcription loop: {e}")

View File

@@ -1,4 +1,6 @@
import asyncio import asyncio
import logging
import uuid
import numpy as np import numpy as np
import torch import torch
@@ -12,6 +14,8 @@ from control_backend.schemas.internal_message import InternalMessage
from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
from .transcription_agent.transcription_agent import TranscriptionAgent from .transcription_agent.transcription_agent import TranscriptionAgent
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class SocketPoller[T]: class SocketPoller[T]:
""" """
@@ -252,6 +256,18 @@ class VADAgent(BaseAgent):
if prob > prob_threshold: if prob > prob_threshold:
if self.i_since_speech > non_speech_patience + begin_silence_length: if self.i_since_speech > non_speech_patience + begin_silence_length:
self.logger.debug("Speech started.") self.logger.debug("Speech started.")
reference = str(uuid.uuid4())
experiment_logger.chat(
"...",
extra={"role": "user", "reference": reference, "partial": True},
)
await self.send(
InternalMessage(
to=settings.agent_settings.transcription_name,
body=reference,
thread="voice_activity",
)
)
self.audio_buffer = np.append(self.audio_buffer, chunk) self.audio_buffer = np.append(self.audio_buffer, chunk)
self.i_since_speech = 0 self.i_since_speech = 0
continue continue
@@ -269,9 +285,10 @@ class VADAgent(BaseAgent):
assert self.audio_out_socket is not None assert self.audio_out_socket is not None
await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes()) await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes())
# At this point, we know that the speech has ended. # At this point, we know that there is no speech.
# Prepend the last chunk that had no speech, for a more fluent boundary # Prepend the last few chunks that had no speech, for a more fluent boundary.
self.audio_buffer = chunk self.audio_buffer = np.append(self.audio_buffer, chunk)
self.audio_buffer = self.audio_buffer[-begin_silence_length * len(chunk) :]
async def handle_message(self, msg: InternalMessage): async def handle_message(self, msg: InternalMessage):
""" """

View File

@@ -1,4 +1,5 @@
import json import json
import logging
import zmq import zmq
from zmq.asyncio import Context from zmq.asyncio import Context
@@ -16,6 +17,8 @@ from control_backend.schemas.ri_message import (
SpeechCommand, SpeechCommand,
) )
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class UserInterruptAgent(BaseAgent): class UserInterruptAgent(BaseAgent):
""" """
@@ -305,6 +308,7 @@ class UserInterruptAgent(BaseAgent):
:param text_to_say: The string that the robot has to say. :param text_to_say: The string that the robot has to say.
""" """
experiment_logger.chat(text_to_say, extra={"role": "assistant"})
cmd = SpeechCommand(data=text_to_say, is_priority=True) cmd = SpeechCommand(data=text_to_say, is_priority=True)
out_msg = InternalMessage( out_msg = InternalMessage(
to=settings.agent_settings.robot_speech_name, to=settings.agent_settings.robot_speech_name,

View File

@@ -1,5 +1,5 @@
import json import json
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
import zmq import zmq
@@ -19,6 +19,12 @@ def zmq_context(mocker):
return mock_context return mock_context
@pytest.fixture(autouse=True)
def mock_experiment_logger():
    """Autouse fixture: replace the gesture agent's module-level experiment logger with a mock."""
    with patch("control_backend.agents.actuation.robot_gesture_agent.experiment_logger") as logger:
        yield logger
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_setup_bind(zmq_context, mocker): async def test_setup_bind(zmq_context, mocker):
"""Setup binds and subscribes to internal commands.""" """Setup binds and subscribes to internal commands."""

View File

@@ -26,6 +26,12 @@ def agent():
return agent return agent
@pytest.fixture(autouse=True)
def mock_experiment_logger():
    """Autouse fixture: replace the BDI core agent's module-level experiment logger with a mock."""
    with patch("control_backend.agents.bdi.bdi_core_agent.experiment_logger") as logger:
        yield logger
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_setup_loads_asl(mock_agentspeak_env, agent): async def test_setup_loads_asl(mock_agentspeak_env, agent):
# Mock file opening # Mock file opening

View File

@@ -18,6 +18,12 @@ def mock_httpx_client():
yield mock_client yield mock_client
@pytest.fixture(autouse=True)
def mock_experiment_logger():
    """Autouse fixture: replace the LLM agent's module-level experiment logger with a mock."""
    with patch("control_backend.agents.llm.llm_agent.experiment_logger") as logger:
        yield logger
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_llm_processing_success(mock_httpx_client, mock_settings): async def test_llm_processing_success(mock_httpx_client, mock_settings):
# Setup the mock response for the stream # Setup the mock response for the stream

View File

@@ -14,6 +14,15 @@ from control_backend.agents.perception.transcription_agent.transcription_agent i
) )
@pytest.fixture(autouse=True)
def mock_experiment_logger():
    """Autouse fixture: replace the transcription agent's module-level experiment logger with a mock."""
    # Target string is split only to satisfy the line-length limit.
    with patch(
        "control_backend.agents.perception"
        ".transcription_agent.transcription_agent.experiment_logger"
    ) as logger:
        yield logger
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_transcription_agent_flow(mock_zmq_context): async def test_transcription_agent_flow(mock_zmq_context):
mock_sub = MagicMock() mock_sub = MagicMock()

View File

@@ -24,7 +24,9 @@ def audio_out_socket():
@pytest.fixture @pytest.fixture
def vad_agent(audio_out_socket): def vad_agent(audio_out_socket):
return VADAgent("tcp://localhost:5555", False) agent = VADAgent("tcp://localhost:5555", False)
agent._internal_pub_socket = AsyncMock()
return agent
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -44,6 +46,12 @@ def patch_settings(monkeypatch):
monkeypatch.setattr(vad_agent.settings.vad_settings, "sample_rate_hz", 16_000, raising=False) monkeypatch.setattr(vad_agent.settings.vad_settings, "sample_rate_hz", 16_000, raising=False)
@pytest.fixture(autouse=True)
def mock_experiment_logger():
    """Autouse fixture: replace the VAD agent's module-level experiment logger with a mock."""
    with patch("control_backend.agents.perception.vad_agent.experiment_logger") as logger:
        yield logger
async def simulate_streaming_with_probabilities(streaming, probabilities: list[float]): async def simulate_streaming_with_probabilities(streaming, probabilities: list[float]):
""" """
Simulates a streaming scenario with given VAD model probabilities for testing purposes. Simulates a streaming scenario with given VAD model probabilities for testing purposes.
@@ -84,14 +92,15 @@ async def test_voice_activity_detected(audio_out_socket, vad_agent):
Test a scenario where there is voice activity detected between silences. Test a scenario where there is voice activity detected between silences.
""" """
speech_chunk_count = 5 speech_chunk_count = 5
probabilities = [0.0] * 5 + [1.0] * speech_chunk_count + [0.0] * 5 begin_silence_chunks = settings.behaviour_settings.vad_begin_silence_chunks
probabilities = [0.0] * 15 + [1.0] * speech_chunk_count + [0.0] * 5
vad_agent.audio_out_socket = audio_out_socket vad_agent.audio_out_socket = audio_out_socket
await simulate_streaming_with_probabilities(vad_agent, probabilities) await simulate_streaming_with_probabilities(vad_agent, probabilities)
audio_out_socket.send.assert_called_once() audio_out_socket.send.assert_called_once()
data = audio_out_socket.send.call_args[0][0] data = audio_out_socket.send.call_args[0][0]
assert isinstance(data, bytes) assert isinstance(data, bytes)
assert len(data) == 512 * 4 * (speech_chunk_count + 1) assert len(data) == 512 * 4 * (begin_silence_chunks + speech_chunk_count)
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -101,8 +110,9 @@ async def test_voice_activity_short_pause(audio_out_socket, vad_agent):
short pause. short pause.
""" """
speech_chunk_count = 5 speech_chunk_count = 5
begin_silence_chunks = settings.behaviour_settings.vad_begin_silence_chunks
probabilities = ( probabilities = (
[0.0] * 5 + [1.0] * speech_chunk_count + [0.0] + [1.0] * speech_chunk_count + [0.0] * 5 [0.0] * 15 + [1.0] * speech_chunk_count + [0.0] + [1.0] * speech_chunk_count + [0.0] * 5
) )
vad_agent.audio_out_socket = audio_out_socket vad_agent.audio_out_socket = audio_out_socket
await simulate_streaming_with_probabilities(vad_agent, probabilities) await simulate_streaming_with_probabilities(vad_agent, probabilities)
@@ -110,8 +120,8 @@ async def test_voice_activity_short_pause(audio_out_socket, vad_agent):
audio_out_socket.send.assert_called_once() audio_out_socket.send.assert_called_once()
data = audio_out_socket.send.call_args[0][0] data = audio_out_socket.send.call_args[0][0]
assert isinstance(data, bytes) assert isinstance(data, bytes)
# Expecting 13 chunks (2*5 with speech, 1 pause between, 1 as padding) # Expecting speech_chunk_count * 2 chunks with speech, 1 pause between, begin_silence_chunks as padding
assert len(data) == 512 * 4 * (speech_chunk_count * 2 + 1 + 1) assert len(data) == 512 * 4 * (speech_chunk_count * 2 + 1 + begin_silence_chunks)
@pytest.mark.asyncio @pytest.mark.asyncio

View File

@@ -30,6 +30,14 @@ def agent():
return agent return agent
@pytest.fixture(autouse=True)
def mock_experiment_logger():
    """Autouse fixture: replace the user-interrupt agent's module-level experiment logger with a mock."""
    with patch(
        "control_backend.agents.user_interrupt.user_interrupt_agent.experiment_logger"
    ) as logger:
        yield logger
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_to_speech_agent(agent): async def test_send_to_speech_agent(agent):
"""Verify speech command format.""" """Verify speech command format."""
@@ -579,30 +587,6 @@ async def test_create_mapping_recursive_goals(agent):
assert agent._goal_reverse_map["child_goal"] == str(child_goal_id) assert agent._goal_reverse_map["child_goal"] == str(child_goal_id)
@pytest.mark.asyncio
async def test_setup(agent):
    """Test the setup method initializes sockets correctly."""
    with patch("control_backend.agents.user_interrupt.user_interrupt_agent.Context") as MockContext:
        mock_ctx_instance = MagicMock()
        MockContext.instance.return_value = mock_ctx_instance

        # setup() is expected to create the SUB socket first, then the PUB socket.
        mock_sub = MagicMock()
        mock_pub = MagicMock()
        mock_ctx_instance.socket.side_effect = [mock_sub, mock_pub]

        # MOCK add_behavior so we don't rely on internal attributes
        agent.add_behavior = MagicMock()

        await agent.setup()

        # Check sockets
        mock_sub.connect.assert_called_with(settings.zmq_settings.internal_sub_address)
        mock_pub.connect.assert_called_with(settings.zmq_settings.internal_pub_address)

        # Verify add_behavior was called
        agent.add_behavior.assert_called_once()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_receive_loop_advanced_scenarios(agent): async def test_receive_loop_advanced_scenarios(agent):
""" """
@@ -706,63 +690,3 @@ async def test_receive_loop_advanced_scenarios(agent):
# Next Phase # Next Phase
agent._send_experiment_control_to_bdi_core.assert_awaited_with("next_phase") agent._send_experiment_control_to_bdi_core.assert_awaited_with("next_phase")
@pytest.mark.asyncio
async def test_handle_message_unknown_thread(agent):
    """Test handling of an unknown message thread (lines 213-214)."""
    msg = InternalMessage(to="me", thread="unknown_thread", body="test")
    await agent.handle_message(msg)
    # Unknown threads are only reported at debug level, not treated as errors.
    agent.logger.debug.assert_called_with(
        "Received internal message on unhandled thread: unknown_thread"
    )
@pytest.mark.asyncio
async def test_send_to_bdi_belief_edge_cases(agent):
    """
    Covers:
    - Unknown asl_type warning (lines 326-328)
    - unachieve=True logic (lines 334-337)
    """
    # 1. Unknown Type: nothing should be sent, only a warning logged.
    await agent._send_to_bdi_belief("slug", "unknown_type")
    agent.logger.warning.assert_called_with("Tried to send belief with unknown type")
    agent.send.assert_not_called()

    # Reset mock for part 2
    agent.send.reset_mock()

    # 2. Unachieve = True: the belief must go out as a delete, not a create.
    await agent._send_to_bdi_belief("slug", "cond_norm", unachieve=True)
    agent.send.assert_awaited()
    sent_msg = agent.send.call_args.args[0]

    # Verify it is a delete operation
    body_obj = BeliefMessage.model_validate_json(sent_msg.body)

    # Verify 'delete' has content
    assert body_obj.delete is not None
    assert len(body_obj.delete) == 1
    assert body_obj.delete[0].name == "force_slug"

    # Verify 'create' is empty (handling both None and [])
    assert not body_obj.create
@pytest.mark.asyncio
async def test_send_experiment_control_unknown(agent):
    """Test sending an unknown experiment control type (lines 366-367)."""
    await agent._send_experiment_control_to_bdi_core("invalid_command")
    agent.logger.warning.assert_called_with(
        "Received unknown experiment control type '%s' to send to BDI Core.", "invalid_command"
    )

    # Ensure it still sends an empty message (as per code logic, though thread is empty)
    agent.send.assert_awaited()
    msg = agent.send.call_args[0][0]
    assert msg.thread == ""