feat: pause RI

Pause functionality in RI implemented. The audio_sender and video_sender stop sending when paused.

ref: N25B-350
This commit is contained in:
Storm
2025-12-22 13:36:18 +01:00
parent 1e77548622
commit 358c4f6872
7 changed files with 435 additions and 84 deletions

View File

@@ -77,21 +77,60 @@ class AudioSender(SocketBase):
chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
# Docs say this only raises an error if neither `input` nor `output` is True
stream = self.audio.open(
format=pyaudio.paFloat32,
channels=audio_settings.channels,
rate=audio_settings.sample_rate,
input=True,
input_device_index=self.microphone["index"],
frames_per_buffer=chunk,
)
def open_stream():
return self.audio.open(
format=pyaudio.paFloat32,
channels=audio_settings.channels,
rate=audio_settings.sample_rate,
input=True,
input_device_index=self.microphone["index"],
frames_per_buffer=chunk,
)
stream = None
try:
state.active_event.wait() # Wait until the system is not paused
# Test in case exit_event was set while waiting
if not state.exit_event.is_set():
stream = open_stream()
while not state.exit_event.is_set():
data = stream.read(chunk)
self.socket.send(data)
if not state.active_event.is_set(): # when paused
# Stop and close the stream if it is open to prevent buffer overflow
if stream:
try:
stream.stop_stream()
stream.close()
except IOError:
pass # Ignore errors on closing
stream = None
state.active_event.wait() # Wait until unpaused
# Check if exit_event was set while waiting
if state.exit_event.is_set():
break
stream = open_stream()
if stream:
try:
data = stream.read(chunk)
self.socket.send(data)
except IOError as e:
logger.warn("Audio read error occurred.", exc_info=e)
if stream:
stream.close()
stream = open_stream()
except IOError as e:
logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e)
finally:
stream.stop_stream()
stream.close()
if stream:
try:
stream.stop_stream()
stream.close()
except IOError:
pass # Ignore errors on closing

View File

@@ -71,6 +71,30 @@ class MainReceiver(ReceiverBase):
return MainReceiver._handle_port_negotiation(message)
return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."}
@staticmethod
def _handle_pause(message):
"""
Handle a pause request. Pauses or resumes the video and audio streams.
:param message: The pause request message.
:type message: dict
:return: A response dictionary indicating success.
:rtype: dict[str, str]
"""
if message.get("data"):
if state.active_event.is_set():
state.active_event.clear()
return {"endpoint": "pause", "data": "Streams paused."}
else:
return {"endpoint": "pause", "data": "Streams are already paused."}
else:
if not state.active_event.is_set():
state.active_event.set()
return {"endpoint": "pause", "data": "Streams resumed."}
else:
return {"endpoint": "pause", "data": "Streams are already running."}
def handle_message(self, message):
"""
@@ -88,5 +112,7 @@ class MainReceiver(ReceiverBase):
return self._handle_ping(message)
elif message["endpoint"].startswith("negotiate"):
return self._handle_negotiation(message)
elif message["endpoint"] == "pause":
return self._handle_pause(message)
return {"endpoint": "error", "data": "The requested endpoint is not supported."}

View File

@@ -52,6 +52,10 @@ class VideoSender(SocketBase):
:type vid_stream_name: str
"""
while not state.exit_event.is_set():
state.active_event.wait() # Wait until the system is not paused
if state.exit_event.is_set():
break
try:
img = vid_service.getImageRemote(vid_stream_name)
#Possibly limit images sent if queuing issues arise

View File

@@ -84,6 +84,7 @@ def main():
context = zmq.Context()
state.initialize()
state.active_event.set()
try:
main_loop(context)

View File

@@ -30,6 +30,7 @@ class State(object):
self.exit_event = None
self.sockets = []
self.qi_session = None
self.active_event = None
def initialize(self):
"""
@@ -48,6 +49,8 @@ class State(object):
signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit)
self.active_event = threading.Event()
self.qi_session = get_qi_session()
self.is_initialized = True