Compare commits
14 Commits
c5e3fd33c4
...
85a334a57b
| Author | SHA1 | Date | |
|---|---|---|---|
| 85a334a57b | |||
| 40843e4ac9 | |||
| 7e76deed3d | |||
| 795d5087e9 | |||
| 8d4ac59f73 | |||
| 68bc3b2c7d | |||
| 4cb0a78486 | |||
| b12639a618 | |||
| f082c62a16 | |||
| 85f3e79d2a | |||
| 9f14e8c745 | |||
| 4a2d72517f | |||
| 2403b431e9 | |||
| c0e5d4bcb6 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -123,7 +123,9 @@ env.bak/
|
||||
venv.bak/
|
||||
/venv
|
||||
.numba_cache/
|
||||
|
||||
# Gemini files
|
||||
GEMINI.md
|
||||
PROGRESS.md
|
||||
.vscode/launch.json
|
||||
voices/preferences.json
|
||||
|
||||
108
README.md
108
README.md
@@ -11,6 +11,12 @@ A Discord bot that reads messages aloud using [Pocket TTS](https://github.com/ky
|
||||
- 🔄 **Per-User Voice Selection**: Each user can choose their own TTS voice via `/voice` commands
|
||||
- 💾 **Voice Persistence**: User voice preferences are saved and restored on restart
|
||||
- 🔄 **Hot-reload Voices**: Add new voices without restarting the bot using `/voice refresh`
|
||||
- 🧪 **Test Mode**: Separate testing configuration for safe development
|
||||
- 📦 **Auto-updates**: Automatically checks for and installs dependency updates on startup
|
||||
- 👂 **Voice Preview**: Preview voices with `/voice preview` before committing to them
|
||||
- 🎵 **Audio Effects**: 7 different effects to customize your voice (pitch, speed, echo, robot, chorus, tremolo)
|
||||
- ⚡ **Unlimited Effects**: Use as many effects as you want (warning shown when >2 active)
|
||||
- ⏱️ **Processing Indicator**: Shows when audio processing is taking longer than expected
|
||||
|
||||
## Prerequisites
|
||||
|
||||
@@ -107,6 +113,108 @@ A Discord bot that reads messages aloud using [Pocket TTS](https://github.com/ky
|
||||
- `/voice set <name>` - Change your personal TTS voice
|
||||
- `/voice current` - Shows your current voice
|
||||
- `/voice refresh` - Re-scan for new voice files (no restart needed)
|
||||
- `/voice preview <name>` - Preview a voice before selecting it
|
||||
|
||||
### Test Mode
|
||||
|
||||
Run the bot in testing mode to use a separate configuration:
|
||||
|
||||
```bash
|
||||
python bot.py testing
|
||||
```
|
||||
|
||||
This loads `.env.testing` instead of `.env`, allowing you to:
|
||||
- Use a different Discord bot token for testing
|
||||
- Monitor a different text channel
|
||||
- Test new features without affecting the production bot
|
||||
|
||||
Create `.env.testing` by copying `.env.example` and configuring it with your testing values.
|
||||
|
||||
### Audio Effects
|
||||
|
||||
Transform your TTS voice with 7 different audio effects:
|
||||
|
||||
#### Available Effects:
|
||||
|
||||
**🎵 Pitch** (`/effects set pitch <semitones>`)
|
||||
- Range: -12 to +12 semitones
|
||||
- Default: 0 (no change)
|
||||
- Positive = higher/chipmunk voice
|
||||
- Negative = lower/deeper voice
|
||||
|
||||
**⚡ Speed** (`/effects set speed <multiplier>`)
|
||||
- Range: 0.5 to 2.0
|
||||
- Default: 1.0x (normal speed)
|
||||
- Higher = faster speech
|
||||
- Lower = slower speech
|
||||
|
||||
**🔊 Echo** (`/effects set echo <percentage>`)
|
||||
- Range: 0-100%
|
||||
- Default: 0% (off)
|
||||
- Adds spatial delay and reverb effect
|
||||
- Higher values = more pronounced echo
|
||||
|
||||
**🤖 Robot** (`/effects set robot <percentage>`)
|
||||
- Range: 0-100%
|
||||
- Default: 0% (off)
|
||||
- Applies ring modulation for sci-fi robotic voice
|
||||
- Higher values = more robotic distortion
|
||||
|
||||
**🎶 Chorus** (`/effects set chorus <percentage>`)
|
||||
- Range: 0-100%
|
||||
- Default: 0% (off)
|
||||
- Creates "multiple voices" effect with slight pitch variations
|
||||
- Higher values = more voices and depth
|
||||
|
||||
**〰️ Tremolo Depth** (`/effects set tremolo_depth <value>`)
|
||||
- Range: 0.0 to 1.0
|
||||
- Default: 0.0 (off)
|
||||
- Controls amplitude modulation amount
|
||||
- Higher = more warble/vintage radio effect
|
||||
|
||||
**📳 Tremolo Rate** (`/effects set tremolo_rate <hertz>`)
|
||||
- Range: 0.0 to 10.0 Hz
|
||||
- Default: 0.0 Hz (off)
|
||||
- Controls how fast the tremolo warbles
|
||||
- Requires tremolo_depth > 0 to have effect
|
||||
|
||||
#### Effect Commands:
|
||||
- `/effects list` - Show all your current effect settings
|
||||
- `/effects set <effect> <value>` - Change an effect value
|
||||
- `/effects reset` - Reset all effects to defaults (with confirmation)
|
||||
|
||||
#### Effect Application Order:
|
||||
Effects are applied in this sequence:
|
||||
1. Pitch shift
|
||||
2. Speed change
|
||||
3. Echo/Reverb
|
||||
4. Chorus
|
||||
5. Tremolo
|
||||
6. Robot voice
|
||||
|
||||
#### Performance Notes:
|
||||
- **No limit** on number of active effects
|
||||
- ⚠️ Warning shown when you have more than 2 active effects
|
||||
- More effects = longer processing time
|
||||
- Some effects (like pitch shift and chorus) are more CPU-intensive
|
||||
- Processing time is logged to console for monitoring
|
||||
|
||||
### Preview with Effects
|
||||
|
||||
Test any combination of voice and effects before committing:
|
||||
|
||||
**Preview a voice:**
|
||||
- `/voice preview <voice_name>` - Preview with your current effects
|
||||
|
||||
**Preview with specific effects:**
|
||||
- `/voice preview <voice_name> pitch:5 speed:1.5` - Preview with pitch +5 and 1.5x speed
|
||||
- All effect parameters are optional and default to your current settings
|
||||
|
||||
**Example combinations to try:**
|
||||
- Robot voice: `/effects set robot 75`
|
||||
- Deep scary voice: `/effects set pitch -8`
|
||||
- Fast chipmunk: `/effects set pitch 8 speed:1.5`
|
||||
- Radio announcer: `/effects set echo 40 tremolo_depth:0.3 tremolo_rate:4`
|
||||
|
||||
## How It Works
|
||||
|
||||
|
||||
345
audio_effects.py
Normal file
345
audio_effects.py
Normal file
@@ -0,0 +1,345 @@
|
||||
"""Audio effects processing for TTS output."""
|
||||
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import librosa
|
||||
import numpy as np
|
||||
|
||||
|
||||
class AudioEffects:
|
||||
"""Apply post-processing effects to TTS audio."""
|
||||
|
||||
# No limit on effects, but warnings shown when > 2 active
|
||||
MAX_ACTIVE_EFFECTS = None
|
||||
|
||||
# Effect ranges and defaults
|
||||
PITCH_MIN = -12
|
||||
PITCH_MAX = 12
|
||||
PITCH_DEFAULT = 0
|
||||
|
||||
SPEED_MIN = 0.5
|
||||
SPEED_MAX = 2.0
|
||||
SPEED_DEFAULT = 1.0
|
||||
|
||||
ECHO_MIN = 0
|
||||
ECHO_MAX = 100
|
||||
ECHO_DEFAULT = 0
|
||||
|
||||
ROBOT_MIN = 0
|
||||
ROBOT_MAX = 100
|
||||
ROBOT_DEFAULT = 0
|
||||
|
||||
CHORUS_MIN = 0
|
||||
CHORUS_MAX = 100
|
||||
CHORUS_DEFAULT = 0
|
||||
|
||||
TREMOLO_DEPTH_MIN = 0.0
|
||||
TREMOLO_DEPTH_MAX = 1.0
|
||||
TREMOLO_DEPTH_DEFAULT = 0.0
|
||||
|
||||
TREMOLO_RATE_MIN = 0.0
|
||||
TREMOLO_RATE_MAX = 10.0
|
||||
TREMOLO_RATE_DEFAULT = 0.0
|
||||
|
||||
@classmethod
|
||||
def apply_effects(
|
||||
cls,
|
||||
audio: np.ndarray,
|
||||
sr: int,
|
||||
pitch: int = PITCH_DEFAULT,
|
||||
speed: float = SPEED_DEFAULT,
|
||||
echo: int = ECHO_DEFAULT,
|
||||
robot: int = ROBOT_DEFAULT,
|
||||
chorus: int = CHORUS_DEFAULT,
|
||||
tremolo_depth: float = TREMOLO_DEPTH_DEFAULT,
|
||||
tremolo_rate: float = TREMOLO_RATE_DEFAULT,
|
||||
) -> tuple[np.ndarray, bool]:
|
||||
"""
|
||||
Apply effects to audio in order: pitch → speed → echo → chorus → tremolo → robot
|
||||
|
||||
Args:
|
||||
audio: Input audio array (1D)
|
||||
sr: Sample rate
|
||||
pitch: Pitch shift in semitones (-12 to +12, 0 = no shift)
|
||||
speed: Speed multiplier (0.5 to 2.0, 1.0 = normal)
|
||||
echo: Echo intensity (0-100, 0 = no echo)
|
||||
robot: Robot voice intensity (0-100, 0 = no robot)
|
||||
chorus: Chorus intensity (0-100, 0 = no chorus)
|
||||
tremolo_depth: Tremolo depth (0.0-1.0, 0.0 = no tremolo)
|
||||
tremolo_rate: Tremolo rate in Hz (0.0-10.0)
|
||||
|
||||
Returns:
|
||||
Tuple of (processed_audio, show_processing_message)
|
||||
show_processing_message is True if processing took > 1 second
|
||||
"""
|
||||
start_time = time.time()
|
||||
original_length = len(audio)
|
||||
|
||||
# Validate inputs
|
||||
pitch = max(cls.PITCH_MIN, min(cls.PITCH_MAX, pitch))
|
||||
speed = max(cls.SPEED_MIN, min(cls.SPEED_MAX, speed))
|
||||
echo = max(cls.ECHO_MIN, min(cls.ECHO_MAX, echo))
|
||||
robot = max(cls.ROBOT_MIN, min(cls.ROBOT_MAX, robot))
|
||||
chorus = max(cls.CHORUS_MIN, min(cls.CHORUS_MAX, chorus))
|
||||
tremolo_depth = max(cls.TREMOLO_DEPTH_MIN, min(cls.TREMOLO_DEPTH_MAX, tremolo_depth))
|
||||
tremolo_rate = max(cls.TREMOLO_RATE_MIN, min(cls.TREMOLO_RATE_MAX, tremolo_rate))
|
||||
|
||||
# Apply pitch shift first
|
||||
if pitch != cls.PITCH_DEFAULT:
|
||||
print(f" Applying pitch shift: {pitch:+d} semitones...")
|
||||
audio = librosa.effects.pitch_shift(
|
||||
audio, sr=sr, n_steps=pitch, bins_per_octave=12
|
||||
)
|
||||
|
||||
# Apply speed change second
|
||||
if speed != cls.SPEED_DEFAULT:
|
||||
print(f" Applying speed change: {speed:.1f}x...")
|
||||
audio = librosa.effects.time_stretch(audio, rate=speed)
|
||||
|
||||
# Apply echo third
|
||||
if echo > 0:
|
||||
print(f" Applying echo: {echo}%...")
|
||||
audio = cls._apply_echo(audio, sr, echo)
|
||||
|
||||
# Apply chorus fourth
|
||||
if chorus > 0:
|
||||
print(f" Applying chorus: {chorus}%...")
|
||||
audio = cls._apply_chorus(audio, sr, chorus)
|
||||
|
||||
# Apply tremolo fifth
|
||||
if tremolo_depth > 0 and tremolo_rate > 0:
|
||||
print(f" Applying tremolo: depth={tremolo_depth:.1f}, rate={tremolo_rate:.1f}Hz...")
|
||||
audio = cls._apply_tremolo(audio, sr, tremolo_depth, tremolo_rate)
|
||||
|
||||
# Apply robot voice last
|
||||
if robot > 0:
|
||||
print(f" Applying robot effect: {robot}%...")
|
||||
audio = cls._apply_robot(audio, sr, robot)
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
print(f" Effects applied in {processing_time:.2f}s")
|
||||
|
||||
# Show processing message if it took more than 1 second
|
||||
show_message = processing_time > 1.0
|
||||
|
||||
return audio, show_message
|
||||
|
||||
@classmethod
|
||||
def _apply_echo(cls, audio: np.ndarray, sr: int, intensity: int) -> np.ndarray:
|
||||
"""Apply simple echo/reverb effect."""
|
||||
if intensity == 0:
|
||||
return audio
|
||||
|
||||
# Calculate delay in samples (50-300ms based on intensity)
|
||||
delay_ms = 50 + (intensity / 100) * 250
|
||||
delay_samples = int((delay_ms / 1000) * sr)
|
||||
|
||||
# Create output array
|
||||
output = np.copy(audio)
|
||||
|
||||
# Add delayed copy with decay
|
||||
decay = 0.3 + (intensity / 100) * 0.4 # 0.3-0.7 decay factor
|
||||
if delay_samples < len(audio):
|
||||
output[delay_samples:] += audio[:-delay_samples] * decay
|
||||
|
||||
# Normalize
|
||||
max_val = np.max(np.abs(output))
|
||||
if max_val > 0:
|
||||
output = output / max_val * np.max(np.abs(audio))
|
||||
|
||||
return output
|
||||
|
||||
@classmethod
|
||||
def _apply_chorus(cls, audio: np.ndarray, sr: int, intensity: int) -> np.ndarray:
|
||||
"""Apply chorus effect using multiple delayed voices."""
|
||||
if intensity == 0:
|
||||
return audio
|
||||
|
||||
# Number of voices based on intensity (1-3)
|
||||
num_voices = 1 + int((intensity / 100) * 2)
|
||||
|
||||
# Base delay (15-30ms)
|
||||
base_delay_ms = 15 + (intensity / 100) * 15
|
||||
base_delay_samples = int((base_delay_ms / 1000) * sr)
|
||||
|
||||
output = np.copy(audio) * 0.6 # Reduce original to make room for voices
|
||||
|
||||
for i in range(num_voices):
|
||||
# Slight pitch variation for each voice (±3%)
|
||||
pitch_var = 1.0 + (0.03 * (i - 1))
|
||||
try:
|
||||
voice = librosa.effects.time_stretch(audio, rate=pitch_var)
|
||||
|
||||
# Slight delay variation
|
||||
delay_samples = base_delay_samples + int((i * 5 / 1000) * sr)
|
||||
|
||||
# Mix voice into output
|
||||
voice_len = min(len(voice), len(output) - delay_samples)
|
||||
if voice_len > 0:
|
||||
output[delay_samples:delay_samples + voice_len] += voice[:voice_len] * 0.2
|
||||
except Exception as e:
|
||||
print(f" Warning: Chorus voice {i+1} failed: {e}")
|
||||
|
||||
# Normalize
|
||||
max_val = np.max(np.abs(output))
|
||||
if max_val > 0:
|
||||
output = output / max_val * 0.95
|
||||
|
||||
return output
|
||||
|
||||
@classmethod
|
||||
def _apply_tremolo(cls, audio: np.ndarray, sr: int, depth: float, rate: float) -> np.ndarray:
|
||||
"""Apply tremolo effect (amplitude modulation)."""
|
||||
if depth == 0 or rate == 0:
|
||||
return audio
|
||||
|
||||
# Create modulation signal
|
||||
duration = len(audio) / sr
|
||||
t = np.linspace(0, duration, len(audio))
|
||||
|
||||
# Sine wave modulation at specified rate
|
||||
modulation = 1.0 - depth * 0.5 * (1 - np.sin(2 * np.pi * rate * t))
|
||||
|
||||
return audio * modulation
|
||||
|
||||
@classmethod
|
||||
def _apply_robot(cls, audio: np.ndarray, sr: int, intensity: int) -> np.ndarray:
|
||||
"""Apply robot voice effect using ring modulation."""
|
||||
if intensity == 0:
|
||||
return audio
|
||||
|
||||
# Carrier frequency based on intensity (80-300 Hz)
|
||||
carrier_freq = 80 + (intensity / 100) * 220
|
||||
|
||||
# Create carrier signal
|
||||
duration = len(audio) / sr
|
||||
t = np.linspace(0, duration, len(audio))
|
||||
carrier = np.sin(2 * np.pi * carrier_freq * t)
|
||||
|
||||
# Mix original with ring-modulated version based on intensity
|
||||
mix = intensity / 100
|
||||
robot_signal = audio * carrier
|
||||
output = audio * (1 - mix * 0.7) + robot_signal * mix * 0.7
|
||||
|
||||
# Normalize
|
||||
max_val = np.max(np.abs(output))
|
||||
if max_val > 0:
|
||||
output = output / max_val * 0.95
|
||||
|
||||
return output
|
||||
|
||||
@classmethod
|
||||
def validate_effect(cls, effect_name: str, value: Any) -> tuple[bool, str]:
|
||||
"""
|
||||
Validate an effect value.
|
||||
|
||||
Returns:
|
||||
Tuple of (is_valid, error_message)
|
||||
"""
|
||||
validators = {
|
||||
"pitch": (int, cls.PITCH_MIN, cls.PITCH_MAX, "Pitch must be a whole number", "semitones"),
|
||||
"speed": (float, cls.SPEED_MIN, cls.SPEED_MAX, "Speed must be a number", "x"),
|
||||
"echo": (int, cls.ECHO_MIN, cls.ECHO_MAX, "Echo must be a whole number", "%"),
|
||||
"robot": (int, cls.ROBOT_MIN, cls.ROBOT_MAX, "Robot must be a whole number", "%"),
|
||||
"chorus": (int, cls.CHORUS_MIN, cls.CHORUS_MAX, "Chorus must be a whole number", "%"),
|
||||
"tremolo_depth": (float, cls.TREMOLO_DEPTH_MIN, cls.TREMOLO_DEPTH_MAX, "Tremolo depth must be a number", ""),
|
||||
"tremolo_rate": (float, cls.TREMOLO_RATE_MIN, cls.TREMOLO_RATE_MAX, "Tremolo rate must be a number", "Hz"),
|
||||
}
|
||||
|
||||
if effect_name not in validators:
|
||||
return False, f"Unknown effect: {effect_name}"
|
||||
|
||||
type_func, min_val, max_val, error_msg, unit = validators[effect_name]
|
||||
|
||||
try:
|
||||
val = type_func(value)
|
||||
if min_val <= val <= max_val:
|
||||
return True, ""
|
||||
unit_str = f" {unit}" if unit else ""
|
||||
return False, f"{effect_name.replace('_', ' ').title()} must be between {min_val} and {max_val}{unit_str}"
|
||||
except (ValueError, TypeError):
|
||||
return False, error_msg
|
||||
|
||||
@classmethod
|
||||
def count_active_effects(cls, **effects) -> int:
|
||||
"""Count how many effects are active (non-default)."""
|
||||
count = 0
|
||||
# Convert values to proper types (JSON stores them as strings)
|
||||
pitch = int(effects.get("pitch", cls.PITCH_DEFAULT))
|
||||
speed = float(effects.get("speed", cls.SPEED_DEFAULT))
|
||||
echo = int(effects.get("echo", cls.ECHO_DEFAULT))
|
||||
robot = int(effects.get("robot", cls.ROBOT_DEFAULT))
|
||||
chorus = int(effects.get("chorus", cls.CHORUS_DEFAULT))
|
||||
tremolo_depth = float(effects.get("tremolo_depth", cls.TREMOLO_DEPTH_DEFAULT))
|
||||
|
||||
if pitch != cls.PITCH_DEFAULT:
|
||||
count += 1
|
||||
if speed != cls.SPEED_DEFAULT:
|
||||
count += 1
|
||||
if echo > cls.ECHO_DEFAULT:
|
||||
count += 1
|
||||
if robot > cls.ROBOT_DEFAULT:
|
||||
count += 1
|
||||
if chorus > cls.CHORUS_DEFAULT:
|
||||
count += 1
|
||||
if tremolo_depth > cls.TREMOLO_DEPTH_DEFAULT:
|
||||
count += 1
|
||||
# tremolo_rate only counts if depth is also active
|
||||
return count
|
||||
|
||||
@classmethod
|
||||
def get_effect_description(cls, effect_name: str) -> str:
|
||||
"""Get a human-readable description of what an effect does."""
|
||||
descriptions = {
|
||||
"pitch": f"Changes voice pitch ({cls.PITCH_MIN} to {cls.PITCH_MAX} semitones). Positive = higher/chipmunk, Negative = lower/deeper.",
|
||||
"speed": f"Changes speech speed ({cls.SPEED_MIN} to {cls.SPEED_MAX}x). Higher = faster, Lower = slower.",
|
||||
"echo": f"Adds echo/reverb ({cls.ECHO_MIN} to {cls.ECHO_MAX}%). Higher = more pronounced echo.",
|
||||
"robot": f"Applies robot voice effect ({cls.ROBOT_MIN} to {cls.ROBOT_MAX}%). Higher = more robotic.",
|
||||
"chorus": f"Adds chorus effect ({cls.CHORUS_MIN} to {cls.CHORUS_MAX}%). Higher = more voices/depth.",
|
||||
"tremolo_depth": f"Tremolo amplitude modulation ({cls.TREMOLO_DEPTH_MIN} to {cls.TREMOLO_DEPTH_MAX}). Higher = more warble.",
|
||||
"tremolo_rate": f"Tremolo speed ({cls.TREMOLO_RATE_MIN} to {cls.TREMOLO_RATE_MAX} Hz). Higher = faster warble.",
|
||||
}
|
||||
return descriptions.get(effect_name, "Unknown effect")
|
||||
|
||||
@classmethod
|
||||
def format_effect_value(cls, effect_name: str, value: Any) -> str:
|
||||
"""Format an effect value for display."""
|
||||
if effect_name == "pitch":
|
||||
pitch = int(value)
|
||||
if pitch == 0:
|
||||
return "0 (normal)"
|
||||
direction = "higher" if pitch > 0 else "lower"
|
||||
return f"{pitch:+d} ({direction})"
|
||||
elif effect_name == "speed":
|
||||
speed = float(value)
|
||||
if speed == 1.0:
|
||||
return "1.0x (normal)"
|
||||
direction = "faster" if speed > 1.0 else "slower"
|
||||
return f"{speed:.1f}x ({direction})"
|
||||
elif effect_name == "echo":
|
||||
echo = int(value)
|
||||
if echo == 0:
|
||||
return "0% (off)"
|
||||
return f"{echo}%"
|
||||
elif effect_name == "robot":
|
||||
robot = int(value)
|
||||
if robot == 0:
|
||||
return "0% (off)"
|
||||
return f"{robot}%"
|
||||
elif effect_name == "chorus":
|
||||
chorus = int(value)
|
||||
if chorus == 0:
|
||||
return "0% (off)"
|
||||
return f"{chorus}%"
|
||||
elif effect_name == "tremolo_depth":
|
||||
depth = float(value)
|
||||
if depth == 0.0:
|
||||
return "0.0 (off)"
|
||||
return f"{depth:.1f}"
|
||||
elif effect_name == "tremolo_rate":
|
||||
rate = float(value)
|
||||
if rate == 0.0:
|
||||
return "0.0 Hz (off)"
|
||||
return f"{rate:.1f} Hz"
|
||||
return str(value)
|
||||
465
bot.py
465
bot.py
@@ -1,3 +1,6 @@
|
||||
__version__ = "1.2.0"
|
||||
|
||||
import random
|
||||
import sys
|
||||
import os
|
||||
|
||||
@@ -21,6 +24,7 @@ import scipy.io.wavfile as wavfile
|
||||
from discord import app_commands
|
||||
from discord.ext import commands
|
||||
|
||||
from audio_effects import AudioEffects
|
||||
from config import Config
|
||||
from voice_manager import VoiceManager
|
||||
|
||||
@@ -28,6 +32,18 @@ from voice_manager import VoiceManager
|
||||
# Inactivity timeout in seconds (10 minutes)
|
||||
INACTIVITY_TIMEOUT = 10 * 60
|
||||
|
||||
# Sample lines for voice preview
|
||||
PREVIEW_LINES = [
|
||||
"Hello! This is how I sound. Choose me as your voice with /voice set.",
|
||||
"Testing, one, two, three! Can you hear me clearly?",
|
||||
"Here's a preview of my voice. Pretty cool, right?",
|
||||
"Greetings! I am ready to speak for you.",
|
||||
"Voice check! This is what I sound like.",
|
||||
"Audio test complete. This voice is ready to go!",
|
||||
"Sample message incoming. How do I sound to you?",
|
||||
"Preview mode activated. Testing speech synthesis.",
|
||||
]
|
||||
|
||||
|
||||
class TTSBot(commands.Bot):
|
||||
"""Discord bot that reads messages aloud using Pocket TTS."""
|
||||
@@ -39,29 +55,50 @@ class TTSBot(commands.Bot):
|
||||
super().__init__(command_prefix="!", intents=intents)
|
||||
|
||||
self.voice_manager = VoiceManager(Config.VOICES_DIR, Config.DEFAULT_VOICE)
|
||||
self.message_queue: asyncio.Queue[tuple[discord.Message, str]] = asyncio.Queue()
|
||||
self.message_queue: asyncio.Queue[tuple[discord.Message, str] | tuple[discord.Message, str, str]] = asyncio.Queue()
|
||||
self.last_activity: float = 0.0
|
||||
|
||||
print("\n=== Command Registration ===")
|
||||
self._setup_slash_commands()
|
||||
self._setup_effects_commands()
|
||||
self._log_registered_commands()
|
||||
print("=== End Command Registration ===\n")
|
||||
|
||||
def _log_registered_commands(self) -> None:
|
||||
"""Log all registered commands to console."""
|
||||
print("\nRegistered commands:")
|
||||
commands = list(self.tree.get_commands())
|
||||
if not commands:
|
||||
print(" ⚠️ No commands registered!")
|
||||
else:
|
||||
for cmd in commands:
|
||||
print(f" ✓ /{cmd.name} - {cmd.description}")
|
||||
print(f"\nTotal commands registered: {len(commands)}")
|
||||
|
||||
def _setup_slash_commands(self) -> None:
|
||||
"""Set up slash commands for voice management."""
|
||||
print("Setting up voice commands...")
|
||||
|
||||
@self.tree.command(name="voice", description="Manage your TTS voice")
|
||||
@app_commands.describe(
|
||||
action="What to do",
|
||||
voice_name="Name of the voice (for 'set' action)"
|
||||
voice_name="Name of the voice (for 'set' or 'preview' action)",
|
||||
preview_pitch="Optional pitch for preview (-12 to 12, default: use your settings)",
|
||||
preview_speed="Optional speed for preview (0.5 to 2.0, default: use your settings)",
|
||||
)
|
||||
@app_commands.choices(action=[
|
||||
app_commands.Choice(name="list", value="list"),
|
||||
app_commands.Choice(name="set", value="set"),
|
||||
app_commands.Choice(name="current", value="current"),
|
||||
app_commands.Choice(name="refresh", value="refresh"),
|
||||
app_commands.Choice(name="preview", value="preview"),
|
||||
])
|
||||
async def voice_command(
|
||||
interaction: discord.Interaction,
|
||||
action: app_commands.Choice[str],
|
||||
voice_name: str | None = None
|
||||
voice_name: str | None = None,
|
||||
preview_pitch: int | None = None,
|
||||
preview_speed: float | None = None,
|
||||
):
|
||||
if action.value == "list":
|
||||
await self._handle_voice_list(interaction)
|
||||
@@ -71,6 +108,8 @@ class TTSBot(commands.Bot):
|
||||
await self._handle_voice_current(interaction)
|
||||
elif action.value == "refresh":
|
||||
await self._handle_voice_refresh(interaction)
|
||||
elif action.value == "preview":
|
||||
await self._handle_voice_preview(interaction, voice_name, preview_pitch, preview_speed)
|
||||
|
||||
@voice_command.autocomplete("voice_name")
|
||||
async def voice_name_autocomplete(
|
||||
@@ -84,6 +123,197 @@ class TTSBot(commands.Bot):
|
||||
if current.lower() in v.lower()
|
||||
][:25]
|
||||
|
||||
def _setup_effects_commands(self) -> None:
|
||||
"""Set up slash commands for audio effects management."""
|
||||
print("Setting up effects commands...")
|
||||
|
||||
@self.tree.command(name="effects", description="Manage your TTS audio effects")
|
||||
@app_commands.describe(
|
||||
action="What to do",
|
||||
effect_name="Name of the effect (for 'set' action)",
|
||||
value="Value for the effect (for 'set' action)"
|
||||
)
|
||||
@app_commands.choices(action=[
|
||||
app_commands.Choice(name="list", value="list"),
|
||||
app_commands.Choice(name="set", value="set"),
|
||||
app_commands.Choice(name="reset", value="reset"),
|
||||
])
|
||||
@app_commands.choices(effect_name=[
|
||||
app_commands.Choice(name="pitch", value="pitch"),
|
||||
app_commands.Choice(name="speed", value="speed"),
|
||||
app_commands.Choice(name="echo", value="echo"),
|
||||
app_commands.Choice(name="robot", value="robot"),
|
||||
app_commands.Choice(name="chorus", value="chorus"),
|
||||
app_commands.Choice(name="tremolo_depth", value="tremolo_depth"),
|
||||
app_commands.Choice(name="tremolo_rate", value="tremolo_rate"),
|
||||
])
|
||||
async def effects_command(
|
||||
interaction: discord.Interaction,
|
||||
action: app_commands.Choice[str],
|
||||
effect_name: app_commands.Choice[str] | None = None,
|
||||
value: str | None = None
|
||||
):
|
||||
if action.value == "list":
|
||||
await self._handle_effects_list(interaction)
|
||||
elif action.value == "set":
|
||||
await self._handle_effects_set(interaction, effect_name, value)
|
||||
elif action.value == "reset":
|
||||
await self._handle_effects_reset(interaction)
|
||||
|
||||
async def _handle_effects_list(self, interaction: discord.Interaction) -> None:
|
||||
"""Handle /effects list command."""
|
||||
effects = self.voice_manager.get_user_effects(interaction.user.id)
|
||||
active_count = self.voice_manager.count_active_effects(interaction.user.id)
|
||||
|
||||
lines = ["**Your Audio Effects:**\n"]
|
||||
|
||||
# Pitch
|
||||
pitch_desc = AudioEffects.get_effect_description("pitch")
|
||||
pitch_val = AudioEffects.format_effect_value("pitch", effects["pitch"])
|
||||
lines.append(f"🎵 **Pitch**: {pitch_val}")
|
||||
lines.append(f" {pitch_desc}\n")
|
||||
|
||||
# Speed
|
||||
speed_desc = AudioEffects.get_effect_description("speed")
|
||||
speed_val = AudioEffects.format_effect_value("speed", effects["speed"])
|
||||
lines.append(f"⚡ **Speed**: {speed_val}")
|
||||
lines.append(f" {speed_desc}\n")
|
||||
|
||||
# Echo
|
||||
echo_desc = AudioEffects.get_effect_description("echo")
|
||||
echo_val = AudioEffects.format_effect_value("echo", effects["echo"])
|
||||
lines.append(f"🔊 **Echo**: {echo_val}")
|
||||
lines.append(f" {echo_desc}\n")
|
||||
|
||||
# Robot
|
||||
robot_desc = AudioEffects.get_effect_description("robot")
|
||||
robot_val = AudioEffects.format_effect_value("robot", effects["robot"])
|
||||
lines.append(f"🤖 **Robot**: {robot_val}")
|
||||
lines.append(f" {robot_desc}\n")
|
||||
|
||||
# Chorus
|
||||
chorus_desc = AudioEffects.get_effect_description("chorus")
|
||||
chorus_val = AudioEffects.format_effect_value("chorus", effects["chorus"])
|
||||
lines.append(f"🎶 **Chorus**: {chorus_val}")
|
||||
lines.append(f" {chorus_desc}\n")
|
||||
|
||||
# Tremolo Depth
|
||||
tremolo_depth_desc = AudioEffects.get_effect_description("tremolo_depth")
|
||||
tremolo_depth_val = AudioEffects.format_effect_value("tremolo_depth", effects["tremolo_depth"])
|
||||
lines.append(f"〰️ **Tremolo Depth**: {tremolo_depth_val}")
|
||||
lines.append(f" {tremolo_depth_desc}\n")
|
||||
|
||||
# Tremolo Rate
|
||||
tremolo_rate_desc = AudioEffects.get_effect_description("tremolo_rate")
|
||||
tremolo_rate_val = AudioEffects.format_effect_value("tremolo_rate", effects["tremolo_rate"])
|
||||
lines.append(f"📳 **Tremolo Rate**: {tremolo_rate_val}")
|
||||
lines.append(f" {tremolo_rate_desc}\n")
|
||||
|
||||
# Active count warning
|
||||
lines.append(f"**Active Effects**: {active_count}")
|
||||
if active_count > 2:
|
||||
lines.append("⚠️ You have more than 2 active effects. Processing may be slower!")
|
||||
elif active_count > 0:
|
||||
lines.append("ℹ️ Add more effects for fun variations (may slow processing)")
|
||||
|
||||
lines.append(f"\n*Use `/effects set <effect> <value>` to change settings*")
|
||||
lines.append(f"*Use `/effects reset` to clear all effects*")
|
||||
|
||||
await interaction.response.send_message(
|
||||
"\n".join(lines),
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
async def _handle_effects_set(
|
||||
self,
|
||||
interaction: discord.Interaction,
|
||||
effect_name: app_commands.Choice[str] | None,
|
||||
value: str | None
|
||||
) -> None:
|
||||
"""Handle /effects set command."""
|
||||
if not effect_name or value is None:
|
||||
await interaction.response.send_message(
|
||||
"❌ Please provide both effect name and value. Example: `/effects set pitch 3`",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
success, message = self.voice_manager.set_user_effect(
|
||||
interaction.user.id,
|
||||
effect_name.value,
|
||||
value
|
||||
)
|
||||
|
||||
if success:
|
||||
await interaction.response.send_message(
|
||||
f"✅ {message}",
|
||||
ephemeral=True
|
||||
)
|
||||
else:
|
||||
await interaction.response.send_message(
|
||||
f"❌ {message}",
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
async def _handle_effects_reset(self, interaction: discord.Interaction) -> None:
|
||||
"""Handle /effects reset command with confirmation UI."""
|
||||
# Check if user has any effects to reset
|
||||
active_count = self.voice_manager.count_active_effects(interaction.user.id)
|
||||
|
||||
if active_count == 0:
|
||||
await interaction.response.send_message(
|
||||
"ℹ️ You don't have any active effects to reset.",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
# Create confirmation buttons
|
||||
class ConfirmResetView(discord.ui.View):
|
||||
def __init__(self, voice_manager, user_id):
|
||||
super().__init__(timeout=30)
|
||||
self.voice_manager = voice_manager
|
||||
self.user_id = user_id
|
||||
self.confirmed = False
|
||||
|
||||
@discord.ui.button(label="✅ Yes, Reset All", style=discord.ButtonStyle.danger)
|
||||
async def confirm_button(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||||
if interaction.user.id != self.user_id:
|
||||
await interaction.response.send_message("This button is not for you!", ephemeral=True)
|
||||
return
|
||||
|
||||
self.voice_manager.reset_user_effects(self.user_id)
|
||||
self.confirmed = True
|
||||
await interaction.response.edit_message(
|
||||
content="✅ All audio effects have been reset to defaults!",
|
||||
view=None
|
||||
)
|
||||
self.stop()
|
||||
|
||||
@discord.ui.button(label="❌ Cancel", style=discord.ButtonStyle.secondary)
|
||||
async def cancel_button(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||||
if interaction.user.id != self.user_id:
|
||||
await interaction.response.send_message("This button is not for you!", ephemeral=True)
|
||||
return
|
||||
|
||||
await interaction.response.edit_message(
|
||||
content="❌ Reset cancelled. Your effects remain unchanged.",
|
||||
view=None
|
||||
)
|
||||
self.stop()
|
||||
|
||||
view = ConfirmResetView(self.voice_manager, interaction.user.id)
|
||||
|
||||
await interaction.response.send_message(
|
||||
f"⚠️ **Reset Confirmation**\n\n"
|
||||
f"You have {active_count} active effect(s).\n"
|
||||
f"This will reset **all** your audio effects to defaults:\n"
|
||||
f"• Pitch: 0 (normal)\n"
|
||||
f"• Speed: 1.0x (normal)\n\n"
|
||||
f"Are you sure you want to continue?",
|
||||
view=view,
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
async def _handle_voice_list(self, interaction: discord.Interaction) -> None:
|
||||
"""Handle /voice list command."""
|
||||
voices = self.voice_manager.get_available_voices()
|
||||
@@ -204,6 +434,113 @@ class TTSBot(commands.Bot):
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
async def _handle_voice_preview(
|
||||
self,
|
||||
interaction: discord.Interaction,
|
||||
voice_name: str | None,
|
||||
preview_pitch: int | None = None,
|
||||
preview_speed: float | None = None,
|
||||
) -> None:
|
||||
"""Handle /voice preview command."""
|
||||
if not voice_name:
|
||||
await interaction.response.send_message(
|
||||
"❌ Please provide a voice name. Use `/voice list` to see available voices.",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
# Check if user is in a voice channel
|
||||
if interaction.user.voice is None:
|
||||
await interaction.response.send_message(
|
||||
"❌ You need to be in a voice channel to hear a preview!",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
voice_name = voice_name.lower()
|
||||
|
||||
# Validate voice exists
|
||||
if not self.voice_manager.is_voice_available(voice_name):
|
||||
voices = self.voice_manager.get_available_voices()
|
||||
await interaction.response.send_message(
|
||||
f"❌ Voice `{voice_name}` not found.\n"
|
||||
f"Available voices: {', '.join(f'`{v}`' for v in voices)}",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
# Validate pitch if provided
|
||||
if preview_pitch is not None:
|
||||
is_valid, error_msg = AudioEffects.validate_effect("pitch", preview_pitch)
|
||||
if not is_valid:
|
||||
await interaction.response.send_message(
|
||||
f"❌ Invalid pitch value: {error_msg}",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
# Validate speed if provided
|
||||
if preview_speed is not None:
|
||||
is_valid, error_msg = AudioEffects.validate_effect("speed", preview_speed)
|
||||
if not is_valid:
|
||||
await interaction.response.send_message(
|
||||
f"❌ Invalid speed value: {error_msg}",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
# Select a random preview line
|
||||
preview_text = random.choice(PREVIEW_LINES)
|
||||
|
||||
# Create a preview message object with all necessary attributes
|
||||
class PreviewMessage:
|
||||
def __init__(self, user, channel, voice_channel):
|
||||
self.author = user
|
||||
self.channel = channel
|
||||
self._voice_channel = voice_channel
|
||||
|
||||
@property
|
||||
def voice(self):
|
||||
class VoiceState:
|
||||
def __init__(self, channel):
|
||||
self.channel = channel
|
||||
return VoiceState(self._voice_channel)
|
||||
|
||||
preview_message = PreviewMessage(
|
||||
interaction.user,
|
||||
interaction.channel,
|
||||
interaction.user.voice.channel
|
||||
)
|
||||
|
||||
# Use user's current effects if not overridden
|
||||
user_effects = self.voice_manager.get_user_effects(interaction.user.id)
|
||||
effect_overrides = {}
|
||||
if preview_pitch is not None:
|
||||
effect_overrides["pitch"] = preview_pitch
|
||||
if preview_speed is not None:
|
||||
effect_overrides["speed"] = preview_speed
|
||||
|
||||
# Use default effects from user settings for preview
|
||||
preview_effects = user_effects.copy()
|
||||
preview_effects.update(effect_overrides)
|
||||
|
||||
# Queue the preview with voice override and effects
|
||||
await self.message_queue.put((preview_message, preview_text, voice_name, preview_effects))
|
||||
|
||||
# Build effect description
|
||||
effect_desc = []
|
||||
if preview_effects.get("pitch", 0) != 0:
|
||||
effect_desc.append(f"pitch: {preview_effects['pitch']:+d}")
|
||||
if preview_effects.get("speed", 1.0) != 1.0:
|
||||
effect_desc.append(f"speed: {preview_effects['speed']:.1f}x")
|
||||
|
||||
effect_str = f" (with {', '.join(effect_desc)})" if effect_desc else ""
|
||||
|
||||
await interaction.response.send_message(
|
||||
f"⏳ Queued preview for `{voice_name}`{effect_str}. Sample: \"{preview_text[:50]}{'...' if len(preview_text) > 50 else ''}\"",
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
async def setup_hook(self) -> None:
|
||||
"""Called when the bot is starting up."""
|
||||
print("Initializing TTS...")
|
||||
@@ -220,16 +557,50 @@ class TTSBot(commands.Bot):
|
||||
self.loop.create_task(self.process_queue())
|
||||
self.loop.create_task(self.check_inactivity())
|
||||
|
||||
# Sync slash commands
|
||||
print("Syncing slash commands...")
|
||||
await self.tree.sync()
|
||||
print("Slash commands synced!")
|
||||
|
||||
async def on_ready(self) -> None:
|
||||
print(f"Logged in as {self.user}")
|
||||
print(f"Bot ID: {self.user.id}")
|
||||
print(f"Monitoring channel ID: {Config.TEXT_CHANNEL_ID}")
|
||||
print(f"Available voices: {', '.join(self.voice_manager.get_available_voices())}")
|
||||
print("Bot is ready!")
|
||||
|
||||
# Log registered commands before sync
|
||||
registered_cmds = list(self.tree.get_commands())
|
||||
print(f"\nCommands in tree before sync: {len(registered_cmds)}")
|
||||
for cmd in registered_cmds:
|
||||
print(f" - /{cmd.name}")
|
||||
|
||||
# Sync slash commands to each guild for immediate availability
|
||||
print(f"\nConnected to {len(self.guilds)} guild(s):")
|
||||
for guild in self.guilds:
|
||||
print(f" - {guild.name} (ID: {guild.id})")
|
||||
|
||||
print("\nSyncing slash commands to guilds...")
|
||||
sync_count = 0
|
||||
for guild in self.guilds:
|
||||
try:
|
||||
# Copy global commands to this guild before syncing
|
||||
# This is necessary for guild-specific command registration
|
||||
self.tree.copy_global_to(guild=discord.Object(guild.id))
|
||||
print(f" 📋 Copied global commands to guild: {guild.name}")
|
||||
|
||||
synced = await self.tree.sync(guild=discord.Object(guild.id))
|
||||
print(f" ✓ Synced {len(synced)} commands to guild: {guild.name}")
|
||||
for cmd in synced:
|
||||
print(f" - /{cmd.name}")
|
||||
sync_count += 1
|
||||
except discord.errors.Forbidden as e:
|
||||
print(f" ✗ Forbidden: Cannot sync to guild {guild.name}. Missing 'applications.commands' scope!")
|
||||
print(f" Error: {e}")
|
||||
except Exception as e:
|
||||
print(f" ✗ Failed to sync to guild {guild.name}: {type(e).__name__}: {e}")
|
||||
|
||||
if sync_count == 0:
|
||||
print("\n⚠️ WARNING: No guilds were synced! Commands won't appear in Discord.")
|
||||
print(" Make sure the bot was invited with 'applications.commands' scope.")
|
||||
else:
|
||||
print(f"\n✓ Successfully synced to {sync_count}/{len(self.guilds)} guild(s)")
|
||||
|
||||
print("\nBot is ready!")
|
||||
|
||||
async def on_message(self, message: discord.Message) -> None:
|
||||
if message.author.bot:
|
||||
@@ -256,16 +627,36 @@ class TTSBot(commands.Bot):
|
||||
async def process_queue(self) -> None:
|
||||
"""Process messages from the queue one at a time."""
|
||||
while True:
|
||||
message, text = await self.message_queue.get()
|
||||
queue_item = await self.message_queue.get()
|
||||
|
||||
# Handle queue items:
|
||||
# - (message, text) - regular message
|
||||
# - (message, text, voice_override) - preview with voice override
|
||||
# - (message, text, voice_override, effects_dict) - preview with effect overrides
|
||||
if len(queue_item) == 4 and isinstance(queue_item[3], dict):
|
||||
message, text, voice_override, effect_overrides = queue_item
|
||||
elif len(queue_item) == 3:
|
||||
message, text, voice_override = queue_item
|
||||
effect_overrides = {}
|
||||
else:
|
||||
message, text = queue_item
|
||||
voice_override = None
|
||||
effect_overrides = {}
|
||||
|
||||
try:
|
||||
await self.speak_message(message, text)
|
||||
await self.speak_message(message, text, voice_override, effect_overrides)
|
||||
except Exception as e:
|
||||
print(f"Error processing message: {e}")
|
||||
finally:
|
||||
self.message_queue.task_done()
|
||||
|
||||
async def speak_message(self, message: discord.Message, text: str) -> None:
|
||||
async def speak_message(
|
||||
self,
|
||||
message: discord.Message,
|
||||
text: str,
|
||||
voice_override: str | None = None,
|
||||
effect_overrides: dict | None = None,
|
||||
) -> None:
|
||||
"""Generate TTS and play it in the user's voice channel."""
|
||||
if message.author.voice is None:
|
||||
return
|
||||
@@ -278,22 +669,34 @@ class TTSBot(commands.Bot):
|
||||
|
||||
print(f"Generating TTS for: {text[:50]}...")
|
||||
|
||||
# Get user's voice (loads on-demand if needed)
|
||||
user_id = message.author.id
|
||||
# Get voice state (use override for previews, otherwise user's voice)
|
||||
try:
|
||||
if voice_override:
|
||||
voice_state = await asyncio.to_thread(
|
||||
self.voice_manager.get_voice_state, voice_override
|
||||
)
|
||||
else:
|
||||
user_id = message.author.id
|
||||
voice_state = await asyncio.to_thread(
|
||||
self.voice_manager.get_user_voice_state, user_id
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error loading voice for user {user_id}: {e}")
|
||||
print(f"Error loading voice: {e}")
|
||||
if not voice_override:
|
||||
await message.channel.send(
|
||||
f"{message.author.mention}, failed to load your voice. Use `/voice set` to choose a voice.",
|
||||
delete_after=5
|
||||
)
|
||||
return
|
||||
|
||||
# Get user's effects and apply any overrides
|
||||
user_effects = self.voice_manager.get_user_effects(message.author.id)
|
||||
effects = user_effects.copy()
|
||||
if effect_overrides:
|
||||
effects.update(effect_overrides)
|
||||
|
||||
wav_bytes = await asyncio.to_thread(
|
||||
self._generate_wav_bytes, voice_state, text
|
||||
self._generate_wav_bytes, voice_state, text, effects
|
||||
)
|
||||
|
||||
audio_source = discord.FFmpegPCMAudio(
|
||||
@@ -318,7 +721,12 @@ class TTSBot(commands.Bot):
|
||||
|
||||
await play_complete.wait()
|
||||
|
||||
def _generate_wav_bytes(self, voice_state: Any, text: str) -> bytes:
|
||||
def _generate_wav_bytes(
|
||||
self,
|
||||
voice_state: Any,
|
||||
text: str,
|
||||
effects: dict,
|
||||
) -> bytes:
|
||||
"""Generate audio and return as WAV file bytes."""
|
||||
model = self.voice_manager.model
|
||||
if model is None:
|
||||
@@ -327,9 +735,32 @@ class TTSBot(commands.Bot):
|
||||
audio = model.generate_audio(voice_state, text)
|
||||
audio_np = audio.numpy()
|
||||
|
||||
# Ensure audio is 2D [samples, channels] for storage
|
||||
if audio_np.ndim == 1:
|
||||
audio_np = audio_np.reshape(-1, 1)
|
||||
|
||||
# Apply audio effects if any are active
|
||||
pitch = effects.get("pitch", AudioEffects.PITCH_DEFAULT)
|
||||
speed = effects.get("speed", AudioEffects.SPEED_DEFAULT)
|
||||
echo = effects.get("echo", AudioEffects.ECHO_DEFAULT)
|
||||
robot = effects.get("robot", AudioEffects.ROBOT_DEFAULT)
|
||||
chorus = effects.get("chorus", AudioEffects.CHORUS_DEFAULT)
|
||||
tremolo_depth = effects.get("tremolo_depth", AudioEffects.TREMOLO_DEPTH_DEFAULT)
|
||||
tremolo_rate = effects.get("tremolo_rate", AudioEffects.TREMOLO_RATE_DEFAULT)
|
||||
|
||||
if any([pitch != 0, speed != 1.0, echo > 0, robot > 0, chorus > 0, tremolo_depth > 0]):
|
||||
print(f"Applying {AudioEffects.count_active_effects(**effects)} effect(s)...")
|
||||
# Squeeze to 1D for librosa effects, then reshape back
|
||||
audio_1d = audio_np.squeeze()
|
||||
audio_1d, show_processing = AudioEffects.apply_effects(
|
||||
audio_1d, model.sample_rate,
|
||||
pitch, speed, echo, robot, chorus, tremolo_depth, tremolo_rate
|
||||
)
|
||||
# Reshape back to 2D
|
||||
audio_np = audio_1d.reshape(-1, 1)
|
||||
if show_processing:
|
||||
print("⚠️ Audio processing took longer than expected due to effects")
|
||||
|
||||
max_val = np.max(np.abs(audio_np))
|
||||
if max_val > 0:
|
||||
audio_np = audio_np / max_val
|
||||
|
||||
124
voice_manager.py
124
voice_manager.py
@@ -6,6 +6,7 @@ from typing import Any
|
||||
|
||||
from pocket_tts import TTSModel
|
||||
|
||||
from audio_effects import AudioEffects
|
||||
from audio_preprocessor import (
|
||||
AudioPreprocessor,
|
||||
PreprocessingConfig,
|
||||
@@ -26,6 +27,8 @@ class VoiceManager:
|
||||
self._voice_states: dict[str, Any] = {}
|
||||
# Per-user voice preferences: user_id -> voice_name
|
||||
self._user_voices: dict[int, str] = {}
|
||||
# Per-user audio effects: user_id -> {"pitch": int, "speed": float}
|
||||
self._user_effects: dict[int, dict[str, Any]] = {}
|
||||
# Available voices: voice_name -> file_path
|
||||
self._available_voices: dict[str, Path] = {}
|
||||
|
||||
@@ -181,10 +184,129 @@ class VoiceManager:
|
||||
self.preferences_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
data = {
|
||||
"user_voices": {str(k): v for k, v in self._user_voices.items()}
|
||||
"user_voices": {str(k): v for k, v in self._user_voices.items()},
|
||||
"user_effects": {str(k): v for k, v in self._user_effects.items()},
|
||||
}
|
||||
|
||||
with open(self.preferences_file, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to save preferences: {e}")
|
||||
|
||||
# Effects management methods
|
||||
|
||||
def get_user_effects(self, user_id: int) -> dict[str, int | float]:
|
||||
"""Get the audio effects for a user. Returns defaults if not set."""
|
||||
effects = self._user_effects.get(user_id, {})
|
||||
# Convert to proper types (JSON stores them as strings)
|
||||
pitch = effects.get("pitch", AudioEffects.PITCH_DEFAULT)
|
||||
speed = effects.get("speed", AudioEffects.SPEED_DEFAULT)
|
||||
echo = effects.get("echo", AudioEffects.ECHO_DEFAULT)
|
||||
robot = effects.get("robot", AudioEffects.ROBOT_DEFAULT)
|
||||
chorus = effects.get("chorus", AudioEffects.CHORUS_DEFAULT)
|
||||
tremolo_depth = effects.get("tremolo_depth", AudioEffects.TREMOLO_DEPTH_DEFAULT)
|
||||
tremolo_rate = effects.get("tremolo_rate", AudioEffects.TREMOLO_RATE_DEFAULT)
|
||||
|
||||
return {
|
||||
"pitch": int(pitch) if pitch is not None else AudioEffects.PITCH_DEFAULT,
|
||||
"speed": float(speed) if speed is not None else AudioEffects.SPEED_DEFAULT,
|
||||
"echo": int(echo) if echo is not None else AudioEffects.ECHO_DEFAULT,
|
||||
"robot": int(robot) if robot is not None else AudioEffects.ROBOT_DEFAULT,
|
||||
"chorus": int(chorus) if chorus is not None else AudioEffects.CHORUS_DEFAULT,
|
||||
"tremolo_depth": float(tremolo_depth) if tremolo_depth is not None else AudioEffects.TREMOLO_DEPTH_DEFAULT,
|
||||
"tremolo_rate": float(tremolo_rate) if tremolo_rate is not None else AudioEffects.TREMOLO_RATE_DEFAULT,
|
||||
}
|
||||
|
||||
def set_user_effect(self, user_id: int, effect_name: str, value: Any) -> tuple[bool, str]:
|
||||
"""
|
||||
Set an audio effect for a user.
|
||||
|
||||
Returns:
|
||||
Tuple of (success, message)
|
||||
"""
|
||||
# Validate the effect
|
||||
is_valid, error_msg = AudioEffects.validate_effect(effect_name, value)
|
||||
if not is_valid:
|
||||
return False, error_msg
|
||||
|
||||
# Get current effects
|
||||
if user_id not in self._user_effects:
|
||||
self._user_effects[user_id] = {}
|
||||
|
||||
# Save the effect
|
||||
current_effects = self._user_effects[user_id].copy()
|
||||
if effect_name == "pitch":
|
||||
current_effects["pitch"] = int(value)
|
||||
elif effect_name == "speed":
|
||||
current_effects["speed"] = float(value)
|
||||
elif effect_name == "echo":
|
||||
current_effects["echo"] = int(value)
|
||||
elif effect_name == "robot":
|
||||
current_effects["robot"] = int(value)
|
||||
elif effect_name == "chorus":
|
||||
current_effects["chorus"] = int(value)
|
||||
elif effect_name == "tremolo_depth":
|
||||
current_effects["tremolo_depth"] = float(value)
|
||||
elif effect_name == "tremolo_rate":
|
||||
current_effects["tremolo_rate"] = float(value)
|
||||
|
||||
# Count active effects and show warning if > 2
|
||||
active_count = AudioEffects.count_active_effects(
|
||||
pitch=current_effects.get("pitch", AudioEffects.PITCH_DEFAULT),
|
||||
speed=current_effects.get("speed", AudioEffects.SPEED_DEFAULT),
|
||||
echo=current_effects.get("echo", AudioEffects.ECHO_DEFAULT),
|
||||
robot=current_effects.get("robot", AudioEffects.ROBOT_DEFAULT),
|
||||
chorus=current_effects.get("chorus", AudioEffects.CHORUS_DEFAULT),
|
||||
tremolo_depth=current_effects.get("tremolo_depth", AudioEffects.TREMOLO_DEPTH_DEFAULT),
|
||||
)
|
||||
self._user_effects[user_id][effect_name] = value
|
||||
self._save_preferences()
|
||||
|
||||
if active_count > 2:
|
||||
return True, f"Effect applied! ⚠️ You have {active_count} active effects. Performance may be slower with more effects."
|
||||
else:
|
||||
return True, "Effect applied successfully!"
|
||||
|
||||
def reset_user_effects(self, user_id: int) -> None:
|
||||
"""Reset all audio effects to defaults for a user."""
|
||||
if user_id in self._user_effects:
|
||||
del self._user_effects[user_id]
|
||||
self._save_preferences()
|
||||
|
||||
def count_active_effects(self, user_id: int) -> int:
|
||||
"""Count how many effects are active for a user."""
|
||||
effects = self.get_user_effects(user_id)
|
||||
return AudioEffects.count_active_effects(
|
||||
pitch=effects["pitch"],
|
||||
speed=effects["speed"],
|
||||
echo=effects["echo"],
|
||||
robot=effects["robot"],
|
||||
chorus=effects["chorus"],
|
||||
tremolo_depth=effects["tremolo_depth"],
|
||||
)
|
||||
|
||||
def _load_preferences(self) -> None:
|
||||
"""Load user voice preferences from JSON file."""
|
||||
if not self.preferences_file.exists():
|
||||
return
|
||||
|
||||
try:
|
||||
with open(self.preferences_file, "r") as f:
|
||||
data = json.load(f)
|
||||
|
||||
# Load user preferences (convert string keys back to int)
|
||||
for user_id_str, voice_name in data.get("user_voices", {}).items():
|
||||
user_id = int(user_id_str)
|
||||
# Only load if voice still exists
|
||||
if voice_name.lower() in self._available_voices:
|
||||
self._user_voices[user_id] = voice_name.lower()
|
||||
|
||||
# Load user effects (convert string keys back to int)
|
||||
for user_id_str, effects in data.get("user_effects", {}).items():
|
||||
user_id = int(user_id_str)
|
||||
self._user_effects[user_id] = effects
|
||||
|
||||
print(f" Loaded {len(self._user_voices)} user voice preferences")
|
||||
print(f" Loaded {len(self._user_effects)} user effect preferences")
|
||||
except Exception as e:
|
||||
print(f" Warning: Failed to load preferences: {e}")
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
{
|
||||
"user_voices": {
|
||||
"122139828182712322": "gibralter_good"
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user