174 lines
6.3 KiB
Python
174 lines
6.3 KiB
Python
import json
|
|
|
|
import zmq
|
|
import zmq.asyncio as azmq
|
|
|
|
from control_backend.agents import BaseAgent
|
|
from control_backend.core.agent_system import InternalMessage
|
|
from control_backend.core.config import settings
|
|
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint
|
|
|
|
|
|
class RobotGestureAgent(BaseAgent):
    """
    This agent acts as a bridge between the control backend and the Robot Interface (RI).
    It receives gesture commands from other agents or from the UI,
    and forwards them to the robot via a ZMQ PUB socket. It also answers requests for
    the list of available gesture tags on a ZMQ REP socket.

    :ivar subsocket: ZMQ SUB socket for receiving external commands (e.g., from UI).
    :ivar repsocket: ZMQ REP socket for replying to requests for the available gesture tags.
    :ivar pubsocket: ZMQ PUB socket for sending commands to the Robot Interface.
    :ivar address: Address to bind/connect the PUB socket.
    :ivar bind: Whether to bind or connect the PUB socket.
    :ivar gesture_data: A list of strings for available gesture tags.
    :ivar single_gesture_data: A list of strings for available single gestures.
    """

    # The sockets are only created in ``setup``; until then these are annotations only.
    subsocket: azmq.Socket
    repsocket: azmq.Socket
    pubsocket: azmq.Socket
    address = ""
    bind = False
    # Class-level fallbacks; ``__init__`` always rebinds both lists on the instance,
    # so these shared list objects are never mutated through an instance.
    gesture_data = []
    single_gesture_data = []

    def __init__(
        self,
        name: str,
        address=settings.zmq_settings.ri_command_address,
        bind=False,
        gesture_data=None,
        single_gesture_data=None,
    ):
        """
        Store the configuration; no sockets are opened until :meth:`setup` runs.

        :param name: Name of the agent, passed on to the base class.
        :param address: Address to bind/connect the PUB socket to.
        :param bind: If true the PUB socket binds to ``address``, otherwise it connects.
        :param gesture_data: Available gesture tags; ``None`` means an empty list.
        :param single_gesture_data: Available single gestures; ``None`` means an empty list.
        """
        self.gesture_data = gesture_data or []
        self.single_gesture_data = single_gesture_data or []
        super().__init__(name)
        self.address = address
        self.bind = bind

    async def setup(self):
        """
        Initialize the agent.

        1. Sets up the PUB socket to talk to the robot.
        2. Sets up the SUB socket to listen for "command" topics (from UI/External).
        3. Sets up the REP socket that answers requests for the available gesture tags.
        4. Starts the loops for handling ZMQ commands and gesture tag requests.
        """
        self.logger.info("Setting up %s", self.name)

        context = azmq.Context.instance()

        # To the robot
        self.pubsocket = context.socket(zmq.PUB)
        if self.bind:
            self.pubsocket.bind(self.address)
        else:
            self.pubsocket.connect(self.address)

        # Receive internal topics regarding commands
        self.subsocket = context.socket(zmq.SUB)
        self.subsocket.connect(settings.zmq_settings.internal_sub_address)
        self.subsocket.setsockopt(zmq.SUBSCRIBE, b"command")
        self.subsocket.setsockopt(zmq.SUBSCRIBE, b"send_gestures")

        # REP socket for replying to gesture requests
        self.repsocket = context.socket(zmq.REP)
        self.repsocket.bind(settings.zmq_settings.internal_gesture_rep_adress)

        self.add_behavior(self._zmq_command_loop())
        self.add_behavior(self._fetch_gestures_loop())

        self.logger.info("Finished setting up %s", self.name)

    async def stop(self):
        """
        Close every socket that was opened and then stop the base agent.

        The sockets only exist after :meth:`setup` has run, so they are looked up
        defensively: stopping an agent that was never set up must not raise.
        """
        for socket_name in ("subsocket", "pubsocket", "repsocket"):
            sock = getattr(self, socket_name, None)
            if sock:
                sock.close()
        await super().stop()

    def _is_available(self, gesture_command) -> bool:
        """
        Check a gesture command against the configured gesture lists.

        Tag commands are checked against ``gesture_data`` and single-gesture commands
        against ``single_gesture_data``; commands for any other endpoint are accepted.
        A warning is logged for every rejected command.

        :param gesture_command: The validated :class:`GestureCommand` to check.
        :return: ``True`` if the command may be forwarded to the robot.
        """
        if gesture_command.endpoint == RIEndpoint.GESTURE_TAG:
            if gesture_command.data not in self.gesture_data:
                self.logger.warning(
                    "Received gesture tag '%s' which is not in available tags. Early returning",
                    gesture_command.data,
                )
                return False
        elif gesture_command.endpoint == RIEndpoint.GESTURE_SINGLE:
            if gesture_command.data not in self.single_gesture_data:
                self.logger.warning(
                    "Received gesture '%s' which is not in available gestures. Early returning",
                    gesture_command.data,
                )
                return False
        return True

    def _select_tags(self, raw_request):
        """
        Pick the gesture tags to return for one raw request payload.

        The payload may be a JSON integer giving the maximum number of tags wanted.
        Anything else (invalid JSON, undecodable bytes, non-integers, booleans, zero or
        negative numbers) is treated as "no amount given" and yields every tag. This
        method never raises for a malformed payload, so a reply can always be sent.

        :param raw_request: The request body as received from the REP socket.
        :return: The list of gesture tags to send back.
        """
        try:
            requested = json.loads(raw_request)
        except (ValueError, RecursionError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            requested = None

        # bool is a subclass of int, so JSON true/false must be excluded explicitly.
        is_amount = isinstance(requested, int) and not isinstance(requested, bool)
        if is_amount and requested > 0:
            return self.gesture_data[:requested]
        return self.gesture_data

    async def handle_message(self, msg: InternalMessage):
        """
        Handle commands received from other internal Python agents.

        Validates the message as a :class:`GestureCommand` and forwards it to the robot,
        unless it names a gesture that is not in the available gestures. Any error is
        logged and swallowed.

        :param msg: The internal message containing the command.
        """
        try:
            gesture_command = GestureCommand.model_validate_json(msg.body)
            if not self._is_available(gesture_command):
                return
            await self.pubsocket.send_json(gesture_command.model_dump())
        except Exception:
            self.logger.exception("Error processing internal message.")

    async def _zmq_command_loop(self):
        """
        Loop to handle commands received via ZMQ (e.g., from the UI).

        Listens on the 'command' topic, validates the JSON and forwards it to the robot.
        Commands are checked against the available gestures in the same way as in
        :meth:`handle_message`. Errors are logged and the loop keeps running.
        """
        while self._running:
            try:
                topic, body = await self.subsocket.recv_multipart()

                # Don't process send_gestures here
                if topic != b"command":
                    continue

                gesture_command = GestureCommand.model_validate(json.loads(body))
                if not self._is_available(gesture_command):
                    continue
                await self.pubsocket.send_json(gesture_command.model_dump())
            except Exception:
                self.logger.exception("Error processing ZMQ message.")

    async def _fetch_gestures_loop(self):
        """
        Loop to answer requests for the available gesture tags (e.g., from the UI).

        Waits for a request on the REP socket and replies with a JSON object of the
        form ``{"tags": [...]}``. The request body may be a JSON integer limiting the
        number of tags returned.
        """
        while self._running:
            try:
                # Get a request
                request = await self.repsocket.recv()

                # A REP socket has to answer each request before it can receive the
                # next one, so selecting the tags must not fail on a malformed request.
                tags = self._select_tags(request)
                response = json.dumps({"tags": tags}).encode()
                await self.repsocket.send(response)

            except Exception:
                self.logger.exception("Error fetching gesture tags.")
|