From 86938f79c0a3f3e7b776f277c8e10a25a8106133 Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Thu, 30 Oct 2025 10:42:25 +0100 Subject: [PATCH 1/6] feat: connect the demo end to end Includes the Transcription agent. This involved updating the RI agent to receive messages from other agents, sending speech commands to the RI agent, and making some performance optimizations. ref: N25B-216 --- .../behaviours/receive_llm_resp_behaviour.py | 17 ++- src/control_backend/agents/llm/llm.py | 123 ++++++++++++------ .../agents/llm/llm_instructions.py | 4 +- .../agents/ri_command_agent.py | 15 +++ .../agents/transcription/speech_recognizer.py | 3 - src/control_backend/agents/vad_agent.py | 17 ++- src/control_backend/main.py | 2 + 7 files changed, 132 insertions(+), 49 deletions(-) diff --git a/src/control_backend/agents/bdi/behaviours/receive_llm_resp_behaviour.py b/src/control_backend/agents/bdi/behaviours/receive_llm_resp_behaviour.py index 747ab4c..33525f0 100644 --- a/src/control_backend/agents/bdi/behaviours/receive_llm_resp_behaviour.py +++ b/src/control_backend/agents/bdi/behaviours/receive_llm_resp_behaviour.py @@ -1,15 +1,18 @@ import logging from spade.behaviour import CyclicBehaviour +from spade.message import Message from control_backend.core.config import settings +from control_backend.schemas.ri_message import SpeechCommand class ReceiveLLMResponseBehaviour(CyclicBehaviour): """ Adds behavior to receive responses from the LLM Agent. """ - logger = logging.getLogger("BDI/LLM Reciever") + logger = logging.getLogger("BDI/LLM Receiver") + async def run(self): msg = await self.receive(timeout=2) if not msg: @@ -20,7 +23,17 @@ class ReceiveLLMResponseBehaviour(CyclicBehaviour): case settings.agent_settings.llm_agent_name: content = msg.body self.logger.info("Received LLM response: %s", content) - #Here the BDI can pass the message back as a response + + speech_command = SpeechCommand(data=content) + + message = Message(to=settings.agent_settings.ri_command_agent_name + + '@' + settings.agent_settings.host, + sender=self.agent.jid, + body=speech_command.model_dump_json()) + + self.logger.debug("Sending message: %s", message) + + await self.send(message) case _: self.logger.debug("Not from the llm, discarding message") pass \ No newline at end of file diff --git a/src/control_backend/agents/llm/llm.py b/src/control_backend/agents/llm/llm.py index 0f78095..96658f6 100644 --- a/src/control_backend/agents/llm/llm.py +++ b/src/control_backend/agents/llm/llm.py @@ -2,9 +2,10 @@ LLM Agent module for routing text queries from the BDI Core Agent to a local LLM service and returning its responses back to the BDI Core Agent. """ - +import json import logging -from typing import Any +import re +from typing import AsyncGenerator import httpx from spade.agent import Agent @@ -54,11 +55,15 @@ class LLMAgent(Agent): async def _process_bdi_message(self, message: Message): """ - Forwards user text to the LLM and replies with the generated text. + Forwards user text from the BDI to the LLM and replies with the generated text in chunks + separated by punctuation. """ user_text = message.body - llm_response = await self._query_llm(user_text) - await self._reply(llm_response) + # Consume the streaming generator and send a reply for every chunk + async for chunk in self._query_llm(user_text): + await self._reply(chunk) + self.agent.logger.debug("Finished processing BDI message. 
" + "Response sent in chunks to BDI Core Agent.") async def _reply(self, msg: str): """ @@ -69,52 +74,88 @@ class LLMAgent(Agent): body=msg ) await self.send(reply) - self.agent.logger.info("Reply sent to BDI Core Agent") - async def _query_llm(self, prompt: str) -> str: + async def _query_llm(self, prompt: str) -> AsyncGenerator[str]: """ - Sends a chat completion request to the local LLM service. + Sends a chat completion request to the local LLM service and streams the response by + yielding fragments separated by punctuation like. :param prompt: Input text prompt to pass to the LLM. - :return: LLM-generated content or fallback message. + :yield: Fragments of the LLM-generated content. """ - async with httpx.AsyncClient(timeout=120.0) as client: - # Example dynamic content for future (optional) - - instructions = LLMInstructions() - developer_instruction = instructions.build_developer_instruction() - - response = await client.post( + instructions = LLMInstructions( + "- Be friendly and respectful.\n" + "- Make the conversation feel natural and engaging.\n" + "- Speak like a pirate.\n" + "- When the user asks what you can do, tell them.", + "- Try to learn the user's name during conversation.\n" + "- Suggest playing a game of asking yes or no questions where you think of a word " + "and the user must guess it.", + ) + messages = [ + { + "role": "developer", + "content": instructions.build_developer_instruction(), + }, + { + "role": "user", + "content": prompt, + } + ] + + try: + current_chunk = "" + async for token in self._stream_query_llm(messages): + current_chunk += token + + # Stream the message in chunks separated by punctuation. + # We include the delimiter in the emitted chunk for natural flow. + pattern = re.compile( + r".*?(?:,|;|:|—|–|-|\.{3}|…|\.|\?|!|\(|\)|\[|\]|/)\s*", + re.DOTALL + ) + for m in pattern.finditer(current_chunk): + chunk = m.group(0) + if chunk: + yield current_chunk + current_chunk = "" + + # Yield any remaining tail + if current_chunk: yield current_chunk + except httpx.HTTPError as err: + self.agent.logger.error("HTTP error.", exc_info=err) + yield "LLM service unavailable." + except Exception as err: + self.agent.logger.error("Unexpected error.", exc_info=err) + yield "Error processing the request." + + async def _stream_query_llm(self, messages) -> AsyncGenerator[str]: + """Raises httpx.HTTPError when the API gives an error.""" + async with httpx.AsyncClient(timeout=None) as client: + async with client.stream( + "POST", settings.llm_settings.local_llm_url, - headers={"Content-Type": "application/json"}, json={ "model": settings.llm_settings.local_llm_model, - "messages": [ - { - "role": "developer", - "content": developer_instruction - }, - { - "role": "user", - "content": prompt - } - ], - "temperature": 0.3 + "messages": messages, + "temperature": 0.3, + "stream": True, }, - ) - - try: + ) as response: response.raise_for_status() - data: dict[str, Any] = response.json() - return data.get("choices", [{}])[0].get( - "message", {} - ).get("content", "No response") - except httpx.HTTPError as err: - self.agent.logger.error("HTTP error: %s", err) - return "LLM service unavailable." - except Exception as err: - self.agent.logger.error("Unexpected error: %s", err) - return "Error processing the request." 
+ + async for line in response.aiter_lines(): + if not line or not line.startswith("data: "): continue + + data = line[len("data: "):] + if data.strip() == "[DONE]": break + + try: + event = json.loads(data) + delta = event.get("choices", [{}])[0].get("delta", {}).get("content") + if delta: yield delta + except json.JSONDecodeError: + self.agent.logger.error("Failed to parse LLM response: %s", data) async def setup(self): """ diff --git a/src/control_backend/agents/llm/llm_instructions.py b/src/control_backend/agents/llm/llm_instructions.py index 9636d88..e3aed7e 100644 --- a/src/control_backend/agents/llm/llm_instructions.py +++ b/src/control_backend/agents/llm/llm_instructions.py @@ -28,7 +28,9 @@ class LLMInstructions: """ sections = [ "You are a Pepper robot engaging in natural human conversation.", - "Keep responses between 1–5 sentences, unless instructed otherwise.\n", + "Keep responses between 1–3 sentences, unless told otherwise.\n", + "You're given goals to reach. Reach them in order, but make the conversation feel " + "natural. Some turns you should not try to achieve your goals.\n" ] if self.norms: diff --git a/src/control_backend/agents/ri_command_agent.py b/src/control_backend/agents/ri_command_agent.py index 01fc824..f8234ce 100644 --- a/src/control_backend/agents/ri_command_agent.py +++ b/src/control_backend/agents/ri_command_agent.py @@ -1,5 +1,7 @@ import json import logging + +import spade.agent from spade.agent import Agent from spade.behaviour import CyclicBehaviour import zmq @@ -31,6 +33,7 @@ class RICommandAgent(Agent): self.bind = bind class SendCommandsBehaviour(CyclicBehaviour): + """Behaviour for sending commands received from the UI.""" async def run(self): """ Run the command publishing loop indefinetely. @@ -49,6 +52,17 @@ class RICommandAgent(Agent): except Exception as e: logger.error("Error processing message: %s", e) + class SendPythonCommandsBehaviour(CyclicBehaviour): + """Behaviour for sending commands received from other Python agents.""" + async def run(self): + message: spade.agent.Message = await self.receive(timeout=0.1) + if message and message.to == self.agent.jid: + try: + speech_command = SpeechCommand.model_validate_json(message.body) + await self.agent.pubsocket.send_json(speech_command.model_dump()) + except Exception as e: + logger.error("Error processing message: %s", e) + async def setup(self): """ Setup the command agent @@ -70,5 +84,6 @@ class RICommandAgent(Agent): # Add behaviour to our agent commands_behaviour = self.SendCommandsBehaviour() self.add_behaviour(commands_behaviour) + self.add_behaviour(self.SendPythonCommandsBehaviour()) logger.info("Finished setting up %s", self.jid) diff --git a/src/control_backend/agents/transcription/speech_recognizer.py b/src/control_backend/agents/transcription/speech_recognizer.py index f316cda..83a5fd3 100644 --- a/src/control_backend/agents/transcription/speech_recognizer.py +++ b/src/control_backend/agents/transcription/speech_recognizer.py @@ -83,9 +83,6 @@ class MLXWhisperSpeechRecognizer(SpeechRecognizer): def recognize_speech(self, audio: np.ndarray) -> str: self.load_model() - return mlx_whisper.transcribe(audio, - path_or_hf_repo=self.model_name, - decode_options=self._get_decode_options(audio))["text"] return mlx_whisper.transcribe(audio, path_or_hf_repo=self.model_name)["text"].strip() diff --git a/src/control_backend/agents/vad_agent.py b/src/control_backend/agents/vad_agent.py index a228135..5b7f598 100644 --- a/src/control_backend/agents/vad_agent.py +++ 
b/src/control_backend/agents/vad_agent.py @@ -55,8 +55,19 @@ class Streaming(CyclicBehaviour): self.audio_buffer = np.array([], dtype=np.float32) self.i_since_speech = 100 # Used to allow small pauses in speech + self._ready = False + + async def reset(self): + """Clears the ZeroMQ queue and tells this behavior to start.""" + discarded = 0 + while await self.audio_in_poller.poll(1) is not None: + discarded += 1 + logging.info(f"Discarded {discarded} audio packets before starting.") + self._ready = True async def run(self) -> None: + if not self._ready: return + data = await self.audio_in_poller.poll() if data is None: if len(self.audio_buffer) > 0: @@ -108,6 +119,8 @@ class VADAgent(Agent): self.audio_in_socket: azmq.Socket | None = None self.audio_out_socket: azmq.Socket | None = None + self.streaming_behaviour: Streaming | None = None + async def stop(self): """ Stop listening to audio, stop publishing audio, close sockets. @@ -150,8 +163,8 @@ class VADAgent(Agent): return audio_out_address = f"tcp://localhost:{audio_out_port}" - streaming = Streaming(self.audio_in_socket, self.audio_out_socket) - self.add_behaviour(streaming) + self.streaming_behaviour = Streaming(self.audio_in_socket, self.audio_out_socket) + self.add_behaviour(self.streaming_behaviour) # Start agents dependent on the output audio fragments here transcriber = TranscriptionAgent(audio_out_address) diff --git a/src/control_backend/main.py b/src/control_backend/main.py index d3588ea..4684746 100644 --- a/src/control_backend/main.py +++ b/src/control_backend/main.py @@ -71,6 +71,8 @@ async def lifespan(app: FastAPI): _temp_vad_agent = VADAgent("tcp://localhost:5558", False) await _temp_vad_agent.start() + logger.info("VAD agent started, now making ready...") + await _temp_vad_agent.streaming_behaviour.reset() yield -- 2.49.1 From 4ffe3b2071410655b59e7ebc117f69d79ccf57a4 Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Thu, 30 Oct 2025 16:40:45 +0100 Subject: [PATCH 2/6] fix: make VAD unit tests work after changes Namely, the Streamer has to be marked ready. ref: N25B-216 --- test/integration/agents/vad_agent/test_vad_with_audio.py | 1 + test/unit/agents/test_vad_streaming.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/test/integration/agents/vad_agent/test_vad_with_audio.py b/test/integration/agents/vad_agent/test_vad_with_audio.py index 7d10aa3..fd7d4d7 100644 --- a/test/integration/agents/vad_agent/test_vad_with_audio.py +++ b/test/integration/agents/vad_agent/test_vad_with_audio.py @@ -48,6 +48,7 @@ async def test_real_audio(mocker): audio_out_socket = AsyncMock() vad_streamer = Streaming(audio_in_socket, audio_out_socket) + vad_streamer._ready = True for _ in audio_chunks: await vad_streamer.run() diff --git a/test/unit/agents/test_vad_streaming.py b/test/unit/agents/test_vad_streaming.py index 9b38cd0..ab2da0d 100644 --- a/test/unit/agents/test_vad_streaming.py +++ b/test/unit/agents/test_vad_streaming.py @@ -21,7 +21,9 @@ def streaming(audio_in_socket, audio_out_socket): import torch torch.hub.load.return_value = (..., ...) 
# Mock - return Streaming(audio_in_socket, audio_out_socket) + streaming = Streaming(audio_in_socket, audio_out_socket) + streaming._ready = True + return streaming async def simulate_streaming_with_probabilities(streaming, probabilities: list[float]): -- 2.49.1 From 5c228df1094454443edd8dd79b192c9b4f89edc6 Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Wed, 5 Nov 2025 10:41:11 +0100 Subject: [PATCH 3/6] fix: allow Whisper to generate more tokens based on audio length Before, it sometimes cut off the transcription too early. ref: N25B-209 --- .../agents/transcription/speech_recognizer.py | 17 ++++++++++++----- .../agents/transcription/transcription_agent.py | 4 ++++ 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/control_backend/agents/transcription/speech_recognizer.py b/src/control_backend/agents/transcription/speech_recognizer.py index 45e42bf..9e61fd7 100644 --- a/src/control_backend/agents/transcription/speech_recognizer.py +++ b/src/control_backend/agents/transcription/speech_recognizer.py @@ -36,16 +36,16 @@ class SpeechRecognizer(abc.ABC): def _estimate_max_tokens(audio: np.ndarray) -> int: """ Estimate the maximum length of a given audio sample in tokens. Assumes a maximum speaking - rate of 300 words per minute (2x average), and assumes that 3 words is 4 tokens. + rate of 450 words per minute (3x average), and assumes that 3 words is 4 tokens. :param audio: The audio sample (16 kHz) to use for length estimation. :return: The estimated length of the transcribed audio in tokens. """ length_seconds = len(audio) / 16_000 length_minutes = length_seconds / 60 - word_count = length_minutes * 300 + word_count = length_minutes * 450 token_count = word_count / 3 * 4 - return int(token_count) + return int(token_count) + 10 def _get_decode_options(self, audio: np.ndarray) -> dict: """ @@ -84,7 +84,12 @@ class MLXWhisperSpeechRecognizer(SpeechRecognizer): def recognize_speech(self, audio: np.ndarray) -> str: self.load_model() - return mlx_whisper.transcribe(audio, path_or_hf_repo=self.model_name)["text"] + return mlx_whisper.transcribe( + audio, + path_or_hf_repo=self.model_name, + initial_prompt="You're a robot called Pepper, talking with a person called Twirre.", + **self._get_decode_options(audio), + )["text"].strip() class OpenAIWhisperSpeechRecognizer(SpeechRecognizer): @@ -101,5 +106,7 @@ class OpenAIWhisperSpeechRecognizer(SpeechRecognizer): def recognize_speech(self, audio: np.ndarray) -> str: self.load_model() return whisper.transcribe( - self.model, audio, decode_options=self._get_decode_options(audio) + self.model, + audio, + **self._get_decode_options(audio) )["text"] diff --git a/src/control_backend/agents/transcription/transcription_agent.py b/src/control_backend/agents/transcription/transcription_agent.py index 2d936c4..196fd28 100644 --- a/src/control_backend/agents/transcription/transcription_agent.py +++ b/src/control_backend/agents/transcription/transcription_agent.py @@ -59,6 +59,10 @@ class TranscriptionAgent(Agent): audio = await self.audio_in_socket.recv() audio = np.frombuffer(audio, dtype=np.float32) speech = await self._transcribe(audio) + if not speech: + logger.info("Nothing transcribed.") + return + logger.info("Transcribed speech: %s", speech) await self._share_transcription(speech) -- 2.49.1 From 1b58549c2ab3580a35217a55750943dbdd1336c4 Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Wed, 5 Nov 2025 12:41:48 +0100 Subject: [PATCH 4/6] test: 
fix expected test value after changing audio token allowance ref: N25B-209 --- test/unit/agents/transcription/test_speech_recognizer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/unit/agents/transcription/test_speech_recognizer.py b/test/unit/agents/transcription/test_speech_recognizer.py index 88a5ac2..ab28dcf 100644 --- a/test/unit/agents/transcription/test_speech_recognizer.py +++ b/test/unit/agents/transcription/test_speech_recognizer.py @@ -5,12 +5,13 @@ from control_backend.agents.transcription.speech_recognizer import OpenAIWhisper def test_estimate_max_tokens(): - """Inputting one minute of audio, assuming 300 words per minute, expecting 400 tokens.""" + """Inputting one minute of audio, assuming 450 words per minute and adding a 10 token padding, + expecting 610 tokens.""" audio = np.empty(shape=(60 * 16_000), dtype=np.float32) actual = SpeechRecognizer._estimate_max_tokens(audio) - assert actual == 400 + assert actual == 610 assert isinstance(actual, int) -- 2.49.1 From 06e9e4fd150311edfea46940f6c8ea8fc64cfa1e Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Wed, 5 Nov 2025 14:15:03 +0100 Subject: [PATCH 5/6] chore: ruff format --- src/control_backend/agents/llm/llm_instructions.py | 2 +- .../agents/transcription/speech_recognizer.py | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/control_backend/agents/llm/llm_instructions.py b/src/control_backend/agents/llm/llm_instructions.py index e3aed7e..6922fca 100644 --- a/src/control_backend/agents/llm/llm_instructions.py +++ b/src/control_backend/agents/llm/llm_instructions.py @@ -30,7 +30,7 @@ class LLMInstructions: "You are a Pepper robot engaging in natural human conversation.", "Keep responses between 1–3 sentences, unless told otherwise.\n", "You're given goals to reach. Reach them in order, but make the conversation feel " - "natural. Some turns you should not try to achieve your goals.\n" + "natural. 
Some turns you should not try to achieve your goals.\n", ] if self.norms: diff --git a/src/control_backend/agents/transcription/speech_recognizer.py b/src/control_backend/agents/transcription/speech_recognizer.py index 9e61fd7..527d371 100644 --- a/src/control_backend/agents/transcription/speech_recognizer.py +++ b/src/control_backend/agents/transcription/speech_recognizer.py @@ -87,7 +87,6 @@ class MLXWhisperSpeechRecognizer(SpeechRecognizer): return mlx_whisper.transcribe( audio, path_or_hf_repo=self.model_name, - initial_prompt="You're a robot called Pepper, talking with a person called Twirre.", **self._get_decode_options(audio), )["text"].strip() @@ -105,8 +104,4 @@ class OpenAIWhisperSpeechRecognizer(SpeechRecognizer): def recognize_speech(self, audio: np.ndarray) -> str: self.load_model() - return whisper.transcribe( - self.model, - audio, - **self._get_decode_options(audio) - )["text"] + return whisper.transcribe(self.model, audio, **self._get_decode_options(audio))["text"] -- 2.49.1 From 262376fb58d3e6a867fd40a77e0c7c317dab9157 Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Wed, 5 Nov 2025 15:01:01 +0100 Subject: [PATCH 6/6] fix: break LLM response with fewer types of punctuation ref: N25B-207 --- src/control_backend/agents/llm/llm.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/control_backend/agents/llm/llm.py b/src/control_backend/agents/llm/llm.py index 0b9d259..4487b23 100644 --- a/src/control_backend/agents/llm/llm.py +++ b/src/control_backend/agents/llm/llm.py @@ -112,9 +112,7 @@ class LLMAgent(Agent): # Stream the message in chunks separated by punctuation. # We include the delimiter in the emitted chunk for natural flow. - pattern = re.compile( - r".*?(?:,|;|:|—|–|-|\.{3}|…|\.|\?|!|\(|\)|\[|\]|/)\s*", re.DOTALL - ) + pattern = re.compile(r".*?(?:,|;|:|—|–|\.{3}|…|\.|\?|!)\s*", re.DOTALL) for m in pattern.finditer(current_chunk): chunk = m.group(0) if chunk: -- 2.49.1
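Note on the streaming chunker introduced in patch 1 and adjusted in patch 6: the idea can be exercised in isolation, without spade, httpx, or a running LLM. The sketch below is illustrative only — iter_deltas and chunk_stream are invented names, the hard-coded "data:" lines stand in for the real OpenAI-compatible HTTP stream, and the regex is the delimiter set from patch 6. It shows content tokens being pulled out of SSE events and regrouped into punctuation-terminated chunks, with the unterminated tail carried into the next round and flushed once the stream ends.

    import json
    import re

    # Delimiters as reduced in patch 6: sentence-level punctuation only.
    PATTERN = re.compile(r".*?(?:,|;|:|—|–|\.{3}|…|\.|\?|!)\s*", re.DOTALL)


    def iter_deltas(sse_lines):
        """Extract content tokens from OpenAI-style SSE lines ("data: {...}")."""
        for line in sse_lines:
            if not line.startswith("data: "):
                continue
            data = line[len("data: "):]
            if data.strip() == "[DONE]":
                break
            delta = json.loads(data).get("choices", [{}])[0].get("delta", {})
            if delta.get("content"):
                yield delta["content"]


    def chunk_stream(tokens):
        """Regroup streamed tokens into punctuation-terminated chunks."""
        buffer = ""
        for token in tokens:
            buffer += token
            # Emit every complete chunk currently in the buffer.
            for m in PATTERN.finditer(buffer):
                yield m.group(0)
            # Drop the emitted chunks, keep the unterminated tail.
            buffer = PATTERN.sub("", buffer)
        if buffer:
            yield buffer  # Flush whatever never got a closing delimiter.


    # Hand-written stand-ins for the real HTTP stream.
    lines = [
        'data: {"choices": [{"delta": {"content": "Ahoy, "}}]}',
        'data: {"choices": [{"delta": {"content": "matey! What be"}}]}',
        'data: {"choices": [{"delta": {"content": " yer name?"}}]}',
        "data: [DONE]",
    ]
    print(list(chunk_stream(iter_deltas(lines))))
    # -> ['Ahoy, ', 'matey! ', 'What be yer name?']

Keeping the delimiter inside each emitted chunk, rather than splitting it off, is what lets the downstream speech output pause naturally between fragments, per the "natural flow" comment in _query_llm.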