chore: merge current dev into refactor/config-file
ref: N25B-236
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
from .base import BaseAgent as BaseAgent
|
||||
from .belief_collector.belief_collector import BeliefCollectorAgent as BeliefCollectorAgent
|
||||
from .llm.llm import LLMAgent as LLMAgent
|
||||
from .ri_command_agent import RICommandAgent as RICommandAgent
|
||||
from .ri_communication_agent import RICommunicationAgent as RICommunicationAgent
|
||||
from .transcription.transcription_agent import TranscriptionAgent as TranscriptionAgent
|
||||
from .vad_agent import VADAgent as VADAgent
|
||||
|
||||
18
src/control_backend/agents/base.py
Normal file
18
src/control_backend/agents/base.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import logging
|
||||
|
||||
from spade.agent import Agent
|
||||
|
||||
|
||||
class BaseAgent(Agent):
|
||||
"""
|
||||
Base agent class for our agents to inherit from.
|
||||
This ensures that all agents have a logger.
|
||||
"""
|
||||
|
||||
logger: logging.Logger
|
||||
|
||||
# Whenever a subclass is initiated, give it the correct logger
|
||||
def __init_subclass__(cls, **kwargs) -> None:
|
||||
super().__init_subclass__(**kwargs)
|
||||
|
||||
cls.logger = logging.getLogger(__package__).getChild(cls.__name__)
|
||||
@@ -0,0 +1,2 @@
|
||||
from .bdi_core import BDICoreAgent as BDICoreAgent
|
||||
from .text_extractor import TBeliefExtractorAgent as TBeliefExtractorAgent
|
||||
|
||||
@@ -5,12 +5,11 @@ from spade.behaviour import OneShotBehaviour
|
||||
from spade.message import Message
|
||||
from spade_bdi.bdi import BDIAgent
|
||||
|
||||
from control_backend.agents.bdi.behaviours.belief_setter import BeliefSetterBehaviour
|
||||
from control_backend.agents.bdi.behaviours.receive_llm_resp_behaviour import (
|
||||
ReceiveLLMResponseBehaviour,
|
||||
)
|
||||
from control_backend.core.config import settings
|
||||
|
||||
from .behaviours.belief_setter import BeliefSetterBehaviour
|
||||
from .behaviours.receive_llm_resp_behaviour import ReceiveLLMResponseBehaviour
|
||||
|
||||
|
||||
class BDICoreAgent(BDIAgent):
|
||||
"""
|
||||
@@ -20,18 +19,18 @@ class BDICoreAgent(BDIAgent):
|
||||
It has the BeliefSetter behaviour and can aks and recieve requests from the LLM agent.
|
||||
"""
|
||||
|
||||
logger = logging.getLogger("bdi_core_agent")
|
||||
logger = logging.getLogger(__package__).getChild(__name__)
|
||||
|
||||
async def setup(self) -> None:
|
||||
"""
|
||||
Initializes belief behaviors and message routing.
|
||||
"""
|
||||
self.logger.info("BDICoreAgent setup started")
|
||||
self.logger.info("BDICoreAgent setup started.")
|
||||
|
||||
self.add_behaviour(BeliefSetterBehaviour())
|
||||
self.add_behaviour(ReceiveLLMResponseBehaviour())
|
||||
|
||||
self.logger.info("BDICoreAgent setup complete")
|
||||
self.logger.info("BDICoreAgent setup complete.")
|
||||
|
||||
def add_custom_actions(self, actions) -> None:
|
||||
"""
|
||||
@@ -45,7 +44,7 @@ class BDICoreAgent(BDIAgent):
|
||||
Example: .reply("Hello LLM!")
|
||||
"""
|
||||
message_text = agentspeak.grounded(term.args[0], intention.scope)
|
||||
self.logger.info("Reply action sending: %s", message_text)
|
||||
self.logger.debug("Reply action sending: %s", message_text)
|
||||
|
||||
self._send_to_llm(str(message_text))
|
||||
yield
|
||||
@@ -63,6 +62,6 @@ class BDICoreAgent(BDIAgent):
|
||||
)
|
||||
|
||||
await self.send(msg)
|
||||
self.agent.logger.info("Message sent to LLM: %s", text)
|
||||
self.agent.logger.info("Message sent to LLM agent: %s", text)
|
||||
|
||||
self.add_behaviour(SendBehaviour())
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import json
|
||||
import logging
|
||||
|
||||
from spade.agent import Message
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
@@ -11,33 +10,40 @@ from control_backend.core.config import settings
|
||||
class BeliefSetterBehaviour(CyclicBehaviour):
|
||||
"""
|
||||
This is the behaviour that the BDI agent runs. This behaviour waits for incoming
|
||||
message and processes it based on sender.
|
||||
message and updates the agent's beliefs accordingly.
|
||||
"""
|
||||
|
||||
agent: BDIAgent
|
||||
logger = logging.getLogger("BDI/Belief Setter")
|
||||
|
||||
async def run(self):
|
||||
t = settings.behaviour_settings.default_rcv_timeout
|
||||
msg = await self.receive(timeout=t)
|
||||
if msg:
|
||||
self.logger.info(f"Received message {msg.body}")
|
||||
self._process_message(msg)
|
||||
"""Polls for messages and processes them."""
|
||||
msg = await self.receive()
|
||||
self.agent.logger.debug(
|
||||
"Received message from %s with thread '%s' and body: %s",
|
||||
msg.sender,
|
||||
msg.thread,
|
||||
msg.body,
|
||||
)
|
||||
self._process_message(msg)
|
||||
|
||||
def _process_message(self, message: Message):
|
||||
"""Routes the message to the correct processing function based on the sender."""
|
||||
sender = message.sender.node # removes host from jid and converts to str
|
||||
self.logger.debug("Sender: %s", sender)
|
||||
self.agent.logger.debug("Processing message from sender: %s", sender)
|
||||
|
||||
match sender:
|
||||
case settings.agent_settings.belief_collector_agent_name:
|
||||
self.logger.debug("Processing message from belief collector.")
|
||||
self.agent.logger.debug(
|
||||
"Message is from the belief collector agent. Processing as belief message."
|
||||
)
|
||||
self._process_belief_message(message)
|
||||
case _:
|
||||
self.logger.debug("Not the belief agent, discarding message")
|
||||
self.agent.logger.debug("Not the belief agent, discarding message")
|
||||
pass
|
||||
|
||||
def _process_belief_message(self, message: Message):
|
||||
if not message.body:
|
||||
self.agent.logger.debug("Ignoring message with empty body from %s", message.sender.node)
|
||||
return
|
||||
|
||||
match message.thread:
|
||||
@@ -45,23 +51,35 @@ class BeliefSetterBehaviour(CyclicBehaviour):
|
||||
try:
|
||||
beliefs: dict[str, list[str]] = json.loads(message.body)
|
||||
self._set_beliefs(beliefs)
|
||||
except json.JSONDecodeError as e:
|
||||
self.logger.error("Could not decode beliefs into JSON format: %s", e)
|
||||
except json.JSONDecodeError:
|
||||
self.agent.logger.error(
|
||||
"Could not decode beliefs from JSON. Message body: '%s'",
|
||||
message.body,
|
||||
exc_info=True,
|
||||
)
|
||||
case _:
|
||||
pass
|
||||
|
||||
def _set_beliefs(self, beliefs: dict[str, list[str]]):
|
||||
"""Remove previous values for beliefs and update them with the provided values."""
|
||||
"""Removes previous values for beliefs and updates them with the provided values."""
|
||||
if self.agent.bdi is None:
|
||||
self.logger.warning("Cannot set beliefs, since agent's BDI is not yet initialized.")
|
||||
self.agent.logger.warning("Cannot set beliefs; agent's BDI is not yet initialized.")
|
||||
return
|
||||
|
||||
if not beliefs:
|
||||
self.agent.logger.debug("Received an empty set of beliefs. No beliefs were updated.")
|
||||
return
|
||||
|
||||
# Set new beliefs (outdated beliefs are automatically removed)
|
||||
for belief, arguments in beliefs.items():
|
||||
self.agent.logger.debug("Setting belief %s with arguments %s", belief, arguments)
|
||||
self.agent.bdi.set_belief(belief, *arguments)
|
||||
|
||||
# Special case: if there's a new user message, flag that we haven't responded yet
|
||||
if belief == "user_said":
|
||||
self.agent.bdi.set_belief("new_message")
|
||||
self.agent.logger.debug(
|
||||
"Detected 'user_said' belief, also setting 'new_message' belief."
|
||||
)
|
||||
|
||||
self.logger.info("Set belief %s with arguments %s", belief, arguments)
|
||||
self.agent.logger.info("Successfully updated %d beliefs.", len(beliefs))
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import logging
|
||||
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
from spade.message import Message
|
||||
|
||||
@@ -12,19 +10,14 @@ class ReceiveLLMResponseBehaviour(CyclicBehaviour):
|
||||
Adds behavior to receive responses from the LLM Agent.
|
||||
"""
|
||||
|
||||
logger = logging.getLogger("BDI/LLM Receiver")
|
||||
|
||||
async def run(self):
|
||||
t = settings.llm_settings.llm_response_rcv_timeout
|
||||
msg = await self.receive(timeout=t)
|
||||
if not msg:
|
||||
return
|
||||
msg = await self.receive()
|
||||
|
||||
sender = msg.sender.node
|
||||
match sender:
|
||||
case settings.agent_settings.llm_agent_name:
|
||||
content = msg.body
|
||||
self.logger.info("Received LLM response: %s", content)
|
||||
self.agent.logger.info("Received LLM response: %s", content)
|
||||
|
||||
speech_command = SpeechCommand(data=content)
|
||||
|
||||
@@ -36,9 +29,9 @@ class ReceiveLLMResponseBehaviour(CyclicBehaviour):
|
||||
body=speech_command.model_dump_json(),
|
||||
)
|
||||
|
||||
self.logger.debug("Sending message: %s", message)
|
||||
self.agent.logger.debug("Sending message: %s", message)
|
||||
|
||||
await self.send(message)
|
||||
case _:
|
||||
self.logger.debug("Not from the llm, discarding message")
|
||||
self.agent.logger.debug("Discarding message from %s", sender)
|
||||
pass
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
from spade.message import Message
|
||||
@@ -9,8 +7,6 @@ from control_backend.core.config import settings
|
||||
|
||||
|
||||
class BeliefFromText(CyclicBehaviour):
|
||||
logger = logging.getLogger("Belief From Text")
|
||||
|
||||
# TODO: LLM prompt nog hardcoded
|
||||
llm_instruction_prompt = """
|
||||
You are an information extraction assistent for a BDI agent. Your task is to extract values \
|
||||
@@ -39,18 +35,15 @@ class BeliefFromText(CyclicBehaviour):
|
||||
beliefs = {"mood": ["X"], "car": ["Y"]}
|
||||
|
||||
async def run(self):
|
||||
t = settings.behaviour_settings.default_rcv_timeout
|
||||
msg = await self.receive(timeout=t)
|
||||
if msg:
|
||||
sender = msg.sender.node
|
||||
match sender:
|
||||
case settings.agent_settings.transcription_agent_name:
|
||||
self.logger.info("Received text from transcriber.")
|
||||
await self._process_transcription_demo(msg.body)
|
||||
case _:
|
||||
self.logger.info("Received message from other agent.")
|
||||
pass
|
||||
await asyncio.sleep(1)
|
||||
msg = await self.receive()
|
||||
sender = msg.sender.node
|
||||
match sender:
|
||||
case settings.agent_settings.transcription_agent_name:
|
||||
self.logger.debug("Received text from transcriber: %s", msg.body)
|
||||
await self._process_transcription_demo(msg.body)
|
||||
case _:
|
||||
self.logger.info("Discarding message from %s", sender)
|
||||
pass
|
||||
|
||||
async def _process_transcription(self, text: str):
|
||||
text_prompt = f"Text: {text}"
|
||||
@@ -76,10 +69,10 @@ class BeliefFromText(CyclicBehaviour):
|
||||
belief_message.thread = "beliefs"
|
||||
|
||||
await self.send(belief_message)
|
||||
self.logger.info("Sent beliefs to BDI.")
|
||||
self.agent.logger.info("Sent beliefs to BDI.")
|
||||
except json.JSONDecodeError:
|
||||
# Parsing failed, so the response is in the wrong format, log warning
|
||||
self.logger.warning("Received LLM response in incorrect format.")
|
||||
self.agent.logger.warning("Received LLM response in incorrect format.")
|
||||
|
||||
async def _process_transcription_demo(self, txt: str):
|
||||
"""
|
||||
@@ -98,4 +91,4 @@ class BeliefFromText(CyclicBehaviour):
|
||||
belief_msg.thread = "beliefs"
|
||||
|
||||
await self.send(belief_msg)
|
||||
self.logger.info("Sent beliefs to Belief Collector.")
|
||||
self.logger.info("Sent %d beliefs to the belief collector.", len(belief["beliefs"]))
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
from spade.agent import Agent
|
||||
from control_backend.agents.base import BaseAgent
|
||||
|
||||
from control_backend.agents.bdi.behaviours.text_belief_extractor import BeliefFromText
|
||||
from .behaviours.text_belief_extractor import BeliefFromText
|
||||
|
||||
|
||||
class TBeliefExtractor(Agent):
|
||||
class TBeliefExtractorAgent(BaseAgent):
|
||||
async def setup(self):
|
||||
self.b = BeliefFromText()
|
||||
self.add_behaviour(self.b)
|
||||
self.add_behaviour(BeliefFromText())
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
import json
|
||||
import logging
|
||||
from json import JSONDecodeError
|
||||
|
||||
from spade.agent import Message
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
|
||||
from control_backend.core.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ContinuousBeliefCollector(CyclicBehaviour):
|
||||
"""
|
||||
@@ -16,19 +14,17 @@ class ContinuousBeliefCollector(CyclicBehaviour):
|
||||
"""
|
||||
|
||||
async def run(self):
|
||||
t = settings.behaviour_settings.default_rcv_timeout
|
||||
msg = await self.receive(timeout=t)
|
||||
if msg:
|
||||
await self._process_message(msg)
|
||||
msg = await self.receive()
|
||||
await self._process_message(msg)
|
||||
|
||||
async def _process_message(self, msg: Message):
|
||||
sender_node = self._sender_node(msg)
|
||||
sender_node = msg.sender.node
|
||||
|
||||
# Parse JSON payload
|
||||
try:
|
||||
payload = json.loads(msg.body)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
except JSONDecodeError as e:
|
||||
self.agent.logger.warning(
|
||||
"BeliefCollector: failed to parse JSON from %s. Body=%r Error=%s",
|
||||
sender_node,
|
||||
msg.body,
|
||||
@@ -40,32 +36,19 @@ class ContinuousBeliefCollector(CyclicBehaviour):
|
||||
|
||||
# Prefer explicit 'type' field
|
||||
if msg_type == "belief_extraction_text" or sender_node == "belief_text_agent_mock":
|
||||
logger.info(
|
||||
"BeliefCollector: message routed to _handle_belief_text (sender=%s)", sender_node
|
||||
self.agent.logger.debug(
|
||||
"Message routed to _handle_belief_text (sender=%s)", sender_node
|
||||
)
|
||||
await self._handle_belief_text(payload, sender_node)
|
||||
# This is not implemented yet, but we keep the structure for future use
|
||||
elif msg_type == "emotion_extraction_text" or sender_node == "emo_text_agent_mock":
|
||||
logger.info(
|
||||
"BeliefCollector: message routed to _handle_emo_text (sender=%s)", sender_node
|
||||
)
|
||||
self.agent.logger.debug("Message routed to _handle_emo_text (sender=%s)", sender_node)
|
||||
await self._handle_emo_text(payload, sender_node)
|
||||
else:
|
||||
logger.info(
|
||||
"BeliefCollector: unrecognized message (sender=%s, type=%r). Ignoring.",
|
||||
sender_node,
|
||||
msg_type,
|
||||
self.agent.logger.warning(
|
||||
"Unrecognized message (sender=%s, type=%r). Ignoring.", sender_node, msg_type
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _sender_node(msg: Message) -> str:
|
||||
"""
|
||||
Extracts the 'node' (localpart) of the sender JID.
|
||||
E.g., 'agent@host/resource' -> 'agent'
|
||||
"""
|
||||
s = str(msg.sender) if msg.sender is not None else "no_sender"
|
||||
return s.split("@", 1)[0] if "@" in s else s
|
||||
|
||||
async def _handle_belief_text(self, payload: dict, origin: str):
|
||||
"""
|
||||
Expected payload:
|
||||
@@ -79,21 +62,13 @@ class ContinuousBeliefCollector(CyclicBehaviour):
|
||||
beliefs = payload.get("beliefs", {})
|
||||
|
||||
if not beliefs:
|
||||
logger.info("BeliefCollector: no beliefs to process.")
|
||||
self.agent.logger.debug("Received empty beliefs set.")
|
||||
return
|
||||
|
||||
if not isinstance(beliefs, dict):
|
||||
logger.warning("BeliefCollector: 'beliefs' is not a dict: %r", beliefs)
|
||||
return
|
||||
|
||||
if not all(isinstance(v, list) for v in beliefs.values()):
|
||||
logger.warning("BeliefCollector: 'beliefs' values are not all lists: %r", beliefs)
|
||||
return
|
||||
|
||||
logger.info("BeliefCollector: forwarding %d beliefs.", len(beliefs))
|
||||
self.agent.logger.debug("Forwarding %d beliefs.", len(beliefs))
|
||||
for belief_name, belief_list in beliefs.items():
|
||||
for belief in belief_list:
|
||||
logger.info(" - %s %s", belief_name, str(belief))
|
||||
self.agent.logger.debug(" - %s %s", belief_name, str(belief))
|
||||
|
||||
await self._send_beliefs_to_bdi(beliefs, origin=origin)
|
||||
|
||||
@@ -114,4 +89,4 @@ class ContinuousBeliefCollector(CyclicBehaviour):
|
||||
msg.body = json.dumps(beliefs)
|
||||
|
||||
await self.send(msg)
|
||||
logger.info("BeliefCollector: sent %d belief(s) to BDI at %s", len(beliefs), to_jid)
|
||||
self.agent.logger.info("Sent %d belief(s) to BDI core.", len(beliefs))
|
||||
|
||||
@@ -1,15 +1,11 @@
|
||||
import logging
|
||||
|
||||
from spade.agent import Agent
|
||||
from control_backend.agents.base import BaseAgent
|
||||
|
||||
from .behaviours.continuous_collect import ContinuousBeliefCollector
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BeliefCollectorAgent(Agent):
|
||||
class BeliefCollectorAgent(BaseAgent):
|
||||
async def setup(self):
|
||||
logger.info("BeliefCollectorAgent starting (%s)", self.jid)
|
||||
self.logger.info("BeliefCollectorAgent starting (%s)", self.jid)
|
||||
# Attach the continuous collector behaviour (listens and forwards to BDI)
|
||||
self.add_behaviour(ContinuousBeliefCollector())
|
||||
logger.info("BeliefCollectorAgent ready.")
|
||||
self.logger.info("BeliefCollectorAgent ready.")
|
||||
|
||||
@@ -1,31 +1,24 @@
|
||||
"""
|
||||
LLM Agent module for routing text queries from the BDI Core Agent to a local LLM
|
||||
service and returning its responses back to the BDI Core Agent.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
import httpx
|
||||
from spade.agent import Agent
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
from spade.message import Message
|
||||
|
||||
from control_backend.agents.llm.llm_instructions import LLMInstructions
|
||||
from control_backend.agents import BaseAgent
|
||||
from control_backend.core.config import settings
|
||||
|
||||
from .llm_instructions import LLMInstructions
|
||||
|
||||
class LLMAgent(Agent):
|
||||
|
||||
class LLMAgent(BaseAgent):
|
||||
"""
|
||||
Agent responsible for processing user text input and querying a locally
|
||||
hosted LLM for text generation. Receives messages from the BDI Core Agent
|
||||
and responds with processed LLM output.
|
||||
"""
|
||||
|
||||
logger = logging.getLogger("llm_agent")
|
||||
|
||||
class ReceiveMessageBehaviour(CyclicBehaviour):
|
||||
"""
|
||||
Cyclic behaviour to continuously listen for incoming messages from
|
||||
@@ -37,13 +30,10 @@ class LLMAgent(Agent):
|
||||
Receives SPADE messages and processes only those originating from the
|
||||
configured BDI agent.
|
||||
"""
|
||||
t = settings.behaviour_settings.llm_response_rcv_timeout
|
||||
msg = await self.receive(timeout=t)
|
||||
if not msg:
|
||||
return
|
||||
msg = await self.receive()
|
||||
|
||||
sender = msg.sender.node
|
||||
self.agent.logger.info(
|
||||
self.agent.logger.debug(
|
||||
"Received message: %s from %s",
|
||||
msg.body,
|
||||
sender,
|
||||
@@ -166,7 +156,6 @@ class LLMAgent(Agent):
|
||||
Sets up the SPADE behaviour to filter and process messages from the
|
||||
BDI Core Agent.
|
||||
"""
|
||||
self.logger.info("LLMAgent setup complete")
|
||||
|
||||
behaviour = self.ReceiveMessageBehaviour()
|
||||
self.add_behaviour(behaviour)
|
||||
self.logger.info("LLMAgent setup complete")
|
||||
|
||||
@@ -1,19 +1,16 @@
|
||||
import json
|
||||
import logging
|
||||
|
||||
import spade.agent
|
||||
import zmq
|
||||
from spade.agent import Agent
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
from zmq.asyncio import Context
|
||||
|
||||
from control_backend.agents import BaseAgent
|
||||
from control_backend.core.config import settings
|
||||
from control_backend.schemas.ri_message import SpeechCommand
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RICommandAgent(Agent):
|
||||
class RICommandAgent(BaseAgent):
|
||||
subsocket: zmq.Socket
|
||||
pubsocket: zmq.Socket
|
||||
address = ""
|
||||
@@ -51,7 +48,7 @@ class RICommandAgent(Agent):
|
||||
# Send to the robot.
|
||||
await self.agent.pubsocket.send_json(message.model_dump())
|
||||
except Exception as e:
|
||||
logger.error("Error processing message: %s", e)
|
||||
self.agent.logger.error("Error processing message: %s", e)
|
||||
|
||||
class SendPythonCommandsBehaviour(CyclicBehaviour):
|
||||
"""Behaviour for sending commands received from other Python agents."""
|
||||
@@ -63,13 +60,13 @@ class RICommandAgent(Agent):
|
||||
speech_command = SpeechCommand.model_validate_json(message.body)
|
||||
await self.agent.pubsocket.send_json(speech_command.model_dump())
|
||||
except Exception as e:
|
||||
logger.error("Error processing message: %s", e)
|
||||
self.agent.logger.error("Error processing message: %s", e)
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Setup the command agent
|
||||
"""
|
||||
logger.info("Setting up %s", self.jid)
|
||||
self.logger.info("Setting up %s", self.jid)
|
||||
|
||||
context = Context.instance()
|
||||
|
||||
@@ -90,4 +87,4 @@ class RICommandAgent(Agent):
|
||||
self.add_behaviour(commands_behaviour)
|
||||
self.add_behaviour(self.SendPythonCommandsBehaviour())
|
||||
|
||||
logger.info("Finished setting up %s", self.jid)
|
||||
self.logger.info("Finished setting up %s", self.jid)
|
||||
|
||||
@@ -1,18 +1,16 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
import zmq
|
||||
from spade.agent import Agent
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
from zmq.asyncio import Context
|
||||
|
||||
from control_backend.agents.ri_command_agent import RICommandAgent
|
||||
from control_backend.agents import BaseAgent
|
||||
from control_backend.core.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from .ri_command_agent import RICommandAgent
|
||||
|
||||
|
||||
class RICommunicationAgent(Agent):
|
||||
class RICommunicationAgent(BaseAgent):
|
||||
req_socket: zmq.Socket
|
||||
_address = ""
|
||||
_bind = True
|
||||
@@ -47,12 +45,12 @@ class RICommunicationAgent(Agent):
|
||||
|
||||
# We didnt get a reply :(
|
||||
except TimeoutError:
|
||||
logger.info("No ping retrieved in 3 seconds, killing myself.")
|
||||
self.agent.logger.info("No ping retrieved in 3 seconds, killing myself.")
|
||||
self.kill()
|
||||
|
||||
logger.debug('Received message "%s"', message)
|
||||
self.agent.logger.debug('Received message "%s"', message)
|
||||
if "endpoint" not in message:
|
||||
logger.error("No received endpoint in message, excepted ping endpoint.")
|
||||
self.agent.logger.error("No received endpoint in message, excepted ping endpoint.")
|
||||
return
|
||||
|
||||
# See what endpoint we received
|
||||
@@ -60,7 +58,7 @@ class RICommunicationAgent(Agent):
|
||||
case "ping":
|
||||
await asyncio.sleep(settings.agent_settings.behaviour_settings.ping_sleep_s)
|
||||
case _:
|
||||
logger.info(
|
||||
self.agent.logger.info(
|
||||
"Received message with topic different than ping, while ping expected."
|
||||
)
|
||||
|
||||
@@ -68,7 +66,7 @@ class RICommunicationAgent(Agent):
|
||||
"""
|
||||
Try to setup the communication agent, we have 5 retries in case we dont have a response yet.
|
||||
"""
|
||||
logger.info("Setting up %s", self.jid)
|
||||
self.logger.info("Setting up %s", self.jid)
|
||||
retries = 0
|
||||
|
||||
# Let's try a certain amount of times before failing connection
|
||||
@@ -88,7 +86,7 @@ class RICommunicationAgent(Agent):
|
||||
received_message = await asyncio.wait_for(self.req_socket.recv_json(), timeout=20.0)
|
||||
|
||||
except TimeoutError:
|
||||
logger.warning(
|
||||
self.logger.warning(
|
||||
"No connection established in 20 seconds (attempt %d/%d)",
|
||||
retries + 1,
|
||||
max_retries,
|
||||
@@ -97,7 +95,7 @@ class RICommunicationAgent(Agent):
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Unexpected error during negotiation: %s", e)
|
||||
self.logger.error("Unexpected error during negotiation: %s", e)
|
||||
retries += 1
|
||||
continue
|
||||
|
||||
@@ -105,7 +103,7 @@ class RICommunicationAgent(Agent):
|
||||
endpoint = received_message.get("endpoint")
|
||||
if endpoint != "negotiate/ports":
|
||||
# TODO: Should this send a message back?
|
||||
logger.error(
|
||||
self.logger.error(
|
||||
"Invalid endpoint '%s' received (attempt %d/%d)",
|
||||
endpoint,
|
||||
retries + 1,
|
||||
@@ -144,10 +142,10 @@ class RICommunicationAgent(Agent):
|
||||
)
|
||||
await ri_commands_agent.start()
|
||||
case _:
|
||||
logger.warning("Unhandled negotiation id: %s", id)
|
||||
self.logger.warning("Unhandled negotiation id: %s", id)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error unpacking negotiation data: %s", e)
|
||||
self.logger.error("Error unpacking negotiation data: %s", e)
|
||||
retries += 1
|
||||
continue
|
||||
|
||||
@@ -155,10 +153,10 @@ class RICommunicationAgent(Agent):
|
||||
break
|
||||
|
||||
else:
|
||||
logger.error("Failed to set up RICommunicationAgent after %d retries", max_retries)
|
||||
self.logger.error("Failed to set up RICommunicationAgent after %d retries", max_retries)
|
||||
return
|
||||
|
||||
# Set up ping behaviour
|
||||
listen_behaviour = self.ListenBehaviour()
|
||||
self.add_behaviour(listen_behaviour)
|
||||
logger.info("Finished setting up %s", self.jid)
|
||||
self.logger.info("Finished setting up %s", self.jid)
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
from .speech_recognizer import SpeechRecognizer as SpeechRecognizer
|
||||
from .transcription_agent import TranscriptionAgent as TranscriptionAgent
|
||||
@@ -1,20 +1,18 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
import numpy as np
|
||||
import zmq
|
||||
import zmq.asyncio as azmq
|
||||
from spade.agent import Agent
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
from spade.message import Message
|
||||
|
||||
from control_backend.agents.transcription.speech_recognizer import SpeechRecognizer
|
||||
from control_backend.agents import BaseAgent
|
||||
from control_backend.core.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from .speech_recognizer import SpeechRecognizer
|
||||
|
||||
|
||||
class TranscriptionAgent(Agent):
|
||||
class TranscriptionAgent(BaseAgent):
|
||||
"""
|
||||
An agent which listens to audio fragments with voice, transcribes them, and sends the
|
||||
transcription to other agents.
|
||||
@@ -60,10 +58,10 @@ class TranscriptionAgent(Agent):
|
||||
audio = np.frombuffer(audio, dtype=np.float32)
|
||||
speech = await self._transcribe(audio)
|
||||
if not speech:
|
||||
logger.info("Nothing transcribed.")
|
||||
self.agent.logger.info("Nothing transcribed.")
|
||||
return
|
||||
|
||||
logger.info("Transcribed speech: %s", speech)
|
||||
self.agent.logger.info("Transcribed speech: %s", speech)
|
||||
|
||||
await self._share_transcription(speech)
|
||||
|
||||
@@ -78,7 +76,7 @@ class TranscriptionAgent(Agent):
|
||||
self.audio_in_socket.connect(self.audio_in_address)
|
||||
|
||||
async def setup(self):
|
||||
logger.info("Setting up %s", self.jid)
|
||||
self.logger.info("Setting up %s", self.jid)
|
||||
|
||||
self._connect_audio_in_socket()
|
||||
|
||||
@@ -86,4 +84,4 @@ class TranscriptionAgent(Agent):
|
||||
transcribing.warmup()
|
||||
self.add_behaviour(transcribing)
|
||||
|
||||
logger.info("Finished setting up %s", self.jid)
|
||||
self.logger.info("Finished setting up %s", self.jid)
|
||||
|
||||
@@ -1,16 +1,13 @@
|
||||
import logging
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
import zmq
|
||||
import zmq.asyncio as azmq
|
||||
from spade.agent import Agent
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
|
||||
from control_backend.agents.transcription import TranscriptionAgent
|
||||
from control_backend.agents import BaseAgent
|
||||
from control_backend.core.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from .transcription.transcription_agent import TranscriptionAgent
|
||||
|
||||
|
||||
class SocketPoller[T]:
|
||||
@@ -70,7 +67,7 @@ class Streaming(CyclicBehaviour):
|
||||
poll_time = settings.behaviour_settings.vad_poll_time
|
||||
while await self.audio_in_poller.poll(poll_time) is not None:
|
||||
discarded += 1
|
||||
logging.info(f"Discarded {discarded} audio packets before starting.")
|
||||
self.agent.logger.info(f"Discarded {discarded} audio packets before starting.")
|
||||
self._ready = True
|
||||
|
||||
async def run(self) -> None:
|
||||
@@ -80,7 +77,9 @@ class Streaming(CyclicBehaviour):
|
||||
data = await self.audio_in_poller.poll()
|
||||
if data is None:
|
||||
if len(self.audio_buffer) > 0:
|
||||
logger.debug("No audio data received. Discarding buffer until new data arrives.")
|
||||
self.agent.logger.debug(
|
||||
"No audio data received. Discarding buffer until new data arrives."
|
||||
)
|
||||
self.audio_buffer = np.array([], dtype=np.float32)
|
||||
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
|
||||
return
|
||||
@@ -93,7 +92,7 @@ class Streaming(CyclicBehaviour):
|
||||
|
||||
if prob > prob_threshold:
|
||||
if self.i_since_speech > non_speech_patience:
|
||||
logger.debug("Speech started.")
|
||||
self.agent.logger.debug("Speech started.")
|
||||
self.audio_buffer = np.append(self.audio_buffer, chunk)
|
||||
self.i_since_speech = 0
|
||||
return
|
||||
@@ -106,7 +105,7 @@ class Streaming(CyclicBehaviour):
|
||||
|
||||
# Speech probably ended. Make sure we have a usable amount of data.
|
||||
if len(self.audio_buffer) >= 3 * len(chunk):
|
||||
logger.debug("Speech ended.")
|
||||
self.agent.logger.debug("Speech ended.")
|
||||
await self.audio_out_socket.send(self.audio_buffer[: -2 * len(chunk)].tobytes())
|
||||
|
||||
# At this point, we know that the speech has ended.
|
||||
@@ -114,7 +113,7 @@ class Streaming(CyclicBehaviour):
|
||||
self.audio_buffer = chunk
|
||||
|
||||
|
||||
class VADAgent(Agent):
|
||||
class VADAgent(BaseAgent):
|
||||
"""
|
||||
An agent which listens to an audio stream, does Voice Activity Detection (VAD), and sends
|
||||
fragments with detected speech to other agents over ZeroMQ.
|
||||
@@ -159,12 +158,12 @@ class VADAgent(Agent):
|
||||
self.audio_out_socket = azmq.Context.instance().socket(zmq.PUB)
|
||||
return self.audio_out_socket.bind_to_random_port("tcp://*", max_tries=100)
|
||||
except zmq.ZMQBindError:
|
||||
logger.error("Failed to bind an audio output socket after 100 tries.")
|
||||
self.logger.error("Failed to bind an audio output socket after 100 tries.")
|
||||
self.audio_out_socket = None
|
||||
return None
|
||||
|
||||
async def setup(self):
|
||||
logger.info("Setting up %s", self.jid)
|
||||
self.logger.info("Setting up %s", self.jid)
|
||||
|
||||
self._connect_audio_in_socket()
|
||||
|
||||
@@ -181,4 +180,4 @@ class VADAgent(Agent):
|
||||
transcriber = TranscriptionAgent(audio_out_address)
|
||||
await transcriber.start()
|
||||
|
||||
logger.info("Finished setting up %s", self.jid)
|
||||
self.logger.info("Finished setting up %s", self.jid)
|
||||
|
||||
33
src/control_backend/api/v1/endpoints/logs.py
Normal file
33
src/control_backend/api/v1/endpoints/logs.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import logging
|
||||
|
||||
import zmq
|
||||
from fastapi import APIRouter
|
||||
from fastapi.responses import StreamingResponse
|
||||
from pyjabber.server_parameters import json
|
||||
from zmq.asyncio import Context
|
||||
|
||||
from control_backend.core.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/logs/stream")
|
||||
async def log_stream():
|
||||
context = Context.instance()
|
||||
socket = context.socket(zmq.SUB)
|
||||
|
||||
for level in logging.getLevelNamesMapping():
|
||||
socket.subscribe(topic=level)
|
||||
|
||||
socket.connect(settings.zmq_settings.internal_sub_address)
|
||||
|
||||
async def gen():
|
||||
while True:
|
||||
_, message = await socket.recv_multipart()
|
||||
message = message.decode().strip()
|
||||
json_data = json.dumps(message)
|
||||
yield f"data: {json_data}\n\n"
|
||||
|
||||
return StreamingResponse(gen(), media_type="text/event-stream")
|
||||
@@ -1,6 +1,6 @@
|
||||
from fastapi.routing import APIRouter
|
||||
|
||||
from control_backend.api.v1.endpoints import command, message, sse
|
||||
from control_backend.api.v1.endpoints import command, logs, message, sse
|
||||
|
||||
api_router = APIRouter()
|
||||
|
||||
@@ -9,3 +9,5 @@ api_router.include_router(message.router, tags=["Messages"])
|
||||
api_router.include_router(sse.router, tags=["SSE"])
|
||||
|
||||
api_router.include_router(command.router, tags=["Commands"])
|
||||
|
||||
api_router.include_router(logs.router, tags=["Logs"])
|
||||
|
||||
1
src/control_backend/logging/__init__.py
Normal file
1
src/control_backend/logging/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .setup_logging import setup_logging as setup_logging
|
||||
59
src/control_backend/logging/setup_logging.py
Normal file
59
src/control_backend/logging/setup_logging.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import logging
|
||||
import logging.config
|
||||
import os
|
||||
|
||||
import yaml
|
||||
import zmq
|
||||
|
||||
from control_backend.core.config import settings
|
||||
|
||||
|
||||
def add_logging_level(level_name: str, level_num: int, method_name: str | None = None) -> None:
|
||||
"""
|
||||
Adds a logging level to the `logging` module and the
|
||||
currently configured logging class.
|
||||
"""
|
||||
if not method_name:
|
||||
method_name = level_name.lower()
|
||||
|
||||
if hasattr(logging, level_name):
|
||||
raise AttributeError(f"{level_name} already defined in logging module")
|
||||
if hasattr(logging, method_name):
|
||||
raise AttributeError(f"{method_name} already defined in logging module")
|
||||
if hasattr(logging.getLoggerClass(), method_name):
|
||||
raise AttributeError(f"{method_name} already defined in logger class")
|
||||
|
||||
def log_for_level(self, message, *args, **kwargs):
|
||||
if self.isEnabledFor(level_num):
|
||||
self._log(level_num, message, args, **kwargs)
|
||||
|
||||
def log_to_root(message, *args, **kwargs):
|
||||
logging.log(level_num, message, *args, **kwargs)
|
||||
|
||||
logging.addLevelName(level_num, level_name)
|
||||
setattr(logging, level_name, level_num)
|
||||
setattr(logging.getLoggerClass(), method_name, log_for_level)
|
||||
setattr(logging, method_name, log_to_root)
|
||||
|
||||
|
||||
def setup_logging(path: str = ".logging_config.yaml") -> None:
|
||||
if os.path.exists(path):
|
||||
with open(path) as f:
|
||||
try:
|
||||
config = yaml.safe_load(f.read())
|
||||
except (AttributeError, yaml.YAMLError) as e:
|
||||
logging.warning(f"Could not load logging configuration: {e}")
|
||||
config = {}
|
||||
|
||||
if "custom_levels" in config:
|
||||
for level_name, level_num in config["custom_levels"].items():
|
||||
add_logging_level(level_name, level_num)
|
||||
|
||||
if config.get("handlers") is not None and config.get("handlers").get("ui"):
|
||||
pub_socket = zmq.Context.instance().socket(zmq.PUB)
|
||||
pub_socket.connect(settings.zmq_settings.internal_pub_address)
|
||||
config["handlers"]["ui"]["interface_or_socket"] = pub_socket
|
||||
logging.config.dictConfig(config)
|
||||
|
||||
else:
|
||||
logging.warning("Logging config file not found. Using default logging configuration.")
|
||||
@@ -1,6 +1,3 @@
|
||||
# Standard library imports
|
||||
|
||||
# External imports
|
||||
import contextlib
|
||||
import logging
|
||||
import threading
|
||||
@@ -10,17 +7,18 @@ from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from zmq.asyncio import Context
|
||||
|
||||
from control_backend.agents.bdi.bdi_core import BDICoreAgent
|
||||
from control_backend.agents.bdi.text_extractor import TBeliefExtractor
|
||||
from control_backend.agents.belief_collector.belief_collector import BeliefCollectorAgent
|
||||
from control_backend.agents.llm.llm import LLMAgent
|
||||
from control_backend.agents.ri_communication_agent import RICommunicationAgent
|
||||
from control_backend.agents.vad_agent import VADAgent
|
||||
from control_backend.agents import (
|
||||
BeliefCollectorAgent,
|
||||
LLMAgent,
|
||||
RICommunicationAgent,
|
||||
VADAgent,
|
||||
)
|
||||
from control_backend.agents.bdi import BDICoreAgent, TBeliefExtractorAgent
|
||||
from control_backend.api.v1.router import api_router
|
||||
from control_backend.core.config import settings
|
||||
from control_backend.logging import setup_logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
|
||||
def setup_sockets():
|
||||
@@ -44,7 +42,13 @@ def setup_sockets():
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
logger.info("%s starting up.", app.title)
|
||||
"""
|
||||
Application lifespan context manager to handle startup and shutdown events.
|
||||
"""
|
||||
# --- APPLICATION STARTUP ---
|
||||
setup_logging()
|
||||
logger.info("%s is starting up.", app.title)
|
||||
logger.warning("testing extra", extra={"extra1": "one", "extra2": "two"})
|
||||
|
||||
# Initiate sockets
|
||||
proxy_thread = threading.Thread(target=setup_sockets)
|
||||
@@ -57,50 +61,83 @@ async def lifespan(app: FastAPI):
|
||||
endpoints_pub_socket.connect(settings.zmq_settings.internal_pub_address)
|
||||
app.state.endpoints_pub_socket = endpoints_pub_socket
|
||||
|
||||
# Initiate agents
|
||||
ri_communication_agent = RICommunicationAgent(
|
||||
settings.agent_settings.ri_communication_agent_name + "@" + settings.agent_settings.host,
|
||||
settings.agent_settings.ri_communication_agent_name,
|
||||
address=settings.zmq_settings.ri_communication_address,
|
||||
bind=True,
|
||||
)
|
||||
await ri_communication_agent.start()
|
||||
# --- Initialize Agents ---
|
||||
logger.info("Initializing and starting agents.")
|
||||
agents_to_start = {
|
||||
"RICommunicationAgent": (
|
||||
RICommunicationAgent,
|
||||
{
|
||||
"name": settings.agent_settings.ri_communication_agent_name,
|
||||
"jid": f"{settings.agent_settings.ri_communication_agent_name}"
|
||||
f"@{settings.agent_settings.host}",
|
||||
"password": settings.agent_settings.ri_communication_agent_name,
|
||||
"address": settings.zmq_settings.ri_communication_address,
|
||||
"bind": True,
|
||||
},
|
||||
),
|
||||
"LLMAgent": (
|
||||
LLMAgent,
|
||||
{
|
||||
"name": settings.agent_settings.llm_agent_name,
|
||||
"jid": f"{settings.agent_settings.llm_agent_name}@{settings.agent_settings.host}",
|
||||
"password": settings.agent_settings.llm_agent_name,
|
||||
},
|
||||
),
|
||||
"BDICoreAgent": (
|
||||
BDICoreAgent,
|
||||
{
|
||||
"name": settings.agent_settings.bdi_core_agent_name,
|
||||
"jid": f"{settings.agent_settings.bdi_core_agent_name}@"
|
||||
f"{settings.agent_settings.host}",
|
||||
"password": settings.agent_settings.bdi_core_agent_name,
|
||||
"asl": "src/control_backend/agents/bdi/rules.asl",
|
||||
},
|
||||
),
|
||||
"BeliefCollectorAgent": (
|
||||
BeliefCollectorAgent,
|
||||
{
|
||||
"name": settings.agent_settings.belief_collector_agent_name,
|
||||
"jid": f"{settings.agent_settings.belief_collector_agent_name}@"
|
||||
f"{settings.agent_settings.host}",
|
||||
"password": settings.agent_settings.belief_collector_agent_name,
|
||||
},
|
||||
),
|
||||
"TBeliefExtractor": (
|
||||
TBeliefExtractorAgent,
|
||||
{
|
||||
"name": settings.agent_settings.text_belief_extractor_agent_name,
|
||||
"jid": f"{settings.agent_settings.text_belief_extractor_agent_name}@"
|
||||
f"{settings.agent_settings.host}",
|
||||
"password": settings.agent_settings.text_belief_extractor_agent_name,
|
||||
},
|
||||
),
|
||||
"VADAgent": (
|
||||
VADAgent,
|
||||
{"audio_in_address": settings.zmq_settings.vad_agent_address, "audio_in_bind": False},
|
||||
),
|
||||
}
|
||||
|
||||
llm_agent = LLMAgent(
|
||||
settings.agent_settings.llm_agent_name + "@" + settings.agent_settings.host,
|
||||
settings.agent_settings.llm_agent_name,
|
||||
)
|
||||
await llm_agent.start()
|
||||
for name, (agent_class, kwargs) in agents_to_start.items():
|
||||
try:
|
||||
logger.debug("Starting agent: %s", name)
|
||||
agent_instance = agent_class(**{k: v for k, v in kwargs.items() if k != "name"})
|
||||
await agent_instance.start()
|
||||
logger.info("Agent '%s' started successfully.", name)
|
||||
except Exception as e:
|
||||
logger.error("Failed to start agent '%s': %s", name, e, exc_info=True)
|
||||
# Consider if the application should continue if an agent fails to start.
|
||||
raise
|
||||
|
||||
bdi_core = BDICoreAgent(
|
||||
settings.agent_settings.bdi_core_agent_name + "@" + settings.agent_settings.host,
|
||||
settings.agent_settings.bdi_core_agent_name,
|
||||
"src/control_backend/agents/bdi/rules.asl",
|
||||
)
|
||||
await bdi_core.start()
|
||||
|
||||
belief_collector = BeliefCollectorAgent(
|
||||
settings.agent_settings.belief_collector_agent_name + "@" + settings.agent_settings.host,
|
||||
settings.agent_settings.belief_collector_agent_name,
|
||||
)
|
||||
await belief_collector.start()
|
||||
|
||||
text_belief_extractor = TBeliefExtractor(
|
||||
settings.agent_settings.text_belief_extractor_agent_name
|
||||
+ "@"
|
||||
+ settings.agent_settings.host,
|
||||
settings.agent_settings.text_belief_extractor_agent_name,
|
||||
)
|
||||
await text_belief_extractor.start()
|
||||
|
||||
_temp_vad_agent = VADAgent(settings.zmq_settings.vad_agent_address, False)
|
||||
await _temp_vad_agent.start()
|
||||
logger.info("VAD agent started, now making ready...")
|
||||
await _temp_vad_agent.streaming_behaviour.reset()
|
||||
logger.info("Application startup complete.")
|
||||
|
||||
yield
|
||||
|
||||
logger.info("%s shutting down.", app.title)
|
||||
# --- APPLICATION SHUTDOWN ---
|
||||
logger.info("%s is shutting down.", app.title)
|
||||
|
||||
# Potential shutdown logic goes here
|
||||
|
||||
logger.info("Application shutdown complete.")
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user