CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-disnake

A modern, easy-to-use, feature-rich async-ready API wrapper for Discord written in Python

Pending
Overview
Eval results
Files

docs/voice-audio.md

Voice and Audio

Voice channel connections and audio streaming for music bots and other voice applications: connection management, voice client operations, audio sources, and Opus encoding for Discord voice features.

Capabilities

Voice Connections

Voice channel connection and management for audio streaming and voice functionality.

class VoiceClient:
    """Voice connection to a Discord voice channel.

    NOTE(review): this is a signature-only reference listing; method bodies
    are omitted, so the behaviour described here comes from the docstrings
    alone and should be confirmed against the library's API reference.
    """
    
    def __init__(self, client: Client, channel: VoiceChannel):
        """
        Initialize voice client.
        
        Parameters:
        - client: Bot client
        - channel: Voice channel to connect to
        """

    # Attributes exposed by the client. Names with a leading underscore
    # (_runner, _player) are private by Python convention.
    channel: VoiceChannel
    guild: Guild
    user: ClientUser
    session_id: str
    token: str
    endpoint: str
    socket: VoiceWebSocket
    loop: asyncio.AbstractEventLoop
    _runner: asyncio.Task
    _player: Optional[AudioPlayer]  # None is allowed by the annotation

    @property
    def latency(self) -> float:
        """Voice connection latency in seconds."""

    @property
    def average_latency(self) -> float:
        """Average voice connection latency."""

    def is_connected(self) -> bool:
        """
        Check if voice client is connected.
        
        Returns:
        True if connected to voice
        """

    def is_playing(self) -> bool:
        """
        Check if audio is currently playing.
        
        Returns:
        True if audio is playing
        """

    def is_paused(self) -> bool:
        """
        Check if audio playback is paused.
        
        Returns:
        True if audio is paused
        """

    async def connect(
        self,
        *,
        timeout: float = 60.0,
        reconnect: bool = True,
        self_deaf: bool = False,
        self_mute: bool = False
    ) -> None:
        """
        Connect to the voice channel.

        All parameters are keyword-only.
        
        Parameters:
        - timeout: Connection timeout (default 60.0)
        - reconnect: Whether to reconnect on disconnection (default True)
        - self_deaf: Whether to self-deafen (default False)
        - self_mute: Whether to self-mute (default False)
        """

    async def disconnect(self, *, force: bool = False) -> None:
        """
        Disconnect from voice channel.
        
        Parameters:
        - force: Force disconnection without cleanup (keyword-only, default False)
        """

    async def move_to(self, channel: Optional[VoiceChannel]) -> None:
        """
        Move to a different voice channel.
        
        Parameters:
        - channel: New voice channel (None to disconnect)
        """

    def play(
        self,
        source: AudioSource,
        *,
        after: Optional[Callable[[Optional[Exception]], None]] = None
    ) -> None:
        """
        Play audio from source.
        
        Parameters:
        - source: Audio source to play
        - after: Callback when playback finishes (keyword-only); it is
          called with a single argument that is an exception or None
        """

    def stop(self) -> None:
        """Stop audio playback."""

    def pause(self) -> None:
        """Pause audio playback."""

    def resume(self) -> None:
        """Resume audio playback."""

    @property
    def source(self) -> Optional[AudioSource]:
        """Currently playing audio source (the annotation allows None)."""

    def send_audio_packet(self, data: bytes, *, encode: bool = True) -> None:
        """
        Send raw audio packet.
        
        Parameters:
        - data: Audio data
        - encode: Whether to encode with Opus (keyword-only, default True)
        """

    async def ws_connect(self, host: str, port: int) -> VoiceWebSocket:
        """
        Connect to voice WebSocket.
        
        Parameters:
        - host: Voice server host
        - port: Voice server port
        
        Returns:
        Voice WebSocket connection
        """

class VoiceProtocol:
    """Base protocol for voice connections.

    NOTE(review): the upstream library appears to pass the channel to the
    constructor and to give connect()/disconnect() keyword-only options
    (timeout, reconnect, force) — confirm this listing against the
    installed version before subclassing.
    """
    
    def __init__(self, client: Client):
        # Keep a reference to the owning client for use by subclasses.
        self.client = client

    async def connect(self, channel: VoiceChannel) -> VoiceClient:
        """Connect to voice channel."""

    async def disconnect(self) -> None:
        """Disconnect from voice."""

Audio Sources

Audio source classes for different types of audio input and streaming.

class AudioSource:
    """Base class for audio sources.

    The classes listed after this one (FFmpegAudio, PCMAudio,
    PCMVolumeTransformer) derive from it.
    """
    
    def read(self) -> bytes:
        """
        Read audio data.
        
        Returns:
        Audio frame data (20ms of audio)

        NOTE(review): upstream treats an empty bytes result as the
        end-of-stream signal — confirm before relying on it.
        """

    def cleanup(self) -> None:
        """Clean up audio source resources."""

    def is_opus(self) -> bool:
        """
        Check if source provides Opus-encoded audio.
        
        Returns:
        True if Opus-encoded
        """

class FFmpegAudio(AudioSource):
    """Audio source using FFmpeg for processing."""
    
    def __init__(
        self,
        source: Union[str, io.BufferedIOBase],
        *,
        executable: str = 'ffmpeg',
        pipe: bool = False,
        stderr: Optional[io.TextIOBase] = None,
        before_options: Optional[str] = None,
        options: Optional[str] = None
    ):
        """
        Initialize FFmpeg audio source.

        Everything after ``source`` is keyword-only.
        
        Parameters:
        - source: Audio file path or stream
        - executable: FFmpeg executable path (default 'ffmpeg')
        - pipe: Whether to use pipe input (default False)
        - stderr: Stderr stream for FFmpeg (default None)
        - before_options: FFmpeg input options (default None)
        - options: FFmpeg output options (default None)
        """

    @classmethod
    def from_probe(
        cls,
        source: Union[str, io.BufferedIOBase],
        **kwargs
    ) -> FFmpegAudio:
        """
        Create FFmpeg source with automatic format detection.
        
        Parameters:
        - source: Audio source
        - kwargs: Additional FFmpeg options
        
        Returns:
        FFmpeg audio source

        NOTE(review): upstream appears to expose from_probe as an *async*
        classmethod on FFmpegOpusAudio rather than on this base class —
        confirm against the API reference.
        """

class FFmpegPCMAudio(FFmpegAudio):
    """FFmpeg audio source outputting PCM audio."""
    
    def __init__(
        self,
        source: Union[str, io.BufferedIOBase],
        **kwargs
    ):
        """
        Initialize FFmpeg PCM audio source.
        
        Parameters:
        - source: Audio source (path string or buffered stream)
        - kwargs: FFmpeg options; presumably the keyword-only options of
          FFmpegAudio.__init__ (executable, pipe, stderr, before_options,
          options) — TODO confirm
        """

class FFmpegOpusAudio(FFmpegAudio):
    """FFmpeg audio source outputting Opus audio."""
    
    def __init__(
        self,
        source: Union[str, io.BufferedIOBase],
        **kwargs
    ):
        """
        Initialize FFmpeg Opus audio source.
        
        Parameters:
        - source: Audio source (path string or buffered stream)
        - kwargs: FFmpeg options; presumably the keyword-only options of
          FFmpegAudio.__init__ plus Opus-specific ones — TODO confirm
        """

class PCMAudio(AudioSource):
    """Raw PCM audio source."""
    
    def __init__(self, stream: io.BufferedIOBase):
        """
        Initialize PCM audio source.
        
        Parameters:
        - stream: PCM audio stream (a buffered binary stream per the
          annotation)
        """

class PCMVolumeTransformer(AudioSource):
    """Audio source with volume transformation.

    NOTE(review): upstream appears to reject Opus-encoded sources here,
    since volume scaling needs PCM — confirm.
    """
    
    def __init__(self, original: AudioSource, volume: float = 1.0):
        """
        Initialize volume transformer.
        
        Parameters:
        - original: Original audio source
        - volume: Volume multiplier (0.0-2.0), default 1.0
        """

    @property
    def volume(self) -> float:
        """Current volume level."""

    @volume.setter
    def volume(self, value: float) -> None:
        """Set volume level."""

class AudioPlayer:
    """Audio player for managing playback.

    VoiceClient holds one of these in its private ``_player`` attribute.
    """
    
    def __init__(
        self,
        source: AudioSource,
        client: VoiceClient,
        *,
        after: Optional[Callable[[Optional[Exception]], None]] = None
    ):
        """
        Initialize audio player.
        
        Parameters:
        - source: Audio source to play
        - client: Voice client
        - after: Callback when playback finishes (keyword-only); called
          with a single argument that is an exception or None
        """

    def start(self) -> None:
        """Start audio playback."""

    def stop(self) -> None:
        """Stop audio playback."""

    def pause(self, *, update_resume: bool = True) -> None:
        """
        Pause audio playback.
        
        Parameters:
        - update_resume: Whether to update resume timestamp
          (keyword-only, default True)
        """

    def resume(self, *, update_pause: bool = True) -> None:
        """
        Resume audio playback.
        
        Parameters:
        - update_pause: Whether to update pause timestamp
          (keyword-only, default True)
        """

    def is_playing(self) -> bool:
        """Check if audio is playing."""

    def is_paused(self) -> bool:
        """Check if audio is paused."""

    @property
    def source(self) -> AudioSource:
        """Audio source being played."""

Opus Audio Codec

Opus codec utilities for voice audio encoding and decoding.

class Encoder:
    """Opus encoder for audio compression."""
    
    def __init__(
        self,
        sampling_rate: int = 48000,
        channels: int = 2,
        application: int = Application.audio
    ):
        """
        Initialize Opus encoder.
        
        Parameters:
        - sampling_rate: Audio sampling rate (default 48000)
        - channels: Number of audio channels (default 2)
        - application: Opus application type (default Application.audio;
          ``Application`` is not defined in this listing)
        """

    def encode(self, pcm: bytes, frame_size: int) -> bytes:
        """
        Encode PCM audio to Opus.
        
        Parameters:
        - pcm: PCM audio data
        - frame_size: Frame size in samples
        
        Returns:
        Opus-encoded audio data
        """

    def set_bitrate(self, kbps: int) -> None:
        """
        Set encoder bitrate.
        
        Parameters:
        - kbps: Bitrate in kilobits per second
        """

    def set_bandwidth(self, req: int) -> None:
        """
        Set encoder bandwidth.
        
        Parameters:
        - req: Bandwidth setting
        """

    def set_signal_type(self, req: int) -> None:
        """
        Set signal type.
        
        Parameters:
        - req: Signal type (voice/music)
        """

class Decoder:
    """Opus decoder for audio decompression."""
    
    def __init__(self, sampling_rate: int = 48000, channels: int = 2):
        """
        Initialize Opus decoder.
        
        Parameters:
        - sampling_rate: Audio sampling rate (default 48000)
        - channels: Number of audio channels (default 2)
        """

    def decode(self, opus: bytes, *, decode_fec: bool = False) -> bytes:
        """
        Decode Opus audio to PCM.
        
        Parameters:
        - opus: Opus-encoded audio data
        - decode_fec: Whether to decode FEC data (keyword-only,
          default False)
        
        Returns:
        PCM audio data
        """

    # The packet_* helpers are static: they inspect a raw packet and need
    # no decoder instance.
    @staticmethod
    def packet_get_bandwidth(data: bytes) -> int:
        """Get packet bandwidth."""

    @staticmethod
    def packet_get_nb_channels(data: bytes) -> int:
        """Get packet channel count."""

    @staticmethod
    def packet_get_nb_frames(data: bytes, frame_size: int) -> int:
        """Get packet frame count."""

    @staticmethod
    def packet_get_samples_per_frame(data: bytes, sampling_rate: int) -> int:
        """Get samples per frame."""

def is_loaded() -> bool:
    """
    Check if Opus library is loaded.

    Companion to load_opus(), which performs the loading.
    
    Returns:
    True if Opus is available
    """

def load_opus(name: str) -> None:
    """
    Load Opus library.

    is_loaded() reports whether the library is available.
    
    Parameters:
    - name: Library name or path
    """

Voice Regions and Quality

Voice server regions and quality settings for optimal voice performance.

class VoiceRegion:
    """Voice server region information."""
    
    def __init__(self): ...

    # Attribute types as given by this listing; the meaning of each flag
    # is not shown here — see the API reference.
    id: str
    name: str
    vip: bool
    optimal: bool
    deprecated: bool
    custom: bool

    def __str__(self) -> str:
        # The string form of a region is its name.
        return self.name

class VideoQualityMode(enum.Enum):
    """Video quality modes for voice channels."""
    
    # Integer values as listed; presumably the raw values used by the
    # Discord API — TODO confirm.
    auto = 1
    full = 2

async def discover_voice_regions(guild_id: int) -> List[VoiceRegion]:
    """
    Discover available voice regions for a guild.

    NOTE(review): could not be matched to a module-level function in the
    upstream API; region lookup may instead be a fetch method on the
    client or guild objects — confirm before use.
    
    Parameters:
    - guild_id: Guild ID
    
    Returns:
    List of available voice regions
    """

Voice Channel Effects

Voice channel effects and audio modifications.

class VoiceChannelEffect:
    """Voice channel effect configuration."""
    
    def __init__(self): ...

    # Every attribute is Optional in this listing, so any of them may be None.
    emoji: Optional[PartialEmoji]
    animation_type: Optional[VoiceChannelEffectAnimationType]
    animation_id: Optional[int]
    user_id: Optional[int]

class VoiceChannelEffectAnimationType(enum.Enum):
    """Voice channel effect animation types."""
    
    # Integer values as listed; presumably the raw values used by the
    # Discord API — TODO confirm.
    premium = 0
    basic = 1

async def send_voice_channel_effect(
    channel: VoiceChannel,
    emoji: Union[str, Emoji, PartialEmoji],
    *,
    animation_type: VoiceChannelEffectAnimationType = VoiceChannelEffectAnimationType.premium
) -> None:
    """
    Send voice channel effect.

    NOTE(review): could not be matched to a module-level function in the
    upstream API — confirm that it exists before use.
    
    Parameters:
    - channel: Voice channel
    - emoji: Effect emoji (string, Emoji or PartialEmoji)
    - animation_type: Animation type (keyword-only, defaults to
      VoiceChannelEffectAnimationType.premium)
    """

Voice Events

Voice-related events for monitoring voice activity and connections.

@bot.event
async def on_voice_state_update(member: Member, before: VoiceState, after: VoiceState):
    """
    Called when member voice state changes.

    Registered on the bot through the @bot.event decorator.
    
    Parameters:
    - member: Member whose voice state changed
    - before: Previous voice state
    - after: New voice state
    """

@bot.event
async def on_voice_channel_effect(effect: VoiceChannelEffect):
    """
    Called when voice channel effect is sent.

    NOTE(review): the upstream event may also pass the channel and the
    member ahead of the effect — confirm the handler signature.
    
    Parameters:
    - effect: Voice channel effect data
    """

# Voice client events (when using voice)
# NOTE(review): unlike the handlers above, these are listed without the
# @bot.event decorator, and they could not be matched to documented
# upstream events — confirm before relying on them.
async def on_voice_ready():
    """Called when voice connection is ready."""

async def on_voice_disconnect(error: Optional[Exception]):
    """
    Called when voice connection disconnects.

    NOTE(review): listed without the @bot.event decorator and not matched
    to a documented upstream event — confirm before relying on it.
    
    Parameters:
    - error: Disconnection error if any (None otherwise, per the
      Optional annotation)
    """

Usage Examples

Basic Music Bot

import disnake
from disnake.ext import commands
import asyncio
import youtube_dl
import os

# Suppress noise about console usage from errors
youtube_dl.utils.bug_reports_message = lambda: ''

# Options handed to the YoutubeDL constructor below.
ytdl_format_options = {
    'format': 'bestaudio/best',
    'outtmpl': '%(extractor)s-%(id)s-%(title)s.%(ext)s',
    'restrictfilenames': True,
    'noplaylist': True,
    'nocheckcertificate': True,
    'ignoreerrors': False,
    'logtostderr': False,
    'quiet': True,
    'no_warnings': True,
    'default_search': 'auto',
    'source_address': '0.0.0.0'
}

# Keyword arguments unpacked into disnake.FFmpegPCMAudio by
# YTDLSource.from_url.
ffmpeg_options = {
    'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
    'options': '-vn'
}

# Single shared extractor instance used by the examples that follow.
ytdl = youtube_dl.YoutubeDL(ytdl_format_options)

class YTDLSource(disnake.PCMVolumeTransformer):
    """Volume-adjustable audio source built from youtube_dl metadata."""

    def __init__(self, source, *, data, volume=0.5):
        """
        Wrap an audio source together with its extracted metadata.

        Parameters:
        - source: Audio source to wrap
        - data: Metadata dict returned by the extractor
        - volume: Initial volume multiplier (default 0.5)
        """
        super().__init__(source, volume)
        self.data = data
        self.title = data.get('title')
        self.url = data.get('url')

    @classmethod
    async def from_url(cls, url, *, loop=None, stream=False):
        """
        Resolve a URL or search query and build a playable source.

        Parameters:
        - url: URL or search query passed to the extractor
        - loop: Event loop whose default executor runs the blocking
          extraction; defaults to the running loop
        - stream: If True, stream from the remote URL instead of
          downloading the file first

        Returns:
        A YTDLSource wrapping an FFmpegPCMAudio source

        Raises:
        ValueError if the query resolves to a playlist with no entries
        """
        # This is a coroutine, so a loop is always running; get_event_loop()
        # is deprecated for this use.
        loop = loop or asyncio.get_running_loop()
        # extract_info blocks on network I/O, so run it in the executor.
        data = await loop.run_in_executor(None, lambda: ytdl.extract_info(url, download=not stream))

        if 'entries' in data:
            entries = data['entries']
            if not entries:
                # Previously this surfaced as an opaque IndexError.
                raise ValueError(f"No results found for {url!r}")
            # Take first item from a playlist
            data = entries[0]

        filename = data['url'] if stream else ytdl.prepare_filename(data)
        return cls(disnake.FFmpegPCMAudio(filename, **ffmpeg_options), data=data)

class Music(commands.Cog):
    """Basic music commands: join, play, volume, stop, pause, resume, leave.

    ``self.current`` maps a guild ID to the source playing in that guild.
    Entries are removed when playback ends, is stopped, or the bot leaves,
    so ``now_playing`` never reports a track that is no longer playing.
    """

    def __init__(self, bot):
        self.bot = bot
        self.queue = {}
        self.current = {}

    @commands.command()
    async def join(self, ctx, *, channel: disnake.VoiceChannel = None):
        """Join a voice channel."""
        if channel is None:
            if ctx.author.voice:
                channel = ctx.author.voice.channel
            else:
                return await ctx.send("You need to specify a channel or be in one.")

        # Already connected somewhere in this guild: move instead of reconnecting.
        if ctx.voice_client is not None:
            return await ctx.voice_client.move_to(channel)

        await channel.connect()
        await ctx.send(f"Connected to {channel}")

    @commands.command()
    async def play(self, ctx, *, url):
        """Play audio from a URL or search query."""
        # ensure_voice normally connects first; this guard covers direct calls
        # that bypass the before_invoke hook.
        if not ctx.voice_client:
            if ctx.author.voice:
                await ctx.author.voice.channel.connect()
            else:
                return await ctx.send("You need to be in a voice channel!")

        guild_id = ctx.guild.id

        async with ctx.typing():
            try:
                player = await YTDLSource.from_url(url, loop=self.bot.loop, stream=True)

                def after_playback(error):
                    if error:
                        print(f'Player error: {error}')
                    # Drop the entry only if it still refers to this track; a
                    # newer play() may already have replaced it.
                    if self.current.get(guild_id) is player:
                        self.current.pop(guild_id, None)

                ctx.voice_client.play(player, after=after_playback)

                self.current[guild_id] = player
                await ctx.send(f'**Now playing:** {player.title}')

            except Exception as e:
                await ctx.send(f'An error occurred: {e}')

    @commands.command()
    async def volume(self, ctx, volume: int):
        """Change the player volume (0-100)."""
        if ctx.voice_client is None:
            return await ctx.send("Not connected to a voice channel.")

        if not 0 <= volume <= 100:
            return await ctx.send("Volume must be between 0 and 100.")

        # source is None while nothing is playing; setting .volume on it
        # would raise AttributeError.
        source = ctx.voice_client.source
        if source is None:
            return await ctx.send("Nothing is currently playing.")

        source.volume = volume / 100
        await ctx.send(f"Changed volume to {volume}%")

    @commands.command()
    async def stop(self, ctx):
        """Stop the currently playing audio."""
        if ctx.voice_client:
            ctx.voice_client.stop()
            self.current.pop(ctx.guild.id, None)
            await ctx.send("⏹️ Stopped playback")

    @commands.command()
    async def pause(self, ctx):
        """Pause the currently playing audio."""
        if ctx.voice_client and ctx.voice_client.is_playing():
            ctx.voice_client.pause()
            await ctx.send("⏸️ Paused playback")

    @commands.command()
    async def resume(self, ctx):
        """Resume paused audio."""
        if ctx.voice_client and ctx.voice_client.is_paused():
            ctx.voice_client.resume()
            await ctx.send("▶️ Resumed playback")

    @commands.command()
    async def leave(self, ctx):
        """Disconnect from voice channel."""
        if ctx.voice_client:
            await ctx.voice_client.disconnect()
            self.current.pop(ctx.guild.id, None)
            await ctx.send("👋 Disconnected from voice channel")

    @commands.command()
    async def now_playing(self, ctx):
        """Show currently playing track."""
        if ctx.guild.id in self.current:
            player = self.current[ctx.guild.id]
            embed = disnake.Embed(title="🎵 Now Playing", description=player.title)
            embed.add_field(name="Volume", value=f"{int(player.volume * 100)}%")

            # Colour and footer reflect the live playback state.
            if ctx.voice_client:
                if ctx.voice_client.is_playing():
                    embed.color = 0x00ff00
                    embed.set_footer(text="Playing")
                elif ctx.voice_client.is_paused():
                    embed.color = 0xffaa00
                    embed.set_footer(text="Paused")

            await ctx.send(embed=embed)
        else:
            await ctx.send("Nothing is currently playing.")

    @play.before_invoke
    async def ensure_voice(self, ctx):
        """Ensure voice connection before playing."""
        if ctx.voice_client is None:
            if ctx.author.voice:
                await ctx.author.voice.channel.connect()
            else:
                await ctx.send("You are not connected to a voice channel.")
                # Raising aborts the command so play() never runs unconnected.
                raise commands.CommandError("Author not connected to a voice channel.")
        elif ctx.voice_client.is_playing():
            # Stop the current track so the new one can start immediately.
            ctx.voice_client.stop()

# Create the bot with the '!' prefix and every gateway intent enabled,
# then register the Music cog.
bot = commands.Bot(command_prefix='!', intents=disnake.Intents.all())
bot.add_cog(Music(bot))

# NOTE(review): 'YOUR_BOT_TOKEN' is a placeholder — load the real token from
# configuration or the environment rather than hard-coding it.
bot.run('YOUR_BOT_TOKEN')

Advanced Music Bot with Queue

import asyncio
from collections import deque
from typing import Optional, Dict, List

class MusicQueue:
    """FIFO track queue with repeat settings and a short play history.

    Attributes:
    - queue: deque of pending items, next item at the left
    - history: deque of recently played items, capped at 10
    - repeat_mode: 0=off, 1=track, 2=queue
    - shuffle: shuffle flag (stored only; ordering is left to the caller)
    - volume: volume multiplier applied by the caller
    """

    def __init__(self):
        self.queue = deque()
        self.history = deque(maxlen=10)
        self.repeat_mode = 0  # 0=off, 1=track, 2=queue
        self.shuffle = False
        self.volume = 0.5

    def add(self, item):
        """Add item to the end of the queue."""
        self.queue.append(item)

    def next(self):
        """
        Get next item from queue.

        Returns:
        The next item, or None when the queue is empty. In queue-repeat
        mode (repeat_mode == 2) the item is re-appended at the end.
        """
        if not self.queue:
            return None

        item = self.queue.popleft()

        if self.repeat_mode == 2:
            # Rotating also covers the single-item case: the item is popped
            # and re-appended, leaving the queue unchanged.
            self.queue.append(item)

        return item

    def clear(self):
        """Clear the queue."""
        self.queue.clear()

    def remove(self, index: int):
        """
        Remove item at index.

        Parameters:
        - index: Zero-based position in the queue

        Returns:
        The removed item, or None if index is out of range (negative
        indices are treated as out of range).
        """
        if not 0 <= index < len(self.queue):
            return None

        item = self.queue[index]
        del self.queue[index]
        return item

    def __len__(self):
        return len(self.queue)

    def __iter__(self):
        return iter(self.queue)

class AdvancedMusic(commands.Cog):
    """Queue-driven music commands: queue, skip, repeat, shuffle, history.

    State is kept per guild: one MusicQueue in ``self.queues`` and the
    source currently playing in ``self.current_players``.
    """

    def __init__(self, bot):
        self.bot = bot
        self.queues: Dict[int, MusicQueue] = {}
        self.current_players: Dict[int, YTDLSource] = {}

    def get_queue(self, guild_id: int) -> MusicQueue:
        """Get or create queue for guild."""
        if guild_id not in self.queues:
            self.queues[guild_id] = MusicQueue()
        return self.queues[guild_id]

    @staticmethod
    def _track_entry(info, requester_id):
        """Build the queue item dict for one extracted track."""
        return {
            'url': info['webpage_url'],
            'title': info['title'],
            'duration': info.get('duration', 0),
            'requester': requester_id
        }

    @staticmethod
    def _parse_timestamp(timestamp: str) -> int:
        """Convert 'MM:SS' or plain seconds to seconds; ValueError if malformed."""
        parts = timestamp.split(':')
        if len(parts) > 2:
            raise ValueError(f"Unsupported timestamp: {timestamp!r}")

        if len(parts) == 2:
            minutes, seconds = map(int, parts)
        else:
            minutes, seconds = 0, int(parts[0])

        if minutes < 0 or seconds < 0:
            raise ValueError(f"Negative timestamp: {timestamp!r}")

        return minutes * 60 + seconds

    async def play_next(self, ctx):
        """
        Play next track in queue.

        With repeat-track on, the current track is replayed; otherwise the
        next queue item is played. Sends a "Now Playing" embed on success.
        """
        guild_id = ctx.guild.id
        queue = self.get_queue(guild_id)

        if not ctx.voice_client:
            return

        previous = self.current_players.get(guild_id)
        advancing = not (queue.repeat_mode == 1 and previous is not None)

        if advancing:
            next_item = queue.next()
            if not next_item:
                # Forget the finished track so later commands do not keep
                # reporting it as playing; keep it in the history.
                if previous is not None:
                    queue.history.append(previous)
                    self.current_players.pop(guild_id, None)
                await ctx.send("Queue is empty. Playback finished.")
                return
            track_url = next_item['url']
        else:
            # Handle repeat single
            track_url = previous.data['webpage_url']

        try:
            player = await YTDLSource.from_url(track_url, loop=self.bot.loop, stream=True)
        except Exception as e:
            # When chained from the after-callback nobody awaits this
            # coroutine's result, so report here instead of losing the error.
            await ctx.send(f'An error occurred: {e}')
            return

        player.volume = queue.volume

        # The bot may have been disconnected while the track was resolving.
        voice_client = ctx.voice_client
        if not voice_client:
            return

        # Add to history
        if advancing and previous is not None:
            queue.history.append(previous)

        self.current_players[guild_id] = player

        def after_playback(error):
            # Runs outside the event loop, hence run_coroutine_threadsafe.
            if error:
                print(f'Player error: {error}')
            else:
                asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop)

        voice_client.play(player, after=after_playback)

        embed = disnake.Embed(title="🎵 Now Playing", description=player.title, color=0x00ff00)
        embed.add_field(name="Tracks in Queue", value=len(queue), inline=True)
        embed.add_field(name="Volume", value=f"{int(player.volume * 100)}%", inline=True)

        repeat_text = ["Off", "Track", "Queue"][queue.repeat_mode]
        embed.add_field(name="Repeat", value=repeat_text, inline=True)

        await ctx.send(embed=embed)

    @commands.command()
    async def queue(self, ctx, *, query):
        """Add a track to the queue."""
        if not ctx.voice_client:
            if ctx.author.voice:
                await ctx.author.voice.channel.connect()
            else:
                return await ctx.send("You need to be in a voice channel!")

        async with ctx.typing():
            try:
                # Extract info without downloading
                loop = asyncio.get_running_loop()
                data = await loop.run_in_executor(None, lambda: ytdl.extract_info(query, download=False))

                queue = self.get_queue(ctx.guild.id)

                if 'entries' in data:
                    # Playlist
                    entries = data['entries'][:10]  # Limit to 10 tracks

                    for entry in entries:
                        queue.add(self._track_entry(entry, ctx.author.id))

                    await ctx.send(f"Added {len(entries)} tracks to queue")

                else:
                    # Single track
                    queue.add(self._track_entry(data, ctx.author.id))

                    await ctx.send(f"Added **{data['title']}** to queue (position {len(queue)})")

                # Start playing if nothing is playing
                if not ctx.voice_client.is_playing() and not ctx.voice_client.is_paused():
                    await self.play_next(ctx)

            except Exception as e:
                await ctx.send(f'An error occurred: {e}')

    @commands.command(name='queue_list', aliases=['q', 'list'])
    async def queue_list(self, ctx):
        """Show current queue."""
        queue = self.get_queue(ctx.guild.id)

        if len(queue) == 0:
            return await ctx.send("Queue is empty.")

        embed = disnake.Embed(title="🎵 Music Queue", color=0x00ff00)

        # Show current track
        if ctx.guild.id in self.current_players:
            current = self.current_players[ctx.guild.id]
            embed.add_field(
                name="Now Playing",
                value=f"**{current.title}**",
                inline=False
            )

        # Show next tracks
        lines = []
        for i, track in enumerate(list(queue)[:10]):  # Show first 10
            if track['duration']:
                # Durations may be floats; ':02d' only accepts ints.
                minutes, seconds = divmod(int(track['duration']), 60)
                duration = f"{minutes}:{seconds:02d}"
            else:
                duration = "Unknown"
            lines.append(f"`{i+1}.` **{track['title']}** `[{duration}]`\n")

        queue_text = "".join(lines)
        if queue_text:
            embed.add_field(name="Up Next", value=queue_text, inline=False)

        if len(queue) > 10:
            embed.set_footer(text=f"... and {len(queue) - 10} more tracks")

        await ctx.send(embed=embed)

    @commands.command()
    async def skip(self, ctx, amount: int = 1):
        """Skip current track or multiple tracks."""
        if not ctx.voice_client or not ctx.voice_client.is_playing():
            return await ctx.send("Nothing is playing.")

        if amount < 1:
            return await ctx.send("Amount must be at least 1.")

        queue = self.get_queue(ctx.guild.id)

        # With repeat-track on, play_next would replay the current track, so
        # retire it first to make the skip actually advance.
        if queue.repeat_mode == 1:
            finished = self.current_players.pop(ctx.guild.id, None)
            if finished is not None:
                queue.history.append(finished)

        # Skip multiple tracks by removing from queue
        skipped = 1
        for _ in range(amount - 1):
            if queue.next() is None:
                break
            skipped += 1

        # Stopping triggers the after-callback, which starts the next track.
        ctx.voice_client.stop()
        await ctx.send(f"⏭️ Skipped {skipped} track(s)")

    @commands.command()
    async def remove(self, ctx, index: int):
        """Remove track from queue by index."""
        queue = self.get_queue(ctx.guild.id)

        if not 1 <= index <= len(queue):
            return await ctx.send(f"Invalid index. Queue has {len(queue)} tracks.")

        # Users count from 1; the queue is zero-based.
        removed_track = queue.queue[index - 1]
        queue.remove(index - 1)

        await ctx.send(f"Removed **{removed_track['title']}** from queue")

    @commands.command()
    async def clear_queue(self, ctx):
        """Clear the entire queue."""
        queue = self.get_queue(ctx.guild.id)
        queue.clear()
        await ctx.send("🗑️ Queue cleared")

    @commands.command()
    async def repeat(self, ctx, mode: str = None):
        """Set repeat mode (off/track/queue)."""
        queue = self.get_queue(ctx.guild.id)

        if mode is None:
            modes = ["off", "track", "queue"]
            current_mode = modes[queue.repeat_mode]
            return await ctx.send(f"Current repeat mode: **{current_mode}**")

        mode = mode.lower()
        if mode in ['off', '0', 'none']:
            queue.repeat_mode = 0
            await ctx.send("🔁 Repeat mode: **Off**")
        elif mode in ['track', '1', 'song']:
            queue.repeat_mode = 1
            await ctx.send("🔂 Repeat mode: **Track**")
        elif mode in ['queue', '2', 'all']:
            queue.repeat_mode = 2
            await ctx.send("🔁 Repeat mode: **Queue**")
        else:
            await ctx.send("Invalid mode. Use: `off`, `track`, or `queue`")

    @commands.command()
    async def shuffle(self, ctx):
        """Toggle shuffle mode."""
        import random

        queue = self.get_queue(ctx.guild.id)
        queue.shuffle = not queue.shuffle

        if queue.shuffle:
            # Shuffle current queue
            queue_list = list(queue.queue)
            random.shuffle(queue_list)
            queue.queue = deque(queue_list)
            await ctx.send("🔀 Shuffle: **On**")
        else:
            await ctx.send("🔀 Shuffle: **Off**")

    @commands.command()
    async def seek(self, ctx, timestamp: str):
        """Seek to timestamp (MM:SS format)."""
        if not ctx.voice_client or not ctx.voice_client.is_playing():
            return await ctx.send("Nothing is playing.")

        try:
            # Validation only for now: nothing uses the parsed offset yet.
            self._parse_timestamp(timestamp)
        except ValueError:
            return await ctx.send("Invalid timestamp format. Use MM:SS or seconds.")

        # Note: This is simplified - actual seeking requires more complex FFmpeg handling
        # In a real implementation, you'd restart playback from the timestamp
        await ctx.send(f"⏩ Seeking to {timestamp} (restart with timestamp)")

    @commands.command()
    async def lyrics(self, ctx, *, query: str = None):
        """Get lyrics for current or specified song."""
        if query is None:
            if ctx.guild.id in self.current_players:
                query = self.current_players[ctx.guild.id].title
            else:
                return await ctx.send("No song is playing. Specify a song name.")

        # This would integrate with a lyrics API like Genius
        await ctx.send(f"🎤 Searching lyrics for: **{query}**\n*Lyrics API integration needed*")

    @commands.command()
    async def history(self, ctx):
        """Show recently played tracks."""
        queue = self.get_queue(ctx.guild.id)

        if not queue.history:
            return await ctx.send("No recent tracks.")

        embed = disnake.Embed(title="🕐 Recently Played", color=0x0099ff)

        # Most recent first.
        embed.description = "".join(
            f"`{i+1}.` **{track.title}**\n"
            for i, track in enumerate(reversed(queue.history))
        )
        await ctx.send(embed=embed)

# Auto-disconnect when the bot is left alone in a voice channel
@bot.event
async def on_voice_state_update(member, before, after):
    """
    Disconnect the bot once no non-bot members remain in its channel.

    The channel is checked immediately and again after a 30-second grace
    period, so a brief absence does not end the session.

    Parameters:
    - member: Member whose voice state changed
    - before: Previous voice state (unused)
    - after: New voice state (unused)
    """
    # Ignore the bot's own voice state changes.
    if member == bot.user:
        return

    voice_client = member.guild.voice_client
    if not voice_client or not voice_client.channel:
        return

    def has_listeners():
        channel = voice_client.channel
        return channel is not None and any(not m.bot for m in channel.members)

    if has_listeners():
        return

    # Wait a bit before disconnecting
    await asyncio.sleep(30)

    # Another handler or a command may have disconnected the client during
    # the wait, so re-check before disconnecting.
    if voice_client.is_connected() and not has_listeners():
        await voice_client.disconnect()

# Register the queue-based cog on the bot.
bot.add_cog(AdvancedMusic(bot))

Voice Recording and Processing

import wave
import io
from typing import Dict, List

class VoiceRecorder:
    """Record voice from Discord voice channels.

    Raw audio chunks are collected per user in ``recordings`` and turned into
    in-memory WAV files by ``stop_recording``. Nothing in this class fills
    ``recordings``; a voice-receive implementation has to append to it.
    """

    # PCM layout written into every WAV header produced by this class.
    CHANNELS = 2          # Stereo
    SAMPLE_WIDTH = 2      # Bytes per sample (16-bit)
    SAMPLE_RATE = 48000   # 48kHz

    def __init__(self, voice_client: "disnake.VoiceClient"):
        """
        Initialize the recorder.

        Parameters:
        - voice_client: Voice client this recorder is associated with
        """
        self.voice_client = voice_client
        # Maps a user id to the list of raw audio chunks captured for that user.
        self.recordings: Dict[int, List[bytes]] = {}
        self.is_recording = False

    def start_recording(self) -> None:
        """Start recording all users in voice channel.

        Does nothing if a recording is already in progress; otherwise any
        previously collected audio is discarded.
        """
        if self.is_recording:
            return

        self.is_recording = True
        self.recordings.clear()

        # This would require a custom voice receive implementation
        # Discord bots cannot currently receive audio through the official API
        print("Recording started (implementation needed)")

    def stop_recording(self) -> Dict[int, bytes]:
        """Stop recording and return audio data.

        Returns:
        Mapping of user id to the bytes of a complete WAV file. An empty
        mapping is returned when no recording was in progress, so callers can
        always iterate the result.
        """
        if not self.is_recording:
            return {}

        self.is_recording = False

        return {
            user_id: self._encode_wav(chunks)
            for user_id, chunks in self.recordings.items()
        }

    def _encode_wav(self, chunks: List[bytes]) -> bytes:
        """Wrap the concatenated raw chunks in a WAV container."""
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, 'wb') as wav_file:
            wav_file.setnchannels(self.CHANNELS)
            wav_file.setsampwidth(self.SAMPLE_WIDTH)
            wav_file.setframerate(self.SAMPLE_RATE)
            wav_file.writeframes(b''.join(chunks))

        # The header is finalized when the writer closes, so read afterwards.
        return wav_buffer.getvalue()

class VoiceEffects(commands.Cog):
    """Voice effects and processing commands."""

    # Effect name -> FFmpeg audio-filter option string.
    EFFECTS = {
        'robot': '-af "afftfilt=real=\'hypot(re,im)*sin(0)\':imag=\'hypot(re,im)*cos(0)\':win_size=512:overlap=0.75"',
        'echo': '-af "aecho=0.8:0.9:1000:0.3"',
        'bass': '-af "bass=g=5"',
        'treble': '-af "treble=g=5"',
        'speed': '-af "atempo=1.5"',
        'slow': '-af "atempo=0.75"',
        'nightcore': '-af "aresample=48000,asetrate=48000*1.25"',
        'deep': '-af "asetrate=22050,aresample=48000"'
    }

    def __init__(self, bot):
        """
        Initialize the cog.

        Parameters:
        - bot: Bot instance the cog is attached to
        """
        self.bot = bot

    @commands.command()
    async def voice_effect(self, ctx, effect: str):
        """Apply voice effect to bot's audio.

        Placeholder: the effect name is validated against ``EFFECTS`` and
        acknowledged, but the running audio source is not modified.
        """
        if not ctx.voice_client:
            return await ctx.send("Not connected to voice channel.")

        if effect not in self.EFFECTS:
            available = ', '.join(self.EFFECTS)
            return await ctx.send(f"Available effects: {available}")

        # This would modify the FFmpeg options for the current audio source
        await ctx.send(f"🎛️ Applied **{effect}** effect")
        # Implementation would require restarting playback with new FFmpeg filter

    @commands.command()
    async def soundboard(self, ctx, sound: str):
        """Play soundboard effects.

        Joins the author's voice channel if the bot is not connected, then
        plays ``sounds/<sound>.mp3``, stopping whatever is currently playing.

        Parameters:
        - sound: Bare clip name, without directory or extension
        """
        if not ctx.voice_client:
            if ctx.author.voice:
                await ctx.author.voice.channel.connect()
            else:
                return await ctx.send("You need to be in a voice channel!")

        # `sound` comes straight from chat input. Accept only a bare file name
        # so values such as "../../secret" cannot escape the sounds directory.
        if not sound or os.path.basename(sound) != sound:
            return await ctx.send(f"Sound '{sound}' not found.")

        # Soundboard files directory
        sound_file = f"sounds/{sound}.mp3"

        if not os.path.exists(sound_file):
            return await ctx.send(f"Sound '{sound}' not found.")

        # Play sound effect
        source = disnake.FFmpegPCMAudio(sound_file, **ffmpeg_options)

        if ctx.voice_client.is_playing():
            ctx.voice_client.stop()

        ctx.voice_client.play(source)
        await ctx.send(f"🔊 Playing sound: **{sound}**")

    @commands.command()
    async def tts(self, ctx, *, text: str):
        """Text-to-speech in voice channel.

        Placeholder: joins the author's voice channel if needed, enforces the
        200-character limit, and echoes a preview; no audio is generated.
        """
        if not ctx.voice_client:
            if ctx.author.voice:
                await ctx.author.voice.channel.connect()
            else:
                return await ctx.send("You need to be in a voice channel!")

        if len(text) > 200:
            return await ctx.send("Text too long (max 200 characters).")

        # This would use a TTS service like gTTS
        # For demo purposes, just acknowledge (preview capped at 50 characters)
        await ctx.send(f"🗣️ TTS: {text[:50]}{'...' if len(text) > 50 else ''}")

        # Implementation would:
        # 1. Generate TTS audio file
        # 2. Play through voice client
        # 3. Clean up temporary file

    @commands.command()
    async def voice_status(self, ctx):
        """Show voice connection status and stats.

        Sends an embed with the channel, current and average latency in
        milliseconds, connection/playback flags, and the non-bot members of
        the channel.
        """
        if not ctx.voice_client:
            return await ctx.send("Not connected to voice.")

        vc = ctx.voice_client

        embed = disnake.Embed(title="🔊 Voice Status", color=0x00ff00)
        embed.add_field(name="Channel", value=vc.channel.mention, inline=True)
        # The latency values are multiplied by 1000 for display as milliseconds.
        embed.add_field(name="Latency", value=f"{vc.latency*1000:.2f}ms", inline=True)
        embed.add_field(name="Average Latency", value=f"{vc.average_latency*1000:.2f}ms", inline=True)

        status_text = []
        if vc.is_connected():
            status_text.append("✅ Connected")
        if vc.is_playing():
            status_text.append("▶️ Playing")
        if vc.is_paused():
            status_text.append("⏸️ Paused")

        # Falls back to "Idle" when none of the flags above is set.
        embed.add_field(name="Status", value=" | ".join(status_text) or "Idle", inline=False)

        # Voice channel members
        members = [m.display_name for m in vc.channel.members if not m.bot]
        if members:
            embed.add_field(name=f"Members ({len(members)})", value=", ".join(members), inline=False)

        await ctx.send(embed=embed)

# Voice channel management
@bot.command()
async def create_voice(ctx, *, name: str):
    """Create a voice channel named after *name* for the invoking member.

    The invoker must hold the manage-channels guild permission. The new
    channel is visible to the default role, and the invoker receives a
    manage-channels overwrite on it.
    """
    requester = ctx.author
    if not requester.guild_permissions.manage_channels:
        return await ctx.send("You don't have permission to manage channels.")

    # Permission overwrites: visible to the default role, manageable by the requester.
    everyone_rule = disnake.PermissionOverwrite(view_channel=True)
    owner_rule = disnake.PermissionOverwrite(manage_channels=True)

    new_channel = await ctx.guild.create_voice_channel(
        name=f"🔊 {name}",
        overwrites={ctx.guild.default_role: everyone_rule, requester: owner_rule},
        reason=f"Temporary voice channel created by {requester}",
    )

    await ctx.send(f"Created temporary voice channel: {new_channel.mention}")

    # Nothing here removes the channel; auto-deletion when it empties would
    # need a background task.

@bot.command()
async def voice_info(ctx, channel: disnake.VoiceChannel = None):
    """Get information about a voice channel.

    Parameters:
    - channel: Channel to describe; defaults to the channel the author is
      connected to. If the author is in no channel, a hint is sent instead.

    Sends an embed with the channel's id, bitrate, user limit, member count,
    creation time, region (when set), and the connected members with icons
    for their voice state flags.
    """
    if channel is None:
        if not ctx.author.voice:
            return await ctx.send("Specify a voice channel or join one.")
        channel = ctx.author.voice.channel

    embed = disnake.Embed(title=f"🔊 {channel.name}", color=0x0099ff)
    embed.add_field(name="ID", value=channel.id, inline=True)
    embed.add_field(name="Bitrate", value=f"{channel.bitrate}bps", inline=True)
    embed.add_field(name="User Limit", value=channel.user_limit or "No limit", inline=True)
    embed.add_field(name="Members", value=len(channel.members), inline=True)
    embed.add_field(name="Created", value=f"<t:{int(channel.created_at.timestamp())}:F>", inline=True)

    if channel.rtc_region:
        embed.add_field(name="Region", value=channel.rtc_region, inline=True)

    # Voice state attribute -> icon, in display order. The streaming flag on a
    # voice state is `self_stream`; there is no `streaming` attribute.
    status_icons = (
        ("deaf", "🔇"),
        ("mute", "🤐"),
        ("self_deaf", "🙉"),
        ("self_mute", "🤫"),
        ("self_stream", "📹"),
    )

    # List members
    if channel.members:
        member_list = []
        for member in channel.members:
            voice = member.voice
            if voice is None:
                # Member left between the listing and this lookup: no flags to show.
                icons = ""
            else:
                icons = "".join(icon for attr, icon in status_icons if getattr(voice, attr))

            member_list.append(f"{member.display_name} {icons}")

        embed.add_field(name="Connected Members", value="\n".join(member_list), inline=False)

    await ctx.send(embed=embed)

# Instantiate the VoiceEffects cog and register it with the bot.
bot.add_cog(VoiceEffects(bot))

Install with Tessl CLI

npx tessl i tessl/pypi-disnake

docs

application-commands.md

automod.md

channels-messaging.md

client-bot.md

command-framework.md

error-handling.md

events-gateway.md

guild-management.md

index.md

interactions-ui.md

localization-i18n.md

permissions-security.md

polls.md

users-members.md

voice-audio.md

tile.json