__version__ = "1.1.0" import sys import os # Parse command line arguments before loading any config if len(sys.argv) > 1 and sys.argv[1] == "testing": os.environ["ENV_MODE"] = "testing" # Remove the argument so it doesn't interfere with other parsing sys.argv.pop(1) import numba_config import asyncio import io import subprocess import sys import time from typing import Any import discord import numpy as np import scipy.io.wavfile as wavfile from discord import app_commands from discord.ext import commands from config import Config from voice_manager import VoiceManager # Inactivity timeout in seconds (10 minutes) INACTIVITY_TIMEOUT = 10 * 60 class TTSBot(commands.Bot): """Discord bot that reads messages aloud using Pocket TTS.""" def __init__(self): intents = discord.Intents.default() intents.message_content = True intents.voice_states = True super().__init__(command_prefix="!", intents=intents) self.voice_manager = VoiceManager(Config.VOICES_DIR, Config.DEFAULT_VOICE) self.message_queue: asyncio.Queue[tuple[discord.Message, str]] = asyncio.Queue() self.last_activity: float = 0.0 self._setup_slash_commands() def _setup_slash_commands(self) -> None: """Set up slash commands for voice management.""" @self.tree.command(name="voice", description="Manage your TTS voice") @app_commands.describe( action="What to do", voice_name="Name of the voice (for 'set' action)" ) @app_commands.choices(action=[ app_commands.Choice(name="list", value="list"), app_commands.Choice(name="set", value="set"), app_commands.Choice(name="current", value="current"), app_commands.Choice(name="refresh", value="refresh"), ]) async def voice_command( interaction: discord.Interaction, action: app_commands.Choice[str], voice_name: str | None = None ): if action.value == "list": await self._handle_voice_list(interaction) elif action.value == "set": await self._handle_voice_set(interaction, voice_name) elif action.value == "current": await self._handle_voice_current(interaction) elif action.value == "refresh": await self._handle_voice_refresh(interaction) @voice_command.autocomplete("voice_name") async def voice_name_autocomplete( interaction: discord.Interaction, current: str ) -> list[app_commands.Choice[str]]: voices = self.voice_manager.get_available_voices() return [ app_commands.Choice(name=v, value=v) for v in voices if current.lower() in v.lower() ][:25] async def _handle_voice_list(self, interaction: discord.Interaction) -> None: """Handle /voice list command.""" voices = self.voice_manager.get_available_voices() loaded = self.voice_manager.get_loaded_voices() user_voice = self.voice_manager.get_user_voice(interaction.user.id) if not voices: await interaction.response.send_message( "❌ No voices available. Add .wav files to the voices directory.", ephemeral=True ) return lines = ["**Available Voices:**\n"] for voice in voices: status = [] if voice == user_voice: status.append("✅ your voice") if voice in loaded: status.append("📦 loaded") status_str = f" ({', '.join(status)})" if status else "" lines.append(f"• `{voice}`{status_str}") lines.append(f"\n*Use `/voice set ` to change your voice.*") await interaction.response.send_message( "\n".join(lines), ephemeral=True ) async def _handle_voice_set(self, interaction: discord.Interaction, voice_name: str | None) -> None: """Handle /voice set command.""" if not voice_name: await interaction.response.send_message( "❌ Please provide a voice name. Use `/voice list` to see available voices.", ephemeral=True ) return voice_name = voice_name.lower() if not self.voice_manager.is_voice_available(voice_name): voices = self.voice_manager.get_available_voices() await interaction.response.send_message( f"❌ Voice `{voice_name}` not found.\n" f"Available voices: {', '.join(f'`{v}`' for v in voices)}", ephemeral=True ) return # Check if voice needs to be loaded needs_loading = not self.voice_manager.is_voice_loaded(voice_name) if needs_loading: await interaction.response.send_message( f"⏳ Loading voice `{voice_name}` for the first time... This may take a moment.", ephemeral=True ) try: await asyncio.to_thread(self.voice_manager.get_voice_state, voice_name) except Exception as e: await interaction.followup.send( f"❌ Failed to load voice `{voice_name}`: {e}", ephemeral=True ) return self.voice_manager.set_user_voice(interaction.user.id, voice_name) if needs_loading: await interaction.followup.send( f"✅ Voice changed to `{voice_name}`!", ephemeral=True ) else: await interaction.response.send_message( f"✅ Voice changed to `{voice_name}`!", ephemeral=True ) async def _handle_voice_current(self, interaction: discord.Interaction) -> None: """Handle /voice current command.""" voice = self.voice_manager.get_user_voice(interaction.user.id) if voice: loaded = "(loaded)" if self.voice_manager.is_voice_loaded(voice) else "(not yet loaded)" await interaction.response.send_message( f"🎤 Your current voice: `{voice}` {loaded}", ephemeral=True ) else: await interaction.response.send_message( "❌ No voice set. Use `/voice set ` to choose a voice.", ephemeral=True ) async def _handle_voice_refresh(self, interaction: discord.Interaction) -> None: """Handle /voice refresh command.""" await interaction.response.send_message( "🔄 Scanning for new voices...", ephemeral=True ) added, removed = await asyncio.to_thread(self.voice_manager.refresh_voices) lines = [] if added: lines.append(f"✅ **New voices found:** {', '.join(f'`{v}`' for v in added)}") if removed: lines.append(f"❌ **Voices removed:** {', '.join(f'`{v}`' for v in removed)}") if not added and not removed: lines.append("No changes detected.") total = len(self.voice_manager.get_available_voices()) lines.append(f"\n*Total voices available: {total}*") await interaction.followup.send( "\n".join(lines), ephemeral=True ) async def setup_hook(self) -> None: """Called when the bot is starting up.""" print("Initializing TTS...") print("Discovering available voices...") await asyncio.to_thread(self.voice_manager.discover_voices) await asyncio.to_thread(self.voice_manager.load_model) # Pre-load the default voice if one is set default = self.voice_manager.default_voice if default: print(f"Pre-loading default voice: {default}") await asyncio.to_thread(self.voice_manager.get_voice_state, default) self.loop.create_task(self.process_queue()) self.loop.create_task(self.check_inactivity()) # Sync slash commands print("Syncing slash commands...") await self.tree.sync() print("Slash commands synced!") async def on_ready(self) -> None: print(f"Logged in as {self.user}") print(f"Monitoring channel ID: {Config.TEXT_CHANNEL_ID}") print(f"Available voices: {', '.join(self.voice_manager.get_available_voices())}") print("Bot is ready!") async def on_message(self, message: discord.Message) -> None: if message.author.bot: return if message.channel.id != Config.TEXT_CHANNEL_ID: return if not message.content.strip(): return if message.author.voice is None: await message.channel.send( f"{message.author.mention}, you need to be in a voice channel for me to speak!", delete_after=5 ) return await self.message_queue.put((message, message.content)) print(f"Queued message from {message.author}: {message.content[:50]}...") await self.process_commands(message) async def process_queue(self) -> None: """Process messages from the queue one at a time.""" while True: message, text = await self.message_queue.get() try: await self.speak_message(message, text) except Exception as e: print(f"Error processing message: {e}") finally: self.message_queue.task_done() async def speak_message(self, message: discord.Message, text: str) -> None: """Generate TTS and play it in the user's voice channel.""" if message.author.voice is None: return voice_channel = message.author.voice.channel voice_client = await self.ensure_voice_connection(voice_channel) if voice_client is None: return print(f"Generating TTS for: {text[:50]}...") # Get user's voice (loads on-demand if needed) user_id = message.author.id try: voice_state = await asyncio.to_thread( self.voice_manager.get_user_voice_state, user_id ) except Exception as e: print(f"Error loading voice for user {user_id}: {e}") await message.channel.send( f"{message.author.mention}, failed to load your voice. Use `/voice set` to choose a voice.", delete_after=5 ) return wav_bytes = await asyncio.to_thread( self._generate_wav_bytes, voice_state, text ) audio_source = discord.FFmpegPCMAudio( io.BytesIO(wav_bytes), pipe=True, options="-loglevel panic" ) if voice_client.is_playing(): voice_client.stop() play_complete = asyncio.Event() def after_playing(error: Exception | None) -> None: if error: print(f"Playback error: {error}") self.loop.call_soon_threadsafe(play_complete.set) voice_client.play(audio_source, after=after_playing) self.last_activity = time.time() print(f"Playing audio in {voice_channel.name}") await play_complete.wait() def _generate_wav_bytes(self, voice_state: Any, text: str) -> bytes: """Generate audio and return as WAV file bytes.""" model = self.voice_manager.model if model is None: raise RuntimeError("Model not loaded") audio = model.generate_audio(voice_state, text) audio_np = audio.numpy() if audio_np.ndim == 1: audio_np = audio_np.reshape(-1, 1) max_val = np.max(np.abs(audio_np)) if max_val > 0: audio_np = audio_np / max_val audio_int16 = (audio_np * 32767).astype(np.int16) wav_buffer = io.BytesIO() wavfile.write(wav_buffer, model.sample_rate, audio_int16) wav_buffer.seek(0) return wav_buffer.read() async def check_inactivity(self) -> None: """Periodically check for inactivity and disconnect from voice channels.""" while True: await asyncio.sleep(60) # Check every minute if self.last_activity == 0.0: continue elapsed = time.time() - self.last_activity if elapsed >= INACTIVITY_TIMEOUT: # Disconnect from all voice channels for guild in self.guilds: if guild.voice_client is not None: print(f"Disconnecting from {guild.name} due to inactivity") await guild.voice_client.disconnect() self.last_activity = 0.0 async def ensure_voice_connection(self, channel: discord.VoiceChannel) -> discord.VoiceClient | None: """Ensure we're connected to the specified voice channel.""" guild = channel.guild if guild.voice_client is not None: if guild.voice_client.channel.id == channel.id: return guild.voice_client await guild.voice_client.move_to(channel) return guild.voice_client try: voice_client = await channel.connect(timeout=10.0) self.last_activity = time.time() return voice_client except Exception as e: print(f"Failed to connect to voice channel: {e}") return None def auto_update_dependencies() -> None: """Auto-update pip packages on startup.""" try: print("Checking for package updates...") result = subprocess.run( [sys.executable, "-m", "pip", "install", "-r", "requirements.txt", "-U", "-q"], capture_output=True, text=True, check=False ) if result.returncode == 0: print("Packages updated successfully (or already up to date)") else: print(f"Warning: Package update had issues: {result.stderr}") except Exception as e: print(f"Warning: Could not auto-update packages: {e}") def main(): auto_update_dependencies() errors = Config.validate() if errors: print("Configuration errors:") for error in errors: print(f" - {error}") print("\nPlease create a .env file based on .env.example") return bot = TTSBot() bot.run(Config.DISCORD_TOKEN) if __name__ == "__main__": main()