fix: create tests for new ri commands
ref: N25B-334
This commit is contained in:
@@ -1 +1,2 @@
|
||||
from .robot_gesture_agent import RobotGestureAgent as RobotGestureAgent
|
||||
from .robot_speech_agent import RobotSpeechAgent as RobotSpeechAgent
|
||||
|
||||
@@ -54,6 +54,7 @@ class RobotGestureAgent(BaseAgent):
|
||||
context = azmq.Context.instance()
|
||||
|
||||
# To the robot
|
||||
|
||||
self.pubsocket = context.socket(zmq.PUB)
|
||||
if self.bind: # TODO: Should this ever be the case?
|
||||
self.pubsocket.bind(self.address)
|
||||
@@ -64,7 +65,7 @@ class RobotGestureAgent(BaseAgent):
|
||||
self.subsocket = context.socket(zmq.SUB)
|
||||
self.subsocket.connect(settings.zmq_settings.internal_sub_address)
|
||||
self.subsocket.setsockopt(zmq.SUBSCRIBE, b"command")
|
||||
|
||||
# This one
|
||||
self.add_behavior(self._zmq_command_loop())
|
||||
|
||||
self.logger.info("Finished setting up %s", self.name)
|
||||
|
||||
@@ -187,7 +187,7 @@ class RICommunicationAgent(BaseAgent):
|
||||
bind=bind,
|
||||
)
|
||||
robot_gesture_agent = RobotGestureAgent(
|
||||
settings.agent_settings.robot_speech_name,
|
||||
settings.agent_settings.robot_gesture_name,
|
||||
address=addr,
|
||||
bind=bind,
|
||||
gesture_data=gesture_data,
|
||||
|
||||
@@ -47,6 +47,7 @@ class AgentSettings(BaseModel):
|
||||
transcription_name: str = "transcription_agent"
|
||||
ri_communication_name: str = "ri_communication_agent"
|
||||
robot_speech_name: str = "robot_speech_agent"
|
||||
robot_gesture_name: str = "robot_gesture_agent"
|
||||
|
||||
|
||||
class BehaviourSettings(BaseModel):
|
||||
|
||||
@@ -10,7 +10,8 @@ class RIEndpoint(str, Enum):
|
||||
"""
|
||||
|
||||
SPEECH = "actuate/speech"
|
||||
GESTURE = "actuate/gesture"
|
||||
GESTURE_SINGLE = "actuate/gesture/single"
|
||||
GESTURE_TAG = "actuate/gesture/tag"
|
||||
PING = "ping"
|
||||
NEGOTIATE_PORTS = "negotiate/ports"
|
||||
|
||||
@@ -43,9 +44,9 @@ class GestureCommand(RIMessage):
|
||||
"""
|
||||
A specific command to make the robot do a gesture.
|
||||
|
||||
:ivar endpoint: Fixed to ``RIEndpoint.GESTURE``.
|
||||
:ivar endpoint: Should be ``RIEndpoint.GESTURE_SINGLE`` or ``RIEndpoint.GESTURE_TAG``.
|
||||
:ivar data: The id of the gesture to be executed.
|
||||
"""
|
||||
|
||||
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH)
|
||||
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.GESTURE_TAG) or RIEndpoint(RIEndpoint.GESTURE_TAG)
|
||||
data: str
|
||||
|
||||
@@ -1,26 +1,58 @@
|
||||
import pytest
|
||||
from pydantic import ValidationError
|
||||
|
||||
from control_backend.schemas.ri_message import RIEndpoint, RIMessage, SpeechCommand
|
||||
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, RIMessage, SpeechCommand
|
||||
|
||||
|
||||
def valid_command_1():
|
||||
return SpeechCommand(data="Hallo?")
|
||||
|
||||
|
||||
def valid_command_2():
|
||||
return GestureCommand(endpoint=RIEndpoint.GESTURE_TAG, data="happy")
|
||||
|
||||
|
||||
def valid_command_3():
|
||||
return GestureCommand(endpoint=RIEndpoint.GESTURE_SINGLE, data="happy_1")
|
||||
|
||||
|
||||
def invalid_command_1():
|
||||
return RIMessage(endpoint=RIEndpoint.PING, data="Hello again.")
|
||||
|
||||
|
||||
def invalid_command_2():
|
||||
return GestureCommand(endpoint=RIEndpoint.PING, data="Hey!")
|
||||
|
||||
|
||||
def test_valid_speech_command_1():
|
||||
command = valid_command_1()
|
||||
RIMessage.model_validate(command)
|
||||
SpeechCommand.model_validate(command)
|
||||
|
||||
|
||||
def test_valid_gesture_command_1():
|
||||
command = valid_command_2()
|
||||
RIMessage.model_validate(command)
|
||||
GestureCommand.model_validate(command)
|
||||
|
||||
|
||||
def test_valid_gesture_command_2():
|
||||
command = valid_command_3()
|
||||
RIMessage.model_validate(command)
|
||||
GestureCommand.model_validate(command)
|
||||
|
||||
|
||||
def test_invalid_speech_command_1():
|
||||
command = invalid_command_1()
|
||||
RIMessage.model_validate(command)
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
SpeechCommand.model_validate(command)
|
||||
|
||||
|
||||
def test_invalid_gesture_command_1():
|
||||
command = invalid_command_2()
|
||||
RIMessage.model_validate(command)
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
GestureCommand.model_validate(command)
|
||||
|
||||
Reference in New Issue
Block a user