A robust and powerful, fully asynchronous Lavalink wrapper built for discord.py in Python.
Rich event system for track lifecycle, player state changes, and WebSocket events, plus comprehensive exception hierarchy for robust error handling. The event system enables responsive music bots that can react to playback changes, while the exception hierarchy provides detailed error information for debugging and user feedback.
Comprehensive exception system for handling various error conditions in wavelink operations.
# Base exception class
class WavelinkException(Exception):
"""
Base wavelink exception class.
All wavelink exceptions derive from this exception, making it easy
to catch any wavelink-related error.
"""
# Node-related exceptions
class NodeException(WavelinkException):
"""
Generic Node error with optional HTTP status code.
Attributes:
- status: int | None - HTTP status code if available
"""
def __init__(self, msg: str | None = None, status: int | None = None):
"""
Initialize NodeException.
Parameters:
- msg: Error message
- status: HTTP status code
"""
status: int | None
class InvalidClientException(WavelinkException):
"""
Exception raised when an invalid discord.Client is provided
while connecting a wavelink.Node.
"""
class AuthorizationFailedException(WavelinkException):
"""
Exception raised when Lavalink fails to authenticate a Node
with the provided password.
"""
class InvalidNodeException(WavelinkException):
"""
Exception raised when a Node is tried to be retrieved from the
Pool without existing, or the Pool is empty.
"""
# Lavalink server exceptions
class LavalinkException(WavelinkException):
"""
Exception raised when Lavalink returns an invalid response.
Attributes:
- timestamp: int - Error timestamp
- status: int - HTTP response status code
- error: str - Error message from Lavalink
- trace: str | None - Stack trace if available
- path: str - Request path that caused the error
"""
def __init__(self, msg: str | None = None, /, *, data: dict):
"""
Initialize LavalinkException from error response data.
Parameters:
- msg: Custom error message
- data: Error response data from Lavalink
"""
timestamp: int
status: int
error: str
trace: str | None
path: str
class LavalinkLoadException(WavelinkException):
"""
Exception raised when an error occurred loading tracks via Lavalink.
Attributes:
- error: str - Error message from Lavalink
- severity: str - Error severity level
- cause: str - Cause of the error
"""
def __init__(self, msg: str | None = None, /, *, data: dict):
"""
Initialize LavalinkLoadException from load error data.
Parameters:
- msg: Custom error message
- data: Load error data from Lavalink
"""
error: str
severity: str
cause: str
# Player-related exceptions
class InvalidChannelStateException(WavelinkException):
"""
Exception raised when a Player tries to connect to an invalid channel
or has invalid permissions to use this channel.
"""
class ChannelTimeoutException(WavelinkException):
"""
Exception raised when connecting to a voice channel times out.
"""
# Queue-related exceptions
class QueueEmpty(WavelinkException):
"""
Exception raised when you try to retrieve from an empty queue.
"""
# Cache-related exceptions
class CapacityZero(WavelinkException):
"""
Exception raised when LFU cache has zero capacity.
"""Data structures containing information passed to event handlers for various wavelink events.
# Node events
class NodeReadyEventPayload:
"""Data structure for node ready events."""
resumed: bool
session_id: str
# Track events
class TrackStartEventPayload:
"""Data structure for track start events."""
track: dict
player: Player
class TrackEndEventPayload:
"""Data structure for track end events."""
track: dict
reason: str
player: Player
class TrackExceptionEventPayload:
"""Data structure for track exception events."""
track: dict
exception: dict
player: Player
class TrackStuckEventPayload:
"""Data structure for track stuck events."""
track: dict
threshold_ms: int
player: Player
# WebSocket events
class WebsocketClosedEventPayload:
"""Data structure for WebSocket closed events."""
code: int
reason: str
by_remote: bool
player: Player
# Player events
class PlayerUpdateEventPayload:
"""Data structure for player update events."""
state: dict
player: Player
# Statistics events
class StatsEventMemory:
"""Memory statistics data."""
free: int
used: int
allocated: int
reservable: int
class StatsEventCPU:
"""CPU statistics data."""
cores: int
system_load: float
lavalink_load: float
class StatsEventFrames:
"""Frame statistics data."""
sent: int
nulled: int
deficit: int
class StatsEventPayload:
"""Node statistics event data."""
players: int
playing_players: int
uptime: int
memory: StatsEventMemory
cpu: StatsEventCPU
frame_stats: StatsEventFrames | None
# Response payloads
class StatsResponsePayload:
"""Statistics response from node."""
players: int
playing_players: int
uptime: int
memory: dict
cpu: dict
frame_stats: dict | None
class PlayerStatePayload:
"""Player state information."""
time: int
position: int
connected: bool
ping: int
class VoiceStatePayload:
"""Voice state information."""
token: str
endpoint: str
session_id: str
class PlayerResponsePayload:
"""Player response information."""
guild_id: str
track: dict | None
volume: int
paused: bool
state: PlayerStatePayload
voice: VoiceStatePayload
filters: dict
class GitResponsePayload:
"""Git information response."""
branch: str
commit: str
commit_time: int
class VersionResponsePayload:
"""Version information response."""
semver: str
major: int
minor: int
patch: int
pre_release: str | None
build: str | None
class PluginResponsePayload:
"""Plugin information response."""
name: str
version: str
class InfoResponsePayload:
"""Node information response."""
version: VersionResponsePayload
build_time: int
git: GitResponsePayload
jvm: str
lavaplayer: str
source_managers: list[str]
filters: list[str]
plugins: list[PluginResponsePayload]
class ExtraEventPayload:
"""Extra event data for custom events."""
data: dictEvent handlers that can be implemented in your bot to respond to wavelink events.
# Event handler signatures for discord.py bots
async def on_wavelink_node_ready(payload: NodeReadyEventPayload) -> None:
"""Called when a node connects and is ready."""
async def on_wavelink_track_start(payload: TrackStartEventPayload) -> None:
"""Called when a track starts playing."""
async def on_wavelink_track_end(payload: TrackEndEventPayload) -> None:
"""Called when a track finishes playing."""
async def on_wavelink_track_exception(payload: TrackExceptionEventPayload) -> None:
"""Called when a track encounters an exception."""
async def on_wavelink_track_stuck(payload: TrackStuckEventPayload) -> None:
"""Called when a track gets stuck."""
async def on_wavelink_websocket_closed(payload: WebsocketClosedEventPayload) -> None:
"""Called when the WebSocket connection closes."""
async def on_wavelink_player_update(payload: PlayerUpdateEventPayload) -> None:
"""Called when player state updates."""
async def on_wavelink_stats_update(payload: StatsEventPayload) -> None:
"""Called when node statistics update."""
async def on_wavelink_extra_event(payload: ExtraEventPayload) -> None:
"""Called for custom events from Lavalink plugins."""import wavelink
import discord
from discord.ext import commands
@bot.event
async def on_command_error(ctx, error):
"""Global error handler for wavelink exceptions."""
if isinstance(error, commands.CommandInvokeError):
error = error.original
if isinstance(error, wavelink.WavelinkException):
# Handle wavelink-specific errors
if isinstance(error, wavelink.LavalinkLoadException):
embed = discord.Embed(
title="❌ Failed to Load Track",
description=f"**Error**: {error.error}\n**Cause**: {error.cause}",
color=discord.Color.red()
)
await ctx.send(embed=embed)
elif isinstance(error, wavelink.QueueEmpty):
await ctx.send("❌ The queue is empty!")
elif isinstance(error, wavelink.InvalidChannelStateException):
await ctx.send("❌ Cannot connect to that voice channel!")
elif isinstance(error, wavelink.ChannelTimeoutException):
await ctx.send("❌ Connection to voice channel timed out!")
elif isinstance(error, wavelink.AuthorizationFailedException):
await ctx.send("❌ Failed to authenticate with Lavalink server!")
elif isinstance(error, wavelink.NodeException):
embed = discord.Embed(
title="❌ Node Error",
description=f"Status Code: {error.status}" if error.status else "Unknown node error",
color=discord.Color.red()
)
await ctx.send(embed=embed)
else:
# Generic wavelink error
await ctx.send(f"❌ Wavelink error: {error}")
else:
# Handle non-wavelink errors
await ctx.send(f"❌ An error occurred: {error}")
@bot.command()
async def safe_play(ctx, *, query: str):
"""Play command with comprehensive error handling."""
try:
# Ensure player exists
if not ctx.voice_client:
if not ctx.author.voice:
return await ctx.send("❌ You're not in a voice channel!")
try:
player = await ctx.author.voice.channel.connect(cls=wavelink.Player)
except wavelink.InvalidChannelStateException:
return await ctx.send("❌ I don't have permission to connect to that channel!")
except wavelink.ChannelTimeoutException:
return await ctx.send("❌ Connection timed out!")
else:
player = ctx.voice_client
# Search for tracks
try:
tracks = await wavelink.Pool.fetch_tracks(query)
except wavelink.LavalinkLoadException as e:
return await ctx.send(f"❌ Search failed: {e.error}")
except wavelink.InvalidNodeException:
return await ctx.send("❌ No Lavalink nodes available!")
if not tracks:
return await ctx.send("❌ No tracks found!")
# Handle results
if isinstance(tracks, wavelink.Playlist):
added = player.queue.put(tracks)
await ctx.send(f"✅ Added playlist: **{tracks.name}** ({added} tracks)")
else:
track = tracks[0]
if player.playing:
player.queue.put(track)
await ctx.send(f"✅ Added to queue: **{track.title}**")
else:
await player.play(track)
await ctx.send(f"▶️ Now playing: **{track.title}**")
except wavelink.WavelinkException as e:
await ctx.send(f"❌ Wavelink error: {e}")
except Exception as e:
await ctx.send(f"❌ Unexpected error: {e}")@bot.event
async def on_wavelink_node_ready(payload: wavelink.NodeReadyEventPayload):
"""Handle node ready events."""
print(f"Node is ready! Session ID: {payload.session_id}")
print(f"Resumed previous session: {payload.resumed}")
@bot.event
async def on_wavelink_track_start(payload: wavelink.TrackStartEventPayload):
"""Handle track start events."""
player = payload.player
track = player.current
if track:
# Send now playing message to the last channel
guild = player.guild
if guild:
# Find a suitable channel to send the message
channel = discord.utils.get(guild.text_channels, name='music')
if not channel:
channel = guild.text_channels[0] # Fallback to first channel
embed = discord.Embed(
title="🎵 Now Playing",
description=f"**{track.title}**\nby {track.author}",
color=discord.Color.green()
)
if track.artwork:
embed.set_thumbnail(url=track.artwork)
duration = f"{track.length // 60000}:{(track.length // 1000) % 60:02d}"
embed.add_field(name="Duration", value=duration, inline=True)
embed.add_field(name="Source", value=track.source.name, inline=True)
embed.add_field(name="Queue", value=f"{player.queue.count} tracks", inline=True)
try:
await channel.send(embed=embed)
except discord.HTTPException:
pass # Ignore if we can't send messages
@bot.event
async def on_wavelink_track_end(payload: wavelink.TrackEndEventPayload):
"""Handle track end events."""
player = payload.player
# Auto-play next track if queue isn't empty
if not player.queue.is_empty:
try:
next_track = player.queue.get()
await player.play(next_track)
except wavelink.QueueEmpty:
pass # Queue became empty between check and get
elif player.autoplay != wavelink.AutoPlayMode.disabled:
# Let AutoPlay handle track recommendation
pass
else:
# Disconnect after inactivity timeout
await asyncio.sleep(300) # Wait 5 minutes
if not player.playing and player.queue.is_empty:
await player.disconnect()
@bot.event
async def on_wavelink_track_exception(payload: wavelink.TrackExceptionEventPayload):
"""Handle track exceptions."""
player = payload.player
exception = payload.exception
print(f"Track exception in guild {player.guild.id if player.guild else 'Unknown'}")
print(f"Exception: {exception}")
# Try to play next track if available
if not player.queue.is_empty:
try:
next_track = player.queue.get()
await player.play(next_track)
except wavelink.QueueEmpty:
pass
@bot.event
async def on_wavelink_track_stuck(payload: wavelink.TrackStuckEventPayload):
"""Handle stuck tracks."""
player = payload.player
threshold = payload.threshold_ms
print(f"Track stuck for {threshold}ms in guild {player.guild.id if player.guild else 'Unknown'}")
# Skip to next track
try:
await player.skip()
except Exception:
pass
@bot.event
async def on_wavelink_websocket_closed(payload: wavelink.WebsocketClosedEventPayload):
"""Handle WebSocket disconnections."""
player = payload.player
code = payload.code
reason = payload.reason
print(f"WebSocket closed for guild {player.guild.id if player.guild else 'Unknown'}")
print(f"Code: {code}, Reason: {reason}, By Remote: {payload.by_remote}")
# Handle specific close codes
if code == 4014: # Disconnected
# Voice channel was deleted or bot was disconnected
try:
await player.disconnect()
except Exception:
pass
@bot.event
async def on_wavelink_player_update(payload: wavelink.PlayerUpdateEventPayload):
"""Handle player state updates."""
player = payload.player
state = payload.state
# Update any player tracking/UI if needed
# This event fires frequently, so be careful with heavy operations
pass
@bot.event
async def on_wavelink_stats_update(payload: wavelink.StatsEventPayload):
"""Handle node statistics updates."""
stats = payload
print(f"Node Stats - Players: {stats.players}, Playing: {stats.playing_players}")
print(f"Memory: {stats.memory.used}MB used, CPU: {stats.cpu.lavalink_load:.2f}%")
# Monitor node health
if stats.memory.used > 1000: # Over 1GB memory usage
print("⚠️ High memory usage detected!")
if stats.cpu.lavalink_load > 80: # Over 80% CPU
print("⚠️ High CPU usage detected!")class RobustPlayer(wavelink.Player):
"""Enhanced player with automatic error recovery."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.reconnect_attempts = 0
self.max_reconnect_attempts = 3
async def handle_disconnect(self):
"""Handle unexpected disconnections with retry logic."""
if self.reconnect_attempts < self.max_reconnect_attempts:
self.reconnect_attempts += 1
try:
# Wait before reconnecting
await asyncio.sleep(5 * self.reconnect_attempts)
# Attempt to reconnect
if self.channel:
await self.connect(self.channel)
print(f"Successfully reconnected (attempt {self.reconnect_attempts})")
# Resume playback if there was a current track
if self.current and not self.playing:
await self.play(self.current, start=self.position)
except Exception as e:
print(f"Reconnection attempt {self.reconnect_attempts} failed: {e}")
if self.reconnect_attempts >= self.max_reconnect_attempts:
print("Max reconnection attempts reached, giving up")
else:
print("Already exceeded max reconnection attempts")
async def safe_play(self, track, **kwargs):
"""Play with automatic retry on failure."""
for attempt in range(3):
try:
await self.play(track, **kwargs)
return
except Exception as e:
print(f"Play attempt {attempt + 1} failed: {e}")
if attempt < 2:
await asyncio.sleep(1)
else:
raise
# Use the robust player
@bot.command()
async def connect_robust(ctx):
"""Connect with enhanced error recovery."""
if ctx.voice_client:
return await ctx.send("Already connected!")
if not ctx.author.voice:
return await ctx.send("You're not in a voice channel!")
try:
player = await ctx.author.voice.channel.connect(cls=RobustPlayer)
await ctx.send("✅ Connected with enhanced error recovery!")
except Exception as e:
await ctx.send(f"❌ Failed to connect: {e}")import logging
from datetime import datetime
# Configure logging
logging.basicConfig(level=logging.INFO)
wavelink_logger = logging.getLogger('wavelink_events')
@bot.event
async def on_wavelink_track_start(payload):
"""Log track start events."""
player = payload.player
track = player.current
wavelink_logger.info(
f"Track started - Guild: {player.guild.id if player.guild else 'Unknown'}, "
f"Track: {track.title if track else 'Unknown'}, "
f"User Count: {len(player.channel.members) if player.channel else 0}"
)
@bot.event
async def on_wavelink_track_end(payload):
"""Log track end events with reason."""
player = payload.player
reason = payload.reason
wavelink_logger.info(
f"Track ended - Guild: {player.guild.id if player.guild else 'Unknown'}, "
f"Reason: {reason}, Queue: {player.queue.count} tracks"
)
@bot.event
async def on_wavelink_track_exception(payload):
"""Log track exceptions for debugging."""
player = payload.player
exception = payload.exception
wavelink_logger.error(
f"Track exception - Guild: {player.guild.id if player.guild else 'Unknown'}, "
f"Exception: {exception.get('message', 'Unknown')}, "
f"Severity: {exception.get('severity', 'Unknown')}"
)
# Command to view recent logs
@bot.command()
@commands.has_permissions(administrator=True)
async def view_logs(ctx, lines: int = 20):
"""View recent wavelink event logs."""
# In a production bot, you'd read from actual log files
await ctx.send(f"Check the console for the last {lines} wavelink events.")Install with Tessl CLI
npx tessl i tessl/pypi-wavelink