"""Voice management for per-user voice selection and on-demand loading."""

import json
from pathlib import Path
from typing import Any

from pocket_tts import TTSModel

from audio_preprocessor import (
    AudioPreprocessor,
    PreprocessingConfig,
    print_audio_analysis,
)


class VoiceManager:
    """Manages available voices, per-user preferences, and on-demand voice loading.

    Voices are ``*.wav`` files found directly inside ``voices_dir``. Each voice
    is keyed by its lower-cased file stem. Voice states are built the first
    time they are requested and cached afterwards. Per-user preferences are
    persisted to ``preferences.json`` inside ``voices_dir``.
    """

    def __init__(self, voices_dir: str, default_voice: str | None = None):
        """Create a manager; no disk access and no model loading happens here.

        Args:
            voices_dir: Directory that contains the voice WAV files.
            default_voice: Voice used for users without a preference. When
                ``None``, ``discover_voices`` picks the first voice it finds.
        """
        self.voices_dir = Path(voices_dir)
        # Voice keys are lower-cased file stems, so the default is normalised
        # the same way; otherwise a mixed-case default never matches a key and
        # refresh_voices() would silently replace it.
        self.default_voice = default_voice.lower() if default_voice else default_voice
        self.model: TTSModel | None = None
        self.preferences_file = self.voices_dir / "preferences.json"

        # Cache of loaded voice states: voice_name -> voice_state
        self._voice_states: dict[str, Any] = {}

        # Per-user voice preferences: user_id -> voice_name
        self._user_voices: dict[int, str] = {}

        # Available voices: voice_name -> file_path
        self._available_voices: dict[str, Path] = {}

    def _scan_voices(self, *, verbose: bool = False) -> dict[str, Path]:
        """Return ``voice_name -> path`` for every ``*.wav`` in the voices directory.

        Paths are visited in sorted order so the resulting dict order (and
        therefore any default picked from it) does not depend on the order
        in which the filesystem happens to list the directory.

        Args:
            verbose: When true, print one line per voice found.
        """
        found: dict[str, Path] = {}
        if not self.voices_dir.exists():
            return found

        for wav_file in sorted(self.voices_dir.glob("*.wav")):
            voice_name = wav_file.stem.lower()
            found[voice_name] = wav_file
            if verbose:
                print(f"  Found voice: {voice_name} ({wav_file.name})")
        return found

    def _evict_stale_voice_states(self) -> None:
        """Drop cached voice states whose voice is no longer available."""
        stale = [name for name in self._voice_states if name not in self._available_voices]
        for name in stale:
            del self._voice_states[name]

    def discover_voices(self) -> dict[str, Path]:
        """Discover all available voice WAV files in the voices directory.

        Also picks a default voice when none was configured and loads the
        saved user preferences.

        Returns:
            Mapping of voice name to WAV path (empty if the directory is missing).
        """
        if not self.voices_dir.exists():
            print(f"Voices directory not found: {self.voices_dir}")
            self._available_voices = {}
            self._evict_stale_voice_states()
            return self._available_voices

        self._available_voices = self._scan_voices(verbose=True)
        self._evict_stale_voice_states()

        # Set default voice if not specified
        if self.default_voice is None and self._available_voices:
            self.default_voice = next(iter(self._available_voices))

        # Load saved preferences
        self._load_preferences()

        return self._available_voices

    def refresh_voices(self) -> tuple[list[str], list[str]]:
        """Re-scan for voices and return (new_voices, removed_voices).

        Cached voice states belonging to voices that disappeared are evicted.
        User preferences are left untouched; ``get_user_voice`` falls back to
        the default while a preferred voice is unavailable.

        Returns:
            Two sorted lists: names added and names removed since the last scan.
        """
        old_voices = set(self._available_voices)
        self._available_voices = self._scan_voices()
        new_voices = set(self._available_voices)

        added = sorted(new_voices - old_voices)
        removed = sorted(old_voices - new_voices)

        self._evict_stale_voice_states()

        # Update default if needed
        if self.default_voice not in self._available_voices and self._available_voices:
            self.default_voice = next(iter(self._available_voices))

        return added, removed

    def load_model(self) -> None:
        """Load the TTS model (does not load any voices yet)."""
        print("Loading Pocket TTS model...")
        self.model = TTSModel.load_model()
        print("TTS model loaded!")

    def get_available_voices(self) -> list[str]:
        """Get the sorted list of available voice names."""
        return sorted(self._available_voices)

    def is_voice_available(self, voice_name: str) -> bool:
        """Check if a voice is available (case-insensitive)."""
        return voice_name.lower() in self._available_voices

    def get_voice_state(self, voice_name: str) -> Any:
        """Get or load a voice state on-demand.

        Args:
            voice_name: Voice to load (case-insensitive).

        Returns:
            The cached state, or a freshly built one obtained by preprocessing
            the WAV file and passing the result to the model.

        Raises:
            RuntimeError: If ``load_model`` has not been called.
            ValueError: If the voice is not among the available voices.
        """
        if self.model is None:
            raise RuntimeError("Model not loaded. Call load_model() first.")

        voice_name = voice_name.lower()

        if voice_name not in self._available_voices:
            raise ValueError(f"Voice '{voice_name}' not found")

        # Return cached state if already loaded
        if voice_name in self._voice_states:
            return self._voice_states[voice_name]

        # Load the voice on-demand
        voice_path = self._available_voices[voice_name]
        print(f"Loading voice '{voice_name}' from {voice_path}...")

        # Preprocess the audio
        print("  Analyzing audio...")
        print_audio_analysis(str(voice_path))

        print("  Preprocessing audio...")
        config = PreprocessingConfig(
            target_sample_rate=22050,
            normalize=True,
            trim_silence=True,
            trim_top_db=20,
            reduce_noise=True,
            target_length_seconds=15.0,
        )
        preprocessor = AudioPreprocessor(config)
        processed_path = preprocessor.preprocess_file(str(voice_path))

        # Load voice state
        voice_state = self.model.get_state_for_audio_prompt(processed_path)
        self._voice_states[voice_name] = voice_state
        print(f"  Voice '{voice_name}' loaded and cached!")

        return voice_state

    def is_voice_loaded(self, voice_name: str) -> bool:
        """Check if a voice is already loaded in cache."""
        return voice_name.lower() in self._voice_states

    def get_user_voice(self, user_id: int) -> str:
        """Get the voice preference for a user, or the default voice.

        A stored preference is honoured only while that voice is still
        available; otherwise the default is returned so that a deleted voice
        file does not leave the user without a working voice.

        Returns:
            The voice name, or ``""`` when there is no preference and no default.
        """
        preferred = self._user_voices.get(user_id)
        if preferred is not None and preferred in self._available_voices:
            return preferred
        return self.default_voice or ""

    def set_user_voice(self, user_id: int, voice_name: str) -> None:
        """Set the voice preference for a user and persist it.

        Raises:
            ValueError: If the voice is not among the available voices.
        """
        voice_name = voice_name.lower()
        if voice_name not in self._available_voices:
            raise ValueError(f"Voice '{voice_name}' not found")

        self._user_voices[user_id] = voice_name
        self._save_preferences()

    def get_user_voice_state(self, user_id: int) -> Any:
        """Get the voice state for a user (loads on-demand if needed).

        Raises:
            RuntimeError: If the user has no preference and there is no default
                voice, or if the model has not been loaded.
            ValueError: If the resolved voice is not available.
        """
        voice_name = self.get_user_voice(user_id)
        if not voice_name:
            raise RuntimeError("No default voice available")
        return self.get_voice_state(voice_name)

    def get_loaded_voices(self) -> list[str]:
        """Get list of currently loaded voice names."""
        return list(self._voice_states)

    def _load_preferences(self) -> None:
        """Load user voice preferences from the JSON file, best-effort.

        A missing file is not an error. An unreadable or malformed file only
        produces a warning. Individual entries that are malformed or that name
        a voice which is no longer available are skipped without discarding
        the remaining entries.
        """
        if not self.preferences_file.exists():
            return

        try:
            with open(self.preferences_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            print(f"  Warning: Failed to load preferences: {e}")
            return

        user_voices = data.get("user_voices", {}) if isinstance(data, dict) else None
        if not isinstance(user_voices, dict):
            print("  Warning: Failed to load preferences: unexpected file format")
            return

        skipped = 0
        # Load user preferences (convert string keys back to int)
        for user_id_str, voice_name in user_voices.items():
            try:
                user_id = int(user_id_str)
            except ValueError:
                skipped += 1
                continue
            if not isinstance(voice_name, str):
                skipped += 1
                continue

            # Only load if voice still exists
            voice_name = voice_name.lower()
            if voice_name in self._available_voices:
                self._user_voices[user_id] = voice_name

        if skipped:
            print(f"  Warning: Skipped {skipped} malformed preference entries")
        print(f"  Loaded {len(self._user_voices)} user voice preferences")

    def _save_preferences(self) -> None:
        """Save user voice preferences to the JSON file, best-effort.

        The data is written to a sibling temporary file which is then renamed
        over the real file, so an interrupted write does not leave a truncated
        ``preferences.json`` behind. Failures only produce a warning.
        """
        data = {"user_voices": {str(k): v for k, v in self._user_voices.items()}}
        tmp_file = self.preferences_file.with_name(self.preferences_file.name + ".tmp")

        try:
            # Ensure directory exists
            self.preferences_file.parent.mkdir(parents=True, exist_ok=True)

            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            tmp_file.replace(self.preferences_file)
        except (OSError, TypeError, ValueError) as e:
            print(f"Warning: Failed to save preferences: {e}")