Compare commits

..

3 Commits

Author SHA1 Message Date
Twirre Meulenbelt
a87ac35201 docs: add docstrings to dated file handler
ref: N25B-401
2026-01-22 11:34:51 +01:00
Twirre Meulenbelt
3fed2f95b0 Merge remote-tracking branch 'refs/remotes/origin/feat/visual-emotion-recognition' into feat/add-experiment-logs 2026-01-20 10:11:43 +01:00
Twirre Meulenbelt
ae39298f9c Merge branch 'feat/experiment-logging' into feat/add-experiment-logs
# Conflicts:
#	.gitignore
2026-01-20 09:37:34 +01:00
12 changed files with 32 additions and 107 deletions

2
.gitignore vendored
View File

@@ -273,6 +273,8 @@ experiment-*.log

View File

@@ -1,5 +1,4 @@
import json import json
import logging
import zmq import zmq
import zmq.asyncio as azmq import zmq.asyncio as azmq
@@ -9,8 +8,6 @@ from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint from control_backend.schemas.ri_message import GestureCommand, RIEndpoint
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class RobotGestureAgent(BaseAgent): class RobotGestureAgent(BaseAgent):
""" """
@@ -114,7 +111,6 @@ class RobotGestureAgent(BaseAgent):
gesture_command.data, gesture_command.data,
) )
return return
experiment_logger.action("Gesture: %s", gesture_command.data)
await self.pubsocket.send_json(gesture_command.model_dump()) await self.pubsocket.send_json(gesture_command.model_dump())
except Exception: except Exception:
self.logger.exception("Error processing internal message.") self.logger.exception("Error processing internal message.")

View File

@@ -1,7 +1,6 @@
import asyncio import asyncio
import copy import copy
import json import json
import logging
import time import time
from collections.abc import Iterable from collections.abc import Iterable
@@ -20,9 +19,6 @@ from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, Speec
DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class BDICoreAgent(BaseAgent): class BDICoreAgent(BaseAgent):
""" """
BDI Core Agent. BDI Core Agent.
@@ -211,9 +207,6 @@ class BDICoreAgent(BaseAgent):
else: else:
term = agentspeak.Literal(name) term = agentspeak.Literal(name)
if name != "user_said":
experiment_logger.observation(f"Formed new belief: {name}{f'={args}' if args else ''}")
self.bdi_agent.call( self.bdi_agent.call(
agentspeak.Trigger.addition, agentspeak.Trigger.addition,
agentspeak.GoalType.belief, agentspeak.GoalType.belief,
@@ -251,9 +244,6 @@ class BDICoreAgent(BaseAgent):
new_args = (agentspeak.Literal(arg) for arg in args) new_args = (agentspeak.Literal(arg) for arg in args)
term = agentspeak.Literal(name, new_args) term = agentspeak.Literal(name, new_args)
if name != "user_said":
experiment_logger.observation(f"Removed belief: {name}{f'={args}' if args else ''}")
result = self.bdi_agent.call( result = self.bdi_agent.call(
agentspeak.Trigger.removal, agentspeak.Trigger.removal,
agentspeak.GoalType.belief, agentspeak.GoalType.belief,
@@ -396,8 +386,6 @@ class BDICoreAgent(BaseAgent):
body=str(message_text), body=str(message_text),
) )
experiment_logger.chat(str(message_text), extra={"role": "assistant"})
self.add_behavior(self.send(chat_history_message)) self.add_behavior(self.send(chat_history_message))
yield yield
@@ -453,7 +441,6 @@ class BDICoreAgent(BaseAgent):
trigger_name = agentspeak.grounded(term.args[0], intention.scope) trigger_name = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("Started trigger %s", trigger_name) self.logger.debug("Started trigger %s", trigger_name)
experiment_logger.observation("Triggered: %s", trigger_name)
msg = InternalMessage( msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name, to=settings.agent_settings.user_interrupt_name,

View File

@@ -1,12 +1,10 @@
import asyncio import asyncio
import json import json
import logging
import zmq import zmq
from pydantic import ValidationError from pydantic import ValidationError
from zmq.asyncio import Context from zmq.asyncio import Context
import control_backend
from control_backend.agents import BaseAgent from control_backend.agents import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.config import settings from control_backend.core.config import settings
@@ -21,8 +19,6 @@ from control_backend.schemas.program import (
Program, Program,
) )
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class BDIProgramManager(BaseAgent): class BDIProgramManager(BaseAgent):
""" """
@@ -245,18 +241,6 @@ class BDIProgramManager(BaseAgent):
await self.send(extractor_msg) await self.send(extractor_msg)
self.logger.debug("Sent message to extractor agent to clear history.") self.logger.debug("Sent message to extractor agent to clear history.")
@staticmethod
def _rollover_experiment_logs():
"""
A new experiment program started; make a new experiment log file.
"""
handlers = logging.getLogger("experiment").handlers
for handler in handlers:
if isinstance(handler, control_backend.logging.DatedFileHandler):
experiment_logger.action("Doing rollover...")
handler.do_rollover()
experiment_logger.debug("Finished rollover.")
async def _receive_programs(self): async def _receive_programs(self):
""" """
Continuous loop that receives program updates from the HTTP endpoint. Continuous loop that receives program updates from the HTTP endpoint.
@@ -277,7 +261,6 @@ class BDIProgramManager(BaseAgent):
self._initialize_internal_state(program) self._initialize_internal_state(program)
await self._send_program_to_user_interrupt(program) await self._send_program_to_user_interrupt(program)
await self._send_clear_llm_history() await self._send_clear_llm_history()
self._rollover_experiment_logs()
await asyncio.gather( await asyncio.gather(
self._create_agentspeak_and_send_to_bdi(program), self._create_agentspeak_and_send_to_bdi(program),

View File

@@ -150,9 +150,6 @@ class TextBeliefExtractorAgent(BaseAgent):
return return
available_beliefs = [b for b in belief_list.beliefs if isinstance(b, SemanticBelief)] available_beliefs = [b for b in belief_list.beliefs if isinstance(b, SemanticBelief)]
self._current_beliefs = BeliefState(
false={InternalBelief(name=b.name, arguments=None) for b in available_beliefs},
)
self.belief_inferrer.available_beliefs = available_beliefs self.belief_inferrer.available_beliefs = available_beliefs
self.logger.debug( self.logger.debug(
"Received %d semantic beliefs from the program manager: %s", "Received %d semantic beliefs from the program manager: %s",
@@ -173,9 +170,6 @@ class TextBeliefExtractorAgent(BaseAgent):
available_goals = {g for g in goals_list.goals if g.can_fail} available_goals = {g for g in goals_list.goals if g.can_fail}
available_goals -= self._force_completed_goals available_goals -= self._force_completed_goals
self.goal_inferrer.goals = available_goals self.goal_inferrer.goals = available_goals
self._current_goal_completions = {
f"achieved_{AgentSpeakGenerator.slugify(goal)}": False for goal in available_goals
}
self.logger.debug( self.logger.debug(
"Received %d failable goals from the program manager: %s", "Received %d failable goals from the program manager: %s",
len(available_goals), len(available_goals),

View File

@@ -55,7 +55,6 @@ class RICommunicationAgent(BaseAgent):
self.connected = False self.connected = False
self.gesture_agent: RobotGestureAgent | None = None self.gesture_agent: RobotGestureAgent | None = None
self.speech_agent: RobotSpeechAgent | None = None self.speech_agent: RobotSpeechAgent | None = None
self.visual_emotion_recognition_agent: VisualEmotionRecognitionAgent | None = None
async def setup(self): async def setup(self):
""" """
@@ -219,7 +218,6 @@ class RICommunicationAgent(BaseAgent):
socket_address=addr, socket_address=addr,
bind=bind, bind=bind,
) )
self.visual_emotion_recognition_agent = visual_emotion_agent
await visual_emotion_agent.start() await visual_emotion_agent.start()
case _: case _:
self.logger.warning("Unhandled negotiation id: %s", id) self.logger.warning("Unhandled negotiation id: %s", id)
@@ -326,9 +324,6 @@ class RICommunicationAgent(BaseAgent):
if self.speech_agent is not None: if self.speech_agent is not None:
await self.speech_agent.stop() await self.speech_agent.stop()
if self.visual_emotion_recognition_agent is not None:
await self.visual_emotion_recognition_agent.stop()
if self.pub_socket is not None: if self.pub_socket is not None:
self.pub_socket.close() self.pub_socket.close()
@@ -337,7 +332,6 @@ class RICommunicationAgent(BaseAgent):
self.connected = True self.connected = True
async def handle_message(self, msg: InternalMessage): async def handle_message(self, msg: InternalMessage):
return
try: try:
pause_command = PauseCommand.model_validate_json(msg.body) pause_command = PauseCommand.model_validate_json(msg.body)
await self._req_socket.send_json(pause_command.model_dump()) await self._req_socket.send_json(pause_command.model_dump())

View File

@@ -1,6 +1,5 @@
import asyncio import asyncio
import json import json
import logging
import re import re
import uuid import uuid
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
@@ -15,8 +14,6 @@ from control_backend.core.config import settings
from ...schemas.llm_prompt_message import LLMPromptMessage from ...schemas.llm_prompt_message import LLMPromptMessage
from .llm_instructions import LLMInstructions from .llm_instructions import LLMInstructions
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class LLMAgent(BaseAgent): class LLMAgent(BaseAgent):
""" """
@@ -173,7 +170,7 @@ class LLMAgent(BaseAgent):
*self.history, *self.history,
] ]
message_id = str(uuid.uuid4()) message_id = str(uuid.uuid4()) # noqa
try: try:
full_message = "" full_message = ""
@@ -182,9 +179,10 @@ class LLMAgent(BaseAgent):
full_message += token full_message += token
current_chunk += token current_chunk += token
experiment_logger.chat( self.logger.llm(
"Received token: %s",
full_message, full_message,
extra={"role": "assistant", "reference": message_id, "partial": True}, extra={"reference": message_id}, # Used in the UI to update old logs
) )
# Stream the message in chunks separated by punctuation. # Stream the message in chunks separated by punctuation.
@@ -199,11 +197,6 @@ class LLMAgent(BaseAgent):
# Yield any remaining tail # Yield any remaining tail
if current_chunk: if current_chunk:
yield current_chunk yield current_chunk
experiment_logger.chat(
full_message,
extra={"role": "assistant", "reference": message_id, "partial": False},
)
except httpx.HTTPError as err: except httpx.HTTPError as err:
self.logger.error("HTTP error.", exc_info=err) self.logger.error("HTTP error.", exc_info=err)
yield "LLM service unavailable." yield "LLM service unavailable."
@@ -219,7 +212,7 @@ class LLMAgent(BaseAgent):
:yield: Raw text tokens (deltas) from the SSE stream. :yield: Raw text tokens (deltas) from the SSE stream.
:raises httpx.HTTPError: If the API returns a non-200 status. :raises httpx.HTTPError: If the API returns a non-200 status.
""" """
async with httpx.AsyncClient(timeout=httpx.Timeout(20.0)) as client: async with httpx.AsyncClient() as client:
async with client.stream( async with client.stream(
"POST", "POST",
settings.llm_settings.local_llm_url, settings.llm_settings.local_llm_url,

View File

@@ -1,5 +1,4 @@
import asyncio import asyncio
import logging
import numpy as np import numpy as np
import zmq import zmq
@@ -11,8 +10,6 @@ from control_backend.core.config import settings
from .speech_recognizer import SpeechRecognizer from .speech_recognizer import SpeechRecognizer
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class TranscriptionAgent(BaseAgent): class TranscriptionAgent(BaseAgent):
""" """
@@ -28,8 +25,6 @@ class TranscriptionAgent(BaseAgent):
:ivar audio_in_socket: The ZMQ SUB socket instance. :ivar audio_in_socket: The ZMQ SUB socket instance.
:ivar speech_recognizer: The speech recognition engine instance. :ivar speech_recognizer: The speech recognition engine instance.
:ivar _concurrency: Semaphore to limit concurrent transcriptions. :ivar _concurrency: Semaphore to limit concurrent transcriptions.
:ivar _current_speech_reference: The reference of the current user utterance, for synchronising
experiment logs.
""" """
def __init__(self, audio_in_address: str): def __init__(self, audio_in_address: str):
@@ -44,7 +39,6 @@ class TranscriptionAgent(BaseAgent):
self.audio_in_socket: azmq.Socket | None = None self.audio_in_socket: azmq.Socket | None = None
self.speech_recognizer = None self.speech_recognizer = None
self._concurrency = None self._concurrency = None
self._current_speech_reference: str | None = None
async def setup(self): async def setup(self):
""" """
@@ -69,10 +63,6 @@ class TranscriptionAgent(BaseAgent):
self.logger.info("Finished setting up %s", self.name) self.logger.info("Finished setting up %s", self.name)
async def handle_message(self, msg: InternalMessage):
if msg.thread == "voice_activity":
self._current_speech_reference = msg.body
async def stop(self): async def stop(self):
""" """
Stop the agent and close sockets. Stop the agent and close sockets.
@@ -106,21 +96,20 @@ class TranscriptionAgent(BaseAgent):
async def _share_transcription(self, transcription: str): async def _share_transcription(self, transcription: str):
""" """
Share a transcription to the other agents that depend on it, and to experiment logs. Share a transcription to the other agents that depend on it.
Currently sends to: Currently sends to:
- :attr:`settings.agent_settings.text_belief_extractor_name` - :attr:`settings.agent_settings.text_belief_extractor_name`
- The UI via the experiment logger
:param transcription: The transcribed text. :param transcription: The transcribed text.
""" """
experiment_logger.chat( receiver_names = [
transcription, settings.agent_settings.text_belief_extractor_name,
extra={"role": "user", "reference": self._current_speech_reference, "partial": False}, ]
)
for receiver_name in receiver_names:
message = InternalMessage( message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name, to=receiver_name,
sender=self.name, sender=self.name,
body=transcription, body=transcription,
) )
@@ -140,9 +129,10 @@ class TranscriptionAgent(BaseAgent):
audio = np.frombuffer(audio_data, dtype=np.float32) audio = np.frombuffer(audio_data, dtype=np.float32)
speech = await self._transcribe(audio) speech = await self._transcribe(audio)
if not speech: if not speech:
self.logger.debug("Nothing transcribed.") self.logger.info("Nothing transcribed.")
continue continue
self.logger.info("Transcribed speech: %s", speech)
await self._share_transcription(speech) await self._share_transcription(speech)
except Exception as e: except Exception as e:
self.logger.error(f"Error in transcription loop: {e}") self.logger.error(f"Error in transcription loop: {e}")

View File

@@ -1,6 +1,4 @@
import asyncio import asyncio
import logging
import uuid
import numpy as np import numpy as np
import torch import torch
@@ -14,8 +12,6 @@ from control_backend.schemas.internal_message import InternalMessage
from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
from .transcription_agent.transcription_agent import TranscriptionAgent from .transcription_agent.transcription_agent import TranscriptionAgent
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class SocketPoller[T]: class SocketPoller[T]:
""" """
@@ -256,18 +252,6 @@ class VADAgent(BaseAgent):
if prob > prob_threshold: if prob > prob_threshold:
if self.i_since_speech > non_speech_patience + begin_silence_length: if self.i_since_speech > non_speech_patience + begin_silence_length:
self.logger.debug("Speech started.") self.logger.debug("Speech started.")
reference = str(uuid.uuid4())
experiment_logger.chat(
"...",
extra={"role": "user", "reference": reference, "partial": True},
)
await self.send(
InternalMessage(
to=settings.agent_settings.transcription_name,
body=reference,
thread="voice_activity",
)
)
self.audio_buffer = np.append(self.audio_buffer, chunk) self.audio_buffer = np.append(self.audio_buffer, chunk)
self.i_since_speech = 0 self.i_since_speech = 0
continue continue
@@ -285,10 +269,9 @@ class VADAgent(BaseAgent):
assert self.audio_out_socket is not None assert self.audio_out_socket is not None
await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes()) await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes())
# At this point, we know that there is no speech. # At this point, we know that the speech has ended.
# Prepend the last few chunks that had no speech, for a more fluent boundary. # Prepend the last chunk that had no speech, for a more fluent boundary
self.audio_buffer = np.append(self.audio_buffer, chunk) self.audio_buffer = chunk
self.audio_buffer = self.audio_buffer[-begin_silence_length * len(chunk) :]
async def handle_message(self, msg: InternalMessage): async def handle_message(self, msg: InternalMessage):
""" """

View File

@@ -97,7 +97,6 @@ class VisualEmotionRecognitionAgent(BaseAgent):
if frame_image is None: if frame_image is None:
# Could not decode image, skip this frame # Could not decode image, skip this frame
self.logger.warning("Received invalid video frame, skipping.")
continue continue
# Get the dominant emotion from each face # Get the dominant emotion from each face

View File

@@ -1,5 +1,4 @@
import json import json
import logging
import zmq import zmq
from zmq.asyncio import Context from zmq.asyncio import Context
@@ -17,8 +16,6 @@ from control_backend.schemas.ri_message import (
SpeechCommand, SpeechCommand,
) )
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class UserInterruptAgent(BaseAgent): class UserInterruptAgent(BaseAgent):
""" """
@@ -197,7 +194,6 @@ class UserInterruptAgent(BaseAgent):
case "transition_phase": case "transition_phase":
new_phase_id = msg.body new_phase_id = msg.body
self.logger.info(f"Phase transition detected: {new_phase_id}") self.logger.info(f"Phase transition detected: {new_phase_id}")
experiment_logger.observation("Transitioned to next phase.")
payload = {"type": "phase_update", "id": new_phase_id} payload = {"type": "phase_update", "id": new_phase_id}
@@ -300,7 +296,6 @@ class UserInterruptAgent(BaseAgent):
:param text_to_say: The string that the robot has to say. :param text_to_say: The string that the robot has to say.
""" """
experiment_logger.chat(text_to_say, extra={"role": "assistant"})
cmd = SpeechCommand(data=text_to_say, is_priority=True) cmd = SpeechCommand(data=text_to_say, is_priority=True)
out_msg = InternalMessage( out_msg = InternalMessage(
to=settings.agent_settings.robot_speech_name, to=settings.agent_settings.robot_speech_name,

View File

@@ -12,12 +12,21 @@ class DatedFileHandler(FileHandler):
super().__init__(**kwargs) super().__init__(**kwargs)
def _make_filename(self) -> str: def _make_filename(self) -> str:
"""
Create the filename for the current logfile, using the configured file prefix and the
current date and time. If the directory does not exist, it gets created.
:return: A filepath.
"""
filepath = Path(f"{self._file_prefix}-{datetime.now():%Y%m%d-%H%M%S}.log") filepath = Path(f"{self._file_prefix}-{datetime.now():%Y%m%d-%H%M%S}.log")
if not filepath.parent.is_dir(): if not filepath.parent.is_dir():
filepath.parent.mkdir(parents=True, exist_ok=True) filepath.parent.mkdir(parents=True, exist_ok=True)
return str(filepath) return str(filepath)
def do_rollover(self): def do_rollover(self):
"""
Close the current logfile and create a new one with the current date and time.
"""
self.acquire() self.acquire()
try: try:
if self.stream: if self.stream: