refactor: improve logging and module structure

Changed some folders to not be modules and organized some `__init__.py`
files.

ref: N25B-223
This commit is contained in:
2025-11-02 11:32:21 +01:00
parent d66fe07438
commit d43cb9394a
18 changed files with 179 additions and 154 deletions

View File

@@ -1,7 +0,0 @@
{
"python.testing.pytestArgs": [
"test"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

View File

@@ -0,0 +1,4 @@
from .belief_collector.belief_collector import BeliefCollectorAgent
from .llm.llm import LLMAgent
from .ri_communication_agent import RICommunicationAgent
from .vad_agent import VADAgent

View File

@@ -0,0 +1,2 @@
from .bdi_core import BDICoreAgent
from .text_extractor import TBeliefExtractorAgent

View File

@@ -5,10 +5,8 @@ from spade.behaviour import OneShotBehaviour
from spade.message import Message from spade.message import Message
from spade_bdi.bdi import BDIAgent from spade_bdi.bdi import BDIAgent
from control_backend.agents.bdi.behaviours.belief_setter import BeliefSetterBehaviour from .behaviours.belief_setter import BeliefSetterBehaviour
from control_backend.agents.bdi.behaviours.receive_llm_resp_behaviour import ( from .behaviours.receive_llm_resp_behaviour import ReceiveLLMResponseBehaviour
ReceiveLLMResponseBehaviour,
)
from control_backend.core.config import settings from control_backend.core.config import settings
@@ -26,12 +24,12 @@ class BDICoreAgent(BDIAgent):
""" """
Initializes belief behaviors and message routing. Initializes belief behaviors and message routing.
""" """
self.logger.info("BDICoreAgent setup started") self.logger.info("BDICoreAgent setup started.")
self.add_behaviour(BeliefSetterBehaviour()) self.add_behaviour(BeliefSetterBehaviour())
self.add_behaviour(ReceiveLLMResponseBehaviour()) self.add_behaviour(ReceiveLLMResponseBehaviour())
self.logger.info("BDICoreAgent setup complete") self.logger.info("BDICoreAgent setup complete.")
def add_custom_actions(self, actions) -> None: def add_custom_actions(self, actions) -> None:
""" """
@@ -45,7 +43,7 @@ class BDICoreAgent(BDIAgent):
Example: .reply("Hello LLM!") Example: .reply("Hello LLM!")
""" """
message_text = agentspeak.grounded(term.args[0], intention.scope) message_text = agentspeak.grounded(term.args[0], intention.scope)
self.logger.info("Reply action sending: %s", message_text) self.logger.debug("Reply action sending: %s", message_text)
self._send_to_llm(str(message_text)) self._send_to_llm(str(message_text))
yield yield
@@ -63,6 +61,6 @@ class BDICoreAgent(BDIAgent):
) )
await self.send(msg) await self.send(msg)
self.agent.logger.info("Message sent to LLM: %s", text) self.agent.logger.info("Message sent to LLM agent: %s", text)
self.add_behaviour(SendBehaviour()) self.add_behaviour(SendBehaviour())

View File

@@ -3,7 +3,7 @@ import logging
from spade.agent import Message from spade.agent import Message
from spade.behaviour import CyclicBehaviour from spade.behaviour import CyclicBehaviour
from spade_bdi.bdi import BDIAgent, BeliefNotInitiated from spade_bdi.bdi import BDIAgent
from control_backend.core.config import settings from control_backend.core.config import settings
@@ -11,26 +11,32 @@ from control_backend.core.config import settings
class BeliefSetterBehaviour(CyclicBehaviour): class BeliefSetterBehaviour(CyclicBehaviour):
""" """
This is the behaviour that the BDI agent runs. This behaviour waits for incoming This is the behaviour that the BDI agent runs. This behaviour waits for incoming
message and processes it based on sender. message and updates the agent's beliefs accordingly.
""" """
agent: BDIAgent agent: BDIAgent
logger = logging.getLogger("BDI/Belief Setter") logger = logging.getLogger(__name__)
async def run(self): async def run(self):
msg = await self.receive(timeout=0.1) """Polls for messages and processes them."""
if msg: msg = await self.receive()
self.logger.info(f"Received message {msg.body}") self.logger.debug(
"Received message from %s with thread '%s' and body: %s",
msg.sender,
msg.thread,
msg.body,
)
self._process_message(msg) self._process_message(msg)
def _process_message(self, message: Message): def _process_message(self, message: Message):
"""Routes the message to the correct processing function based on the sender."""
sender = message.sender.node # removes host from jid and converts to str sender = message.sender.node # removes host from jid and converts to str
self.logger.debug("Sender: %s", sender) self.logger.debug("Processing message from sender: %s", sender)
match sender: match sender:
case settings.agent_settings.belief_collector_agent_name: case settings.agent_settings.belief_collector_agent_name:
self.logger.debug("Processing message from belief collector.") self.logger.debug("Message is from the belief collector agent. Processing as belief message.")
self._process_belief_message(message) self._process_belief_message(message)
case _: case _:
self.logger.debug("Not the belief agent, discarding message") self.logger.debug("Not the belief agent, discarding message")
@@ -38,6 +44,7 @@ class BeliefSetterBehaviour(CyclicBehaviour):
def _process_belief_message(self, message: Message): def _process_belief_message(self, message: Message):
if not message.body: if not message.body:
self.logger.debug("Ignoring message with empty body from %s", message.sender.node)
return return
match message.thread: match message.thread:
@@ -45,22 +52,33 @@ class BeliefSetterBehaviour(CyclicBehaviour):
try: try:
beliefs: dict[str, list[str]] = json.loads(message.body) beliefs: dict[str, list[str]] = json.loads(message.body)
self._set_beliefs(beliefs) self._set_beliefs(beliefs)
except json.JSONDecodeError as e: except json.JSONDecodeError:
self.logger.error("Could not decode beliefs into JSON format: %s", e) self.logger.error(
"Could not decode beliefs from JSON. Message body: '%s'",
message.body,
exc_info=True
)
case _: case _:
pass pass
def _set_beliefs(self, beliefs: dict[str, list[str]]): def _set_beliefs(self, beliefs: dict[str, list[str]]):
"""Remove previous values for beliefs and update them with the provided values.""" """Removes previous values for beliefs and updates them with the provided values."""
if self.agent.bdi is None: if self.agent.bdi is None:
self.logger.warning("Cannot set beliefs, since agent's BDI is not yet initialized.") self.logger.warning("Cannot set beliefs; agent's BDI is not yet initialized.")
return
if not beliefs:
self.logger.debug("Received an empty set of beliefs. No beliefs were updated.")
return return
# Set new beliefs (outdated beliefs are automatically removed) # Set new beliefs (outdated beliefs are automatically removed)
for belief, arguments in beliefs.items(): for belief, arguments in beliefs.items():
self.logger.debug("Setting belief %s with arguments %s", belief, arguments)
self.agent.bdi.set_belief(belief, *arguments) self.agent.bdi.set_belief(belief, *arguments)
# Special case: if there's a new user message, flag that we haven't responded yet # Special case: if there's a new user message, flag that we haven't responded yet
if belief == "user_said": self.agent.bdi.set_belief("new_message") if belief == "user_said":
self.agent.bdi.set_belief("new_message")
self.logger.debug("Detected 'user_said' belief, also setting 'new_message' belief.")
self.logger.info("Set belief %s with arguments %s", belief, arguments) self.logger.info("Successfully updated %d beliefs.", len(beliefs))

View File

@@ -9,11 +9,9 @@ class ReceiveLLMResponseBehaviour(CyclicBehaviour):
""" """
Adds behavior to receive responses from the LLM Agent. Adds behavior to receive responses from the LLM Agent.
""" """
logger = logging.getLogger("BDI/LLM Reciever") logger = logging.getLogger(__name__)
async def run(self): async def run(self):
msg = await self.receive(timeout=2) msg = await self.receive()
if not msg:
return
sender = msg.sender.node sender = msg.sender.node
match sender: match sender:
@@ -22,5 +20,5 @@ class ReceiveLLMResponseBehaviour(CyclicBehaviour):
self.logger.info("Received LLM response: %s", content) self.logger.info("Received LLM response: %s", content)
#Here the BDI can pass the message back as a response #Here the BDI can pass the message back as a response
case _: case _:
self.logger.debug("Not from the llm, discarding message") self.logger.debug("Discarding message from %s", sender)
pass pass

View File

@@ -37,17 +37,15 @@ class BeliefFromText(CyclicBehaviour):
} }
async def run(self): async def run(self):
msg = await self.receive(timeout=0.1) msg = await self.receive()
if msg:
sender = msg.sender.node sender = msg.sender.node
match sender: match sender:
case settings.agent_settings.transcription_agent_name: case settings.agent_settings.transcription_agent_name:
self.logger.info("Received text from transcriber.") self.logger.debug("Received text from transcriber: %s", msg.body)
await self._process_transcription_demo(msg.body) await self._process_transcription_demo(msg.body)
case _: case _:
self.logger.info("Received message from other agent.") self.logger.info("Discarding message from %s", sender)
pass pass
await asyncio.sleep(1)
async def _process_transcription(self, text: str): async def _process_transcription(self, text: str):
text_prompt = f"Text: {text}" text_prompt = f"Text: {text}"
@@ -91,4 +89,4 @@ class BeliefFromText(CyclicBehaviour):
belief_msg.thread = "beliefs" belief_msg.thread = "beliefs"
await self.send(belief_msg) await self.send(belief_msg)
self.logger.info("Sent beliefs to Belief Collector.") self.logger.info("Sent %d beliefs to the belief collector.", len(belief["beliefs"]))

View File

@@ -1,9 +1,8 @@
from spade.agent import Agent from spade.agent import Agent
from control_backend.agents.bdi.behaviours.text_belief_extractor import BeliefFromText from .behaviours.text_belief_extractor import BeliefFromText
class TBeliefExtractor(Agent): class TBeliefExtractorAgent(Agent):
async def setup(self): async def setup(self):
self.b = BeliefFromText() self.add_behaviour(BeliefFromText())
self.add_behaviour(self.b)

View File

@@ -1,32 +1,32 @@
import json import json
import logging import logging
from json import JSONDecodeError
from spade.behaviour import CyclicBehaviour from spade.behaviour import CyclicBehaviour
from spade.agent import Message from spade.agent import Message
from control_backend.core.config import settings from control_backend.core.config import settings
logger = logging.getLogger(__name__)
class ContinuousBeliefCollector(CyclicBehaviour): class ContinuousBeliefCollector(CyclicBehaviour):
""" """
Continuously collects beliefs/emotions from extractor agents: Continuously collects beliefs/emotions from extractor agents:
Then we send a unified belief packet to the BDI agent. Then we send a unified belief packet to the BDI agent.
""" """
logger = logging.getLogger(__name__)
async def run(self): async def run(self):
msg = await self.receive(timeout=0.1) # Wait for 0.1s msg = await self.receive()
if msg:
await self._process_message(msg) await self._process_message(msg)
async def _process_message(self, msg: Message): async def _process_message(self, msg: Message):
sender_node = self._sender_node(msg) sender_node = msg.sender.node
# Parse JSON payload # Parse JSON payload
try: try:
payload = json.loads(msg.body) payload = json.loads(msg.body)
except Exception as e: except JSONDecodeError as e:
logger.warning( self.logger.warning(
"BeliefCollector: failed to parse JSON from %s. Body=%r Error=%s", "Failed to parse JSON from %s. Body=%r Error=%s",
sender_node, msg.body, e sender_node, msg.body, e
) )
return return
@@ -35,27 +35,18 @@ class ContinuousBeliefCollector(CyclicBehaviour):
# Prefer explicit 'type' field # Prefer explicit 'type' field
if msg_type == "belief_extraction_text" or sender_node == "belief_text_agent_mock": if msg_type == "belief_extraction_text" or sender_node == "belief_text_agent_mock":
logger.info("BeliefCollector: message routed to _handle_belief_text (sender=%s)", sender_node) self.logger.debug("Message routed to _handle_belief_text (sender=%s)", sender_node)
await self._handle_belief_text(payload, sender_node) await self._handle_belief_text(payload, sender_node)
#This is not implemented yet, but we keep the structure for future use #This is not implemented yet, but we keep the structure for future use
elif msg_type == "emotion_extraction_text" or sender_node == "emo_text_agent_mock": elif msg_type == "emotion_extraction_text" or sender_node == "emo_text_agent_mock":
logger.info("BeliefCollector: message routed to _handle_emo_text (sender=%s)", sender_node) self.logger.debug("Message routed to _handle_emo_text (sender=%s)", sender_node)
await self._handle_emo_text(payload, sender_node) await self._handle_emo_text(payload, sender_node)
else: else:
logger.info( self.logger.warning(
"BeliefCollector: unrecognized message (sender=%s, type=%r). Ignoring.", "Unrecognized message (sender=%s, type=%r). Ignoring.",
sender_node, msg_type sender_node, msg_type
) )
@staticmethod
def _sender_node(msg: Message) -> str:
"""
Extracts the 'node' (localpart) of the sender JID.
E.g., 'agent@host/resource' -> 'agent'
"""
s = str(msg.sender) if msg.sender is not None else "no_sender"
return s.split("@", 1)[0] if "@" in s else s
async def _handle_belief_text(self, payload: dict, origin: str): async def _handle_belief_text(self, payload: dict, origin: str):
""" """
@@ -70,21 +61,13 @@ class ContinuousBeliefCollector(CyclicBehaviour):
beliefs = payload.get("beliefs", {}) beliefs = payload.get("beliefs", {})
if not beliefs: if not beliefs:
logger.info("BeliefCollector: no beliefs to process.") self.logger.debug("Received empty beliefs set.")
return return
if not isinstance(beliefs, dict): self.logger.debug("Forwarding %d beliefs.", len(beliefs))
logger.warning("BeliefCollector: 'beliefs' is not a dict: %r", beliefs)
return
if not all(isinstance(v, list) for v in beliefs.values()):
logger.warning("BeliefCollector: 'beliefs' values are not all lists: %r", beliefs)
return
logger.info("BeliefCollector: forwarding %d beliefs.", len(beliefs))
for belief_name, belief_list in beliefs.items(): for belief_name, belief_list in beliefs.items():
for belief in belief_list: for belief in belief_list:
logger.info(" - %s %s", belief_name,str(belief)) self.logger.debug(" - %s %s", belief_name,str(belief))
await self._send_beliefs_to_bdi(beliefs, origin=origin) await self._send_beliefs_to_bdi(beliefs, origin=origin)
@@ -109,4 +92,4 @@ class ContinuousBeliefCollector(CyclicBehaviour):
await self.send(msg) await self.send(msg)
logger.info("BeliefCollector: sent %d belief(s) to BDI at %s", len(beliefs), to_jid) self.logger.info("Sent %d belief(s) to BDI core.", len(beliefs))

View File

@@ -11,7 +11,7 @@ from spade.agent import Agent
from spade.behaviour import CyclicBehaviour from spade.behaviour import CyclicBehaviour
from spade.message import Message from spade.message import Message
from control_backend.agents.llm.llm_instructions import LLMInstructions from .llm_instructions import LLMInstructions
from control_backend.core.config import settings from control_backend.core.config import settings
@@ -35,12 +35,10 @@ class LLMAgent(Agent):
Receives SPADE messages and processes only those originating from the Receives SPADE messages and processes only those originating from the
configured BDI agent. configured BDI agent.
""" """
msg = await self.receive(timeout=1) msg = await self.receive()
if not msg:
return
sender = msg.sender.node sender = msg.sender.node
self.agent.logger.info( self.agent.logger.debug(
"Received message: %s from %s", "Received message: %s from %s",
msg.body, msg.body,
sender, sender,
@@ -121,7 +119,6 @@ class LLMAgent(Agent):
Sets up the SPADE behaviour to filter and process messages from the Sets up the SPADE behaviour to filter and process messages from the
BDI Core Agent. BDI Core Agent.
""" """
self.logger.info("LLMAgent setup complete")
behaviour = self.ReceiveMessageBehaviour() behaviour = self.ReceiveMessageBehaviour()
self.add_behaviour(behaviour) self.add_behaviour(behaviour)
self.logger.info("LLMAgent setup complete")

View File

@@ -1,5 +1,4 @@
import asyncio import asyncio
import json
import logging import logging
from spade.agent import Agent from spade.agent import Agent
from spade.behaviour import CyclicBehaviour from spade.behaviour import CyclicBehaviour
@@ -7,8 +6,7 @@ import zmq
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.core.zmq_context import context from control_backend.core.zmq_context import context
from control_backend.schemas.message import Message from .ri_command_agent import RICommandAgent
from control_backend.agents.ri_command_agent import RICommandAgent
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@@ -1,2 +0,0 @@
from .speech_recognizer import SpeechRecognizer as SpeechRecognizer
from .transcription_agent import TranscriptionAgent as TranscriptionAgent

View File

@@ -8,7 +8,7 @@ from spade.agent import Agent
from spade.behaviour import CyclicBehaviour from spade.behaviour import CyclicBehaviour
from spade.message import Message from spade.message import Message
from control_backend.agents.transcription.speech_recognizer import SpeechRecognizer from .speech_recognizer import SpeechRecognizer
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.core.zmq_context import context as zmq_context from control_backend.core.zmq_context import context as zmq_context

View File

@@ -7,7 +7,7 @@ import zmq.asyncio as azmq
from spade.agent import Agent from spade.agent import Agent
from spade.behaviour import CyclicBehaviour from spade.behaviour import CyclicBehaviour
from control_backend.agents.transcription import TranscriptionAgent from .transcription.transcription_agent import TranscriptionAgent
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.core.zmq_context import context as zmq_context from control_backend.core.zmq_context import context as zmq_context

View File

@@ -1,6 +1,3 @@
# Standard library imports
# External imports
import contextlib import contextlib
import logging import logging
@@ -8,74 +5,116 @@ import zmq
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from control_backend.agents.bdi.bdi_core import BDICoreAgent from control_backend.agents import (
from control_backend.agents.bdi.text_extractor import TBeliefExtractor BeliefCollectorAgent,
from control_backend.agents.belief_collector.belief_collector import BeliefCollectorAgent LLMAgent,
from control_backend.agents.llm.llm import LLMAgent RICommunicationAgent,
VADAgent,
# Internal imports )
from control_backend.agents.ri_communication_agent import RICommunicationAgent from control_backend.agents.bdi import BDICoreAgent, TBeliefExtractorAgent
from control_backend.agents.vad_agent import VADAgent
from control_backend.api.v1.router import api_router from control_backend.api.v1.router import api_router
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.core.zmq_context import context from control_backend.core.zmq_context import context
from control_backend.logging import setup_logging from control_backend.logging import setup_logging
logger = logging.getLogger(__name__)
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
"""
Application lifespan context manager to handle startup and shutdown events.
"""
# --- APPLICATION STARTUP ---
setup_logging() setup_logging()
logger = logging.getLogger(__name__) logger.info("%s is starting up.", app.title)
logger.info("%s starting up.", app.title) # --- Initialize Sockets ---
logger.info("Initializing ZeroMQ sockets.")
# Initiate sockets try:
internal_comm_socket = context.socket(zmq.PUB) internal_comm_socket = context.socket(zmq.PUB)
internal_comm_address = settings.zmq_settings.internal_comm_address internal_comm_address = settings.zmq_settings.internal_comm_address
logger.debug("Binding internal PUB socket to address: %s", internal_comm_address)
internal_comm_socket.bind(internal_comm_address) internal_comm_socket.bind(internal_comm_address)
app.state.internal_comm_socket = internal_comm_socket app.state.internal_comm_socket = internal_comm_socket
logger.info("Internal publishing socket bound to %s", internal_comm_socket) logger.info("Internal communication socket bound successfully.")
except Exception as e:
logger.error("Failed to bind internal communication socket: %s", e, exc_info=True)
raise
# Initiate agents # --- Initialize Agents ---
ri_communication_agent = RICommunicationAgent( logger.info("Initializing and starting agents.")
settings.agent_settings.ri_communication_agent_name + "@" + settings.agent_settings.host, agents_to_start = {
settings.agent_settings.ri_communication_agent_name, "RICommunicationAgent": (
address="tcp://*:5555", RICommunicationAgent,
bind=True, {
) "name": settings.agent_settings.ri_communication_agent_name,
await ri_communication_agent.start() "jid": f"{settings.agent_settings.ri_communication_agent_name}@{settings.agent_settings.host}",
"password": settings.agent_settings.ri_communication_agent_name,
"address": "tcp://*:5555",
"bind": True,
},
),
"LLMAgent": (
LLMAgent,
{
"name": settings.agent_settings.llm_agent_name,
"jid": f"{settings.agent_settings.llm_agent_name}@{settings.agent_settings.host}",
"password": settings.agent_settings.llm_agent_name,
},
),
"BDICoreAgent": (
BDICoreAgent,
{
"name": settings.agent_settings.bdi_core_agent_name,
"jid": f"{settings.agent_settings.bdi_core_agent_name}@{settings.agent_settings.host}",
"password": settings.agent_settings.bdi_core_agent_name,
"asl": "src/control_backend/agents/bdi/rules.asl",
},
),
"BeliefCollectorAgent": (
BeliefCollectorAgent,
{
"name": settings.agent_settings.belief_collector_agent_name,
"jid": f"{settings.agent_settings.belief_collector_agent_name}@{settings.agent_settings.host}",
"password": settings.agent_settings.belief_collector_agent_name,
},
),
"TBeliefExtractor": (
TBeliefExtractorAgent,
{
"name": settings.agent_settings.text_belief_extractor_agent_name,
"jid": f"{settings.agent_settings.text_belief_extractor_agent_name}@{settings.agent_settings.host}",
"password": settings.agent_settings.text_belief_extractor_agent_name,
},
),
"VADAgent": (
VADAgent,
{"audio_in_address": "tcp://localhost:5558", "audio_in_bind": False},
),
}
llm_agent = LLMAgent( for name, (agent_class, kwargs) in agents_to_start.items():
settings.agent_settings.llm_agent_name + '@' + settings.agent_settings.host, try:
settings.agent_settings.llm_agent_name, logger.debug("Starting agent: %s", name)
) agent_instance = agent_class(**{k: v for k, v in kwargs.items() if k != "name"})
await llm_agent.start() await agent_instance.start()
logger.info("Agent '%s' started successfully.", name)
except Exception as e:
logger.error("Failed to start agent '%s': %s", name, e, exc_info=True)
# Consider if the application should continue if an agent fails to start.
raise
bdi_core = BDICoreAgent( logger.info("Application startup complete.")
settings.agent_settings.bdi_core_agent_name + '@' + settings.agent_settings.host,
settings.agent_settings.bdi_core_agent_name,
"src/control_backend/agents/bdi/rules.asl",
)
await bdi_core.start()
belief_collector = BeliefCollectorAgent(
settings.agent_settings.belief_collector_agent_name + '@' + settings.agent_settings.host,
settings.agent_settings.belief_collector_agent_name,
)
await belief_collector.start()
text_belief_extractor = TBeliefExtractor(
settings.agent_settings.text_belief_extractor_agent_name + '@' + settings.agent_settings.host,
settings.agent_settings.text_belief_extractor_agent_name,
)
await text_belief_extractor.start()
_temp_vad_agent = VADAgent("tcp://localhost:5558", False)
await _temp_vad_agent.start()
yield yield
logger.info("%s shutting down.", app.title) # --- APPLICATION SHUTDOWN ---
logger.info("%s is shutting down.", app.title)
# Potential shutdown logic goes here
logger.info("Application shutdown complete.")
# if __name__ == "__main__": # if __name__ == "__main__":