"""Discord bot that reads text-channel messages aloud with Pocket TTS.

Messages posted in ``Config.TEXT_CHANNEL_ID`` are queued, synthesised with the
author's selected voice and audio effects, and played in the author's voice
channel.  Voices and effects are managed through the ``/voice`` and
``/effects`` slash commands.

Pass ``testing`` as the first command line argument to set
``ENV_MODE=testing`` before any configuration module is imported.
"""

__version__ = "1.1.0"

import asyncio
import io
import os
import random
import subprocess
import sys
import time
from typing import Any

# Parse command line arguments before loading any config
if len(sys.argv) > 1 and sys.argv[1] == "testing":
    os.environ["ENV_MODE"] = "testing"
    # Remove the argument so it doesn't interfere with other parsing
    sys.argv.pop(1)

# NOTE(review): numba_config is never referenced by name in this file, so it
# is presumably imported for import-time side effects. It is deliberately kept
# after the ENV_MODE handling and before the third-party imports - confirm.
import numba_config  # noqa: E402,F401

import discord  # noqa: E402
import numpy as np  # noqa: E402
import scipy.io.wavfile as wavfile  # noqa: E402
from discord import app_commands  # noqa: E402
from discord.ext import commands  # noqa: E402

from audio_effects import AudioEffects  # noqa: E402
from config import Config  # noqa: E402
from voice_manager import VoiceManager  # noqa: E402

# Inactivity timeout in seconds (10 minutes)
INACTIVITY_TIMEOUT = 10 * 60

# Sample lines for voice preview
PREVIEW_LINES = [
    "Hello! This is how I sound. Choose me as your voice with /voice set.",
    "Testing, one, two, three! Can you hear me clearly?",
    "Here's a preview of my voice. Pretty cool, right?",
    "Greetings! I am ready to speak for you.",
    "Voice check! This is what I sound like.",
    "Audio test complete. This voice is ready to go!",
    "Sample message incoming. How do I sound to you?",
    "Preview mode activated. Testing speech synthesis.",
]

# NOTE(review): several emoji literals in the user-facing strings below look
# mis-encoded (mojibake). They are reproduced unchanged here; confirm the
# intended characters and the file encoding before correcting them.

# (effect key, icon, display label) in the order shown by `/effects list`.
# The same keys, in the same order, are offered as `/effects set` choices.
_EFFECT_DISPLAY = (
    ("pitch", "šŸŽµ", "Pitch"),
    ("speed", "⚔", "Speed"),
    ("echo", "šŸ”Š", "Echo"),
    ("robot", "šŸ¤–", "Robot"),
    ("chorus", "šŸŽ¶", "Chorus"),
    ("tremolo_depth", "ć€°ļø", "Tremolo Depth"),
    ("tremolo_rate", "šŸ“³", "Tremolo Rate"),
)


class _PreviewVoiceState:
    """Minimal stand-in exposing only the ``channel`` attribute."""

    def __init__(self, channel):
        self.channel = channel


class _PreviewMessage:
    """Message-like object used to push a voice preview through the queue.

    It carries the attributes the playback path reads from a real message:
    ``author`` and ``channel``.  ``voice`` is kept for parity with the
    original stub.
    """

    def __init__(self, user, channel, voice_channel):
        self.author = user
        self.channel = channel
        self._voice_channel = voice_channel

    @property
    def voice(self):
        return _PreviewVoiceState(self._voice_channel)


class _ConfirmResetView(discord.ui.View):
    """Two-button confirmation prompt for `/effects reset`."""

    def __init__(self, voice_manager, user_id):
        super().__init__(timeout=30)
        self.voice_manager = voice_manager
        self.user_id = user_id
        self.confirmed = False

    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        """Allow only the user who requested the reset to press a button."""
        if interaction.user.id != self.user_id:
            await interaction.response.send_message(
                "This button is not for you!", ephemeral=True
            )
            return False
        return True

    @discord.ui.button(label="āœ… Yes, Reset All", style=discord.ButtonStyle.danger)
    async def confirm_button(
        self, interaction: discord.Interaction, button: discord.ui.Button
    ):
        self.voice_manager.reset_user_effects(self.user_id)
        self.confirmed = True
        await interaction.response.edit_message(
            content="āœ… All audio effects have been reset to defaults!",
            view=None,
        )
        self.stop()

    @discord.ui.button(label="āŒ Cancel", style=discord.ButtonStyle.secondary)
    async def cancel_button(
        self, interaction: discord.Interaction, button: discord.ui.Button
    ):
        await interaction.response.edit_message(
            content="āŒ Reset cancelled. Your effects remain unchanged.",
            view=None,
        )
        self.stop()


class TTSBot(commands.Bot):
    """Discord bot that reads messages aloud using Pocket TTS."""

    def __init__(self):
        intents = discord.Intents.default()
        intents.message_content = True
        intents.voice_states = True
        super().__init__(command_prefix="!", intents=intents)

        self.voice_manager = VoiceManager(Config.VOICES_DIR, Config.DEFAULT_VOICE)
        # Queue items are one of:
        #   (message, text)                                 regular message
        #   (message, text, voice_override)                 preview
        #   (message, text, voice_override, effects_dict)   preview with effects
        self.message_queue: asyncio.Queue[
            tuple[discord.Message, str]
            | tuple[discord.Message, str, str]
            | tuple[Any, str, str, dict]
        ] = asyncio.Queue()
        # Timestamp of the last connect/playback; 0.0 means "nothing to time out".
        self.last_activity: float = 0.0
        # Strong references to long-running tasks: the event loop itself only
        # keeps weak references, so unreferenced tasks may be garbage-collected.
        self._background_tasks: set[asyncio.Task] = set()
        # on_ready can fire more than once (reconnects); sync only when needed.
        self._commands_synced = False

        print("\n=== Command Registration ===")
        self._setup_slash_commands()
        self._setup_effects_commands()
        self._log_registered_commands()
        print("=== End Command Registration ===\n")

    def _log_registered_commands(self) -> None:
        """Log all registered commands to console."""
        print("\nRegistered commands:")
        # Named `registered` so the imported `commands` module is not shadowed.
        registered = list(self.tree.get_commands())
        if not registered:
            print(" āš ļø No commands registered!")
        else:
            for cmd in registered:
                print(f" āœ“ /{cmd.name} - {cmd.description}")
        print(f"\nTotal commands registered: {len(registered)}")

    def _setup_slash_commands(self) -> None:
        """Set up slash commands for voice management."""
        print("Setting up voice commands...")

        @self.tree.command(name="voice", description="Manage your TTS voice")
        @app_commands.describe(
            action="What to do",
            voice_name="Name of the voice (for 'set' or 'preview' action)",
            preview_pitch="Optional pitch for preview (-12 to 12, default: use your settings)",
            preview_speed="Optional speed for preview (0.5 to 2.0, default: use your settings)",
        )
        @app_commands.choices(action=[
            app_commands.Choice(name="list", value="list"),
            app_commands.Choice(name="set", value="set"),
            app_commands.Choice(name="current", value="current"),
            app_commands.Choice(name="refresh", value="refresh"),
            app_commands.Choice(name="preview", value="preview"),
        ])
        async def voice_command(
            interaction: discord.Interaction,
            action: app_commands.Choice[str],
            voice_name: str | None = None,
            preview_pitch: int | None = None,
            preview_speed: float | None = None,
        ):
            if action.value == "list":
                await self._handle_voice_list(interaction)
            elif action.value == "set":
                await self._handle_voice_set(interaction, voice_name)
            elif action.value == "current":
                await self._handle_voice_current(interaction)
            elif action.value == "refresh":
                await self._handle_voice_refresh(interaction)
            elif action.value == "preview":
                await self._handle_voice_preview(
                    interaction, voice_name, preview_pitch, preview_speed
                )

        @voice_command.autocomplete("voice_name")
        async def voice_name_autocomplete(
            interaction: discord.Interaction, current: str
        ) -> list[app_commands.Choice[str]]:
            voices = self.voice_manager.get_available_voices()
            needle = current.lower()
            # The result is capped at 25 entries.
            return [
                app_commands.Choice(name=v, value=v)
                for v in voices
                if needle in v.lower()
            ][:25]

    def _setup_effects_commands(self) -> None:
        """Set up slash commands for audio effects management."""
        print("Setting up effects commands...")

        @self.tree.command(name="effects", description="Manage your TTS audio effects")
        @app_commands.describe(
            action="What to do",
            effect_name="Name of the effect (for 'set' action)",
            value="Value for the effect (for 'set' action)",
        )
        @app_commands.choices(action=[
            app_commands.Choice(name="list", value="list"),
            app_commands.Choice(name="set", value="set"),
            app_commands.Choice(name="reset", value="reset"),
        ])
        @app_commands.choices(effect_name=[
            app_commands.Choice(name=key, value=key)
            for key, _icon, _label in _EFFECT_DISPLAY
        ])
        async def effects_command(
            interaction: discord.Interaction,
            action: app_commands.Choice[str],
            effect_name: app_commands.Choice[str] | None = None,
            value: str | None = None,
        ):
            if action.value == "list":
                await self._handle_effects_list(interaction)
            elif action.value == "set":
                await self._handle_effects_set(interaction, effect_name, value)
            elif action.value == "reset":
                await self._handle_effects_reset(interaction)

    async def _handle_effects_list(self, interaction: discord.Interaction) -> None:
        """Handle /effects list command."""
        effects = self.voice_manager.get_user_effects(interaction.user.id)
        active_count = self.voice_manager.count_active_effects(interaction.user.id)

        lines = ["**Your Audio Effects:**\n"]
        for key, icon, label in _EFFECT_DISPLAY:
            description = AudioEffects.get_effect_description(key)
            formatted = AudioEffects.format_effect_value(key, effects[key])
            lines.append(f"{icon} **{label}**: {formatted}")
            lines.append(f" {description}\n")

        # Active count warning
        lines.append(f"**Active Effects**: {active_count}")
        if active_count > 2:
            lines.append("āš ļø You have more than 2 active effects. Processing may be slower!")
        elif active_count > 0:
            lines.append("ā„¹ļø Add more effects for fun variations (may slow processing)")

        lines.append("\n*Use `/effects set ` to change settings*")
        lines.append("*Use `/effects reset` to clear all effects*")

        await interaction.response.send_message("\n".join(lines), ephemeral=True)

    async def _handle_effects_set(
        self,
        interaction: discord.Interaction,
        effect_name: app_commands.Choice[str] | None,
        value: str | None,
    ) -> None:
        """Handle /effects set command."""
        if not effect_name or value is None:
            await interaction.response.send_message(
                "āŒ Please provide both effect name and value. Example: `/effects set pitch 3`",
                ephemeral=True,
            )
            return

        success, message = self.voice_manager.set_user_effect(
            interaction.user.id, effect_name.value, value
        )
        prefix = "āœ…" if success else "āŒ"
        await interaction.response.send_message(f"{prefix} {message}", ephemeral=True)

    async def _handle_effects_reset(self, interaction: discord.Interaction) -> None:
        """Handle /effects reset command with confirmation UI."""
        # Check if user has any effects to reset
        active_count = self.voice_manager.count_active_effects(interaction.user.id)
        if active_count == 0:
            await interaction.response.send_message(
                "ā„¹ļø You don't have any active effects to reset.",
                ephemeral=True,
            )
            return

        view = _ConfirmResetView(self.voice_manager, interaction.user.id)
        await interaction.response.send_message(
            f"āš ļø **Reset Confirmation**\n\n"
            f"You have {active_count} active effect(s).\n"
            f"This will reset **all** your audio effects to defaults:\n"
            f"• Pitch: 0 (normal)\n"
            f"• Speed: 1.0x (normal)\n\n"
            f"Are you sure you want to continue?",
            view=view,
            ephemeral=True,
        )

    async def _handle_voice_list(self, interaction: discord.Interaction) -> None:
        """Handle /voice list command."""
        voices = self.voice_manager.get_available_voices()
        loaded = self.voice_manager.get_loaded_voices()
        user_voice = self.voice_manager.get_user_voice(interaction.user.id)

        if not voices:
            await interaction.response.send_message(
                "āŒ No voices available. Add .wav files to the voices directory.",
                ephemeral=True,
            )
            return

        lines = ["**Available Voices:**\n"]
        for voice in voices:
            status = []
            if voice == user_voice:
                status.append("āœ… your voice")
            if voice in loaded:
                status.append("šŸ“¦ loaded")
            status_str = f" ({', '.join(status)})" if status else ""
            lines.append(f"• `{voice}`{status_str}")

        lines.append("\n*Use `/voice set ` to change your voice.*")
        await interaction.response.send_message("\n".join(lines), ephemeral=True)

    async def _handle_voice_set(
        self, interaction: discord.Interaction, voice_name: str | None
    ) -> None:
        """Handle /voice set command."""
        if not voice_name:
            await interaction.response.send_message(
                "āŒ Please provide a voice name. Use `/voice list` to see available voices.",
                ephemeral=True,
            )
            return

        voice_name = voice_name.lower()
        if not self.voice_manager.is_voice_available(voice_name):
            voices = self.voice_manager.get_available_voices()
            await interaction.response.send_message(
                f"āŒ Voice `{voice_name}` not found.\n"
                f"Available voices: {', '.join(f'`{v}`' for v in voices)}",
                ephemeral=True,
            )
            return

        # Check if voice needs to be loaded
        needs_loading = not self.voice_manager.is_voice_loaded(voice_name)
        if needs_loading:
            # Respond first, then load in a worker thread and report the
            # outcome as a follow-up.
            await interaction.response.send_message(
                f"ā³ Loading voice `{voice_name}` for the first time... This may take a moment.",
                ephemeral=True,
            )
            try:
                await asyncio.to_thread(self.voice_manager.get_voice_state, voice_name)
            except Exception as e:
                await interaction.followup.send(
                    f"āŒ Failed to load voice `{voice_name}`: {e}",
                    ephemeral=True,
                )
                return

        self.voice_manager.set_user_voice(interaction.user.id, voice_name)

        if needs_loading:
            await interaction.followup.send(
                f"āœ… Voice changed to `{voice_name}`!",
                ephemeral=True,
            )
        else:
            await interaction.response.send_message(
                f"āœ… Voice changed to `{voice_name}`!",
                ephemeral=True,
            )

    async def _handle_voice_current(self, interaction: discord.Interaction) -> None:
        """Handle /voice current command."""
        voice = self.voice_manager.get_user_voice(interaction.user.id)
        if voice:
            loaded = "(loaded)" if self.voice_manager.is_voice_loaded(voice) else "(not yet loaded)"
            await interaction.response.send_message(
                f"šŸŽ¤ Your current voice: `{voice}` {loaded}",
                ephemeral=True,
            )
        else:
            await interaction.response.send_message(
                "āŒ No voice set. Use `/voice set ` to choose a voice.",
                ephemeral=True,
            )

    async def _handle_voice_refresh(self, interaction: discord.Interaction) -> None:
        """Handle /voice refresh command."""
        await interaction.response.send_message(
            "šŸ”„ Scanning for new voices...",
            ephemeral=True,
        )

        added, removed = await asyncio.to_thread(self.voice_manager.refresh_voices)

        lines = []
        if added:
            lines.append(f"āœ… **New voices found:** {', '.join(f'`{v}`' for v in added)}")
        if removed:
            lines.append(f"āŒ **Voices removed:** {', '.join(f'`{v}`' for v in removed)}")
        if not added and not removed:
            lines.append("No changes detected.")

        total = len(self.voice_manager.get_available_voices())
        lines.append(f"\n*Total voices available: {total}*")

        await interaction.followup.send("\n".join(lines), ephemeral=True)

    async def _handle_voice_preview(
        self,
        interaction: discord.Interaction,
        voice_name: str | None,
        preview_pitch: int | None = None,
        preview_speed: float | None = None,
    ) -> None:
        """Handle /voice preview command.

        Validates the request, then queues a random line from
        ``PREVIEW_LINES`` to be spoken with ``voice_name``.  The user's stored
        effects are used, with ``preview_pitch`` / ``preview_speed`` taking
        precedence when given.
        """
        if not voice_name:
            await interaction.response.send_message(
                "āŒ Please provide a voice name. Use `/voice list` to see available voices.",
                ephemeral=True,
            )
            return

        # Check if user is in a voice channel. getattr() covers invocations
        # where the user object has no `voice` attribute at all (e.g. a plain
        # discord.User outside a guild).
        member_voice = getattr(interaction.user, "voice", None)
        if member_voice is None or member_voice.channel is None:
            await interaction.response.send_message(
                "āŒ You need to be in a voice channel to hear a preview!",
                ephemeral=True,
            )
            return

        voice_name = voice_name.lower()

        # Validate voice exists
        if not self.voice_manager.is_voice_available(voice_name):
            voices = self.voice_manager.get_available_voices()
            await interaction.response.send_message(
                f"āŒ Voice `{voice_name}` not found.\n"
                f"Available voices: {', '.join(f'`{v}`' for v in voices)}",
                ephemeral=True,
            )
            return

        # Validate pitch if provided
        if preview_pitch is not None:
            is_valid, error_msg = AudioEffects.validate_effect("pitch", preview_pitch)
            if not is_valid:
                await interaction.response.send_message(
                    f"āŒ Invalid pitch value: {error_msg}",
                    ephemeral=True,
                )
                return

        # Validate speed if provided
        if preview_speed is not None:
            is_valid, error_msg = AudioEffects.validate_effect("speed", preview_speed)
            if not is_valid:
                await interaction.response.send_message(
                    f"āŒ Invalid speed value: {error_msg}",
                    ephemeral=True,
                )
                return

        # Select a random preview line
        preview_text = random.choice(PREVIEW_LINES)

        preview_message = _PreviewMessage(
            interaction.user,
            interaction.channel,
            member_voice.channel,
        )

        # Start from the user's stored effects and apply explicit overrides.
        preview_effects = self.voice_manager.get_user_effects(interaction.user.id).copy()
        if preview_pitch is not None:
            preview_effects["pitch"] = preview_pitch
        if preview_speed is not None:
            preview_effects["speed"] = preview_speed

        # Queue the preview with voice override and effects
        await self.message_queue.put(
            (preview_message, preview_text, voice_name, preview_effects)
        )

        # Build effect description. `+g` renders ints exactly like `+d` but
        # also accepts a float pitch coming from stored user settings.
        effect_desc = []
        if preview_effects.get("pitch", 0) != 0:
            effect_desc.append(f"pitch: {preview_effects['pitch']:+g}")
        if preview_effects.get("speed", 1.0) != 1.0:
            effect_desc.append(f"speed: {preview_effects['speed']:.1f}x")
        effect_str = f" (with {', '.join(effect_desc)})" if effect_desc else ""

        sample = preview_text[:50] + ("..." if len(preview_text) > 50 else "")
        await interaction.response.send_message(
            f"ā³ Queued preview for `{voice_name}`{effect_str}. Sample: \"{sample}\"",
            ephemeral=True,
        )

    def _start_background_task(self, coro) -> None:
        """Schedule ``coro`` on the bot loop and keep a strong reference to it."""
        task = self.loop.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def setup_hook(self) -> None:
        """Called when the bot is starting up."""
        print("Initializing TTS...")
        print("Discovering available voices...")
        await asyncio.to_thread(self.voice_manager.discover_voices)
        await asyncio.to_thread(self.voice_manager.load_model)

        # Pre-load the default voice if one is set
        default = self.voice_manager.default_voice
        if default:
            print(f"Pre-loading default voice: {default}")
            await asyncio.to_thread(self.voice_manager.get_voice_state, default)

        self._start_background_task(self.process_queue())
        self._start_background_task(self.check_inactivity())

    async def _sync_commands_to_guilds(self) -> int:
        """Copy the global commands to every guild and sync them.

        Returns the number of guilds that synced successfully.
        """
        print("\nSyncing slash commands to guilds...")
        sync_count = 0
        for guild in self.guilds:
            try:
                # Copy global commands to this guild before syncing
                # This is necessary for guild-specific command registration
                self.tree.copy_global_to(guild=discord.Object(guild.id))
                print(f" šŸ“‹ Copied global commands to guild: {guild.name}")
                synced = await self.tree.sync(guild=discord.Object(guild.id))
                print(f" āœ“ Synced {len(synced)} commands to guild: {guild.name}")
                for cmd in synced:
                    print(f" - /{cmd.name}")
                sync_count += 1
            except discord.errors.Forbidden as e:
                print(f" āœ— Forbidden: Cannot sync to guild {guild.name}. Missing 'applications.commands' scope!")
                print(f" Error: {e}")
            except Exception as e:
                print(f" āœ— Failed to sync to guild {guild.name}: {type(e).__name__}: {e}")

        if sync_count == 0:
            print("\nāš ļø WARNING: No guilds were synced! Commands won't appear in Discord.")
            print(" Make sure the bot was invited with 'applications.commands' scope.")
        else:
            print(f"\nāœ“ Successfully synced to {sync_count}/{len(self.guilds)} guild(s)")
        return sync_count

    async def on_ready(self) -> None:
        """Log connection details and sync slash commands to each guild.

        This event can fire again after a reconnect, so the sync is skipped
        once every guild has been synced successfully.
        """
        print(f"Logged in as {self.user}")
        print(f"Bot ID: {self.user.id}")
        print(f"Monitoring channel ID: {Config.TEXT_CHANNEL_ID}")
        print(f"Available voices: {', '.join(self.voice_manager.get_available_voices())}")

        # Log registered commands before sync
        registered_cmds = list(self.tree.get_commands())
        print(f"\nCommands in tree before sync: {len(registered_cmds)}")
        for cmd in registered_cmds:
            print(f" - /{cmd.name}")

        # Sync slash commands to each guild for immediate availability
        print(f"\nConnected to {len(self.guilds)} guild(s):")
        for guild in self.guilds:
            print(f" - {guild.name} (ID: {guild.id})")

        if self._commands_synced:
            print("\nSlash commands already synced; skipping re-sync.")
        else:
            sync_count = await self._sync_commands_to_guilds()
            # Retry on the next on_ready unless every guild succeeded.
            self._commands_synced = sync_count > 0 and sync_count == len(self.guilds)

        print("\nBot is ready!")

    async def on_message(self, message: discord.Message) -> None:
        """Queue non-empty, non-bot messages from the monitored channel for TTS."""
        if message.author.bot:
            return
        if message.channel.id != Config.TEXT_CHANNEL_ID:
            return
        if not message.content.strip():
            return

        if message.author.voice is None:
            await message.channel.send(
                f"{message.author.mention}, you need to be in a voice channel for me to speak!",
                delete_after=5,
            )
            return

        await self.message_queue.put((message, message.content))
        print(f"Queued message from {message.author}: {message.content[:50]}...")

        await self.process_commands(message)

    @staticmethod
    def _unpack_queue_item(queue_item) -> tuple[Any, str, str | None, dict]:
        """Normalise a queue item to (message, text, voice_override, effects).

        Raises ValueError when the item has an unsupported shape.
        """
        if len(queue_item) == 4 and isinstance(queue_item[3], dict):
            message, text, voice_override, effect_overrides = queue_item
            return message, text, voice_override, effect_overrides
        if len(queue_item) == 3:
            message, text, voice_override = queue_item
            return message, text, voice_override, {}
        message, text = queue_item
        return message, text, None, {}

    async def process_queue(self) -> None:
        """Process messages from the queue one at a time."""
        while True:
            queue_item = await self.message_queue.get()
            try:
                # Unpacking happens inside the try so a malformed item is
                # logged instead of terminating this consumer task.
                message, text, voice_override, effect_overrides = (
                    self._unpack_queue_item(queue_item)
                )
                await self.speak_message(message, text, voice_override, effect_overrides)
            except Exception as e:
                print(f"Error processing message: {e}")
            finally:
                self.message_queue.task_done()

    async def speak_message(
        self,
        message: discord.Message,
        text: str,
        voice_override: str | None = None,
        effect_overrides: dict | None = None,
    ) -> None:
        """Generate TTS and play it in the user's voice channel.

        Returns once playback has finished.  Returns early without playing
        when the author is not in a voice channel, the voice connection cannot
        be established, or the voice fails to load.
        """
        if message.author.voice is None:
            return

        voice_channel = message.author.voice.channel
        voice_client = await self.ensure_voice_connection(voice_channel)
        if voice_client is None:
            return

        print(f"Generating TTS for: {text[:50]}...")

        # Get voice state (use override for previews, otherwise user's voice)
        try:
            if voice_override:
                voice_state = await asyncio.to_thread(
                    self.voice_manager.get_voice_state, voice_override
                )
            else:
                user_id = message.author.id
                voice_state = await asyncio.to_thread(
                    self.voice_manager.get_user_voice_state, user_id
                )
        except Exception as e:
            print(f"Error loading voice: {e}")
            if not voice_override:
                await message.channel.send(
                    f"{message.author.mention}, failed to load your voice. Use `/voice set` to choose a voice.",
                    delete_after=5,
                )
            return

        # Get user's effects and apply any overrides
        effects = self.voice_manager.get_user_effects(message.author.id).copy()
        if effect_overrides:
            effects.update(effect_overrides)

        wav_bytes = await asyncio.to_thread(
            self._generate_wav_bytes, voice_state, text, effects
        )

        audio_source = discord.FFmpegPCMAudio(
            io.BytesIO(wav_bytes),
            pipe=True,
            options="-loglevel panic",
        )

        if voice_client.is_playing():
            voice_client.stop()

        play_complete = asyncio.Event()

        def after_playing(error: Exception | None) -> None:
            if error:
                print(f"Playback error: {error}")
            # Hand the signal to the bot's event loop thread-safely.
            self.loop.call_soon_threadsafe(play_complete.set)

        voice_client.play(audio_source, after=after_playing)
        self.last_activity = time.time()
        print(f"Playing audio in {voice_channel.name}")

        await play_complete.wait()

    def _generate_wav_bytes(
        self,
        voice_state: Any,
        text: str,
        effects: dict,
    ) -> bytes:
        """Generate audio and return as WAV file bytes.

        Raises RuntimeError when the TTS model has not been loaded.
        """
        model = self.voice_manager.model
        if model is None:
            raise RuntimeError("Model not loaded")

        audio = model.generate_audio(voice_state, text)
        audio_np = audio.numpy()

        # Ensure audio is 2D [samples, channels] for storage
        if audio_np.ndim == 1:
            audio_np = audio_np.reshape(-1, 1)

        # Apply audio effects if any are active
        pitch = effects.get("pitch", AudioEffects.PITCH_DEFAULT)
        speed = effects.get("speed", AudioEffects.SPEED_DEFAULT)
        echo = effects.get("echo", AudioEffects.ECHO_DEFAULT)
        robot = effects.get("robot", AudioEffects.ROBOT_DEFAULT)
        chorus = effects.get("chorus", AudioEffects.CHORUS_DEFAULT)
        tremolo_depth = effects.get("tremolo_depth", AudioEffects.TREMOLO_DEPTH_DEFAULT)
        tremolo_rate = effects.get("tremolo_rate", AudioEffects.TREMOLO_RATE_DEFAULT)

        # tremolo_rate on its own does not count as an active effect here.
        effects_active = (
            pitch != 0
            or speed != 1.0
            or echo > 0
            or robot > 0
            or chorus > 0
            or tremolo_depth > 0
        )
        if effects_active:
            print(f"Applying {AudioEffects.count_active_effects(**effects)} effect(s)...")
            # Squeeze to 1D for librosa effects, then reshape back
            audio_1d = audio_np.squeeze()
            audio_1d, show_processing = AudioEffects.apply_effects(
                audio_1d, model.sample_rate, pitch, speed, echo, robot, chorus,
                tremolo_depth, tremolo_rate
            )
            # Reshape back to 2D
            audio_np = audio_1d.reshape(-1, 1)
            if show_processing:
                print("āš ļø Audio processing took longer than expected due to effects")

        # Peak-normalise before scaling to int16. np.max raises on a
        # zero-length array, so empty audio skips the normalisation.
        max_val = np.max(np.abs(audio_np)) if audio_np.size else 0.0
        if max_val > 0:
            audio_np = audio_np / max_val
        audio_int16 = (audio_np * 32767).astype(np.int16)

        wav_buffer = io.BytesIO()
        wavfile.write(wav_buffer, model.sample_rate, audio_int16)
        return wav_buffer.getvalue()

    async def check_inactivity(self) -> None:
        """Periodically check for inactivity and disconnect from voice channels."""
        while True:
            await asyncio.sleep(60)  # Check every minute

            if self.last_activity == 0.0:
                continue

            elapsed = time.time() - self.last_activity
            if elapsed >= INACTIVITY_TIMEOUT:
                # Disconnect from all voice channels
                for guild in self.guilds:
                    voice_client = guild.voice_client
                    if voice_client is None:
                        continue
                    print(f"Disconnecting from {guild.name} due to inactivity")
                    try:
                        await voice_client.disconnect()
                    except Exception as e:
                        # Keep the watchdog alive if one disconnect fails.
                        print(f"Failed to disconnect from {guild.name}: {e}")
                self.last_activity = 0.0

    async def ensure_voice_connection(
        self, channel: discord.VoiceChannel
    ) -> discord.VoiceClient | None:
        """Ensure we're connected to the specified voice channel.

        Reuses the guild's existing voice client (moving it if it is in a
        different channel) or opens a new connection.  Returns None when the
        connection or move fails.
        """
        guild = channel.guild
        existing = guild.voice_client

        try:
            if existing is not None:
                if existing.channel.id != channel.id:
                    await existing.move_to(channel)
                return existing

            voice_client = await channel.connect(timeout=10.0)
            self.last_activity = time.time()
            return voice_client
        except Exception as e:
            print(f"Failed to connect to voice channel: {e}")
            return None


def auto_update_dependencies() -> None:
    """Auto-update pip packages on startup.

    Runs ``pip install -r requirements.txt -U -q`` with the current
    interpreter.  ``requirements.txt`` is resolved against the current working
    directory.  Failures are reported as warnings and never abort startup.
    """
    try:
        print("Checking for package updates...")
        result = subprocess.run(
            [sys.executable, "-m", "pip", "install", "-r", "requirements.txt", "-U", "-q"],
            capture_output=True,
            text=True,
            check=False,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"Warning: Could not auto-update packages: {e}")
        return

    if result.returncode == 0:
        print("Packages updated successfully (or already up to date)")
    else:
        print(f"Warning: Package update had issues: {result.stderr}")


def main():
    """Update dependencies, validate configuration and run the bot."""
    auto_update_dependencies()

    errors = Config.validate()
    if errors:
        print("Configuration errors:")
        for error in errors:
            print(f" - {error}")
        print("\nPlease create a .env file based on .env.example")
        return

    bot = TTSBot()
    bot.run(Config.DISCORD_TOKEN)


if __name__ == "__main__":
    main()