Files
Vox/voice_manager.py
Spencer 9917d44f5d docs: add HuggingFace cache troubleshooting to README
- Document HF_HOME environment variable for writable cache
- Add systemd service permission guidance for /tmp paths
- Troubleshooting steps for read-only file system errors
2026-02-26 15:56:09 -06:00

313 lines
13 KiB
Python
Executable File

"""Voice management for per-user voice selection and on-demand loading."""
import json
from pathlib import Path
from typing import Any
from pocket_tts import TTSModel
from audio_effects import AudioEffects
from audio_preprocessor import (
AudioPreprocessor,
PreprocessingConfig,
print_audio_analysis,
)
class VoiceManager:
"""Manages available voices, per-user preferences, and on-demand voice loading."""
def __init__(self, voices_dir: str, default_voice: str | None = None):
self.voices_dir = Path(voices_dir)
self.default_voice = default_voice
self.model: TTSModel | None = None
self.preferences_file = self.voices_dir / "preferences.json"
# Cache of loaded voice states: voice_name -> voice_state
self._voice_states: dict[str, Any] = {}
# Per-user voice preferences: user_id -> voice_name
self._user_voices: dict[int, str] = {}
# Per-user audio effects: user_id -> {"pitch": int, "speed": float}
self._user_effects: dict[int, dict[str, Any]] = {}
# Available voices: voice_name -> file_path
self._available_voices: dict[str, Path] = {}
def discover_voices(self) -> dict[str, Path]:
"""Discover all available voice WAV files in the voices directory."""
old_voices = set(self._available_voices.keys())
self._available_voices = {}
if not self.voices_dir.exists():
print(f"Voices directory not found: {self.voices_dir}")
return self._available_voices
for wav_file in self.voices_dir.glob("*.wav"):
voice_name = wav_file.stem.lower()
self._available_voices[voice_name] = wav_file
print(f" Found voice: {voice_name} ({wav_file.name})")
# Set default voice if not specified
if self.default_voice is None and self._available_voices:
self.default_voice = next(iter(self._available_voices.keys()))
# Load saved preferences
self._load_preferences()
return self._available_voices
def refresh_voices(self) -> tuple[list[str], list[str]]:
"""Re-scan for voices and return (new_voices, removed_voices)."""
old_voices = set(self._available_voices.keys())
self._available_voices = {}
if self.voices_dir.exists():
for wav_file in self.voices_dir.glob("*.wav"):
voice_name = wav_file.stem.lower()
self._available_voices[voice_name] = wav_file
new_voices = set(self._available_voices.keys())
added = sorted(new_voices - old_voices)
removed = sorted(old_voices - new_voices)
# Update default if needed
if self.default_voice not in self._available_voices and self._available_voices:
self.default_voice = next(iter(self._available_voices.keys()))
return added, removed
def load_model(self) -> None:
"""Load the TTS model (does not load any voices yet)."""
print("Loading Pocket TTS model...")
self.model = TTSModel.load_model()
print("TTS model loaded!")
def get_available_voices(self) -> list[str]:
"""Get list of available voice names."""
return sorted(self._available_voices.keys())
def is_voice_available(self, voice_name: str) -> bool:
"""Check if a voice is available."""
return voice_name.lower() in self._available_voices
def get_voice_state(self, voice_name: str) -> Any:
"""Get or load a voice state on-demand."""
if self.model is None:
raise RuntimeError("Model not loaded. Call load_model() first.")
voice_name = voice_name.lower()
if voice_name not in self._available_voices:
raise ValueError(f"Voice '{voice_name}' not found")
# Return cached state if already loaded
if voice_name in self._voice_states:
return self._voice_states[voice_name]
# Load the voice on-demand
voice_path = self._available_voices[voice_name]
print(f"Loading voice '{voice_name}' from {voice_path}...")
# Preprocess the audio
print(f" Analyzing audio...")
print_audio_analysis(str(voice_path))
print(f" Preprocessing audio...")
config = PreprocessingConfig(
target_sample_rate=22050,
normalize=True,
trim_silence=True,
trim_top_db=20,
reduce_noise=True,
target_length_seconds=15.0,
)
preprocessor = AudioPreprocessor(config)
processed_path = preprocessor.preprocess_file(str(voice_path))
# Load voice state
voice_state = self.model.get_state_for_audio_prompt(processed_path)
self._voice_states[voice_name] = voice_state
print(f" Voice '{voice_name}' loaded and cached!")
return voice_state
def is_voice_loaded(self, voice_name: str) -> bool:
"""Check if a voice is already loaded in cache."""
return voice_name.lower() in self._voice_states
def get_user_voice(self, user_id: int) -> str:
"""Get the voice preference for a user, or default voice."""
return self._user_voices.get(user_id, self.default_voice or "")
def set_user_voice(self, user_id: int, voice_name: str) -> None:
"""Set the voice preference for a user."""
voice_name = voice_name.lower()
if voice_name not in self._available_voices:
raise ValueError(f"Voice '{voice_name}' not found")
self._user_voices[user_id] = voice_name
self._save_preferences()
def get_user_voice_state(self, user_id: int) -> Any:
"""Get the voice state for a user (loads on-demand if needed)."""
voice_name = self.get_user_voice(user_id)
if not voice_name:
raise RuntimeError("No default voice available")
return self.get_voice_state(voice_name)
def get_loaded_voices(self) -> list[str]:
"""Get list of currently loaded voice names."""
return list(self._voice_states.keys())
def _load_preferences(self) -> None:
"""Load user voice preferences from JSON file."""
if not self.preferences_file.exists():
return
try:
with open(self.preferences_file, "r") as f:
data = json.load(f)
# Load user preferences (convert string keys back to int)
for user_id_str, voice_name in data.get("user_voices", {}).items():
user_id = int(user_id_str)
# Only load if voice still exists
if voice_name.lower() in self._available_voices:
self._user_voices[user_id] = voice_name.lower()
print(f" Loaded {len(self._user_voices)} user voice preferences")
except Exception as e:
print(f" Warning: Failed to load preferences: {e}")
def _save_preferences(self) -> None:
"""Save user voice preferences to JSON file."""
try:
# Ensure directory exists
self.preferences_file.parent.mkdir(parents=True, exist_ok=True)
data = {
"user_voices": {str(k): v for k, v in self._user_voices.items()},
"user_effects": {str(k): v for k, v in self._user_effects.items()},
}
with open(self.preferences_file, "w") as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Warning: Failed to save preferences: {e}")
# Effects management methods
def get_user_effects(self, user_id: int) -> dict[str, int | float]:
"""Get the audio effects for a user. Returns defaults if not set."""
effects = self._user_effects.get(user_id, {})
# Convert to proper types (JSON stores them as strings)
pitch = effects.get("pitch", AudioEffects.PITCH_DEFAULT)
speed = effects.get("speed", AudioEffects.SPEED_DEFAULT)
echo = effects.get("echo", AudioEffects.ECHO_DEFAULT)
robot = effects.get("robot", AudioEffects.ROBOT_DEFAULT)
chorus = effects.get("chorus", AudioEffects.CHORUS_DEFAULT)
tremolo_depth = effects.get("tremolo_depth", AudioEffects.TREMOLO_DEPTH_DEFAULT)
tremolo_rate = effects.get("tremolo_rate", AudioEffects.TREMOLO_RATE_DEFAULT)
return {
"pitch": int(pitch) if pitch is not None else AudioEffects.PITCH_DEFAULT,
"speed": float(speed) if speed is not None else AudioEffects.SPEED_DEFAULT,
"echo": int(echo) if echo is not None else AudioEffects.ECHO_DEFAULT,
"robot": int(robot) if robot is not None else AudioEffects.ROBOT_DEFAULT,
"chorus": int(chorus) if chorus is not None else AudioEffects.CHORUS_DEFAULT,
"tremolo_depth": float(tremolo_depth) if tremolo_depth is not None else AudioEffects.TREMOLO_DEPTH_DEFAULT,
"tremolo_rate": float(tremolo_rate) if tremolo_rate is not None else AudioEffects.TREMOLO_RATE_DEFAULT,
}
def set_user_effect(self, user_id: int, effect_name: str, value: Any) -> tuple[bool, str]:
"""
Set an audio effect for a user.
Returns:
Tuple of (success, message)
"""
# Validate the effect
is_valid, error_msg = AudioEffects.validate_effect(effect_name, value)
if not is_valid:
return False, error_msg
# Get current effects
if user_id not in self._user_effects:
self._user_effects[user_id] = {}
# Save the effect
current_effects = self._user_effects[user_id].copy()
if effect_name == "pitch":
current_effects["pitch"] = int(value)
elif effect_name == "speed":
current_effects["speed"] = float(value)
elif effect_name == "echo":
current_effects["echo"] = int(value)
elif effect_name == "robot":
current_effects["robot"] = int(value)
elif effect_name == "chorus":
current_effects["chorus"] = int(value)
elif effect_name == "tremolo_depth":
current_effects["tremolo_depth"] = float(value)
elif effect_name == "tremolo_rate":
current_effects["tremolo_rate"] = float(value)
# Count active effects and show warning if > 2
active_count = AudioEffects.count_active_effects(
pitch=current_effects.get("pitch", AudioEffects.PITCH_DEFAULT),
speed=current_effects.get("speed", AudioEffects.SPEED_DEFAULT),
echo=current_effects.get("echo", AudioEffects.ECHO_DEFAULT),
robot=current_effects.get("robot", AudioEffects.ROBOT_DEFAULT),
chorus=current_effects.get("chorus", AudioEffects.CHORUS_DEFAULT),
tremolo_depth=current_effects.get("tremolo_depth", AudioEffects.TREMOLO_DEPTH_DEFAULT),
)
self._user_effects[user_id][effect_name] = value
self._save_preferences()
if active_count > 2:
return True, f"Effect applied! ⚠️ You have {active_count} active effects. Performance may be slower with more effects."
else:
return True, "Effect applied successfully!"
def reset_user_effects(self, user_id: int) -> None:
"""Reset all audio effects to defaults for a user."""
if user_id in self._user_effects:
del self._user_effects[user_id]
self._save_preferences()
def count_active_effects(self, user_id: int) -> int:
"""Count how many effects are active for a user."""
effects = self.get_user_effects(user_id)
return AudioEffects.count_active_effects(
pitch=effects["pitch"],
speed=effects["speed"],
echo=effects["echo"],
robot=effects["robot"],
chorus=effects["chorus"],
tremolo_depth=effects["tremolo_depth"],
)
def _load_preferences(self) -> None:
"""Load user voice preferences from JSON file."""
if not self.preferences_file.exists():
return
try:
with open(self.preferences_file, "r") as f:
data = json.load(f)
# Load user preferences (convert string keys back to int)
for user_id_str, voice_name in data.get("user_voices", {}).items():
user_id = int(user_id_str)
# Only load if voice still exists
if voice_name.lower() in self._available_voices:
self._user_voices[user_id] = voice_name.lower()
# Load user effects (convert string keys back to int)
for user_id_str, effects in data.get("user_effects", {}).items():
user_id = int(user_id_str)
self._user_effects[user_id] = effects
print(f" Loaded {len(self._user_voices)} user voice preferences")
print(f" Loaded {len(self._user_effects)} user effect preferences")
except Exception as e:
print(f" Warning: Failed to load preferences: {e}")