End to end connected for demo #17
@@ -1,15 +1,18 @@
 import logging
 
 from spade.behaviour import CyclicBehaviour
+from spade.message import Message
 
 from control_backend.core.config import settings
+from control_backend.schemas.ri_message import SpeechCommand
+
 
 class ReceiveLLMResponseBehaviour(CyclicBehaviour):
     """
     Adds behavior to receive responses from the LLM Agent.
     """
-    logger = logging.getLogger("BDI/LLM Reciever")
+    logger = logging.getLogger("BDI/LLM Receiver")
 
     async def run(self):
         msg = await self.receive(timeout=2)
         if not msg:
@@ -20,7 +23,17 @@ class ReceiveLLMResponseBehaviour(CyclicBehaviour):
             case settings.agent_settings.llm_agent_name:
                 content = msg.body
                 self.logger.info("Received LLM response: %s", content)
-                #Here the BDI can pass the message back as a response
+                speech_command = SpeechCommand(data=content)
+
+                message = Message(to=settings.agent_settings.ri_command_agent_name
+                                     + '@' + settings.agent_settings.host,
+                                  sender=self.agent.jid,
+                                  body=speech_command.model_dump_json())
+
+                self.logger.debug("Sending message: %s", message)
+
+                await self.send(message)
             case _:
                 self.logger.debug("Not from the llm, discarding message")
                 pass
@@ -2,9 +2,10 @@
 LLM Agent module for routing text queries from the BDI Core Agent to a local LLM
 service and returning its responses back to the BDI Core Agent.
 """
+import json
 import logging
-from typing import Any
+import re
+from typing import AsyncGenerator
 
 import httpx
 from spade.agent import Agent
@@ -54,11 +55,15 @@ class LLMAgent(Agent):
 
     async def _process_bdi_message(self, message: Message):
         """
-        Forwards user text to the LLM and replies with the generated text.
+        Forwards user text from the BDI to the LLM and replies with the generated text in chunks
+        separated by punctuation.
         """
         user_text = message.body
-        llm_response = await self._query_llm(user_text)
-        await self._reply(llm_response)
+        # Consume the streaming generator and send a reply for every chunk
+        async for chunk in self._query_llm(user_text):
+            await self._reply(chunk)
+        self.agent.logger.debug("Finished processing BDI message. "
+                                "Response sent in chunks to BDI Core Agent.")
 
     async def _reply(self, msg: str):
         """
@@ -69,52 +74,88 @@ class LLMAgent(Agent):
             body=msg
         )
         await self.send(reply)
-        self.agent.logger.info("Reply sent to BDI Core Agent")
 
-    async def _query_llm(self, prompt: str) -> str:
+    async def _query_llm(self, prompt: str) -> AsyncGenerator[str]:
         """
-        Sends a chat completion request to the local LLM service.
+        Sends a chat completion request to the local LLM service and streams the response by
+        yielding fragments separated by punctuation like.
 
         :param prompt: Input text prompt to pass to the LLM.
-        :return: LLM-generated content or fallback message.
+        :yield: Fragments of the LLM-generated content.
         """
-        async with httpx.AsyncClient(timeout=120.0) as client:
-            # Example dynamic content for future (optional)
-            instructions = LLMInstructions()
-            developer_instruction = instructions.build_developer_instruction()
-
-            response = await client.post(
-                settings.llm_settings.local_llm_url,
-                headers={"Content-Type": "application/json"},
-                json={
-                    "model": settings.llm_settings.local_llm_model,
-                    "messages": [
-                        {
-                            "role": "developer",
-                            "content": developer_instruction
-                        },
-                        {
-                            "role": "user",
-                            "content": prompt
-                        }
-                    ],
-                    "temperature": 0.3
-                },
-            )
-
-            try:
-                response.raise_for_status()
-                data: dict[str, Any] = response.json()
-                return data.get("choices", [{}])[0].get(
-                    "message", {}
-                ).get("content", "No response")
-            except httpx.HTTPError as err:
-                self.agent.logger.error("HTTP error: %s", err)
-                return "LLM service unavailable."
-            except Exception as err:
-                self.agent.logger.error("Unexpected error: %s", err)
-                return "Error processing the request."
+        instructions = LLMInstructions(
+            "- Be friendly and respectful.\n"
+            "- Make the conversation feel natural and engaging.\n"
+            "- Speak like a pirate.\n"
+            "- When the user asks what you can do, tell them.",
+            "- Try to learn the user's name during conversation.\n"
+            "- Suggest playing a game of asking yes or no questions where you think of a word "
+            "and the user must guess it.",
+        )
+        messages = [
+            {
+                "role": "developer",
+                "content": instructions.build_developer_instruction(),
+            },
+            {
+                "role": "user",
+                "content": prompt,
+            }
+        ]
+
+        try:
+            current_chunk = ""
+            async for token in self._stream_query_llm(messages):
+                current_chunk += token
+
+                # Stream the message in chunks separated by punctuation.
+                # We include the delimiter in the emitted chunk for natural flow.
+                pattern = re.compile(
+                    r".*?(?:,|;|:|—|–|-|\.{3}|…|\.|\?|!|\(|\)|\[|\]|/)\s*",
+                    re.DOTALL
+                )
+                for m in pattern.finditer(current_chunk):
+                    chunk = m.group(0)
+                    if chunk:
+                        yield current_chunk
+                        current_chunk = ""
+
+            # Yield any remaining tail
+            if current_chunk: yield current_chunk
+        except httpx.HTTPError as err:
+            self.agent.logger.error("HTTP error.", exc_info=err)
+            yield "LLM service unavailable."
+        except Exception as err:
+            self.agent.logger.error("Unexpected error.", exc_info=err)
+            yield "Error processing the request."
+
+    async def _stream_query_llm(self, messages) -> AsyncGenerator[str]:
+        """Raises httpx.HTTPError when the API gives an error."""
+        async with httpx.AsyncClient(timeout=None) as client:
+            async with client.stream(
+                "POST",
+                settings.llm_settings.local_llm_url,
+                json={
+                    "model": settings.llm_settings.local_llm_model,
+                    "messages": messages,
+                    "temperature": 0.3,
+                    "stream": True,
+                },
+            ) as response:
+                response.raise_for_status()
+
+                async for line in response.aiter_lines():
+                    if not line or not line.startswith("data: "): continue
+
+                    data = line[len("data: "):]
+                    if data.strip() == "[DONE]": break
+
+                    try:
+                        event = json.loads(data)
+                        delta = event.get("choices", [{}])[0].get("delta", {}).get("content")
+                        if delta: yield delta
+                    except json.JSONDecodeError:
+                        self.agent.logger.error("Failed to parse LLM response: %s", data)
 
     async def setup(self):
         """
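A self-contained sketch (stdlib only, no httpx or spade) of what the two new helpers do together: parse streamed `data:` lines the way `_stream_query_llm` does, then cut the accumulated text on the same delimiter regex `_query_llm` compiles. The sample payloads are made up, and the emit loop is a simplified illustration of punctuation-based chunking rather than a line-for-line copy of the PR's buffering loop.

```python
import json
import re

# Delimiter pattern from _query_llm: the shortest run of text up to and
# including a punctuation mark, plus any trailing whitespace.
PATTERN = re.compile(
    r".*?(?:,|;|:|—|–|-|\.{3}|…|\.|\?|!|\(|\)|\[|\]|/)\s*",
    re.DOTALL,
)

# Hand-written stand-ins for what response.aiter_lines() would yield.
LINES = [
    'data: {"choices": [{"delta": {"content": "Ahoy there, "}}]}',
    'data: {"choices": [{"delta": {"content": "matey! What shall"}}]}',
    'data: {"choices": [{"delta": {"content": " we talk about?"}}]}',
    "data: [DONE]",
]

def tokens(lines):
    """Same data-prefix / [DONE] handling as _stream_query_llm."""
    for line in lines:
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data.strip() == "[DONE]":
            break
        delta = json.loads(data).get("choices", [{}])[0].get("delta", {}).get("content")
        if delta:
            yield delta

buffer, emitted = "", []
for token in tokens(LINES):
    buffer += token
    # Emit every complete fragment ending in a delimiter; keep the unfinished tail.
    while (m := PATTERN.match(buffer)):
        emitted.append(m.group(0))
        buffer = buffer[m.end():]
if buffer:
    emitted.append(buffer)  # flush whatever is left once the stream ends

print(emitted)  # ['Ahoy there, ', 'matey! ', 'What shall we talk about?']
```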
@@ -28,7 +28,9 @@ class LLMInstructions:
         """
         sections = [
             "You are a Pepper robot engaging in natural human conversation.",
-            "Keep responses between 1–5 sentences, unless instructed otherwise.\n",
+            "Keep responses between 1–3 sentences, unless told otherwise.\n",
+            "You're given goals to reach. Reach them in order, but make the conversation feel "
+            "natural. Some turns you should not try to achieve your goals.\n"
         ]
 
         if self.norms:
@@ -1,5 +1,7 @@
 import json
 import logging
+
+import spade.agent
 from spade.agent import Agent
 from spade.behaviour import CyclicBehaviour
 import zmq
@@ -31,6 +33,7 @@ class RICommandAgent(Agent):
         self.bind = bind
 
     class SendCommandsBehaviour(CyclicBehaviour):
+        """Behaviour for sending commands received from the UI."""
         async def run(self):
             """
             Run the command publishing loop indefinetely.
@@ -49,6 +52,17 @@ class RICommandAgent(Agent):
                 except Exception as e:
                     logger.error("Error processing message: %s", e)
 
+    class SendPythonCommandsBehaviour(CyclicBehaviour):
+        """Behaviour for sending commands received from other Python agents."""
+        async def run(self):
+            message: spade.agent.Message = await self.receive(timeout=0.1)
+            if message and message.to == self.agent.jid:
+                try:
+                    speech_command = SpeechCommand.model_validate_json(message.body)
+                    await self.agent.pubsocket.send_json(speech_command.model_dump())
+                except Exception as e:
+                    logger.error("Error processing message: %s", e)
+
     async def setup(self):
         """
         Setup the command agent
@@ -70,5 +84,6 @@ class RICommandAgent(Agent):
         # Add behaviour to our agent
         commands_behaviour = self.SendCommandsBehaviour()
         self.add_behaviour(commands_behaviour)
+        self.add_behaviour(self.SendPythonCommandsBehaviour())
 
         logger.info("Finished setting up %s", self.jid)
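The hand-off from ReceiveLLMResponseBehaviour to the new SendPythonCommandsBehaviour hinges on SpeechCommand surviving a JSON round trip through the XMPP message body. Below is a minimal sketch of that round trip, assuming SpeechCommand is a pydantic v2 model with at least a `data` field; the real schema lives in control_backend.schemas.ri_message.

```python
from pydantic import BaseModel

class SpeechCommand(BaseModel):
    """Stand-in for control_backend.schemas.ri_message.SpeechCommand (assumed shape)."""
    data: str

# Sending side (BDI core): wrap the LLM chunk and put the JSON into the message body.
outgoing_body = SpeechCommand(data="Ahoy there, ").model_dump_json()

# Receiving side (SendPythonCommandsBehaviour): re-validate the body, then publish
# the plain dict over the ZeroMQ PUB socket via send_json().
speech_command = SpeechCommand.model_validate_json(outgoing_body)
print(speech_command.model_dump())  # {'data': 'Ahoy there, '}
```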
@@ -83,9 +83,6 @@ class MLXWhisperSpeechRecognizer(SpeechRecognizer):
 
     def recognize_speech(self, audio: np.ndarray) -> str:
         self.load_model()
-        return mlx_whisper.transcribe(audio,
-                                      path_or_hf_repo=self.model_name,
-                                      decode_options=self._get_decode_options(audio))["text"]
         return mlx_whisper.transcribe(audio, path_or_hf_repo=self.model_name)["text"].strip()
 
 
@@ -55,8 +55,19 @@ class Streaming(CyclicBehaviour):
 
         self.audio_buffer = np.array([], dtype=np.float32)
         self.i_since_speech = 100 # Used to allow small pauses in speech
+        self._ready = False
+
+    async def reset(self):
+        """Clears the ZeroMQ queue and tells this behavior to start."""
+        discarded = 0
+        while await self.audio_in_poller.poll(1) is not None:
+            discarded += 1
+        logging.info(f"Discarded {discarded} audio packets before starting.")
+        self._ready = True
 
     async def run(self) -> None:
+        if not self._ready: return
+
         data = await self.audio_in_poller.poll()
         if data is None:
             if len(self.audio_buffer) > 0:
@@ -108,6 +119,8 @@ class VADAgent(Agent):
         self.audio_in_socket: azmq.Socket | None = None
         self.audio_out_socket: azmq.Socket | None = None
+
+        self.streaming_behaviour: Streaming | None = None
 
     async def stop(self):
         """
         Stop listening to audio, stop publishing audio, close sockets.
@@ -150,8 +163,8 @@ class VADAgent(Agent):
             return
         audio_out_address = f"tcp://localhost:{audio_out_port}"
 
-        streaming = Streaming(self.audio_in_socket, self.audio_out_socket)
-        self.add_behaviour(streaming)
+        self.streaming_behaviour = Streaming(self.audio_in_socket, self.audio_out_socket)
+        self.add_behaviour(self.streaming_behaviour)
 
         # Start agents dependent on the output audio fragments here
         transcriber = TranscriptionAgent(audio_out_address)
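The VAD changes amount to a ready gate: Streaming.run() returns immediately until reset() has drained any stale audio and flipped `_ready`, and the lifespan hook in the next hunk is what awaits reset() after the agent starts. Below is a plain-asyncio sketch of the same gating pattern, with illustrative names and without spade or ZeroMQ.

```python
import asyncio

class GatedConsumer:
    """Toy analogue of Streaming: ignore work until reset() says we are ready."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue[bytes] = asyncio.Queue()
        self._ready = False

    async def reset(self) -> None:
        """Discard anything queued before we were ready, then start consuming."""
        discarded = 0
        while not self.queue.empty():
            self.queue.get_nowait()
            discarded += 1
        print(f"Discarded {discarded} stale packets before starting.")
        self._ready = True

    async def run_once(self) -> None:
        if not self._ready:
            return  # same early-return gate as Streaming.run()
        try:
            packet = self.queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        print(f"Processing {len(packet)} bytes")

async def main() -> None:
    consumer = GatedConsumer()
    for _ in range(3):                # packets arriving before anyone is ready
        consumer.queue.put_nowait(b"\x00" * 160)
    await consumer.run_once()         # ignored: not ready yet
    await consumer.reset()            # drains the 3 stale packets
    consumer.queue.put_nowait(b"\x01" * 320)
    await consumer.run_once()         # now processed

asyncio.run(main())
```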
@@ -71,6 +71,8 @@ async def lifespan(app: FastAPI):
 
     _temp_vad_agent = VADAgent("tcp://localhost:5558", False)
     await _temp_vad_agent.start()
+    logger.info("VAD agent started, now making ready...")
+    await _temp_vad_agent.streaming_behaviour.reset()
 
     yield
 