"""Voice management for per-user voice selection and on-demand loading."""

import json
from pathlib import Path
from typing import Any

from pocket_tts import TTSModel

from audio_effects import AudioEffects
from audio_preprocessor import (
    AudioPreprocessor,
    PreprocessingConfig,
    print_audio_analysis,
)


class VoiceManager:
    """Manages available voices, per-user preferences, and on-demand voice loading.

    Voices are ``*.wav`` files inside ``voices_dir``; the lower-cased file stem
    is the voice name. Per-user voice and effect preferences are persisted to
    ``preferences.json`` inside the same directory.
    """

    def __init__(self, voices_dir: str, default_voice: str | None = None):
        """Create a manager for ``voices_dir``.

        Args:
            voices_dir: Directory that holds the voice WAV files.
            default_voice: Voice used for users without a preference. When
                ``None``, the first discovered voice becomes the default.
        """
        self.voices_dir = Path(voices_dir)
        self.default_voice = default_voice
        self.model: TTSModel | None = None
        self.preferences_file = self.voices_dir / "preferences.json"

        # Cache of loaded voice states: voice_name -> voice_state
        self._voice_states: dict[str, Any] = {}
        # Per-user voice preferences: user_id -> voice_name
        self._user_voices: dict[int, str] = {}
        # Per-user audio effects: user_id -> {effect_name: value}
        self._user_effects: dict[int, dict[str, Any]] = {}
        # Available voices: voice_name -> file_path
        self._available_voices: dict[str, Path] = {}

    # ------------------------------------------------------------------
    # Voice discovery
    # ------------------------------------------------------------------

    def _scan_voices(self, *, verbose: bool = False) -> dict[str, Path]:
        """Return a fresh ``voice_name -> path`` map of WAV files on disk.

        Args:
            verbose: When true, print one line per voice found.

        Returns:
            An empty dict if the voices directory does not exist.
        """
        found: dict[str, Path] = {}
        if not self.voices_dir.exists():
            return found
        # Sorted so that the automatically chosen default voice does not
        # depend on filesystem enumeration order.
        for wav_file in sorted(self.voices_dir.glob("*.wav")):
            voice_name = wav_file.stem.lower()
            found[voice_name] = wav_file
            if verbose:
                print(f" Found voice: {voice_name} ({wav_file.name})")
        return found

    def discover_voices(self) -> dict[str, Path]:
        """Discover all available voice WAV files in the voices directory.

        Also picks a default voice when none was configured and loads the
        saved user preferences.

        Returns:
            The ``voice_name -> path`` map (empty if the directory is missing).
        """
        if not self.voices_dir.exists():
            self._available_voices = {}
            print(f"Voices directory not found: {self.voices_dir}")
            return self._available_voices

        self._available_voices = self._scan_voices(verbose=True)

        # Set default voice if not specified
        if self.default_voice is None and self._available_voices:
            self.default_voice = next(iter(self._available_voices))

        # Load saved preferences
        self._load_preferences()
        return self._available_voices

    def refresh_voices(self) -> tuple[list[str], list[str]]:
        """Re-scan for voices and return ``(new_voices, removed_voices)``.

        Both lists are sorted. Cached states of removed voices are evicted,
        and the default voice is replaced if it is no longer available.
        """
        old_voices = set(self._available_voices)
        self._available_voices = self._scan_voices()
        new_voices = set(self._available_voices)

        added = sorted(new_voices - old_voices)
        removed = sorted(old_voices - new_voices)

        # A removed voice can no longer be served by get_voice_state(), so
        # keeping its cached state would only waste memory and make
        # is_voice_loaded()/get_loaded_voices() report stale entries.
        for voice_name in removed:
            self._voice_states.pop(voice_name, None)

        # Update default if needed
        if self.default_voice not in self._available_voices and self._available_voices:
            self.default_voice = next(iter(self._available_voices))

        return added, removed

    # ------------------------------------------------------------------
    # Model / voice loading
    # ------------------------------------------------------------------

    def load_model(self) -> None:
        """Load the TTS model (does not load any voices yet)."""
        print("Loading Pocket TTS model...")
        self.model = TTSModel.load_model()
        print("TTS model loaded!")

    def get_available_voices(self) -> list[str]:
        """Get the sorted list of available voice names."""
        return sorted(self._available_voices)

    def is_voice_available(self, voice_name: str) -> bool:
        """Check if a voice is available (case-insensitive)."""
        return voice_name.lower() in self._available_voices

    def get_voice_state(self, voice_name: str) -> Any:
        """Get a voice state, preprocessing and loading it on first use.

        Args:
            voice_name: Voice to load (case-insensitive).

        Returns:
            The state object produced by the model for this voice.

        Raises:
            RuntimeError: If ``load_model()`` has not been called.
            ValueError: If the voice is not among the available voices.
        """
        if self.model is None:
            raise RuntimeError("Model not loaded. Call load_model() first.")

        voice_name = voice_name.lower()
        if voice_name not in self._available_voices:
            raise ValueError(f"Voice '{voice_name}' not found")

        # Return cached state if already loaded
        cached = self._voice_states.get(voice_name)
        if cached is not None:
            return cached

        # Load the voice on-demand
        voice_path = self._available_voices[voice_name]
        print(f"Loading voice '{voice_name}' from {voice_path}...")

        # Preprocess the audio
        print(" Analyzing audio...")
        print_audio_analysis(str(voice_path))

        print(" Preprocessing audio...")
        config = PreprocessingConfig(
            target_sample_rate=22050,
            normalize=True,
            trim_silence=True,
            trim_top_db=20,
            reduce_noise=True,
            target_length_seconds=15.0,
        )
        preprocessor = AudioPreprocessor(config)
        processed_path = preprocessor.preprocess_file(str(voice_path))

        # Load voice state from the preprocessed audio, then cache it
        voice_state = self.model.get_state_for_audio_prompt(processed_path)
        self._voice_states[voice_name] = voice_state
        print(f" Voice '{voice_name}' loaded and cached!")
        return voice_state

    def is_voice_loaded(self, voice_name: str) -> bool:
        """Check if a voice is already loaded in cache."""
        return voice_name.lower() in self._voice_states

    def get_loaded_voices(self) -> list[str]:
        """Get list of currently loaded voice names."""
        return list(self._voice_states)

    # ------------------------------------------------------------------
    # Per-user voice preferences
    # ------------------------------------------------------------------

    def get_user_voice(self, user_id: int) -> str:
        """Get the voice preference for a user, or the default voice.

        Returns an empty string when the user has no preference and no
        default voice is set.
        """
        return self._user_voices.get(user_id, self.default_voice or "")

    def set_user_voice(self, user_id: int, voice_name: str) -> None:
        """Set and persist the voice preference for a user.

        Raises:
            ValueError: If the voice is not among the available voices.
        """
        voice_name = voice_name.lower()
        if voice_name not in self._available_voices:
            raise ValueError(f"Voice '{voice_name}' not found")
        self._user_voices[user_id] = voice_name
        self._save_preferences()

    def get_user_voice_state(self, user_id: int) -> Any:
        """Get the voice state for a user (loads on-demand if needed).

        Raises:
            RuntimeError: If the user has no preference and there is no
                default voice, or if the model is not loaded.
            ValueError: If the resolved voice is not available.
        """
        voice_name = self.get_user_voice(user_id)
        if not voice_name:
            raise RuntimeError("No default voice available")
        return self.get_voice_state(voice_name)

    # ------------------------------------------------------------------
    # Preferences persistence
    # ------------------------------------------------------------------

    def _load_preferences(self) -> None:
        """Load user voice and effect preferences from the JSON file.

        Best-effort: a missing, unreadable or malformed file only produces a
        warning. Voice preferences that point at a voice which is no longer
        available are ignored.
        """
        if not self.preferences_file.exists():
            return

        try:
            with open(self.preferences_file, "r", encoding="utf-8") as f:
                data = json.load(f)

            # Load user preferences (convert string keys back to int)
            for user_id_str, voice_name in data.get("user_voices", {}).items():
                user_id = int(user_id_str)
                normalized = voice_name.lower()
                # Only load if voice still exists
                if normalized in self._available_voices:
                    self._user_voices[user_id] = normalized

            # Load user effects (convert string keys back to int)
            for user_id_str, effects in data.get("user_effects", {}).items():
                # Anything but a mapping would break get_user_effects() later.
                if isinstance(effects, dict):
                    self._user_effects[int(user_id_str)] = effects

            print(f" Loaded {len(self._user_voices)} user voice preferences")
            print(f" Loaded {len(self._user_effects)} user effect preferences")
        except (OSError, ValueError, TypeError, AttributeError) as e:
            # ValueError covers JSON/Unicode decoding and int() failures;
            # TypeError/AttributeError cover unexpectedly shaped JSON.
            print(f" Warning: Failed to load preferences: {e}")

    def _save_preferences(self) -> None:
        """Save user voice and effect preferences to the JSON file.

        Best-effort: failures are reported as a warning and not raised.
        """
        try:
            # Ensure directory exists
            self.preferences_file.parent.mkdir(parents=True, exist_ok=True)

            # JSON object keys must be strings, so user ids are stringified.
            data = {
                "user_voices": {str(k): v for k, v in self._user_voices.items()},
                "user_effects": {str(k): v for k, v in self._user_effects.items()},
            }

            with open(self.preferences_file, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
        except (OSError, TypeError, ValueError) as e:
            print(f"Warning: Failed to save preferences: {e}")

    # ------------------------------------------------------------------
    # Effects management methods
    # ------------------------------------------------------------------

    @staticmethod
    def _effect_specs() -> dict[str, tuple[type, Any]]:
        """Return ``effect_name -> (converter, default)`` for every known effect."""
        return {
            "pitch": (int, AudioEffects.PITCH_DEFAULT),
            "speed": (float, AudioEffects.SPEED_DEFAULT),
            "echo": (int, AudioEffects.ECHO_DEFAULT),
            "robot": (int, AudioEffects.ROBOT_DEFAULT),
            "chorus": (int, AudioEffects.CHORUS_DEFAULT),
            "tremolo_depth": (float, AudioEffects.TREMOLO_DEPTH_DEFAULT),
            "tremolo_rate": (float, AudioEffects.TREMOLO_RATE_DEFAULT),
        }

    def get_user_effects(self, user_id: int) -> dict[str, int | float]:
        """Get the audio effects for a user. Returns defaults if not set.

        Stored values are converted to the effect's numeric type, since they
        may have been stored or loaded in another representation (for
        example as strings). A stored ``None`` is treated as "use default".
        """
        stored = self._user_effects.get(user_id, {})
        resolved: dict[str, int | float] = {}
        for name, (convert, default) in self._effect_specs().items():
            raw = stored.get(name, default)
            resolved[name] = convert(raw) if raw is not None else default
        return resolved

    def set_user_effect(self, user_id: int, effect_name: str, value: Any) -> tuple[bool, str]:
        """Set and persist an audio effect for a user.

        Args:
            user_id: User whose effect is changed.
            effect_name: Name of the effect (validated by ``AudioEffects``).
            value: New value; converted to the effect's numeric type.

        Returns:
            Tuple of (success, message)
        """
        # Validate the effect
        is_valid, error_msg = AudioEffects.validate_effect(effect_name, value)
        if not is_valid:
            return False, error_msg

        # Store the value in its canonical numeric type so that in-memory
        # state and the JSON file do not end up holding raw caller input.
        specs = self._effect_specs()
        if effect_name in specs:
            convert, _default = specs[effect_name]
            value = convert(value)

        current_effects = dict(self._user_effects.get(user_id, {}))
        current_effects[effect_name] = value

        # Count active effects so a warning can be shown when > 2.
        # NOTE(review): tremolo_rate is not passed here, matching the
        # existing call in count_active_effects() - confirm against
        # AudioEffects.count_active_effects' signature.
        active_count = AudioEffects.count_active_effects(
            pitch=current_effects.get("pitch", AudioEffects.PITCH_DEFAULT),
            speed=current_effects.get("speed", AudioEffects.SPEED_DEFAULT),
            echo=current_effects.get("echo", AudioEffects.ECHO_DEFAULT),
            robot=current_effects.get("robot", AudioEffects.ROBOT_DEFAULT),
            chorus=current_effects.get("chorus", AudioEffects.CHORUS_DEFAULT),
            tremolo_depth=current_effects.get(
                "tremolo_depth", AudioEffects.TREMOLO_DEPTH_DEFAULT
            ),
        )

        self._user_effects[user_id] = current_effects
        self._save_preferences()

        if active_count > 2:
            return True, f"Effect applied! ⚠️ You have {active_count} active effects. Performance may be slower with more effects."
        return True, "Effect applied successfully!"

    def reset_user_effects(self, user_id: int) -> None:
        """Reset all audio effects to defaults for a user."""
        if user_id in self._user_effects:
            del self._user_effects[user_id]
            self._save_preferences()

    def count_active_effects(self, user_id: int) -> int:
        """Count how many effects are active for a user."""
        effects = self.get_user_effects(user_id)
        return AudioEffects.count_active_effects(
            pitch=effects["pitch"],
            speed=effects["speed"],
            echo=effects["echo"],
            robot=effects["robot"],
            chorus=effects["chorus"],
            tremolo_depth=effects["tremolo_depth"],
        )