feat: agent structure and implementation new

architecture with unit tests

ref: N25B-205
This commit is contained in:
Björn Otgaar
2025-10-22 10:28:48 +02:00
parent 2cacf17023
commit 3d7ef2b874
10 changed files with 953 additions and 223 deletions

View File

@@ -0,0 +1,60 @@
import json
import logging
from spade.agent import Agent
from spade.behaviour import CyclicBehaviour
import zmq
from control_backend.core.config import settings
from control_backend.core.zmq_context import context
from control_backend.schemas.message import Message
logger = logging.getLogger(__name__)
class RICommandAgent(Agent):
subsocket: zmq.Socket
pubsocket: zmq.Socket
address = ""
bind = False
def __init__(self, jid: str, password: str, port: int = 5222, verify_security: bool = False, address = "tcp://localhost:0000", bind = False):
super().__init__(jid, password, port, verify_security)
self.address = address
self.bind = bind
class SendCommandsBehaviour(CyclicBehaviour):
async def run(self):
assert self.agent is not None
# Get a message internally (with topic command)
topic, body = await self.agent.subsocket.recv_multipart()
# Try to get body
try:
message_json = json.loads(body.decode("utf-8"))
message = Message.model_validate(message_json)
logger.info("Received message \"%s\"", message.message)
# Send to the robot.
await self.agent.pubsocket.send_json(message)
except Exception as e:
logger.error("Error processing message: %s", e)
async def setup(self):
logger.info("Setting up %s", self.jid)
# To the robot
self.pubsocket = context.socket(zmq.PUB)
if self.bind:
self.pubsocket.bind(self.address)
else :
self.pubsocket.connect(self.address)
# Receive internal topics regarding commands
self.subsocket = context.socket(zmq.SUB)
self.subsocket.connect(settings.zmq_settings.internal_comm_address)
self.subsocket.setsockopt(zmq.SUBSCRIBE, b"command")
# Add behaviour to our agent
commands_behaviour = self.SendCommandsBehaviour()
self.add_behaviour(commands_behaviour)
logger.info("Finished setting up %s", self.jid)

View File

@@ -0,0 +1,138 @@
import asyncio
import json
import logging
from spade.agent import Agent
from spade.behaviour import CyclicBehaviour
import zmq
from control_backend.core.config import settings
from control_backend.core.zmq_context import context
from control_backend.schemas.message import Message
from control_backend.agents.ri_command_agent import RICommandAgent
logger = logging.getLogger(__name__)
class RICommunicationAgent(Agent):
req_socket: zmq.Socket
_address = ""
_bind = True
def __init__(self, jid: str, password: str, port: int = 5222, verify_security: bool = False, address = "tcp://localhost:0000", bind = False):
super().__init__(jid, password, port, verify_security)
self._address = address
self._bind = bind
class ListenBehaviour(CyclicBehaviour):
async def run(self):
assert self.agent is not None
# We need to listen and sent pings.
message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}}
await self.agent.req_socket.send_json(message)
# Wait up to three seconds for a reply:)
try:
message = await asyncio.wait_for(
self.agent.req_socket.recv_json(),
timeout=3.0)
# We didnt get a reply :(
except asyncio.TimeoutError as e:
logger.info("No ping retrieved in 3 seconds, killing myself.")
self.kill()
# message = Message.model_validate(message)
logger.info("Received message \"%s\"", message)
if "endpoint" not in message:
logger.error("No received endpoint in message, excepted ping endpoint.")
return
# See what endpoint we received
match message["endpoint"]:
case "ping":
await asyncio.sleep(1)
case _:
logger.info("Received message with topic different than ping, while ping expected.")
async def setup(self, max_retries: int = 5):
logger.info("Setting up %s", self.jid)
retries = 0
# Let's try a certain amount of times before failing connection
while retries < max_retries:
# Bind request socket
self.req_socket = context.socket(zmq.REQ)
if self._bind:
self.req_socket.bind(self._address)
else:
self.req_socket.connect(self._address)
# Send our message and receive one back:)
message = {"endpoint": "negotiate/ports", "data": None}
await self.req_socket.send_json(message)
try:
received_message = await asyncio.wait_for(self.req_socket.recv_json(), timeout=20.0)
except asyncio.TimeoutError:
logger.warning("No connection established in 20 seconds (attempt %d/%d)", retries + 1, max_retries)
retries += 1
continue
except Exception as e:
logger.error("Unexpected error during negotiation: %s", e)
retries += 1
continue
# Validate endpoint
endpoint = received_message.get("endpoint")
if endpoint != "negotiate/ports":
# TODO: Should this send a message back?
logger.error("Invalid endpoint '%s' received (attempt %d/%d)", endpoint, retries + 1, max_retries)
retries += 1
continue
# At this point, we have a valid response
try:
for port_data in received_message["data"]:
id = port_data["id"]
port = port_data["port"]
bind = port_data["bind"]
addr = f"tcp://localhost:{port}"
match id:
case "main":
if addr != self._address:
if not bind:
self.req_socket.connect(addr)
else:
self.req_socket.bind(addr)
case "actuation":
ri_commands_agent = RICommandAgent(
settings.agent_settings.ri_command_agent_name + '@' + settings.agent_settings.host,
settings.agent_settings.ri_command_agent_name,
address=addr,
bind=bind )
await ri_commands_agent.start()
case _:
logger.warning("Unhandled negotiation id: %s", id)
except Exception as e:
logger.error("Error unpacking negotiation data: %s", e)
retries += 1
continue
# setup succeeded
break
else:
logger.error("Failed to set up RICommunicationAgent after %d retries", max_retries)
return
# Set up ping behaviour
listen_behaviour = self.ListenBehaviour()
self.add_behaviour(listen_behaviour)
logger.info("Finished setting up %s", self.jid)

View File

@@ -1,15 +1,27 @@
from re import L
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict
class ZMQSettings(BaseModel):
internal_comm_address: str = "tcp://localhost:5560"
class AgentSettings(BaseModel):
host: str = "localhost"
bdi_core_agent_name: str = "bdi_core"
belief_collector_agent_name: str = "belief_collector"
test_agent_name: str = "test_agent"
ri_communication_agent_name: str = "ri_communication_agent"
ri_command_agent_name: str = "ri_command_agent"
class Settings(BaseSettings):
app_title: str = "PepperPlus"
ui_url: str = "http://localhost:5173"
zmq_settings: ZMQSettings = ZMQSettings()
agent_settings: AgentSettings = AgentSettings()
model_config = SettingsConfigDict(env_file=".env")

View File

@@ -7,6 +7,7 @@ import zmq
# Internal imports
from control_backend.agents.test_agent import TestAgent
from control_backend.agents.ri_communication_agent import RICommunicationAgent
from control_backend.api.v1.router import api_router
from control_backend.core.config import settings
from control_backend.core.zmq_context import context
@@ -26,9 +27,13 @@ async def lifespan(app: FastAPI):
logger.info("Internal publishing socket bound to %s", internal_comm_socket)
# Initiate agents
test_agent = TestAgent("test_agent@localhost", "test_agent")
await test_agent.start()
logger.info(settings.agent_settings.ri_communication_agent_name + '@' + settings.agent_settings.host)
logger.info(settings.agent_settings.ri_communication_agent_name)
ri_communication_agent = RICommunicationAgent(settings.agent_settings.ri_communication_agent_name + '@' + settings.agent_settings.host,
settings.agent_settings.ri_communication_agent_name,
address="tcp://*:5555", bind=True)
await ri_communication_agent.start()
yield
logger.info("%s shutting down.", app.title)