Compare commits
23 Commits
ae1c2a65d3...main
| SHA1 |
|---|
| 9917d44f5d |
| 85a334a57b |
| 40843e4ac9 |
| 7e76deed3d |
| 795d5087e9 |
| 8d4ac59f73 |
| 68bc3b2c7d |
| 4cb0a78486 |
| b12639a618 |
| f082c62a16 |
| 85f3e79d2a |
| 9f14e8c745 |
| 4a2d72517f |
| 2403b431e9 |
| c0e5d4bcb6 |
| c5e3fd33c4 |
| d0de47bdd7 |
| 9e537b7d20 |
| d40f895e2a |
| a46ddc9b21 |
| 736a819493 |
| c69028a970 |
| 92dfcb1d39 |
19
.env.example
Normal file → Executable file
@@ -1,9 +1,16 @@
-# Discord Bot Token (from Discord Developer Portal)
-DISCORD_TOKEN=your_discord_bot_token_here
+# Discord Bot Configuration
+# Copy this file to .env and fill in your values

-# Channel ID to monitor for TTS messages
-# Right-click the channel in Discord and copy ID (enable Developer Mode in settings)
+# Your Discord bot token (from Discord Developer Portal)
+DISCORD_TOKEN=your_bot_token_here
+
+# The text channel ID to monitor for messages
+# (Right-click channel with Developer Mode enabled -> Copy ID)
 TEXT_CHANNEL_ID=123456789012345678

-# Path to the voice reference WAV file for voice cloning
-VOICE_WAV_PATH=./voice.wav
+# Directory containing voice .wav files
+VOICES_DIR=./voices
+
+# Default voice name (optional - uses first found voice if not set)
+# This should match the filename without .wav extension (case-insensitive)
+# DEFAULT_VOICE=masterchief
21
.env.testing
Executable file
@@ -0,0 +1,21 @@
+# Discord Bot Configuration
+# Testing environment configuration
+# This file is used when running: python bot.py testing
+
+# Your Discord bot token (from Discord Developer Portal) - use a DIFFERENT bot for testing!
+DISCORD_TOKEN=<redacted>
+
+# The text channel ID to monitor for messages
+# (Right-click channel with Developer Mode enabled -> Copy ID)
+# Use a DIFFERENT channel for testing!
+TEXT_CHANNEL_ID=1424585470616146061
+
+# Directory containing voice .wav files
+VOICES_DIR=./voices
+
+# Default voice name (optional - uses first found voice if not set)
+# This should match the filename without .wav extension (case-insensitive)
+# DEFAULT_VOICE=masterchief
+
+# HuggingFace cache directory (must be writable)
+HF_HOME=/tmp/huggingface
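The comment in `.env.testing` says the file is used when running `python bot.py testing`, but the loader itself is not part of this hunk. A minimal sketch of how such a switch could pick the file and read it, assuming two hypothetical helpers (`env_file_for` and `parse_env`, neither of which appears in the diff) and a simplified `KEY=VALUE` format:

```python
def env_file_for(argv: list[str]) -> str:
    """Return the env file implied by the command line (hypothetical helper)."""
    # Mirrors the documented usage: `python bot.py testing` selects .env.testing.
    if len(argv) > 1 and argv[1] == "testing":
        return ".env.testing"
    return ".env"


def parse_env(text: str) -> dict[str, str]:
    """Parse KEY=VALUE lines, skipping blank lines and '#' comments (simplified)."""
    values: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values
```

Commented-out entries such as `# DEFAULT_VOICE=masterchief` are skipped, so an optional setting stays unset until the line is uncommented. A real project would more likely rely on an existing dotenv library than on a hand-rolled parser like this one.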
5
.gitignore
vendored
Normal file → Executable file
@@ -117,10 +117,15 @@ dmypy.json
 .venv
 env/
 venv/
+linux_venv/
 ENV/
 env.bak/
 venv.bak/
 /venv
+.numba_cache/
+
 # Gemini files
 GEMINI.md
 PROGRESS.md
+.vscode/launch.json
+voices/preferences.json
242
README.md
Normal file → Executable file
@@ -8,6 +8,15 @@ A Discord bot that reads messages aloud using [Pocket TTS](https://github.com/ky
 - 📝 **Auto-read Messages**: Automatically reads all messages from a configured text channel
 - 🔊 **Voice Channel Streaming**: Streams generated audio to the voice channel where the message author is
 - 📋 **Message Queue**: Messages are queued and spoken in order
+- 🔄 **Per-User Voice Selection**: Each user can choose their own TTS voice via `/voice` commands
+- 💾 **Voice Persistence**: User voice preferences are saved and restored on restart
+- 🔄 **Hot-reload Voices**: Add new voices without restarting the bot using `/voice refresh`
+- 🧪 **Test Mode**: Separate testing configuration for safe development
+- 📦 **Auto-updates**: Automatically checks for and installs dependency updates on startup
+- 👂 **Voice Preview**: Preview voices with `/voice preview` before committing to them
+- 🎵 **Audio Effects**: 7 different effects to customize your voice (pitch, speed, echo, robot, chorus, tremolo)
+- ⚡ **Unlimited Effects**: Use as many effects as you want (warning shown when >2 active)
+- ⏱️ **Processing Indicator**: Shows when audio processing is taking longer than expected

 ## Prerequisites

@@ -75,12 +84,15 @@ A Discord bot that reads messages aloud using [Pocket TTS](https://github.com/ky
    ```env
    DISCORD_TOKEN=your_bot_token_here
    TEXT_CHANNEL_ID=123456789012345678
-   VOICE_WAV_PATH=./voice.wav
+   VOICES_DIR=./voices
+   DEFAULT_VOICE=estinien
    ```

-5. **Add a voice reference file**:
-   - Place a WAV file named `voice.wav` in the project directory
-   - The file should contain 3-10 seconds of clear speech
+5. **Add voice reference files**:
+   - Create a `voices/` directory: `mkdir voices`
+   - Place `.wav` files in the `voices/` directory
+   - Each file should contain 3-10 seconds of clear speech
+   - File names become voice names (e.g., `MasterChief.wav` → `/voice set masterchief`)
    - Higher quality audio = better voice cloning results

 ## Usage
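The new setup step says that file names become voice names and that matching is case-insensitive (`MasterChief.wav` → `masterchief`). The `VoiceManager` implementation is not shown in this part of the diff, so the following is only a sketch of that mapping, assuming a hypothetical helper `discover_voices` and a flat `voices/` directory:

```python
from pathlib import Path


def discover_voices(voices_dir: str) -> dict[str, Path]:
    """Map lowercase voice names to their .wav files (hypothetical helper)."""
    voices: dict[str, Path] = {}
    for path in sorted(Path(voices_dir).iterdir()):
        # Only .wav files count as voices; the extension check ignores case.
        if path.is_file() and path.suffix.lower() == ".wav":
            voices[path.stem.lower()] = path
    return voices
```

Lowercasing the stem once at scan time means a lookup for `/voice set MasterChief` can lowercase the user's input and index the dictionary directly.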
@@ -96,6 +108,114 @@ A Discord bot that reads messages aloud using [Pocket TTS](https://github.com/ky
    - The bot will join your voice channel and read your message aloud
    - Messages are queued if the bot is already speaking

+3. **Voice Commands** (Slash Commands):
+   - `/voice list` - Shows all available voices
+   - `/voice set <name>` - Change your personal TTS voice
+   - `/voice current` - Shows your current voice
+   - `/voice refresh` - Re-scan for new voice files (no restart needed)
+   - `/voice preview <name>` - Preview a voice before selecting it
+
+### Test Mode
+
+Run the bot in testing mode to use a separate configuration:
+
+```bash
+python bot.py testing
+```
+
+This loads `.env.testing` instead of `.env`, allowing you to:
+- Use a different Discord bot token for testing
+- Monitor a different text channel
+- Test new features without affecting the production bot
+
+Create `.env.testing` by copying `.env.example` and configuring it with your testing values.
+
+### Audio Effects
+
+Transform your TTS voice with 7 different audio effects:
+
+#### Available Effects:
+
+**🎵 Pitch** (`/effects set pitch <semitones>`)
+- Range: -12 to +12 semitones
+- Default: 0 (no change)
+- Positive = higher/chipmunk voice
+- Negative = lower/deeper voice
+
+**⚡ Speed** (`/effects set speed <multiplier>`)
+- Range: 0.5 to 2.0
+- Default: 1.0x (normal speed)
+- Higher = faster speech
+- Lower = slower speech
+
+**🔊 Echo** (`/effects set echo <percentage>`)
+- Range: 0-100%
+- Default: 0% (off)
+- Adds spatial delay and reverb effect
+- Higher values = more pronounced echo
+
+**🤖 Robot** (`/effects set robot <percentage>`)
+- Range: 0-100%
+- Default: 0% (off)
+- Applies ring modulation for sci-fi robotic voice
+- Higher values = more robotic distortion
+
+**🎶 Chorus** (`/effects set chorus <percentage>`)
+- Range: 0-100%
+- Default: 0% (off)
+- Creates "multiple voices" effect with slight pitch variations
+- Higher values = more voices and depth
+
+**〰️ Tremolo Depth** (`/effects set tremolo_depth <value>`)
+- Range: 0.0 to 1.0
+- Default: 0.0 (off)
+- Controls amplitude modulation amount
+- Higher = more warble/vintage radio effect
+
+**📳 Tremolo Rate** (`/effects set tremolo_rate <hertz>`)
+- Range: 0.0 to 10.0 Hz
+- Default: 0.0 Hz (off)
+- Controls how fast the tremolo warbles
+- Requires tremolo_depth > 0 to have effect
+
+#### Effect Commands:
+- `/effects list` - Show all your current effect settings
+- `/effects set <effect> <value>` - Change an effect value
+- `/effects reset` - Reset all effects to defaults (with confirmation)
+
+#### Effect Application Order:
+Effects are applied in this sequence:
+1. Pitch shift
+2. Speed change
+3. Echo/Reverb
+4. Chorus
+5. Tremolo
+6. Robot voice
+
+#### Performance Notes:
+- **No limit** on number of active effects
+- ⚠️ Warning shown when you have more than 2 active effects
+- More effects = longer processing time
+- Some effects (like pitch shift and chorus) are more CPU-intensive
+- Processing time is logged to console for monitoring
+
+### Preview with Effects
+
+Test any combination of voice and effects before committing:
+
+**Preview a voice:**
+- `/voice preview <voice_name>` - Preview with your current effects
+
+**Preview with specific effects:**
+- `/voice preview <voice_name> pitch:5 speed:1.5` - Preview with pitch +5 and 1.5x speed
+- All effect parameters are optional and default to your current settings
+
+**Example combinations to try:**
+- Robot voice: `/effects set robot 75`
+- Deep scary voice: `/effects set pitch -8`
+- Fast chipmunk: `/effects set pitch 8 speed:1.5`
+- Radio announcer: `/effects set echo 40 tremolo_depth:0.3 tremolo_rate:4`
+
 ## How It Works

 ```
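The README text in this hunk fixes an application order (pitch, speed, echo, chorus, tremolo, robot) and notes that tremolo rate has no effect unless tremolo depth is above zero. A small sketch of how a settings dictionary could be turned into the list of stages that would actually run; `planned_stages`, `ORDER` and `DEFAULTS` are illustrative names, with the default values copied from the README:

```python
# Stage order and defaults as documented in the README; the names are illustrative.
ORDER = ["pitch", "speed", "echo", "chorus", "tremolo", "robot"]
DEFAULTS = {
    "pitch": 0,
    "speed": 1.0,
    "echo": 0,
    "chorus": 0,
    "robot": 0,
    "tremolo_depth": 0.0,
    "tremolo_rate": 0.0,
}


def planned_stages(settings: dict[str, float]) -> list[str]:
    """Return the effect stages that would run, in the documented order."""
    merged = {**DEFAULTS, **settings}
    stages: list[str] = []
    for name in ORDER:
        if name == "tremolo":
            # Tremolo needs both a depth and a rate above zero to do anything.
            active = merged["tremolo_depth"] > 0 and merged["tremolo_rate"] > 0
        else:
            active = merged[name] != DEFAULTS[name]
        if active:
            stages.append(name)
    return stages
```

For example, the "Radio announcer" combination (echo 40, tremolo depth 0.3, tremolo rate 4) yields `["echo", "tremolo"]`, while setting only `tremolo_rate` yields an empty list.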
@@ -106,8 +226,8 @@ A Discord bot that reads messages aloud using [Pocket TTS](https://github.com/ky
       ▲
       │
 ┌─────┴─────┐
-│ voice.wav │
-│ (speaker) │
+│ voices/   │
+│ per-user  │
 └───────────┘
 ```

@@ -133,6 +253,116 @@ A Discord bot that reads messages aloud using [Pocket TTS](https://github.com/ky
 - Ensure the reference audio is clear with minimal background noise
 - Try a longer reference clip (5-10 seconds)

+### HuggingFace cache read-only error
+If you see errors like `OSError: [Errno 30] Read-only file system` when the bot tries to download the TTS model:
+
+1. **Set a writable cache directory**: Add to your `.env` file:
+   ```env
+   HF_HOME=/tmp/huggingface
+   ```
+
+2. **Create and set permissions** on the directory:
+   ```bash
+   sudo mkdir /tmp/huggingface
+   sudo chown -R $USER:$USER /tmp/huggingface
+   ```
+
+3. **If using systemd service**: Ensure the service has write access to `/tmp` or the chosen cache directory. You may need to add `ReadWritePaths=/tmp/huggingface` to the service file or remove `ProtectHome=read-only`.
+
+4. **Restart the bot**:
+   ```bash
+   sudo systemctl restart vox.service
+   ```
+
+## Linux Server Deployment
+
+To run the bot as a service on a Linux server:
+
+### Quick Setup (Recommended)
+
+```bash
+# Make the setup script executable
+chmod +x setup_linux.sh
+
+# Run the setup script
+./setup_linux.sh
+```
+
+The script will:
+- Check system dependencies (Python 3.10+, FFmpeg, pip)
+- Create a virtual environment and install dependencies
+- Create `.env` template if needed
+- Optionally install and configure the systemd service
+
+### Manual Setup
+
+1. **Install system dependencies**:
+   ```bash
+   # Ubuntu/Debian
+   sudo apt update
+   sudo apt install python3 python3-pip python3-venv ffmpeg
+
+   # Fedora
+   sudo dnf install python3 python3-pip ffmpeg
+
+   # Arch
+   sudo pacman -S python python-pip ffmpeg
+   ```
+
+2. **Set up the project**:
+   ```bash
+   cd /path/to/PocketTTSBot
+   python3 -m venv venv
+   source venv/bin/activate
+   pip install -r requirements.txt
+   ```
+
+3. **Configure the service**:
+
+   Edit `pockettts.service` and replace:
+   - `YOUR_USERNAME` with your Linux username
+   - Update paths if your bot is not in `/home/YOUR_USERNAME/PocketTTSBot`
+
+4. **Install the service**:
+   ```bash
+   sudo cp pockettts.service /etc/systemd/system/
+   sudo systemctl daemon-reload
+   sudo systemctl enable pockettts  # Start on boot
+   sudo systemctl start pockettts   # Start now
+   ```
+
+### Service Management
+
+```bash
+# Check status
+sudo systemctl status pockettts
+
+# View logs (live)
+journalctl -u pockettts -f
+
+# View recent logs
+journalctl -u pockettts --since "1 hour ago"
+
+# Restart after changes
+sudo systemctl restart pockettts
+
+# Stop the bot
+sudo systemctl stop pockettts
+
+# Disable auto-start
+sudo systemctl disable pockettts
+```
+
+### Updating the Bot
+
+```bash
+cd /path/to/PocketTTSBot
+git pull  # If using git
+source venv/bin/activate
+pip install -r requirements.txt
+sudo systemctl restart pockettts
+```
+
 ## License

 MIT License
345
audio_effects.py
Executable file
@@ -0,0 +1,345 @@
+"""Audio effects processing for TTS output."""
+
+import time
+from typing import Any
+
+import librosa
+import numpy as np
+
+
+class AudioEffects:
+    """Apply post-processing effects to TTS audio."""
+
+    # No limit on effects, but warnings shown when > 2 active
+    MAX_ACTIVE_EFFECTS = None
+
+    # Effect ranges and defaults
+    PITCH_MIN = -12
+    PITCH_MAX = 12
+    PITCH_DEFAULT = 0
+
+    SPEED_MIN = 0.5
+    SPEED_MAX = 2.0
+    SPEED_DEFAULT = 1.0
+
+    ECHO_MIN = 0
+    ECHO_MAX = 100
+    ECHO_DEFAULT = 0
+
+    ROBOT_MIN = 0
+    ROBOT_MAX = 100
+    ROBOT_DEFAULT = 0
+
+    CHORUS_MIN = 0
+    CHORUS_MAX = 100
+    CHORUS_DEFAULT = 0
+
+    TREMOLO_DEPTH_MIN = 0.0
+    TREMOLO_DEPTH_MAX = 1.0
+    TREMOLO_DEPTH_DEFAULT = 0.0
+
+    TREMOLO_RATE_MIN = 0.0
+    TREMOLO_RATE_MAX = 10.0
+    TREMOLO_RATE_DEFAULT = 0.0
+
+    @classmethod
+    def apply_effects(
+        cls,
+        audio: np.ndarray,
+        sr: int,
+        pitch: int = PITCH_DEFAULT,
+        speed: float = SPEED_DEFAULT,
+        echo: int = ECHO_DEFAULT,
+        robot: int = ROBOT_DEFAULT,
+        chorus: int = CHORUS_DEFAULT,
+        tremolo_depth: float = TREMOLO_DEPTH_DEFAULT,
+        tremolo_rate: float = TREMOLO_RATE_DEFAULT,
+    ) -> tuple[np.ndarray, bool]:
+        """
+        Apply effects to audio in order: pitch → speed → echo → chorus → tremolo → robot
+
+        Args:
+            audio: Input audio array (1D)
+            sr: Sample rate
+            pitch: Pitch shift in semitones (-12 to +12, 0 = no shift)
+            speed: Speed multiplier (0.5 to 2.0, 1.0 = normal)
+            echo: Echo intensity (0-100, 0 = no echo)
+            robot: Robot voice intensity (0-100, 0 = no robot)
+            chorus: Chorus intensity (0-100, 0 = no chorus)
+            tremolo_depth: Tremolo depth (0.0-1.0, 0.0 = no tremolo)
+            tremolo_rate: Tremolo rate in Hz (0.0-10.0)
+
+        Returns:
+            Tuple of (processed_audio, show_processing_message)
+            show_processing_message is True if processing took > 1 second
+        """
+        start_time = time.time()
+        original_length = len(audio)
+
+        # Validate inputs
+        pitch = max(cls.PITCH_MIN, min(cls.PITCH_MAX, pitch))
+        speed = max(cls.SPEED_MIN, min(cls.SPEED_MAX, speed))
+        echo = max(cls.ECHO_MIN, min(cls.ECHO_MAX, echo))
+        robot = max(cls.ROBOT_MIN, min(cls.ROBOT_MAX, robot))
+        chorus = max(cls.CHORUS_MIN, min(cls.CHORUS_MAX, chorus))
+        tremolo_depth = max(cls.TREMOLO_DEPTH_MIN, min(cls.TREMOLO_DEPTH_MAX, tremolo_depth))
+        tremolo_rate = max(cls.TREMOLO_RATE_MIN, min(cls.TREMOLO_RATE_MAX, tremolo_rate))
+
+        # Apply pitch shift first
+        if pitch != cls.PITCH_DEFAULT:
+            print(f" Applying pitch shift: {pitch:+d} semitones...")
+            audio = librosa.effects.pitch_shift(
+                audio, sr=sr, n_steps=pitch, bins_per_octave=12
+            )
+
+        # Apply speed change second
+        if speed != cls.SPEED_DEFAULT:
+            print(f" Applying speed change: {speed:.1f}x...")
+            audio = librosa.effects.time_stretch(audio, rate=speed)
+
+        # Apply echo third
+        if echo > 0:
+            print(f" Applying echo: {echo}%...")
+            audio = cls._apply_echo(audio, sr, echo)
+
+        # Apply chorus fourth
+        if chorus > 0:
+            print(f" Applying chorus: {chorus}%...")
+            audio = cls._apply_chorus(audio, sr, chorus)
+
+        # Apply tremolo fifth
+        if tremolo_depth > 0 and tremolo_rate > 0:
+            print(f" Applying tremolo: depth={tremolo_depth:.1f}, rate={tremolo_rate:.1f}Hz...")
+            audio = cls._apply_tremolo(audio, sr, tremolo_depth, tremolo_rate)
+
+        # Apply robot voice last
+        if robot > 0:
+            print(f" Applying robot effect: {robot}%...")
+            audio = cls._apply_robot(audio, sr, robot)
+
+        processing_time = time.time() - start_time
+        print(f" Effects applied in {processing_time:.2f}s")
+
+        # Show processing message if it took more than 1 second
+        show_message = processing_time > 1.0
+
+        return audio, show_message
+
+    @classmethod
+    def _apply_echo(cls, audio: np.ndarray, sr: int, intensity: int) -> np.ndarray:
+        """Apply simple echo/reverb effect."""
+        if intensity == 0:
+            return audio
+
+        # Calculate delay in samples (50-300ms based on intensity)
+        delay_ms = 50 + (intensity / 100) * 250
+        delay_samples = int((delay_ms / 1000) * sr)
+
+        # Create output array
+        output = np.copy(audio)
+
+        # Add delayed copy with decay
+        decay = 0.3 + (intensity / 100) * 0.4 # 0.3-0.7 decay factor
+        if delay_samples < len(audio):
+            output[delay_samples:] += audio[:-delay_samples] * decay
+
+        # Normalize
+        max_val = np.max(np.abs(output))
+        if max_val > 0:
+            output = output / max_val * np.max(np.abs(audio))
+
+        return output
+
+    @classmethod
+    def _apply_chorus(cls, audio: np.ndarray, sr: int, intensity: int) -> np.ndarray:
+        """Apply chorus effect using multiple delayed voices."""
+        if intensity == 0:
+            return audio
+
+        # Number of voices based on intensity (1-3)
+        num_voices = 1 + int((intensity / 100) * 2)
+
+        # Base delay (15-30ms)
+        base_delay_ms = 15 + (intensity / 100) * 15
+        base_delay_samples = int((base_delay_ms / 1000) * sr)
+
+        output = np.copy(audio) * 0.6 # Reduce original to make room for voices
+
+        for i in range(num_voices):
+            # Slight pitch variation for each voice (±3%)
+            pitch_var = 1.0 + (0.03 * (i - 1))
+            try:
+                voice = librosa.effects.time_stretch(audio, rate=pitch_var)
+
+                # Slight delay variation
+                delay_samples = base_delay_samples + int((i * 5 / 1000) * sr)
+
+                # Mix voice into output
+                voice_len = min(len(voice), len(output) - delay_samples)
+                if voice_len > 0:
+                    output[delay_samples:delay_samples + voice_len] += voice[:voice_len] * 0.2
+            except Exception as e:
+                print(f" Warning: Chorus voice {i+1} failed: {e}")
+
+        # Normalize
+        max_val = np.max(np.abs(output))
+        if max_val > 0:
+            output = output / max_val * 0.95
+
+        return output
+
+    @classmethod
+    def _apply_tremolo(cls, audio: np.ndarray, sr: int, depth: float, rate: float) -> np.ndarray:
+        """Apply tremolo effect (amplitude modulation)."""
+        if depth == 0 or rate == 0:
+            return audio
+
+        # Create modulation signal
+        duration = len(audio) / sr
+        t = np.linspace(0, duration, len(audio))
+
+        # Sine wave modulation at specified rate
+        modulation = 1.0 - depth * 0.5 * (1 - np.sin(2 * np.pi * rate * t))
+
+        return audio * modulation
+
+    @classmethod
+    def _apply_robot(cls, audio: np.ndarray, sr: int, intensity: int) -> np.ndarray:
+        """Apply robot voice effect using ring modulation."""
+        if intensity == 0:
+            return audio
+
+        # Carrier frequency based on intensity (80-300 Hz)
+        carrier_freq = 80 + (intensity / 100) * 220
+
+        # Create carrier signal
+        duration = len(audio) / sr
+        t = np.linspace(0, duration, len(audio))
+        carrier = np.sin(2 * np.pi * carrier_freq * t)
+
+        # Mix original with ring-modulated version based on intensity
+        mix = intensity / 100
+        robot_signal = audio * carrier
+        output = audio * (1 - mix * 0.7) + robot_signal * mix * 0.7
+
+        # Normalize
+        max_val = np.max(np.abs(output))
+        if max_val > 0:
+            output = output / max_val * 0.95
+
+        return output
+
+    @classmethod
+    def validate_effect(cls, effect_name: str, value: Any) -> tuple[bool, str]:
+        """
+        Validate an effect value.
+
+        Returns:
+            Tuple of (is_valid, error_message)
+        """
+        validators = {
+            "pitch": (int, cls.PITCH_MIN, cls.PITCH_MAX, "Pitch must be a whole number", "semitones"),
+            "speed": (float, cls.SPEED_MIN, cls.SPEED_MAX, "Speed must be a number", "x"),
+            "echo": (int, cls.ECHO_MIN, cls.ECHO_MAX, "Echo must be a whole number", "%"),
+            "robot": (int, cls.ROBOT_MIN, cls.ROBOT_MAX, "Robot must be a whole number", "%"),
+            "chorus": (int, cls.CHORUS_MIN, cls.CHORUS_MAX, "Chorus must be a whole number", "%"),
+            "tremolo_depth": (float, cls.TREMOLO_DEPTH_MIN, cls.TREMOLO_DEPTH_MAX, "Tremolo depth must be a number", ""),
+            "tremolo_rate": (float, cls.TREMOLO_RATE_MIN, cls.TREMOLO_RATE_MAX, "Tremolo rate must be a number", "Hz"),
+        }
+
+        if effect_name not in validators:
+            return False, f"Unknown effect: {effect_name}"
+
+        type_func, min_val, max_val, error_msg, unit = validators[effect_name]
+
+        try:
+            val = type_func(value)
+            if min_val <= val <= max_val:
+                return True, ""
+            unit_str = f" {unit}" if unit else ""
+            return False, f"{effect_name.replace('_', ' ').title()} must be between {min_val} and {max_val}{unit_str}"
+        except (ValueError, TypeError):
+            return False, error_msg
+
+    @classmethod
+    def count_active_effects(cls, **effects) -> int:
+        """Count how many effects are active (non-default)."""
+        count = 0
+        # Convert values to proper types (JSON stores them as strings)
+        pitch = int(effects.get("pitch", cls.PITCH_DEFAULT))
+        speed = float(effects.get("speed", cls.SPEED_DEFAULT))
+        echo = int(effects.get("echo", cls.ECHO_DEFAULT))
+        robot = int(effects.get("robot", cls.ROBOT_DEFAULT))
+        chorus = int(effects.get("chorus", cls.CHORUS_DEFAULT))
+        tremolo_depth = float(effects.get("tremolo_depth", cls.TREMOLO_DEPTH_DEFAULT))
+
+        if pitch != cls.PITCH_DEFAULT:
+            count += 1
+        if speed != cls.SPEED_DEFAULT:
+            count += 1
+        if echo > cls.ECHO_DEFAULT:
+            count += 1
+        if robot > cls.ROBOT_DEFAULT:
+            count += 1
+        if chorus > cls.CHORUS_DEFAULT:
+            count += 1
+        if tremolo_depth > cls.TREMOLO_DEPTH_DEFAULT:
+            count += 1
+        # tremolo_rate only counts if depth is also active
+        return count
+
+    @classmethod
+    def get_effect_description(cls, effect_name: str) -> str:
+        """Get a human-readable description of what an effect does."""
+        descriptions = {
+            "pitch": f"Changes voice pitch ({cls.PITCH_MIN} to {cls.PITCH_MAX} semitones). Positive = higher/chipmunk, Negative = lower/deeper.",
+            "speed": f"Changes speech speed ({cls.SPEED_MIN} to {cls.SPEED_MAX}x). Higher = faster, Lower = slower.",
+            "echo": f"Adds echo/reverb ({cls.ECHO_MIN} to {cls.ECHO_MAX}%). Higher = more pronounced echo.",
+            "robot": f"Applies robot voice effect ({cls.ROBOT_MIN} to {cls.ROBOT_MAX}%). Higher = more robotic.",
+            "chorus": f"Adds chorus effect ({cls.CHORUS_MIN} to {cls.CHORUS_MAX}%). Higher = more voices/depth.",
+            "tremolo_depth": f"Tremolo amplitude modulation ({cls.TREMOLO_DEPTH_MIN} to {cls.TREMOLO_DEPTH_MAX}). Higher = more warble.",
+            "tremolo_rate": f"Tremolo speed ({cls.TREMOLO_RATE_MIN} to {cls.TREMOLO_RATE_MAX} Hz). Higher = faster warble.",
+        }
+        return descriptions.get(effect_name, "Unknown effect")
+
+    @classmethod
+    def format_effect_value(cls, effect_name: str, value: Any) -> str:
+        """Format an effect value for display."""
+        if effect_name == "pitch":
+            pitch = int(value)
+            if pitch == 0:
+                return "0 (normal)"
+            direction = "higher" if pitch > 0 else "lower"
+            return f"{pitch:+d} ({direction})"
+        elif effect_name == "speed":
+            speed = float(value)
+            if speed == 1.0:
+                return "1.0x (normal)"
+            direction = "faster" if speed > 1.0 else "slower"
+            return f"{speed:.1f}x ({direction})"
+        elif effect_name == "echo":
+            echo = int(value)
+            if echo == 0:
+                return "0% (off)"
+            return f"{echo}%"
+        elif effect_name == "robot":
+            robot = int(value)
+            if robot == 0:
+                return "0% (off)"
+            return f"{robot}%"
+        elif effect_name == "chorus":
+            chorus = int(value)
+            if chorus == 0:
+                return "0% (off)"
+            return f"{chorus}%"
+        elif effect_name == "tremolo_depth":
+            depth = float(value)
+            if depth == 0.0:
+                return "0.0 (off)"
+            return f"{depth:.1f}"
+        elif effect_name == "tremolo_rate":
+            rate = float(value)
+            if rate == 0.0:
+                return "0.0 Hz (off)"
+            return f"{rate:.1f} Hz"
+        return str(value)
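The intensity-to-parameter mappings in `audio_effects.py` are easier to check in isolation. The sketch below restates three of them in plain Python without NumPy, using formulas copied from `_apply_tremolo`, `_apply_echo` and `_apply_robot`; the function names are hypothetical and exist only for this illustration:

```python
import math


def tremolo_gain(t: float, depth: float, rate_hz: float) -> float:
    """Gain applied at time t (seconds); same formula as _apply_tremolo."""
    return 1.0 - depth * 0.5 * (1.0 - math.sin(2.0 * math.pi * rate_hz * t))


def echo_params(intensity: int) -> tuple[float, float]:
    """(delay in milliseconds, decay factor) for an echo intensity of 0-100."""
    fraction = intensity / 100
    return 50 + fraction * 250, 0.3 + fraction * 0.4


def robot_carrier_hz(intensity: int) -> float:
    """Ring-modulation carrier frequency for a robot intensity of 0-100."""
    return 80 + (intensity / 100) * 220
```

With these formulas the tremolo gain swings between `1 - depth` and `1.0`, so the effect only ever attenuates the signal; the echo delay runs from 50 ms to 300 ms with a decay of 0.3 to 0.7, and the robot carrier runs from 80 Hz to 300 Hz.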
12
audio_preprocessor.py
Normal file → Executable file
@@ -190,16 +190,16 @@ def print_audio_analysis(file_path: str) -> None:
     print(f"\n{'=' * 50}")
     print(f"Audio Analysis: {info['path']}")
     print(f"{'=' * 50}")
-    print(f" Sample Rate: {info['sample_rate']} Hz {'⚠️ (should be 22050)' if info['needs_resampling'] else '✓'}")
+    print(f" Sample Rate: {info['sample_rate']} Hz {'[WARN] (should be 22050)' if info['needs_resampling'] else '[OK]'}")
     print(f" Duration: {info['duration_seconds']:.2f}s", end="")
     if info['is_too_short']:
-        print(" ⚠️ (too short, aim for 5-15s)")
+        print(" [WARN] (too short, aim for 5-15s)")
     elif info['is_too_long']:
-        print(" ⚠️ (quite long, 5-15s is ideal)")
+        print(" [WARN] (quite long, 5-15s is ideal)")
     else:
-        print(" ✓")
-    print(f" Channels: {'Stereo' if info['is_stereo'] else 'Mono'} {'⚠️ (will convert to mono)' if info['is_stereo'] else '✓'}")
-    print(f" Max Amplitude: {info['max_amplitude']:.3f} {'✓' if info['is_normalized'] else '⚠️ (low volume)'}")
+        print(" [OK]")
+    print(f" Channels: {'Stereo' if info['is_stereo'] else 'Mono'} {'[WARN] (will convert to mono)' if info['is_stereo'] else '[OK]'}")
+    print(f" Max Amplitude: {info['max_amplitude']:.3f} {'[OK]' if info['is_normalized'] else '[WARN] (low volume)'}")
     print(f" RMS Level: {info['rms_level']:.4f}")
     print(f" Noise Floor: {info['estimated_noise_floor']:.4f}")
     print(f"{'=' * 50}\n")
731
bot.py
Normal file → Executable file
731
bot.py
Normal file → Executable file
@@ -1,9 +1,48 @@
|
|||||||
|
__version__ = "1.2.0"
|
||||||
|
|
||||||
|
import random
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Parse command line arguments before loading any config
|
||||||
|
if len(sys.argv) > 1 and sys.argv[1] == "testing":
|
||||||
|
os.environ["ENV_MODE"] = "testing"
|
||||||
|
# Remove the argument so it doesn't interfere with other parsing
|
||||||
|
sys.argv.pop(1)
|
||||||
|
|
||||||
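The argument handling above can be read as a small pure function. The following is only an illustrative sketch, not code from this commit: `select_env_mode` is a hypothetical helper name, and the environment is passed in as a plain dict so the behaviour can be checked without touching `os.environ`. Only the `"testing"` argument and the `ENV_MODE` key come from the diff.

```python
def select_env_mode(argv: list[str], environ: dict[str, str]) -> list[str]:
    """Consume a leading "testing" argument and record it under ENV_MODE.

    Returns the remaining arguments so later parsing never sees "testing".
    `select_env_mode` is a hypothetical helper, not part of bot.py.
    """
    remaining = list(argv)  # copy so the caller's list is left untouched
    if len(remaining) > 1 and remaining[1] == "testing":
        environ["ENV_MODE"] = "testing"
        remaining.pop(1)
    return remaining
```

Calling it as `select_env_mode(sys.argv, os.environ)` would mirror what the module-level code does, with the difference that the original mutates `sys.argv` in place.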
|
import numba_config
|
||||||
import asyncio
|
import asyncio
|
||||||
import io
|
import io
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
import discord
|
import discord
|
||||||
|
import numpy as np
|
||||||
|
import scipy.io.wavfile as wavfile
|
||||||
|
from discord import app_commands
|
||||||
from discord.ext import commands
|
from discord.ext import commands
|
||||||
|
|
||||||
|
from audio_effects import AudioEffects
|
||||||
from config import Config
|
from config import Config
|
||||||
from tts_handler import TTSHandler
|
from voice_manager import VoiceManager
|
||||||
|
|
||||||
|
|
||||||
|
# Inactivity timeout in seconds (10 minutes)
|
||||||
|
INACTIVITY_TIMEOUT = 10 * 60
|
||||||
|
|
||||||
|
# Sample lines for voice preview
|
||||||
|
PREVIEW_LINES = [
|
||||||
|
"Hello! This is how I sound. Choose me as your voice with /voice set.",
|
||||||
|
"Testing, one, two, three! Can you hear me clearly?",
|
||||||
|
"Here's a preview of my voice. Pretty cool, right?",
|
||||||
|
"Greetings! I am ready to speak for you.",
|
||||||
|
"Voice check! This is what I sound like.",
|
||||||
|
"Audio test complete. This voice is ready to go!",
|
||||||
|
"Sample message incoming. How do I sound to you?",
|
||||||
|
"Preview mode activated. Testing speech synthesis.",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
class TTSBot(commands.Bot):
|
class TTSBot(commands.Bot):
|
||||||
@@ -15,19 +54,553 @@ class TTSBot(commands.Bot):
|
|||||||
intents.voice_states = True
|
intents.voice_states = True
|
||||||
super().__init__(command_prefix="!", intents=intents)
|
super().__init__(command_prefix="!", intents=intents)
|
||||||
|
|
||||||
self.tts_handler = TTSHandler(Config.VOICE_WAV_PATH)
|
self.voice_manager = VoiceManager(Config.VOICES_DIR, Config.DEFAULT_VOICE)
|
||||||
self.message_queue: asyncio.Queue[tuple[discord.Message, str]] = asyncio.Queue()
|
self.message_queue: asyncio.Queue[tuple[Any, ...]] = asyncio.Queue()  # items: (message, text[, voice_override[, effects]])
|
||||||
|
self.last_activity: float = 0.0
|
||||||
|
|
||||||
|
print("\n=== Command Registration ===")
|
||||||
|
self._setup_slash_commands()
|
||||||
|
self._setup_effects_commands()
|
||||||
|
self._log_registered_commands()
|
||||||
|
print("=== End Command Registration ===\n")
|
||||||
|
|
||||||
|
def _log_registered_commands(self) -> None:
|
||||||
|
"""Log all registered commands to console."""
|
||||||
|
print("\nRegistered commands:")
|
||||||
|
registered = list(self.tree.get_commands())  # avoid shadowing the imported `commands` module
|
||||||
|
if not registered:
|
||||||
|
print(" ⚠️ No commands registered!")
|
||||||
|
else:
|
||||||
|
for cmd in registered:
|
||||||
|
print(f" ✓ /{cmd.name} - {cmd.description}")
|
||||||
|
print(f"\nTotal commands registered: {len(registered)}")
|
||||||
|
|
||||||
|
def _setup_slash_commands(self) -> None:
|
||||||
|
"""Set up slash commands for voice management."""
|
||||||
|
print("Setting up voice commands...")
|
||||||
|
|
||||||
|
@self.tree.command(name="voice", description="Manage your TTS voice")
|
||||||
|
@app_commands.describe(
|
||||||
|
action="What to do",
|
||||||
|
voice_name="Name of the voice (for 'set' or 'preview' action)",
|
||||||
|
preview_pitch="Optional pitch for preview (-12 to 12, default: use your settings)",
|
||||||
|
preview_speed="Optional speed for preview (0.5 to 2.0, default: use your settings)",
|
||||||
|
)
|
||||||
|
@app_commands.choices(action=[
|
||||||
|
app_commands.Choice(name="list", value="list"),
|
||||||
|
app_commands.Choice(name="set", value="set"),
|
||||||
|
app_commands.Choice(name="current", value="current"),
|
||||||
|
app_commands.Choice(name="refresh", value="refresh"),
|
||||||
|
app_commands.Choice(name="preview", value="preview"),
|
||||||
|
])
|
||||||
|
async def voice_command(
|
||||||
|
interaction: discord.Interaction,
|
||||||
|
action: app_commands.Choice[str],
|
||||||
|
voice_name: str | None = None,
|
||||||
|
preview_pitch: int | None = None,
|
||||||
|
preview_speed: float | None = None,
|
||||||
|
):
|
||||||
|
if action.value == "list":
|
||||||
|
await self._handle_voice_list(interaction)
|
||||||
|
elif action.value == "set":
|
||||||
|
await self._handle_voice_set(interaction, voice_name)
|
||||||
|
elif action.value == "current":
|
||||||
|
await self._handle_voice_current(interaction)
|
||||||
|
elif action.value == "refresh":
|
||||||
|
await self._handle_voice_refresh(interaction)
|
||||||
|
elif action.value == "preview":
|
||||||
|
await self._handle_voice_preview(interaction, voice_name, preview_pitch, preview_speed)
|
||||||
|
|
||||||
|
@voice_command.autocomplete("voice_name")
|
||||||
|
async def voice_name_autocomplete(
|
||||||
|
interaction: discord.Interaction,
|
||||||
|
current: str
|
||||||
|
) -> list[app_commands.Choice[str]]:
|
||||||
|
voices = self.voice_manager.get_available_voices()
|
||||||
|
return [
|
||||||
|
app_commands.Choice(name=v, value=v)
|
||||||
|
for v in voices
|
||||||
|
if current.lower() in v.lower()
|
||||||
|
][:25]
|
||||||
|
|
||||||
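The autocomplete callback above boils down to a case-insensitive substring filter capped at 25 entries, which is the most choices Discord accepts for an autocomplete response. A minimal sketch of that filter on plain strings, assuming a hypothetical helper name `filter_voice_choices` (the real callback wraps each result in `app_commands.Choice`):

```python
def filter_voice_choices(voices: list[str], current: str, limit: int = 25) -> list[str]:
    """Return the voices containing `current` (case-insensitive), capped at `limit`.

    `filter_voice_choices` is a hypothetical helper used for illustration only.
    """
    needle = current.lower()
    return [v for v in voices if needle in v.lower()][:limit]
```

An empty `current` matches every voice, so the user sees the first 25 voices before typing anything.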
|
def _setup_effects_commands(self) -> None:
|
||||||
|
"""Set up slash commands for audio effects management."""
|
||||||
|
print("Setting up effects commands...")
|
||||||
|
|
||||||
|
@self.tree.command(name="effects", description="Manage your TTS audio effects")
|
||||||
|
@app_commands.describe(
|
||||||
|
action="What to do",
|
||||||
|
effect_name="Name of the effect (for 'set' action)",
|
||||||
|
value="Value for the effect (for 'set' action)"
|
||||||
|
)
|
||||||
|
@app_commands.choices(action=[
|
||||||
|
app_commands.Choice(name="list", value="list"),
|
||||||
|
app_commands.Choice(name="set", value="set"),
|
||||||
|
app_commands.Choice(name="reset", value="reset"),
|
||||||
|
])
|
||||||
|
@app_commands.choices(effect_name=[
|
||||||
|
app_commands.Choice(name="pitch", value="pitch"),
|
||||||
|
app_commands.Choice(name="speed", value="speed"),
|
||||||
|
app_commands.Choice(name="echo", value="echo"),
|
||||||
|
app_commands.Choice(name="robot", value="robot"),
|
||||||
|
app_commands.Choice(name="chorus", value="chorus"),
|
||||||
|
app_commands.Choice(name="tremolo_depth", value="tremolo_depth"),
|
||||||
|
app_commands.Choice(name="tremolo_rate", value="tremolo_rate"),
|
||||||
|
])
|
||||||
|
async def effects_command(
|
||||||
|
interaction: discord.Interaction,
|
||||||
|
action: app_commands.Choice[str],
|
||||||
|
effect_name: app_commands.Choice[str] | None = None,
|
||||||
|
value: str | None = None
|
||||||
|
):
|
||||||
|
if action.value == "list":
|
||||||
|
await self._handle_effects_list(interaction)
|
||||||
|
elif action.value == "set":
|
||||||
|
await self._handle_effects_set(interaction, effect_name, value)
|
||||||
|
elif action.value == "reset":
|
||||||
|
await self._handle_effects_reset(interaction)
|
||||||
|
|
||||||
|
async def _handle_effects_list(self, interaction: discord.Interaction) -> None:
|
||||||
|
"""Handle /effects list command."""
|
||||||
|
effects = self.voice_manager.get_user_effects(interaction.user.id)
|
||||||
|
active_count = self.voice_manager.count_active_effects(interaction.user.id)
|
||||||
|
|
||||||
|
lines = ["**Your Audio Effects:**\n"]
|
||||||
|
|
||||||
|
# Pitch
|
||||||
|
pitch_desc = AudioEffects.get_effect_description("pitch")
|
||||||
|
pitch_val = AudioEffects.format_effect_value("pitch", effects["pitch"])
|
||||||
|
lines.append(f"🎵 **Pitch**: {pitch_val}")
|
||||||
|
lines.append(f" {pitch_desc}\n")
|
||||||
|
|
||||||
|
# Speed
|
||||||
|
speed_desc = AudioEffects.get_effect_description("speed")
|
||||||
|
speed_val = AudioEffects.format_effect_value("speed", effects["speed"])
|
||||||
|
lines.append(f"⚡ **Speed**: {speed_val}")
|
||||||
|
lines.append(f" {speed_desc}\n")
|
||||||
|
|
||||||
|
# Echo
|
||||||
|
echo_desc = AudioEffects.get_effect_description("echo")
|
||||||
|
echo_val = AudioEffects.format_effect_value("echo", effects["echo"])
|
||||||
|
lines.append(f"🔊 **Echo**: {echo_val}")
|
||||||
|
lines.append(f" {echo_desc}\n")
|
||||||
|
|
||||||
|
# Robot
|
||||||
|
robot_desc = AudioEffects.get_effect_description("robot")
|
||||||
|
robot_val = AudioEffects.format_effect_value("robot", effects["robot"])
|
||||||
|
lines.append(f"🤖 **Robot**: {robot_val}")
|
||||||
|
lines.append(f" {robot_desc}\n")
|
||||||
|
|
||||||
|
# Chorus
|
||||||
|
chorus_desc = AudioEffects.get_effect_description("chorus")
|
||||||
|
chorus_val = AudioEffects.format_effect_value("chorus", effects["chorus"])
|
||||||
|
lines.append(f"🎶 **Chorus**: {chorus_val}")
|
||||||
|
lines.append(f" {chorus_desc}\n")
|
||||||
|
|
||||||
|
# Tremolo Depth
|
||||||
|
tremolo_depth_desc = AudioEffects.get_effect_description("tremolo_depth")
|
||||||
|
tremolo_depth_val = AudioEffects.format_effect_value("tremolo_depth", effects["tremolo_depth"])
|
||||||
|
lines.append(f"〰️ **Tremolo Depth**: {tremolo_depth_val}")
|
||||||
|
lines.append(f" {tremolo_depth_desc}\n")
|
||||||
|
|
||||||
|
# Tremolo Rate
|
||||||
|
tremolo_rate_desc = AudioEffects.get_effect_description("tremolo_rate")
|
||||||
|
tremolo_rate_val = AudioEffects.format_effect_value("tremolo_rate", effects["tremolo_rate"])
|
||||||
|
lines.append(f"📳 **Tremolo Rate**: {tremolo_rate_val}")
|
||||||
|
lines.append(f" {tremolo_rate_desc}\n")
|
||||||
|
|
||||||
|
# Active count warning
|
||||||
|
lines.append(f"**Active Effects**: {active_count}")
|
||||||
|
if active_count > 2:
|
||||||
|
lines.append("⚠️ You have more than 2 active effects. Processing may be slower!")
|
||||||
|
elif active_count > 0:
|
||||||
|
lines.append("ℹ️ Add more effects for fun variations (may slow processing)")
|
||||||
|
|
||||||
|
lines.append("\n*Use `/effects set <effect> <value>` to change settings*")
|
||||||
|
lines.append("*Use `/effects reset` to clear all effects*")
|
||||||
|
|
||||||
|
await interaction.response.send_message(
|
||||||
|
"\n".join(lines),
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _handle_effects_set(
|
||||||
|
self,
|
||||||
|
interaction: discord.Interaction,
|
||||||
|
effect_name: app_commands.Choice[str] | None,
|
||||||
|
value: str | None
|
||||||
|
) -> None:
|
||||||
|
"""Handle /effects set command."""
|
||||||
|
if not effect_name or value is None:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
"❌ Please provide both effect name and value. Example: `/effects set pitch 3`",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
success, message = self.voice_manager.set_user_effect(
|
||||||
|
interaction.user.id,
|
||||||
|
effect_name.value,
|
||||||
|
value
|
||||||
|
)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
f"✅ {message}",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
f"❌ {message}",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _handle_effects_reset(self, interaction: discord.Interaction) -> None:
|
||||||
|
"""Handle /effects reset command with confirmation UI."""
|
||||||
|
# Check if user has any effects to reset
|
||||||
|
active_count = self.voice_manager.count_active_effects(interaction.user.id)
|
||||||
|
|
||||||
|
if active_count == 0:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
"ℹ️ You don't have any active effects to reset.",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Create confirmation buttons
|
||||||
|
class ConfirmResetView(discord.ui.View):
|
||||||
|
def __init__(self, voice_manager, user_id):
|
||||||
|
super().__init__(timeout=30)
|
||||||
|
self.voice_manager = voice_manager
|
||||||
|
self.user_id = user_id
|
||||||
|
self.confirmed = False
|
||||||
|
|
||||||
|
@discord.ui.button(label="✅ Yes, Reset All", style=discord.ButtonStyle.danger)
|
||||||
|
async def confirm_button(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||||||
|
if interaction.user.id != self.user_id:
|
||||||
|
await interaction.response.send_message("This button is not for you!", ephemeral=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
self.voice_manager.reset_user_effects(self.user_id)
|
||||||
|
self.confirmed = True
|
||||||
|
await interaction.response.edit_message(
|
||||||
|
content="✅ All audio effects have been reset to defaults!",
|
||||||
|
view=None
|
||||||
|
)
|
||||||
|
self.stop()
|
||||||
|
|
||||||
|
@discord.ui.button(label="❌ Cancel", style=discord.ButtonStyle.secondary)
|
||||||
|
async def cancel_button(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||||||
|
if interaction.user.id != self.user_id:
|
||||||
|
await interaction.response.send_message("This button is not for you!", ephemeral=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
await interaction.response.edit_message(
|
||||||
|
content="❌ Reset cancelled. Your effects remain unchanged.",
|
||||||
|
view=None
|
||||||
|
)
|
||||||
|
self.stop()
|
||||||
|
|
||||||
|
view = ConfirmResetView(self.voice_manager, interaction.user.id)
|
||||||
|
|
||||||
|
await interaction.response.send_message(
|
||||||
|
f"⚠️ **Reset Confirmation**\n\n"
|
||||||
|
f"You have {active_count} active effect(s).\n"
|
||||||
|
f"This will reset **all** your audio effects to defaults:\n"
|
||||||
|
f"• Pitch: 0 (normal)\n"
|
||||||
|
f"• Speed: 1.0x (normal)\n\n"
|
||||||
|
f"Are you sure you want to continue?",
|
||||||
|
view=view,
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _handle_voice_list(self, interaction: discord.Interaction) -> None:
|
||||||
|
"""Handle /voice list command."""
|
||||||
|
voices = self.voice_manager.get_available_voices()
|
||||||
|
loaded = self.voice_manager.get_loaded_voices()
|
||||||
|
user_voice = self.voice_manager.get_user_voice(interaction.user.id)
|
||||||
|
|
||||||
|
if not voices:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
"❌ No voices available. Add .wav files to the voices directory.",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
lines = ["**Available Voices:**\n"]
|
||||||
|
for voice in voices:
|
||||||
|
status = []
|
||||||
|
if voice == user_voice:
|
||||||
|
status.append("✅ your voice")
|
||||||
|
if voice in loaded:
|
||||||
|
status.append("📦 loaded")
|
||||||
|
status_str = f" ({', '.join(status)})" if status else ""
|
||||||
|
lines.append(f"• `{voice}`{status_str}")
|
||||||
|
|
||||||
|
lines.append("\n*Use `/voice set <name>` to change your voice.*")
|
||||||
|
|
||||||
|
await interaction.response.send_message(
|
||||||
|
"\n".join(lines),
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _handle_voice_set(self, interaction: discord.Interaction, voice_name: str | None) -> None:
|
||||||
|
"""Handle /voice set command."""
|
||||||
|
if not voice_name:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
"❌ Please provide a voice name. Use `/voice list` to see available voices.",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
voice_name = voice_name.lower()
|
||||||
|
|
||||||
|
if not self.voice_manager.is_voice_available(voice_name):
|
||||||
|
voices = self.voice_manager.get_available_voices()
|
||||||
|
await interaction.response.send_message(
|
||||||
|
f"❌ Voice `{voice_name}` not found.\n"
|
||||||
|
f"Available voices: {', '.join(f'`{v}`' for v in voices)}",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check if voice needs to be loaded
|
||||||
|
needs_loading = not self.voice_manager.is_voice_loaded(voice_name)
|
||||||
|
|
||||||
|
if needs_loading:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
f"⏳ Loading voice `{voice_name}` for the first time... This may take a moment.",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
await asyncio.to_thread(self.voice_manager.get_voice_state, voice_name)
|
||||||
|
except Exception as e:
|
||||||
|
await interaction.followup.send(
|
||||||
|
f"❌ Failed to load voice `{voice_name}`: {e}",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
self.voice_manager.set_user_voice(interaction.user.id, voice_name)
|
||||||
|
|
||||||
|
if needs_loading:
|
||||||
|
await interaction.followup.send(
|
||||||
|
f"✅ Voice changed to `{voice_name}`!",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
f"✅ Voice changed to `{voice_name}`!",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _handle_voice_current(self, interaction: discord.Interaction) -> None:
|
||||||
|
"""Handle /voice current command."""
|
||||||
|
voice = self.voice_manager.get_user_voice(interaction.user.id)
|
||||||
|
if voice:
|
||||||
|
loaded = "(loaded)" if self.voice_manager.is_voice_loaded(voice) else "(not yet loaded)"
|
||||||
|
await interaction.response.send_message(
|
||||||
|
f"🎤 Your current voice: `{voice}` {loaded}",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
"❌ No voice set. Use `/voice set <name>` to choose a voice.",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _handle_voice_refresh(self, interaction: discord.Interaction) -> None:
|
||||||
|
"""Handle /voice refresh command."""
|
||||||
|
await interaction.response.send_message(
|
||||||
|
"🔄 Scanning for new voices...",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
|
||||||
|
added, removed = await asyncio.to_thread(self.voice_manager.refresh_voices)
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
if added:
|
||||||
|
lines.append(f"✅ **New voices found:** {', '.join(f'`{v}`' for v in added)}")
|
||||||
|
if removed:
|
||||||
|
lines.append(f"❌ **Voices removed:** {', '.join(f'`{v}`' for v in removed)}")
|
||||||
|
if not added and not removed:
|
||||||
|
lines.append("No changes detected.")
|
||||||
|
|
||||||
|
total = len(self.voice_manager.get_available_voices())
|
||||||
|
lines.append(f"\n*Total voices available: {total}*")
|
||||||
|
|
||||||
|
await interaction.followup.send(
|
||||||
|
"\n".join(lines),
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _handle_voice_preview(
|
||||||
|
self,
|
||||||
|
interaction: discord.Interaction,
|
||||||
|
voice_name: str | None,
|
||||||
|
preview_pitch: int | None = None,
|
||||||
|
preview_speed: float | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Handle /voice preview command."""
|
||||||
|
if not voice_name:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
"❌ Please provide a voice name. Use `/voice list` to see available voices.",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check if user is in a voice channel
|
||||||
|
if interaction.user.voice is None:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
"❌ You need to be in a voice channel to hear a preview!",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
voice_name = voice_name.lower()
|
||||||
|
|
||||||
|
# Validate voice exists
|
||||||
|
if not self.voice_manager.is_voice_available(voice_name):
|
||||||
|
voices = self.voice_manager.get_available_voices()
|
||||||
|
await interaction.response.send_message(
|
||||||
|
f"❌ Voice `{voice_name}` not found.\n"
|
||||||
|
f"Available voices: {', '.join(f'`{v}`' for v in voices)}",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Validate pitch if provided
|
||||||
|
if preview_pitch is not None:
|
||||||
|
is_valid, error_msg = AudioEffects.validate_effect("pitch", preview_pitch)
|
||||||
|
if not is_valid:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
f"❌ Invalid pitch value: {error_msg}",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Validate speed if provided
|
||||||
|
if preview_speed is not None:
|
||||||
|
is_valid, error_msg = AudioEffects.validate_effect("speed", preview_speed)
|
||||||
|
if not is_valid:
|
||||||
|
await interaction.response.send_message(
|
||||||
|
f"❌ Invalid speed value: {error_msg}",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Select a random preview line
|
||||||
|
preview_text = random.choice(PREVIEW_LINES)
|
||||||
|
|
||||||
|
# Create a preview message object with all necessary attributes
|
||||||
|
class PreviewMessage:
|
||||||
|
def __init__(self, user, channel, voice_channel):
|
||||||
|
self.author = user
|
||||||
|
self.channel = channel
|
||||||
|
self._voice_channel = voice_channel
|
||||||
|
|
||||||
|
@property
|
||||||
|
def voice(self):
|
||||||
|
class VoiceState:
|
||||||
|
def __init__(self, channel):
|
||||||
|
self.channel = channel
|
||||||
|
return VoiceState(self._voice_channel)
|
||||||
|
|
||||||
|
preview_message = PreviewMessage(
|
||||||
|
interaction.user,
|
||||||
|
interaction.channel,
|
||||||
|
interaction.user.voice.channel
|
||||||
|
)
|
||||||
|
|
||||||
|
# Use user's current effects if not overridden
|
||||||
|
user_effects = self.voice_manager.get_user_effects(interaction.user.id)
|
||||||
|
effect_overrides = {}
|
||||||
|
if preview_pitch is not None:
|
||||||
|
effect_overrides["pitch"] = preview_pitch
|
||||||
|
if preview_speed is not None:
|
||||||
|
effect_overrides["speed"] = preview_speed
|
||||||
|
|
||||||
|
# Start from the user's saved effects, then apply any preview overrides on top
|
||||||
|
preview_effects = user_effects.copy()
|
||||||
|
preview_effects.update(effect_overrides)
|
||||||
|
|
||||||
|
# Queue the preview with voice override and effects
|
||||||
|
await self.message_queue.put((preview_message, preview_text, voice_name, preview_effects))
|
||||||
|
|
||||||
|
# Build effect description
|
||||||
|
effect_desc = []
|
||||||
|
if preview_effects.get("pitch", 0) != 0:
|
||||||
|
effect_desc.append(f"pitch: {preview_effects['pitch']:+d}")
|
||||||
|
if preview_effects.get("speed", 1.0) != 1.0:
|
||||||
|
effect_desc.append(f"speed: {preview_effects['speed']:.1f}x")
|
||||||
|
|
||||||
|
effect_str = f" (with {', '.join(effect_desc)})" if effect_desc else ""
|
||||||
|
|
||||||
|
await interaction.response.send_message(
|
||||||
|
f"⏳ Queued preview for `{voice_name}`{effect_str}. Sample: \"{preview_text[:50]}{'...' if len(preview_text) > 50 else ''}\"",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
|
||||||
async def setup_hook(self) -> None:
|
async def setup_hook(self) -> None:
|
||||||
"""Called when the bot is starting up."""
|
"""Called when the bot is starting up."""
|
||||||
print("Initializing TTS...")
|
print("Initializing TTS...")
|
||||||
await asyncio.to_thread(self.tts_handler.load)
|
print("Discovering available voices...")
|
||||||
|
await asyncio.to_thread(self.voice_manager.discover_voices)
|
||||||
|
await asyncio.to_thread(self.voice_manager.load_model)
|
||||||
|
|
||||||
|
# Pre-load the default voice if one is set
|
||||||
|
default = self.voice_manager.default_voice
|
||||||
|
if default:
|
||||||
|
print(f"Pre-loading default voice: {default}")
|
||||||
|
await asyncio.to_thread(self.voice_manager.get_voice_state, default)
|
||||||
|
|
||||||
self.loop.create_task(self.process_queue())
|
self.loop.create_task(self.process_queue())
|
||||||
|
self.loop.create_task(self.check_inactivity())
|
||||||
|
|
||||||
async def on_ready(self) -> None:
|
async def on_ready(self) -> None:
|
||||||
print(f"Logged in as {self.user}")
|
print(f"Logged in as {self.user}")
|
||||||
|
print(f"Bot ID: {self.user.id}")
|
||||||
print(f"Monitoring channel ID: {Config.TEXT_CHANNEL_ID}")
|
print(f"Monitoring channel ID: {Config.TEXT_CHANNEL_ID}")
|
||||||
print("Bot is ready!")
|
print(f"Available voices: {', '.join(self.voice_manager.get_available_voices())}")
|
||||||
|
|
||||||
|
# Log registered commands before sync
|
||||||
|
registered_cmds = list(self.tree.get_commands())
|
||||||
|
print(f"\nCommands in tree before sync: {len(registered_cmds)}")
|
||||||
|
for cmd in registered_cmds:
|
||||||
|
print(f" - /{cmd.name}")
|
||||||
|
|
||||||
|
# Sync slash commands to each guild for immediate availability
|
||||||
|
print(f"\nConnected to {len(self.guilds)} guild(s):")
|
||||||
|
for guild in self.guilds:
|
||||||
|
print(f" - {guild.name} (ID: {guild.id})")
|
||||||
|
|
||||||
|
print("\nSyncing slash commands to guilds...")
|
||||||
|
sync_count = 0
|
||||||
|
for guild in self.guilds:
|
||||||
|
try:
|
||||||
|
# Copy global commands to this guild before syncing
|
||||||
|
# This is necessary for guild-specific command registration
|
||||||
|
self.tree.copy_global_to(guild=discord.Object(guild.id))
|
||||||
|
print(f" 📋 Copied global commands to guild: {guild.name}")
|
||||||
|
|
||||||
|
synced = await self.tree.sync(guild=discord.Object(guild.id))
|
||||||
|
print(f" ✓ Synced {len(synced)} commands to guild: {guild.name}")
|
||||||
|
for cmd in synced:
|
||||||
|
print(f" - /{cmd.name}")
|
||||||
|
sync_count += 1
|
||||||
|
except discord.errors.Forbidden as e:
|
||||||
|
print(f" ✗ Forbidden: Cannot sync to guild {guild.name}. Missing 'applications.commands' scope!")
|
||||||
|
print(f" Error: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ✗ Failed to sync to guild {guild.name}: {type(e).__name__}: {e}")
|
||||||
|
|
||||||
|
if sync_count == 0:
|
||||||
|
print("\n⚠️ WARNING: No guilds were synced! Commands won't appear in Discord.")
|
||||||
|
print(" Make sure the bot was invited with 'applications.commands' scope.")
|
||||||
|
else:
|
||||||
|
print(f"\n✓ Successfully synced to {sync_count}/{len(self.guilds)} guild(s)")
|
||||||
|
|
||||||
|
print("\nBot is ready!")
|
||||||
|
|
||||||
async def on_message(self, message: discord.Message) -> None:
|
async def on_message(self, message: discord.Message) -> None:
|
||||||
if message.author.bot:
|
if message.author.bot:
|
||||||
@@ -54,16 +627,36 @@ class TTSBot(commands.Bot):
|
|||||||
async def process_queue(self) -> None:
|
async def process_queue(self) -> None:
|
||||||
"""Process messages from the queue one at a time."""
|
"""Process messages from the queue one at a time."""
|
||||||
while True:
|
while True:
|
||||||
message, text = await self.message_queue.get()
|
queue_item = await self.message_queue.get()
|
||||||
|
|
||||||
|
# Handle queue items:
|
||||||
|
# - (message, text) - regular message
|
||||||
|
# - (message, text, voice_override) - preview with voice override
|
||||||
|
# - (message, text, voice_override, effects_dict) - preview with effect overrides
|
||||||
|
if len(queue_item) == 4 and isinstance(queue_item[3], dict):
|
||||||
|
message, text, voice_override, effect_overrides = queue_item
|
||||||
|
elif len(queue_item) == 3:
|
||||||
|
message, text, voice_override = queue_item
|
||||||
|
effect_overrides = {}
|
||||||
|
else:
|
||||||
|
message, text = queue_item
|
||||||
|
voice_override = None
|
||||||
|
effect_overrides = {}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self.speak_message(message, text)
|
await self.speak_message(message, text, voice_override, effect_overrides)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error processing message: {e}")
|
print(f"Error processing message: {e}")
|
||||||
finally:
|
finally:
|
||||||
self.message_queue.task_done()
|
self.message_queue.task_done()
|
||||||
|
|
||||||
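The branch logic in `process_queue` normalizes queue items of two, three, or four elements into one fixed shape. A sketch of that normalization as a standalone function, assuming a hypothetical name `unpack_queue_item`; it mirrors the branches shown in the diff and is not taken from the commit itself:

```python
from typing import Any, Optional


def unpack_queue_item(item: tuple) -> tuple[Any, str, Optional[str], dict]:
    """Normalize a 2-, 3-, or 4-element queue item to (message, text, voice, effects).

    `unpack_queue_item` is a hypothetical helper mirroring process_queue's branches.
    """
    if len(item) == 4 and isinstance(item[3], dict):
        # Preview with a voice override and per-preview effect overrides
        message, text, voice_override, effect_overrides = item
    elif len(item) == 3:
        # Preview with a voice override only
        message, text, voice_override = item
        effect_overrides = {}
    else:
        # Regular chat message: use the author's own voice and effects
        message, text = item
        voice_override, effect_overrides = None, {}
    return message, text, voice_override, effect_overrides
```

A fixed-shape item (for example a small dataclass) would remove the need for length checks altogether; the variable-length tuples are kept here only because that is what the diff enqueues.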
async def speak_message(self, message: discord.Message, text: str) -> None:
|
async def speak_message(
|
||||||
|
self,
|
||||||
|
message: discord.Message,
|
||||||
|
text: str,
|
||||||
|
voice_override: str | None = None,
|
||||||
|
effect_overrides: dict | None = None,
|
||||||
|
) -> None:
|
||||||
"""Generate TTS and play it in the user's voice channel."""
|
"""Generate TTS and play it in the user's voice channel."""
|
||||||
if message.author.voice is None:
|
if message.author.voice is None:
|
||||||
return
|
return
|
||||||
@@ -75,7 +668,36 @@ class TTSBot(commands.Bot):
|
|||||||
return
|
return
|
||||||
|
|
||||||
print(f"Generating TTS for: {text[:50]}...")
|
print(f"Generating TTS for: {text[:50]}...")
|
||||||
wav_bytes = await asyncio.to_thread(self.tts_handler.generate_wav_bytes, text)
|
|
||||||
|
# Get voice state (use override for previews, otherwise user's voice)
|
||||||
|
try:
|
||||||
|
if voice_override:
|
||||||
|
voice_state = await asyncio.to_thread(
|
||||||
|
self.voice_manager.get_voice_state, voice_override
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
user_id = message.author.id
|
||||||
|
voice_state = await asyncio.to_thread(
|
||||||
|
self.voice_manager.get_user_voice_state, user_id
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error loading voice: {e}")
|
||||||
|
if not voice_override:
|
||||||
|
await message.channel.send(
|
||||||
|
f"{message.author.mention}, failed to load your voice. Use `/voice set` to choose a voice.",
|
||||||
|
delete_after=5
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Get user's effects and apply any overrides
|
||||||
|
user_effects = self.voice_manager.get_user_effects(message.author.id)
|
||||||
|
effects = user_effects.copy()
|
||||||
|
if effect_overrides:
|
||||||
|
effects.update(effect_overrides)
|
||||||
|
|
||||||
|
wav_bytes = await asyncio.to_thread(
|
||||||
|
self._generate_wav_bytes, voice_state, text, effects
|
||||||
|
)
|
||||||
|
|
||||||
audio_source = discord.FFmpegPCMAudio(
|
audio_source = discord.FFmpegPCMAudio(
|
||||||
io.BytesIO(wav_bytes),
|
io.BytesIO(wav_bytes),
|
||||||
@@ -88,16 +710,84 @@ class TTSBot(commands.Bot):
|
|||||||
|
|
||||||
play_complete = asyncio.Event()
|
play_complete = asyncio.Event()
|
||||||
|
|
||||||
def after_playing(error):
|
def after_playing(error: Exception | None) -> None:
|
||||||
if error:
|
if error:
|
||||||
print(f"Playback error: {error}")
|
print(f"Playback error: {error}")
|
||||||
self.loop.call_soon_threadsafe(play_complete.set)
|
self.loop.call_soon_threadsafe(play_complete.set)
|
||||||
|
|
||||||
voice_client.play(audio_source, after=after_playing)
|
voice_client.play(audio_source, after=after_playing)
|
||||||
|
self.last_activity = time.time()
|
||||||
print(f"Playing audio in {voice_channel.name}")
|
print(f"Playing audio in {voice_channel.name}")
|
||||||
|
|
||||||
await play_complete.wait()
|
await play_complete.wait()
|
||||||
|
|
||||||
|
def _generate_wav_bytes(
|
||||||
|
self,
|
||||||
|
voice_state: Any,
|
||||||
|
text: str,
|
||||||
|
effects: dict,
|
||||||
|
) -> bytes:
|
||||||
|
"""Generate audio and return as WAV file bytes."""
|
||||||
|
model = self.voice_manager.model
|
||||||
|
if model is None:
|
||||||
|
raise RuntimeError("Model not loaded")
|
||||||
|
|
||||||
|
audio = model.generate_audio(voice_state, text)
|
||||||
|
audio_np = audio.numpy()
|
||||||
|
|
||||||
|
# Ensure audio is 2D [samples, channels] for storage
|
||||||
|
if audio_np.ndim == 1:
|
||||||
|
audio_np = audio_np.reshape(-1, 1)
|
||||||
|
|
||||||
|
# Apply audio effects if any are active
|
||||||
|
pitch = effects.get("pitch", AudioEffects.PITCH_DEFAULT)
|
||||||
|
speed = effects.get("speed", AudioEffects.SPEED_DEFAULT)
|
||||||
|
echo = effects.get("echo", AudioEffects.ECHO_DEFAULT)
|
||||||
|
robot = effects.get("robot", AudioEffects.ROBOT_DEFAULT)
|
||||||
|
chorus = effects.get("chorus", AudioEffects.CHORUS_DEFAULT)
|
||||||
|
tremolo_depth = effects.get("tremolo_depth", AudioEffects.TREMOLO_DEPTH_DEFAULT)
|
||||||
|
tremolo_rate = effects.get("tremolo_rate", AudioEffects.TREMOLO_RATE_DEFAULT)
|
||||||
|
|
||||||
|
if any([pitch != 0, speed != 1.0, echo > 0, robot > 0, chorus > 0, tremolo_depth > 0]):
|
||||||
|
print(f"Applying {AudioEffects.count_active_effects(**effects)} effect(s)...")
|
||||||
|
# Squeeze to 1D for librosa effects, then reshape back
|
||||||
|
audio_1d = audio_np.squeeze()
|
||||||
|
audio_1d, show_processing = AudioEffects.apply_effects(
|
||||||
|
audio_1d, model.sample_rate,
|
||||||
|
pitch, speed, echo, robot, chorus, tremolo_depth, tremolo_rate
|
||||||
|
)
|
||||||
|
# Reshape back to 2D
|
||||||
|
audio_np = audio_1d.reshape(-1, 1)
|
||||||
|
if show_processing:
|
||||||
|
print("⚠️ Audio processing took longer than expected due to effects")
|
||||||
|
|
||||||
|
max_val = np.max(np.abs(audio_np))
|
||||||
|
if max_val > 0:
|
||||||
|
audio_np = audio_np / max_val
|
||||||
|
audio_int16 = (audio_np * 32767).astype(np.int16)
|
||||||
|
|
||||||
|
wav_buffer = io.BytesIO()
|
||||||
|
wavfile.write(wav_buffer, model.sample_rate, audio_int16)
|
||||||
|
wav_buffer.seek(0)
|
||||||
|
return wav_buffer.read()
|
||||||
|
|
||||||
|
async def check_inactivity(self) -> None:
|
||||||
|
"""Periodically check for inactivity and disconnect from voice channels."""
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(60) # Check every minute
|
||||||
|
|
||||||
|
if self.last_activity == 0.0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
elapsed = time.time() - self.last_activity
|
||||||
|
if elapsed >= INACTIVITY_TIMEOUT:
|
||||||
|
# Disconnect from all voice channels
|
||||||
|
for guild in self.guilds:
|
||||||
|
if guild.voice_client is not None:
|
||||||
|
print(f"Disconnecting from {guild.name} due to inactivity")
|
||||||
|
await guild.voice_client.disconnect()
|
||||||
|
self.last_activity = 0.0
|
||||||
|
|
||||||
async def ensure_voice_connection(self, channel: discord.VoiceChannel) -> discord.VoiceClient | None:
|
async def ensure_voice_connection(self, channel: discord.VoiceChannel) -> discord.VoiceClient | None:
|
||||||
"""Ensure we're connected to the specified voice channel."""
|
"""Ensure we're connected to the specified voice channel."""
|
||||||
guild = channel.guild
|
guild = channel.guild
|
||||||
@@ -110,13 +800,34 @@ class TTSBot(commands.Bot):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
voice_client = await channel.connect(timeout=10.0)
|
voice_client = await channel.connect(timeout=10.0)
|
||||||
|
self.last_activity = time.time()
|
||||||
return voice_client
|
return voice_client
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Failed to connect to voice channel: {e}")
|
print(f"Failed to connect to voice channel: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def auto_update_dependencies() -> None:
|
||||||
|
"""Auto-update pip packages on startup."""
|
||||||
|
try:
|
||||||
|
print("Checking for package updates...")
|
||||||
|
result = subprocess.run(
|
||||||
|
[sys.executable, "-m", "pip", "install", "-r", "requirements.txt", "-U", "-q"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
check=False
|
||||||
|
)
|
||||||
|
if result.returncode == 0:
|
||||||
|
print("Packages updated successfully (or already up to date)")
|
||||||
|
else:
|
||||||
|
print(f"Warning: Package update had issues: {result.stderr}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Warning: Could not auto-update packages: {e}")
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
auto_update_dependencies()
|
||||||
|
|
||||||
errors = Config.validate()
|
errors = Config.validate()
|
||||||
if errors:
|
if errors:
|
||||||
print("Configuration errors:")
|
print("Configuration errors:")
|
||||||
|
|||||||
12
config.py
Normal file → Executable file
@@ -1,13 +1,17 @@
|
|||||||
import os
|
import os
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
load_dotenv()
|
# Load appropriate .env file based on ENV_MODE
|
||||||
|
env_mode = os.getenv("ENV_MODE", "production")
|
||||||
|
env_file = ".env.testing" if env_mode == "testing" else ".env"
|
||||||
|
load_dotenv(env_file)
|
||||||
|
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
DISCORD_TOKEN: str = os.getenv("DISCORD_TOKEN", "")
|
DISCORD_TOKEN: str = os.getenv("DISCORD_TOKEN", "")
|
||||||
TEXT_CHANNEL_ID: int = int(os.getenv("TEXT_CHANNEL_ID", "0"))
|
TEXT_CHANNEL_ID: int = int(os.getenv("TEXT_CHANNEL_ID", "0"))
|
||||||
VOICE_WAV_PATH: str = os.getenv("VOICE_WAV_PATH", "./voice.wav")
|
VOICES_DIR: str = os.getenv("VOICES_DIR", "./voices")
|
||||||
|
DEFAULT_VOICE: str | None = os.getenv("DEFAULT_VOICE", None)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def validate(cls) -> list[str]:
|
def validate(cls) -> list[str]:
|
||||||
@@ -17,6 +21,6 @@ class Config:
|
|||||||
errors.append("DISCORD_TOKEN is not set")
|
errors.append("DISCORD_TOKEN is not set")
|
||||||
if cls.TEXT_CHANNEL_ID == 0:
|
if cls.TEXT_CHANNEL_ID == 0:
|
||||||
errors.append("TEXT_CHANNEL_ID is not set")
|
errors.append("TEXT_CHANNEL_ID is not set")
|
||||||
if not os.path.exists(cls.VOICE_WAV_PATH):
|
if not os.path.exists(cls.VOICES_DIR):
|
||||||
errors.append(f"Voice WAV file not found: {cls.VOICE_WAV_PATH}")
|
errors.append(f"Voices directory not found: {cls.VOICES_DIR}")
|
||||||
return errors
|
return errors
|
||||||
|
|||||||
4
launch.sh
Executable file
@@ -0,0 +1,4 @@
#!/bin/bash
cd /home/artanis/Documents/Vox/ || exit 1
source venv/bin/activate
python bot.py
0
media/Subnautica/CyclopsEngineOff.oga
Normal file → Executable file
0
media/Subnautica/CyclopsEngineOn.oga
Normal file → Executable file
0
media/Subnautica/CyclopsOverheat.oga
Normal file → Executable file
0
media/Subnautica/Cyclops_Welcome.oga
Normal file → Executable file
0
media/Subnautica/Cyclops_Welcome2.oga
Normal file → Executable file
0
media/TF2/Ronin/diag_gs_titanRonin_embark_03.wav
Normal file → Executable file
0
media/TF2/Ronin/diag_gs_titanRonin_embark_05.wav
Normal file → Executable file
0
media/TF2/Ronin/diag_gs_titanRonin_embark_06.wav
Normal file → Executable file
0
media/TF2/Ronin/diag_gs_titanRonin_embark_08.wav
Normal file → Executable file
0
media/TF2/Ronin/diag_gs_titanRonin_embark_09.wav
Normal file → Executable file
0
media/TF2/Ronin/diag_gs_titanRonin_embark_10.wav
Normal file → Executable file
0
media/TF2/Ronin/diag_gs_titanRonin_embark_11.wav
Normal file → Executable file
19
numba_config.py
Executable file
@@ -0,0 +1,19 @@
import os
import sys

# Set a writable cache directory for Numba
# This is crucial when running as a systemd service with restricted home directory access.
# The cache will be created in the bot's root directory.
CACHE_DIR = os.path.join(os.path.dirname(__file__), '.numba_cache')

if not os.path.exists(CACHE_DIR):
    try:
        os.makedirs(CACHE_DIR)
        print(f"Numba cache directory created at: {CACHE_DIR}")
    except OSError as e:
        print(f"Error creating Numba cache directory: {e}", file=sys.stderr)

# Set the environment variable for Numba
os.environ['NUMBA_CACHE_DIR'] = CACHE_DIR

print(f"Numba cache directory set to: {os.environ.get('NUMBA_CACHE_DIR')}")
37
pockettts.service
Executable file
@@ -0,0 +1,37 @@
[Unit]
Description=Pocket TTS Discord Bot
After=network-online.target
Wants=network-online.target

[Service]
# Replace with your username
User=YOUR_USERNAME
Group=YOUR_USERNAME

# Replace with the actual path to your bot directory
WorkingDirectory=/home/YOUR_USERNAME/PocketTTSBot

# Use the Python from the virtual environment
ExecStart=/home/YOUR_USERNAME/PocketTTSBot/venv/bin/python bot.py

# Restart on failure
Restart=on-failure
RestartSec=10

# Give the bot time to gracefully shutdown
TimeoutStopSec=30

# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=pockettts

# Security hardening (optional but recommended)
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=read-only
ReadWritePaths=/home/YOUR_USERNAME/PocketTTSBot/voices
PrivateTmp=true

[Install]
WantedBy=multi-user.target
0
requirements.txt
Normal file → Executable file
140
research/overview.md
Executable file
@@ -0,0 +1,140 @@
|
|||||||
|
# Vox - Discord Text-to-Speech Bot
|
||||||
|
|
||||||
|
A Python-based Discord bot that generates neural text-to-speech using voice cloning from reference WAV files.
|
||||||
|
|
||||||
|
## Project Structure
|
||||||
|
|
||||||
|
```
|
||||||
|
Vox/
|
||||||
|
├── bot.py # Main entry point, Discord bot implementation
|
||||||
|
├── config.py # Configuration management using environment variables
|
||||||
|
├── voice_manager.py # Voice discovery, loading, and user preferences
|
||||||
|
├── audio_effects.py # Audio post-processing effects (7 effects)
|
||||||
|
├── audio_preprocessor.py # Audio preprocessing for voice cloning
|
||||||
|
├── numba_config.py # Numba JIT compiler cache configuration
|
||||||
|
├── requirements.txt # Python dependencies
|
||||||
|
├── launch.sh # Shell script to start the bot
|
||||||
|
├── pockettts.service # Systemd service file for Linux deployment
|
||||||
|
├── README.md # Comprehensive documentation
|
||||||
|
├── .env # Production environment configuration
|
||||||
|
├── .env.testing # Testing environment configuration
|
||||||
|
├── .env.example # Environment configuration template
|
||||||
|
└── voices/ # Directory for voice WAV files
|
||||||
|
├── preferences.json # User voice/effect preferences (auto-generated)
|
||||||
|
└── *.wav # Voice reference files
|
||||||
|
```
|
||||||
|
|
||||||
|
## Core Functionality
|
||||||
|
|
||||||
|
### TTS Implementation
|
||||||
|
- **Engine**: Pocket TTS (`pocket-tts` library) for neural text-to-speech synthesis
|
||||||
|
- **Voice Cloning**: Uses reference WAV files to clone voices via `model.get_state_for_audio_prompt()`
|
||||||
|
- **On-demand Loading**: Voices are loaded only when first needed, then cached
|
||||||
|
|
||||||
|
### Discord Integration
|
||||||
|
- Monitors a configured text channel for messages
|
||||||
|
- Joins the message author's voice channel when they post in the monitored text channel
|
||||||
|
- Uses `discord.FFmpegPCMAudio` with piped WAV data for streaming
|
||||||
|
|
||||||
|
### Audio Processing Pipeline
|
||||||
|
```
|
||||||
|
Text Message → Pocket TTS → Audio Effects → Normalize → FFmpeg → Discord VC
|
||||||
|
```
|
||||||
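The Normalize step of the pipeline above can be sketched as follows. This is a minimal illustration that uses only the Python standard library (`wave`, `array`); the bot itself relies on NumPy and `scipy.io.wavfile`, and the function name `floats_to_wav_bytes` is hypothetical.

```python
import array
import io
import sys
import wave


def floats_to_wav_bytes(samples: list[float], sample_rate: int) -> bytes:
    """Peak-normalize mono float samples and encode them as 16-bit PCM WAV."""
    peak = max((abs(s) for s in samples), default=0.0)
    if peak > 0:
        # Scale so the loudest sample reaches full scale.
        pcm = array.array("h", (int(s / peak * 32767) for s in samples))
    else:
        # Pure silence (or empty input): nothing to scale.
        pcm = array.array("h", (0 for _ in samples))
    if sys.byteorder == "big":
        pcm.byteswap()  # WAV data is little-endian

    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav:
        wav.setnchannels(1)   # mono
        wav.setsampwidth(2)   # 16-bit samples
        wav.setframerate(sample_rate)
        wav.writeframes(pcm.tobytes())
    return buffer.getvalue()
```

The resulting bytes can be piped to FFmpeg in the same way as the WAV data the bot produces; the sample rate passed in should be whatever the TTS model reports.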
|
|
||||||
|
## Dependencies
|
||||||
|
|
||||||
|
| Library | Purpose |
|
||||||
|
|---------|---------|
|
||||||
|
| `discord.py[voice]>=2.3.0` | Discord bot API with voice support |
|
||||||
|
| `pocket-tts>=0.1.0` | Neural TTS engine with voice cloning |
|
||||||
|
| `scipy>=1.10.0` | Scientific computing (audio I/O) |
|
||||||
|
| `numpy>=1.24.0` | Numerical computing |
|
||||||
|
| `librosa>=0.10.0` | Audio analysis and effects |
|
||||||
|
| `noisereduce>=3.0.0` | Noise reduction preprocessing |
|
||||||
|
| `soundfile>=0.12.0` | Audio file I/O |
|
||||||
|
| `python-dotenv>=1.0.0` | Environment variable loading |
|
||||||
|
|
||||||
|
**System Requirements**: Python 3.10+, FFmpeg
|
||||||
|
|
||||||
|
## Key Modules
|
||||||
|
|
||||||
|
### `TTSBot` (bot.py)
|
||||||
|
Main Discord bot class that extends `commands.Bot`. Handles:
|
||||||
|
- Message processing and TTS queue
|
||||||
|
- Voice channel connections
|
||||||
|
- Slash command registration
|
||||||
|
- Startup initialization (loads TTS model, discovers voices)
|
||||||
|
|
||||||
|
### `VoiceManager` (voice_manager.py)
|
||||||
|
Manages voice files and user preferences:
|
||||||
|
- Discovers voices from WAV files in `voices/` directory
|
||||||
|
- On-demand voice loading with caching
|
||||||
|
- Per-user voice selection and effect preferences
|
||||||
|
- Preferences persistence to JSON
|
||||||
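The discovery and on-demand caching behaviour described above can be sketched roughly as below. The class name `LazyVoiceCache` and the injected `loader` callable are assumptions made for illustration; in the bot, the expensive step is the preprocessing plus `model.get_state_for_audio_prompt()` call.

```python
from pathlib import Path
from typing import Any, Callable


class LazyVoiceCache:
    """Discover *.wav files and load each voice only on first use."""

    def __init__(self, voices_dir: str, loader: Callable[[Path], Any]):
        self.voices_dir = Path(voices_dir)
        self._loader = loader  # hypothetical stand-in for the voice-cloning step
        self._available: dict[str, Path] = {}
        self._loaded: dict[str, Any] = {}

    def discover(self) -> list[str]:
        # Voice names are lower-cased file stems: "MasterChief.wav" -> "masterchief".
        self._available = {p.stem.lower(): p for p in self.voices_dir.glob("*.wav")}
        return sorted(self._available)

    def get(self, name: str) -> Any:
        key = name.lower()
        if key not in self._available:
            raise ValueError(f"Voice '{key}' not found")
        if key not in self._loaded:
            # First request for this voice: pay the loading cost once, then cache.
            self._loaded[key] = self._loader(self._available[key])
        return self._loaded[key]
```

Injecting the loader keeps the caching logic testable without loading a TTS model.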
|
|
||||||
|
### `AudioEffects` (audio_effects.py)
|
||||||
|
Provides 7 post-processing effects:
|
||||||
|
1. **Pitch** (-12 to +12 semitones)
|
||||||
|
2. **Speed** (0.5x to 2.0x)
|
||||||
|
3. **Echo** (0-100%)
|
||||||
|
4. **Robot** (0-100%) - Ring modulation
|
||||||
|
5. **Chorus** (0-100%) - Multiple voice layering
|
||||||
|
6. **Tremolo Depth** (0.0-1.0)
|
||||||
|
7. **Tremolo Rate** (0.0-10.0 Hz)
|
||||||
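Range checking for the seven effects listed above might look like the sketch below. The ranges are copied from the list; the `EFFECT_RANGES` table and this standalone `validate_effect` function are assumptions and may differ from the actual `AudioEffects.validate_effect` implementation.

```python
# Ranges taken from the effect list; the dictionary layout is hypothetical.
EFFECT_RANGES: dict[str, tuple[float, float]] = {
    "pitch": (-12, 12),            # semitones
    "speed": (0.5, 2.0),           # playback-rate multiplier
    "echo": (0, 100),              # percent
    "robot": (0, 100),             # percent
    "chorus": (0, 100),            # percent
    "tremolo_depth": (0.0, 1.0),
    "tremolo_rate": (0.0, 10.0),   # Hz
}


def validate_effect(name: str, value: object) -> tuple[bool, str]:
    """Return (is_valid, error_message) for one effect setting."""
    if name not in EFFECT_RANGES:
        return False, f"Unknown effect '{name}'"
    try:
        number = float(value)  # accepts ints, floats, and numeric strings
    except (TypeError, ValueError):
        return False, f"{name} must be a number"
    low, high = EFFECT_RANGES[name]
    if not low <= number <= high:
        return False, f"{name} must be between {low} and {high}"
    return True, ""
```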
|
|
||||||
|
### `AudioPreprocessor` (audio_preprocessor.py)
|
||||||
|
Prepares voice reference files for cloning:
|
||||||
|
1. Load and resample to 22050 Hz
|
||||||
|
2. Normalize volume
|
||||||
|
3. Trim silence
|
||||||
|
4. Noise reduction
|
||||||
|
5. Limit length (default 15 seconds)
|
||||||
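A simplified, dependency-free sketch of three of the steps above (normalize, trim silence, limit length) is shown here. It is not the `AudioPreprocessor` implementation: resampling and noise reduction are omitted, silence is detected per sample rather than per frame, and the function name `preprocess` and its parameters are hypothetical.

```python
def preprocess(
    samples: list[float],
    sample_rate: int,
    top_db: float = 20.0,
    max_seconds: float = 15.0,
) -> list[float]:
    """Normalize, trim leading/trailing silence, and cap the clip length."""
    peak = max((abs(s) for s in samples), default=0.0)
    if peak == 0:
        return []  # empty or completely silent input

    # Step 1: peak-normalize so the loudest sample has magnitude 1.0.
    normalized = [s / peak for s in samples]

    # Step 2: trim everything quieter than top_db below the peak at both ends.
    threshold = 10 ** (-top_db / 20)
    loud = [i for i, s in enumerate(normalized) if abs(s) >= threshold]
    trimmed = normalized[loud[0] : loud[-1] + 1]

    # Step 3: keep at most max_seconds of audio.
    return trimmed[: int(max_seconds * sample_rate)]
```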
|
|
||||||
|
### `Config` (config.py)
|
||||||
|
Centralized configuration management with environment-aware loading and validation.
|
||||||
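As a rough illustration of environment-aware loading and validation, the sketch below selects a dotenv file from `ENV_MODE` and collects configuration errors as a list. The helper names `select_env_file` and `validate` are hypothetical, and passing settings in as a mapping is an assumption made so the logic can be exercised without real environment files.

```python
import os
from collections.abc import Mapping


def select_env_file(environ: Mapping[str, str] = os.environ) -> str:
    """Pick the dotenv file for the current ENV_MODE (defaults to production)."""
    mode = environ.get("ENV_MODE", "production")
    return ".env.testing" if mode == "testing" else ".env"


def validate(settings: Mapping[str, str]) -> list[str]:
    """Collect human-readable configuration errors instead of raising."""
    errors: list[str] = []
    if not settings.get("DISCORD_TOKEN"):
        errors.append("DISCORD_TOKEN is not set")
    if settings.get("TEXT_CHANNEL_ID", "0") in ("", "0"):
        errors.append("TEXT_CHANNEL_ID is not set")
    return errors
```

Returning a list of errors lets the caller print every problem at once before exiting.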
|
|
||||||
|
## Slash Commands
|
||||||
|
|
||||||
|
| Command | Description |
|
||||||
|
|---------|-------------|
|
||||||
|
| `/voice list` | Show available voices |
|
||||||
|
| `/voice set <name>` | Select your voice |
|
||||||
|
| `/voice current` | Show current voice |
|
||||||
|
| `/voice refresh` | Rescan for new voices |
|
||||||
|
| `/voice preview <name>` | Preview before committing |
|
||||||
|
| `/effects list` | Show your effect settings |
|
||||||
|
| `/effects set <effect> <value>` | Adjust effects |
|
||||||
|
| `/effects reset` | Reset to defaults |
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
- **Voice Cloning**: Add new voices by placing `.wav` files in `voices/` directory
|
||||||
|
- **Per-User Customization**: Each user can have their own voice and effect preferences
|
||||||
|
- **Hot-Reload**: Rescan for new voices without restart (`/voice refresh`)
|
||||||
|
- **Message Queue**: Queues messages for sequential playback
|
||||||
|
- **Inactivity Management**: Disconnects after 10 minutes of inactivity
|
||||||
|
- **Testing Support**: Separate `.env.testing` configuration for safe development
|
||||||
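The message-queue feature above can be illustrated with a single `asyncio.Queue` consumer, which guarantees that clips play one after another. This is a sketch under assumptions: `playback_worker`, `demo`, and the `play` coroutine are hypothetical names, and the bot's own queue handling may be structured differently.

```python
import asyncio


async def playback_worker(queue: asyncio.Queue, play) -> None:
    """Consume queued messages one at a time so clips never overlap."""
    while True:
        text = await queue.get()
        try:
            await play(text)  # hypothetical coroutine: synthesize and play one clip
        finally:
            queue.task_done()


async def demo() -> list[str]:
    played: list[str] = []

    async def fake_play(text: str) -> None:
        await asyncio.sleep(0)  # stands in for TTS generation and playback
        played.append(text)

    queue: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(playback_worker(queue, fake_play))
    for message in ("first", "second", "third"):
        queue.put_nowait(message)
    await queue.join()  # wait until every queued message has been played
    worker.cancel()
    return played
```

Running `asyncio.run(demo())` returns the messages in the order they were queued.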
|
|
||||||
|
## Configuration (.env)
|
||||||
|
|
||||||
|
```env
|
||||||
|
DISCORD_TOKEN=your_bot_token
|
||||||
|
TEXT_CHANNEL_ID=channel_id_to_monitor
|
||||||
|
VOICES_DIR=./voices
|
||||||
|
DEFAULT_VOICE=optional_default_voice_name
|
||||||
|
```
|
||||||
|
|
||||||
|
## Running the Bot
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Production
|
||||||
|
python bot.py
|
||||||
|
|
||||||
|
# Testing (uses .env.testing)
|
||||||
|
python bot.py testing
|
||||||
|
|
||||||
|
# Or use the launch script
|
||||||
|
./launch.sh
|
||||||
|
```
|
||||||
|
|
||||||
|
For production deployment on Linux, a systemd service file (`pockettts.service`) is included.
|
||||||
@@ -1,77 +0,0 @@
|
|||||||
import io
|
|
||||||
import numpy as np
|
|
||||||
import scipy.io.wavfile as wavfile
|
|
||||||
from typing import Any
|
|
||||||
from pocket_tts import TTSModel
|
|
||||||
|
|
||||||
from audio_preprocessor import (
|
|
||||||
AudioPreprocessor,
|
|
||||||
PreprocessingConfig,
|
|
||||||
print_audio_analysis,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TTSHandler:
|
|
||||||
"""Handles text-to-speech generation using Pocket TTS."""
|
|
||||||
|
|
||||||
DISCORD_SAMPLE_RATE = 48000
|
|
||||||
|
|
||||||
def __init__(self, voice_wav_path: str, preprocess_audio: bool = True):
|
|
||||||
self.voice_wav_path = voice_wav_path
|
|
||||||
self.preprocess_audio = preprocess_audio
|
|
||||||
self.model: TTSModel | None = None
|
|
||||||
self.voice_state: Any = None
|
|
||||||
self._preprocessed_path: str | None = None
|
|
||||||
|
|
||||||
def load(self) -> None:
|
|
||||||
"""Load the TTS model and voice state from the WAV file."""
|
|
||||||
print("Loading Pocket TTS model...")
|
|
||||||
self.model = TTSModel.load_model()
|
|
||||||
|
|
||||||
voice_path = self.voice_wav_path
|
|
||||||
|
|
||||||
# Analyze and preprocess the audio if enabled
|
|
||||||
if self.preprocess_audio:
|
|
||||||
print("\nAnalyzing original audio...")
|
|
||||||
print_audio_analysis(self.voice_wav_path)
|
|
||||||
|
|
||||||
print("Preprocessing audio for optimal voice cloning...")
|
|
||||||
config = PreprocessingConfig(
|
|
||||||
target_sample_rate=22050,
|
|
||||||
normalize=True,
|
|
||||||
trim_silence=True,
|
|
||||||
trim_top_db=20,
|
|
||||||
reduce_noise=True,
|
|
||||||
target_length_seconds=15.0, # Limit to 15 seconds for best results
|
|
||||||
)
|
|
||||||
preprocessor = AudioPreprocessor(config)
|
|
||||||
voice_path = preprocessor.preprocess_file(self.voice_wav_path)
|
|
||||||
self._preprocessed_path = voice_path
|
|
||||||
print("")
|
|
||||||
|
|
||||||
print(f"Loading voice state from: {voice_path}")
|
|
||||||
self.voice_state = self.model.get_state_for_audio_prompt(voice_path)
|
|
||||||
print("TTS handler ready!")
|
|
||||||
|
|
||||||
def generate_wav_bytes(self, text: str) -> bytes:
|
|
||||||
"""Generate audio and return as WAV file bytes (for FFmpeg)."""
|
|
||||||
if self.model is None or self.voice_state is None:
|
|
||||||
raise RuntimeError("TTS handler not loaded. Call load() first.")
|
|
||||||
|
|
||||||
audio = self.model.generate_audio(self.voice_state, text)
|
|
||||||
audio_np = audio.numpy()
|
|
||||||
|
|
||||||
if audio_np.ndim == 1:
|
|
||||||
audio_np = audio_np.reshape(-1, 1)
|
|
||||||
|
|
||||||
max_val = np.max(np.abs(audio_np))
|
|
||||||
if max_val > 0:
|
|
||||||
audio_np = audio_np / max_val
|
|
||||||
audio_int16 = (audio_np * 32767).astype(np.int16)
|
|
||||||
|
|
||||||
wav_buffer = io.BytesIO()
|
|
||||||
wavfile.write(wav_buffer, self.model.sample_rate, audio_int16)
|
|
||||||
wav_buffer.seek(0)
|
|
||||||
return wav_buffer.read()
|
|
||||||
|
|
||||||
|
|
||||||
312
voice_manager.py
Executable file
@@ -0,0 +1,312 @@
|
|||||||
|
"""Voice management for per-user voice selection and on-demand loading."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from pocket_tts import TTSModel
|
||||||
|
|
||||||
|
from audio_effects import AudioEffects
|
||||||
|
from audio_preprocessor import (
|
||||||
|
AudioPreprocessor,
|
||||||
|
PreprocessingConfig,
|
||||||
|
print_audio_analysis,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class VoiceManager:
|
||||||
|
"""Manages available voices, per-user preferences, and on-demand voice loading."""
|
||||||
|
|
||||||
|
def __init__(self, voices_dir: str, default_voice: str | None = None):
|
||||||
|
self.voices_dir = Path(voices_dir)
|
||||||
|
self.default_voice = default_voice
|
||||||
|
self.model: TTSModel | None = None
|
||||||
|
self.preferences_file = self.voices_dir / "preferences.json"
|
||||||
|
|
||||||
|
# Cache of loaded voice states: voice_name -> voice_state
|
||||||
|
self._voice_states: dict[str, Any] = {}
|
||||||
|
# Per-user voice preferences: user_id -> voice_name
|
||||||
|
self._user_voices: dict[int, str] = {}
|
||||||
|
# Per-user audio effects: user_id -> {effect_name: value} for pitch, speed, echo, robot, chorus, tremolo_depth, tremolo_rate
|
||||||
|
self._user_effects: dict[int, dict[str, Any]] = {}
|
||||||
|
# Available voices: voice_name -> file_path
|
||||||
|
self._available_voices: dict[str, Path] = {}
|
||||||
|
|
||||||
|
def discover_voices(self) -> dict[str, Path]:
|
||||||
|
"""Discover all available voice WAV files in the voices directory."""
|
||||||
|
self._available_voices = {}
|
||||||
|
|
||||||
|
if not self.voices_dir.exists():
|
||||||
|
print(f"Voices directory not found: {self.voices_dir}")
|
||||||
|
return self._available_voices
|
||||||
|
|
||||||
|
for wav_file in self.voices_dir.glob("*.wav"):
|
||||||
|
voice_name = wav_file.stem.lower()
|
||||||
|
self._available_voices[voice_name] = wav_file
|
||||||
|
print(f" Found voice: {voice_name} ({wav_file.name})")
|
||||||
|
|
||||||
|
# Set default voice if not specified
|
||||||
|
if self.default_voice is None and self._available_voices:
|
||||||
|
self.default_voice = next(iter(self._available_voices.keys()))
|
||||||
|
|
||||||
|
# Load saved preferences
|
||||||
|
self._load_preferences()
|
||||||
|
|
||||||
|
return self._available_voices
|
||||||
|
|
||||||
|
def refresh_voices(self) -> tuple[list[str], list[str]]:
|
||||||
|
"""Re-scan for voices and return (new_voices, removed_voices)."""
|
||||||
|
old_voices = set(self._available_voices.keys())
|
||||||
|
|
||||||
|
self._available_voices = {}
|
||||||
|
if self.voices_dir.exists():
|
||||||
|
for wav_file in self.voices_dir.glob("*.wav"):
|
||||||
|
voice_name = wav_file.stem.lower()
|
||||||
|
self._available_voices[voice_name] = wav_file
|
||||||
|
|
||||||
|
new_voices = set(self._available_voices.keys())
|
||||||
|
added = sorted(new_voices - old_voices)
|
||||||
|
removed = sorted(old_voices - new_voices)
|
||||||
|
|
||||||
|
# Update default if needed
|
||||||
|
if self.default_voice not in self._available_voices and self._available_voices:
|
||||||
|
self.default_voice = next(iter(self._available_voices.keys()))
|
||||||
|
|
||||||
|
return added, removed
|
||||||
|
|
||||||
|
def load_model(self) -> None:
|
||||||
|
"""Load the TTS model (does not load any voices yet)."""
|
||||||
|
print("Loading Pocket TTS model...")
|
||||||
|
self.model = TTSModel.load_model()
|
||||||
|
print("TTS model loaded!")
|
||||||
|
|
||||||
|
def get_available_voices(self) -> list[str]:
|
||||||
|
"""Get list of available voice names."""
|
||||||
|
return sorted(self._available_voices.keys())
|
||||||
|
|
||||||
|
def is_voice_available(self, voice_name: str) -> bool:
|
||||||
|
"""Check if a voice is available."""
|
||||||
|
return voice_name.lower() in self._available_voices
|
||||||
|
|
||||||
|
def get_voice_state(self, voice_name: str) -> Any:
|
||||||
|
"""Get or load a voice state on-demand."""
|
||||||
|
if self.model is None:
|
||||||
|
raise RuntimeError("Model not loaded. Call load_model() first.")
|
||||||
|
|
||||||
|
voice_name = voice_name.lower()
|
||||||
|
|
||||||
|
if voice_name not in self._available_voices:
|
||||||
|
raise ValueError(f"Voice '{voice_name}' not found")
|
||||||
|
|
||||||
|
# Return cached state if already loaded
|
||||||
|
if voice_name in self._voice_states:
|
||||||
|
return self._voice_states[voice_name]
|
||||||
|
|
||||||
|
# Load the voice on-demand
|
||||||
|
voice_path = self._available_voices[voice_name]
|
||||||
|
print(f"Loading voice '{voice_name}' from {voice_path}...")
|
||||||
|
|
||||||
|
# Preprocess the audio
|
||||||
|
print(f" Analyzing audio...")
|
||||||
|
print_audio_analysis(str(voice_path))
|
||||||
|
|
||||||
|
print(f" Preprocessing audio...")
|
||||||
|
config = PreprocessingConfig(
|
||||||
|
target_sample_rate=22050,
|
||||||
|
normalize=True,
|
||||||
|
trim_silence=True,
|
||||||
|
trim_top_db=20,
|
||||||
|
reduce_noise=True,
|
||||||
|
target_length_seconds=15.0,
|
||||||
|
)
|
||||||
|
preprocessor = AudioPreprocessor(config)
|
||||||
|
processed_path = preprocessor.preprocess_file(str(voice_path))
|
||||||
|
|
||||||
|
# Load voice state
|
||||||
|
voice_state = self.model.get_state_for_audio_prompt(processed_path)
|
||||||
|
self._voice_states[voice_name] = voice_state
|
||||||
|
print(f" Voice '{voice_name}' loaded and cached!")
|
||||||
|
|
||||||
|
return voice_state
|
||||||
|
|
||||||
|
def is_voice_loaded(self, voice_name: str) -> bool:
|
||||||
|
"""Check if a voice is already loaded in cache."""
|
||||||
|
return voice_name.lower() in self._voice_states
|
||||||
|
|
||||||
|
def get_user_voice(self, user_id: int) -> str:
|
||||||
|
"""Get the voice preference for a user, or default voice."""
|
||||||
|
return self._user_voices.get(user_id, self.default_voice or "")
|
||||||
|
|
||||||
|
def set_user_voice(self, user_id: int, voice_name: str) -> None:
|
||||||
|
"""Set the voice preference for a user."""
|
||||||
|
voice_name = voice_name.lower()
|
||||||
|
if voice_name not in self._available_voices:
|
||||||
|
raise ValueError(f"Voice '{voice_name}' not found")
|
||||||
|
self._user_voices[user_id] = voice_name
|
||||||
|
self._save_preferences()
|
||||||
|
|
||||||
|
def get_user_voice_state(self, user_id: int) -> Any:
|
||||||
|
"""Get the voice state for a user (loads on-demand if needed)."""
|
||||||
|
voice_name = self.get_user_voice(user_id)
|
||||||
|
if not voice_name:
|
||||||
|
raise RuntimeError("No default voice available")
|
||||||
|
return self.get_voice_state(voice_name)
|
||||||
|
|
||||||
|
def get_loaded_voices(self) -> list[str]:
|
||||||
|
"""Get list of currently loaded voice names."""
|
||||||
|
return list(self._voice_states.keys())
|
||||||
|
|
||||||
|
def _save_preferences(self) -> None:
|
||||||
|
"""Save user voice preferences to JSON file."""
|
||||||
|
try:
|
||||||
|
# Ensure directory exists
|
||||||
|
self.preferences_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
data = {
|
||||||
|
"user_voices": {str(k): v for k, v in self._user_voices.items()},
|
||||||
|
"user_effects": {str(k): v for k, v in self._user_effects.items()},
|
||||||
|
}
|
||||||
|
|
||||||
|
with open(self.preferences_file, "w") as f:
|
||||||
|
json.dump(data, f, indent=2)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Warning: Failed to save preferences: {e}")
|
||||||
|
|
||||||
|
# Effects management methods
|
||||||
|
|
||||||
|
def get_user_effects(self, user_id: int) -> dict[str, int | float]:
|
||||||
|
"""Get the audio effects for a user. Returns defaults if not set."""
|
||||||
|
effects = self._user_effects.get(user_id, {})
|
||||||
|
# Coerce stored values to the expected numeric types (JSON preserves numbers, but hand-edited files may contain strings)
|
||||||
|
pitch = effects.get("pitch", AudioEffects.PITCH_DEFAULT)
|
||||||
|
speed = effects.get("speed", AudioEffects.SPEED_DEFAULT)
|
||||||
|
echo = effects.get("echo", AudioEffects.ECHO_DEFAULT)
|
||||||
|
robot = effects.get("robot", AudioEffects.ROBOT_DEFAULT)
|
||||||
|
chorus = effects.get("chorus", AudioEffects.CHORUS_DEFAULT)
|
||||||
|
tremolo_depth = effects.get("tremolo_depth", AudioEffects.TREMOLO_DEPTH_DEFAULT)
|
||||||
|
tremolo_rate = effects.get("tremolo_rate", AudioEffects.TREMOLO_RATE_DEFAULT)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"pitch": int(pitch) if pitch is not None else AudioEffects.PITCH_DEFAULT,
|
||||||
|
"speed": float(speed) if speed is not None else AudioEffects.SPEED_DEFAULT,
|
||||||
|
"echo": int(echo) if echo is not None else AudioEffects.ECHO_DEFAULT,
|
||||||
|
"robot": int(robot) if robot is not None else AudioEffects.ROBOT_DEFAULT,
|
||||||
|
"chorus": int(chorus) if chorus is not None else AudioEffects.CHORUS_DEFAULT,
|
||||||
|
"tremolo_depth": float(tremolo_depth) if tremolo_depth is not None else AudioEffects.TREMOLO_DEPTH_DEFAULT,
|
||||||
|
"tremolo_rate": float(tremolo_rate) if tremolo_rate is not None else AudioEffects.TREMOLO_RATE_DEFAULT,
|
||||||
|
}
|
||||||
|
|
||||||
|
def set_user_effect(self, user_id: int, effect_name: str, value: Any) -> tuple[bool, str]:
|
||||||
|
"""
|
||||||
|
Set an audio effect for a user.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (success, message)
|
||||||
|
"""
|
||||||
|
# Validate the effect
|
||||||
|
is_valid, error_msg = AudioEffects.validate_effect(effect_name, value)
|
||||||
|
if not is_valid:
|
||||||
|
return False, error_msg
|
||||||
|
|
||||||
|
# Get current effects
|
||||||
|
if user_id not in self._user_effects:
|
||||||
|
self._user_effects[user_id] = {}
|
||||||
|
|
||||||
|
# Save the effect
|
||||||
|
current_effects = self._user_effects[user_id].copy()
|
||||||
|
if effect_name == "pitch":
|
||||||
|
current_effects["pitch"] = int(value)
|
||||||
|
elif effect_name == "speed":
|
||||||
|
current_effects["speed"] = float(value)
|
||||||
|
elif effect_name == "echo":
|
||||||
|
current_effects["echo"] = int(value)
|
||||||
|
elif effect_name == "robot":
|
||||||
|
current_effects["robot"] = int(value)
|
||||||
|
elif effect_name == "chorus":
|
||||||
|
current_effects["chorus"] = int(value)
|
||||||
|
elif effect_name == "tremolo_depth":
|
||||||
|
current_effects["tremolo_depth"] = float(value)
|
||||||
|
elif effect_name == "tremolo_rate":
|
||||||
|
current_effects["tremolo_rate"] = float(value)
|
||||||
|
|
||||||
|
# Count active effects and show warning if > 2
|
||||||
|
active_count = AudioEffects.count_active_effects(
|
||||||
|
pitch=current_effects.get("pitch", AudioEffects.PITCH_DEFAULT),
|
||||||
|
speed=current_effects.get("speed", AudioEffects.SPEED_DEFAULT),
|
||||||
|
echo=current_effects.get("echo", AudioEffects.ECHO_DEFAULT),
|
||||||
|
robot=current_effects.get("robot", AudioEffects.ROBOT_DEFAULT),
|
||||||
|
chorus=current_effects.get("chorus", AudioEffects.CHORUS_DEFAULT),
|
||||||
|
tremolo_depth=current_effects.get("tremolo_depth", AudioEffects.TREMOLO_DEPTH_DEFAULT),
|
||||||
|
)
|
||||||
|
self._user_effects[user_id][effect_name] = current_effects.get(effect_name, value)
|
||||||
|
self._save_preferences()
|
||||||
|
|
||||||
|
if active_count > 2:
|
||||||
|
return True, f"Effect applied! ⚠️ You have {active_count} active effects. Performance may be slower with more effects."
|
||||||
|
else:
|
||||||
|
return True, "Effect applied successfully!"
|
||||||
|
|
||||||
|
def reset_user_effects(self, user_id: int) -> None:
|
||||||
|
"""Reset all audio effects to defaults for a user."""
|
||||||
|
if user_id in self._user_effects:
|
||||||
|
del self._user_effects[user_id]
|
||||||
|
self._save_preferences()
|
||||||
|
|
||||||
|
def count_active_effects(self, user_id: int) -> int:
|
||||||
|
"""Count how many effects are active for a user."""
|
||||||
|
effects = self.get_user_effects(user_id)
|
||||||
|
return AudioEffects.count_active_effects(
|
||||||
|
pitch=effects["pitch"],
|
||||||
|
speed=effects["speed"],
|
||||||
|
echo=effects["echo"],
|
||||||
|
robot=effects["robot"],
|
||||||
|
chorus=effects["chorus"],
|
||||||
|
tremolo_depth=effects["tremolo_depth"],
|
||||||
|
)
|
||||||
|
|
||||||
|
    def _load_preferences(self) -> None:
        """Load user voice preferences from JSON file."""
        if not self.preferences_file.exists():
            return

        try:
            with open(self.preferences_file, "r") as f:
                data = json.load(f)

            # Load user preferences (convert string keys back to int)
            for user_id_str, voice_name in data.get("user_voices", {}).items():
                user_id = int(user_id_str)
                # Only load if voice still exists
                if voice_name.lower() in self._available_voices:
                    self._user_voices[user_id] = voice_name.lower()

            # Load user effects (convert string keys back to int)
            for user_id_str, effects in data.get("user_effects", {}).items():
                user_id = int(user_id_str)
                self._user_effects[user_id] = effects

            print(f" Loaded {len(self._user_voices)} user voice preferences")
            print(f" Loaded {len(self._user_effects)} user effect preferences")
        except Exception as e:
            print(f" Warning: Failed to load preferences: {e}")
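The loader above converts string keys back to `int` because JSON object keys are always strings, so integer user IDs are stringified on the way out. The following is a minimal standalone sketch of that round trip, not the bot's actual code: the free functions `save_preferences` and `load_preferences`, the file name, and the sample IDs and effect values are all hypothetical, and the writer side is assumed, since `_save_preferences` is not shown in this hunk.

```python
import json
import tempfile
from pathlib import Path


def save_preferences(path, user_voices, user_effects):
    """Write both maps to one JSON file (hypothetical writer, assumed layout).

    json.dump coerces the int user IDs to string keys.
    """
    data = {"user_voices": user_voices, "user_effects": user_effects}
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def load_preferences(path, available_voices):
    """Read the file back, restoring int keys and dropping unknown voices."""
    user_voices, user_effects = {}, {}
    if not path.exists():
        return user_voices, user_effects

    with open(path, "r") as f:
        data = json.load(f)

    for user_id_str, voice_name in data.get("user_voices", {}).items():
        # Skip voices whose .wav file is no longer available.
        if voice_name.lower() in available_voices:
            user_voices[int(user_id_str)] = voice_name.lower()

    for user_id_str, effects in data.get("user_effects", {}).items():
        user_effects[int(user_id_str)] = effects

    return user_voices, user_effects


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        prefs = Path(tmp) / "preferences.json"
        save_preferences(
            prefs,
            {111: "MasterChief", 222: "Removed"},
            {111: {"pitch": 2}},
        )
        voices, effects = load_preferences(prefs, {"masterchief", "trump"})
        print(voices)
        print(effects)
```

In this sketch, user 222 is dropped on load because the voice name is not in the available set, which mirrors the "Only load if voice still exists" check above.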
BIN
voices/ChoGath.wav
Executable file
Binary file not shown.
0
Estinien.wav → voices/Estinien.wav
Normal file → Executable file
0
Gaius.wav → voices/Gaius.wav
Normal file → Executable file
0
Gibralter_funny.wav → voices/Gibralter_funny.wav
Normal file → Executable file
0
Gibralter_good.wav → voices/Gibralter_good.wav
Normal file → Executable file
0
HankHill.wav → voices/HankHill.wav
Normal file → Executable file
0
Johnny.wav → voices/Johnny.wav
Normal file → Executable file
0
MasterChief.wav → voices/MasterChief.wav
Normal file → Executable file
BIN
voices/SelfHelpSingh.wav
Executable file
Binary file not shown.
0
Trump.wav → voices/Trump.wav
Normal file → Executable file