Compare commits

...

1 commit

Commit: 4c40347b6f
Date: 2026-01-30 12:04:25 +01:00

feat: Qwen3 ASR

Uses an env variable to determine which type of ASR to use (streaming or
full chunks).

ref: N25B-467

6 changed files with 3051 additions and 839 deletions

View File

@@ -3,55 +3,56 @@ name = "pepperplus-cb"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
requires-python = ">=3.13,<3.14"
dependencies = [
"agentspeak>=0.2.2",
"colorlog>=6.10.1",
"fastapi[all]>=0.115.6",
"mlx-whisper>=0.4.3 ; sys_platform == 'darwin'",
"numpy>=2.3.3",
"openai-whisper>=20250625",
"pyaudio>=0.2.14",
"pydantic>=2.12.0",
"pydantic-settings>=2.11.0",
"python-json-logger>=4.0.0",
"python-slugify>=8.0.4",
"pyyaml>=6.0.3",
"pyzmq>=27.1.0",
"silero-vad>=6.0.0",
"sphinx>=7.3.7",
"sphinx-rtd-theme>=3.0.2",
"torch>=2.8.0",
"uvicorn>=0.37.0",
"agentspeak",
"colorlog",
"fastapi[all]",
"mlx-whisper ; sys_platform == 'darwin'",
"numpy",
"openai-whisper",
"pyaudio",
"pydantic",
"pydantic-settings",
"python-json-logger",
"python-slugify",
"pyyaml",
"pyzmq",
"qwen-asr[vllm]",
"silero-vad",
"sphinx",
"sphinx-rtd-theme",
"torch",
"uvicorn",
]
[dependency-groups]
dev = [
"pre-commit>=4.3.0",
"pytest>=8.4.2",
"pytest-asyncio>=1.2.0",
"pytest-cov>=7.0.0",
"pytest-mock>=3.15.1",
"soundfile>=0.13.1",
"ruff>=0.14.2",
"ruff-format>=0.3.0",
"pre-commit",
"pytest",
"pytest-asyncio",
"pytest-cov",
"pytest-mock",
"soundfile",
"ruff",
"ruff-format",
]
test = [
"agentspeak>=0.2.2",
"fastapi>=0.115.6",
"httpx>=0.28.1",
"mlx-whisper>=0.4.3 ; sys_platform == 'darwin'",
"openai-whisper>=20250625",
"pydantic>=2.12.0",
"pydantic-settings>=2.11.0",
"pytest>=8.4.2",
"pytest-asyncio>=1.2.0",
"pytest-cov>=7.0.0",
"pytest-mock>=3.15.1",
"python-slugify>=8.0.4",
"pyyaml>=6.0.3",
"pyzmq>=27.1.0",
"soundfile>=0.13.1",
"agentspeak",
"fastapi",
"httpx",
"mlx-whisper ; sys_platform == 'darwin'",
"openai-whisper",
"pydantic",
"pydantic-settings",
"pytest",
"pytest-asyncio",
"pytest-cov",
"pytest-mock",
"python-slugify",
"pyyaml",
"pyzmq",
"soundfile",
]
[tool.pytest.ini_options]
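
The `sys_platform == 'darwin'` markers mean mlx-whisper is only installed on macOS, so any module that imports it has to guard the import at runtime, as the recognizer module below does with its own `sys.platform == "darwin"` check. A minimal sketch of that pattern (the `None` fallback is an assumption for illustration, not code from this repo):

import sys

# mlx_whisper is only declared as a dependency on macOS (see the
# `sys_platform == 'darwin'` marker above), so guard the import.
if sys.platform == "darwin":
    import mlx_whisper
else:
    mlx_whisper = None  # assumption: non-macOS platforms fall back to another Whisper backend
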

View File

@@ -1,4 +1,5 @@
import abc
import logging
import sys
if sys.platform == "darwin":
@@ -9,9 +10,12 @@ if sys.platform == "darwin":
import numpy as np
import torch
import whisper
from qwen_asr import Qwen3ASRModel
from control_backend.core.config import settings
logger = logging.getLogger(__name__)
class SpeechRecognizer(abc.ABC):
"""
@@ -29,6 +33,7 @@ class SpeechRecognizer(abc.ABC):
limited by the length of the input audio and some heuristics.
"""
self.limit_output_length = limit_output_length
self._supports_streaming = False
@abc.abstractmethod
def load_model(self):
@@ -47,6 +52,24 @@ class SpeechRecognizer(abc.ABC):
:return: The recognized speech text.
"""
def begin_stream(self):
"""
Initialize a streaming recognition state.
"""
raise NotImplementedError
def stream_chunk(self, audio: np.ndarray, state) -> None:
"""
Stream an audio chunk into the recognizer state.
"""
raise NotImplementedError
def end_stream(self, state) -> str:
"""
Finalize a streaming recognition state and return the transcript.
"""
raise NotImplementedError
@staticmethod
def _estimate_max_tokens(audio: np.ndarray) -> int:
"""
@@ -84,11 +107,13 @@ class SpeechRecognizer(abc.ABC):
:return: An instance of :class:`MLXWhisperSpeechRecognizer` if on macOS with Apple Silicon,
otherwise :class:`OpenAIWhisperSpeechRecognizer`.
"""
if settings.behaviour_settings.transcription_streaming_enabled:
logger.info("Using Qwen3-ASR streaming backend (vLLM).")
return Qwen3ASRStreamingRecognizer()
if torch.mps.is_available():
print("Choosing MLX Whisper model.")
logger.info("Using MLX Whisper backend.")
return MLXWhisperSpeechRecognizer()
else:
print("Choosing reference Whisper model.")
logger.info("Using OpenAI Whisper backend.")
return OpenAIWhisperSpeechRecognizer()
@@ -148,3 +173,54 @@ class OpenAIWhisperSpeechRecognizer(SpeechRecognizer):
return whisper.transcribe(self.model, audio, **self._get_decode_options(audio))[
"text"
].strip()
class Qwen3ASRStreamingRecognizer(SpeechRecognizer):
"""
Speech recognizer using Qwen3-ASR with the vLLM streaming backend.
"""
def __init__(self):
super().__init__(limit_output_length=False)
self._supports_streaming = True
self._model = None
self._model_name = settings.speech_model_settings.qwen_asr_model_name
self._gpu_memory_utilization = (
settings.speech_model_settings.qwen_asr_gpu_memory_utilization
)
self._max_new_tokens = settings.speech_model_settings.qwen_asr_max_new_tokens
self._unfixed_chunk_num = settings.speech_model_settings.qwen_asr_unfixed_chunk_num
self._unfixed_token_num = settings.speech_model_settings.qwen_asr_unfixed_token_num
self._chunk_size_sec = settings.speech_model_settings.qwen_asr_chunk_size_sec
self._max_model_len = settings.speech_model_settings.qwen_asr_max_model_len
def load_model(self):
if self._model is not None:
return
self._model = Qwen3ASRModel.LLM(
model=self._model_name,
gpu_memory_utilization=self._gpu_memory_utilization,
max_new_tokens=self._max_new_tokens,
max_model_len=self._max_model_len,
)
def recognize_speech(self, audio: np.ndarray) -> str:
raise RuntimeError("Qwen3-ASR streaming recognizer does not support batch inference.")
def begin_stream(self):
self.load_model()
assert self._model is not None
return self._model.init_streaming_state(
unfixed_chunk_num=self._unfixed_chunk_num,
unfixed_token_num=self._unfixed_token_num,
chunk_size_sec=self._chunk_size_sec,
)
def stream_chunk(self, audio: np.ndarray, state) -> None:
assert self._model is not None
self._model.streaming_transcribe(audio, state)
def end_stream(self, state) -> str:
assert self._model is not None
self._model.finish_streaming_transcribe(state)
return state.text.strip()
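
Taken together, the new begin_stream / stream_chunk / end_stream methods define a simple streaming lifecycle. A minimal driver sketch (the chunk source and the 16 kHz sample rate are assumptions; the method names and `SpeechRecognizer.best_type()` are those shown above):

import numpy as np

def transcribe_stream(recognizer, chunks):
    """Drive the streaming API added above: begin_stream -> stream_chunk* -> end_stream."""
    state = recognizer.begin_stream()            # also loads the model on first use
    for chunk in chunks:                         # float32 PCM chunks, as published by the VAD agent
        recognizer.stream_chunk(chunk, state)
    return recognizer.end_stream(state)          # finalizes decoding and returns the stripped transcript

# Illustrative call with one second of silence in two chunks (16 kHz is an assumed sample rate):
chunks = [np.zeros(8000, dtype=np.float32), np.zeros(8000, dtype=np.float32)]
# text = transcribe_stream(SpeechRecognizer.best_type(), chunks)
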

View File

@@ -45,6 +45,8 @@ class TranscriptionAgent(BaseAgent):
self.speech_recognizer = None
self._concurrency = None
self._current_speech_reference: str | None = None
self._stream_state = None
self._streaming_enabled = settings.behaviour_settings.transcription_streaming_enabled
async def setup(self):
"""
@@ -59,10 +61,16 @@ class TranscriptionAgent(BaseAgent):
self._connect_audio_in_socket()
# Initialize recognizer and semaphore
max_concurrent_tasks = settings.behaviour_settings.transcription_max_concurrent_tasks
self._concurrency = asyncio.Semaphore(max_concurrent_tasks)
self.speech_recognizer = SpeechRecognizer.best_type()
self.speech_recognizer.load_model() # Warmup
if self._streaming_enabled and self.speech_recognizer._supports_streaming:
self.logger.info("Transcription streaming enabled.")
self._concurrency = asyncio.Semaphore(1)
else:
if not self._streaming_enabled:
self.logger.info("Transcription streaming disabled; using full-utterance mode.")
max_concurrent_tasks = settings.behaviour_settings.transcription_max_concurrent_tasks
self._concurrency = asyncio.Semaphore(max_concurrent_tasks)
# Start background loop
self.add_behavior(self._transcribing_loop())
@@ -71,7 +79,26 @@ class TranscriptionAgent(BaseAgent):
async def handle_message(self, msg: InternalMessage):
if msg.thread == "voice_activity":
if (
self._streaming_enabled
and self.speech_recognizer
and self.speech_recognizer._supports_streaming
):
await self._finalize_stream()
self._current_speech_reference = msg.body
if (
self._streaming_enabled
and self.speech_recognizer
and self.speech_recognizer._supports_streaming
):
await self._start_stream()
elif msg.thread == "voice_activity_end":
if (
self._streaming_enabled
and self.speech_recognizer
and self.speech_recognizer._supports_streaming
):
await self._finalize_stream()
async def stop(self):
"""
@@ -104,6 +131,31 @@ class TranscriptionAgent(BaseAgent):
async with self._concurrency:
return await asyncio.to_thread(self.speech_recognizer.recognize_speech, audio)
async def _start_stream(self):
assert self._concurrency is not None and self.speech_recognizer is not None
async with self._concurrency:
self._stream_state = await asyncio.to_thread(self.speech_recognizer.begin_stream)
async def _stream_chunk(self, audio: np.ndarray):
assert self._concurrency is not None and self.speech_recognizer is not None
if self._stream_state is None:
await self._start_stream()
assert self._stream_state is not None
async with self._concurrency:
await asyncio.to_thread(self.speech_recognizer.stream_chunk, audio, self._stream_state)
async def _finalize_stream(self):
if self._stream_state is None or self.speech_recognizer is None:
return
async with self._concurrency:
transcription = await asyncio.to_thread(
self.speech_recognizer.end_stream, self._stream_state
)
self._stream_state = None
if transcription:
await self._share_transcription(transcription)
self._current_speech_reference = None
async def _share_transcription(self, transcription: str):
"""
Share a transcription to the other agents that depend on it, and to experiment logs.
@@ -130,14 +182,21 @@ class TranscriptionAgent(BaseAgent):
"""
The main loop for receiving audio and triggering transcription.
Receives audio chunks from ZMQ, decodes them to float32, and calls :meth:`_transcribe`.
If speech is found, it calls :meth:`_share_transcription`.
Receives audio chunks from ZMQ, decodes them to float32, and either streams them into the
recognizer or runs a full transcription depending on backend support.
"""
while self._running:
try:
assert self.audio_in_socket is not None
audio_data = await self.audio_in_socket.recv()
audio = np.frombuffer(audio_data, dtype=np.float32)
audio = np.frombuffer(audio_data, dtype=np.float32).copy()
if (
self._streaming_enabled
and self.speech_recognizer
and self.speech_recognizer._supports_streaming
):
await self._stream_chunk(audio)
else:
speech = await self._transcribe(audio)
if not speech:
self.logger.debug("Nothing transcribed.")

View File

@@ -1,6 +1,7 @@
import asyncio
import logging
import uuid
from collections import deque
import numpy as np
import torch
@@ -60,8 +61,8 @@ class VADAgent(BaseAgent):
This agent:
1. Receives an audio stream (via ZMQ).
2. Processes the audio using the Silero VAD model to detect speech.
3. Buffers potential speech segments.
4. Publishes valid speech fragments (containing speech plus small buffer) to a ZMQ PUB socket.
3. Streams speech chunks to a ZMQ PUB socket while speech is active.
4. Sends an end-of-utterance control message when speech ends.
5. Instantiates and starts agents (like :class:`TranscriptionAgent`) that use this output.
:ivar audio_in_address: Address of the input audio stream.
@@ -91,6 +92,9 @@ class VADAgent(BaseAgent):
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._ready = asyncio.Event()
self._in_speech = False
self._speech_reference: str | None = None
self._pre_speech_buffer = deque(maxlen=settings.behaviour_settings.vad_begin_silence_chunks)
# Pause control
self._reset_needed = False
@@ -184,6 +188,11 @@ class VADAgent(BaseAgent):
self.audio_out_socket = None
return None
async def _publish_audio_chunk(self, chunk: np.ndarray) -> None:
if self.audio_out_socket is None:
return
await self.audio_out_socket.send(chunk.tobytes())
async def _reset_stream(self):
"""
Clears the ZeroMQ queue and sets ready state.
@@ -193,6 +202,10 @@ class VADAgent(BaseAgent):
while await self.audio_in_poller.poll(1) is not None:
discarded += 1
self.logger.info(f"Discarded {discarded} audio packets before starting.")
self.audio_buffer = np.array([], dtype=np.float32)
self._pre_speech_buffer.clear()
self._in_speech = False
self._speech_reference = None
self._ready.set()
async def _status_loop(self):
@@ -219,8 +232,8 @@ class VADAgent(BaseAgent):
1. Polls for new audio chunks.
2. Passes chunk to VAD model.
3. Manages `i_since_speech` counter to determine start/end of speech.
4. Buffers speech + context.
5. Sends complete speech segment to output socket when silence is detected.
4. Streams chunks to output while speech is active.
5. Sends an end-of-utterance signal when silence is detected.
"""
await self._ready.wait()
while self._running:
@@ -230,13 +243,13 @@ class VADAgent(BaseAgent):
if self._reset_needed:
self.logger.debug("Resuming: resetting stream and buffers.")
await self._reset_stream()
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._reset_needed = False
assert self.audio_in_poller is not None
data = await self.audio_in_poller.poll()
if data is None:
if not settings.behaviour_settings.transcription_streaming_enabled:
if len(self.audio_buffer) > 0:
self.logger.debug(
"No audio data received. Discarding buffer until new data arrives."
@@ -244,15 +257,30 @@ class VADAgent(BaseAgent):
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
continue
if self._in_speech:
self.logger.debug("No audio data received. Ending current speech stream.")
await self.send(
InternalMessage(
to=settings.agent_settings.transcription_name,
body=self._speech_reference or "",
thread="voice_activity_end",
)
)
self._in_speech = False
self._speech_reference = None
self._pre_speech_buffer.clear()
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
continue
# copy otherwise Torch will be sad that it's immutable
chunk = np.frombuffer(data, dtype=np.float32).copy()
assert self.model is not None
prob = self.model(torch.from_numpy(chunk), settings.vad_settings.sample_rate_hz).item()
non_speech_patience = settings.behaviour_settings.vad_non_speech_patience_chunks
begin_silence_length = settings.behaviour_settings.vad_begin_silence_chunks
prob_threshold = settings.behaviour_settings.vad_prob_threshold
if not settings.behaviour_settings.transcription_streaming_enabled:
begin_silence_length = settings.behaviour_settings.vad_begin_silence_chunks
if prob > prob_threshold:
if self.i_since_speech > non_speech_patience + begin_silence_length:
self.logger.debug("Speech started.")
@@ -289,6 +317,54 @@ class VADAgent(BaseAgent):
# Prepend the last few chunks that had no speech, for a more fluent boundary.
self.audio_buffer = np.append(self.audio_buffer, chunk)
self.audio_buffer = self.audio_buffer[-begin_silence_length * len(chunk) :]
continue
if prob > prob_threshold:
if not self._in_speech:
self.logger.debug("Speech started.")
reference = str(uuid.uuid4())
self._speech_reference = reference
self._in_speech = True
experiment_logger.chat(
"...",
extra={"role": "user", "reference": reference, "partial": True},
)
await self.send(
InternalMessage(
to=settings.agent_settings.transcription_name,
body=reference,
thread="voice_activity",
)
)
for buffered in self._pre_speech_buffer:
await self._publish_audio_chunk(buffered)
self._pre_speech_buffer.clear()
self.i_since_speech = 0
await self._publish_audio_chunk(chunk)
continue
if self._in_speech:
self.i_since_speech += 1
if self.i_since_speech <= non_speech_patience:
await self._publish_audio_chunk(chunk)
continue
self.logger.debug("Speech ended.")
await self.send(
InternalMessage(
to=settings.agent_settings.transcription_name,
body=self._speech_reference or "",
thread="voice_activity_end",
)
)
self._speech_reference = None
self._in_speech = False
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._pre_speech_buffer.clear()
self._pre_speech_buffer.append(chunk)
continue
self._pre_speech_buffer.append(chunk)
async def handle_message(self, msg: InternalMessage):
"""

View File

@@ -73,6 +73,7 @@ class BehaviourSettings(BaseModel):
:ivar vad_non_speech_patience_chunks: Number of non-speech chunks to wait before speech ended.
:ivar vad_begin_silence_chunks: The number of chunks of silence to prepend to speech chunks.
:ivar transcription_max_concurrent_tasks: Maximum number of concurrent transcription tasks.
:ivar transcription_streaming_enabled: Whether to stream audio chunks to the recognizer.
:ivar transcription_words_per_minute: Estimated words per minute for transcription timing.
:ivar transcription_words_per_token: Estimated words per token for transcription timing.
:ivar transcription_token_buffer: Buffer for transcription tokens.
@@ -95,6 +96,7 @@ class BehaviourSettings(BaseModel):
# transcription behaviour
transcription_max_concurrent_tasks: int = 3
transcription_streaming_enabled: bool = True
transcription_words_per_minute: int = 300
transcription_words_per_token: float = 0.75 # (3 words = 4 tokens)
transcription_token_buffer: int = 10
@@ -151,6 +153,12 @@ class SpeechModelSettings(BaseModel):
:ivar mlx_model_name: Model name for MLX-based speech recognition.
:ivar openai_model_name: Model name for OpenAI-based speech recognition.
:ivar qwen_asr_model_name: Model name for Qwen3-ASR (vLLM backend).
:ivar qwen_asr_gpu_memory_utilization: GPU memory fraction reserved for vLLM.
:ivar qwen_asr_max_new_tokens: Max new tokens for streaming decode.
:ivar qwen_asr_unfixed_chunk_num: Unfixed chunk count for streaming state.
:ivar qwen_asr_unfixed_token_num: Unfixed token count for streaming state.
:ivar qwen_asr_chunk_size_sec: Streaming chunk size in seconds for ASR state.
:ivar qwen_asr_max_model_len: Maximum vLLM context length for the Qwen3-ASR engine.
"""
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
@@ -158,6 +166,13 @@ class SpeechModelSettings(BaseModel):
# model identifiers for speech recognition
mlx_model_name: str = "mlx-community/whisper-small.en-mlx"
openai_model_name: str = "small.en"
qwen_asr_model_name: str = "Qwen/Qwen3-ASR-1.7B"
qwen_asr_gpu_memory_utilization: float = 0.8
qwen_asr_max_new_tokens: int = 32
qwen_asr_unfixed_chunk_num: int = 2
qwen_asr_unfixed_token_num: int = 5
qwen_asr_chunk_size_sec: float = 2.0
qwen_asr_max_model_len: int = 4096
class LoggingSettings(BaseModel):
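
The new toggle and the Qwen3-ASR knobs are consumed where the recognizer is chosen and constructed (see `SpeechRecognizer.best_type` and `Qwen3ASRStreamingRecognizer.__init__` above). A quick check of the defaults (a sketch; it assumes the settings module is importable as in the recognizer diff):

from control_backend.core.config import settings

# With these defaults, streaming is enabled and best_type() picks the Qwen3-ASR (vLLM) backend.
print(settings.behaviour_settings.transcription_streaming_enabled)  # True
print(settings.speech_model_settings.qwen_asr_model_name)           # "Qwen/Qwen3-ASR-1.7B"
print(settings.speech_model_settings.qwen_asr_chunk_size_sec)       # 2.0 (streaming chunk size, seconds)
print(settings.speech_model_settings.qwen_asr_max_model_len)        # 4096 (vLLM context length)
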

uv.lock (generated, 3497 lines changed)

File diff suppressed because it is too large.