fix: fix new tests and merge dev into branch

ref: N25B-256
Björn Otgaar
2025-11-05 16:29:56 +01:00
29 changed files with 520 additions and 298 deletions

View File

@@ -1,8 +1,10 @@
import logging
from spade.behaviour import CyclicBehaviour
from spade.message import Message
from control_backend.core.config import settings
from control_backend.schemas.ri_message import SpeechCommand
class ReceiveLLMResponseBehaviour(CyclicBehaviour):
@@ -10,7 +12,7 @@ class ReceiveLLMResponseBehaviour(CyclicBehaviour):
Adds behavior to receive responses from the LLM Agent.
"""
logger = logging.getLogger("BDI/LLM Reciever")
logger = logging.getLogger("BDI/LLM Receiver")
async def run(self):
msg = await self.receive(timeout=2)
@@ -22,7 +24,20 @@ class ReceiveLLMResponseBehaviour(CyclicBehaviour):
case settings.agent_settings.llm_agent_name:
content = msg.body
self.logger.info("Received LLM response: %s", content)
# Here the BDI can pass the message back as a response
speech_command = SpeechCommand(data=content)
message = Message(
to=settings.agent_settings.ri_command_agent_name
+ "@"
+ settings.agent_settings.host,
sender=self.agent.jid,
body=speech_command.model_dump_json(),
)
self.logger.debug("Sending message: %s", message)
await self.send(message)
case _:
self.logger.debug("Not from the llm, discarding message")
pass
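
The forwarding above serializes the LLM text as a SpeechCommand; the RI command agent parses it back (see SendPythonCommandsBehaviour below). A minimal round-trip sketch, assuming SpeechCommand is a pydantic v2 model with a single `data` field, as the constructor call suggests:

from pydantic import BaseModel

class SpeechCommand(BaseModel):
    # Assumed shape; the real model lives in control_backend.schemas.ri_message.
    data: str

# Serialize the way ReceiveLLMResponseBehaviour does ...
payload = SpeechCommand(data="Hello there!").model_dump_json()
# ... and parse the way SendPythonCommandsBehaviour does on the other end.
command = SpeechCommand.model_validate_json(payload)
assert command.data == "Hello there!"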

View File

@@ -13,23 +13,23 @@ class BeliefFromText(CyclicBehaviour):
# TODO: LLM prompt still hardcoded
llm_instruction_prompt = """
You are an information extraction assistant for a BDI agent.
Your task is to extract values from a user's text to bind a list of ungrounded beliefs. Rules:
You will receive a JSON object with "beliefs"
(a list of ungrounded AgentSpeak beliefs) and "text" (user's transcript).
You are an information extraction assistant for a BDI agent. Your task is to extract values \
from a user's text to bind a list of ungrounded beliefs. Rules:
You will receive a JSON object with "beliefs" (a list of ungrounded AgentSpeak beliefs) \
and "text" (user's transcript).
Analyze the text to find values that semantically match the variables (X,Y,Z) in the beliefs.
A single piece of text might contain multiple instances that match a belief.
Respond ONLY with a single JSON object.
The JSON object's keys should be the belief functors (e.g., "weather").
The value for each key must be a list of lists.
Each inner list must contain the extracted arguments
(as strings) for one instance of that belief.
CRITICAL: If no information in the text matches a belief,
DO NOT include that key in your response.
Each inner list must contain the extracted arguments (as strings) for one instance \
of that belief.
CRITICAL: If no information in the text matches a belief, DO NOT include that key \
in your response.
"""
# on_start agent receives message containing the beliefs to look out
# for and sets up the LLM with instruction prompt
# on_start agent receives message containing the beliefs to look out for and
# sets up the LLM with instruction prompt
# async def on_start(self):
# msg = await self.receive(timeout=0.1)
# self.beliefs = dict from message
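
For reference, one exchange that satisfies the contract the prompt describes (hypothetical values; "weather" is the prompt's own example functor):

# Input the behaviour sends to the LLM: ungrounded beliefs plus the transcript.
request = {
    "beliefs": ["weather(X, Y)"],
    "text": "It is raining in Utrecht and sunny in Madrid.",
}

# Expected reply: one key per matched functor, a list of argument lists,
# and no key at all for beliefs the text does not ground.
expected_reply = {
    "weather": [["raining", "Utrecht"], ["sunny", "Madrid"]],
}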

View File

@@ -70,8 +70,7 @@ class ContinuousBeliefCollector(CyclicBehaviour):
Expected payload:
{
"type": "belief_extraction_text",
"beliefs": {"user_said": ["hello"","Can you help me?",
"stop talking to me","No","Pepper do a dance"]}
"beliefs": {"user_said": ["Can you help me?"]}
}

View File

@@ -3,8 +3,10 @@ LLM Agent module for routing text queries from the BDI Core Agent to a local LLM
service and returning its responses back to the BDI Core Agent.
"""
import json
import logging
from typing import Any
import re
from collections.abc import AsyncGenerator
import httpx
from spade.agent import Agent
@@ -54,11 +56,16 @@ class LLMAgent(Agent):
async def _process_bdi_message(self, message: Message):
"""
Forwards user text to the LLM and replies with the generated text.
Forwards user text from the BDI to the LLM and replies with the generated text in chunks
separated by punctuation.
"""
user_text = message.body
llm_response = await self._query_llm(user_text)
await self._reply(llm_response)
# Consume the streaming generator and send a reply for every chunk
async for chunk in self._query_llm(user_text):
await self._reply(chunk)
self.agent.logger.debug(
"Finished processing BDI message. Response sent in chunks to BDI Core Agent."
)
async def _reply(self, msg: str):
"""
@@ -69,48 +76,89 @@ class LLMAgent(Agent):
body=msg,
)
await self.send(reply)
self.agent.logger.info("Reply sent to BDI Core Agent")
async def _query_llm(self, prompt: str) -> str:
async def _query_llm(self, prompt: str) -> AsyncGenerator[str]:
"""
Sends a chat completion request to the local LLM service.
Sends a chat completion request to the local LLM service and streams the response by
yielding fragments separated by punctuation.
:param prompt: Input text prompt to pass to the LLM.
:return: LLM-generated content or fallback message.
:yield: Fragments of the LLM-generated content.
"""
async with httpx.AsyncClient(timeout=120.0) as client:
# Example dynamic content for future (optional)
instructions = LLMInstructions(
"- Be friendly and respectful.\n"
"- Make the conversation feel natural and engaging.\n"
"- Speak like a pirate.\n"
"- When the user asks what you can do, tell them.",
"- Try to learn the user's name during conversation.\n"
"- Suggest playing a game of asking yes or no questions where you think of a word "
"and the user must guess it.",
)
messages = [
{
"role": "developer",
"content": instructions.build_developer_instruction(),
},
{
"role": "user",
"content": prompt,
},
]
instructions = LLMInstructions()
developer_instruction = instructions.build_developer_instruction()
try:
current_chunk = ""
async for token in self._stream_query_llm(messages):
current_chunk += token
response = await client.post(
# Stream the message in chunks separated by punctuation.
# We include the delimiter in the emitted chunk for natural flow.
pattern = re.compile(r".*?(?:,|;|:|—|\.{3}|…|\.|\?|!)\s*", re.DOTALL)
last_end = 0
for m in pattern.finditer(current_chunk):
chunk = m.group(0)
if chunk:
yield chunk
last_end = m.end()
current_chunk = current_chunk[last_end:]
# Yield any remaining tail
if current_chunk:
yield current_chunk
except httpx.HTTPError as err:
self.agent.logger.error("HTTP error.", exc_info=err)
yield "LLM service unavailable."
except Exception as err:
self.agent.logger.error("Unexpected error.", exc_info=err)
yield "Error processing the request."
async def _stream_query_llm(self, messages) -> AsyncGenerator[str]:
"""Raises httpx.HTTPError when the API gives an error."""
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream(
"POST",
settings.llm_settings.local_llm_url,
headers={"Content-Type": "application/json"},
json={
"model": settings.llm_settings.local_llm_model,
"messages": [
{"role": "developer", "content": developer_instruction},
{"role": "user", "content": prompt},
],
"messages": messages,
"temperature": 0.3,
"stream": True,
},
)
try:
) as response:
response.raise_for_status()
data: dict[str, Any] = response.json()
return (
data.get("choices", [{}])[0]
.get("message", {})
.get("content", "No response")
)
except httpx.HTTPError as err:
self.agent.logger.error("HTTP error: %s", err)
return "LLM service unavailable."
except Exception as err:
self.agent.logger.error("Unexpected error: %s", err)
return "Error processing the request."
async for line in response.aiter_lines():
if not line or not line.startswith("data: "):
continue
data = line[len("data: ") :]
if data.strip() == "[DONE]":
break
try:
event = json.loads(data)
delta = event.get("choices", [{}])[0].get("delta", {}).get("content")
if delta:
yield delta
except json.JSONDecodeError:
self.agent.logger.error("Failed to parse LLM response: %s", data)
async def setup(self):
"""

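The punctuation chunking in _query_llm can be exercised in isolation. A sketch of the same buffer logic (same pattern; complete chunks are emitted, the unterminated tail is kept for the next token):

import re

pattern = re.compile(r".*?(?:,|;|:|—|\.{3}|…|\.|\?|!)\s*", re.DOTALL)

def split_chunks(buffer: str) -> tuple[list[str], str]:
    """Return complete punctuation-terminated chunks and the unconsumed tail."""
    chunks = []
    last_end = 0
    for m in pattern.finditer(buffer):
        chunks.append(m.group(0))
        last_end = m.end()
    return chunks, buffer[last_end:]

chunks, tail = split_chunks("Ahoy, matey! How are")
assert chunks == ["Ahoy, ", "matey! "]
assert tail == "How are"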
View File

@@ -28,7 +28,9 @@ class LLMInstructions:
"""
sections = [
"You are a Pepper robot engaging in natural human conversation.",
"Keep responses between 15 sentences, unless instructed otherwise.\n",
"Keep responses between 13 sentences, unless told otherwise.\n",
"You're given goals to reach. Reach them in order, but make the conversation feel "
"natural. Some turns you should not try to achieve your goals.\n",
]
if self.norms:

View File

@@ -11,8 +11,9 @@ class BeliefTextAgent(Agent):
class SendOnceBehaviourBlfText(OneShotBehaviour):
async def run(self):
to_jid = (
f"{settings.agent_settings.belief_collector_agent_name}"
f"@{settings.agent_settings.host}"
settings.agent_settings.belief_collector_agent_name
+ "@"
+ settings.agent_settings.host
)
# Send multiple beliefs in one JSON payload

View File

@@ -1,6 +1,7 @@
import json
import logging
import spade.agent
import zmq
from spade.agent import Agent
from spade.behaviour import CyclicBehaviour
@@ -32,6 +33,8 @@ class RICommandAgent(Agent):
self.bind = bind
class SendCommandsBehaviour(CyclicBehaviour):
"""Behaviour for sending commands received from the UI."""
async def run(self):
"""
Run the command publishing loop indefinitely.
@@ -50,6 +53,18 @@ class RICommandAgent(Agent):
except Exception as e:
logger.error("Error processing message: %s", e)
class SendPythonCommandsBehaviour(CyclicBehaviour):
"""Behaviour for sending commands received from other Python agents."""
async def run(self):
message: spade.agent.Message = await self.receive(timeout=0.1)
if message and message.to == self.agent.jid:
try:
speech_command = SpeechCommand.model_validate_json(message.body)
await self.agent.pubsocket.send_json(speech_command.model_dump())
except Exception as e:
logger.error("Error processing message: %s", e)
async def setup(self):
"""
Set up the command agent
@@ -73,5 +88,6 @@ class RICommandAgent(Agent):
# Add behaviour to our agent
commands_behaviour = self.SendCommandsBehaviour()
self.add_behaviour(commands_behaviour)
self.add_behaviour(self.SendPythonCommandsBehaviour())
logger.info("Finished setting up %s", self.jid)

View File

@@ -63,7 +63,25 @@ class RICommunicationAgent(Agent):
# We didn't get a reply :(
except TimeoutError:
logger.info("No ping retrieved in 3 seconds, killing myself.")
self.kill()
# Tell UI we're disconnected.
topic = b"ping"
data = json.dumps(False).encode()
if self.agent.pub_socket is None:
logger.error("communication agent pub socket not correctly initialized.")
else:
try:
await asyncio.wait_for(
self.agent.pub_socket.send_multipart([topic, data]), 5
)
except TimeoutError:
logger.error(
"Initial connection ping for router timed"
" out in ri_communication_agent."
)
# Try to reboot.
await self.agent.setup()
logger.debug('Received message "%s"', message)
if "endpoint" not in message:

View File

@@ -36,16 +36,16 @@ class SpeechRecognizer(abc.ABC):
def _estimate_max_tokens(audio: np.ndarray) -> int:
"""
Estimate the maximum length of a given audio sample in tokens. Assumes a maximum speaking
rate of 300 words per minute (2x average), and assumes that 3 words is 4 tokens.
rate of 450 words per minute (3x average), and assumes that 3 words is 4 tokens.
:param audio: The audio sample (16 kHz) to use for length estimation.
:return: The estimated length of the transcribed audio in tokens.
"""
length_seconds = len(audio) / 16_000
length_minutes = length_seconds / 60
word_count = length_minutes * 300
word_count = length_minutes * 450
token_count = word_count / 3 * 4
return int(token_count)
return int(token_count) + 10
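
Worked through for a concrete input: 6 seconds of 16 kHz audio is 0.1 minutes, so at most 45 words at 450 wpm, which the 4/3 tokens-per-word ratio turns into 60 tokens, plus the new margin of 10:

import numpy as np

# 6 s of silence at 16 kHz; only the sample count matters for the estimate.
audio = np.zeros(6 * 16_000, dtype=np.float32)
length_minutes = len(audio) / 16_000 / 60  # 0.1
word_count = length_minutes * 450          # 45.0
token_count = word_count / 3 * 4           # 60.0
assert int(token_count) + 10 == 70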
def _get_decode_options(self, audio: np.ndarray) -> dict:
"""
@@ -85,9 +85,10 @@ class MLXWhisperSpeechRecognizer(SpeechRecognizer):
def recognize_speech(self, audio: np.ndarray) -> str:
self.load_model()
return mlx_whisper.transcribe(
audio, path_or_hf_repo=self.model_name, decode_options=self._get_decode_options(audio)
)["text"]
return mlx_whisper.transcribe(audio, path_or_hf_repo=self.model_name)["text"].strip()
audio,
path_or_hf_repo=self.model_name,
**self._get_decode_options(audio),
)["text"].strip()
class OpenAIWhisperSpeechRecognizer(SpeechRecognizer):
@@ -103,6 +104,4 @@ class OpenAIWhisperSpeechRecognizer(SpeechRecognizer):
def recognize_speech(self, audio: np.ndarray) -> str:
self.load_model()
return whisper.transcribe(
self.model, audio, decode_options=self._get_decode_options(audio)
)["text"]
return whisper.transcribe(self.model, audio, **self._get_decode_options(audio))["text"]

View File

@@ -58,6 +58,10 @@ class TranscriptionAgent(Agent):
audio = await self.audio_in_socket.recv()
audio = np.frombuffer(audio, dtype=np.float32)
speech = await self._transcribe(audio)
if not speech:
logger.info("Nothing transcribed.")
return
logger.info("Transcribed speech: %s", speech)
await self._share_transcription(speech)

View File

@@ -54,8 +54,20 @@ class Streaming(CyclicBehaviour):
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = 100 # Used to allow small pauses in speech
self._ready = False
async def reset(self):
"""Clears the ZeroMQ queue and tells this behavior to start."""
discarded = 0
while await self.audio_in_poller.poll(1) is not None:
discarded += 1
logging.info(f"Discarded {discarded} audio packets before starting.")
self._ready = True
async def run(self) -> None:
if not self._ready:
return
data = await self.audio_in_poller.poll()
if data is None:
if len(self.audio_buffer) > 0:
@@ -107,6 +119,8 @@ class VADAgent(Agent):
self.audio_in_socket: azmq.Socket | None = None
self.audio_out_socket: azmq.Socket | None = None
self.streaming_behaviour: Streaming | None = None
async def stop(self):
"""
Stop listening to audio, stop publishing audio, close sockets.
@@ -149,8 +163,8 @@ class VADAgent(Agent):
return
audio_out_address = f"tcp://localhost:{audio_out_port}"
streaming = Streaming(self.audio_in_socket, self.audio_out_socket)
self.add_behaviour(streaming)
self.streaming_behaviour = Streaming(self.audio_in_socket, self.audio_out_socket)
self.add_behaviour(self.streaming_behaviour)
# Start agents dependent on the output audio fragments here
transcriber = TranscriptionAgent(audio_out_address)

View File

@@ -22,8 +22,8 @@ async def receive_command(command: SpeechCommand, request: Request):
topic = b"command"
# TODO: Check with Kasper
pub_socket: Socket = request.app.state.internal_comm_socket
pub_socket.send_multipart([topic, command.model_dump_json().encode()])
pub_socket: Socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, command.model_dump_json().encode()])
return {"status": "Command received"}

View File

@@ -14,8 +14,6 @@ from control_backend.agents.bdi.bdi_core import BDICoreAgent
from control_backend.agents.bdi.text_extractor import TBeliefExtractor
from control_backend.agents.belief_collector.belief_collector import BeliefCollectorAgent
from control_backend.agents.llm.llm import LLMAgent
# Internal imports
from control_backend.agents.ri_communication_agent import RICommunicationAgent
from control_backend.agents.vad_agent import VADAgent
from control_backend.api.v1.router import api_router
@@ -99,6 +97,8 @@ async def lifespan(app: FastAPI):
_temp_vad_agent = VADAgent("tcp://localhost:5558", False)
await _temp_vad_agent.start()
logger.info("VAD agent started, now making ready...")
await _temp_vad_agent.streaming_behaviour.reset()
yield