Merge remote-tracking branch 'origin/dev' into feat/transcription-agent

This commit is contained in:
Twirre Meulenbelt
2025-10-28 21:51:09 +01:00
16 changed files with 1110 additions and 3 deletions

View File

@@ -0,0 +1,74 @@
import json
import logging
from spade.agent import Agent
from spade.behaviour import CyclicBehaviour
import zmq
from control_backend.core.config import settings
from control_backend.core.zmq_context import context
from control_backend.schemas.ri_message import SpeechCommand
logger = logging.getLogger(__name__)
class RICommandAgent(Agent):
subsocket: zmq.Socket
pubsocket: zmq.Socket
address = ""
bind = False
def __init__(
self,
jid: str,
password: str,
port: int = 5222,
verify_security: bool = False,
address="tcp://localhost:0000",
bind=False,
):
super().__init__(jid, password, port, verify_security)
self.address = address
self.bind = bind
class SendCommandsBehaviour(CyclicBehaviour):
async def run(self):
"""
Run the command publishing loop indefinetely.
"""
assert self.agent is not None
# Get a message internally (with topic command)
topic, body = await self.agent.subsocket.recv_multipart()
# Try to get body
try:
body = json.loads(body)
message = SpeechCommand.model_validate(body)
# Send to the robot.
await self.agent.pubsocket.send_json(message.model_dump())
except Exception as e:
logger.error("Error processing message: %s", e)
async def setup(self):
"""
Setup the command agent
"""
logger.info("Setting up %s", self.jid)
# To the robot
self.pubsocket = context.socket(zmq.PUB)
if self.bind:
self.pubsocket.bind(self.address)
else:
self.pubsocket.connect(self.address)
# Receive internal topics regarding commands
self.subsocket = context.socket(zmq.SUB)
self.subsocket.connect(settings.zmq_settings.internal_comm_address)
self.subsocket.setsockopt(zmq.SUBSCRIBE, b"command")
# Add behaviour to our agent
commands_behaviour = self.SendCommandsBehaviour()
self.add_behaviour(commands_behaviour)
logger.info("Finished setting up %s", self.jid)

View File

@@ -0,0 +1,165 @@
import asyncio
import json
import logging
from spade.agent import Agent
from spade.behaviour import CyclicBehaviour
import zmq
from control_backend.core.config import settings
from control_backend.core.zmq_context import context
from control_backend.schemas.message import Message
from control_backend.agents.ri_command_agent import RICommandAgent
logger = logging.getLogger(__name__)
class RICommunicationAgent(Agent):
req_socket: zmq.Socket
_address = ""
_bind = True
def __init__(
self,
jid: str,
password: str,
port: int = 5222,
verify_security: bool = False,
address="tcp://localhost:0000",
bind=False,
):
super().__init__(jid, password, port, verify_security)
self._address = address
self._bind = bind
class ListenBehaviour(CyclicBehaviour):
async def run(self):
"""
Run the listening (ping) loop indefinetely.
"""
assert self.agent is not None
# We need to listen and sent pings.
message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}}
await self.agent.req_socket.send_json(message)
# Wait up to three seconds for a reply:)
try:
message = await asyncio.wait_for(self.agent.req_socket.recv_json(), timeout=3.0)
# We didnt get a reply :(
except asyncio.TimeoutError as e:
logger.info("No ping retrieved in 3 seconds, killing myself.")
self.kill()
logger.debug('Received message "%s"', message)
if "endpoint" not in message:
logger.error("No received endpoint in message, excepted ping endpoint.")
return
# See what endpoint we received
match message["endpoint"]:
case "ping":
await asyncio.sleep(1)
case _:
logger.info(
"Received message with topic different than ping, while ping expected."
)
async def setup(self, max_retries: int = 5):
"""
Try to setup the communication agent, we have 5 retries in case we dont have a response yet.
"""
logger.info("Setting up %s", self.jid)
retries = 0
# Let's try a certain amount of times before failing connection
while retries < max_retries:
# Bind request socket
self.req_socket = context.socket(zmq.REQ)
if self._bind:
self.req_socket.bind(self._address)
else:
self.req_socket.connect(self._address)
# Send our message and receive one back:)
message = {"endpoint": "negotiate/ports", "data": None}
await self.req_socket.send_json(message)
try:
received_message = await asyncio.wait_for(self.req_socket.recv_json(), timeout=20.0)
except asyncio.TimeoutError:
logger.warning(
"No connection established in 20 seconds (attempt %d/%d)",
retries + 1,
max_retries,
)
retries += 1
continue
except Exception as e:
logger.error("Unexpected error during negotiation: %s", e)
retries += 1
continue
# Validate endpoint
endpoint = received_message.get("endpoint")
if endpoint != "negotiate/ports":
# TODO: Should this send a message back?
logger.error(
"Invalid endpoint '%s' received (attempt %d/%d)",
endpoint,
retries + 1,
max_retries,
)
retries += 1
continue
# At this point, we have a valid response
try:
for port_data in received_message["data"]:
id = port_data["id"]
port = port_data["port"]
bind = port_data["bind"]
if not bind:
addr = f"tcp://localhost:{port}"
else:
addr = f"tcp://*:{port}"
match id:
case "main":
if addr != self._address:
if not bind:
self.req_socket.connect(addr)
else:
self.req_socket.bind(addr)
case "actuation":
ri_commands_agent = RICommandAgent(
settings.agent_settings.ri_command_agent_name
+ "@"
+ settings.agent_settings.host,
settings.agent_settings.ri_command_agent_name,
address=addr,
bind=bind,
)
await ri_commands_agent.start()
case _:
logger.warning("Unhandled negotiation id: %s", id)
except Exception as e:
logger.error("Error unpacking negotiation data: %s", e)
retries += 1
continue
# setup succeeded
break
else:
logger.error("Failed to set up RICommunicationAgent after %d retries", max_retries)
return
# Set up ping behaviour
listen_behaviour = self.ListenBehaviour()
self.add_behaviour(listen_behaviour)
logger.info("Finished setting up %s", self.jid)

View File

@@ -0,0 +1,22 @@
from fastapi import APIRouter, Request
import logging
from zmq import Socket
from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/command", status_code=202)
async def receive_command(command: SpeechCommand, request: Request):
# Validate and retrieve data.
SpeechCommand.model_validate(command)
topic = b"command"
pub_socket: Socket = request.app.state.internal_comm_socket
pub_socket.send_multipart([topic, command.model_dump_json().encode()])
return {"status": "Command received"}

View File

@@ -1,9 +1,11 @@
from fastapi.routing import APIRouter
from control_backend.api.v1.endpoints import message, sse
from control_backend.api.v1.endpoints import message, sse, command
api_router = APIRouter()
api_router.include_router(message.router, tags=["Messages"])
api_router.include_router(sse.router, tags=["SSE"])
api_router.include_router(command.router, tags=["Commands"])

View File

@@ -12,6 +12,9 @@ class AgentSettings(BaseModel):
belief_collector_agent_name: str = "belief_collector"
vad_agent_name: str = "vad_agent"
ri_communication_agent_name: str = "ri_communication_agent"
ri_command_agent_name: str = "ri_command_agent"
class Settings(BaseSettings):
app_title: str = "PepperPlus"

View File

@@ -9,6 +9,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
# Internal imports
from control_backend.agents.ri_communication_agent import RICommunicationAgent
from control_backend.agents.bdi.bdi_core import BDICoreAgent
from control_backend.agents.vad_agent import VADAgent
from control_backend.api.v1.router import api_router
@@ -31,6 +32,14 @@ async def lifespan(app: FastAPI):
logger.info("Internal publishing socket bound to %s", internal_comm_socket)
# Initiate agents
ri_communication_agent = RICommunicationAgent(
settings.agent_settings.ri_communication_agent_name + "@" + settings.agent_settings.host,
settings.agent_settings.ri_communication_agent_name,
address="tcp://*:5555",
bind=True,
)
await ri_communication_agent.start()
bdi_core = BDICoreAgent(
settings.agent_settings.bdi_core_agent_name + "@" + settings.agent_settings.host,
settings.agent_settings.bdi_core_agent_name,

View File

@@ -0,0 +1,20 @@
from enum import Enum
from typing import Any, Literal
from pydantic import BaseModel, Field, ValidationError
class RIEndpoint(str, Enum):
SPEECH = "actuate/speech"
PING = "ping"
NEGOTIATE_PORTS = "negotiate/ports"
class RIMessage(BaseModel):
endpoint: RIEndpoint
data: Any
class SpeechCommand(RIMessage):
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
data: str