test: complete VAD unit and integration tests

Including an integration test with real voice audio.

ref: N25B-213
This commit is contained in:
Twirre Meulenbelt
2025-10-23 21:17:41 +02:00
parent ca5e59d029
commit d47074d091
10 changed files with 312 additions and 22 deletions

View File

@@ -14,18 +14,28 @@ logger = logging.getLogger(__name__)
class SocketPoller[T]:
def __init__(self, socket: azmq.Socket):
"""
Convenience class for polling a socket for data with a timeout, persisting a zmq.Poller for
multiple usages.
"""
def __init__(self, socket: azmq.Socket, timeout_ms: int = 100):
"""
:param socket: The socket to poll and get data from.
:param timeout_ms: A timeout in milliseconds to wait for data.
"""
self.socket = socket
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
self.timeout_ms = timeout_ms
async def poll(self, timeout_ms: int) -> T | None:
async def poll(self, timeout_ms: int | None = None) -> T | None:
"""
Get data from the socket, or None if the timeout is reached.
:param timeout_ms: The number of milliseconds to wait for the socket.
:param timeout_ms: If given, the timeout. Otherwise, `self.timeout_ms` is used.
:return: Data from the socket or None.
"""
timeout_ms = timeout_ms or self.timeout_ms
socks = dict(self.poller.poll(timeout_ms))
if socks.get(self.socket) == zmq.POLLIN:
return await self.socket.recv()
@@ -41,17 +51,16 @@ class Streaming(CyclicBehaviour):
force_reload=False)
self.audio_out_socket = audio_out_socket
self.audio_buffer = np.array([], dtype=np.float32) # TODO: Consider using a Tensor
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_data = 0 # Used to avoid logging every cycle if audio input stops
self.i_since_speech = 0 # Used to allow small pauses in speech
self.i_since_speech = 100 # Used to allow small pauses in speech
async def run(self) -> None:
timeout_ms = 100
data = await self.audio_in_poller.poll(timeout_ms)
data = await self.audio_in_poller.poll()
if data is None:
if self.i_since_data % 10 == 0:
logger.debug("Failed to receive audio from socket for %d ms.",
timeout_ms*self.i_since_data)
self.audio_in_poller.timeout_ms*(self.i_since_data+1))
self.i_since_data += 1
return
self.i_since_data = 0
@@ -75,7 +84,7 @@ class Streaming(CyclicBehaviour):
# Speech probably ended. Make sure we have a usable amount of data.
if len(self.audio_buffer) >= 3*len(chunk):
logger.debug("Speech ended.")
await self.audio_out_socket.send(self.audio_buffer.tobytes())
await self.audio_out_socket.send(self.audio_buffer[:-2*len(chunk)].tobytes())
# At this point, we know that the speech has ended.
# Prepend the last chunk that had no speech, for a more fluent boundary
@@ -101,10 +110,12 @@ class VADAgent(Agent):
"""
Stop listening to audio, stop publishing audio, close sockets.
"""
self.audio_in_socket.close()
self.audio_in_socket = None
self.audio_out_socket.close()
self.audio_out_socket = None
if self.audio_in_socket is not None:
self.audio_in_socket.close()
self.audio_in_socket = None
if self.audio_out_socket is not None:
self.audio_out_socket.close()
self.audio_out_socket = None
return await super().stop()
def _connect_audio_in_socket(self):