A modern, easy-to-use, feature-rich async-ready API wrapper for Discord written in Python
—
Voice channel connection, audio streaming, and voice-related functionality for music bots and voice applications with comprehensive audio handling, connection management, and voice client operations for Discord voice features.
Voice channel connection and management for audio streaming and voice functionality.
class VoiceClient:
"""Voice connection to a Discord voice channel."""
def __init__(self, client: Client, channel: VoiceChannel):
"""
Initialize voice client.
Parameters:
- client: Bot client
- channel: Voice channel to connect to
"""
channel: VoiceChannel
guild: Guild
user: ClientUser
session_id: str
token: str
endpoint: str
socket: VoiceWebSocket
loop: asyncio.AbstractEventLoop
_runner: asyncio.Task
_player: Optional[AudioPlayer]
@property
def latency(self) -> float:
"""Voice connection latency in seconds."""
@property
def average_latency(self) -> float:
"""Average voice connection latency."""
def is_connected(self) -> bool:
"""
Check if voice client is connected.
Returns:
True if connected to voice
"""
def is_playing(self) -> bool:
"""
Check if audio is currently playing.
Returns:
True if audio is playing
"""
def is_paused(self) -> bool:
"""
Check if audio playback is paused.
Returns:
True if audio is paused
"""
async def connect(
self,
*,
timeout: float = 60.0,
reconnect: bool = True,
self_deaf: bool = False,
self_mute: bool = False
) -> None:
"""
Connect to the voice channel.
Parameters:
- timeout: Connection timeout
- reconnect: Whether to reconnect on disconnection
- self_deaf: Whether to self-deafen
- self_mute: Whether to self-mute
"""
async def disconnect(self, *, force: bool = False) -> None:
"""
Disconnect from voice channel.
Parameters:
- force: Force disconnection without cleanup
"""
async def move_to(self, channel: Optional[VoiceChannel]) -> None:
"""
Move to a different voice channel.
Parameters:
- channel: New voice channel (None to disconnect)
"""
def play(
self,
source: AudioSource,
*,
after: Optional[Callable[[Optional[Exception]], None]] = None
) -> None:
"""
Play audio from source.
Parameters:
- source: Audio source to play
- after: Callback when playback finishes
"""
def stop(self) -> None:
"""Stop audio playback."""
def pause(self) -> None:
"""Pause audio playback."""
def resume(self) -> None:
"""Resume audio playback."""
@property
def source(self) -> Optional[AudioSource]:
"""Currently playing audio source."""
def send_audio_packet(self, data: bytes, *, encode: bool = True) -> None:
"""
Send raw audio packet.
Parameters:
- data: Audio data
- encode: Whether to encode with Opus
"""
async def ws_connect(self, host: str, port: int) -> VoiceWebSocket:
"""
Connect to voice WebSocket.
Parameters:
- host: Voice server host
- port: Voice server port
Returns:
Voice WebSocket connection
"""
class VoiceProtocol:
"""Base protocol for voice connections."""
def __init__(self, client: Client):
self.client = client
async def connect(self, channel: VoiceChannel) -> VoiceClient:
"""Connect to voice channel."""
async def disconnect(self) -> None:
"""Disconnect from voice."""Audio source classes for different types of audio input and streaming.
class AudioSource:
"""Base class for audio sources."""
def read(self) -> bytes:
"""
Read audio data.
Returns:
Audio frame data (20ms of audio)
"""
def cleanup(self) -> None:
"""Clean up audio source resources."""
def is_opus(self) -> bool:
"""
Check if source provides Opus-encoded audio.
Returns:
True if Opus-encoded
"""
class FFmpegAudio(AudioSource):
"""Audio source using FFmpeg for processing."""
def __init__(
self,
source: Union[str, io.BufferedIOBase],
*,
executable: str = 'ffmpeg',
pipe: bool = False,
stderr: Optional[io.TextIOBase] = None,
before_options: Optional[str] = None,
options: Optional[str] = None
):
"""
Initialize FFmpeg audio source.
Parameters:
- source: Audio file path or stream
- executable: FFmpeg executable path
- pipe: Whether to use pipe input
- stderr: Stderr stream for FFmpeg
- before_options: FFmpeg input options
- options: FFmpeg output options
"""
@classmethod
def from_probe(
cls,
source: Union[str, io.BufferedIOBase],
**kwargs
) -> FFmpegAudio:
"""
Create FFmpeg source with automatic format detection.
Parameters:
- source: Audio source
- kwargs: Additional FFmpeg options
Returns:
FFmpeg audio source
"""
class FFmpegPCMAudio(FFmpegAudio):
"""FFmpeg audio source outputting PCM audio."""
def __init__(
self,
source: Union[str, io.BufferedIOBase],
**kwargs
):
"""
Initialize FFmpeg PCM audio source.
Parameters:
- source: Audio source
- kwargs: FFmpeg options
"""
class FFmpegOpusAudio(FFmpegAudio):
"""FFmpeg audio source outputting Opus audio."""
def __init__(
self,
source: Union[str, io.BufferedIOBase],
**kwargs
):
"""
Initialize FFmpeg Opus audio source.
Parameters:
- source: Audio source
- kwargs: FFmpeg options
"""
class PCMAudio(AudioSource):
"""Raw PCM audio source."""
def __init__(self, stream: io.BufferedIOBase):
"""
Initialize PCM audio source.
Parameters:
- stream: PCM audio stream
"""
class PCMVolumeTransformer(AudioSource):
"""Audio source with volume transformation."""
def __init__(self, original: AudioSource, volume: float = 1.0):
"""
Initialize volume transformer.
Parameters:
- original: Original audio source
- volume: Volume multiplier (0.0-2.0)
"""
@property
def volume(self) -> float:
"""Current volume level."""
@volume.setter
def volume(self, value: float) -> None:
"""Set volume level."""
class AudioPlayer:
"""Audio player for managing playback."""
def __init__(
self,
source: AudioSource,
client: VoiceClient,
*,
after: Optional[Callable[[Optional[Exception]], None]] = None
):
"""
Initialize audio player.
Parameters:
- source: Audio source to play
- client: Voice client
- after: Callback when playback finishes
"""
def start(self) -> None:
"""Start audio playback."""
def stop(self) -> None:
"""Stop audio playback."""
def pause(self, *, update_resume: bool = True) -> None:
"""
Pause audio playback.
Parameters:
- update_resume: Whether to update resume timestamp
"""
def resume(self, *, update_pause: bool = True) -> None:
"""
Resume audio playback.
Parameters:
- update_pause: Whether to update pause timestamp
"""
def is_playing(self) -> bool:
"""Check if audio is playing."""
def is_paused(self) -> bool:
"""Check if audio is paused."""
@property
def source(self) -> AudioSource:
"""Audio source being played."""Opus codec utilities for voice audio encoding and decoding.
class Encoder:
"""Opus encoder for audio compression."""
def __init__(
self,
sampling_rate: int = 48000,
channels: int = 2,
application: int = Application.audio
):
"""
Initialize Opus encoder.
Parameters:
- sampling_rate: Audio sampling rate
- channels: Number of audio channels
- application: Opus application type
"""
def encode(self, pcm: bytes, frame_size: int) -> bytes:
"""
Encode PCM audio to Opus.
Parameters:
- pcm: PCM audio data
- frame_size: Frame size in samples
Returns:
Opus-encoded audio data
"""
def set_bitrate(self, kbps: int) -> None:
"""
Set encoder bitrate.
Parameters:
- kbps: Bitrate in kilobits per second
"""
def set_bandwidth(self, req: int) -> None:
"""
Set encoder bandwidth.
Parameters:
- req: Bandwidth setting
"""
def set_signal_type(self, req: int) -> None:
"""
Set signal type.
Parameters:
- req: Signal type (voice/music)
"""
class Decoder:
"""Opus decoder for audio decompression."""
def __init__(self, sampling_rate: int = 48000, channels: int = 2):
"""
Initialize Opus decoder.
Parameters:
- sampling_rate: Audio sampling rate
- channels: Number of audio channels
"""
def decode(self, opus: bytes, *, decode_fec: bool = False) -> bytes:
"""
Decode Opus audio to PCM.
Parameters:
- opus: Opus-encoded audio data
- decode_fec: Whether to decode FEC data
Returns:
PCM audio data
"""
@staticmethod
def packet_get_bandwidth(data: bytes) -> int:
"""Get packet bandwidth."""
@staticmethod
def packet_get_nb_channels(data: bytes) -> int:
"""Get packet channel count."""
@staticmethod
def packet_get_nb_frames(data: bytes, frame_size: int) -> int:
"""Get packet frame count."""
@staticmethod
def packet_get_samples_per_frame(data: bytes, sampling_rate: int) -> int:
"""Get samples per frame."""
def is_loaded() -> bool:
"""
Check if Opus library is loaded.
Returns:
True if Opus is available
"""
def load_opus(name: str) -> None:
"""
Load Opus library.
Parameters:
- name: Library name or path
"""Voice server regions and quality settings for optimal voice performance.
class VoiceRegion:
"""Voice server region information."""
def __init__(self): ...
id: str
name: str
vip: bool
optimal: bool
deprecated: bool
custom: bool
def __str__(self) -> str:
return self.name
class VideoQualityMode(enum.Enum):
"""Video quality modes for voice channels."""
auto = 1
full = 2
async def discover_voice_regions(guild_id: int) -> List[VoiceRegion]:
"""
Discover available voice regions for a guild.
Parameters:
- guild_id: Guild ID
Returns:
List of available voice regions
"""Voice channel effects and audio modifications.
class VoiceChannelEffect:
"""Voice channel effect configuration."""
def __init__(self): ...
emoji: Optional[PartialEmoji]
animation_type: Optional[VoiceChannelEffectAnimationType]
animation_id: Optional[int]
user_id: Optional[int]
class VoiceChannelEffectAnimationType(enum.Enum):
"""Voice channel effect animation types."""
premium = 0
basic = 1
async def send_voice_channel_effect(
channel: VoiceChannel,
emoji: Union[str, Emoji, PartialEmoji],
*,
animation_type: VoiceChannelEffectAnimationType = VoiceChannelEffectAnimationType.premium
) -> None:
"""
Send voice channel effect.
Parameters:
- channel: Voice channel
- emoji: Effect emoji
- animation_type: Animation type
"""Voice-related events for monitoring voice activity and connections.
@bot.event
async def on_voice_state_update(member: Member, before: VoiceState, after: VoiceState):
"""
Called when member voice state changes.
Parameters:
- member: Member whose voice state changed
- before: Previous voice state
- after: New voice state
"""
@bot.event
async def on_voice_channel_effect(effect: VoiceChannelEffect):
"""
Called when voice channel effect is sent.
Parameters:
- effect: Voice channel effect data
"""
# Voice client events (when using voice)
async def on_voice_ready():
"""Called when voice connection is ready."""
async def on_voice_disconnect(error: Optional[Exception]):
"""
Called when voice connection disconnects.
Parameters:
- error: Disconnection error if any
"""import disnake
from disnake.ext import commands
import asyncio
import youtube_dl
import os
# Suppress noise about console usage from errors
youtube_dl.utils.bug_reports_message = lambda: ''
ytdl_format_options = {
'format': 'bestaudio/best',
'outtmpl': '%(extractor)s-%(id)s-%(title)s.%(ext)s',
'restrictfilenames': True,
'noplaylist': True,
'nocheckcertificate': True,
'ignoreerrors': False,
'logtostderr': False,
'quiet': True,
'no_warnings': True,
'default_search': 'auto',
'source_address': '0.0.0.0'
}
ffmpeg_options = {
'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn'
}
ytdl = youtube_dl.YoutubeDL(ytdl_format_options)
class YTDLSource(disnake.PCMVolumeTransformer):
def __init__(self, source, *, data, volume=0.5):
super().__init__(source, volume)
self.data = data
self.title = data.get('title')
self.url = data.get('url')
@classmethod
async def from_url(cls, url, *, loop=None, stream=False):
loop = loop or asyncio.get_event_loop()
data = await loop.run_in_executor(None, lambda: ytdl.extract_info(url, download=not stream))
if 'entries' in data:
# Take first item from a playlist
data = data['entries'][0]
filename = data['url'] if stream else ytdl.prepare_filename(data)
return cls(disnake.FFmpegPCMAudio(filename, **ffmpeg_options), data=data)
class Music(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.queue = {}
self.current = {}
@commands.command()
async def join(self, ctx, *, channel: disnake.VoiceChannel = None):
"""Join a voice channel."""
if channel is None:
if ctx.author.voice:
channel = ctx.author.voice.channel
else:
return await ctx.send("You need to specify a channel or be in one.")
if ctx.voice_client is not None:
return await ctx.voice_client.move_to(channel)
await channel.connect()
await ctx.send(f"Connected to {channel}")
@commands.command()
async def play(self, ctx, *, url):
"""Play audio from a URL or search query."""
if not ctx.voice_client:
if ctx.author.voice:
await ctx.author.voice.channel.connect()
else:
return await ctx.send("You need to be in a voice channel!")
async with ctx.typing():
try:
player = await YTDLSource.from_url(url, loop=self.bot.loop, stream=True)
ctx.voice_client.play(player, after=lambda e: print(f'Player error: {e}') if e else None)
self.current[ctx.guild.id] = player
await ctx.send(f'**Now playing:** {player.title}')
except Exception as e:
await ctx.send(f'An error occurred: {e}')
@commands.command()
async def volume(self, ctx, volume: int):
"""Change the player volume (0-100)."""
if ctx.voice_client is None:
return await ctx.send("Not connected to a voice channel.")
if not 0 <= volume <= 100:
return await ctx.send("Volume must be between 0 and 100.")
ctx.voice_client.source.volume = volume / 100
await ctx.send(f"Changed volume to {volume}%")
@commands.command()
async def stop(self, ctx):
"""Stop the currently playing audio."""
if ctx.voice_client:
ctx.voice_client.stop()
await ctx.send("⏹️ Stopped playback")
@commands.command()
async def pause(self, ctx):
"""Pause the currently playing audio."""
if ctx.voice_client and ctx.voice_client.is_playing():
ctx.voice_client.pause()
await ctx.send("⏸️ Paused playback")
@commands.command()
async def resume(self, ctx):
"""Resume paused audio."""
if ctx.voice_client and ctx.voice_client.is_paused():
ctx.voice_client.resume()
await ctx.send("▶️ Resumed playback")
@commands.command()
async def leave(self, ctx):
"""Disconnect from voice channel."""
if ctx.voice_client:
await ctx.voice_client.disconnect()
await ctx.send("👋 Disconnected from voice channel")
@commands.command()
async def now_playing(self, ctx):
"""Show currently playing track."""
if ctx.guild.id in self.current:
player = self.current[ctx.guild.id]
embed = disnake.Embed(title="🎵 Now Playing", description=player.title)
embed.add_field(name="Volume", value=f"{int(player.volume * 100)}%")
if ctx.voice_client:
if ctx.voice_client.is_playing():
embed.color = 0x00ff00
embed.set_footer(text="Playing")
elif ctx.voice_client.is_paused():
embed.color = 0xffaa00
embed.set_footer(text="Paused")
await ctx.send(embed=embed)
else:
await ctx.send("Nothing is currently playing.")
@play.before_invoke
async def ensure_voice(self, ctx):
"""Ensure voice connection before playing."""
if ctx.voice_client is None:
if ctx.author.voice:
await ctx.author.voice.channel.connect()
else:
await ctx.send("You are not connected to a voice channel.")
raise commands.CommandError("Author not connected to a voice channel.")
elif ctx.voice_client.is_playing():
ctx.voice_client.stop()
bot = commands.Bot(command_prefix='!', intents=disnake.Intents.all())
bot.add_cog(Music(bot))
bot.run('YOUR_BOT_TOKEN')import asyncio
from collections import deque
from typing import Optional, Dict, List
class MusicQueue:
"""Music queue management."""
def __init__(self):
self.queue = deque()
self.history = deque(maxlen=10)
self.repeat_mode = 0 # 0=off, 1=track, 2=queue
self.shuffle = False
self.volume = 0.5
def add(self, item):
"""Add item to queue."""
self.queue.append(item)
def next(self):
"""Get next item from queue."""
if not self.queue:
return None
if self.repeat_mode == 2 and len(self.queue) == 1:
# Queue repeat with single item
return self.queue[0]
item = self.queue.popleft()
if self.repeat_mode == 2:
# Add back to end for queue repeat
self.queue.append(item)
return item
def clear(self):
"""Clear the queue."""
self.queue.clear()
def remove(self, index: int):
"""Remove item at index."""
if 0 <= index < len(self.queue):
del self.queue[index]
def __len__(self):
return len(self.queue)
def __iter__(self):
return iter(self.queue)
class AdvancedMusic(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.queues: Dict[int, MusicQueue] = {}
self.current_players: Dict[int, YTDLSource] = {}
def get_queue(self, guild_id: int) -> MusicQueue:
"""Get or create queue for guild."""
if guild_id not in self.queues:
self.queues[guild_id] = MusicQueue()
return self.queues[guild_id]
async def play_next(self, ctx):
"""Play next track in queue."""
guild_id = ctx.guild.id
queue = self.get_queue(guild_id)
if not ctx.voice_client:
return
# Handle repeat single
if queue.repeat_mode == 1 and guild_id in self.current_players:
current = self.current_players[guild_id]
player = await YTDLSource.from_url(current.data['webpage_url'], loop=self.bot.loop, stream=True)
player.volume = queue.volume
else:
next_item = queue.next()
if not next_item:
await ctx.send("Queue is empty. Playback finished.")
return
player = await YTDLSource.from_url(next_item['url'], loop=self.bot.loop, stream=True)
player.volume = queue.volume
# Add to history
if guild_id in self.current_players:
queue.history.append(self.current_players[guild_id])
self.current_players[guild_id] = player
ctx.voice_client.play(
player,
after=lambda e: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop) if not e else print(f'Player error: {e}')
)
embed = disnake.Embed(title="🎵 Now Playing", description=player.title, color=0x00ff00)
embed.add_field(name="Tracks in Queue", value=len(queue), inline=True)
embed.add_field(name="Volume", value=f"{int(player.volume * 100)}%", inline=True)
repeat_text = ["Off", "Track", "Queue"][queue.repeat_mode]
embed.add_field(name="Repeat", value=repeat_text, inline=True)
await ctx.send(embed=embed)
@commands.command()
async def queue(self, ctx, *, query):
"""Add a track to the queue."""
if not ctx.voice_client:
if ctx.author.voice:
await ctx.author.voice.channel.connect()
else:
return await ctx.send("You need to be in a voice channel!")
async with ctx.typing():
try:
# Extract info without downloading
loop = asyncio.get_event_loop()
data = await loop.run_in_executor(None, lambda: ytdl.extract_info(query, download=False))
if 'entries' in data:
# Playlist
entries = data['entries'][:10] # Limit to 10 tracks
queue = self.get_queue(ctx.guild.id)
for entry in entries:
queue.add({
'url': entry['webpage_url'],
'title': entry['title'],
'duration': entry.get('duration', 0),
'requester': ctx.author.id
})
await ctx.send(f"Added {len(entries)} tracks to queue")
else:
# Single track
queue = self.get_queue(ctx.guild.id)
queue.add({
'url': data['webpage_url'],
'title': data['title'],
'duration': data.get('duration', 0),
'requester': ctx.author.id
})
await ctx.send(f"Added **{data['title']}** to queue (position {len(queue)})")
# Start playing if nothing is playing
if not ctx.voice_client.is_playing() and not ctx.voice_client.is_paused():
await self.play_next(ctx)
except Exception as e:
await ctx.send(f'An error occurred: {e}')
@commands.command(name='queue_list', aliases=['q', 'list'])
async def queue_list(self, ctx):
"""Show current queue."""
queue = self.get_queue(ctx.guild.id)
if len(queue) == 0:
return await ctx.send("Queue is empty.")
embed = disnake.Embed(title="🎵 Music Queue", color=0x00ff00)
# Show current track
if ctx.guild.id in self.current_players:
current = self.current_players[ctx.guild.id]
embed.add_field(
name="Now Playing",
value=f"**{current.title}**",
inline=False
)
# Show next tracks
queue_text = ""
for i, track in enumerate(list(queue)[:10]): # Show first 10
duration = f"{track['duration'] // 60}:{track['duration'] % 60:02d}" if track['duration'] else "Unknown"
queue_text += f"`{i+1}.` **{track['title']}** `[{duration}]`\n"
if queue_text:
embed.add_field(name="Up Next", value=queue_text, inline=False)
if len(queue) > 10:
embed.set_footer(text=f"... and {len(queue) - 10} more tracks")
await ctx.send(embed=embed)
@commands.command()
async def skip(self, ctx, amount: int = 1):
"""Skip current track or multiple tracks."""
if not ctx.voice_client or not ctx.voice_client.is_playing():
return await ctx.send("Nothing is playing.")
queue = self.get_queue(ctx.guild.id)
# Skip multiple tracks by removing from queue
for _ in range(amount - 1):
if len(queue) > 0:
queue.next()
ctx.voice_client.stop()
await ctx.send(f"⏭️ Skipped {amount} track(s)")
@commands.command()
async def remove(self, ctx, index: int):
"""Remove track from queue by index."""
queue = self.get_queue(ctx.guild.id)
if not 1 <= index <= len(queue):
return await ctx.send(f"Invalid index. Queue has {len(queue)} tracks.")
removed_track = list(queue)[index - 1]
queue.remove(index - 1)
await ctx.send(f"Removed **{removed_track['title']}** from queue")
@commands.command()
async def clear_queue(self, ctx):
"""Clear the entire queue."""
queue = self.get_queue(ctx.guild.id)
queue.clear()
await ctx.send("🗑️ Queue cleared")
@commands.command()
async def repeat(self, ctx, mode: str = None):
"""Set repeat mode (off/track/queue)."""
queue = self.get_queue(ctx.guild.id)
if mode is None:
modes = ["off", "track", "queue"]
current_mode = modes[queue.repeat_mode]
return await ctx.send(f"Current repeat mode: **{current_mode}**")
mode = mode.lower()
if mode in ['off', '0', 'none']:
queue.repeat_mode = 0
await ctx.send("🔁 Repeat mode: **Off**")
elif mode in ['track', '1', 'song']:
queue.repeat_mode = 1
await ctx.send("🔂 Repeat mode: **Track**")
elif mode in ['queue', '2', 'all']:
queue.repeat_mode = 2
await ctx.send("🔁 Repeat mode: **Queue**")
else:
await ctx.send("Invalid mode. Use: `off`, `track`, or `queue`")
@commands.command()
async def shuffle(self, ctx):
"""Toggle shuffle mode."""
import random
queue = self.get_queue(ctx.guild.id)
queue.shuffle = not queue.shuffle
if queue.shuffle:
# Shuffle current queue
queue_list = list(queue.queue)
random.shuffle(queue_list)
queue.queue = deque(queue_list)
await ctx.send("🔀 Shuffle: **On**")
else:
await ctx.send("🔀 Shuffle: **Off**")
@commands.command()
async def seek(self, ctx, timestamp: str):
"""Seek to timestamp (MM:SS format)."""
if not ctx.voice_client or not ctx.voice_client.is_playing():
return await ctx.send("Nothing is playing.")
try:
parts = timestamp.split(':')
if len(parts) == 2:
minutes, seconds = map(int, parts)
total_seconds = minutes * 60 + seconds
else:
total_seconds = int(parts[0])
# Note: This is simplified - actual seeking requires more complex FFmpeg handling
await ctx.send(f"⏩ Seeking to {timestamp} (restart with timestamp)")
# In a real implementation, you'd restart playback from the timestamp
except ValueError:
await ctx.send("Invalid timestamp format. Use MM:SS or seconds.")
@commands.command()
async def lyrics(self, ctx, *, query: str = None):
"""Get lyrics for current or specified song."""
if query is None:
if ctx.guild.id in self.current_players:
query = self.current_players[ctx.guild.id].title
else:
return await ctx.send("No song is playing. Specify a song name.")
# This would integrate with a lyrics API like Genius
await ctx.send(f"🎤 Searching lyrics for: **{query}**\n*Lyrics API integration needed*")
@commands.command()
async def history(self, ctx):
"""Show recently played tracks."""
queue = self.get_queue(ctx.guild.id)
if not queue.history:
return await ctx.send("No recent tracks.")
embed = disnake.Embed(title="🕐 Recently Played", color=0x0099ff)
history_text = ""
for i, track in enumerate(reversed(list(queue.history))):
history_text += f"`{i+1}.` **{track.title}**\n"
embed.description = history_text
await ctx.send(embed=embed)
# Error handling for voice
@bot.event
async def on_voice_state_update(member, before, after):
"""Handle voice state changes."""
# Auto-disconnect if bot is alone in voice channel
if member == bot.user:
return
voice_client = member.guild.voice_client
if voice_client and voice_client.channel:
# Check if bot is alone (only bot in voice channel)
members_in_voice = [m for m in voice_client.channel.members if not m.bot]
if len(members_in_voice) == 0:
# Wait a bit before disconnecting
await asyncio.sleep(30)
# Check again after delay
members_in_voice = [m for m in voice_client.channel.members if not m.bot]
if len(members_in_voice) == 0:
await voice_client.disconnect()
bot.add_cog(AdvancedMusic(bot))import wave
import io
from typing import Dict, List
class VoiceRecorder:
"""Record voice from Discord voice channels."""
def __init__(self, voice_client: disnake.VoiceClient):
self.voice_client = voice_client
self.recordings: Dict[int, List[bytes]] = {}
self.is_recording = False
def start_recording(self):
"""Start recording all users in voice channel."""
if self.is_recording:
return
self.is_recording = True
self.recordings.clear()
# This would require a custom voice receive implementation
# Discord bots cannot currently receive audio through the official API
print("Recording started (implementation needed)")
def stop_recording(self):
"""Stop recording and return audio data."""
if not self.is_recording:
return
self.is_recording = False
# Process recordings into WAV files
wav_files = {}
for user_id, audio_data in self.recordings.items():
wav_buffer = io.BytesIO()
with wave.open(wav_buffer, 'wb') as wav_file:
wav_file.setnchannels(2) # Stereo
wav_file.setsampwidth(2) # 16-bit
wav_file.setframerate(48000) # 48kHz
# Combine audio frames
combined_audio = b''.join(audio_data)
wav_file.writeframes(combined_audio)
wav_files[user_id] = wav_buffer.getvalue()
return wav_files
class VoiceEffects(commands.Cog):
"""Voice effects and processing commands."""
def __init__(self, bot):
self.bot = bot
@commands.command()
async def voice_effect(self, ctx, effect: str):
"""Apply voice effect to bot's audio."""
if not ctx.voice_client:
return await ctx.send("Not connected to voice channel.")
effects = {
'robot': '-af "afftfilt=real=\'hypot(re,im)*sin(0)\':imag=\'hypot(re,im)*cos(0)\':win_size=512:overlap=0.75"',
'echo': '-af "aecho=0.8:0.9:1000:0.3"',
'bass': '-af "bass=g=5"',
'treble': '-af "treble=g=5"',
'speed': '-af "atempo=1.5"',
'slow': '-af "atempo=0.75"',
'nightcore': '-af "aresample=48000,asetrate=48000*1.25"',
'deep': '-af "asetrate=22050,aresample=48000"'
}
if effect not in effects:
available = ', '.join(effects.keys())
return await ctx.send(f"Available effects: {available}")
# This would modify the FFmpeg options for the current audio source
await ctx.send(f"🎛️ Applied **{effect}** effect")
# Implementation would require restarting playback with new FFmpeg filter
@commands.command()
async def soundboard(self, ctx, sound: str):
"""Play soundboard effects."""
if not ctx.voice_client:
if ctx.author.voice:
await ctx.author.voice.channel.connect()
else:
return await ctx.send("You need to be in a voice channel!")
# Soundboard files directory
sound_file = f"sounds/{sound}.mp3"
if not os.path.exists(sound_file):
return await ctx.send(f"Sound '{sound}' not found.")
# Play sound effect
source = disnake.FFmpegPCMAudio(sound_file, **ffmpeg_options)
if ctx.voice_client.is_playing():
ctx.voice_client.stop()
ctx.voice_client.play(source)
await ctx.send(f"🔊 Playing sound: **{sound}**")
@commands.command()
async def tts(self, ctx, *, text: str):
"""Text-to-speech in voice channel."""
if not ctx.voice_client:
if ctx.author.voice:
await ctx.author.voice.channel.connect()
else:
return await ctx.send("You need to be in a voice channel!")
if len(text) > 200:
return await ctx.send("Text too long (max 200 characters).")
# This would use a TTS service like gTTS
# For demo purposes, just acknowledge
await ctx.send(f"🗣️ TTS: {text[:50]}{'...' if len(text) > 50 else ''}")
# Implementation would:
# 1. Generate TTS audio file
# 2. Play through voice client
# 3. Clean up temporary file
@commands.command()
async def voice_status(self, ctx):
"""Show voice connection status and stats."""
if not ctx.voice_client:
return await ctx.send("Not connected to voice.")
vc = ctx.voice_client
embed = disnake.Embed(title="🔊 Voice Status", color=0x00ff00)
embed.add_field(name="Channel", value=vc.channel.mention, inline=True)
embed.add_field(name="Latency", value=f"{vc.latency*1000:.2f}ms", inline=True)
embed.add_field(name="Average Latency", value=f"{vc.average_latency*1000:.2f}ms", inline=True)
status_text = []
if vc.is_connected():
status_text.append("✅ Connected")
if vc.is_playing():
status_text.append("▶️ Playing")
if vc.is_paused():
status_text.append("⏸️ Paused")
embed.add_field(name="Status", value=" | ".join(status_text) or "Idle", inline=False)
# Voice channel members
members = [m.display_name for m in vc.channel.members if not m.bot]
if members:
embed.add_field(name=f"Members ({len(members)})", value=", ".join(members), inline=False)
await ctx.send(embed=embed)
# Voice channel management
@bot.command()
async def create_voice(ctx, *, name: str):
"""Create a temporary voice channel."""
if not ctx.author.guild_permissions.manage_channels:
return await ctx.send("You don't have permission to manage channels.")
# Create temporary voice channel
overwrites = {
ctx.guild.default_role: disnake.PermissionOverwrite(view_channel=True),
ctx.author: disnake.PermissionOverwrite(manage_channels=True)
}
channel = await ctx.guild.create_voice_channel(
name=f"🔊 {name}",
overwrites=overwrites,
reason=f"Temporary voice channel created by {ctx.author}"
)
await ctx.send(f"Created temporary voice channel: {channel.mention}")
# Auto-delete when empty (would need a background task)
@bot.command()
async def voice_info(ctx, channel: disnake.VoiceChannel = None):
"""Get information about a voice channel."""
if channel is None:
if ctx.author.voice:
channel = ctx.author.voice.channel
else:
return await ctx.send("Specify a voice channel or join one.")
embed = disnake.Embed(title=f"🔊 {channel.name}", color=0x0099ff)
embed.add_field(name="ID", value=channel.id, inline=True)
embed.add_field(name="Bitrate", value=f"{channel.bitrate}bps", inline=True)
embed.add_field(name="User Limit", value=channel.user_limit or "No limit", inline=True)
embed.add_field(name="Members", value=len(channel.members), inline=True)
embed.add_field(name="Created", value=f"<t:{int(channel.created_at.timestamp())}:F>", inline=True)
if channel.rtc_region:
embed.add_field(name="Region", value=channel.rtc_region, inline=True)
# List members
if channel.members:
member_list = []
for member in channel.members:
status = []
if member.voice.deaf:
status.append("🔇")
if member.voice.mute:
status.append("🤐")
if member.voice.self_deaf:
status.append("🙉")
if member.voice.self_mute:
status.append("🤫")
if member.voice.streaming:
status.append("📹")
member_list.append(f"{member.display_name} {''.join(status)}")
embed.add_field(name="Connected Members", value="\n".join(member_list), inline=False)
await ctx.send(embed=embed)
bot.add_cog(VoiceEffects(bot))Install with Tessl CLI
npx tessl i tessl/pypi-disnake