feat: add audio effects (pitch and speed control)
- Added new audio_effects.py module with pitch shift and speed change - Pitch range: -12 to +12 semitones (higher = chipmunk, lower = deeper) - Speed range: 0.5 to 2.0x (higher = faster, lower = slower) - Maximum 2 active effects per user (performance optimization) - Added /effects command group: - /effects list - Shows current effects with descriptions - /effects set pitch|speed <value> - Apply effects - /effects reset - Confirmation UI to clear all effects - Effects persist across restarts in preferences.json - Updated /voice preview to support optional pitch/speed parameters - Effects applied in _generate_wav_bytes using librosa - Added performance warnings when processing takes >1 second - Updated README with effects documentation
This commit is contained in:
24
README.md
24
README.md
@@ -13,6 +13,8 @@ A Discord bot that reads messages aloud using [Pocket TTS](https://github.com/ky
|
||||
- 🔄 **Hot-reload Voices**: Add new voices without restarting the bot using `/voice refresh`
|
||||
- 🧪 **Test Mode**: Separate testing configuration for safe development
|
||||
- 📦 **Auto-updates**: Automatically checks for and installs dependency updates on startup
|
||||
- 👂 **Voice Preview**: Preview voices with `/voice preview` before committing to them
|
||||
- 🎵 **Audio Effects**: Apply pitch shift and speed changes to your TTS voice
|
||||
|
||||
## Prerequisites
|
||||
|
||||
@@ -126,6 +128,28 @@ This loads `.env.testing` instead of `.env`, allowing you to:
|
||||
|
||||
Create `.env.testing` by copying `.env.example` and configuring it with your testing values.
|
||||
|
||||
### Audio Effects
|
||||
|
||||
Apply pitch shift and speed changes to your TTS voice:
|
||||
|
||||
- `/effects list` - Show your current effect settings
|
||||
- `/effects set pitch <semitones>` - Change pitch (-12 to +12)
|
||||
- Positive = higher/chipmunk voice
|
||||
- Negative = lower/deeper voice
|
||||
- 0 = normal pitch (default)
|
||||
- `/effects set speed <multiplier>` - Change speed (0.5 to 2.0)
|
||||
- Higher = faster speech
|
||||
- Lower = slower speech
|
||||
- 1.0 = normal speed (default)
|
||||
- `/effects reset` - Reset all effects to defaults
|
||||
|
||||
**Note**: You can use up to 2 effects simultaneously. More effects require more processing time.
|
||||
|
||||
### Preview with Effects
|
||||
|
||||
Test voice and effect combinations before committing:
|
||||
- `/voice preview <name> [pitch] [speed]` - Preview a voice with optional effect overrides
|
||||
|
||||
## How It Works
|
||||
|
||||
```
|
||||
|
||||
146
audio_effects.py
Normal file
146
audio_effects.py
Normal file
@@ -0,0 +1,146 @@
|
||||
"""Audio effects processing for TTS output."""
|
||||
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import librosa
|
||||
import numpy as np
|
||||
|
||||
|
||||
class AudioEffects:
|
||||
"""Apply post-processing effects to TTS audio."""
|
||||
|
||||
MAX_ACTIVE_EFFECTS = 2
|
||||
|
||||
# Effect ranges
|
||||
PITCH_MIN = -12
|
||||
PITCH_MAX = 12
|
||||
PITCH_DEFAULT = 0
|
||||
|
||||
SPEED_MIN = 0.5
|
||||
SPEED_MAX = 2.0
|
||||
SPEED_DEFAULT = 1.0
|
||||
|
||||
@classmethod
|
||||
def apply_effects(
|
||||
cls,
|
||||
audio: np.ndarray,
|
||||
sr: int,
|
||||
pitch: int = PITCH_DEFAULT,
|
||||
speed: float = SPEED_DEFAULT,
|
||||
) -> tuple[np.ndarray, bool]:
|
||||
"""
|
||||
Apply effects to audio.
|
||||
|
||||
Args:
|
||||
audio: Input audio array
|
||||
sr: Sample rate
|
||||
pitch: Pitch shift in semitones (-12 to +12, 0 = no shift)
|
||||
speed: Speed multiplier (0.5 to 2.0, 1.0 = normal)
|
||||
|
||||
Returns:
|
||||
Tuple of (processed_audio, show_processing_message)
|
||||
show_processing_message is True if processing took > 1 second
|
||||
"""
|
||||
start_time = time.time()
|
||||
original_length = len(audio)
|
||||
|
||||
# Validate inputs
|
||||
pitch = max(cls.PITCH_MIN, min(cls.PITCH_MAX, pitch))
|
||||
speed = max(cls.SPEED_MIN, min(cls.SPEED_MAX, speed))
|
||||
|
||||
print(f"Applying effects - Pitch: {pitch:+d}, Speed: {speed:.1f}x")
|
||||
|
||||
# Apply pitch shift first (if not default)
|
||||
if pitch != cls.PITCH_DEFAULT:
|
||||
print(f" Applying pitch shift: {pitch:+d} semitones...")
|
||||
audio = librosa.effects.pitch_shift(
|
||||
audio, sr=sr, n_steps=pitch, bins_per_octave=12
|
||||
)
|
||||
|
||||
# Apply speed change second (if not default)
|
||||
if speed != cls.SPEED_DEFAULT:
|
||||
print(f" Applying speed change: {speed:.1f}x...")
|
||||
audio = librosa.effects.time_stretch(audio, rate=speed)
|
||||
|
||||
# Stretching changes length, so we need to resample to maintain duration
|
||||
# Actually, for TTS we want the new speed, so we don't resample back
|
||||
# The audio will be shorter or longer based on speed
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
print(f" Effects applied in {processing_time:.2f}s")
|
||||
|
||||
# Show processing message if it took more than 1 second
|
||||
show_message = processing_time > 1.0
|
||||
|
||||
return audio, show_message
|
||||
|
||||
@classmethod
|
||||
def validate_effect(cls, effect_name: str, value: Any) -> tuple[bool, str]:
|
||||
"""
|
||||
Validate an effect value.
|
||||
|
||||
Returns:
|
||||
Tuple of (is_valid, error_message)
|
||||
"""
|
||||
if effect_name == "pitch":
|
||||
try:
|
||||
pitch = int(value)
|
||||
if cls.PITCH_MIN <= pitch <= cls.PITCH_MAX:
|
||||
return True, ""
|
||||
return (
|
||||
False,
|
||||
f"Pitch must be between {cls.PITCH_MIN} and {cls.PITCH_MAX} semitones",
|
||||
)
|
||||
except (ValueError, TypeError):
|
||||
return False, "Pitch must be a whole number"
|
||||
|
||||
elif effect_name == "speed":
|
||||
try:
|
||||
speed = float(value)
|
||||
if cls.SPEED_MIN <= speed <= cls.SPEED_MAX:
|
||||
return True, ""
|
||||
return (
|
||||
False,
|
||||
f"Speed must be between {cls.SPEED_MIN} and {cls.SPEED_MAX}",
|
||||
)
|
||||
except (ValueError, TypeError):
|
||||
return False, "Speed must be a number"
|
||||
|
||||
return False, f"Unknown effect: {effect_name}"
|
||||
|
||||
@classmethod
|
||||
def count_active_effects(cls, pitch: int, speed: float) -> int:
|
||||
"""Count how many effects are active (non-default)."""
|
||||
count = 0
|
||||
if pitch != cls.PITCH_DEFAULT:
|
||||
count += 1
|
||||
if speed != cls.SPEED_DEFAULT:
|
||||
count += 1
|
||||
return count
|
||||
|
||||
@classmethod
|
||||
def get_effect_description(cls, effect_name: str) -> str:
|
||||
"""Get a human-readable description of what an effect does."""
|
||||
descriptions = {
|
||||
"pitch": f"Changes voice pitch ({cls.PITCH_MIN} to {cls.PITCH_MAX} semitones). Positive = higher/chipmunk, Negative = lower/deeper.",
|
||||
"speed": f"Changes speech speed ({cls.SPEED_MIN} to {cls.SPEED_MAX}x). Higher = faster, Lower = slower.",
|
||||
}
|
||||
return descriptions.get(effect_name, "Unknown effect")
|
||||
|
||||
@classmethod
|
||||
def format_effect_value(cls, effect_name: str, value: Any) -> str:
|
||||
"""Format an effect value for display."""
|
||||
if effect_name == "pitch":
|
||||
pitch = int(value)
|
||||
if pitch == 0:
|
||||
return "0 (normal)"
|
||||
direction = "higher" if pitch > 0 else "lower"
|
||||
return f"{pitch:+d} ({direction})"
|
||||
elif effect_name == "speed":
|
||||
speed = float(value)
|
||||
if speed == 1.0:
|
||||
return "1.0x (normal)"
|
||||
direction = "faster" if speed > 1.0 else "slower"
|
||||
return f"{speed:.1f}x ({direction})"
|
||||
return str(value)
|
||||
266
bot.py
266
bot.py
@@ -24,6 +24,7 @@ import scipy.io.wavfile as wavfile
|
||||
from discord import app_commands
|
||||
from discord.ext import commands
|
||||
|
||||
from audio_effects import AudioEffects
|
||||
from config import Config
|
||||
from voice_manager import VoiceManager
|
||||
|
||||
@@ -58,6 +59,7 @@ class TTSBot(commands.Bot):
|
||||
self.last_activity: float = 0.0
|
||||
|
||||
self._setup_slash_commands()
|
||||
self._setup_effects_commands()
|
||||
|
||||
def _setup_slash_commands(self) -> None:
|
||||
"""Set up slash commands for voice management."""
|
||||
@@ -65,7 +67,9 @@ class TTSBot(commands.Bot):
|
||||
@self.tree.command(name="voice", description="Manage your TTS voice")
|
||||
@app_commands.describe(
|
||||
action="What to do",
|
||||
voice_name="Name of the voice (for 'set' action)"
|
||||
voice_name="Name of the voice (for 'set' or 'preview' action)",
|
||||
preview_pitch="Optional pitch for preview (-12 to 12, default: use your settings)",
|
||||
preview_speed="Optional speed for preview (0.5 to 2.0, default: use your settings)",
|
||||
)
|
||||
@app_commands.choices(action=[
|
||||
app_commands.Choice(name="list", value="list"),
|
||||
@@ -77,7 +81,9 @@ class TTSBot(commands.Bot):
|
||||
async def voice_command(
|
||||
interaction: discord.Interaction,
|
||||
action: app_commands.Choice[str],
|
||||
voice_name: str | None = None
|
||||
voice_name: str | None = None,
|
||||
preview_pitch: int | None = None,
|
||||
preview_speed: float | None = None,
|
||||
):
|
||||
if action.value == "list":
|
||||
await self._handle_voice_list(interaction)
|
||||
@@ -88,7 +94,7 @@ class TTSBot(commands.Bot):
|
||||
elif action.value == "refresh":
|
||||
await self._handle_voice_refresh(interaction)
|
||||
elif action.value == "preview":
|
||||
await self._handle_voice_preview(interaction, voice_name)
|
||||
await self._handle_voice_preview(interaction, voice_name, preview_pitch, preview_speed)
|
||||
|
||||
@voice_command.autocomplete("voice_name")
|
||||
async def voice_name_autocomplete(
|
||||
@@ -102,6 +108,161 @@ class TTSBot(commands.Bot):
|
||||
if current.lower() in v.lower()
|
||||
][:25]
|
||||
|
||||
def _setup_effects_commands(self) -> None:
|
||||
"""Set up slash commands for audio effects management."""
|
||||
|
||||
@self.tree.command(name="effects", description="Manage your TTS audio effects")
|
||||
@app_commands.describe(
|
||||
action="What to do",
|
||||
effect_name="Name of the effect (for 'set' action)",
|
||||
value="Value for the effect (for 'set' action)"
|
||||
)
|
||||
@app_commands.choices(action=[
|
||||
app_commands.Choice(name="list", value="list"),
|
||||
app_commands.Choice(name="set", value="set"),
|
||||
app_commands.Choice(name="reset", value="reset"),
|
||||
])
|
||||
@app_commands.choices(effect_name=[
|
||||
app_commands.Choice(name="pitch", value="pitch"),
|
||||
app_commands.Choice(name="speed", value="speed"),
|
||||
])
|
||||
async def effects_command(
|
||||
interaction: discord.Interaction,
|
||||
action: app_commands.Choice[str],
|
||||
effect_name: app_commands.Choice[str] | None = None,
|
||||
value: str | None = None
|
||||
):
|
||||
if action.value == "list":
|
||||
await self._handle_effects_list(interaction)
|
||||
elif action.value == "set":
|
||||
await self._handle_effects_set(interaction, effect_name, value)
|
||||
elif action.value == "reset":
|
||||
await self._handle_effects_reset(interaction)
|
||||
|
||||
async def _handle_effects_list(self, interaction: discord.Interaction) -> None:
|
||||
"""Handle /effects list command."""
|
||||
effects = self.voice_manager.get_user_effects(interaction.user.id)
|
||||
active_count = self.voice_manager.count_active_effects(interaction.user.id)
|
||||
|
||||
lines = ["**Your Audio Effects:**\n"]
|
||||
|
||||
# Pitch
|
||||
pitch_desc = AudioEffects.get_effect_description("pitch")
|
||||
pitch_val = AudioEffects.format_effect_value("pitch", effects["pitch"])
|
||||
lines.append(f"🎵 **Pitch**: {pitch_val}")
|
||||
lines.append(f" {pitch_desc}\n")
|
||||
|
||||
# Speed
|
||||
speed_desc = AudioEffects.get_effect_description("speed")
|
||||
speed_val = AudioEffects.format_effect_value("speed", effects["speed"])
|
||||
lines.append(f"⚡ **Speed**: {speed_val}")
|
||||
lines.append(f" {speed_desc}\n")
|
||||
|
||||
# Active count warning
|
||||
lines.append(f"**Active Effects**: {active_count}/{AudioEffects.MAX_ACTIVE_EFFECTS}")
|
||||
if active_count >= AudioEffects.MAX_ACTIVE_EFFECTS:
|
||||
lines.append("⚠️ Max effects reached. More effects = slower processing time.")
|
||||
elif active_count > 0:
|
||||
lines.append(f"ℹ️ You can add {AudioEffects.MAX_ACTIVE_EFFECTS - active_count} more effect(s).")
|
||||
|
||||
lines.append(f"\n*Use `/effects set <effect> <value>` to change settings*")
|
||||
lines.append(f"*Use `/effects reset` to clear all effects*")
|
||||
|
||||
await interaction.response.send_message(
|
||||
"\n".join(lines),
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
async def _handle_effects_set(
|
||||
self,
|
||||
interaction: discord.Interaction,
|
||||
effect_name: app_commands.Choice[str] | None,
|
||||
value: str | None
|
||||
) -> None:
|
||||
"""Handle /effects set command."""
|
||||
if not effect_name or value is None:
|
||||
await interaction.response.send_message(
|
||||
"❌ Please provide both effect name and value. Example: `/effects set pitch 3`",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
success, message = self.voice_manager.set_user_effect(
|
||||
interaction.user.id,
|
||||
effect_name.value,
|
||||
value
|
||||
)
|
||||
|
||||
if success:
|
||||
await interaction.response.send_message(
|
||||
f"✅ {message}",
|
||||
ephemeral=True
|
||||
)
|
||||
else:
|
||||
await interaction.response.send_message(
|
||||
f"❌ {message}",
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
async def _handle_effects_reset(self, interaction: discord.Interaction) -> None:
|
||||
"""Handle /effects reset command with confirmation UI."""
|
||||
# Check if user has any effects to reset
|
||||
active_count = self.voice_manager.count_active_effects(interaction.user.id)
|
||||
|
||||
if active_count == 0:
|
||||
await interaction.response.send_message(
|
||||
"ℹ️ You don't have any active effects to reset.",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
# Create confirmation buttons
|
||||
class ConfirmResetView(discord.ui.View):
|
||||
def __init__(self, voice_manager, user_id):
|
||||
super().__init__(timeout=30)
|
||||
self.voice_manager = voice_manager
|
||||
self.user_id = user_id
|
||||
self.confirmed = False
|
||||
|
||||
@discord.ui.button(label="✅ Yes, Reset All", style=discord.ButtonStyle.danger)
|
||||
async def confirm_button(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||||
if interaction.user.id != self.user_id:
|
||||
await interaction.response.send_message("This button is not for you!", ephemeral=True)
|
||||
return
|
||||
|
||||
self.voice_manager.reset_user_effects(self.user_id)
|
||||
self.confirmed = True
|
||||
await interaction.response.edit_message(
|
||||
content="✅ All audio effects have been reset to defaults!",
|
||||
view=None
|
||||
)
|
||||
self.stop()
|
||||
|
||||
@discord.ui.button(label="❌ Cancel", style=discord.ButtonStyle.secondary)
|
||||
async def cancel_button(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||||
if interaction.user.id != self.user_id:
|
||||
await interaction.response.send_message("This button is not for you!", ephemeral=True)
|
||||
return
|
||||
|
||||
await interaction.response.edit_message(
|
||||
content="❌ Reset cancelled. Your effects remain unchanged.",
|
||||
view=None
|
||||
)
|
||||
self.stop()
|
||||
|
||||
view = ConfirmResetView(self.voice_manager, interaction.user.id)
|
||||
|
||||
await interaction.response.send_message(
|
||||
f"⚠️ **Reset Confirmation**\n\n"
|
||||
f"You have {active_count} active effect(s).\n"
|
||||
f"This will reset **all** your audio effects to defaults:\n"
|
||||
f"• Pitch: 0 (normal)\n"
|
||||
f"• Speed: 1.0x (normal)\n\n"
|
||||
f"Are you sure you want to continue?",
|
||||
view=view,
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
async def _handle_voice_list(self, interaction: discord.Interaction) -> None:
|
||||
"""Handle /voice list command."""
|
||||
voices = self.voice_manager.get_available_voices()
|
||||
@@ -222,7 +383,13 @@ class TTSBot(commands.Bot):
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
async def _handle_voice_preview(self, interaction: discord.Interaction, voice_name: str | None) -> None:
|
||||
async def _handle_voice_preview(
|
||||
self,
|
||||
interaction: discord.Interaction,
|
||||
voice_name: str | None,
|
||||
preview_pitch: int | None = None,
|
||||
preview_speed: float | None = None,
|
||||
) -> None:
|
||||
"""Handle /voice preview command."""
|
||||
if not voice_name:
|
||||
await interaction.response.send_message(
|
||||
@@ -251,6 +418,26 @@ class TTSBot(commands.Bot):
|
||||
)
|
||||
return
|
||||
|
||||
# Validate pitch if provided
|
||||
if preview_pitch is not None:
|
||||
is_valid, error_msg = AudioEffects.validate_effect("pitch", preview_pitch)
|
||||
if not is_valid:
|
||||
await interaction.response.send_message(
|
||||
f"❌ Invalid pitch value: {error_msg}",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
# Validate speed if provided
|
||||
if preview_speed is not None:
|
||||
is_valid, error_msg = AudioEffects.validate_effect("speed", preview_speed)
|
||||
if not is_valid:
|
||||
await interaction.response.send_message(
|
||||
f"❌ Invalid speed value: {error_msg}",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
# Select a random preview line
|
||||
preview_text = random.choice(PREVIEW_LINES)
|
||||
|
||||
@@ -274,11 +461,25 @@ class TTSBot(commands.Bot):
|
||||
interaction.user.voice.channel
|
||||
)
|
||||
|
||||
# Queue the preview with voice override
|
||||
await self.message_queue.put((preview_message, preview_text, voice_name))
|
||||
# Use user's current effects if not overridden
|
||||
user_effects = self.voice_manager.get_user_effects(interaction.user.id)
|
||||
final_pitch = preview_pitch if preview_pitch is not None else user_effects["pitch"]
|
||||
final_speed = preview_speed if preview_speed is not None else user_effects["speed"]
|
||||
|
||||
# Queue the preview with voice override and effects
|
||||
await self.message_queue.put((preview_message, preview_text, voice_name, final_pitch, final_speed))
|
||||
|
||||
# Build effect description
|
||||
effect_desc = []
|
||||
if final_pitch != 0:
|
||||
effect_desc.append(f"pitch: {final_pitch:+d}")
|
||||
if final_speed != 1.0:
|
||||
effect_desc.append(f"speed: {final_speed:.1f}x")
|
||||
|
||||
effect_str = f" (with {', '.join(effect_desc)})" if effect_desc else ""
|
||||
|
||||
await interaction.response.send_message(
|
||||
f"⏳ Queued preview for `{voice_name}`. Sample: \"{preview_text[:50]}{'...' if len(preview_text) > 50 else ''}\"",
|
||||
f"⏳ Queued preview for `{voice_name}`{effect_str}. Sample: \"{preview_text[:50]}{'...' if len(preview_text) > 50 else ''}\"",
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
@@ -336,21 +537,37 @@ class TTSBot(commands.Bot):
|
||||
while True:
|
||||
queue_item = await self.message_queue.get()
|
||||
|
||||
# Handle both regular messages (message, text) and previews (message, text, voice_name)
|
||||
if len(queue_item) == 3:
|
||||
# Handle queue items of different lengths:
|
||||
# - (message, text) - regular message
|
||||
# - (message, text, voice_name) - preview with voice override
|
||||
# - (message, text, voice_name, pitch, speed) - preview with effects
|
||||
if len(queue_item) == 5:
|
||||
message, text, voice_override, pitch, speed = queue_item
|
||||
elif len(queue_item) == 3:
|
||||
message, text, voice_override = queue_item
|
||||
pitch = None
|
||||
speed = None
|
||||
else:
|
||||
message, text = queue_item
|
||||
voice_override = None
|
||||
pitch = None
|
||||
speed = None
|
||||
|
||||
try:
|
||||
await self.speak_message(message, text, voice_override)
|
||||
await self.speak_message(message, text, voice_override, pitch, speed)
|
||||
except Exception as e:
|
||||
print(f"Error processing message: {e}")
|
||||
finally:
|
||||
self.message_queue.task_done()
|
||||
|
||||
async def speak_message(self, message: discord.Message, text: str, voice_override: str | None = None) -> None:
|
||||
async def speak_message(
|
||||
self,
|
||||
message: discord.Message,
|
||||
text: str,
|
||||
voice_override: str | None = None,
|
||||
pitch: int | None = None,
|
||||
speed: float | None = None,
|
||||
) -> None:
|
||||
"""Generate TTS and play it in the user's voice channel."""
|
||||
if message.author.voice is None:
|
||||
return
|
||||
@@ -383,8 +600,16 @@ class TTSBot(commands.Bot):
|
||||
)
|
||||
return
|
||||
|
||||
# Get user's effects if not overridden
|
||||
if pitch is None or speed is None:
|
||||
user_effects = self.voice_manager.get_user_effects(message.author.id)
|
||||
if pitch is None:
|
||||
pitch = user_effects["pitch"]
|
||||
if speed is None:
|
||||
speed = user_effects["speed"]
|
||||
|
||||
wav_bytes = await asyncio.to_thread(
|
||||
self._generate_wav_bytes, voice_state, text
|
||||
self._generate_wav_bytes, voice_state, text, pitch, speed
|
||||
)
|
||||
|
||||
audio_source = discord.FFmpegPCMAudio(
|
||||
@@ -409,7 +634,13 @@ class TTSBot(commands.Bot):
|
||||
|
||||
await play_complete.wait()
|
||||
|
||||
def _generate_wav_bytes(self, voice_state: Any, text: str) -> bytes:
|
||||
def _generate_wav_bytes(
|
||||
self,
|
||||
voice_state: Any,
|
||||
text: str,
|
||||
pitch: int = 0,
|
||||
speed: float = 1.0,
|
||||
) -> bytes:
|
||||
"""Generate audio and return as WAV file bytes."""
|
||||
model = self.voice_manager.model
|
||||
if model is None:
|
||||
@@ -421,6 +652,15 @@ class TTSBot(commands.Bot):
|
||||
if audio_np.ndim == 1:
|
||||
audio_np = audio_np.reshape(-1, 1)
|
||||
|
||||
# Apply audio effects if any are active
|
||||
if pitch != 0 or speed != 1.0:
|
||||
print(f"Applying effects - Pitch: {pitch:+d}, Speed: {speed:.1f}x")
|
||||
audio_np, show_processing = AudioEffects.apply_effects(
|
||||
audio_np, model.sample_rate, pitch, speed
|
||||
)
|
||||
if show_processing:
|
||||
print("⚠️ Audio processing took longer than expected due to effects")
|
||||
|
||||
max_val = np.max(np.abs(audio_np))
|
||||
if max_val > 0:
|
||||
audio_np = audio_np / max_val
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import Any
|
||||
|
||||
from pocket_tts import TTSModel
|
||||
|
||||
from audio_effects import AudioEffects
|
||||
from audio_preprocessor import (
|
||||
AudioPreprocessor,
|
||||
PreprocessingConfig,
|
||||
@@ -26,6 +27,8 @@ class VoiceManager:
|
||||
self._voice_states: dict[str, Any] = {}
|
||||
# Per-user voice preferences: user_id -> voice_name
|
||||
self._user_voices: dict[int, str] = {}
|
||||
# Per-user audio effects: user_id -> {"pitch": int, "speed": float}
|
||||
self._user_effects: dict[int, dict[str, Any]] = {}
|
||||
# Available voices: voice_name -> file_path
|
||||
self._available_voices: dict[str, Path] = {}
|
||||
|
||||
@@ -181,10 +184,95 @@ class VoiceManager:
|
||||
self.preferences_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
data = {
|
||||
"user_voices": {str(k): v for k, v in self._user_voices.items()}
|
||||
"user_voices": {str(k): v for k, v in self._user_voices.items()},
|
||||
"user_effects": {str(k): v for k, v in self._user_effects.items()},
|
||||
}
|
||||
|
||||
with open(self.preferences_file, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to save preferences: {e}")
|
||||
|
||||
# Effects management methods
|
||||
|
||||
def get_user_effects(self, user_id: int) -> dict[str, Any]:
|
||||
"""Get the audio effects for a user. Returns defaults if not set."""
|
||||
effects = self._user_effects.get(user_id, {})
|
||||
return {
|
||||
"pitch": effects.get("pitch", AudioEffects.PITCH_DEFAULT),
|
||||
"speed": effects.get("speed", AudioEffects.SPEED_DEFAULT),
|
||||
}
|
||||
|
||||
def set_user_effect(self, user_id: int, effect_name: str, value: Any) -> tuple[bool, str]:
|
||||
"""
|
||||
Set an audio effect for a user.
|
||||
|
||||
Returns:
|
||||
Tuple of (success, message)
|
||||
"""
|
||||
# Validate the effect
|
||||
is_valid, error_msg = AudioEffects.validate_effect(effect_name, value)
|
||||
if not is_valid:
|
||||
return False, error_msg
|
||||
|
||||
# Get current effects
|
||||
if user_id not in self._user_effects:
|
||||
self._user_effects[user_id] = {}
|
||||
|
||||
# Check if this would exceed max effects
|
||||
current_effects = self._user_effects[user_id].copy()
|
||||
if effect_name == "pitch":
|
||||
current_effects["pitch"] = int(value)
|
||||
elif effect_name == "speed":
|
||||
current_effects["speed"] = float(value)
|
||||
|
||||
active_count = AudioEffects.count_active_effects(
|
||||
current_effects.get("pitch", AudioEffects.PITCH_DEFAULT),
|
||||
current_effects.get("speed", AudioEffects.SPEED_DEFAULT),
|
||||
)
|
||||
|
||||
# Save the effect
|
||||
self._user_effects[user_id][effect_name] = value
|
||||
self._save_preferences()
|
||||
|
||||
if active_count >= AudioEffects.MAX_ACTIVE_EFFECTS:
|
||||
return True, f"Effect applied! ⚠️ You now have {active_count} active effects (max {AudioEffects.MAX_ACTIVE_EFFECTS}). More effects = slower processing."
|
||||
else:
|
||||
return True, "Effect applied successfully!"
|
||||
|
||||
def reset_user_effects(self, user_id: int) -> None:
|
||||
"""Reset all audio effects to defaults for a user."""
|
||||
if user_id in self._user_effects:
|
||||
del self._user_effects[user_id]
|
||||
self._save_preferences()
|
||||
|
||||
def count_active_effects(self, user_id: int) -> int:
|
||||
"""Count how many effects are active for a user."""
|
||||
effects = self.get_user_effects(user_id)
|
||||
return AudioEffects.count_active_effects(effects["pitch"], effects["speed"])
|
||||
|
||||
def _load_preferences(self) -> None:
|
||||
"""Load user voice preferences from JSON file."""
|
||||
if not self.preferences_file.exists():
|
||||
return
|
||||
|
||||
try:
|
||||
with open(self.preferences_file, "r") as f:
|
||||
data = json.load(f)
|
||||
|
||||
# Load user preferences (convert string keys back to int)
|
||||
for user_id_str, voice_name in data.get("user_voices", {}).items():
|
||||
user_id = int(user_id_str)
|
||||
# Only load if voice still exists
|
||||
if voice_name.lower() in self._available_voices:
|
||||
self._user_voices[user_id] = voice_name.lower()
|
||||
|
||||
# Load user effects (convert string keys back to int)
|
||||
for user_id_str, effects in data.get("user_effects", {}).items():
|
||||
user_id = int(user_id_str)
|
||||
self._user_effects[user_id] = effects
|
||||
|
||||
print(f" Loaded {len(self._user_voices)} user voice preferences")
|
||||
print(f" Loaded {len(self._user_effects)} user effect preferences")
|
||||
except Exception as e:
|
||||
print(f" Warning: Failed to load preferences: {e}")
|
||||
|
||||
Reference in New Issue
Block a user