Ctrl+K
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-wavelink

A robust and powerful, fully asynchronous Lavalink wrapper built for discord.py in Python.

Overview
Eval results
Files

docs/queue-system.md

Queue System

Advanced queue management with multiple modes, history tracking, AutoPlay functionality, and thread-safe operations for robust track management. The queue system provides flexible track ordering, looping modes, and automatic track recommendations for continuous playback.

Capabilities

Queue Management

Thread-safe queue operations for managing track playback order with support for atomic operations and concurrent access.

class Queue:
    def __init__(self, history: Queue | None = None):
        """
        Initialize a new queue.

        Parameters:
        - history: Optional history queue for storing played tracks
          (default: None)

        NOTE(review): upstream wavelink 3.x documents this parameter as a
        keyword-only flag (``history: bool = True``) — confirm against the
        installed version before relying on this signature.
        """
    
    @property
    def mode(self) -> QueueMode:
        """Current queue mode (QueueMode.normal, QueueMode.loop or QueueMode.loop_all)."""
    
    @mode.setter
    def mode(self, value: QueueMode) -> None:
        """Set the queue mode to the given QueueMode member."""
    
    @property
    def history(self) -> Queue | None:
        """History queue containing previously played tracks, or None if there is none."""
    
    @property
    def count(self) -> int:
        """Number of tracks currently in the queue."""
    
    @property
    def is_empty(self) -> bool:
        """Whether the queue currently holds no tracks."""
    
    @property
    def loaded(self) -> Playable | None:
        """Pre-loaded next track for smooth transitions, or None if nothing is loaded."""
    
    def put(
        self,
        item: list[Playable] | Playable | Playlist,
        *,
        atomic: bool = True
    ) -> int:
        """
        Add tracks to the queue.

        Parameters:
        - item: A single track, a list of tracks, or a playlist to add
        - atomic: Whether to add all tracks atomically (keyword-only,
          default: True)

        Returns:
        int: Number of tracks added
        """
    
    async def put_wait(
        self,
        item: list[Playable] | Playable | Playlist,
        *,
        atomic: bool = True
    ) -> int:
        """
        Add tracks to the queue (async version of put; must be awaited).

        Parameters:
        - item: A single track, a list of tracks, or a playlist to add
        - atomic: Whether to add all tracks atomically (keyword-only,
          default: True)

        Returns:
        int: Number of tracks added
        """
    
    def get(self) -> Playable:
        """
        Get the next track from the queue.

        Returns:
        Playable: Next track to play

        Raises:
        QueueEmpty: If queue is empty
        """
    
    async def get_wait(self) -> Playable:
        """
        Get the next track from the queue (async, waits if empty).

        Unlike get(), this does not raise when the queue is empty; it waits
        until a track is available.

        Returns:
        Playable: Next track to play
        """
    
    def get_at(self, index: int) -> Playable:
        """
        Get a track at a specific index without removing it.

        NOTE(review): upstream wavelink (3.2+) documents get_at as REMOVING
        the returned track from the queue — confirm against the installed
        version. Prefer peek() when a non-destructive read is required.

        Parameters:
        - index: Index of the track

        Returns:
        Playable: Track at the specified index

        Raises:
        IndexError: If index is out of range
        """
    
    def put_at(self, index: int, value: Playable) -> None:
        """
        Insert a track at a specific index.

        Parameters:
        - index: Index to insert at
        - value: Track to insert

        Raises:
        IndexError: If index is out of range
        """

Queue Operations

Methods for manipulating queue contents including removal, reordering, and searching.

class Queue:
    def delete(self, index: int) -> None:
        """
        Remove a track at a specific index.

        Nothing is returned; read the track first (e.g. with peek()) if its
        details are needed after removal.

        Parameters:
        - index: Index of track to remove

        Raises:
        IndexError: If index is out of range
        """
    
    def peek(self, index: int = 0) -> Playable:
        """
        View a track without removing it from the queue.

        Parameters:
        - index: Index of track to peek at (default: 0, the next track)

        Returns:
        Playable: Track at the specified index

        Raises:
        IndexError: If index is out of range
        """
    
    def swap(self, first: int, second: int) -> None:
        """
        Swap two tracks in the queue.

        Parameters:
        - first: Index of first track
        - second: Index of second track

        Raises:
        IndexError: If either index is out of range
        """
    
    def index(self, item: Playable) -> int:
        """
        Find the index of a track in the queue.

        Parameters:
        - item: Track to find

        Returns:
        int: Index of the track

        Raises:
        ValueError: If track is not in queue
        """
    
    def shuffle(self) -> None:
        """Randomly shuffle all tracks in the queue (in place; returns None)."""
    
    def clear(self) -> None:
        """Remove all tracks from the queue."""
    
    def copy(self) -> Queue:
        """
        Create a copy of the queue.

        Returns:
        Queue: New queue with same tracks and settings
        """
    
    def reset(self) -> None:
        """Reset the queue to initial state, clearing all tracks and history."""
    
    def remove(self, item: Playable, /, count: int | None = 1) -> int:
        """
        Remove specific track(s) from the queue.

        Parameters:
        - item: Track to remove (positional-only)
        - count: Number of instances to remove (default: 1; None for all)

        Returns:
        int: Number of tracks removed
        """
    
    # Container protocol methods
    def __len__(self) -> int:
        """Return the number of tracks in the queue (enables len(queue))."""
    
    def __getitem__(self, index: int | slice) -> Playable | list[Playable]:
        """Get a single track by integer index, or a list of tracks by slice."""
    
    def __setitem__(self, index: int, value: Playable) -> None:
        """Set track at index."""
    
    def __delitem__(self, index: int) -> None:
        """Delete track at index."""
    
    def __iter__(self) -> Iterator[Playable]:
        """Iterate over tracks in the queue."""
    
    def __bool__(self) -> bool:
        """Return True if queue has tracks."""

Queue Modes

Enumeration of queue behavior modes for different playback patterns.

class QueueMode(enum.Enum):
    """Queue behavior modes.

    Members:
    - normal: no looping; the queue is played through once.
    - loop: the current track is looped continuously.
    - loop_all: the entire queue is looped continuously.
    """
    normal = 0     # No looping, play through queue once
    loop = 1       # Loop the current track continuously
    loop_all = 2   # Loop through entire queue continuously

AutoPlay System

AutoPlay functionality for continuous music playback with track recommendations.

class AutoPlayMode(enum.Enum):
    """AutoPlay functionality modes.

    Members:
    - enabled: fully autonomous playback, including track recommendations.
    - partial: autonomous playback, but no automatic recommendations.
    - disabled: no automatic functionality.
    """
    enabled = 0   # Fully autonomous with track recommendations
    partial = 1   # Autonomous but no automatic recommendations
    disabled = 2  # No automatic functionality

# AutoPlay is integrated into Player class
class Player:
    @property
    def autoplay(self) -> AutoPlayMode:
        """Current AutoPlay mode (an AutoPlayMode member)."""
    
    @autoplay.setter
    def autoplay(self, value: AutoPlayMode) -> None:
        """Set AutoPlay mode to the given AutoPlayMode member."""
    
    # Kept apart from the regular track queue; holds AutoPlay recommendations.
    auto_queue: Queue  # Separate queue for AutoPlay recommendations

Usage Examples

Basic Queue Operations

import wavelink

@bot.command()
async def queue_example(ctx):
    """Demonstrate basic queue operations.

    Searches for tracks, queues up to five of them, then reports the queue
    size, whether it is empty, and the title of the next track.
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")

    # Add tracks to queue.
    # Playable.search applies a default search source to plain-text queries.
    # Pool.fetch_tracks forwards the query untouched, so a bare phrase such
    # as "rock music" (no URL, no search prefix) would resolve to nothing.
    tracks = await wavelink.Playable.search("rock music")
    if tracks:
        added = player.queue.put(tracks[:5])  # Add first 5 tracks
        await ctx.send(f"Added {added} tracks to queue")

    # Check queue status
    await ctx.send(f"Queue has {player.queue.count} tracks")
    await ctx.send(f"Queue is empty: {player.queue.is_empty}")

    # Peek at next track (peek does not remove it from the queue)
    if not player.queue.is_empty:
        next_track = player.queue.peek()
        await ctx.send(f"Next track: {next_track.title}")

@bot.command()
async def show_queue(ctx, page: int = 1):
    """Display the current queue with pagination.

    Parameters:
    - page: 1-based page number (default: 1). Values outside the available
      range are clamped to the first or last page, so a zero or negative
      page can no longer produce negative slice indices.
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")

    if player.queue.is_empty:
        return await ctx.send("Queue is empty!")

    # Pagination
    per_page = 10
    total_pages = -(-player.queue.count // per_page)  # ceiling division
    page = min(max(page, 1), total_pages)
    start_idx = (page - 1) * per_page
    end_idx = start_idx + per_page

    queue_tracks = list(player.queue)[start_idx:end_idx]

    embed = discord.Embed(
        title=f"Queue (Page {page})",
        description=f"Total tracks: {player.queue.count}",
        color=discord.Color.blue()
    )

    track_list = []
    for i, track in enumerate(queue_tracks, start_idx + 1):
        # track.length is treated as milliseconds: minutes, then seconds
        duration = f"{track.length // 60000}:{(track.length // 1000) % 60:02d}"
        track_list.append(f"{i}. {track.title} - {track.author} ({duration})")

    embed.add_field(
        name="Tracks",
        value="\n".join(track_list) if track_list else "No tracks on this page",
        inline=False
    )

    # Add current track info
    if player.current:
        embed.add_field(
            name="Now Playing",
            value=f"{player.current.title} - {player.current.author}",
            inline=False
        )

    await ctx.send(embed=embed)

Queue Manipulation

@bot.command()
async def remove_track(ctx, index: int):
    """Remove a track from the queue by index.

    Parameters:
    - index: 1-based position of the track to remove
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")

    if player.queue.is_empty:
        return await ctx.send("Queue is empty!")

    # Reject out-of-range positions up front. Position 0 or a negative one
    # would otherwise become a negative index and silently remove a track
    # counted from the end of the queue.
    if not 1 <= index <= player.queue.count:
        return await ctx.send(f"Invalid index! Queue has {player.queue.count} tracks.")

    # Convert to 0-based index
    track_index = index - 1

    try:
        # Read the track with peek(), which leaves the queue untouched, so
        # delete() below is the only removal that takes place.
        track = player.queue.peek(track_index)

        # Remove the track
        player.queue.delete(track_index)
    except IndexError:
        return await ctx.send(f"Invalid index! Queue has {player.queue.count} tracks.")

    await ctx.send(f"Removed: {track.title}")

@bot.command()
async def move_track(ctx, from_pos: int, to_pos: int):
    """Move a track from one position to another.

    Parameters:
    - from_pos: 1-based current position of the track
    - to_pos: 1-based position the track should occupy afterwards
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")

    queue = player.queue
    size = queue.count

    # Validate both positions before mutating anything, so a bad target can
    # never leave the track deleted but not re-inserted. This also rejects
    # 0 and negative positions, which would wrap around as negative indices.
    if not (1 <= from_pos <= size and 1 <= to_pos <= size):
        return await ctx.send("Invalid position! Check queue size.")

    # Convert to 0-based indices
    from_idx = from_pos - 1
    to_idx = to_pos - 1

    try:
        # peek() reads the track without removing it
        track = queue.peek(from_idx)

        if from_idx != to_idx:
            # Remove from old position
            queue.delete(from_idx)

            # Inserting at to_idx after the removal leaves the track at
            # exactly that index, so no adjustment of the target is needed.
            if to_idx >= queue.count:
                # Target is the last slot: append rather than insert past
                # the end, which put_at may reject as out of range.
                queue.put(track)
            else:
                queue.put_at(to_idx, track)
    except IndexError:
        return await ctx.send("Invalid position! Check queue size.")

    await ctx.send(f"Moved '{track.title}' from position {from_pos} to {to_pos}")

@bot.command()
async def swap_tracks(ctx, pos1: int, pos2: int):
    """Swap two tracks in the queue.

    Parameters:
    - pos1: 1-based position of the first track
    - pos2: 1-based position of the second track
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")

    # Reject 0 and negative positions, which would otherwise wrap around as
    # negative indices and swap tracks counted from the end of the queue.
    size = player.queue.count
    if not (1 <= pos1 <= size and 1 <= pos2 <= size):
        return await ctx.send("Invalid positions! Check queue size.")

    # Convert to 0-based indices
    idx1 = pos1 - 1
    idx2 = pos2 - 1

    try:
        # Read both tracks with peek(), which leaves the queue untouched,
        # so the indices are still valid when swap() runs.
        track1 = player.queue.peek(idx1)
        track2 = player.queue.peek(idx2)

        # Swap tracks
        player.queue.swap(idx1, idx2)
    except IndexError:
        return await ctx.send("Invalid positions! Check queue size.")

    await ctx.send(f"Swapped '{track1.title}' and '{track2.title}'")

@bot.command()
async def shuffle_queue(ctx):
    """Randomise the order of the queued tracks (needs at least two)."""
    if not (player := ctx.voice_client):
        return await ctx.send("Not connected to a voice channel!")

    track_queue = player.queue
    if track_queue.count >= 2:
        track_queue.shuffle()
        await ctx.send(f"Shuffled {track_queue.count} tracks!")
        return

    # Shuffling zero or one track would be a no-op, so tell the user instead
    return await ctx.send("Need at least 2 tracks to shuffle!")

@bot.command()
async def clear_queue(ctx):
    """Empty the queue and report how many tracks were dropped."""
    if not (player := ctx.voice_client):
        return await ctx.send("Not connected to a voice channel!")

    track_queue = player.queue
    if track_queue.is_empty:
        return await ctx.send("Queue is already empty!")

    # Capture the size first; it cannot be read back once the queue is cleared
    removed = track_queue.count
    track_queue.clear()
    await ctx.send(f"Cleared {removed} tracks from the queue!")

Queue Modes

@bot.command()
async def loop_mode(ctx, mode: str = None):
    """Report the queue's loop mode, or change it when a mode name is given.

    Parameters:
    - mode: Case-insensitive mode name (normal/off, track/loop, all/queue);
      omit it to display the current mode
    """
    if not (player := ctx.voice_client):
        return await ctx.send("Not connected to a voice channel!")

    track_queue = player.queue

    # No argument: just report the current setting
    if mode is None:
        await ctx.send(f"Current loop mode: {track_queue.mode.name}")
        return

    # User-facing aliases mapped onto the three QueueMode members
    aliases = {
        'off': wavelink.QueueMode.normal,
        'normal': wavelink.QueueMode.normal,
        'track': wavelink.QueueMode.loop,
        'loop': wavelink.QueueMode.loop,
        'all': wavelink.QueueMode.loop_all,
        'queue': wavelink.QueueMode.loop_all
    }

    selected = aliases.get(mode.lower())
    if selected is None:
        return await ctx.send("Valid modes: normal/off, track/loop, all/queue")

    track_queue.mode = selected
    await ctx.send(f"Loop mode set to: {track_queue.mode.name}")

@bot.command()
async def autoplay_mode(ctx, mode: str = None):
    """Report the player's AutoPlay mode, or change it when a name is given.

    Parameters:
    - mode: Case-insensitive mode name (enabled/on, partial, disabled/off);
      omit it to display the current mode and the auto queue size
    """
    if not (player := ctx.voice_client):
        return await ctx.send("Not connected to a voice channel!")

    # No argument: report the current mode plus pending recommendations
    if mode is None:
        mode_name = player.autoplay.name
        pending = player.auto_queue.count
        await ctx.send(f"AutoPlay: {mode_name} | Auto queue: {pending} tracks")
        return

    # User-facing aliases mapped onto the three AutoPlayMode members
    aliases = {
        'on': wavelink.AutoPlayMode.enabled,
        'enabled': wavelink.AutoPlayMode.enabled,
        'partial': wavelink.AutoPlayMode.partial,
        'off': wavelink.AutoPlayMode.disabled,
        'disabled': wavelink.AutoPlayMode.disabled
    }

    selected = aliases.get(mode.lower())
    if selected is None:
        return await ctx.send("Valid modes: enabled/on, partial, disabled/off")

    player.autoplay = selected
    await ctx.send(f"AutoPlay mode set to: {player.autoplay.name}")

History Management

@bot.command()
async def history(ctx, count: int = 10):
    """Show recently played tracks, most recent first.

    Parameters:
    - count: How many of the most recent tracks to show (default: 10);
      must be at least 1
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")

    if not player.queue.history or player.queue.history.is_empty:
        return await ctx.send("No tracks in history!")

    # A count of 0 would make the slice below ([-0:]) return the ENTIRE
    # history, and a negative count would slice from the wrong end.
    if count < 1:
        return await ctx.send("Count must be at least 1!")

    # Get last N tracks from history
    history_tracks = list(player.queue.history)[-count:]

    embed = discord.Embed(
        title="Recently Played",
        description=f"Last {len(history_tracks)} tracks",
        color=discord.Color.purple()
    )

    # Number the tracks from the most recently played backwards
    track_list = [
        f"{i}. {track.title} - {track.author}"
        for i, track in enumerate(reversed(history_tracks), 1)
    ]

    embed.add_field(
        name="History",
        value="\n".join(track_list),
        inline=False
    )

    await ctx.send(embed=embed)

@bot.command()
async def replay(ctx, index: int = 1):
    """Replay a track from history by putting it at the front of the queue.

    Parameters:
    - index: 1-based position counted from the most recently played track
      (default: 1, the most recent)
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")

    played = player.queue.history
    if not played or played.is_empty:
        return await ctx.send("No tracks in history!")

    # Validate explicitly: index 0 would select element [-0] == [0] (the
    # OLDEST track) and a negative index would count from the oldest end,
    # neither of which raises IndexError.
    if not 1 <= index <= played.count:
        return await ctx.send(f"Invalid index! History has {played.count} tracks.")

    # Get track from history (1-based index from most recent)
    track = list(played)[-index]

    # Add to front of queue. With an empty queue, front and back are the
    # same slot, so append instead of inserting at an index that put_at may
    # reject as out of range.
    if player.queue.is_empty:
        player.queue.put(track)
    else:
        player.queue.put_at(0, track)

    await ctx.send(f"Added to front of queue: {track.title}")

Advanced Queue Features

@bot.command()
async def find_track(ctx, *, query: str):
    """List queued tracks whose title or author contains the query.

    Parameters:
    - query: Text to look for; matching is case-insensitive

    At most ten matches are listed, each with its 1-based queue position.
    """
    if not (player := ctx.voice_client):
        return await ctx.send("Not connected to a voice channel!")

    if player.queue.is_empty:
        return await ctx.send("Queue is empty!")

    # Case-insensitive substring match against title and author
    needle = query.lower()
    hits = [
        (position, track)
        for position, track in enumerate(player.queue, 1)
        if needle in track.title.lower() or needle in track.author.lower()
    ]

    if not hits:
        return await ctx.send(f"No tracks found matching '{query}'")

    embed = discord.Embed(
        title="Search Results in Queue",
        description=f"Found {len(hits)} matching tracks",
        color=discord.Color.green()
    )

    # Limit to 10 results
    listing = "\n".join(
        f"{position}. {track.title} - {track.author}"
        for position, track in hits[:10]
    )
    embed.add_field(name="Matches", value=listing, inline=False)

    await ctx.send(embed=embed)

@bot.command()
async def queue_stats(ctx):
    """Show detailed queue statistics.

    Reports the track count, loop mode, AutoPlay mode, total duration,
    history size (when a history queue exists), auto queue size, and a
    per-source breakdown of the queued tracks.
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")

    # Calculate total duration (track lengths are treated as milliseconds)
    total_duration = sum(track.length for track in player.queue)
    hours = total_duration // 3600000
    minutes = (total_duration % 3600000) // 60000
    seconds = (total_duration % 60000) // 1000

    # Count tracks by source
    source_counts = {}
    for track in player.queue:
        # wavelink 3.x documents Playable.source as a plain string (e.g.
        # "youtube"), which has no .name attribute; use .name only when the
        # value is enum-like, otherwise fall back to its string form.
        source = getattr(track.source, "name", None) or str(track.source)
        source_counts[source] = source_counts.get(source, 0) + 1

    embed = discord.Embed(
        title="Queue Statistics",
        color=discord.Color.blue()
    )

    embed.add_field(name="Total Tracks", value=player.queue.count, inline=True)
    embed.add_field(name="Mode", value=player.queue.mode.name, inline=True)
    embed.add_field(name="AutoPlay", value=player.autoplay.name, inline=True)

    duration_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
    embed.add_field(name="Total Duration", value=duration_str, inline=True)

    if player.queue.history:
        embed.add_field(name="History Count", value=player.queue.history.count, inline=True)

    embed.add_field(name="Auto Queue", value=player.auto_queue.count, inline=True)

    # Source breakdown (omitted when the queue is empty)
    if source_counts:
        source_info = "\n".join(f"{source}: {count}" for source, count in source_counts.items())
        embed.add_field(name="Sources", value=source_info, inline=False)

    await ctx.send(embed=embed)

Install with Tessl CLI

npx tessl i tessl/pypi-wavelink

docs

audio-filters.md

events-exceptions.md

index.md

node-management.md

player-control.md

queue-system.md

track-search.md

tile.json