- Updated queue system to pass effects as dict instead of individual params - Updated process_queue to handle effects_dict for previews - Updated speak_message to extract all 7 effects from user settings - Updated _generate_wav_bytes to accept effects dict and pass all params - Updated _handle_voice_preview to use new effects dict system - Effects now actually process the audio: - pitch, speed, echo, robot, chorus, tremolo_depth, tremolo_rate - Fixed preview effect description to use preview_effects dict
845 lines
33 KiB
Python
845 lines
33 KiB
Python
__version__ = "1.1.0"
|
||
|
||
import random
|
||
import sys
|
||
import os
|
||
|
||
# Parse command line arguments before loading any config
|
||
if len(sys.argv) > 1 and sys.argv[1] == "testing":
|
||
os.environ["ENV_MODE"] = "testing"
|
||
# Remove the argument so it doesn't interfere with other parsing
|
||
sys.argv.pop(1)
|
||
|
||
import numba_config
|
||
import asyncio
|
||
import io
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
from typing import Any
|
||
|
||
import discord
|
||
import numpy as np
|
||
import scipy.io.wavfile as wavfile
|
||
from discord import app_commands
|
||
from discord.ext import commands
|
||
|
||
from audio_effects import AudioEffects
|
||
from config import Config
|
||
from voice_manager import VoiceManager
|
||
|
||
|
||
# Inactivity timeout in seconds (10 minutes)
|
||
INACTIVITY_TIMEOUT = 10 * 60
|
||
|
||
# Sample lines for voice preview
|
||
PREVIEW_LINES = [
|
||
"Hello! This is how I sound. Choose me as your voice with /voice set.",
|
||
"Testing, one, two, three! Can you hear me clearly?",
|
||
"Here's a preview of my voice. Pretty cool, right?",
|
||
"Greetings! I am ready to speak for you.",
|
||
"Voice check! This is what I sound like.",
|
||
"Audio test complete. This voice is ready to go!",
|
||
"Sample message incoming. How do I sound to you?",
|
||
"Preview mode activated. Testing speech synthesis.",
|
||
]
|
||
|
||
|
||
class TTSBot(commands.Bot):
|
||
"""Discord bot that reads messages aloud using Pocket TTS."""
|
||
|
||
def __init__(self):
|
||
intents = discord.Intents.default()
|
||
intents.message_content = True
|
||
intents.voice_states = True
|
||
super().__init__(command_prefix="!", intents=intents)
|
||
|
||
self.voice_manager = VoiceManager(Config.VOICES_DIR, Config.DEFAULT_VOICE)
|
||
self.message_queue: asyncio.Queue[tuple[discord.Message, str] | tuple[discord.Message, str, str]] = asyncio.Queue()
|
||
self.last_activity: float = 0.0
|
||
|
||
print("\n=== Command Registration ===")
|
||
self._setup_slash_commands()
|
||
self._setup_effects_commands()
|
||
self._log_registered_commands()
|
||
print("=== End Command Registration ===\n")
|
||
|
||
def _log_registered_commands(self) -> None:
|
||
"""Log all registered commands to console."""
|
||
print("\nRegistered commands:")
|
||
commands = list(self.tree.get_commands())
|
||
if not commands:
|
||
print(" ⚠️ No commands registered!")
|
||
else:
|
||
for cmd in commands:
|
||
print(f" ✓ /{cmd.name} - {cmd.description}")
|
||
print(f"\nTotal commands registered: {len(commands)}")
|
||
|
||
def _setup_slash_commands(self) -> None:
|
||
"""Set up slash commands for voice management."""
|
||
print("Setting up voice commands...")
|
||
|
||
@self.tree.command(name="voice", description="Manage your TTS voice")
|
||
@app_commands.describe(
|
||
action="What to do",
|
||
voice_name="Name of the voice (for 'set' or 'preview' action)",
|
||
preview_pitch="Optional pitch for preview (-12 to 12, default: use your settings)",
|
||
preview_speed="Optional speed for preview (0.5 to 2.0, default: use your settings)",
|
||
)
|
||
@app_commands.choices(action=[
|
||
app_commands.Choice(name="list", value="list"),
|
||
app_commands.Choice(name="set", value="set"),
|
||
app_commands.Choice(name="current", value="current"),
|
||
app_commands.Choice(name="refresh", value="refresh"),
|
||
app_commands.Choice(name="preview", value="preview"),
|
||
])
|
||
async def voice_command(
|
||
interaction: discord.Interaction,
|
||
action: app_commands.Choice[str],
|
||
voice_name: str | None = None,
|
||
preview_pitch: int | None = None,
|
||
preview_speed: float | None = None,
|
||
):
|
||
if action.value == "list":
|
||
await self._handle_voice_list(interaction)
|
||
elif action.value == "set":
|
||
await self._handle_voice_set(interaction, voice_name)
|
||
elif action.value == "current":
|
||
await self._handle_voice_current(interaction)
|
||
elif action.value == "refresh":
|
||
await self._handle_voice_refresh(interaction)
|
||
elif action.value == "preview":
|
||
await self._handle_voice_preview(interaction, voice_name, preview_pitch, preview_speed)
|
||
|
||
@voice_command.autocomplete("voice_name")
|
||
async def voice_name_autocomplete(
|
||
interaction: discord.Interaction,
|
||
current: str
|
||
) -> list[app_commands.Choice[str]]:
|
||
voices = self.voice_manager.get_available_voices()
|
||
return [
|
||
app_commands.Choice(name=v, value=v)
|
||
for v in voices
|
||
if current.lower() in v.lower()
|
||
][:25]
|
||
|
||
def _setup_effects_commands(self) -> None:
|
||
"""Set up slash commands for audio effects management."""
|
||
print("Setting up effects commands...")
|
||
|
||
@self.tree.command(name="effects", description="Manage your TTS audio effects")
|
||
@app_commands.describe(
|
||
action="What to do",
|
||
effect_name="Name of the effect (for 'set' action)",
|
||
value="Value for the effect (for 'set' action)"
|
||
)
|
||
@app_commands.choices(action=[
|
||
app_commands.Choice(name="list", value="list"),
|
||
app_commands.Choice(name="set", value="set"),
|
||
app_commands.Choice(name="reset", value="reset"),
|
||
])
|
||
@app_commands.choices(effect_name=[
|
||
app_commands.Choice(name="pitch", value="pitch"),
|
||
app_commands.Choice(name="speed", value="speed"),
|
||
app_commands.Choice(name="echo", value="echo"),
|
||
app_commands.Choice(name="robot", value="robot"),
|
||
app_commands.Choice(name="chorus", value="chorus"),
|
||
app_commands.Choice(name="tremolo_depth", value="tremolo_depth"),
|
||
app_commands.Choice(name="tremolo_rate", value="tremolo_rate"),
|
||
])
|
||
async def effects_command(
|
||
interaction: discord.Interaction,
|
||
action: app_commands.Choice[str],
|
||
effect_name: app_commands.Choice[str] | None = None,
|
||
value: str | None = None
|
||
):
|
||
if action.value == "list":
|
||
await self._handle_effects_list(interaction)
|
||
elif action.value == "set":
|
||
await self._handle_effects_set(interaction, effect_name, value)
|
||
elif action.value == "reset":
|
||
await self._handle_effects_reset(interaction)
|
||
|
||
async def _handle_effects_list(self, interaction: discord.Interaction) -> None:
|
||
"""Handle /effects list command."""
|
||
effects = self.voice_manager.get_user_effects(interaction.user.id)
|
||
active_count = self.voice_manager.count_active_effects(interaction.user.id)
|
||
|
||
lines = ["**Your Audio Effects:**\n"]
|
||
|
||
# Pitch
|
||
pitch_desc = AudioEffects.get_effect_description("pitch")
|
||
pitch_val = AudioEffects.format_effect_value("pitch", effects["pitch"])
|
||
lines.append(f"🎵 **Pitch**: {pitch_val}")
|
||
lines.append(f" {pitch_desc}\n")
|
||
|
||
# Speed
|
||
speed_desc = AudioEffects.get_effect_description("speed")
|
||
speed_val = AudioEffects.format_effect_value("speed", effects["speed"])
|
||
lines.append(f"⚡ **Speed**: {speed_val}")
|
||
lines.append(f" {speed_desc}\n")
|
||
|
||
# Echo
|
||
echo_desc = AudioEffects.get_effect_description("echo")
|
||
echo_val = AudioEffects.format_effect_value("echo", effects["echo"])
|
||
lines.append(f"🔊 **Echo**: {echo_val}")
|
||
lines.append(f" {echo_desc}\n")
|
||
|
||
# Robot
|
||
robot_desc = AudioEffects.get_effect_description("robot")
|
||
robot_val = AudioEffects.format_effect_value("robot", effects["robot"])
|
||
lines.append(f"🤖 **Robot**: {robot_val}")
|
||
lines.append(f" {robot_desc}\n")
|
||
|
||
# Chorus
|
||
chorus_desc = AudioEffects.get_effect_description("chorus")
|
||
chorus_val = AudioEffects.format_effect_value("chorus", effects["chorus"])
|
||
lines.append(f"🎶 **Chorus**: {chorus_val}")
|
||
lines.append(f" {chorus_desc}\n")
|
||
|
||
# Tremolo Depth
|
||
tremolo_depth_desc = AudioEffects.get_effect_description("tremolo_depth")
|
||
tremolo_depth_val = AudioEffects.format_effect_value("tremolo_depth", effects["tremolo_depth"])
|
||
lines.append(f"〰️ **Tremolo Depth**: {tremolo_depth_val}")
|
||
lines.append(f" {tremolo_depth_desc}\n")
|
||
|
||
# Tremolo Rate
|
||
tremolo_rate_desc = AudioEffects.get_effect_description("tremolo_rate")
|
||
tremolo_rate_val = AudioEffects.format_effect_value("tremolo_rate", effects["tremolo_rate"])
|
||
lines.append(f"📳 **Tremolo Rate**: {tremolo_rate_val}")
|
||
lines.append(f" {tremolo_rate_desc}\n")
|
||
|
||
# Active count warning
|
||
lines.append(f"**Active Effects**: {active_count}")
|
||
if active_count > 2:
|
||
lines.append("⚠️ You have more than 2 active effects. Processing may be slower!")
|
||
elif active_count > 0:
|
||
lines.append("ℹ️ Add more effects for fun variations (may slow processing)")
|
||
|
||
lines.append(f"\n*Use `/effects set <effect> <value>` to change settings*")
|
||
lines.append(f"*Use `/effects reset` to clear all effects*")
|
||
|
||
await interaction.response.send_message(
|
||
"\n".join(lines),
|
||
ephemeral=True
|
||
)
|
||
|
||
async def _handle_effects_set(
|
||
self,
|
||
interaction: discord.Interaction,
|
||
effect_name: app_commands.Choice[str] | None,
|
||
value: str | None
|
||
) -> None:
|
||
"""Handle /effects set command."""
|
||
if not effect_name or value is None:
|
||
await interaction.response.send_message(
|
||
"❌ Please provide both effect name and value. Example: `/effects set pitch 3`",
|
||
ephemeral=True
|
||
)
|
||
return
|
||
|
||
success, message = self.voice_manager.set_user_effect(
|
||
interaction.user.id,
|
||
effect_name.value,
|
||
value
|
||
)
|
||
|
||
if success:
|
||
await interaction.response.send_message(
|
||
f"✅ {message}",
|
||
ephemeral=True
|
||
)
|
||
else:
|
||
await interaction.response.send_message(
|
||
f"❌ {message}",
|
||
ephemeral=True
|
||
)
|
||
|
||
async def _handle_effects_reset(self, interaction: discord.Interaction) -> None:
|
||
"""Handle /effects reset command with confirmation UI."""
|
||
# Check if user has any effects to reset
|
||
active_count = self.voice_manager.count_active_effects(interaction.user.id)
|
||
|
||
if active_count == 0:
|
||
await interaction.response.send_message(
|
||
"ℹ️ You don't have any active effects to reset.",
|
||
ephemeral=True
|
||
)
|
||
return
|
||
|
||
# Create confirmation buttons
|
||
class ConfirmResetView(discord.ui.View):
|
||
def __init__(self, voice_manager, user_id):
|
||
super().__init__(timeout=30)
|
||
self.voice_manager = voice_manager
|
||
self.user_id = user_id
|
||
self.confirmed = False
|
||
|
||
@discord.ui.button(label="✅ Yes, Reset All", style=discord.ButtonStyle.danger)
|
||
async def confirm_button(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||
if interaction.user.id != self.user_id:
|
||
await interaction.response.send_message("This button is not for you!", ephemeral=True)
|
||
return
|
||
|
||
self.voice_manager.reset_user_effects(self.user_id)
|
||
self.confirmed = True
|
||
await interaction.response.edit_message(
|
||
content="✅ All audio effects have been reset to defaults!",
|
||
view=None
|
||
)
|
||
self.stop()
|
||
|
||
@discord.ui.button(label="❌ Cancel", style=discord.ButtonStyle.secondary)
|
||
async def cancel_button(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||
if interaction.user.id != self.user_id:
|
||
await interaction.response.send_message("This button is not for you!", ephemeral=True)
|
||
return
|
||
|
||
await interaction.response.edit_message(
|
||
content="❌ Reset cancelled. Your effects remain unchanged.",
|
||
view=None
|
||
)
|
||
self.stop()
|
||
|
||
view = ConfirmResetView(self.voice_manager, interaction.user.id)
|
||
|
||
await interaction.response.send_message(
|
||
f"⚠️ **Reset Confirmation**\n\n"
|
||
f"You have {active_count} active effect(s).\n"
|
||
f"This will reset **all** your audio effects to defaults:\n"
|
||
f"• Pitch: 0 (normal)\n"
|
||
f"• Speed: 1.0x (normal)\n\n"
|
||
f"Are you sure you want to continue?",
|
||
view=view,
|
||
ephemeral=True
|
||
)
|
||
|
||
async def _handle_voice_list(self, interaction: discord.Interaction) -> None:
|
||
"""Handle /voice list command."""
|
||
voices = self.voice_manager.get_available_voices()
|
||
loaded = self.voice_manager.get_loaded_voices()
|
||
user_voice = self.voice_manager.get_user_voice(interaction.user.id)
|
||
|
||
if not voices:
|
||
await interaction.response.send_message(
|
||
"❌ No voices available. Add .wav files to the voices directory.",
|
||
ephemeral=True
|
||
)
|
||
return
|
||
|
||
lines = ["**Available Voices:**\n"]
|
||
for voice in voices:
|
||
status = []
|
||
if voice == user_voice:
|
||
status.append("✅ your voice")
|
||
if voice in loaded:
|
||
status.append("📦 loaded")
|
||
status_str = f" ({', '.join(status)})" if status else ""
|
||
lines.append(f"• `{voice}`{status_str}")
|
||
|
||
lines.append(f"\n*Use `/voice set <name>` to change your voice.*")
|
||
|
||
await interaction.response.send_message(
|
||
"\n".join(lines),
|
||
ephemeral=True
|
||
)
|
||
|
||
async def _handle_voice_set(self, interaction: discord.Interaction, voice_name: str | None) -> None:
|
||
"""Handle /voice set command."""
|
||
if not voice_name:
|
||
await interaction.response.send_message(
|
||
"❌ Please provide a voice name. Use `/voice list` to see available voices.",
|
||
ephemeral=True
|
||
)
|
||
return
|
||
|
||
voice_name = voice_name.lower()
|
||
|
||
if not self.voice_manager.is_voice_available(voice_name):
|
||
voices = self.voice_manager.get_available_voices()
|
||
await interaction.response.send_message(
|
||
f"❌ Voice `{voice_name}` not found.\n"
|
||
f"Available voices: {', '.join(f'`{v}`' for v in voices)}",
|
||
ephemeral=True
|
||
)
|
||
return
|
||
|
||
# Check if voice needs to be loaded
|
||
needs_loading = not self.voice_manager.is_voice_loaded(voice_name)
|
||
|
||
if needs_loading:
|
||
await interaction.response.send_message(
|
||
f"⏳ Loading voice `{voice_name}` for the first time... This may take a moment.",
|
||
ephemeral=True
|
||
)
|
||
try:
|
||
await asyncio.to_thread(self.voice_manager.get_voice_state, voice_name)
|
||
except Exception as e:
|
||
await interaction.followup.send(
|
||
f"❌ Failed to load voice `{voice_name}`: {e}",
|
||
ephemeral=True
|
||
)
|
||
return
|
||
|
||
self.voice_manager.set_user_voice(interaction.user.id, voice_name)
|
||
|
||
if needs_loading:
|
||
await interaction.followup.send(
|
||
f"✅ Voice changed to `{voice_name}`!",
|
||
ephemeral=True
|
||
)
|
||
else:
|
||
await interaction.response.send_message(
|
||
f"✅ Voice changed to `{voice_name}`!",
|
||
ephemeral=True
|
||
)
|
||
|
||
async def _handle_voice_current(self, interaction: discord.Interaction) -> None:
|
||
"""Handle /voice current command."""
|
||
voice = self.voice_manager.get_user_voice(interaction.user.id)
|
||
if voice:
|
||
loaded = "(loaded)" if self.voice_manager.is_voice_loaded(voice) else "(not yet loaded)"
|
||
await interaction.response.send_message(
|
||
f"🎤 Your current voice: `{voice}` {loaded}",
|
||
ephemeral=True
|
||
)
|
||
else:
|
||
await interaction.response.send_message(
|
||
"❌ No voice set. Use `/voice set <name>` to choose a voice.",
|
||
ephemeral=True
|
||
)
|
||
|
||
async def _handle_voice_refresh(self, interaction: discord.Interaction) -> None:
|
||
"""Handle /voice refresh command."""
|
||
await interaction.response.send_message(
|
||
"🔄 Scanning for new voices...",
|
||
ephemeral=True
|
||
)
|
||
|
||
added, removed = await asyncio.to_thread(self.voice_manager.refresh_voices)
|
||
|
||
lines = []
|
||
if added:
|
||
lines.append(f"✅ **New voices found:** {', '.join(f'`{v}`' for v in added)}")
|
||
if removed:
|
||
lines.append(f"❌ **Voices removed:** {', '.join(f'`{v}`' for v in removed)}")
|
||
if not added and not removed:
|
||
lines.append("No changes detected.")
|
||
|
||
total = len(self.voice_manager.get_available_voices())
|
||
lines.append(f"\n*Total voices available: {total}*")
|
||
|
||
await interaction.followup.send(
|
||
"\n".join(lines),
|
||
ephemeral=True
|
||
)
|
||
|
||
async def _handle_voice_preview(
|
||
self,
|
||
interaction: discord.Interaction,
|
||
voice_name: str | None,
|
||
preview_pitch: int | None = None,
|
||
preview_speed: float | None = None,
|
||
) -> None:
|
||
"""Handle /voice preview command."""
|
||
if not voice_name:
|
||
await interaction.response.send_message(
|
||
"❌ Please provide a voice name. Use `/voice list` to see available voices.",
|
||
ephemeral=True
|
||
)
|
||
return
|
||
|
||
# Check if user is in a voice channel
|
||
if interaction.user.voice is None:
|
||
await interaction.response.send_message(
|
||
"❌ You need to be in a voice channel to hear a preview!",
|
||
ephemeral=True
|
||
)
|
||
return
|
||
|
||
voice_name = voice_name.lower()
|
||
|
||
# Validate voice exists
|
||
if not self.voice_manager.is_voice_available(voice_name):
|
||
voices = self.voice_manager.get_available_voices()
|
||
await interaction.response.send_message(
|
||
f"❌ Voice `{voice_name}` not found.\n"
|
||
f"Available voices: {', '.join(f'`{v}`' for v in voices)}",
|
||
ephemeral=True
|
||
)
|
||
return
|
||
|
||
# Validate pitch if provided
|
||
if preview_pitch is not None:
|
||
is_valid, error_msg = AudioEffects.validate_effect("pitch", preview_pitch)
|
||
if not is_valid:
|
||
await interaction.response.send_message(
|
||
f"❌ Invalid pitch value: {error_msg}",
|
||
ephemeral=True
|
||
)
|
||
return
|
||
|
||
# Validate speed if provided
|
||
if preview_speed is not None:
|
||
is_valid, error_msg = AudioEffects.validate_effect("speed", preview_speed)
|
||
if not is_valid:
|
||
await interaction.response.send_message(
|
||
f"❌ Invalid speed value: {error_msg}",
|
||
ephemeral=True
|
||
)
|
||
return
|
||
|
||
# Select a random preview line
|
||
preview_text = random.choice(PREVIEW_LINES)
|
||
|
||
# Create a preview message object with all necessary attributes
|
||
class PreviewMessage:
|
||
def __init__(self, user, channel, voice_channel):
|
||
self.author = user
|
||
self.channel = channel
|
||
self._voice_channel = voice_channel
|
||
|
||
@property
|
||
def voice(self):
|
||
class VoiceState:
|
||
def __init__(self, channel):
|
||
self.channel = channel
|
||
return VoiceState(self._voice_channel)
|
||
|
||
preview_message = PreviewMessage(
|
||
interaction.user,
|
||
interaction.channel,
|
||
interaction.user.voice.channel
|
||
)
|
||
|
||
# Use user's current effects if not overridden
|
||
user_effects = self.voice_manager.get_user_effects(interaction.user.id)
|
||
effect_overrides = {}
|
||
if preview_pitch is not None:
|
||
effect_overrides["pitch"] = preview_pitch
|
||
if preview_speed is not None:
|
||
effect_overrides["speed"] = preview_speed
|
||
|
||
# Use default effects from user settings for preview
|
||
preview_effects = user_effects.copy()
|
||
preview_effects.update(effect_overrides)
|
||
|
||
# Queue the preview with voice override and effects
|
||
await self.message_queue.put((preview_message, preview_text, voice_name, preview_effects))
|
||
|
||
# Build effect description
|
||
effect_desc = []
|
||
if preview_effects.get("pitch", 0) != 0:
|
||
effect_desc.append(f"pitch: {preview_effects['pitch']:+d}")
|
||
if preview_effects.get("speed", 1.0) != 1.0:
|
||
effect_desc.append(f"speed: {preview_effects['speed']:.1f}x")
|
||
|
||
effect_str = f" (with {', '.join(effect_desc)})" if effect_desc else ""
|
||
|
||
await interaction.response.send_message(
|
||
f"⏳ Queued preview for `{voice_name}`{effect_str}. Sample: \"{preview_text[:50]}{'...' if len(preview_text) > 50 else ''}\"",
|
||
ephemeral=True
|
||
)
|
||
|
||
async def setup_hook(self) -> None:
|
||
"""Called when the bot is starting up."""
|
||
print("Initializing TTS...")
|
||
print("Discovering available voices...")
|
||
await asyncio.to_thread(self.voice_manager.discover_voices)
|
||
await asyncio.to_thread(self.voice_manager.load_model)
|
||
|
||
# Pre-load the default voice if one is set
|
||
default = self.voice_manager.default_voice
|
||
if default:
|
||
print(f"Pre-loading default voice: {default}")
|
||
await asyncio.to_thread(self.voice_manager.get_voice_state, default)
|
||
|
||
self.loop.create_task(self.process_queue())
|
||
self.loop.create_task(self.check_inactivity())
|
||
|
||
async def on_ready(self) -> None:
|
||
print(f"Logged in as {self.user}")
|
||
print(f"Bot ID: {self.user.id}")
|
||
print(f"Monitoring channel ID: {Config.TEXT_CHANNEL_ID}")
|
||
print(f"Available voices: {', '.join(self.voice_manager.get_available_voices())}")
|
||
|
||
# Log registered commands before sync
|
||
registered_cmds = list(self.tree.get_commands())
|
||
print(f"\nCommands in tree before sync: {len(registered_cmds)}")
|
||
for cmd in registered_cmds:
|
||
print(f" - /{cmd.name}")
|
||
|
||
# Sync slash commands to each guild for immediate availability
|
||
print(f"\nConnected to {len(self.guilds)} guild(s):")
|
||
for guild in self.guilds:
|
||
print(f" - {guild.name} (ID: {guild.id})")
|
||
|
||
print("\nSyncing slash commands to guilds...")
|
||
sync_count = 0
|
||
for guild in self.guilds:
|
||
try:
|
||
# Copy global commands to this guild before syncing
|
||
# This is necessary for guild-specific command registration
|
||
self.tree.copy_global_to(guild=discord.Object(guild.id))
|
||
print(f" 📋 Copied global commands to guild: {guild.name}")
|
||
|
||
synced = await self.tree.sync(guild=discord.Object(guild.id))
|
||
print(f" ✓ Synced {len(synced)} commands to guild: {guild.name}")
|
||
for cmd in synced:
|
||
print(f" - /{cmd.name}")
|
||
sync_count += 1
|
||
except discord.errors.Forbidden as e:
|
||
print(f" ✗ Forbidden: Cannot sync to guild {guild.name}. Missing 'applications.commands' scope!")
|
||
print(f" Error: {e}")
|
||
except Exception as e:
|
||
print(f" ✗ Failed to sync to guild {guild.name}: {type(e).__name__}: {e}")
|
||
|
||
if sync_count == 0:
|
||
print("\n⚠️ WARNING: No guilds were synced! Commands won't appear in Discord.")
|
||
print(" Make sure the bot was invited with 'applications.commands' scope.")
|
||
else:
|
||
print(f"\n✓ Successfully synced to {sync_count}/{len(self.guilds)} guild(s)")
|
||
|
||
print("\nBot is ready!")
|
||
|
||
async def on_message(self, message: discord.Message) -> None:
|
||
if message.author.bot:
|
||
return
|
||
|
||
if message.channel.id != Config.TEXT_CHANNEL_ID:
|
||
return
|
||
|
||
if not message.content.strip():
|
||
return
|
||
|
||
if message.author.voice is None:
|
||
await message.channel.send(
|
||
f"{message.author.mention}, you need to be in a voice channel for me to speak!",
|
||
delete_after=5
|
||
)
|
||
return
|
||
|
||
await self.message_queue.put((message, message.content))
|
||
print(f"Queued message from {message.author}: {message.content[:50]}...")
|
||
|
||
await self.process_commands(message)
|
||
|
||
async def process_queue(self) -> None:
|
||
"""Process messages from the queue one at a time."""
|
||
while True:
|
||
queue_item = await self.message_queue.get()
|
||
|
||
# Handle queue items:
|
||
# - (message, text) - regular message
|
||
# - (message, text, voice_override) - preview with voice override
|
||
# - (message, text, voice_override, effects_dict) - preview with effect overrides
|
||
if len(queue_item) == 4 and isinstance(queue_item[3], dict):
|
||
message, text, voice_override, effect_overrides = queue_item
|
||
elif len(queue_item) == 3:
|
||
message, text, voice_override = queue_item
|
||
effect_overrides = {}
|
||
else:
|
||
message, text = queue_item
|
||
voice_override = None
|
||
effect_overrides = {}
|
||
|
||
try:
|
||
await self.speak_message(message, text, voice_override, effect_overrides)
|
||
except Exception as e:
|
||
print(f"Error processing message: {e}")
|
||
finally:
|
||
self.message_queue.task_done()
|
||
|
||
async def speak_message(
|
||
self,
|
||
message: discord.Message,
|
||
text: str,
|
||
voice_override: str | None = None,
|
||
effect_overrides: dict | None = None,
|
||
) -> None:
|
||
"""Generate TTS and play it in the user's voice channel."""
|
||
if message.author.voice is None:
|
||
return
|
||
|
||
voice_channel = message.author.voice.channel
|
||
|
||
voice_client = await self.ensure_voice_connection(voice_channel)
|
||
if voice_client is None:
|
||
return
|
||
|
||
print(f"Generating TTS for: {text[:50]}...")
|
||
|
||
# Get voice state (use override for previews, otherwise user's voice)
|
||
try:
|
||
if voice_override:
|
||
voice_state = await asyncio.to_thread(
|
||
self.voice_manager.get_voice_state, voice_override
|
||
)
|
||
else:
|
||
user_id = message.author.id
|
||
voice_state = await asyncio.to_thread(
|
||
self.voice_manager.get_user_voice_state, user_id
|
||
)
|
||
except Exception as e:
|
||
print(f"Error loading voice: {e}")
|
||
if not voice_override:
|
||
await message.channel.send(
|
||
f"{message.author.mention}, failed to load your voice. Use `/voice set` to choose a voice.",
|
||
delete_after=5
|
||
)
|
||
return
|
||
|
||
# Get user's effects and apply any overrides
|
||
user_effects = self.voice_manager.get_user_effects(message.author.id)
|
||
effects = user_effects.copy()
|
||
if effect_overrides:
|
||
effects.update(effect_overrides)
|
||
|
||
wav_bytes = await asyncio.to_thread(
|
||
self._generate_wav_bytes, voice_state, text, effects
|
||
)
|
||
|
||
audio_source = discord.FFmpegPCMAudio(
|
||
io.BytesIO(wav_bytes),
|
||
pipe=True,
|
||
options="-loglevel panic"
|
||
)
|
||
|
||
if voice_client.is_playing():
|
||
voice_client.stop()
|
||
|
||
play_complete = asyncio.Event()
|
||
|
||
def after_playing(error: Exception | None) -> None:
|
||
if error:
|
||
print(f"Playback error: {error}")
|
||
self.loop.call_soon_threadsafe(play_complete.set)
|
||
|
||
voice_client.play(audio_source, after=after_playing)
|
||
self.last_activity = time.time()
|
||
print(f"Playing audio in {voice_channel.name}")
|
||
|
||
await play_complete.wait()
|
||
|
||
def _generate_wav_bytes(
|
||
self,
|
||
voice_state: Any,
|
||
text: str,
|
||
effects: dict,
|
||
) -> bytes:
|
||
"""Generate audio and return as WAV file bytes."""
|
||
model = self.voice_manager.model
|
||
if model is None:
|
||
raise RuntimeError("Model not loaded")
|
||
|
||
audio = model.generate_audio(voice_state, text)
|
||
audio_np = audio.numpy()
|
||
|
||
# Ensure audio is 2D [samples, channels] for storage
|
||
if audio_np.ndim == 1:
|
||
audio_np = audio_np.reshape(-1, 1)
|
||
|
||
# Apply audio effects if any are active
|
||
pitch = effects.get("pitch", AudioEffects.PITCH_DEFAULT)
|
||
speed = effects.get("speed", AudioEffects.SPEED_DEFAULT)
|
||
echo = effects.get("echo", AudioEffects.ECHO_DEFAULT)
|
||
robot = effects.get("robot", AudioEffects.ROBOT_DEFAULT)
|
||
chorus = effects.get("chorus", AudioEffects.CHORUS_DEFAULT)
|
||
tremolo_depth = effects.get("tremolo_depth", AudioEffects.TREMOLO_DEPTH_DEFAULT)
|
||
tremolo_rate = effects.get("tremolo_rate", AudioEffects.TREMOLO_RATE_DEFAULT)
|
||
|
||
if any([pitch != 0, speed != 1.0, echo > 0, robot > 0, chorus > 0, tremolo_depth > 0]):
|
||
print(f"Applying {AudioEffects.count_active_effects(**effects)} effect(s)...")
|
||
# Squeeze to 1D for librosa effects, then reshape back
|
||
audio_1d = audio_np.squeeze()
|
||
audio_1d, show_processing = AudioEffects.apply_effects(
|
||
audio_1d, model.sample_rate,
|
||
pitch, speed, echo, robot, chorus, tremolo_depth, tremolo_rate
|
||
)
|
||
# Reshape back to 2D
|
||
audio_np = audio_1d.reshape(-1, 1)
|
||
if show_processing:
|
||
print("⚠️ Audio processing took longer than expected due to effects")
|
||
|
||
max_val = np.max(np.abs(audio_np))
|
||
if max_val > 0:
|
||
audio_np = audio_np / max_val
|
||
audio_int16 = (audio_np * 32767).astype(np.int16)
|
||
|
||
wav_buffer = io.BytesIO()
|
||
wavfile.write(wav_buffer, model.sample_rate, audio_int16)
|
||
wav_buffer.seek(0)
|
||
return wav_buffer.read()
|
||
|
||
async def check_inactivity(self) -> None:
|
||
"""Periodically check for inactivity and disconnect from voice channels."""
|
||
while True:
|
||
await asyncio.sleep(60) # Check every minute
|
||
|
||
if self.last_activity == 0.0:
|
||
continue
|
||
|
||
elapsed = time.time() - self.last_activity
|
||
if elapsed >= INACTIVITY_TIMEOUT:
|
||
# Disconnect from all voice channels
|
||
for guild in self.guilds:
|
||
if guild.voice_client is not None:
|
||
print(f"Disconnecting from {guild.name} due to inactivity")
|
||
await guild.voice_client.disconnect()
|
||
self.last_activity = 0.0
|
||
|
||
async def ensure_voice_connection(self, channel: discord.VoiceChannel) -> discord.VoiceClient | None:
|
||
"""Ensure we're connected to the specified voice channel."""
|
||
guild = channel.guild
|
||
|
||
if guild.voice_client is not None:
|
||
if guild.voice_client.channel.id == channel.id:
|
||
return guild.voice_client
|
||
await guild.voice_client.move_to(channel)
|
||
return guild.voice_client
|
||
|
||
try:
|
||
voice_client = await channel.connect(timeout=10.0)
|
||
self.last_activity = time.time()
|
||
return voice_client
|
||
except Exception as e:
|
||
print(f"Failed to connect to voice channel: {e}")
|
||
return None
|
||
|
||
|
||
def auto_update_dependencies() -> None:
|
||
"""Auto-update pip packages on startup."""
|
||
try:
|
||
print("Checking for package updates...")
|
||
result = subprocess.run(
|
||
[sys.executable, "-m", "pip", "install", "-r", "requirements.txt", "-U", "-q"],
|
||
capture_output=True,
|
||
text=True,
|
||
check=False
|
||
)
|
||
if result.returncode == 0:
|
||
print("Packages updated successfully (or already up to date)")
|
||
else:
|
||
print(f"Warning: Package update had issues: {result.stderr}")
|
||
except Exception as e:
|
||
print(f"Warning: Could not auto-update packages: {e}")
|
||
|
||
|
||
def main():
|
||
auto_update_dependencies()
|
||
|
||
errors = Config.validate()
|
||
if errors:
|
||
print("Configuration errors:")
|
||
for error in errors:
|
||
print(f" - {error}")
|
||
print("\nPlease create a .env file based on .env.example")
|
||
return
|
||
|
||
bot = TTSBot()
|
||
bot.run(Config.DISCORD_TOKEN)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|