80 lines
2.0 KiB
Python
80 lines
2.0 KiB
Python
from enum import Enum
|
|
from typing import Any, Literal
|
|
|
|
from pydantic import BaseModel, model_validator
|
|
|
|
|
|
class RIEndpoint(str, Enum):
|
|
"""
|
|
Enumeration of valid endpoints for the Robot Interface (RI).
|
|
"""
|
|
|
|
SPEECH = "actuate/speech"
|
|
GESTURE_SINGLE = "actuate/gesture/single"
|
|
GESTURE_TAG = "actuate/gesture/tag"
|
|
PING = "ping"
|
|
NEGOTIATE_PORTS = "negotiate/ports"
|
|
PAUSE = ""
|
|
|
|
|
|
class RIMessage(BaseModel):
|
|
"""
|
|
Base schema for messages sent to the Robot Interface.
|
|
|
|
:ivar endpoint: The target endpoint/action on the RI.
|
|
:ivar data: The payload associated with the action.
|
|
"""
|
|
|
|
endpoint: RIEndpoint
|
|
data: Any
|
|
|
|
|
|
class SpeechCommand(RIMessage):
|
|
"""
|
|
A specific command to make the robot speak.
|
|
|
|
:ivar endpoint: Fixed to ``RIEndpoint.SPEECH``.
|
|
:ivar data: The text string to be spoken.
|
|
"""
|
|
|
|
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
|
|
data: str
|
|
is_priority: bool = False
|
|
|
|
|
|
class GestureCommand(RIMessage):
|
|
"""
|
|
A specific command to make the robot do a gesture.
|
|
|
|
:ivar endpoint: Should be ``RIEndpoint.GESTURE_SINGLE`` or ``RIEndpoint.GESTURE_TAG``.
|
|
:ivar data: The id of the gesture to be executed.
|
|
"""
|
|
|
|
endpoint: Literal[ # pyright: ignore[reportIncompatibleVariableOverride] - We validate this stricter rule ourselves
|
|
RIEndpoint.GESTURE_SINGLE, RIEndpoint.GESTURE_TAG
|
|
]
|
|
data: str
|
|
is_priority: bool = False
|
|
|
|
@model_validator(mode="after")
|
|
def check_endpoint(self):
|
|
allowed = {
|
|
RIEndpoint.GESTURE_SINGLE,
|
|
RIEndpoint.GESTURE_TAG,
|
|
}
|
|
if self.endpoint not in allowed:
|
|
raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG")
|
|
return self
|
|
|
|
|
|
class PauseCommand(RIMessage):
|
|
"""
|
|
A specific command to pause or unpause the robot's actions.
|
|
|
|
:ivar endpoint: Fixed to ``RIEndpoint.PAUSE``.
|
|
:ivar data: A boolean indicating whether to pause (True) or unpause (False).
|
|
"""
|
|
|
|
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE)
|
|
data: bool
|