feat: Implement multi-voice support and management
Refactor the TTS handling to support multiple, user-selectable voices. This replaces the previous single-voice system. Key changes: - Introduce VoiceManager to handle loading and managing voices from a dedicated oices/ directory. - Add slash commands (/voice list, /set, /current, /refresh) for users to manage their personal TTS voice. - Implement on-demand voice loading to improve startup time and memory usage. - Remove the old ts_handler.py and single voice .wav files in favor of the new system. - Update configuration to specify a voices directory instead of a single file path.
This commit is contained in:
190
voice_manager.py
Normal file
190
voice_manager.py
Normal file
@@ -0,0 +1,190 @@
|
||||
"""Voice management for per-user voice selection and on-demand loading."""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from pocket_tts import TTSModel
|
||||
|
||||
from audio_preprocessor import (
|
||||
AudioPreprocessor,
|
||||
PreprocessingConfig,
|
||||
print_audio_analysis,
|
||||
)
|
||||
|
||||
|
||||
class VoiceManager:
|
||||
"""Manages available voices, per-user preferences, and on-demand voice loading."""
|
||||
|
||||
def __init__(self, voices_dir: str, default_voice: str | None = None):
|
||||
self.voices_dir = Path(voices_dir)
|
||||
self.default_voice = default_voice
|
||||
self.model: TTSModel | None = None
|
||||
self.preferences_file = self.voices_dir / "preferences.json"
|
||||
|
||||
# Cache of loaded voice states: voice_name -> voice_state
|
||||
self._voice_states: dict[str, Any] = {}
|
||||
# Per-user voice preferences: user_id -> voice_name
|
||||
self._user_voices: dict[int, str] = {}
|
||||
# Available voices: voice_name -> file_path
|
||||
self._available_voices: dict[str, Path] = {}
|
||||
|
||||
def discover_voices(self) -> dict[str, Path]:
|
||||
"""Discover all available voice WAV files in the voices directory."""
|
||||
old_voices = set(self._available_voices.keys())
|
||||
self._available_voices = {}
|
||||
|
||||
if not self.voices_dir.exists():
|
||||
print(f"Voices directory not found: {self.voices_dir}")
|
||||
return self._available_voices
|
||||
|
||||
for wav_file in self.voices_dir.glob("*.wav"):
|
||||
voice_name = wav_file.stem.lower()
|
||||
self._available_voices[voice_name] = wav_file
|
||||
print(f" Found voice: {voice_name} ({wav_file.name})")
|
||||
|
||||
# Set default voice if not specified
|
||||
if self.default_voice is None and self._available_voices:
|
||||
self.default_voice = next(iter(self._available_voices.keys()))
|
||||
|
||||
# Load saved preferences
|
||||
self._load_preferences()
|
||||
|
||||
return self._available_voices
|
||||
|
||||
def refresh_voices(self) -> tuple[list[str], list[str]]:
|
||||
"""Re-scan for voices and return (new_voices, removed_voices)."""
|
||||
old_voices = set(self._available_voices.keys())
|
||||
|
||||
self._available_voices = {}
|
||||
if self.voices_dir.exists():
|
||||
for wav_file in self.voices_dir.glob("*.wav"):
|
||||
voice_name = wav_file.stem.lower()
|
||||
self._available_voices[voice_name] = wav_file
|
||||
|
||||
new_voices = set(self._available_voices.keys())
|
||||
added = sorted(new_voices - old_voices)
|
||||
removed = sorted(old_voices - new_voices)
|
||||
|
||||
# Update default if needed
|
||||
if self.default_voice not in self._available_voices and self._available_voices:
|
||||
self.default_voice = next(iter(self._available_voices.keys()))
|
||||
|
||||
return added, removed
|
||||
|
||||
def load_model(self) -> None:
|
||||
"""Load the TTS model (does not load any voices yet)."""
|
||||
print("Loading Pocket TTS model...")
|
||||
self.model = TTSModel.load_model()
|
||||
print("TTS model loaded!")
|
||||
|
||||
def get_available_voices(self) -> list[str]:
|
||||
"""Get list of available voice names."""
|
||||
return sorted(self._available_voices.keys())
|
||||
|
||||
def is_voice_available(self, voice_name: str) -> bool:
|
||||
"""Check if a voice is available."""
|
||||
return voice_name.lower() in self._available_voices
|
||||
|
||||
def get_voice_state(self, voice_name: str) -> Any:
|
||||
"""Get or load a voice state on-demand."""
|
||||
if self.model is None:
|
||||
raise RuntimeError("Model not loaded. Call load_model() first.")
|
||||
|
||||
voice_name = voice_name.lower()
|
||||
|
||||
if voice_name not in self._available_voices:
|
||||
raise ValueError(f"Voice '{voice_name}' not found")
|
||||
|
||||
# Return cached state if already loaded
|
||||
if voice_name in self._voice_states:
|
||||
return self._voice_states[voice_name]
|
||||
|
||||
# Load the voice on-demand
|
||||
voice_path = self._available_voices[voice_name]
|
||||
print(f"Loading voice '{voice_name}' from {voice_path}...")
|
||||
|
||||
# Preprocess the audio
|
||||
print(f" Analyzing audio...")
|
||||
print_audio_analysis(str(voice_path))
|
||||
|
||||
print(f" Preprocessing audio...")
|
||||
config = PreprocessingConfig(
|
||||
target_sample_rate=22050,
|
||||
normalize=True,
|
||||
trim_silence=True,
|
||||
trim_top_db=20,
|
||||
reduce_noise=True,
|
||||
target_length_seconds=15.0,
|
||||
)
|
||||
preprocessor = AudioPreprocessor(config)
|
||||
processed_path = preprocessor.preprocess_file(str(voice_path))
|
||||
|
||||
# Load voice state
|
||||
voice_state = self.model.get_state_for_audio_prompt(processed_path)
|
||||
self._voice_states[voice_name] = voice_state
|
||||
print(f" Voice '{voice_name}' loaded and cached!")
|
||||
|
||||
return voice_state
|
||||
|
||||
def is_voice_loaded(self, voice_name: str) -> bool:
|
||||
"""Check if a voice is already loaded in cache."""
|
||||
return voice_name.lower() in self._voice_states
|
||||
|
||||
def get_user_voice(self, user_id: int) -> str:
|
||||
"""Get the voice preference for a user, or default voice."""
|
||||
return self._user_voices.get(user_id, self.default_voice or "")
|
||||
|
||||
def set_user_voice(self, user_id: int, voice_name: str) -> None:
|
||||
"""Set the voice preference for a user."""
|
||||
voice_name = voice_name.lower()
|
||||
if voice_name not in self._available_voices:
|
||||
raise ValueError(f"Voice '{voice_name}' not found")
|
||||
self._user_voices[user_id] = voice_name
|
||||
self._save_preferences()
|
||||
|
||||
def get_user_voice_state(self, user_id: int) -> Any:
|
||||
"""Get the voice state for a user (loads on-demand if needed)."""
|
||||
voice_name = self.get_user_voice(user_id)
|
||||
if not voice_name:
|
||||
raise RuntimeError("No default voice available")
|
||||
return self.get_voice_state(voice_name)
|
||||
|
||||
def get_loaded_voices(self) -> list[str]:
|
||||
"""Get list of currently loaded voice names."""
|
||||
return list(self._voice_states.keys())
|
||||
|
||||
def _load_preferences(self) -> None:
|
||||
"""Load user voice preferences from JSON file."""
|
||||
if not self.preferences_file.exists():
|
||||
return
|
||||
|
||||
try:
|
||||
with open(self.preferences_file, "r") as f:
|
||||
data = json.load(f)
|
||||
|
||||
# Load user preferences (convert string keys back to int)
|
||||
for user_id_str, voice_name in data.get("user_voices", {}).items():
|
||||
user_id = int(user_id_str)
|
||||
# Only load if voice still exists
|
||||
if voice_name.lower() in self._available_voices:
|
||||
self._user_voices[user_id] = voice_name.lower()
|
||||
|
||||
print(f" Loaded {len(self._user_voices)} user voice preferences")
|
||||
except Exception as e:
|
||||
print(f" Warning: Failed to load preferences: {e}")
|
||||
|
||||
def _save_preferences(self) -> None:
|
||||
"""Save user voice preferences to JSON file."""
|
||||
try:
|
||||
# Ensure directory exists
|
||||
self.preferences_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
data = {
|
||||
"user_voices": {str(k): v for k, v in self._user_voices.items()}
|
||||
}
|
||||
|
||||
with open(self.preferences_file, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to save preferences: {e}")
|
||||
Reference in New Issue
Block a user