feat: add environment variables and docs
ref: N25B-352
This commit is contained in:
@@ -1,95 +1,101 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from robot_interface.utils.get_config import get_config
|
||||
|
||||
|
||||
class AgentSettings(object):
|
||||
"""
|
||||
Agent port configuration.
|
||||
|
||||
:ivar actuating_receiver_port: Port for receiving actuation commands.
|
||||
:vartype actuating_receiver_port: int
|
||||
:ivar main_receiver_port: Port for receiving main messages.
|
||||
:ivar control_backend_host: Hostname of the control backend, defaults to "localhost".
|
||||
:vartype control_backend_host: string
|
||||
:ivar actuation_receiver_port: Port for receiving actuation commands, defaults to 5557.
|
||||
:vartype actuation_receiver_port: int
|
||||
:ivar main_receiver_port: Port for receiving main messages, defaults to 5555.
|
||||
:vartype main_receiver_port: int
|
||||
:ivar video_sender_port: Port used for sending video frames.
|
||||
:ivar video_sender_port: Port used for sending video frames, defaults to 5556.
|
||||
:vartype video_sender_port: int
|
||||
:ivar audio_sender_port: Port used for sending audio data.
|
||||
:ivar audio_sender_port: Port used for sending audio data, defaults to 5558.
|
||||
:vartype audio_sender_port: int
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
actuating_receiver_port=5557,
|
||||
main_receiver_port=5555,
|
||||
video_sender_port=5556,
|
||||
audio_sender_port=5558,
|
||||
self,
|
||||
control_backend_host=None,
|
||||
actuation_receiver_port=None,
|
||||
main_receiver_port=None,
|
||||
video_sender_port=None,
|
||||
audio_sender_port=None,
|
||||
):
|
||||
self.actuating_receiver_port = actuating_receiver_port
|
||||
self.main_receiver_port = main_receiver_port
|
||||
self.video_sender_port = video_sender_port
|
||||
self.audio_sender_port = audio_sender_port
|
||||
self.control_backend_host = get_config(control_backend_host, "AGENT__CONTROL_BACKEND_HOST", "localhost")
|
||||
self.actuation_receiver_port = get_config(actuation_receiver_port, "AGENT__ACTUATION_RECEIVER_PORT", 5557, int)
|
||||
self.main_receiver_port = get_config(main_receiver_port, "AGENT__MAIN_RECEIVER_PORT", 5555, int)
|
||||
self.video_sender_port = get_config(video_sender_port, "AGENT__VIDEO_SENDER_PORT", 5556, int)
|
||||
self.audio_sender_port = get_config(audio_sender_port, "AGENT__AUDIO_SENDER_PORT", 5558, int)
|
||||
|
||||
|
||||
class VideoConfig(object):
|
||||
"""
|
||||
Video configuration constants.
|
||||
|
||||
:ivar camera_index: Index of the camera used.
|
||||
:ivar camera_index: Index of the camera used, defaults to 0.
|
||||
:vartype camera_index: int
|
||||
:ivar resolution: Video resolution mode.
|
||||
:ivar resolution: Video resolution mode, defaults to 2.
|
||||
:vartype resolution: int
|
||||
:ivar color_space: Color space identifier.
|
||||
:ivar color_space: Color space identifier, defaults to 11.
|
||||
:vartype color_space: int
|
||||
:ivar fps: Frames per second of the video stream.
|
||||
:ivar fps: Frames per second of the video stream, defaults to 15.
|
||||
:vartype fps: int
|
||||
:ivar stream_name: Name of the video stream.
|
||||
:ivar stream_name: Name of the video stream, defaults to "Pepper Video".
|
||||
:vartype stream_name: str
|
||||
:ivar image_buffer: Internal buffer size for video frames.
|
||||
:ivar image_buffer: Internal buffer size for video frames, defaults to 6.
|
||||
:vartype image_buffer: int
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
camera_index=0,
|
||||
resolution=2,
|
||||
color_space=11,
|
||||
fps=15,
|
||||
stream_name="Pepper Video",
|
||||
image_buffer=6,
|
||||
camera_index=None,
|
||||
resolution=None,
|
||||
color_space=None,
|
||||
fps=None,
|
||||
stream_name=None,
|
||||
image_buffer=None,
|
||||
):
|
||||
self.camera_index = camera_index
|
||||
self.resolution = resolution
|
||||
self.color_space = color_space
|
||||
self.fps = fps
|
||||
self.stream_name = stream_name
|
||||
self.image_buffer = image_buffer
|
||||
self.camera_index = get_config(camera_index, "VIDEO__CAMERA_INDEX", 0, int)
|
||||
self.resolution = get_config(resolution, "VIDEO__RESOLUTION", 2, int)
|
||||
self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 11, int)
|
||||
self.fps = get_config(fps, "VIDEO__FPS", 15, int)
|
||||
self.stream_name = get_config(stream_name, "VIDEO__STREAM_NAME", "Pepper Video")
|
||||
self.image_buffer = get_config(image_buffer, "VIDEO__IMAGE_BUFFER", 6, int)
|
||||
|
||||
|
||||
class AudioConfig(object):
|
||||
"""
|
||||
Audio configuration constants.
|
||||
|
||||
:ivar sample_rate: Audio sampling rate in Hz.
|
||||
:ivar sample_rate: Audio sampling rate in Hz, defaults to 16000.
|
||||
:vartype sample_rate: int
|
||||
:ivar chunk_size: Size of audio chunks to capture/process.
|
||||
:ivar chunk_size: Size of audio chunks to capture/process, defaults to 512.
|
||||
:vartype chunk_size: int
|
||||
:ivar channels: Number of audio channels.
|
||||
:ivar channels: Number of audio channels, defaults to 1.
|
||||
:vartype channels: int
|
||||
"""
|
||||
def __init__(self, sample_rate=16000, chunk_size=512, channels=1):
|
||||
self.sample_rate = sample_rate
|
||||
self.chunk_size = chunk_size
|
||||
self.channels = channels
|
||||
def __init__(self, sample_rate=None, chunk_size=None, channels=None):
|
||||
self.sample_rate = get_config(sample_rate, "AUDIO__SAMPLE_RATE", 16000, int)
|
||||
self.chunk_size = get_config(chunk_size, "AUDIO__CHUNK_SIZE", 512, int)
|
||||
self.channels = get_config(channels, "AUDIO__CHANNELS", 1, int)
|
||||
|
||||
|
||||
class MainConfig(object):
|
||||
"""
|
||||
Main system configuration.
|
||||
|
||||
:ivar poll_timeout_ms: Timeout for polling events, in milliseconds.
|
||||
:ivar poll_timeout_ms: Timeout for polling events, in milliseconds, defaults to 100.
|
||||
:vartype poll_timeout_ms: int
|
||||
:ivar max_handler_time_ms: Maximum allowed handler time, in milliseconds.
|
||||
:ivar max_handler_time_ms: Maximum allowed handler time, in milliseconds, defaults to 50.
|
||||
:vartype max_handler_time_ms: int
|
||||
"""
|
||||
def __init__(self, poll_timeout_ms=100, max_handler_time_ms=50):
|
||||
self.poll_timeout_ms = poll_timeout_ms
|
||||
self.max_handler_time_ms = max_handler_time_ms
|
||||
def __init__(self, poll_timeout_ms=None, max_handler_time_ms=None):
|
||||
self.poll_timeout_ms = get_config(poll_timeout_ms, "MAIN__POLL_TIMEOUT_MS", 100, int)
|
||||
self.max_handler_time_ms = get_config(max_handler_time_ms, "MAIN__MAX_HANDLER_TIME_MS", 50, int)
|
||||
|
||||
|
||||
class Settings(object):
|
||||
|
||||
@@ -22,7 +22,7 @@ class ActuationReceiver(ReceiverBase):
|
||||
:ivar _tts_service: The text-to-speech service object from the Qi session.
|
||||
:vartype _tts_service: qi.Session | None
|
||||
"""
|
||||
def __init__(self, zmq_context, port=settings.agent_settings.actuating_receiver_port):
|
||||
def __init__(self, zmq_context, port=settings.agent_settings.actuation_receiver_port):
|
||||
super(ActuationReceiver, self).__init__("actuation")
|
||||
self.create_socket(zmq_context, zmq.SUB, port)
|
||||
self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options
|
||||
|
||||
@@ -5,6 +5,7 @@ from robot_interface.state import state
|
||||
|
||||
from robot_interface.core.config import settings
|
||||
|
||||
|
||||
class MainReceiver(ReceiverBase):
|
||||
"""
|
||||
The main receiver endpoint, responsible for handling ping and negotiation requests.
|
||||
@@ -12,10 +13,12 @@ class MainReceiver(ReceiverBase):
|
||||
:param zmq_context: The ZeroMQ context to use.
|
||||
:type zmq_context: zmq.Context
|
||||
|
||||
:param port: The port to use.
|
||||
:param port: The port to use, defaults to value in `settings.agent_settings.main_receiver_port`.
|
||||
:type port: int
|
||||
"""
|
||||
def __init__(self, zmq_context, port=settings.agent_settings.main_receiver_port):
|
||||
def __init__(self, zmq_context, port=None):
|
||||
if port is None:
|
||||
port = settings.agent_settings.main_receiver_port
|
||||
super(MainReceiver, self).__init__("main")
|
||||
self.create_socket(zmq_context, zmq.REP, port, bind=False)
|
||||
|
||||
|
||||
@@ -2,6 +2,8 @@ from abc import ABCMeta
|
||||
|
||||
import zmq
|
||||
|
||||
from robot_interface.core.config import settings
|
||||
|
||||
|
||||
class SocketBase(object):
|
||||
"""
|
||||
@@ -59,7 +61,7 @@ class SocketBase(object):
|
||||
if bind:
|
||||
self.socket.bind("tcp://*:{}".format(port))
|
||||
else:
|
||||
self.socket.connect("tcp://localhost:{}".format(port))
|
||||
self.socket.connect("tcp://{}:{}".format(settings.agent_settings.control_backend_host, port))
|
||||
|
||||
def close(self):
|
||||
"""Close the ZeroMQ socket."""
|
||||
|
||||
32
src/robot_interface/utils/get_config.py
Normal file
32
src/robot_interface/utils/get_config.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
def get_config(value, env, default, cast=None):
|
||||
"""
|
||||
Small utility to get a configuration value, returns `value` if it is not None, else it will try to get the
|
||||
environment variable cast with `cast`. If the environment variable is not set, it will return `default`.
|
||||
|
||||
:param value: The value to check.
|
||||
:type value: Any
|
||||
:param env: The environment variable to check.
|
||||
:type env: string
|
||||
:param default: The default value to return if the environment variable is not set.
|
||||
:type default: Any
|
||||
:param cast: A function to use to cast the environment variable. Must support string input.
|
||||
:type cast: Callable[[Any], Any], optional
|
||||
|
||||
:return: The value, the environment variable value, or the default.
|
||||
:rtype: Any
|
||||
"""
|
||||
if value is not None:
|
||||
return value
|
||||
|
||||
env = os.environ.get(env, default)
|
||||
|
||||
if cast is None:
|
||||
return env
|
||||
|
||||
return cast(env)
|
||||
Reference in New Issue
Block a user