A robust and powerful, fully asynchronous Lavalink wrapper built for discord.py in Python.
Advanced queue management with multiple modes, history tracking, AutoPlay functionality, and thread-safe operations for robust track management. The queue system provides flexible track ordering, looping modes, and automatic track recommendations for continuous playback.
Thread-safe queue operations for managing track playback order with support for atomic operations and concurrent access.
class Queue:
    """Queue of tracks awaiting playback (API listing: signatures only, bodies omitted)."""

    def __init__(self, history: Queue | None = None):
        """
        Initialize a new queue.
        Parameters:
        - history: Optional history queue for storing played tracks
        """

    @property
    def mode(self) -> QueueMode:
        """Current queue mode (normal, loop, loop_all)."""

    @mode.setter
    def mode(self, value: QueueMode) -> None:
        """
        Set the queue mode.
        Parameters:
        - value: New queue mode
        """

    @property
    def history(self) -> Queue | None:
        """History queue containing previously played tracks, or None if there is none."""

    @property
    def count(self) -> int:
        """Number of tracks currently in the queue."""

    @property
    def is_empty(self) -> bool:
        """Whether the queue is empty."""

    @property
    def loaded(self) -> Playable | None:
        """Pre-loaded next track for smooth transitions, or None if nothing is loaded."""

    def put(
        self,
        item: list[Playable] | Playable | Playlist,
        *,
        atomic: bool = True
    ) -> int:
        """
        Add tracks to the queue.
        Parameters:
        - item: Track(s) or playlist to add
        - atomic: Whether to add all tracks atomically
        Returns:
        int: Number of tracks added
        """

    async def put_wait(
        self,
        item: list[Playable] | Playable | Playlist,
        *,
        atomic: bool = True
    ) -> int:
        """
        Add tracks to the queue (async version of put).
        Parameters:
        - item: Track(s) or playlist to add
        - atomic: Whether to add all tracks atomically
        Returns:
        int: Number of tracks added
        """

    def get(self) -> Playable:
        """
        Get the next track from the queue.
        Returns:
        Playable: Next track to play
        Raises:
        QueueEmpty: If queue is empty
        """

    async def get_wait(self) -> Playable:
        """
        Get the next track from the queue (async, waits if empty).
        Returns:
        Playable: Next track to play
        """

    def get_at(self, index: int) -> Playable:
        """
        Get a track at a specific index without removing it.
        Parameters:
        - index: Index of the track
        Returns:
        Playable: Track at the specified index
        Raises:
        IndexError: If index is out of range
        """

    def put_at(self, index: int, value: Playable) -> None:
        """
        Insert a track at a specific index.
        Parameters:
        - index: Index to insert at
        - value: Track to insert
        Raises:
        IndexError: If index is out of range
        """

Methods for manipulating queue contents including removal, reordering, and searching.
class Queue:
    """Queue methods for removing, reordering, and searching tracks (signatures only, bodies omitted)."""

    def delete(self, index: int) -> None:
        """
        Remove a track at a specific index.
        Parameters:
        - index: Index of track to remove
        Raises:
        IndexError: If index is out of range
        """

    def peek(self, index: int = 0) -> Playable:
        """
        View a track without removing it from the queue.
        Parameters:
        - index: Index of track to peek at (default: next track)
        Returns:
        Playable: Track at the specified index
        Raises:
        IndexError: If index is out of range
        """

    def swap(self, first: int, second: int) -> None:
        """
        Swap two tracks in the queue.
        Parameters:
        - first: Index of first track
        - second: Index of second track
        Raises:
        IndexError: If either index is out of range
        """

    def index(self, item: Playable) -> int:
        """
        Find the index of a track in the queue.
        Parameters:
        - item: Track to find
        Returns:
        int: Index of the track
        Raises:
        ValueError: If track is not in queue
        """

    def shuffle(self) -> None:
        """Randomly shuffle all tracks in the queue."""

    def clear(self) -> None:
        """Remove all tracks from the queue."""

    def copy(self) -> Queue:
        """
        Create a copy of the queue.
        Returns:
        Queue: New queue with same tracks and settings
        """

    def reset(self) -> None:
        """Reset the queue to initial state, clearing all tracks and history."""

    def remove(self, item: Playable, /, count: int | None = 1) -> int:
        """
        Remove specific track(s) from the queue.
        Parameters:
        - item: Track to remove (positional-only)
        - count: Number of instances to remove (None for all)
        Returns:
        int: Number of tracks removed
        """

    # Container protocol methods
    def __len__(self) -> int:
        """Return the number of tracks in the queue."""

    def __getitem__(self, index: int | slice) -> Playable | list[Playable]:
        """Get track(s) by index or slice."""

    def __setitem__(self, index: int, value: Playable) -> None:
        """Set track at index."""

    def __delitem__(self, index: int) -> None:
        """Delete track at index."""

    def __iter__(self) -> Iterator[Playable]:
        """Iterate over tracks in the queue."""

    def __bool__(self) -> bool:
        """Return True if queue has tracks."""

Enumeration of queue behavior modes for different playback patterns.
class QueueMode(enum.Enum):
    """Queue behavior modes."""

    normal = 0  # No looping, play through queue once
    loop = 1  # Loop the current track continuously
    loop_all = 2  # Loop through entire queue continuously

AutoPlay functionality for continuous music playback with track recommendations.
class AutoPlayMode(enum.Enum):
    """AutoPlay functionality modes."""

    enabled = 0  # Fully autonomous with track recommendations
    partial = 1  # Autonomous but no automatic recommendations
    disabled = 2  # No automatic functionality
# AutoPlay is integrated into Player class
class Player:
    """Player members related to AutoPlay (partial listing: signatures only)."""

    @property
    def autoplay(self) -> AutoPlayMode:
        """Current AutoPlay mode."""

    @autoplay.setter
    def autoplay(self, value: AutoPlayMode) -> None:
        """
        Set AutoPlay mode.
        Parameters:
        - value: New AutoPlay mode
        """

    auto_queue: Queue  # Separate queue for AutoPlay recommendations

import wavelink
@bot.command()
async def queue_example(ctx):
    """
    Demonstrate basic queue operations: adding tracks, checking status, peeking.
    Parameters:
    - ctx: Invocation context; ctx.voice_client is used as the player
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")

    # Add tracks to queue.
    # Playable.search applies a source prefix to plain-text queries, whereas
    # Pool.fetch_tracks sends the query as-is, so a bare phrase such as
    # "rock music" would not be run as a search.
    tracks = await wavelink.Playable.search("rock music")
    if tracks:
        added = player.queue.put(tracks[:5])  # Add first 5 tracks
        await ctx.send(f"Added {added} tracks to queue")

    # Check queue status
    await ctx.send(f"Queue has {player.queue.count} tracks")
    await ctx.send(f"Queue is empty: {player.queue.is_empty}")

    # Peek at next track
    if not player.queue.is_empty:
        next_track = player.queue.peek()
        await ctx.send(f"Next track: {next_track.title}")
@bot.command()
async def show_queue(ctx, page: int = 1):
"""Display the current queue with pagination."""
player = ctx.voice_client
if not player:
return await ctx.send("Not connected to a voice channel!")
if player.queue.is_empty:
return await ctx.send("Queue is empty!")
# Pagination
per_page = 10
start_idx = (page - 1) * per_page
end_idx = start_idx + per_page
queue_tracks = list(player.queue)[start_idx:end_idx]
embed = discord.Embed(
title=f"Queue (Page {page})",
description=f"Total tracks: {player.queue.count}",
color=discord.Color.blue()
)
track_list = []
for i, track in enumerate(queue_tracks, start_idx + 1):
duration = f"{track.length // 60000}:{(track.length // 1000) % 60:02d}"
track_list.append(f"{i}. {track.title} - {track.author} ({duration})")
embed.add_field(
name="Tracks",
value="\n".join(track_list) if track_list else "No tracks on this page",
inline=False
)
# Add current track info
if player.current:
embed.add_field(
name="Now Playing",
value=f"{player.current.title} - {player.current.author}",
inline=False
)
await ctx.send(embed=embed)@bot.command()
async def remove_track(ctx, index: int):
    """
    Remove a track from the queue by index.
    Parameters:
    - ctx: Invocation context; ctx.voice_client is used as the player
    - index: 1-based position of the track to remove
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")
    if player.queue.is_empty:
        return await ctx.send("Queue is empty!")

    # Reject out-of-range positions up front: 0 or a negative value would
    # become a negative index and silently remove a track from the end.
    if not 1 <= index <= player.queue.count:
        return await ctx.send(f"Invalid index! Queue has {player.queue.count} tracks.")

    try:
        # Convert to 0-based index
        track_index = index - 1
        # Read track info before removing; peek() views a track without
        # taking it out of the queue, so exactly one track is deleted below.
        track = player.queue.peek(track_index)
        # Remove the track
        player.queue.delete(track_index)
        await ctx.send(f"Removed: {track.title}")
    except IndexError:
        await ctx.send(f"Invalid index! Queue has {player.queue.count} tracks.")
@bot.command()
async def move_track(ctx, from_pos: int, to_pos: int):
    """
    Move a track from one position to another.
    Parameters:
    - ctx: Invocation context; ctx.voice_client is used as the player
    - from_pos: 1-based current position of the track
    - to_pos: 1-based position the track should end up at
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")

    # Validate both positions before mutating anything, so a bad target can
    # never leave the track deleted but not re-inserted, and 0/negative
    # positions are not treated as indices counted from the end.
    total = player.queue.count
    if not (1 <= from_pos <= total and 1 <= to_pos <= total):
        return await ctx.send("Invalid position! Check queue size.")

    try:
        # Convert to 0-based indices
        from_idx = from_pos - 1
        to_idx = to_pos - 1
        # Read track info without removing it
        track = player.queue.peek(from_idx)
        # Remove from old position
        player.queue.delete(from_idx)
        # Insert at new position. No index adjustment is needed: after the
        # deletion, inserting at to_idx puts the track at exactly that index,
        # which is the requested final position.
        player.queue.put_at(to_idx, track)
        await ctx.send(f"Moved '{track.title}' from position {from_pos} to {to_pos}")
    except IndexError:
        await ctx.send("Invalid position! Check queue size.")
@bot.command()
async def swap_tracks(ctx, pos1: int, pos2: int):
    """
    Swap two tracks in the queue.
    Parameters:
    - ctx: Invocation context; ctx.voice_client is used as the player
    - pos1: 1-based position of the first track
    - pos2: 1-based position of the second track
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")

    # 0 or negative positions would turn into negative indices and address
    # tracks counted from the end of the queue, so reject them explicitly.
    total = player.queue.count
    if not (1 <= pos1 <= total and 1 <= pos2 <= total):
        return await ctx.send("Invalid positions! Check queue size.")

    try:
        # Convert to 0-based indices
        idx1 = pos1 - 1
        idx2 = pos2 - 1
        # Read track info without removing anything from the queue
        track1 = player.queue.peek(idx1)
        track2 = player.queue.peek(idx2)
        # Swap tracks
        player.queue.swap(idx1, idx2)
        await ctx.send(f"Swapped '{track1.title}' and '{track2.title}'")
    except IndexError:
        await ctx.send("Invalid positions! Check queue size.")
@bot.command()
async def shuffle_queue(ctx):
    """
    Randomize the order of the queued tracks.
    Parameters:
    - ctx: Invocation context; ctx.voice_client is used as the player
    """
    voice = ctx.voice_client
    if not voice:
        return await ctx.send("Not connected to a voice channel!")

    # Shuffling fewer than two tracks cannot change their order.
    if player_has_too_few := voice.queue.count < 2:
        return await ctx.send("Need at least 2 tracks to shuffle!")

    voice.queue.shuffle()
    await ctx.send(f"Shuffled {voice.queue.count} tracks!")
@bot.command()
async def clear_queue(ctx):
"""Clear all tracks from the queue."""
player = ctx.voice_client
if not player:
return await ctx.send("Not connected to a voice channel!")
if player.queue.is_empty:
return await ctx.send("Queue is already empty!")
count = player.queue.count
player.queue.clear()
await ctx.send(f"Cleared {count} tracks from the queue!")@bot.command()
async def loop_mode(ctx, mode: str = None):
    """
    Report the queue loop mode, or change it when a mode name is given.
    Parameters:
    - ctx: Invocation context; ctx.voice_client is used as the player
    - mode: One of normal/off, track/loop, all/queue (case-insensitive);
      omit to show the current mode
    """
    voice = ctx.voice_client
    if not voice:
        return await ctx.send("Not connected to a voice channel!")

    if mode is None:
        await ctx.send(f"Current loop mode: {voice.queue.mode.name}")
        return

    # Several aliases map onto each of the three queue modes.
    aliases = {
        'off': wavelink.QueueMode.normal,
        'normal': wavelink.QueueMode.normal,
        'track': wavelink.QueueMode.loop,
        'loop': wavelink.QueueMode.loop,
        'all': wavelink.QueueMode.loop_all,
        'queue': wavelink.QueueMode.loop_all
    }
    requested = mode.lower()
    if requested in aliases:
        voice.queue.mode = aliases[requested]
        await ctx.send(f"Loop mode set to: {voice.queue.mode.name}")
    else:
        return await ctx.send("Valid modes: normal/off, track/loop, all/queue")
@bot.command()
async def autoplay_mode(ctx, mode: str = None):
"""Configure AutoPlay mode."""
player = ctx.voice_client
if not player:
return await ctx.send("Not connected to a voice channel!")
if mode is None:
current = player.autoplay.name
auto_queue_count = player.auto_queue.count
await ctx.send(f"AutoPlay: {current} | Auto queue: {auto_queue_count} tracks")
return
mode_map = {
'on': wavelink.AutoPlayMode.enabled,
'enabled': wavelink.AutoPlayMode.enabled,
'partial': wavelink.AutoPlayMode.partial,
'off': wavelink.AutoPlayMode.disabled,
'disabled': wavelink.AutoPlayMode.disabled
}
if mode.lower() not in mode_map:
return await ctx.send("Valid modes: enabled/on, partial, disabled/off")
player.autoplay = mode_map[mode.lower()]
await ctx.send(f"AutoPlay mode set to: {player.autoplay.name}")@bot.command()
async def history(ctx, count: int = 10):
    """
    Show recently played tracks, most recent first.
    Parameters:
    - ctx: Invocation context; ctx.voice_client is used as the player
    - count: Maximum number of tracks to show (must be at least 1)
    """
    player = ctx.voice_client
    if not player:
        return await ctx.send("Not connected to a voice channel!")
    if not player.queue.history or player.queue.history.is_empty:
        return await ctx.send("No tracks in history!")

    # count == 0 would make the [-0:] slice return the entire history, and a
    # negative count would slice from the wrong end, so reject both.
    if count < 1:
        return await ctx.send("Count must be at least 1!")

    # Get last N tracks from history
    history_tracks = list(player.queue.history)[-count:]

    embed = discord.Embed(
        title="Recently Played",
        description=f"Last {len(history_tracks)} tracks",
        color=discord.Color.purple()
    )

    # Reverse so the numbering starts at the most recently played track.
    track_list = [
        f"{i}. {track.title} - {track.author}"
        for i, track in enumerate(reversed(history_tracks), 1)
    ]

    embed.add_field(
        name="History",
        value="\n".join(track_list),
        inline=False
    )
    await ctx.send(embed=embed)
@bot.command()
async def replay(ctx, index: int = 1):
"""Replay a track from history."""
player = ctx.voice_client
if not player:
return await ctx.send("Not connected to a voice channel!")
if not player.queue.history or player.queue.history.is_empty:
return await ctx.send("No tracks in history!")
try:
# Get track from history (1-based index from most recent)
history_list = list(player.queue.history)
track = history_list[-(index)]
# Add to front of queue
player.queue.put_at(0, track)
await ctx.send(f"Added to front of queue: {track.title}")
except IndexError:
await ctx.send(f"Invalid index! History has {player.queue.history.count} tracks.")@bot.command()
async def find_track(ctx, *, query: str):
    """
    List queued tracks whose title or author contains the query text.
    Parameters:
    - ctx: Invocation context; ctx.voice_client is used as the player
    - query: Text to look for (case-insensitive substring match)
    """
    voice = ctx.voice_client
    if not voice:
        return await ctx.send("Not connected to a voice channel!")
    if voice.queue.is_empty:
        return await ctx.send("Queue is empty!")

    # Collect (1-based position, track) pairs for every match.
    needle = query.lower()
    hits = [
        (position, candidate)
        for position, candidate in enumerate(voice.queue, 1)
        if needle in candidate.title.lower() or needle in candidate.author.lower()
    ]
    if not hits:
        return await ctx.send(f"No tracks found matching '{query}'")

    embed = discord.Embed(
        title="Search Results in Queue",
        description=f"Found {len(hits)} matching tracks",
        color=discord.Color.green()
    )

    # Only the first 10 matches are listed; the description carries the total.
    lines = [
        f"{position}. {candidate.title} - {candidate.author}"
        for position, candidate in hits[:10]
    ]
    embed.add_field(
        name="Matches",
        value="\n".join(lines),
        inline=False
    )
    await ctx.send(embed=embed)
@bot.command()
async def queue_stats(ctx):
"""Show detailed queue statistics."""
player = ctx.voice_client
if not player:
return await ctx.send("Not connected to a voice channel!")
# Calculate total duration
total_duration = sum(track.length for track in player.queue)
hours = total_duration // 3600000
minutes = (total_duration % 3600000) // 60000
seconds = (total_duration % 60000) // 1000
# Count tracks by source
source_counts = {}
for track in player.queue:
source = track.source.name
source_counts[source] = source_counts.get(source, 0) + 1
embed = discord.Embed(
title="Queue Statistics",
color=discord.Color.blue()
)
embed.add_field(name="Total Tracks", value=player.queue.count, inline=True)
embed.add_field(name="Mode", value=player.queue.mode.name, inline=True)
embed.add_field(name="AutoPlay", value=player.autoplay.name, inline=True)
duration_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
embed.add_field(name="Total Duration", value=duration_str, inline=True)
if player.queue.history:
embed.add_field(name="History Count", value=player.queue.history.count, inline=True)
embed.add_field(name="Auto Queue", value=player.auto_queue.count, inline=True)
# Source breakdown
if source_counts:
source_info = "\n".join(f"{source}: {count}" for source, count in source_counts.items())
embed.add_field(name="Sources", value=source_info, inline=False)
await ctx.send(embed=embed)Install with Tessl CLI
npx tessl i tessl/pypi-wavelink