import logging
import signal
import threading

from robot_interface.utils.qi_utils import get_qi_session


class State(object):
    """
    Do not create an instance of this class directly: use the instance `state` below. This state
    must be initialized once, probably when your program starts.

    This class is used to share state between threads. For example, when the program quits, all
    threads can detect this via the `exit_event` property being set.

    :ivar is_initialized: Flag indicating whether the state setup (exit handlers, QI session) has
        completed.
    :vartype is_initialized: bool

    :ivar exit_event: A thread event used to signal all threads that the program is shutting down.
    :vartype exit_event: threading.Event | None

    :ivar sockets: A list of ZeroMQ socket wrappers (`SocketBase`) that need to be closed during
        deinitialization.
    :vartype sockets: List[SocketBase]

    :ivar qi_session: The QI session object used for interaction with the robot/platform services.
    :vartype qi_session: None | qi.Session

    :ivar is_speaking: Flag that starts out as ``False``; presumably tracks whether the robot is
        currently speaking (it is only initialized here, never updated by this class).
    :vartype is_speaking: bool
    """

    def __init__(self):
        self.is_initialized = False
        self.exit_event = None
        self.sockets = []
        self.qi_session = None
        self.is_speaking = False

    def initialize(self):
        """
        Sets up the application state.

        Creates the thread exit event, registers signal handlers (`SIGINT`, `SIGTERM`) for
        graceful shutdown, and establishes the QI session. Calling this on an already initialized
        state only logs a warning.
        """
        if self.is_initialized:
            logging.warning("Already initialized")
            return

        # Keep a local reference for the signal handler: reading `self.exit_event` goes through
        # the guarded `__getattribute__`, which raises while `is_initialized` is still False
        # (e.g. when a signal arrives while `get_qi_session()` is running, or after
        # `deinitialize()`).
        exit_event = threading.Event()
        self.exit_event = exit_event

        def handle_exit(_signum, _frame):
            logging.info("Exiting.")
            exit_event.set()

        signal.signal(signal.SIGINT, handle_exit)
        signal.signal(signal.SIGTERM, handle_exit)

        self.qi_session = get_qi_session()

        # Only flip the flag once everything above succeeded.
        self.is_initialized = True

    def deinitialize(self):
        """
        Closes all sockets stored in the `sockets` list and marks the state as uninitialized.

        Does nothing when the state is not initialized.
        """
        if not self.is_initialized:
            return

        for sock in self.sockets:
            sock.close()

        self.is_initialized = False

    def __getattribute__(self, name):
        """
        Custom attribute access method that enforces a check: the state must be fully initialized
        before any non-setup attributes (like `sockets` or `qi_session`) can be accessed.

        :param name: The name of the attribute being accessed.
        :type name: str

        :return: The value of the requested attribute.
        :rtype: Any

        :raises RuntimeError: If the state is not initialized and `name` is neither a setup
            attribute nor an already-set `exit_event`.
        """
        # Setup attributes and basic introspection are always reachable.
        if name in (
            "initialize",
            "deinitialize",
            "is_initialized",
            "__dict__",
            "__class__",
            "__doc__",
        ):
            return object.__getattribute__(self, name)

        if not object.__getattribute__(self, "is_initialized"):
            # Special case for the exit_event: if the event is set, return it without an error
            if name == "exit_event":
                exit_event = object.__getattribute__(self, "exit_event")
                if exit_event and exit_event.is_set():
                    return exit_event

            raise RuntimeError("State must be initialized before accessing '%s'" % name)

        return object.__getattribute__(self, name)


# Must call `.initialize` before use
state = State()