import json
import re
from collections.abc import AsyncGenerator

import httpx
from spade.behaviour import CyclicBehaviour
from spade.message import Message

from control_backend.agents import BaseAgent
from control_backend.core.config import settings

from .llm_instructions import LLMInstructions


class LLMAgent(BaseAgent):
"""
|
||
Agent responsible for processing user text input and querying a locally
|
||
hosted LLM for text generation. Receives messages from the BDI Core Agent
|
||
and responds with processed LLM output.
|
||
"""
|
||
|
||
class ReceiveMessageBehaviour(CyclicBehaviour):
|
||
"""
|
||
Cyclic behaviour to continuously listen for incoming messages from
|
||
the BDI Core Agent and handle them.
|
||
"""
|
||
|
||
async def run(self):
|
||
"""
|
||
Receives SPADE messages and processes only those originating from the
|
||
configured BDI agent.
|
||
"""
|
||
msg = await self.receive(timeout=1)
|
||
if not msg:
|
||
return
|
||
|
||
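
            # `msg.sender.node` is the local part of the sender's JID (the bare agent
            # name without "@host"), which is how the BDI Core Agent is named in settings.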
            sender = msg.sender.node
            self.agent.logger.debug(
                "Received message: %s from %s",
                msg.body,
                sender,
            )

            if sender == settings.agent_settings.bdi_core_agent_name:
                self.agent.logger.debug("Processing message from BDI Core Agent")
                await self._process_bdi_message(msg)
            else:
                self.agent.logger.debug("Message ignored (not from BDI Core Agent)")

        async def _process_bdi_message(self, message: Message):
            """
            Forwards user text from the BDI to the LLM and replies with the generated
            text in chunks separated by punctuation.
            """
            user_text = message.body
            # Consume the streaming generator and send a reply for every chunk
            async for chunk in self._query_llm(user_text):
                await self._reply(chunk)
            self.agent.logger.debug(
                "Finished processing BDI message. Response sent in chunks to BDI Core Agent."
            )

        async def _reply(self, msg: str):
            """
            Sends a response message back to the BDI Core Agent.
            """
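            # The recipient is addressed by its full JID, "<agent name>@<host>"
            # (e.g. "bdi_core@localhost"; illustrative values, both come from settings).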
            reply = Message(
                to=settings.agent_settings.bdi_core_agent_name + "@" + settings.agent_settings.host,
                body=msg,
            )
            await self.send(reply)

        async def _query_llm(self, prompt: str) -> AsyncGenerator[str]:
            """
            Sends a chat completion request to the local LLM service and streams the
            response by yielding fragments separated by punctuation.

            :param prompt: Input text prompt to pass to the LLM.
            :yield: Fragments of the LLM-generated content.
            """
            instructions = LLMInstructions(
                "- Be friendly and respectful.\n"
                "- Make the conversation feel natural and engaging.\n"
                "- Speak like a pirate.\n"
                "- When the user asks what you can do, tell them.",
                "- Try to learn the user's name during conversation.\n"
                "- Suggest playing a game of asking yes or no questions where you think of a word "
                "and the user must guess it.",
            )
            messages = [
                {
                    "role": "developer",
                    "content": instructions.build_developer_instruction(),
                },
                {
                    "role": "user",
                    "content": prompt,
                },
            ]

            try:
                current_chunk = ""
                async for token in self._stream_query_llm(messages):
                    current_chunk += token

                    # Stream the message in chunks separated by punctuation.
                    # We include the delimiter in the emitted chunk for natural flow.
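                    # For example, "Ahoy there, matey! How be ye?" is emitted as
                    # "Ahoy there, ", "matey! " and "How be ye?" (illustrative input).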
                    pattern = re.compile(r".*?(?:,|;|:|—|–|\.{3}|…|\.|\?|!)\s*", re.DOTALL)
                    last_end = 0
                    for m in pattern.finditer(current_chunk):
                        chunk = m.group(0)
                        if chunk:
                            yield chunk
                        last_end = m.end()
                    # Keep only the unmatched tail; later tokens will complete it.
                    current_chunk = current_chunk[last_end:]

                # Yield any remaining tail
                if current_chunk:
                    yield current_chunk
            except httpx.HTTPError as err:
                self.agent.logger.error("HTTP error.", exc_info=err)
                yield "LLM service unavailable."
            except Exception as err:
                self.agent.logger.error("Unexpected error.", exc_info=err)
                yield "Error processing the request."

        async def _stream_query_llm(self, messages) -> AsyncGenerator[str]:
"""Raises httpx.HTTPError when the API gives an error."""
|
||
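            # The loop below assumes an OpenAI-compatible streaming response, i.e.
            # Server-Sent Events lines such as:
            #   data: {"choices": [{"delta": {"content": "Ahoy"}}]}
            #   data: [DONE]
            # Adjust the parsing if the local service uses a different wire format.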
            async with httpx.AsyncClient(timeout=None) as client:
                async with client.stream(
                    "POST",
                    settings.llm_settings.local_llm_url,
                    json={
                        "model": settings.llm_settings.local_llm_model,
                        "messages": messages,
                        "temperature": 0.3,
                        "stream": True,
                    },
                ) as response:
                    response.raise_for_status()

                    async for line in response.aiter_lines():
                        if not line or not line.startswith("data: "):
                            continue

                        data = line[len("data: ") :]
                        if data.strip() == "[DONE]":
                            break

                        try:
                            event = json.loads(data)
                            delta = event.get("choices", [{}])[0].get("delta", {}).get("content")
                            if delta:
                                yield delta
                        except json.JSONDecodeError:
                            self.agent.logger.error("Failed to parse LLM response: %s", data)

    async def setup(self):
        """
        Sets up the SPADE behaviour to filter and process messages from the
        BDI Core Agent.
        """
        behaviour = self.ReceiveMessageBehaviour()
        self.add_behaviour(behaviour)
        self.logger.info("LLMAgent setup complete")
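

# Typical startup (sketch only; assumes BaseAgent keeps SPADE's usual
# Agent(jid, password) constructor and async start()):
#
#     agent = LLMAgent("llm_agent@localhost", "secret")
#     await agent.start()
#
# The JID and password above are illustrative; in this codebase they would
# presumably come from `settings`.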