diff --git a/src/control_backend/agents/bdi/behaviours/receive_llm_resp_behaviour.py b/src/control_backend/agents/bdi/behaviours/receive_llm_resp_behaviour.py
index 747ab4c..33525f0 100644
--- a/src/control_backend/agents/bdi/behaviours/receive_llm_resp_behaviour.py
+++ b/src/control_backend/agents/bdi/behaviours/receive_llm_resp_behaviour.py
@@ -1,15 +1,18 @@
 import logging
 
 from spade.behaviour import CyclicBehaviour
+from spade.message import Message
 
 from control_backend.core.config import settings
+from control_backend.schemas.ri_message import SpeechCommand
 
 
 class ReceiveLLMResponseBehaviour(CyclicBehaviour):
     """
     Adds behavior to receive responses from the LLM Agent.
     """
-    logger = logging.getLogger("BDI/LLM Reciever")
+    logger = logging.getLogger("BDI/LLM Receiver")
+
     async def run(self):
         msg = await self.receive(timeout=2)
         if not msg:
@@ -20,7 +23,17 @@ class ReceiveLLMResponseBehaviour(CyclicBehaviour):
             case settings.agent_settings.llm_agent_name:
                 content = msg.body
                 self.logger.info("Received LLM response: %s", content)
-                #Here the BDI can pass the message back as a response
+
+                speech_command = SpeechCommand(data=content)
+
+                message = Message(to=settings.agent_settings.ri_command_agent_name
+                                  + '@' + settings.agent_settings.host,
+                                  sender=self.agent.jid,
+                                  body=speech_command.model_dump_json())
+
+                self.logger.debug("Sending message: %s", message)
+
+                await self.send(message)
             case _:
                 self.logger.debug("Not from the llm, discarding message")
                 pass
\ No newline at end of file
diff --git a/src/control_backend/agents/llm/llm.py b/src/control_backend/agents/llm/llm.py
index 0f78095..96658f6 100644
--- a/src/control_backend/agents/llm/llm.py
+++ b/src/control_backend/agents/llm/llm.py
@@ -2,9 +2,10 @@
 LLM Agent module for routing text queries from the BDI Core Agent to a local LLM
 service and returning its responses back to the BDI Core Agent.
 """
-
+import json
 import logging
-from typing import Any
+import re
+from typing import AsyncGenerator
 
 import httpx
 from spade.agent import Agent
@@ -54,11 +55,15 @@ class LLMAgent(Agent):
 
     async def _process_bdi_message(self, message: Message):
         """
-        Forwards user text to the LLM and replies with the generated text.
+        Forwards user text from the BDI to the LLM and replies with the generated text in chunks
+        separated by punctuation.
         """
         user_text = message.body
-        llm_response = await self._query_llm(user_text)
-        await self._reply(llm_response)
+        # Consume the streaming generator and send a reply for every chunk
+        async for chunk in self._query_llm(user_text):
+            await self._reply(chunk)
+        self.agent.logger.debug("Finished processing BDI message. "
+                                "Response sent in chunks to BDI Core Agent.")
 
     async def _reply(self, msg: str):
         """
@@ -69,52 +74,88 @@ class LLMAgent(Agent):
             body=msg
         )
         await self.send(reply)
-        self.agent.logger.info("Reply sent to BDI Core Agent")
 
-    async def _query_llm(self, prompt: str) -> str:
+    async def _query_llm(self, prompt: str) -> AsyncGenerator[str, None]:
         """
-        Sends a chat completion request to the local LLM service.
+        Sends a chat completion request to the local LLM service and streams the response by
+        yielding fragments separated by punctuation.
 
         :param prompt: Input text prompt to pass to the LLM.
-        :return: LLM-generated content or fallback message.
+        :yield: Fragments of the LLM-generated content.
""" - async with httpx.AsyncClient(timeout=120.0) as client: - # Example dynamic content for future (optional) - - instructions = LLMInstructions() - developer_instruction = instructions.build_developer_instruction() - - response = await client.post( + instructions = LLMInstructions( + "- Be friendly and respectful.\n" + "- Make the conversation feel natural and engaging.\n" + "- Speak like a pirate.\n" + "- When the user asks what you can do, tell them.", + "- Try to learn the user's name during conversation.\n" + "- Suggest playing a game of asking yes or no questions where you think of a word " + "and the user must guess it.", + ) + messages = [ + { + "role": "developer", + "content": instructions.build_developer_instruction(), + }, + { + "role": "user", + "content": prompt, + } + ] + + try: + current_chunk = "" + async for token in self._stream_query_llm(messages): + current_chunk += token + + # Stream the message in chunks separated by punctuation. + # We include the delimiter in the emitted chunk for natural flow. + pattern = re.compile( + r".*?(?:,|;|:|—|–|-|\.{3}|…|\.|\?|!|\(|\)|\[|\]|/)\s*", + re.DOTALL + ) + for m in pattern.finditer(current_chunk): + chunk = m.group(0) + if chunk: + yield current_chunk + current_chunk = "" + + # Yield any remaining tail + if current_chunk: yield current_chunk + except httpx.HTTPError as err: + self.agent.logger.error("HTTP error.", exc_info=err) + yield "LLM service unavailable." + except Exception as err: + self.agent.logger.error("Unexpected error.", exc_info=err) + yield "Error processing the request." + + async def _stream_query_llm(self, messages) -> AsyncGenerator[str]: + """Raises httpx.HTTPError when the API gives an error.""" + async with httpx.AsyncClient(timeout=None) as client: + async with client.stream( + "POST", settings.llm_settings.local_llm_url, - headers={"Content-Type": "application/json"}, json={ "model": settings.llm_settings.local_llm_model, - "messages": [ - { - "role": "developer", - "content": developer_instruction - }, - { - "role": "user", - "content": prompt - } - ], - "temperature": 0.3 + "messages": messages, + "temperature": 0.3, + "stream": True, }, - ) - - try: + ) as response: response.raise_for_status() - data: dict[str, Any] = response.json() - return data.get("choices", [{}])[0].get( - "message", {} - ).get("content", "No response") - except httpx.HTTPError as err: - self.agent.logger.error("HTTP error: %s", err) - return "LLM service unavailable." - except Exception as err: - self.agent.logger.error("Unexpected error: %s", err) - return "Error processing the request." 
+
+                async for line in response.aiter_lines():
+                    if not line or not line.startswith("data: "): continue
+
+                    data = line[len("data: "):]
+                    if data.strip() == "[DONE]": break
+
+                    try:
+                        event = json.loads(data)
+                        delta = event.get("choices", [{}])[0].get("delta", {}).get("content")
+                        if delta: yield delta
+                    except json.JSONDecodeError:
+                        self.agent.logger.error("Failed to parse LLM response: %s", data)
 
     async def setup(self):
         """
diff --git a/src/control_backend/agents/llm/llm_instructions.py b/src/control_backend/agents/llm/llm_instructions.py
index 9636d88..e3aed7e 100644
--- a/src/control_backend/agents/llm/llm_instructions.py
+++ b/src/control_backend/agents/llm/llm_instructions.py
@@ -28,7 +28,9 @@ class LLMInstructions:
         """
         sections = [
             "You are a Pepper robot engaging in natural human conversation.",
-            "Keep responses between 1–5 sentences, unless instructed otherwise.\n",
+            "Keep responses between 1–3 sentences, unless told otherwise.\n",
+            "You're given goals to reach. Reach them in order, but make the conversation feel "
+            "natural. On some turns, you should not try to achieve your goals.\n"
         ]
 
         if self.norms:
diff --git a/src/control_backend/agents/ri_command_agent.py b/src/control_backend/agents/ri_command_agent.py
index 01fc824..f8234ce 100644
--- a/src/control_backend/agents/ri_command_agent.py
+++ b/src/control_backend/agents/ri_command_agent.py
@@ -1,5 +1,7 @@
 import json
 import logging
+
+import spade.agent
 from spade.agent import Agent
 from spade.behaviour import CyclicBehaviour
 import zmq
@@ -31,6 +33,7 @@ class RICommandAgent(Agent):
         self.bind = bind
 
     class SendCommandsBehaviour(CyclicBehaviour):
+        """Behaviour for sending commands received from the UI."""
        async def run(self):
            """
            Run the command publishing loop indefinetely.
@@ -49,6 +52,17 @@ class RICommandAgent(Agent):
                 except Exception as e:
                     logger.error("Error processing message: %s", e)
 
+    class SendPythonCommandsBehaviour(CyclicBehaviour):
+        """Behaviour for sending commands received from other Python agents."""
+        async def run(self):
+            message: spade.agent.Message = await self.receive(timeout=0.1)
+            if message and message.to == self.agent.jid:
+                try:
+                    speech_command = SpeechCommand.model_validate_json(message.body)
+                    await self.agent.pubsocket.send_json(speech_command.model_dump())
+                except Exception as e:
+                    logger.error("Error processing message: %s", e)
+
     async def setup(self):
         """
         Setup the command agent
@@ -70,5 +84,6 @@ class RICommandAgent(Agent):
         # Add behaviour to our agent
         commands_behaviour = self.SendCommandsBehaviour()
         self.add_behaviour(commands_behaviour)
+        self.add_behaviour(self.SendPythonCommandsBehaviour())
 
         logger.info("Finished setting up %s", self.jid)
diff --git a/src/control_backend/agents/transcription/speech_recognizer.py b/src/control_backend/agents/transcription/speech_recognizer.py
index f316cda..83a5fd3 100644
--- a/src/control_backend/agents/transcription/speech_recognizer.py
+++ b/src/control_backend/agents/transcription/speech_recognizer.py
@@ -83,9 +83,6 @@ class MLXWhisperSpeechRecognizer(SpeechRecognizer):
 
     def recognize_speech(self, audio: np.ndarray) -> str:
         self.load_model()
-        return mlx_whisper.transcribe(audio,
-                                      path_or_hf_repo=self.model_name,
-                                      decode_options=self._get_decode_options(audio))["text"]
 
         return mlx_whisper.transcribe(audio, path_or_hf_repo=self.model_name)["text"].strip()
 
diff --git a/src/control_backend/agents/vad_agent.py b/src/control_backend/agents/vad_agent.py
index a228135..5b7f598 100644
--- a/src/control_backend/agents/vad_agent.py
+++ b/src/control_backend/agents/vad_agent.py
@@ -55,8 +55,19 @@ class Streaming(CyclicBehaviour):
 
         self.audio_buffer = np.array([], dtype=np.float32)
         self.i_since_speech = 100  # Used to allow small pauses in speech
+        self._ready = False
+
+    async def reset(self):
+        """Clears the ZeroMQ queue and tells this behavior to start."""
+        discarded = 0
+        while await self.audio_in_poller.poll(1) is not None:
+            discarded += 1
+        logging.info(f"Discarded {discarded} audio packets before starting.")
+        self._ready = True
 
     async def run(self) -> None:
+        if not self._ready: return
+
         data = await self.audio_in_poller.poll()
         if data is None:
             if len(self.audio_buffer) > 0:
@@ -108,6 +119,8 @@ class VADAgent(Agent):
         self.audio_in_socket: azmq.Socket | None = None
         self.audio_out_socket: azmq.Socket | None = None
 
+        self.streaming_behaviour: Streaming | None = None
+
     async def stop(self):
         """
         Stop listening to audio, stop publishing audio, close sockets.
@@ -150,8 +163,8 @@ class VADAgent(Agent):
             return
         audio_out_address = f"tcp://localhost:{audio_out_port}"
 
-        streaming = Streaming(self.audio_in_socket, self.audio_out_socket)
-        self.add_behaviour(streaming)
+        self.streaming_behaviour = Streaming(self.audio_in_socket, self.audio_out_socket)
+        self.add_behaviour(self.streaming_behaviour)
 
         # Start agents dependent on the output audio fragments here
         transcriber = TranscriptionAgent(audio_out_address)
diff --git a/src/control_backend/main.py b/src/control_backend/main.py
index d3588ea..4684746 100644
--- a/src/control_backend/main.py
+++ b/src/control_backend/main.py
@@ -71,6 +71,8 @@ async def lifespan(app: FastAPI):
 
     _temp_vad_agent = VADAgent("tcp://localhost:5558", False)
     await _temp_vad_agent.start()
+    logger.info("VAD agent started, now making ready...")
+    await _temp_vad_agent.streaming_behaviour.reset()
 
     yield