diff --git a/src/control_backend/agents/actuation/robot_gesture_agent.py b/src/control_backend/agents/actuation/robot_gesture_agent.py index 997b684..9567940 100644 --- a/src/control_backend/agents/actuation/robot_gesture_agent.py +++ b/src/control_backend/agents/actuation/robot_gesture_agent.py @@ -1,4 +1,5 @@ import json +import logging import zmq import zmq.asyncio as azmq @@ -8,6 +9,8 @@ from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings from control_backend.schemas.ri_message import GestureCommand, RIEndpoint +experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name) + class RobotGestureAgent(BaseAgent): """ @@ -111,6 +114,7 @@ class RobotGestureAgent(BaseAgent): gesture_command.data, ) return + experiment_logger.action("Gesture: %s", gesture_command.data) await self.pubsocket.send_json(gesture_command.model_dump()) except Exception: self.logger.exception("Error processing internal message.") diff --git a/src/control_backend/agents/bdi/bdi_core_agent.py b/src/control_backend/agents/bdi/bdi_core_agent.py index 54b5149..698bbc4 100644 --- a/src/control_backend/agents/bdi/bdi_core_agent.py +++ b/src/control_backend/agents/bdi/bdi_core_agent.py @@ -1,6 +1,7 @@ import asyncio import copy import json +import logging import time from collections.abc import Iterable @@ -19,6 +20,9 @@ from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, Speec DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak +experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name) + + class BDICoreAgent(BaseAgent): """ BDI Core Agent. @@ -207,6 +211,9 @@ class BDICoreAgent(BaseAgent): else: term = agentspeak.Literal(name) + if name != "user_said": + experiment_logger.observation(f"Formed new belief: {name}{f'={args}' if args else ''}") + self.bdi_agent.call( agentspeak.Trigger.addition, agentspeak.GoalType.belief, @@ -244,6 +251,9 @@ class BDICoreAgent(BaseAgent): new_args = (agentspeak.Literal(arg) for arg in args) term = agentspeak.Literal(name, new_args) + if name != "user_said": + experiment_logger.observation(f"Removed belief: {name}{f'={args}' if args else ''}") + result = self.bdi_agent.call( agentspeak.Trigger.removal, agentspeak.GoalType.belief, @@ -386,6 +396,8 @@ class BDICoreAgent(BaseAgent): body=str(message_text), ) + experiment_logger.chat(str(message_text), extra={"role": "assistant"}) + self.add_behavior(self.send(chat_history_message)) yield diff --git a/src/control_backend/agents/bdi/bdi_program_manager.py b/src/control_backend/agents/bdi/bdi_program_manager.py index 6e8a594..3ea6a62 100644 --- a/src/control_backend/agents/bdi/bdi_program_manager.py +++ b/src/control_backend/agents/bdi/bdi_program_manager.py @@ -1,10 +1,12 @@ import asyncio import json +import logging import zmq from pydantic import ValidationError from zmq.asyncio import Context +import control_backend from control_backend.agents import BaseAgent from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator from control_backend.core.config import settings @@ -19,6 +21,8 @@ from control_backend.schemas.program import ( Program, ) +experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name) + class BDIProgramManager(BaseAgent): """ @@ -277,6 +281,18 @@ class BDIProgramManager(BaseAgent): await self.send(extractor_msg) self.logger.debug("Sent message to extractor agent to clear history.") + @staticmethod + def _rollover_experiment_logs(): + """ + A new experiment program started; make a new experiment log file. + """ + handlers = logging.getLogger(settings.logging_settings.experiment_logger_name).handlers + for handler in handlers: + if isinstance(handler, control_backend.logging.DatedFileHandler): + experiment_logger.action("Doing rollover...") + handler.do_rollover() + experiment_logger.debug("Finished rollover.") + async def _receive_programs(self): """ Continuous loop that receives program updates from the HTTP endpoint. @@ -297,6 +313,7 @@ class BDIProgramManager(BaseAgent): self._initialize_internal_state(program) await self._send_program_to_user_interrupt(program) await self._send_clear_llm_history() + self._rollover_experiment_logs() await asyncio.gather( self._create_agentspeak_and_send_to_bdi(program), diff --git a/src/control_backend/agents/llm/llm_agent.py b/src/control_backend/agents/llm/llm_agent.py index 8d81249..08a77e3 100644 --- a/src/control_backend/agents/llm/llm_agent.py +++ b/src/control_backend/agents/llm/llm_agent.py @@ -1,5 +1,6 @@ import asyncio import json +import logging import re import uuid from collections.abc import AsyncGenerator @@ -14,6 +15,8 @@ from control_backend.core.config import settings from ...schemas.llm_prompt_message import LLMPromptMessage from .llm_instructions import LLMInstructions +experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name) + class LLMAgent(BaseAgent): """ @@ -170,7 +173,7 @@ class LLMAgent(BaseAgent): *self.history, ] - message_id = str(uuid.uuid4()) # noqa + message_id = str(uuid.uuid4()) try: full_message = "" @@ -179,10 +182,9 @@ class LLMAgent(BaseAgent): full_message += token current_chunk += token - self.logger.llm( - "Received token: %s", + experiment_logger.chat( full_message, - extra={"reference": message_id}, # Used in the UI to update old logs + extra={"role": "assistant", "reference": message_id, "partial": True}, ) # Stream the message in chunks separated by punctuation. @@ -197,6 +199,11 @@ class LLMAgent(BaseAgent): # Yield any remaining tail if current_chunk: yield current_chunk + + experiment_logger.chat( + full_message, + extra={"role": "assistant", "reference": message_id, "partial": False}, + ) except httpx.HTTPError as err: self.logger.error("HTTP error.", exc_info=err) yield "LLM service unavailable." diff --git a/src/control_backend/agents/perception/transcription_agent/transcription_agent.py b/src/control_backend/agents/perception/transcription_agent/transcription_agent.py index 795623d..e69fea6 100644 --- a/src/control_backend/agents/perception/transcription_agent/transcription_agent.py +++ b/src/control_backend/agents/perception/transcription_agent/transcription_agent.py @@ -1,4 +1,5 @@ import asyncio +import logging import numpy as np import zmq @@ -10,6 +11,8 @@ from control_backend.core.config import settings from .speech_recognizer import SpeechRecognizer +experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name) + class TranscriptionAgent(BaseAgent): """ @@ -25,6 +28,8 @@ class TranscriptionAgent(BaseAgent): :ivar audio_in_socket: The ZMQ SUB socket instance. :ivar speech_recognizer: The speech recognition engine instance. :ivar _concurrency: Semaphore to limit concurrent transcriptions. + :ivar _current_speech_reference: The reference of the current user utterance, for synchronising + experiment logs. """ def __init__(self, audio_in_address: str): @@ -39,6 +44,7 @@ class TranscriptionAgent(BaseAgent): self.audio_in_socket: azmq.Socket | None = None self.speech_recognizer = None self._concurrency = None + self._current_speech_reference: str | None = None async def setup(self): """ @@ -63,6 +69,10 @@ class TranscriptionAgent(BaseAgent): self.logger.info("Finished setting up %s", self.name) + async def handle_message(self, msg: InternalMessage): + if msg.thread == "voice_activity": + self._current_speech_reference = msg.body + async def stop(self): """ Stop the agent and close sockets. @@ -96,24 +106,25 @@ class TranscriptionAgent(BaseAgent): async def _share_transcription(self, transcription: str): """ - Share a transcription to the other agents that depend on it. + Share a transcription to the other agents that depend on it, and to experiment logs. Currently sends to: - :attr:`settings.agent_settings.text_belief_extractor_name` + - The UI via the experiment logger :param transcription: The transcribed text. """ - receiver_names = [ - settings.agent_settings.text_belief_extractor_name, - ] + experiment_logger.chat( + transcription, + extra={"role": "user", "reference": self._current_speech_reference, "partial": False}, + ) - for receiver_name in receiver_names: - message = InternalMessage( - to=receiver_name, - sender=self.name, - body=transcription, - ) - await self.send(message) + message = InternalMessage( + to=settings.agent_settings.text_belief_extractor_name, + sender=self.name, + body=transcription, + ) + await self.send(message) async def _transcribing_loop(self) -> None: """ @@ -129,10 +140,9 @@ class TranscriptionAgent(BaseAgent): audio = np.frombuffer(audio_data, dtype=np.float32) speech = await self._transcribe(audio) if not speech: - self.logger.info("Nothing transcribed.") + self.logger.debug("Nothing transcribed.") continue - self.logger.info("Transcribed speech: %s", speech) await self._share_transcription(speech) except Exception as e: self.logger.error(f"Error in transcription loop: {e}") diff --git a/src/control_backend/agents/perception/vad_agent.py b/src/control_backend/agents/perception/vad_agent.py index 2b333f5..f397563 100644 --- a/src/control_backend/agents/perception/vad_agent.py +++ b/src/control_backend/agents/perception/vad_agent.py @@ -1,4 +1,6 @@ import asyncio +import logging +import uuid import numpy as np import torch @@ -12,6 +14,8 @@ from control_backend.schemas.internal_message import InternalMessage from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus from .transcription_agent.transcription_agent import TranscriptionAgent +experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name) + class SocketPoller[T]: """ @@ -252,6 +256,18 @@ class VADAgent(BaseAgent): if prob > prob_threshold: if self.i_since_speech > non_speech_patience + begin_silence_length: self.logger.debug("Speech started.") + reference = str(uuid.uuid4()) + experiment_logger.chat( + "...", + extra={"role": "user", "reference": reference, "partial": True}, + ) + await self.send( + InternalMessage( + to=settings.agent_settings.transcription_name, + body=reference, + thread="voice_activity", + ) + ) self.audio_buffer = np.append(self.audio_buffer, chunk) self.i_since_speech = 0 continue @@ -269,9 +285,10 @@ class VADAgent(BaseAgent): assert self.audio_out_socket is not None await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes()) - # At this point, we know that the speech has ended. - # Prepend the last chunk that had no speech, for a more fluent boundary - self.audio_buffer = chunk + # At this point, we know that there is no speech. + # Prepend the last few chunks that had no speech, for a more fluent boundary. + self.audio_buffer = np.append(self.audio_buffer, chunk) + self.audio_buffer = self.audio_buffer[-begin_silence_length * len(chunk) :] async def handle_message(self, msg: InternalMessage): """ diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index 25f24af..7975aa6 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -1,4 +1,5 @@ import json +import logging import zmq from zmq.asyncio import Context @@ -16,6 +17,8 @@ from control_backend.schemas.ri_message import ( SpeechCommand, ) +experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name) + class UserInterruptAgent(BaseAgent): """ @@ -305,6 +308,7 @@ class UserInterruptAgent(BaseAgent): :param text_to_say: The string that the robot has to say. """ + experiment_logger.chat(text_to_say, extra={"role": "assistant"}) cmd = SpeechCommand(data=text_to_say, is_priority=True) out_msg = InternalMessage( to=settings.agent_settings.robot_speech_name, diff --git a/test/unit/agents/actuation/test_robot_gesture_agent.py b/test/unit/agents/actuation/test_robot_gesture_agent.py index 1e6fd8a..20d7d51 100644 --- a/test/unit/agents/actuation/test_robot_gesture_agent.py +++ b/test/unit/agents/actuation/test_robot_gesture_agent.py @@ -1,5 +1,5 @@ import json -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch import pytest import zmq @@ -19,6 +19,12 @@ def zmq_context(mocker): return mock_context +@pytest.fixture(autouse=True) +def mock_experiment_logger(): + with patch("control_backend.agents.actuation.robot_gesture_agent.experiment_logger") as logger: + yield logger + + @pytest.mark.asyncio async def test_setup_bind(zmq_context, mocker): """Setup binds and subscribes to internal commands.""" diff --git a/test/unit/agents/bdi/test_bdi_core_agent.py b/test/unit/agents/bdi/test_bdi_core_agent.py index 6245d5b..1bf0107 100644 --- a/test/unit/agents/bdi/test_bdi_core_agent.py +++ b/test/unit/agents/bdi/test_bdi_core_agent.py @@ -26,6 +26,12 @@ def agent(): return agent +@pytest.fixture(autouse=True) +def mock_experiment_logger(): + with patch("control_backend.agents.bdi.bdi_core_agent.experiment_logger") as logger: + yield logger + + @pytest.mark.asyncio async def test_setup_loads_asl(mock_agentspeak_env, agent): # Mock file opening diff --git a/test/unit/agents/llm/test_llm_agent.py b/test/unit/agents/llm/test_llm_agent.py index bd407cc..bbd6e93 100644 --- a/test/unit/agents/llm/test_llm_agent.py +++ b/test/unit/agents/llm/test_llm_agent.py @@ -18,6 +18,12 @@ def mock_httpx_client(): yield mock_client +@pytest.fixture(autouse=True) +def mock_experiment_logger(): + with patch("control_backend.agents.llm.llm_agent.experiment_logger") as logger: + yield logger + + @pytest.mark.asyncio async def test_llm_processing_success(mock_httpx_client, mock_settings): # Setup the mock response for the stream diff --git a/test/unit/agents/perception/transcription_agent/test_transcription_agent.py b/test/unit/agents/perception/transcription_agent/test_transcription_agent.py index 57875ca..f5a4d1c 100644 --- a/test/unit/agents/perception/transcription_agent/test_transcription_agent.py +++ b/test/unit/agents/perception/transcription_agent/test_transcription_agent.py @@ -14,6 +14,15 @@ from control_backend.agents.perception.transcription_agent.transcription_agent i ) +@pytest.fixture(autouse=True) +def mock_experiment_logger(): + with patch( + "control_backend.agents.perception" + ".transcription_agent.transcription_agent.experiment_logger" + ) as logger: + yield logger + + @pytest.mark.asyncio async def test_transcription_agent_flow(mock_zmq_context): mock_sub = MagicMock() diff --git a/test/unit/agents/perception/vad_agent/test_vad_streaming.py b/test/unit/agents/perception/vad_agent/test_vad_streaming.py index 349fab2..b53f63d 100644 --- a/test/unit/agents/perception/vad_agent/test_vad_streaming.py +++ b/test/unit/agents/perception/vad_agent/test_vad_streaming.py @@ -24,7 +24,9 @@ def audio_out_socket(): @pytest.fixture def vad_agent(audio_out_socket): - return VADAgent("tcp://localhost:5555", False) + agent = VADAgent("tcp://localhost:5555", False) + agent._internal_pub_socket = AsyncMock() + return agent @pytest.fixture(autouse=True) @@ -44,6 +46,12 @@ def patch_settings(monkeypatch): monkeypatch.setattr(vad_agent.settings.vad_settings, "sample_rate_hz", 16_000, raising=False) +@pytest.fixture(autouse=True) +def mock_experiment_logger(): + with patch("control_backend.agents.perception.vad_agent.experiment_logger") as logger: + yield logger + + async def simulate_streaming_with_probabilities(streaming, probabilities: list[float]): """ Simulates a streaming scenario with given VAD model probabilities for testing purposes. @@ -84,14 +92,15 @@ async def test_voice_activity_detected(audio_out_socket, vad_agent): Test a scenario where there is voice activity detected between silences. """ speech_chunk_count = 5 - probabilities = [0.0] * 5 + [1.0] * speech_chunk_count + [0.0] * 5 + begin_silence_chunks = settings.behaviour_settings.vad_begin_silence_chunks + probabilities = [0.0] * 15 + [1.0] * speech_chunk_count + [0.0] * 5 vad_agent.audio_out_socket = audio_out_socket await simulate_streaming_with_probabilities(vad_agent, probabilities) audio_out_socket.send.assert_called_once() data = audio_out_socket.send.call_args[0][0] assert isinstance(data, bytes) - assert len(data) == 512 * 4 * (speech_chunk_count + 1) + assert len(data) == 512 * 4 * (begin_silence_chunks + speech_chunk_count) @pytest.mark.asyncio @@ -101,8 +110,9 @@ async def test_voice_activity_short_pause(audio_out_socket, vad_agent): short pause. """ speech_chunk_count = 5 + begin_silence_chunks = settings.behaviour_settings.vad_begin_silence_chunks probabilities = ( - [0.0] * 5 + [1.0] * speech_chunk_count + [0.0] + [1.0] * speech_chunk_count + [0.0] * 5 + [0.0] * 15 + [1.0] * speech_chunk_count + [0.0] + [1.0] * speech_chunk_count + [0.0] * 5 ) vad_agent.audio_out_socket = audio_out_socket await simulate_streaming_with_probabilities(vad_agent, probabilities) @@ -110,8 +120,8 @@ async def test_voice_activity_short_pause(audio_out_socket, vad_agent): audio_out_socket.send.assert_called_once() data = audio_out_socket.send.call_args[0][0] assert isinstance(data, bytes) - # Expecting 13 chunks (2*5 with speech, 1 pause between, 1 as padding) - assert len(data) == 512 * 4 * (speech_chunk_count * 2 + 1 + 1) + # Expecting 13 chunks (2*5 with speech, 1 pause between, begin_silence_chunks as padding) + assert len(data) == 512 * 4 * (speech_chunk_count * 2 + 1 + begin_silence_chunks) @pytest.mark.asyncio diff --git a/test/unit/agents/user_interrupt/test_user_interrupt.py b/test/unit/agents/user_interrupt/test_user_interrupt.py index 3786f8d..c41d79e 100644 --- a/test/unit/agents/user_interrupt/test_user_interrupt.py +++ b/test/unit/agents/user_interrupt/test_user_interrupt.py @@ -30,6 +30,14 @@ def agent(): return agent +@pytest.fixture(autouse=True) +def mock_experiment_logger(): + with patch( + "control_backend.agents.user_interrupt.user_interrupt_agent.experiment_logger" + ) as logger: + yield logger + + @pytest.mark.asyncio async def test_send_to_speech_agent(agent): """Verify speech command format.""" @@ -579,30 +587,6 @@ async def test_create_mapping_recursive_goals(agent): assert agent._goal_reverse_map["child_goal"] == str(child_goal_id) -@pytest.mark.asyncio -async def test_setup(agent): - """Test the setup method initializes sockets correctly.""" - with patch("control_backend.agents.user_interrupt.user_interrupt_agent.Context") as MockContext: - mock_ctx_instance = MagicMock() - MockContext.instance.return_value = mock_ctx_instance - - mock_sub = MagicMock() - mock_pub = MagicMock() - mock_ctx_instance.socket.side_effect = [mock_sub, mock_pub] - - # MOCK add_behavior so we don't rely on internal attributes - agent.add_behavior = MagicMock() - - await agent.setup() - - # Check sockets - mock_sub.connect.assert_called_with(settings.zmq_settings.internal_sub_address) - mock_pub.connect.assert_called_with(settings.zmq_settings.internal_pub_address) - - # Verify add_behavior was called - agent.add_behavior.assert_called_once() - - @pytest.mark.asyncio async def test_receive_loop_advanced_scenarios(agent): """ @@ -706,63 +690,3 @@ async def test_receive_loop_advanced_scenarios(agent): # Next Phase agent._send_experiment_control_to_bdi_core.assert_awaited_with("next_phase") - - -@pytest.mark.asyncio -async def test_handle_message_unknown_thread(agent): - """Test handling of an unknown message thread (lines 213-214).""" - msg = InternalMessage(to="me", thread="unknown_thread", body="test") - await agent.handle_message(msg) - - agent.logger.debug.assert_called_with( - "Received internal message on unhandled thread: unknown_thread" - ) - - -@pytest.mark.asyncio -async def test_send_to_bdi_belief_edge_cases(agent): - """ - Covers: - - Unknown asl_type warning (lines 326-328) - - unachieve=True logic (lines 334-337) - """ - # 1. Unknown Type - await agent._send_to_bdi_belief("slug", "unknown_type") - - agent.logger.warning.assert_called_with("Tried to send belief with unknown type") - agent.send.assert_not_called() - - # Reset mock for part 2 - agent.send.reset_mock() - - # 2. Unachieve = True - await agent._send_to_bdi_belief("slug", "cond_norm", unachieve=True) - - agent.send.assert_awaited() - sent_msg = agent.send.call_args.args[0] - - # Verify it is a delete operation - body_obj = BeliefMessage.model_validate_json(sent_msg.body) - - # Verify 'delete' has content - assert body_obj.delete is not None - assert len(body_obj.delete) == 1 - assert body_obj.delete[0].name == "force_slug" - - # Verify 'create' is empty (handling both None and []) - assert not body_obj.create - - -@pytest.mark.asyncio -async def test_send_experiment_control_unknown(agent): - """Test sending an unknown experiment control type (lines 366-367).""" - await agent._send_experiment_control_to_bdi_core("invalid_command") - - agent.logger.warning.assert_called_with( - "Received unknown experiment control type '%s' to send to BDI Core.", "invalid_command" - ) - - # Ensure it still sends an empty message (as per code logic, though thread is empty) - agent.send.assert_awaited() - msg = agent.send.call_args[0][0] - assert msg.thread == ""