Files
pepperplus-ri/state.py
Twirre Meulenbelt afae6fc331 feat: stream audio to CB
Uses PyAudio and ZeroMQ to publish audio chunks.

ref: N25B-119
2025-10-01 10:50:53 +02:00

48 lines
1.5 KiB
Python

import signal
import threading
class State(object):
"""
Do not create an instance of this class directly: use the instance `state` below. This state must be initiated once,
probably when your program starts.
This class is used to share state between threads. For example, when the program is quit, that all threads can
detect this via the `exit_event` property being set.
"""
def __init__(self):
self.is_initialized = False
self.exit_event = None
def initialize(self):
if self.is_initialized:
print("Already initialized")
return
self.exit_event = threading.Event()
def handle_exit(_, __):
print("Exiting.")
self.exit_event.set()
signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit)
self.is_initialized = True
def deinitialize(self):
if not self.is_initialized: return
self.is_initialized = False
def __getattribute__(self, name):
# Enforce that the state is initialized before accessing any property (aside from the basic ones)
if name in ("initialize", "deinitialize", "is_initialized", "__dict__", "__class__"):
return object.__getattribute__(self, name)
if not self.is_initialized:
raise RuntimeError("State must be initialized before accessing '%s'" % name)
return object.__getattribute__(self, name)
# Must call `.initialize` before use
state = State()