build: merge

merge with riAgent

ref: N25B-208
This commit is contained in:
JobvAlewijk
2025-10-28 15:03:47 +01:00
25 changed files with 1372 additions and 76 deletions

View File

@@ -13,7 +13,7 @@ from control_backend.core.config import settings
class BDICoreAgent(BDIAgent):
"""
This is the Brain agent that does the belief inference with AgentSpeak.
This is the Brain agent that does the belief inference with AgentSpeak.
This is a continous process that happens automatically in the background.
This class contains all the actions that can be called from AgentSpeak plans.
It has the BeliefSetter behaviour and can aks and recieve requests from the LLM agent.

View File

@@ -8,15 +8,17 @@ from spade_bdi.bdi import BDIAgent
from control_backend.core.config import settings
class BeliefSetter(CyclicBehaviour):
"""
This is the behaviour that the BDI agent runs.
This behaviour waits for incoming message and processes it based on sender.
Currently, t only waits for messages containing beliefs from Belief Collector and adds these to its KB.
This is the behaviour that the BDI agent runs. This behaviour waits for incoming
message and processes it based on sender. Currently, it only waits for messages
containing beliefs from BeliefCollector and adds these to its KB.
"""
agent: BDIAgent
logger = logging.getLogger("BDI/Belief Setter")
async def run(self):
msg = await self.receive(timeout=0.1)
if msg:
@@ -37,7 +39,8 @@ class BeliefSetter(CyclicBehaviour):
pass
def _process_belief_message(self, message: Message):
if not message.body: return
if not message.body:
return
match message.thread:
case "beliefs":
@@ -49,7 +52,6 @@ class BeliefSetter(CyclicBehaviour):
case _:
pass
def _set_beliefs(self, beliefs: dict[str, list[list[str]]]):
if self.agent.bdi is None:
self.logger.warning("Cannot set beliefs, since agent's BDI is not yet initialized.")

View File

@@ -0,0 +1,74 @@
import json
import logging
from spade.agent import Agent
from spade.behaviour import CyclicBehaviour
import zmq
from control_backend.core.config import settings
from control_backend.core.zmq_context import context
from control_backend.schemas.ri_message import SpeechCommand
logger = logging.getLogger(__name__)
class RICommandAgent(Agent):
subsocket: zmq.Socket
pubsocket: zmq.Socket
address = ""
bind = False
def __init__(
self,
jid: str,
password: str,
port: int = 5222,
verify_security: bool = False,
address="tcp://localhost:0000",
bind=False,
):
super().__init__(jid, password, port, verify_security)
self.address = address
self.bind = bind
class SendCommandsBehaviour(CyclicBehaviour):
async def run(self):
"""
Run the command publishing loop indefinetely.
"""
assert self.agent is not None
# Get a message internally (with topic command)
topic, body = await self.agent.subsocket.recv_multipart()
# Try to get body
try:
body = json.loads(body)
message = SpeechCommand.model_validate(body)
# Send to the robot.
await self.agent.pubsocket.send_json(message.model_dump())
except Exception as e:
logger.error("Error processing message: %s", e)
async def setup(self):
"""
Setup the command agent
"""
logger.info("Setting up %s", self.jid)
# To the robot
self.pubsocket = context.socket(zmq.PUB)
if self.bind:
self.pubsocket.bind(self.address)
else:
self.pubsocket.connect(self.address)
# Receive internal topics regarding commands
self.subsocket = context.socket(zmq.SUB)
self.subsocket.connect(settings.zmq_settings.internal_comm_address)
self.subsocket.setsockopt(zmq.SUBSCRIBE, b"command")
# Add behaviour to our agent
commands_behaviour = self.SendCommandsBehaviour()
self.add_behaviour(commands_behaviour)
logger.info("Finished setting up %s", self.jid)

View File

@@ -0,0 +1,165 @@
import asyncio
import json
import logging
from spade.agent import Agent
from spade.behaviour import CyclicBehaviour
import zmq
from control_backend.core.config import settings
from control_backend.core.zmq_context import context
from control_backend.schemas.message import Message
from control_backend.agents.ri_command_agent import RICommandAgent
logger = logging.getLogger(__name__)
class RICommunicationAgent(Agent):
req_socket: zmq.Socket
_address = ""
_bind = True
def __init__(
self,
jid: str,
password: str,
port: int = 5222,
verify_security: bool = False,
address="tcp://localhost:0000",
bind=False,
):
super().__init__(jid, password, port, verify_security)
self._address = address
self._bind = bind
class ListenBehaviour(CyclicBehaviour):
async def run(self):
"""
Run the listening (ping) loop indefinetely.
"""
assert self.agent is not None
# We need to listen and sent pings.
message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}}
await self.agent.req_socket.send_json(message)
# Wait up to three seconds for a reply:)
try:
message = await asyncio.wait_for(self.agent.req_socket.recv_json(), timeout=3.0)
# We didnt get a reply :(
except asyncio.TimeoutError as e:
logger.info("No ping retrieved in 3 seconds, killing myself.")
self.kill()
logger.debug('Received message "%s"', message)
if "endpoint" not in message:
logger.error("No received endpoint in message, excepted ping endpoint.")
return
# See what endpoint we received
match message["endpoint"]:
case "ping":
await asyncio.sleep(1)
case _:
logger.info(
"Received message with topic different than ping, while ping expected."
)
async def setup(self, max_retries: int = 5):
"""
Try to setup the communication agent, we have 5 retries in case we dont have a response yet.
"""
logger.info("Setting up %s", self.jid)
retries = 0
# Let's try a certain amount of times before failing connection
while retries < max_retries:
# Bind request socket
self.req_socket = context.socket(zmq.REQ)
if self._bind:
self.req_socket.bind(self._address)
else:
self.req_socket.connect(self._address)
# Send our message and receive one back:)
message = {"endpoint": "negotiate/ports", "data": None}
await self.req_socket.send_json(message)
try:
received_message = await asyncio.wait_for(self.req_socket.recv_json(), timeout=20.0)
except asyncio.TimeoutError:
logger.warning(
"No connection established in 20 seconds (attempt %d/%d)",
retries + 1,
max_retries,
)
retries += 1
continue
except Exception as e:
logger.error("Unexpected error during negotiation: %s", e)
retries += 1
continue
# Validate endpoint
endpoint = received_message.get("endpoint")
if endpoint != "negotiate/ports":
# TODO: Should this send a message back?
logger.error(
"Invalid endpoint '%s' received (attempt %d/%d)",
endpoint,
retries + 1,
max_retries,
)
retries += 1
continue
# At this point, we have a valid response
try:
for port_data in received_message["data"]:
id = port_data["id"]
port = port_data["port"]
bind = port_data["bind"]
if not bind:
addr = f"tcp://localhost:{port}"
else:
addr = f"tcp://*:{port}"
match id:
case "main":
if addr != self._address:
if not bind:
self.req_socket.connect(addr)
else:
self.req_socket.bind(addr)
case "actuation":
ri_commands_agent = RICommandAgent(
settings.agent_settings.ri_command_agent_name
+ "@"
+ settings.agent_settings.host,
settings.agent_settings.ri_command_agent_name,
address=addr,
bind=bind,
)
await ri_commands_agent.start()
case _:
logger.warning("Unhandled negotiation id: %s", id)
except Exception as e:
logger.error("Error unpacking negotiation data: %s", e)
retries += 1
continue
# setup succeeded
break
else:
logger.error("Failed to set up RICommunicationAgent after %d retries", max_retries)
return
# Set up ping behaviour
listen_behaviour = self.ListenBehaviour()
self.add_behaviour(listen_behaviour)
logger.info("Finished setting up %s", self.jid)

View File

@@ -0,0 +1,22 @@
from fastapi import APIRouter, Request
import logging
from zmq import Socket
from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/command", status_code=202)
async def receive_command(command: SpeechCommand, request: Request):
# Validate and retrieve data.
SpeechCommand.model_validate(command)
topic = b"command"
pub_socket: Socket = request.app.state.internal_comm_socket
pub_socket.send_multipart([topic, command.model_dump_json().encode()])
return {"status": "Command received"}

View File

@@ -1,6 +1,6 @@
from fastapi import APIRouter, Request
import logging
from fastapi import APIRouter, Request
from zmq import Socket
from control_backend.schemas.message import Message
@@ -9,6 +9,7 @@ logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/message", status_code=202)
async def receive_message(message: Message, request: Request):
logger.info("Received message: %s", message.message)

View File

@@ -2,7 +2,8 @@ from fastapi import APIRouter, Request
router = APIRouter()
# TODO: implement
@router.get("/sse")
async def sse(request: Request):
pass
pass

View File

@@ -1,15 +1,11 @@
from fastapi.routing import APIRouter
from control_backend.api.v1.endpoints import message, sse
from control_backend.api.v1.endpoints import message, sse, command
api_router = APIRouter()
api_router.include_router(
message.router,
tags=["Messages"]
)
api_router.include_router(message.router, tags=["Messages"])
api_router.include_router(
sse.router,
tags=["SSE"]
)
api_router.include_router(sse.router, tags=["SSE"])
api_router.include_router(command.router, tags=["Commands"])

View File

@@ -1,10 +1,11 @@
from re import L
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict
class ZMQSettings(BaseModel):
internal_comm_address: str = "tcp://localhost:5560"
class AgentSettings(BaseModel):
host: str = "xmpp.twirre.dev"
bdi_core_agent_name: str = "bdi_core"
@@ -12,6 +13,10 @@ class AgentSettings(BaseModel):
llm_agent_name: str = "llm_agent"
test_agent_name: str = "test_agent"
ri_communication_agent_name: str = "ri_communication_agent"
ri_command_agent_name: str = "ri_command_agent"
class LLMSettings(BaseModel):
local_llm_url: str = "http://127.0.0.1:1234/v1/chat/completions"
local_llm_model: str = "openai/gpt-oss-20b"
@@ -26,9 +31,7 @@ class Settings(BaseSettings):
agent_settings: AgentSettings = AgentSettings()
llm_settings: LLMSettings = LLMSettings()
model_config = SettingsConfigDict(env_file=".env")
settings = Settings()

View File

@@ -1,26 +1,25 @@
# Standard library imports
import asyncio
import json
# External imports
import contextlib
import logging
import zmq
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import logging
from spade.agent import Agent, Message
from spade.behaviour import OneShotBehaviour
import zmq
# Internal imports
from control_backend.agents.ri_communication_agent import RICommunicationAgent
from control_backend.agents.bdi.bdi_core import BDICoreAgent
from control_backend.agents.llm.llm import LLMAgent
from control_backend.api.v1.router import api_router
from control_backend.core.config import AgentSettings, settings
from control_backend.core.config import settings
from control_backend.core.zmq_context import context
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("%s starting up.", app.title)
@@ -34,6 +33,13 @@ async def lifespan(app: FastAPI):
# Initiate agents
ri_communication_agent = RICommunicationAgent(
settings.agent_settings.ri_communication_agent_name + "@" + settings.agent_settings.host,
settings.agent_settings.ri_communication_agent_name,
address="tcp://*:5555",
bind=True,
)
await ri_communication_agent.start()
llm_agent = LLMAgent(settings.agent_settings.llm_agent_name + '@' + settings.agent_settings.host,
@@ -42,23 +48,24 @@ async def lifespan(app: FastAPI):
bdi_core = BDICoreAgent(settings.agent_settings.bdi_core_agent_name + '@' + settings.agent_settings.host,
"secret, ask twirre", "src/control_backend/agents/bdi/rules.asl")
await bdi_core.start()
yield
logger.info("%s shutting down.", app.title)
# if __name__ == "__main__":
app = FastAPI(title=settings.app_title, lifespan=lifespan)
# This middleware allows other origins to communicate with us
app.add_middleware(
CORSMiddleware, # https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/CORS
allow_origins=[settings.ui_url], # address of our UI application
allow_methods=["*"], # GET, POST, etc.
CORSMiddleware, # https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/CORS
allow_origins=[settings.ui_url], # address of our UI application
allow_methods=["*"], # GET, POST, etc.
)
app.include_router(api_router, prefix="") # TODO: make prefix /api/v1
app.include_router(api_router, prefix="") # TODO: make prefix /api/v1
@app.get("/")
async def root():

View File

@@ -1,4 +1,5 @@
from pydantic import BaseModel
class Message(BaseModel):
message: str
message: str

View File

@@ -0,0 +1,20 @@
from enum import Enum
from typing import Any, Literal
from pydantic import BaseModel, Field, ValidationError
class RIEndpoint(str, Enum):
SPEECH = "actuate/speech"
PING = "ping"
NEGOTIATE_PORTS = "negotiate/ports"
class RIMessage(BaseModel):
endpoint: RIEndpoint
data: Any
class SpeechCommand(RIMessage):
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
data: str