Compare commits

...

10 Commits

Author SHA1 Message Date
173326d4ad build: add Dockerfile
ref: N25B-280
2025-11-14 14:06:39 +01:00
9c538d927f build: integrate Docker functionality
Add environment variables throughout the code base to support Docker
compose integration.

ref: N25B-280
2025-11-14 13:59:13 +01:00
Twirre Meulenbelt
1518b14867 fix: set empty default norms and goals
ref: N25B-200
2025-11-12 16:33:53 +01:00
Twirre Meulenbelt
858a554c78 feat: update endpoint to support the new UI request type
The UI now sends a Program as defined in our schemas.

ref: N25B-200
2025-11-12 16:30:42 +01:00
Twirre Meulenbelt
5376b3bb4c feat: create the program manager agent to interpret programs from the UI
Extracts norms and goals and sends these to the BDI agent.

ref: N25B-200
2025-11-12 16:28:52 +01:00
8cd8988fe0 feat: (hopefully) optional norms and goals
ref: N25B-200
2025-11-12 14:00:50 +01:00
Twirre Meulenbelt
919604493e Merge remote-tracking branch 'origin/feat/recieve-programs-ui' into demo 2025-11-12 13:39:53 +01:00
273f621b1b feat: norms and goals in BDI
ref: N25B-200
2025-11-12 13:35:15 +01:00
Twirre Meulenbelt
e39139cac9 fix: VAD agent requires reset
Otherwise, it won't start up correctly.

ref: N25B-266
2025-11-12 12:06:17 +01:00
Twirre Meulenbelt
b785493b97 fix: messages are None when no message is received
ref: N25B-265
2025-11-12 11:47:59 +01:00
18 changed files with 264 additions and 71 deletions

14
.dockerignore Normal file
View File

@@ -0,0 +1,14 @@
.git
.venv
__pycache__/
*.pyc
.dockerignore
Dockerfile
README.md
.gitlab-ci.yml
.gitignore
.pre-commit-config.yaml
.githooks/
test/
.pytest_cache/
.ruff_cache/

View File

@@ -30,7 +30,7 @@ HEADER=$(head -n 1 "$COMMIT_MSG_FILE")
# Check for Merge commits (covers 'git merge' and PR merges from GitHub/GitLab) # Check for Merge commits (covers 'git merge' and PR merges from GitHub/GitLab)
# Examples: "Merge branch 'main' into ...", "Merge pull request #123 from ..." # Examples: "Merge branch 'main' into ...", "Merge pull request #123 from ..."
MERGE_PATTERN="^Merge (branch|pull request|tag) .*" MERGE_PATTERN="^Merge (remote-tracking )?(branch|pull request|tag) .*"
if [[ "$HEADER" =~ $MERGE_PATTERN ]]; then if [[ "$HEADER" =~ $MERGE_PATTERN ]]; then
echo -e "${GREEN}Merge commit detected by message content. Skipping validation.${NC}" echo -e "${GREEN}Merge commit detected by message content. Skipping validation.${NC}"
exit 0 exit 0

21
Dockerfile Normal file
View File

@@ -0,0 +1,21 @@
# Debian based image
FROM ghcr.io/astral-sh/uv:0.9.8-trixie-slim
WORKDIR /app
ENV VIRTUAL_ENV=/app/.venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
RUN apt-get update && apt-get install -y gcc=4:14.2.0-1 portaudio19-dev && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
COPY pyproject.toml uv.lock .python-version ./
RUN uv sync
COPY . .
EXPOSE 8000
ENV PYTHONPATH=src
CMD [ "fastapi", "run", "src/control_backend/main.py" ]

View File

@@ -1,3 +1,4 @@
import json
import logging import logging
import agentspeak import agentspeak
@@ -37,28 +38,66 @@ class BDICoreAgent(BDIAgent):
Registers custom AgentSpeak actions callable from plans. Registers custom AgentSpeak actions callable from plans.
""" """
@actions.add(".reply", 1) @actions.add(".reply", 3)
def _reply(agent: "BDICoreAgent", term, intention): def _reply(agent: "BDICoreAgent", term, intention):
""" """
Sends text to the LLM (AgentSpeak action). Sends text to the LLM (AgentSpeak action).
Example: .reply("Hello LLM!") Example: .reply("Hello LLM!")
""" """
message_text = agentspeak.grounded(term.args[0], intention.scope) message_text = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("Reply action sending: %s", message_text) norms = agentspeak.grounded(term.args[1], intention.scope)
goals = agentspeak.grounded(term.args[2], intention.scope)
self._send_to_llm(str(message_text)) self.logger.debug("Norms: %s", norms)
self.logger.debug("Goals: %s", goals)
self.logger.debug("User text: %s", message_text)
self._send_to_llm(str(message_text), str(norms), str(goals))
yield yield
def _send_to_llm(self, text: str): @actions.add(".reply_no_norms", 2)
def _reply_no_norms(agent: "BDICoreAgent", term, intention):
message_text = agentspeak.grounded(term.args[0], intention.scope)
goals = agentspeak.grounded(term.args[1], intention.scope)
self.logger.debug("Goals: %s", goals)
self.logger.debug("User text: %s", message_text)
self._send_to_llm(str(message_text), goals=str(goals))
@actions.add(".reply_no_goals", 2)
def _reply_no_goals(agent: "BDICoreAgent", term, intention):
message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope)
self.logger.debug("Norms: %s", norms)
self.logger.debug("User text: %s", message_text)
self._send_to_llm(str(message_text), norms=str(norms))
@actions.add(".reply_no_goals_no_norms", 1)
def _reply_no_goals_no_norms(agent: "BDICoreAgent", term, intention):
message_text = agentspeak.grounded(term.args[0], intention.scope)
self.logger.debug("User text: %s", message_text)
self._send_to_llm(message_text)
def _send_to_llm(self, text: str, norms: str = None, goals: str = None):
""" """
Sends a text query to the LLM Agent asynchronously. Sends a text query to the LLM Agent asynchronously.
""" """
class SendBehaviour(OneShotBehaviour): class SendBehaviour(OneShotBehaviour):
async def run(self) -> None: async def run(self) -> None:
message_dict = {
"text": text,
"norms": norms if norms else "",
"goals": goals if goals else "",
}
msg = Message( msg = Message(
to=settings.agent_settings.llm_agent_name + "@" + settings.agent_settings.host, to=settings.agent_settings.llm_agent_name + "@" + settings.agent_settings.host,
body=text, body=json.dumps(message_dict),
) )
await self.send(msg) await self.send(msg)

View File

@@ -0,0 +1,27 @@
import zmq
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.config import settings
from .receive_programs_behavior import ReceiveProgramsBehavior
class BDIProgramManager(BaseAgent):
"""
Will interpret programs received from the HTTP endpoint. Extracts norms, goals, triggers and
forwards them to the BDI as beliefs.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sub_socket = None
async def setup(self):
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("program")
self.add_behaviour(ReceiveProgramsBehavior())

View File

@@ -0,0 +1,59 @@
import json
from pydantic import ValidationError
from spade.behaviour import CyclicBehaviour
from spade.message import Message
from control_backend.core.config import settings
from control_backend.schemas.program import Program
class ReceiveProgramsBehavior(CyclicBehaviour):
async def _receive(self) -> Program | None:
topic, body = await self.agent.sub_socket.recv_multipart()
try:
return Program.model_validate_json(body)
except ValidationError as e:
self.agent.logger.error("Received an invalid program.", exc_info=e)
return None
def _extract_norms(self, program: Program) -> str:
"""First phase only for now, as a single newline delimited string."""
if not program.phases:
return ""
if not program.phases[0].phaseData.norms:
return ""
norm_values = [norm.value for norm in program.phases[0].phaseData.norms]
return "\n".join(norm_values)
def _extract_goals(self, program: Program) -> str:
"""First phase only for now, as a single newline delimited string."""
if not program.phases:
return ""
if not program.phases[0].phaseData.goals:
return ""
goal_descriptions = [goal.description for goal in program.phases[0].phaseData.goals]
return "\n".join(goal_descriptions)
async def _send_to_bdi(self, program: Program):
temp_allowed_parts = {
"norms": [self._extract_norms(program)],
"goals": [self._extract_goals(program)],
}
message = Message(
to=settings.agent_settings.bdi_core_agent_name + "@" + settings.agent_settings.host,
sender=self.agent.jid,
body=json.dumps(temp_allowed_parts),
thread="beliefs",
)
await self.send(message)
self.agent.logger.debug("Sent new norms and goals to the BDI agent.")
async def run(self):
program = await self._receive()
if not program:
return
await self._send_to_bdi(program)

View File

@@ -17,7 +17,9 @@ class BeliefSetterBehaviour(CyclicBehaviour):
async def run(self): async def run(self):
"""Polls for messages and processes them.""" """Polls for messages and processes them."""
msg = await self.receive() msg = await self.receive(timeout=1)
if not msg:
return
self.agent.logger.debug( self.agent.logger.debug(
"Received message from %s with thread '%s' and body: %s", "Received message from %s with thread '%s' and body: %s",
msg.sender, msg.sender,
@@ -37,8 +39,13 @@ class BeliefSetterBehaviour(CyclicBehaviour):
"Message is from the belief collector agent. Processing as belief message." "Message is from the belief collector agent. Processing as belief message."
) )
self._process_belief_message(message) self._process_belief_message(message)
case settings.agent_settings.program_manager_agent_name:
self.agent.logger.debug(
"Processing message from the program manager. Processing as belief message."
)
self._process_belief_message(message)
case _: case _:
self.agent.logger.debug("Not the belief agent, discarding message") self.agent.logger.debug("Not from expected agents, discarding message")
pass pass
def _process_belief_message(self, message: Message): def _process_belief_message(self, message: Message):

View File

@@ -11,7 +11,9 @@ class ReceiveLLMResponseBehaviour(CyclicBehaviour):
""" """
async def run(self): async def run(self):
msg = await self.receive() msg = await self.receive(timeout=1)
if not msg:
return
sender = msg.sender.node sender = msg.sender.node
match sender: match sender:

View File

@@ -38,8 +38,8 @@ class BeliefFromText(CyclicBehaviour):
beliefs = {"mood": ["X"], "car": ["Y"]} beliefs = {"mood": ["X"], "car": ["Y"]}
async def run(self): async def run(self):
msg = await self.receive() msg = await self.receive(timeout=1)
if msg is None: if not msg:
return return
sender = msg.sender.node sender = msg.sender.node

View File

@@ -1,3 +1,18 @@
+new_message : user_said(Message) <- norms("").
goals("").
+new_message : user_said(Message) & norms(Norms) & goals(Goals) <-
-new_message; -new_message;
.reply(Message). .reply(Message, Norms, Goals).
// +new_message : user_said(Message) & norms(Norms) <-
// -new_message;
// .reply_no_goals(Message, Norms).
//
// +new_message : user_said(Message) & goals(Goals) <-
// -new_message;
// .reply_no_norms(Message, Goals).
//
// +new_message : user_said(Message) <-
// -new_message;
// .reply_no_goals_no_norms(Message).

View File

@@ -14,7 +14,9 @@ class ContinuousBeliefCollector(CyclicBehaviour):
""" """
async def run(self): async def run(self):
msg = await self.receive() msg = await self.receive(timeout=1)
if not msg:
return
await self._process_message(msg) await self._process_message(msg)
async def _process_message(self, msg: Message): async def _process_message(self, msg: Message):

View File

@@ -30,7 +30,9 @@ class LLMAgent(BaseAgent):
Receives SPADE messages and processes only those originating from the Receives SPADE messages and processes only those originating from the
configured BDI agent. configured BDI agent.
""" """
msg = await self.receive() msg = await self.receive(timeout=1)
if not msg:
return
sender = msg.sender.node sender = msg.sender.node
self.agent.logger.debug( self.agent.logger.debug(
@@ -50,9 +52,13 @@ class LLMAgent(BaseAgent):
Forwards user text from the BDI to the LLM and replies with the generated text in chunks Forwards user text from the BDI to the LLM and replies with the generated text in chunks
separated by punctuation. separated by punctuation.
""" """
user_text = message.body try:
message = json.loads(message.body)
except json.JSONDecodeError:
self.agent.logger.error("Could not process BDI message.", exc_info=True)
# Consume the streaming generator and send a reply for every chunk # Consume the streaming generator and send a reply for every chunk
async for chunk in self._query_llm(user_text): async for chunk in self._query_llm(message["text"], message["norms"], message["goals"]):
await self._reply(chunk) await self._reply(chunk)
self.agent.logger.debug( self.agent.logger.debug(
"Finished processing BDI message. Response sent in chunks to BDI Core Agent." "Finished processing BDI message. Response sent in chunks to BDI Core Agent."
@@ -68,7 +74,7 @@ class LLMAgent(BaseAgent):
) )
await self.send(reply) await self.send(reply)
async def _query_llm(self, prompt: str) -> AsyncGenerator[str]: async def _query_llm(self, prompt: str, norms: str, goals: str) -> AsyncGenerator[str]:
""" """
Sends a chat completion request to the local LLM service and streams the response by Sends a chat completion request to the local LLM service and streams the response by
yielding fragments separated by punctuation like. yielding fragments separated by punctuation like.
@@ -76,15 +82,7 @@ class LLMAgent(BaseAgent):
:param prompt: Input text prompt to pass to the LLM. :param prompt: Input text prompt to pass to the LLM.
:yield: Fragments of the LLM-generated content. :yield: Fragments of the LLM-generated content.
""" """
instructions = LLMInstructions( instructions = LLMInstructions(norms if norms else None, goals if goals else None)
"- Be friendly and respectful.\n"
"- Make the conversation feel natural and engaging.\n"
"- Speak like a pirate.\n"
"- When the user asks what you can do, tell them.",
"- Try to learn the user's name during conversation.\n"
"- Suggest playing a game of asking yes or no questions where you think of a word "
"and the user must guess it.",
)
messages = [ messages = [
{ {
"role": "developer", "role": "developer",

View File

@@ -6,10 +6,7 @@ class LLMInstructions:
@staticmethod @staticmethod
def default_norms() -> str: def default_norms() -> str:
return """ return "Be friendly and respectful.\nMake the conversation feel natural and engaging."
Be friendly and respectful.
Make the conversation feel natural and engaging.
""".strip()
@staticmethod @staticmethod
def default_goals() -> str: def default_goals() -> str:

View File

@@ -54,7 +54,9 @@ class RICommandAgent(BaseAgent):
"""Behaviour for sending commands received from other Python agents.""" """Behaviour for sending commands received from other Python agents."""
async def run(self): async def run(self):
message: spade.agent.Message = await self.receive(timeout=0.1) message: spade.agent.Message = await self.receive(timeout=1)
if not message:
return
if message and message.to == self.agent.jid: if message and message.to == self.agent.jid:
try: try:
speech_command = SpeechCommand.model_validate_json(message.body) speech_command = SpeechCommand.model_validate_json(message.body)

View File

@@ -21,10 +21,13 @@ class RICommunicationAgent(BaseAgent):
password: str, password: str,
port: int = 5222, port: int = 5222,
verify_security: bool = False, verify_security: bool = False,
address="tcp://localhost:0000", address=None,
bind=False, bind=True,
): ):
super().__init__(jid, password, port, verify_security) super().__init__(jid, password, port, verify_security)
if not address:
self.logger.critical("No address set for negotiations.")
raise Exception # TODO: improve
self._address = address self._address = address
self._bind = bind self._bind = bind
@@ -119,10 +122,7 @@ class RICommunicationAgent(BaseAgent):
port = port_data["port"] port = port_data["port"]
bind = port_data["bind"] bind = port_data["bind"]
if not bind: addr = f"tcp://{settings.zmq_settings.external_host}:{port}"
addr = f"tcp://localhost:{port}"
else:
addr = f"tcp://*:{port}"
match id: match id:
case "main": case "main":

View File

@@ -1,10 +1,10 @@
import json
import logging import logging
from fastapi import APIRouter, HTTPException, Request from fastapi import APIRouter, HTTPException, Request
from pydantic import ValidationError
from control_backend.schemas.message import Message from control_backend.schemas.message import Message
from control_backend.schemas.program import Phase from control_backend.schemas.program import Program
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
router = APIRouter() router = APIRouter()
@@ -16,37 +16,20 @@ async def receive_message(program: Message, request: Request):
Receives a BehaviorProgram as a stringified JSON list inside `message`. Receives a BehaviorProgram as a stringified JSON list inside `message`.
Converts it into real Phase objects. Converts it into real Phase objects.
""" """
logger.info("Received raw program: ") logger.debug("Received raw program: %s", program)
logger.debug("%s", program)
raw_str = program.message # This is the JSON string raw_str = program.message # This is the JSON string
# Convert Json into dict. # Validate program
try: try:
program_list = json.loads(raw_str) program = Program.model_validate_json(raw_str)
except json.JSONDecodeError as e: except ValidationError as e:
logger.error("Failed to decode program JSON: %s", e) logger.error("Failed to validate program JSON: %s", e)
raise HTTPException(status_code=400, detail="Undecodeable Json string") from None raise HTTPException(status_code=400, detail="Not a valid program") from None
# Validate Phases
try:
phases: list[Phase] = [Phase(**phase) for phase in program_list]
except Exception as e:
logger.error("❌ Failed to convert to Phase objects: %s", e)
raise HTTPException(status_code=400, detail="Non-Phase String") from None
logger.info(f"Succesfully recieved {len(phases)} Phase(s).")
for p in phases:
logger.info(
f"Phase {p.id}: "
f"{len(p.phaseData.norms)} norms, "
f"{len(p.phaseData.goals)} goals, "
f"{len(p.phaseData.triggers) if hasattr(p.phaseData, 'triggers') else 0} triggers"
)
# send away # send away
topic = b"program" topic = b"program"
body = json.dumps([p.model_dump() for p in phases]).encode("utf-8") body = program.model_dump_json().encode()
pub_socket = request.app.state.endpoints_pub_socket pub_socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, body]) await pub_socket.send_multipart([topic, body])
return {"status": "Program parsed", "phase_count": len(phases)} return {"status": "Program parsed"}

View File

@@ -1,3 +1,5 @@
import os
from pydantic import BaseModel from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -6,9 +8,11 @@ class ZMQSettings(BaseModel):
internal_pub_address: str = "tcp://localhost:5560" internal_pub_address: str = "tcp://localhost:5560"
internal_sub_address: str = "tcp://localhost:5561" internal_sub_address: str = "tcp://localhost:5561"
external_host: str = "0.0.0.0"
class AgentSettings(BaseModel): class AgentSettings(BaseModel):
host: str = "localhost" host: str = os.environ.get("XMPP_HOST", "localhost")
bdi_core_agent_name: str = "bdi_core" bdi_core_agent_name: str = "bdi_core"
belief_collector_agent_name: str = "belief_collector" belief_collector_agent_name: str = "belief_collector"
text_belief_extractor_agent_name: str = "text_belief_extractor" text_belief_extractor_agent_name: str = "text_belief_extractor"
@@ -16,14 +20,15 @@ class AgentSettings(BaseModel):
llm_agent_name: str = "llm_agent" llm_agent_name: str = "llm_agent"
test_agent_name: str = "test_agent" test_agent_name: str = "test_agent"
transcription_agent_name: str = "transcription_agent" transcription_agent_name: str = "transcription_agent"
program_manager_agent_name: str = "program_manager"
ri_communication_agent_name: str = "ri_communication_agent" ri_communication_agent_name: str = "ri_communication_agent"
ri_command_agent_name: str = "ri_command_agent" ri_command_agent_name: str = "ri_command_agent"
class LLMSettings(BaseModel): class LLMSettings(BaseModel):
local_llm_url: str = "http://localhost:1234/v1/chat/completions" local_llm_url: str = os.environ.get("LLM_URL", "http://localhost:1234/v1/") + "chat/completions"
local_llm_model: str = "openai/gpt-oss-20b" local_llm_model: str = os.environ.get("LLM_MODEL", "openai/gpt-oss-20b")
class Settings(BaseSettings): class Settings(BaseSettings):

View File

@@ -1,5 +1,6 @@
import contextlib import contextlib
import logging import logging
import os
import threading import threading
import zmq import zmq
@@ -14,6 +15,7 @@ from control_backend.agents import (
VADAgent, VADAgent,
) )
from control_backend.agents.bdi import BDICoreAgent, TBeliefExtractorAgent from control_backend.agents.bdi import BDICoreAgent, TBeliefExtractorAgent
from control_backend.agents.bdi.bdi_program_manager.bdi_program_manager import BDIProgramManager
from control_backend.api.v1.router import api_router from control_backend.api.v1.router import api_router
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.logging import setup_logging from control_backend.logging import setup_logging
@@ -48,7 +50,9 @@ async def lifespan(app: FastAPI):
# --- APPLICATION STARTUP --- # --- APPLICATION STARTUP ---
setup_logging() setup_logging()
logger.info("%s is starting up.", app.title) logger.info("%s is starting up.", app.title)
logger.warning("testing extra", extra={"extra1": "one", "extra2": "two"}) logger.info(
"LLM_URL: %s, LLM_MODEL: %s", os.environ.get("LLM_URL"), os.environ.get("LLM_MODEL")
)
# Initiate sockets # Initiate sockets
proxy_thread = threading.Thread(target=setup_sockets) proxy_thread = threading.Thread(target=setup_sockets)
@@ -71,7 +75,7 @@ async def lifespan(app: FastAPI):
"jid": f"{settings.agent_settings.ri_communication_agent_name}" "jid": f"{settings.agent_settings.ri_communication_agent_name}"
f"@{settings.agent_settings.host}", f"@{settings.agent_settings.host}",
"password": settings.agent_settings.ri_communication_agent_name, "password": settings.agent_settings.ri_communication_agent_name,
"address": "tcp://*:5555", "address": f"tcp://{settings.zmq_settings.external_host}:5555",
"bind": True, "bind": True,
}, },
), ),
@@ -113,21 +117,39 @@ async def lifespan(app: FastAPI):
), ),
"VADAgent": ( "VADAgent": (
VADAgent, VADAgent,
{"audio_in_address": "tcp://localhost:5558", "audio_in_bind": False}, {
"audio_in_address": f"tcp://{settings.zmq_settings.external_host}:5558",
"audio_in_bind": True,
},
),
"ProgramManager": (
BDIProgramManager,
{
"name": settings.agent_settings.program_manager_agent_name,
"jid": f"{settings.agent_settings.program_manager_agent_name}@"
f"{settings.agent_settings.host}",
"password": settings.agent_settings.program_manager_agent_name,
},
), ),
} }
vad_agent_instance = None
for name, (agent_class, kwargs) in agents_to_start.items(): for name, (agent_class, kwargs) in agents_to_start.items():
try: try:
logger.debug("Starting agent: %s", name) logger.debug("Starting agent: %s", name)
agent_instance = agent_class(**{k: v for k, v in kwargs.items() if k != "name"}) agent_instance = agent_class(**{k: v for k, v in kwargs.items() if k != "name"})
await agent_instance.start() await agent_instance.start()
if isinstance(agent_instance, VADAgent):
vad_agent_instance = agent_instance
logger.info("Agent '%s' started successfully.", name) logger.info("Agent '%s' started successfully.", name)
except Exception as e: except Exception as e:
logger.error("Failed to start agent '%s': %s", name, e, exc_info=True) logger.error("Failed to start agent '%s': %s", name, e, exc_info=True)
# Consider if the application should continue if an agent fails to start. # Consider if the application should continue if an agent fails to start.
raise raise
await vad_agent_instance.streaming_behaviour.reset()
logger.info("Application startup complete.") logger.info("Application startup complete.")
yield yield