# - Added 8 random preview sample lines for voice testing
# - New `/voice preview <name>` command to hear voices before selecting
# - Previews play in the queue like regular messages (no queue jumping)
# - Preview does NOT change the user's active voice preference
# - Updated the queue system to support a voice override for previews
# - Added documentation for the new command in the README
505 lines
18 KiB
Python
505 lines
18 KiB
Python
__version__ = "1.1.0"

import random
import sys
import os

# Handle the optional "testing" positional argument before any config module
# is imported: it switches ENV_MODE for this process via the environment.
if sys.argv[1:2] == ["testing"]:
    os.environ["ENV_MODE"] = "testing"
    # Drop the flag so later argument parsing never sees it.
    del sys.argv[1]
|
|
|
|
import numba_config
|
|
import asyncio
|
|
import io
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from typing import Any
|
|
|
|
import discord
|
|
import numpy as np
|
|
import scipy.io.wavfile as wavfile
|
|
from discord import app_commands
|
|
from discord.ext import commands
|
|
|
|
from config import Config
|
|
from voice_manager import VoiceManager
|
|
|
|
|
|
# Seconds without playback before the bot leaves voice channels (10 minutes).
INACTIVITY_TIMEOUT = 60 * 10

# Sentences spoken by `/voice preview`; one is picked at random per request.
PREVIEW_LINES = [
    "Hello! This is how I sound. Choose me as your voice with /voice set.",
    "Testing, one, two, three! Can you hear me clearly?",
    "Here's a preview of my voice. Pretty cool, right?",
    "Greetings! I am ready to speak for you.",
    "Voice check! This is what I sound like.",
    "Audio test complete. This voice is ready to go!",
    "Sample message incoming. How do I sound to you?",
    "Preview mode activated. Testing speech synthesis.",
]
|
|
|
|
|
|
class TTSBot(commands.Bot):
    """Discord bot that reads messages aloud using Pocket TTS.

    Text posted in the channel identified by ``Config.TEXT_CHANNEL_ID`` is
    queued and spoken, one item at a time, in the author's voice channel.
    Users manage their voice through the ``/voice`` slash command
    (``list``, ``set``, ``current``, ``refresh``, ``preview``).
    """

    # Shared reply for `/voice set` and `/voice preview` without a name.
    _MISSING_VOICE_NAME = (
        "❌ Please provide a voice name. Use `/voice list` to see available voices."
    )

    class _PreviewMessage:
        """Minimal stand-in for ``discord.Message`` used by voice previews.

        ``speak_message`` only reads ``author`` (voice state, id, mention)
        and ``channel`` (error notices), so only those are provided.
        """

        __slots__ = ("author", "channel")

        def __init__(self, author: Any, channel: Any) -> None:
            self.author = author
            self.channel = channel

    def __init__(self):
        """Configure intents, the voice manager, the queue and slash commands."""
        intents = discord.Intents.default()
        intents.message_content = True
        intents.voice_states = True
        super().__init__(command_prefix="!", intents=intents)

        self.voice_manager = VoiceManager(Config.VOICES_DIR, Config.DEFAULT_VOICE)
        # Queue items are (message, text) for chat messages or
        # (message, text, voice_name) for previews with a voice override.
        self.message_queue: asyncio.Queue[tuple[discord.Message, str] | tuple[discord.Message, str, str]] = asyncio.Queue()
        # Wall-clock time of the last connect/playback; 0.0 means "idle".
        self.last_activity: float = 0.0
        # Strong references to background tasks: the event loop only keeps
        # weak references, so unreferenced tasks may be garbage-collected.
        self._background_tasks: set[asyncio.Task[None]] = set()

        self._setup_slash_commands()

    @staticmethod
    def _voice_channel_of(user: Any) -> Any:
        """Return the voice channel *user* is connected to, or ``None``.

        Uses ``getattr`` because interactions outside a guild supply a
        ``discord.User``, which has no ``voice`` attribute at all.
        """
        voice_state = getattr(user, "voice", None)
        return getattr(voice_state, "channel", None)

    def _voice_not_found_message(self, voice_name: str) -> str:
        """Build the "voice not found" reply listing the available voices."""
        voices = self.voice_manager.get_available_voices()
        return (
            f"❌ Voice `{voice_name}` not found.\n"
            f"Available voices: {', '.join(f'`{v}`' for v in voices)}"
        )

    def _setup_slash_commands(self) -> None:
        """Set up slash commands for voice management."""

        @self.tree.command(name="voice", description="Manage your TTS voice")
        @app_commands.describe(
            action="What to do",
            voice_name="Name of the voice (for 'set' and 'preview' actions)",
        )
        @app_commands.choices(action=[
            app_commands.Choice(name="list", value="list"),
            app_commands.Choice(name="set", value="set"),
            app_commands.Choice(name="current", value="current"),
            app_commands.Choice(name="refresh", value="refresh"),
            app_commands.Choice(name="preview", value="preview"),
        ])
        async def voice_command(
            interaction: discord.Interaction,
            action: app_commands.Choice[str],
            voice_name: str | None = None,
        ):
            # Dispatch on the selected choice; only set/preview use voice_name.
            if action.value == "list":
                await self._handle_voice_list(interaction)
            elif action.value == "set":
                await self._handle_voice_set(interaction, voice_name)
            elif action.value == "current":
                await self._handle_voice_current(interaction)
            elif action.value == "refresh":
                await self._handle_voice_refresh(interaction)
            elif action.value == "preview":
                await self._handle_voice_preview(interaction, voice_name)

        @voice_command.autocomplete("voice_name")
        async def voice_name_autocomplete(
            interaction: discord.Interaction,
            current: str,
        ) -> list[app_commands.Choice[str]]:
            # Case-insensitive substring match, truncated to 25 suggestions.
            needle = current.lower()
            voices = self.voice_manager.get_available_voices()
            return [
                app_commands.Choice(name=v, value=v)
                for v in voices
                if needle in v.lower()
            ][:25]

    async def _handle_voice_list(self, interaction: discord.Interaction) -> None:
        """Handle /voice list: show every voice with its status markers."""
        voices = self.voice_manager.get_available_voices()
        loaded = self.voice_manager.get_loaded_voices()
        user_voice = self.voice_manager.get_user_voice(interaction.user.id)

        if not voices:
            await interaction.response.send_message(
                "❌ No voices available. Add .wav files to the voices directory.",
                ephemeral=True,
            )
            return

        lines = ["**Available Voices:**\n"]
        for voice in voices:
            status = []
            if voice == user_voice:
                status.append("✅ your voice")
            if voice in loaded:
                status.append("📦 loaded")
            status_str = f" ({', '.join(status)})" if status else ""
            lines.append(f"• `{voice}`{status_str}")

        lines.append("\n*Use `/voice set <name>` to change your voice.*")

        await interaction.response.send_message("\n".join(lines), ephemeral=True)

    async def _handle_voice_set(self, interaction: discord.Interaction, voice_name: str | None) -> None:
        """Handle /voice set: validate, load if needed, then store the choice.

        The name is lower-cased before lookup. If the voice is not loaded
        yet, a "loading" reply is sent first and the outcome is delivered
        as a follow-up once loading in a worker thread has finished.
        """
        if not voice_name:
            await interaction.response.send_message(
                self._MISSING_VOICE_NAME, ephemeral=True
            )
            return

        voice_name = voice_name.lower()

        if not self.voice_manager.is_voice_available(voice_name):
            await interaction.response.send_message(
                self._voice_not_found_message(voice_name), ephemeral=True
            )
            return

        confirmation = f"✅ Voice changed to `{voice_name}`!"

        # Fast path: the voice is already loaded, answer in one message.
        if self.voice_manager.is_voice_loaded(voice_name):
            self.voice_manager.set_user_voice(interaction.user.id, voice_name)
            await interaction.response.send_message(confirmation, ephemeral=True)
            return

        await interaction.response.send_message(
            f"⏳ Loading voice `{voice_name}` for the first time... This may take a moment.",
            ephemeral=True,
        )
        try:
            await asyncio.to_thread(self.voice_manager.get_voice_state, voice_name)
        except Exception as e:
            # The preference is left untouched when loading fails.
            await interaction.followup.send(
                f"❌ Failed to load voice `{voice_name}`: {e}",
                ephemeral=True,
            )
            return

        self.voice_manager.set_user_voice(interaction.user.id, voice_name)
        await interaction.followup.send(confirmation, ephemeral=True)

    async def _handle_voice_current(self, interaction: discord.Interaction) -> None:
        """Handle /voice current: report the caller's voice and load state."""
        voice = self.voice_manager.get_user_voice(interaction.user.id)
        if not voice:
            await interaction.response.send_message(
                "❌ No voice set. Use `/voice set <name>` to choose a voice.",
                ephemeral=True,
            )
            return

        loaded = "(loaded)" if self.voice_manager.is_voice_loaded(voice) else "(not yet loaded)"
        await interaction.response.send_message(
            f"🎤 Your current voice: `{voice}` {loaded}",
            ephemeral=True,
        )

    async def _handle_voice_refresh(self, interaction: discord.Interaction) -> None:
        """Handle /voice refresh: rescan voices and report what changed."""
        await interaction.response.send_message(
            "🔄 Scanning for new voices...",
            ephemeral=True,
        )

        # The rescan runs in a worker thread so the event loop stays free.
        added, removed = await asyncio.to_thread(self.voice_manager.refresh_voices)

        lines = []
        if added:
            lines.append(f"✅ **New voices found:** {', '.join(f'`{v}`' for v in added)}")
        if removed:
            lines.append(f"❌ **Voices removed:** {', '.join(f'`{v}`' for v in removed)}")
        if not added and not removed:
            lines.append("No changes detected.")

        total = len(self.voice_manager.get_available_voices())
        lines.append(f"\n*Total voices available: {total}*")

        await interaction.followup.send("\n".join(lines), ephemeral=True)

    async def _handle_voice_preview(self, interaction: discord.Interaction, voice_name: str | None) -> None:
        """Handle /voice preview: queue a random sample line in *voice_name*.

        The preview goes through the normal message queue with a voice
        override, so it does not jump the queue and does not change the
        caller's stored voice. The voice channel is looked up again when
        the preview is actually played.
        """
        if not voice_name:
            await interaction.response.send_message(
                self._MISSING_VOICE_NAME, ephemeral=True
            )
            return

        # Also covers DM interactions, where the user has no voice state.
        if self._voice_channel_of(interaction.user) is None:
            await interaction.response.send_message(
                "❌ You need to be in a voice channel to hear a preview!",
                ephemeral=True,
            )
            return

        voice_name = voice_name.lower()

        if not self.voice_manager.is_voice_available(voice_name):
            await interaction.response.send_message(
                self._voice_not_found_message(voice_name), ephemeral=True
            )
            return

        preview_text = random.choice(PREVIEW_LINES)
        preview_message = self._PreviewMessage(interaction.user, interaction.channel)

        # A 3-tuple marks the item as a preview with a voice override.
        await self.message_queue.put((preview_message, preview_text, voice_name))

        sample = preview_text[:50] + ("..." if len(preview_text) > 50 else "")
        await interaction.response.send_message(
            f'⏳ Queued preview for `{voice_name}`. Sample: "{sample}"',
            ephemeral=True,
        )

    async def setup_hook(self) -> None:
        """Called when the bot is starting up.

        Discovers voices, loads the model, pre-loads the default voice (if
        any), starts the queue worker and inactivity watchdog, and syncs
        the slash commands.
        """
        print("Initializing TTS...")
        print("Discovering available voices...")
        await asyncio.to_thread(self.voice_manager.discover_voices)
        await asyncio.to_thread(self.voice_manager.load_model)

        default = self.voice_manager.default_voice
        if default:
            print(f"Pre-loading default voice: {default}")
            await asyncio.to_thread(self.voice_manager.get_voice_state, default)

        for worker in (self.process_queue(), self.check_inactivity()):
            task = self.loop.create_task(worker)
            # Hold a reference until the task finishes (see __init__).
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

        print("Syncing slash commands...")
        await self.tree.sync()
        print("Slash commands synced!")

    async def on_ready(self) -> None:
        """Log the login identity, monitored channel and available voices."""
        print(f"Logged in as {self.user}")
        print(f"Monitoring channel ID: {Config.TEXT_CHANNEL_ID}")
        print(f"Available voices: {', '.join(self.voice_manager.get_available_voices())}")
        print("Bot is ready!")

    async def on_message(self, message: discord.Message) -> None:
        """Queue eligible messages from the monitored channel for speech.

        Messages from bots, from other channels, or with blank content are
        ignored. Authors who are not in a voice channel get a short-lived
        reminder instead. Prefix-command processing is only reached for
        messages that were queued.
        """
        if message.author.bot:
            return

        if message.channel.id != Config.TEXT_CHANNEL_ID:
            return

        if not message.content.strip():
            return

        if self._voice_channel_of(message.author) is None:
            await message.channel.send(
                f"{message.author.mention}, you need to be in a voice channel for me to speak!",
                delete_after=5,
            )
            return

        await self.message_queue.put((message, message.content))
        print(f"Queued message from {message.author}: {message.content[:50]}...")

        await self.process_commands(message)

    async def process_queue(self) -> None:
        """Process messages from the queue one at a time, forever.

        Errors from a single item are printed and swallowed so that one
        bad message cannot stop the worker.
        """
        while True:
            queue_item = await self.message_queue.get()

            # (message, text) for chat, (message, text, voice_name) for previews.
            message, text, *extra = queue_item
            voice_override = extra[0] if extra else None

            try:
                await self.speak_message(message, text, voice_override)
            except Exception as e:
                print(f"Error processing message: {e}")
            finally:
                self.message_queue.task_done()

    async def speak_message(self, message: discord.Message, text: str, voice_override: str | None = None) -> None:
        """Generate TTS and play it in the user's voice channel.

        Args:
            message: Source message (or preview stand-in); its author's
                current voice channel is where the audio is played.
            text: Text to synthesise.
            voice_override: Voice name to use instead of the author's
                stored voice (used by previews).

        Returns once playback has finished, or early if the author is no
        longer in voice, the connection fails, or the voice cannot load.
        """
        voice_channel = self._voice_channel_of(message.author)
        if voice_channel is None:
            return

        voice_client = await self.ensure_voice_connection(voice_channel)
        if voice_client is None:
            return

        print(f"Generating TTS for: {text[:50]}...")

        # Voice loading may be slow, so it runs in a worker thread.
        try:
            if voice_override:
                voice_state = await asyncio.to_thread(
                    self.voice_manager.get_voice_state, voice_override
                )
            else:
                voice_state = await asyncio.to_thread(
                    self.voice_manager.get_user_voice_state, message.author.id
                )
        except Exception as e:
            print(f"Error loading voice: {e}")
            if not voice_override:
                await message.channel.send(
                    f"{message.author.mention}, failed to load your voice. Use `/voice set` to choose a voice.",
                    delete_after=5,
                )
            return

        wav_bytes = await asyncio.to_thread(
            self._generate_wav_bytes, voice_state, text
        )

        audio_source = discord.FFmpegPCMAudio(
            io.BytesIO(wav_bytes),
            pipe=True,
            options="-loglevel panic",
        )

        if voice_client.is_playing():
            voice_client.stop()

        play_complete = asyncio.Event()

        def after_playing(error: Exception | None) -> None:
            if error:
                print(f"Playback error: {error}")
            # Invoked from the player thread, so hop back onto the loop.
            self.loop.call_soon_threadsafe(play_complete.set)

        try:
            voice_client.play(audio_source, after=after_playing)
        except Exception:
            # Playback never started: release the FFmpeg process now.
            audio_source.cleanup()
            raise

        self.last_activity = time.time()
        print(f"Playing audio in {voice_channel.name}")

        await play_complete.wait()

    def _generate_wav_bytes(self, voice_state: Any, text: str) -> bytes:
        """Generate audio and return as WAV file bytes.

        The model output is peak-normalised (unless silent), converted to
        16-bit PCM and written at ``model.sample_rate``.

        Raises:
            RuntimeError: If the voice manager has no model loaded.
        """
        model = self.voice_manager.model
        if model is None:
            raise RuntimeError("Model not loaded")

        audio = model.generate_audio(voice_state, text)
        audio_np = audio.numpy()

        # Give one-dimensional output an explicit channel axis.
        if audio_np.ndim == 1:
            audio_np = audio_np.reshape(-1, 1)

        max_val = np.max(np.abs(audio_np))
        if max_val > 0:
            audio_np = audio_np / max_val
        audio_int16 = (audio_np * 32767).astype(np.int16)

        wav_buffer = io.BytesIO()
        wavfile.write(wav_buffer, model.sample_rate, audio_int16)
        return wav_buffer.getvalue()

    async def check_inactivity(self) -> None:
        """Periodically check for inactivity and disconnect from voice channels.

        Runs forever, waking once a minute. A failed disconnect is logged
        and skipped so the watchdog itself keeps running.
        """
        while True:
            await asyncio.sleep(60)  # Check every minute

            # 0.0 means nothing has been played since the last disconnect.
            if self.last_activity == 0.0:
                continue

            if time.time() - self.last_activity < INACTIVITY_TIMEOUT:
                continue

            for guild in self.guilds:
                voice_client = guild.voice_client
                if voice_client is None:
                    continue
                print(f"Disconnecting from {guild.name} due to inactivity")
                try:
                    await voice_client.disconnect()
                except Exception as e:
                    print(f"Failed to disconnect from {guild.name}: {e}")
            self.last_activity = 0.0

    async def ensure_voice_connection(self, channel: discord.VoiceChannel) -> discord.VoiceClient | None:
        """Ensure we're connected to the specified voice channel.

        Reuses the guild's existing voice client (moving it if it is in a
        different channel) or opens a new connection.

        Returns:
            The voice client, or ``None`` if connecting or moving failed.
        """
        guild = channel.guild
        existing = guild.voice_client

        try:
            if existing is not None:
                if existing.channel.id != channel.id:
                    await existing.move_to(channel)
                return existing

            voice_client = await channel.connect(timeout=10.0)
        except Exception as e:
            print(f"Failed to connect to voice channel: {e}")
            return None

        self.last_activity = time.time()
        return voice_client
|
|
|
|
|
|
def auto_update_dependencies() -> None:
    """Upgrade the packages listed in requirements.txt via pip at startup.

    Best effort only: a non-zero pip exit status or any exception is
    reported as a warning and never propagated to the caller.
    """
    pip_command = [
        sys.executable, "-m", "pip", "install",
        "-r", "requirements.txt", "-U", "-q",
    ]
    try:
        print("Checking for package updates...")
        outcome = subprocess.run(
            pip_command,
            capture_output=True,
            text=True,
            check=False,  # the exit status is inspected below instead
        )
        if outcome.returncode != 0:
            print(f"Warning: Package update had issues: {outcome.stderr}")
        else:
            print("Packages updated successfully (or already up to date)")
    except Exception as e:
        print(f"Warning: Could not auto-update packages: {e}")
|
|
|
|
|
|
def main():
    """Entry point: update packages, validate the config, then run the bot.

    When ``Config.validate()`` reports problems they are printed and the
    bot is not started.
    """
    auto_update_dependencies()

    problems = Config.validate()
    if not problems:
        TTSBot().run(Config.DISCORD_TOKEN)
        return

    print("Configuration errors:")
    for problem in problems:
        print(f" - {problem}")
    print("\nPlease create a .env file based on .env.example")


if __name__ == "__main__":
    main()
|