refactor: rename all agents and improve structure pt1
ref: N25B-257
This commit is contained in:
163
src/control_backend/agents/llm_agents/llm_agent.py
Normal file
163
src/control_backend/agents/llm_agents/llm_agent.py
Normal file
@@ -0,0 +1,163 @@
|
||||
import json
|
||||
import re
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
import httpx
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
from spade.message import Message
|
||||
|
||||
from control_backend.agents import BaseAgent
|
||||
from control_backend.core.config import settings
|
||||
|
||||
from .llm_instructions import LLMInstructions
|
||||
|
||||
|
||||
class LLMAgent(BaseAgent):
|
||||
"""
|
||||
Agent responsible for processing user text input and querying a locally
|
||||
hosted LLM for text generation. Receives messages from the BDI Core Agent
|
||||
and responds with processed LLM output.
|
||||
"""
|
||||
|
||||
class ReceiveMessageBehaviour(CyclicBehaviour):
|
||||
"""
|
||||
Cyclic behaviour to continuously listen for incoming messages from
|
||||
the BDI Core Agent and handle them.
|
||||
"""
|
||||
|
||||
async def run(self):
|
||||
"""
|
||||
Receives SPADE messages and processes only those originating from the
|
||||
configured BDI agent.
|
||||
"""
|
||||
msg = await self.receive()
|
||||
|
||||
sender = msg.sender.node
|
||||
self.agent.logger.debug(
|
||||
"Received message: %s from %s",
|
||||
msg.body,
|
||||
sender,
|
||||
)
|
||||
|
||||
if sender == settings.agent_settings.bdi_core_agent_agent_name:
|
||||
self.agent.logger.debug("Processing message from BDI Core Agent")
|
||||
await self._process_bdi_message(msg)
|
||||
else:
|
||||
self.agent.logger.debug("Message ignored (not from BDI Core Agent)")
|
||||
|
||||
async def _process_bdi_message(self, message: Message):
|
||||
"""
|
||||
Forwards user text from the BDI to the LLM and replies with the generated text in chunks
|
||||
separated by punctuation.
|
||||
"""
|
||||
user_text = message.body
|
||||
# Consume the streaming generator and send a reply for every chunk
|
||||
async for chunk in self._query_llm(user_text):
|
||||
await self._reply(chunk)
|
||||
self.agent.logger.debug(
|
||||
"Finished processing BDI message. Response sent in chunks to BDI Core Agent."
|
||||
)
|
||||
|
||||
async def _reply(self, msg: str):
|
||||
"""
|
||||
Sends a response message back to the BDI Core Agent.
|
||||
"""
|
||||
reply = Message(
|
||||
to=settings.agent_settings.bdi_core_agent_agent_name
|
||||
+ "@"
|
||||
+ settings.agent_settings.host,
|
||||
body=msg,
|
||||
)
|
||||
await self.send(reply)
|
||||
|
||||
async def _query_llm(self, prompt: str) -> AsyncGenerator[str]:
|
||||
"""
|
||||
Sends a chat completion request to the local LLM service and streams the response by
|
||||
yielding fragments separated by punctuation like.
|
||||
|
||||
:param prompt: Input text prompt to pass to the LLM.
|
||||
:yield: Fragments of the LLM-generated content.
|
||||
"""
|
||||
instructions = LLMInstructions(
|
||||
"- Be friendly and respectful.\n"
|
||||
"- Make the conversation feel natural and engaging.\n"
|
||||
"- Speak like a pirate.\n"
|
||||
"- When the user asks what you can do, tell them.",
|
||||
"- Try to learn the user's name during conversation.\n"
|
||||
"- Suggest playing a game of asking yes or no questions where you think of a word "
|
||||
"and the user must guess it.",
|
||||
)
|
||||
messages = [
|
||||
{
|
||||
"role": "developer",
|
||||
"content": instructions.build_developer_instruction(),
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": prompt,
|
||||
},
|
||||
]
|
||||
|
||||
try:
|
||||
current_chunk = ""
|
||||
async for token in self._stream_query_llm(messages):
|
||||
current_chunk += token
|
||||
|
||||
# Stream the message in chunks separated by punctuation.
|
||||
# We include the delimiter in the emitted chunk for natural flow.
|
||||
pattern = re.compile(r".*?(?:,|;|:|—|–|\.{3}|…|\.|\?|!)\s*", re.DOTALL)
|
||||
for m in pattern.finditer(current_chunk):
|
||||
chunk = m.group(0)
|
||||
if chunk:
|
||||
yield current_chunk
|
||||
current_chunk = ""
|
||||
|
||||
# Yield any remaining tail
|
||||
if current_chunk:
|
||||
yield current_chunk
|
||||
except httpx.HTTPError as err:
|
||||
self.agent.logger.error("HTTP error.", exc_info=err)
|
||||
yield "LLM service unavailable."
|
||||
except Exception as err:
|
||||
self.agent.logger.error("Unexpected error.", exc_info=err)
|
||||
yield "Error processing the request."
|
||||
|
||||
async def _stream_query_llm(self, messages) -> AsyncGenerator[str]:
|
||||
"""Raises httpx.HTTPError when the API gives an error."""
|
||||
async with httpx.AsyncClient(timeout=None) as client:
|
||||
async with client.stream(
|
||||
"POST",
|
||||
settings.llm_settings.local_llm_url,
|
||||
json={
|
||||
"model": settings.llm_settings.local_llm_model,
|
||||
"messages": messages,
|
||||
"temperature": 0.3,
|
||||
"stream": True,
|
||||
},
|
||||
) as response:
|
||||
response.raise_for_status()
|
||||
|
||||
async for line in response.aiter_lines():
|
||||
if not line or not line.startswith("data: "):
|
||||
continue
|
||||
|
||||
data = line[len("data: ") :]
|
||||
if data.strip() == "[DONE]":
|
||||
break
|
||||
|
||||
try:
|
||||
event = json.loads(data)
|
||||
delta = event.get("choices", [{}])[0].get("delta", {}).get("content")
|
||||
if delta:
|
||||
yield delta
|
||||
except json.JSONDecodeError:
|
||||
self.agent.logger.error("Failed to parse LLM response: %s", data)
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Sets up the SPADE behaviour to filter and process messages from the
|
||||
BDI Core Agent.
|
||||
"""
|
||||
behaviour = self.ReceiveMessageBehaviour()
|
||||
self.add_behaviour(behaviour)
|
||||
self.logger.info("LLMAgent setup complete")
|
||||
Reference in New Issue
Block a user