CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-wavelink

A robust and powerful, fully asynchronous Lavalink wrapper built for discord.py in Python.

Overview
Eval results
Files

docs/events-exceptions.md

Events & Exceptions

Rich event system for track lifecycle, player state changes, and WebSocket events, plus comprehensive exception hierarchy for robust error handling. The event system enables responsive music bots that can react to playback changes, while the exception hierarchy provides detailed error information for debugging and user feedback.

Capabilities

Exception Hierarchy

Comprehensive exception system for handling various error conditions in wavelink operations.

# Base exception class
class WavelinkException(Exception):
    """
    Base wavelink exception class.
    
    All wavelink exceptions derive from this exception, making it easy
    to catch any wavelink-related error.
    """

# Node-related exceptions
class NodeException(WavelinkException):
    """
    Generic Node error with optional HTTP status code.
    
    Attributes:
    - status: int | None - HTTP status code if available
    """
    def __init__(self, msg: str | None = None, status: int | None = None):
        """
        Initialize NodeException.
        
        Parameters:
        - msg: Error message
        - status: HTTP status code
        """
    
    status: int | None

class InvalidClientException(WavelinkException):
    """
    Exception raised when an invalid discord.Client is provided
    while connecting a wavelink.Node.
    """

class AuthorizationFailedException(WavelinkException):
    """
    Exception raised when Lavalink fails to authenticate a Node
    with the provided password.
    """

class InvalidNodeException(WavelinkException):
    """
    Exception raised when a Node is tried to be retrieved from the
    Pool without existing, or the Pool is empty.
    """

# Lavalink server exceptions
class LavalinkException(WavelinkException):
    """
    Exception raised when Lavalink returns an invalid response.
    
    Attributes:
    - timestamp: int - Error timestamp
    - status: int - HTTP response status code
    - error: str - Error message from Lavalink
    - trace: str | None - Stack trace if available
    - path: str - Request path that caused the error
    """
    def __init__(self, msg: str | None = None, /, *, data: dict):
        """
        Initialize LavalinkException from error response data.
        
        Parameters:
        - msg: Custom error message
        - data: Error response data from Lavalink
        """
    
    timestamp: int
    status: int
    error: str
    trace: str | None
    path: str

class LavalinkLoadException(WavelinkException):
    """
    Exception raised when an error occurred loading tracks via Lavalink.
    
    Attributes:
    - error: str - Error message from Lavalink
    - severity: str - Error severity level
    - cause: str - Cause of the error
    """
    def __init__(self, msg: str | None = None, /, *, data: dict):
        """
        Initialize LavalinkLoadException from load error data.
        
        Parameters:
        - msg: Custom error message
        - data: Load error data from Lavalink
        """
    
    error: str
    severity: str
    cause: str

# Player-related exceptions
class InvalidChannelStateException(WavelinkException):
    """
    Exception raised when a Player tries to connect to an invalid channel
    or has invalid permissions to use this channel.
    """

class ChannelTimeoutException(WavelinkException):
    """
    Exception raised when connecting to a voice channel times out.
    """

# Queue-related exceptions
class QueueEmpty(WavelinkException):
    """
    Exception raised when you try to retrieve from an empty queue.
    """

# Cache-related exceptions
class CapacityZero(WavelinkException):
    """
    Exception raised when LFU cache has zero capacity.
    """

Event Payload Classes

Data structures containing information passed to event handlers for various wavelink events.

# Node events
class NodeReadyEventPayload:
    """Data structure for node ready events.

    Passed to ``on_wavelink_node_ready`` handlers.
    """
    resumed: bool  # whether a previous session was resumed
    session_id: str  # session ID reported for the node

# Track events
class TrackStartEventPayload:
    """Data structure for track start events.

    Passed to ``on_wavelink_track_start`` handlers.
    """
    track: dict  # NOTE(review): declared as a plain dict here - confirm against the library's track type
    player: Player  # player the event belongs to

class TrackEndEventPayload:
    """Data structure for track end events.

    Passed to ``on_wavelink_track_end`` handlers.
    """
    track: dict
    reason: str  # why the track ended
    player: Player

class TrackExceptionEventPayload:
    """Data structure for track exception events.

    Passed to ``on_wavelink_track_exception`` handlers.
    """
    track: dict
    exception: dict  # the usage examples read the 'message' and 'severity' keys
    player: Player

class TrackStuckEventPayload:
    """Data structure for track stuck events.

    Passed to ``on_wavelink_track_stuck`` handlers.
    """
    track: dict
    threshold_ms: int  # stuck threshold, in milliseconds
    player: Player

# WebSocket events
class WebsocketClosedEventPayload:
    """Data structure for WebSocket closed events.

    Passed to ``on_wavelink_websocket_closed`` handlers.
    """
    code: int  # close code (the usage example treats 4014 as "disconnected")
    reason: str  # close reason text
    by_remote: bool  # presumably True when the remote side closed the connection - TODO confirm
    player: Player  # player whose connection closed

# Player events
class PlayerUpdateEventPayload:
    """Data structure for player update events.

    Passed to ``on_wavelink_player_update`` handlers.
    """
    state: dict  # NOTE(review): presumably mirrors the PlayerStatePayload fields - confirm
    player: Player  # player whose state was updated

# Statistics events
class StatsEventMemory:
    """Memory statistics data."""
    # NOTE(review): units are not stated here; Lavalink reports memory in bytes - confirm.
    free: int
    used: int
    allocated: int
    reservable: int

class StatsEventCPU:
    """CPU statistics data."""
    cores: int
    # NOTE(review): load scale is not stated here; presumably a 0-1 fraction - confirm.
    system_load: float
    lavalink_load: float

class StatsEventFrames:
    """Frame statistics data."""
    sent: int
    nulled: int
    deficit: int

class StatsEventPayload:
    """Node statistics event data.

    Passed to ``on_wavelink_stats_update`` handlers.
    """
    players: int
    playing_players: int
    uptime: int  # NOTE(review): unit not stated here - confirm
    memory: StatsEventMemory
    cpu: StatsEventCPU
    frame_stats: StatsEventFrames | None  # may be absent

# Response payloads
class StatsResponsePayload:
    """Statistics response from node.

    Same field names as ``StatsEventPayload``, but the nested sections are
    declared as plain dicts.
    """
    players: int
    playing_players: int
    uptime: int
    memory: dict
    cpu: dict
    frame_stats: dict | None  # may be absent

class PlayerStatePayload:
    """Player state information."""
    time: int  # NOTE(review): presumably a timestamp - confirm unit
    position: int  # NOTE(review): presumably the playback position - confirm unit
    connected: bool
    ping: int

class VoiceStatePayload:
    """Voice state information."""
    token: str
    endpoint: str
    session_id: str

class PlayerResponsePayload:
    """Player response information."""
    guild_id: str
    track: dict | None  # None is allowed; presumably when nothing is loaded - confirm
    volume: int
    paused: bool
    state: PlayerStatePayload
    voice: VoiceStatePayload
    filters: dict

class GitResponsePayload:
    """Git information response."""
    branch: str
    commit: str
    commit_time: int

class VersionResponsePayload:
    """Version information response."""
    semver: str
    major: int
    minor: int
    patch: int
    pre_release: str | None  # optional
    build: str | None  # optional

class PluginResponsePayload:
    """Plugin information response."""
    name: str
    version: str

class InfoResponsePayload:
    """Node information response."""
    version: VersionResponsePayload
    build_time: int
    git: GitResponsePayload
    jvm: str
    lavaplayer: str
    source_managers: list[str]
    filters: list[str]
    plugins: list[PluginResponsePayload]

class ExtraEventPayload:
    """Extra event data for custom events.

    Passed to ``on_wavelink_extra_event`` handlers.
    """
    data: dict  # raw event data; schema is not defined here

Event Handler Methods

Event handlers that can be implemented in your bot to respond to wavelink events.

# Event handler signatures for discord.py bots
async def on_wavelink_node_ready(payload: NodeReadyEventPayload) -> None:
    """Called when a node connects and is ready.

    ``payload`` carries ``resumed`` and ``session_id``.
    """

async def on_wavelink_track_start(payload: TrackStartEventPayload) -> None:
    """Called when a track starts playing.

    ``payload`` carries ``track`` and ``player``.
    """

async def on_wavelink_track_end(payload: TrackEndEventPayload) -> None:
    """Called when a track finishes playing.

    ``payload`` carries ``track``, ``reason`` and ``player``.
    """

async def on_wavelink_track_exception(payload: TrackExceptionEventPayload) -> None:
    """Called when a track encounters an exception.

    ``payload`` carries ``track``, ``exception`` and ``player``.
    """

async def on_wavelink_track_stuck(payload: TrackStuckEventPayload) -> None:
    """Called when a track gets stuck.

    ``payload`` carries ``track``, ``threshold_ms`` and ``player``.
    """

async def on_wavelink_websocket_closed(payload: WebsocketClosedEventPayload) -> None:
    """Called when the WebSocket connection closes.

    ``payload`` carries ``code``, ``reason``, ``by_remote`` and ``player``.
    """

async def on_wavelink_player_update(payload: PlayerUpdateEventPayload) -> None:
    """Called when player state updates.

    ``payload`` carries ``state`` and ``player``.
    """

async def on_wavelink_stats_update(payload: StatsEventPayload) -> None:
    """Called when node statistics update.

    ``payload`` carries player counts, uptime and memory/CPU/frame statistics.
    """

async def on_wavelink_extra_event(payload: ExtraEventPayload) -> None:
    """Called for custom events from Lavalink plugins.

    ``payload`` carries the event's ``data`` dict.
    """

Usage Examples

Exception Handling

import wavelink
import discord
from discord.ext import commands

@bot.event
async def on_command_error(ctx, error):
    """Report command failures to the invoking channel.

    Errors wrapped in ``commands.CommandInvokeError`` are unwrapped first.
    Wavelink exceptions get a tailored reply (an embed for load and node
    errors); everything else gets a generic notice.
    """
    if isinstance(error, commands.CommandInvokeError):
        error = error.original

    # Guard clause: anything that is not a wavelink error gets the generic reply.
    if not isinstance(error, wavelink.WavelinkException):
        await ctx.send(f"❌ An error occurred: {error}")
        return

    if isinstance(error, wavelink.LavalinkLoadException):
        load_embed = discord.Embed(
            title="❌ Failed to Load Track",
            description=f"**Error**: {error.error}\n**Cause**: {error.cause}",
            color=discord.Color.red(),
        )
        await ctx.send(embed=load_embed)
        return

    # Exception types answered with a fixed plain-text reply, checked in order.
    fixed_replies = (
        (wavelink.QueueEmpty, "❌ The queue is empty!"),
        (wavelink.InvalidChannelStateException, "❌ Cannot connect to that voice channel!"),
        (wavelink.ChannelTimeoutException, "❌ Connection to voice channel timed out!"),
        (wavelink.AuthorizationFailedException, "❌ Failed to authenticate with Lavalink server!"),
    )
    for exc_type, reply in fixed_replies:
        if isinstance(error, exc_type):
            await ctx.send(reply)
            return

    if isinstance(error, wavelink.NodeException):
        if error.status:
            details = f"Status Code: {error.status}"
        else:
            details = "Unknown node error"
        node_embed = discord.Embed(
            title="❌ Node Error",
            description=details,
            color=discord.Color.red(),
        )
        await ctx.send(embed=node_embed)
        return

    # Any other wavelink error: show its message as-is.
    await ctx.send(f"❌ Wavelink error: {error}")

@bot.command()
async def safe_play(ctx, *, query: str):
    """Search for ``query`` and play or queue the result, reporting every failure.

    Connects to the author's voice channel when the bot has no voice client
    yet. A playlist result is queued whole; otherwise the first track is
    played immediately, or queued if something is already playing.
    """
    try:
        # Reuse the existing voice client, or connect to the author's channel.
        player = ctx.voice_client
        if not player:
            voice_state = ctx.author.voice
            if not voice_state:
                return await ctx.send("❌ You're not in a voice channel!")

            try:
                player = await voice_state.channel.connect(cls=wavelink.Player)
            except wavelink.InvalidChannelStateException:
                return await ctx.send("❌ I don't have permission to connect to that channel!")
            except wavelink.ChannelTimeoutException:
                return await ctx.send("❌ Connection timed out!")

        # Look the query up on the node pool.
        try:
            results = await wavelink.Pool.fetch_tracks(query)
        except wavelink.LavalinkLoadException as load_err:
            return await ctx.send(f"❌ Search failed: {load_err.error}")
        except wavelink.InvalidNodeException:
            return await ctx.send("❌ No Lavalink nodes available!")

        if not results:
            return await ctx.send("❌ No tracks found!")

        # A playlist is queued in one go.
        if isinstance(results, wavelink.Playlist):
            added = player.queue.put(results)
            await ctx.send(f"✅ Added playlist: **{results.name}** ({added} tracks)")
            return

        # Single result: play now if idle, otherwise enqueue.
        first = results[0]
        if not player.playing:
            await player.play(first)
            await ctx.send(f"▶️ Now playing: **{first.title}**")
        else:
            player.queue.put(first)
            await ctx.send(f"✅ Added to queue: **{first.title}**")

    except wavelink.WavelinkException as wl_err:
        await ctx.send(f"❌ Wavelink error: {wl_err}")
    except Exception as exc:
        await ctx.send(f"❌ Unexpected error: {exc}")

Event Handling

@bot.event
async def on_wavelink_node_ready(payload: wavelink.NodeReadyEventPayload):
    """Print the node's session ID and whether the session was resumed."""
    session_id = payload.session_id
    print(f"Node is ready! Session ID: {session_id}")
    was_resumed = payload.resumed
    print(f"Resumed previous session: {was_resumed}")

@bot.event
async def on_wavelink_track_start(payload: wavelink.TrackStartEventPayload):
    """Post a "Now Playing" embed when a track starts.

    The embed goes to the guild's ``music`` text channel, falling back to the
    guild's first text channel. Nothing is sent when there is no player, no
    current track, no guild, or no text channel at all.
    """
    player = payload.player
    # NOTE(review): the player may be missing on some events - confirm against
    # the wavelink docs; the guard is harmless either way.
    track = player.current if player else None
    if not track:
        return

    guild = player.guild
    if not guild:
        return

    # Find a suitable channel to send the message
    channel = discord.utils.get(guild.text_channels, name='music')
    if not channel:
        if not guild.text_channels:
            return  # no text channel to post in; avoids IndexError on the fallback
        channel = guild.text_channels[0]  # Fallback to first channel

    embed = discord.Embed(
        title="🎵 Now Playing",
        description=f"**{track.title}**\nby {track.author}",
        color=discord.Color.green()
    )

    if track.artwork:
        embed.set_thumbnail(url=track.artwork)

    # track.length is treated as milliseconds; render it as m:ss.
    minutes, seconds = divmod(track.length // 1000, 60)
    duration = f"{minutes}:{seconds:02d}"

    # Accept both an enum-like source (with .name) and a plain string source.
    source_name = getattr(track.source, "name", track.source)

    embed.add_field(name="Duration", value=duration, inline=True)
    embed.add_field(name="Source", value=str(source_name), inline=True)
    embed.add_field(name="Queue", value=f"{player.queue.count} tracks", inline=True)

    try:
        await channel.send(embed=embed)
    except discord.HTTPException:
        pass  # Ignore if we can't send messages

@bot.event
async def on_wavelink_track_end(payload: wavelink.TrackEndEventPayload):
    """Advance playback when a track ends.

    Plays the next queued track if there is one. With an empty queue, leaves
    recommendations to AutoPlay when it is enabled; otherwise waits five
    minutes and disconnects if the player is still idle.
    """
    # Imported locally: the example's import block does not bring in asyncio.
    import asyncio

    player = payload.player
    # NOTE(review): the player may be missing on some events - confirm against
    # the wavelink docs; the guard is harmless either way.
    if not player:
        return

    # Auto-play next track if queue isn't empty
    if not player.queue.is_empty:
        try:
            next_track = player.queue.get()
            await player.play(next_track)
        except wavelink.QueueEmpty:
            pass  # Queue became empty between check and get
    elif player.autoplay != wavelink.AutoPlayMode.disabled:
        # Let AutoPlay handle track recommendation
        pass
    else:
        # Disconnect after inactivity timeout
        await asyncio.sleep(300)  # Wait 5 minutes
        if not player.playing and player.queue.is_empty:
            await player.disconnect()

@bot.event
async def on_wavelink_track_exception(payload: wavelink.TrackExceptionEventPayload):
    """Print the track exception, then move on to the next queued track if any."""
    player = payload.player
    exception = payload.exception

    print(f"Track exception in guild {player.guild.id if player.guild else 'Unknown'}")
    print(f"Exception: {exception}")

    # Nothing queued: there is nothing to fall back to.
    if player.queue.is_empty:
        return

    try:
        upcoming = player.queue.get()
        await player.play(upcoming)
    except wavelink.QueueEmpty:
        pass

@bot.event
async def on_wavelink_track_stuck(payload: wavelink.TrackStuckEventPayload):
    """Skip a track that got stuck.

    Prints the stuck threshold and guild, then tries to skip. A failed skip is
    reported rather than raised, so the event handler never crashes.
    """
    player = payload.player
    threshold = payload.threshold_ms

    print(f"Track stuck for {threshold}ms in guild {player.guild.id if player.guild else 'Unknown'}")

    # Skip to next track
    try:
        await player.skip()
    except Exception as exc:
        # Best-effort: keep the handler alive, but do not hide the failure.
        print(f"Failed to skip stuck track: {exc}")

@bot.event
async def on_wavelink_websocket_closed(payload: wavelink.WebsocketClosedEventPayload):
    """Print WebSocket close details and clean up after close code 4014.

    A failed disconnect is reported rather than raised, so the event handler
    never crashes.
    """
    player = payload.player
    code = payload.code
    reason = payload.reason

    print(f"WebSocket closed for guild {player.guild.id if player.guild else 'Unknown'}")
    print(f"Code: {code}, Reason: {reason}, By Remote: {payload.by_remote}")

    # Handle specific close codes
    if code == 4014:  # Disconnected
        # Voice channel was deleted or bot was disconnected
        try:
            await player.disconnect()
        except Exception as exc:
            # Best-effort cleanup: keep the handler alive, but do not hide the failure.
            print(f"Failed to disconnect player after close code {code}: {exc}")

@bot.event
async def on_wavelink_player_update(payload: wavelink.PlayerUpdateEventPayload):
    """Placeholder handler for player state updates."""
    # Both values are read but deliberately unused: this is where player
    # tracking or UI refresh code would go. The event fires frequently, so
    # keep any work added here cheap.
    player, state = payload.player, payload.state

@bot.event
async def on_wavelink_stats_update(payload: wavelink.StatsEventPayload):
    """Print node statistics and warn about high memory or CPU usage.

    Memory is converted to megabytes and CPU load to a percentage before
    printing and before comparing against the thresholds.
    """
    stats = payload

    # NOTE(review): Lavalink reports memory in bytes and CPU load as a 0-1
    # fraction - confirm against the Lavalink stats documentation.
    used_mb = stats.memory.used / (1024 * 1024)
    cpu_percent = stats.cpu.lavalink_load * 100

    print(f"Node Stats - Players: {stats.players}, Playing: {stats.playing_players}")
    print(f"Memory: {used_mb:.0f}MB used, CPU: {cpu_percent:.2f}%")

    # Monitor node health
    if used_mb > 1000:  # Over ~1GB memory usage
        print("⚠️ High memory usage detected!")

    if cpu_percent > 80:  # Over 80% CPU
        print("⚠️ High CPU usage detected!")

Advanced Error Recovery

class RobustPlayer(wavelink.Player):
    """Enhanced player with automatic error recovery.

    Attributes:
    - reconnect_attempts: consecutive reconnection attempts made so far
    - max_reconnect_attempts: attempts allowed before giving up
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 3

    async def handle_disconnect(self):
        """Handle unexpected disconnections with retry logic.

        Each call makes one attempt, after a delay that grows with the attempt
        number. A successful reconnect resumes the current track and resets
        the attempt counter, so later disconnects get a fresh set of retries.
        """
        # Imported locally: the example does not import asyncio at module level.
        import asyncio

        if self.reconnect_attempts >= self.max_reconnect_attempts:
            print("Already exceeded max reconnection attempts")
            return

        self.reconnect_attempts += 1
        attempt = self.reconnect_attempts

        try:
            # Wait before reconnecting
            await asyncio.sleep(5 * attempt)

            # Attempt to reconnect
            if self.channel:
                # connect() takes keyword-only arguments; the player already
                # knows its channel, so none is passed.
                await self.connect(timeout=10.0, reconnect=True)
                print(f"Successfully reconnected (attempt {attempt})")
                # Reset so a future disconnect starts with a full retry budget.
                self.reconnect_attempts = 0

                # Resume playback if there was a current track
                if self.current and not self.playing:
                    await self.play(self.current, start=self.position)

        except Exception as e:
            print(f"Reconnection attempt {attempt} failed: {e}")

            if attempt >= self.max_reconnect_attempts:
                print("Max reconnection attempts reached, giving up")

    async def safe_play(self, track, **kwargs):
        """Play with automatic retry on failure.

        Tries up to three times, pausing one second between attempts, and
        re-raises the last error if every attempt fails.

        Parameters:
        - track: track to play
        - **kwargs: forwarded to ``play``
        """
        # Imported locally: the example does not import asyncio at module level.
        import asyncio

        for attempt in range(3):
            try:
                await self.play(track, **kwargs)
                return
            except Exception as e:
                print(f"Play attempt {attempt + 1} failed: {e}")
                if attempt < 2:
                    await asyncio.sleep(1)
                else:
                    raise

# Use the robust player
@bot.command()
async def connect_robust(ctx):
    """Join the author's voice channel using ``RobustPlayer``."""
    if ctx.voice_client:
        return await ctx.send("Already connected!")

    author_voice = ctx.author.voice
    if not author_voice:
        return await ctx.send("You're not in a voice channel!")

    try:
        # The returned player is not needed here, so it is not bound to a name.
        await author_voice.channel.connect(cls=RobustPlayer)
        await ctx.send("✅ Connected with enhanced error recovery!")
    except Exception as exc:
        await ctx.send(f"❌ Failed to connect: {exc}")

Custom Event Logging

import logging
from datetime import datetime

# Set up root logging at INFO and create the logger used by the event handlers
logging.basicConfig(level=logging.INFO)
wavelink_logger = logging.getLogger("wavelink_events")

@bot.event
async def on_wavelink_track_start(payload):
    """Log the guild, track title and channel member count when a track starts."""
    player = payload.player
    track = player.current

    # Each value falls back to a placeholder when its source is missing.
    guild_id = player.guild.id if player.guild else 'Unknown'
    title = track.title if track else 'Unknown'
    listeners = len(player.channel.members) if player.channel else 0

    wavelink_logger.info(
        f"Track started - Guild: {guild_id}, "
        f"Track: {title}, "
        f"User Count: {listeners}"
    )

@bot.event
async def on_wavelink_track_end(payload):
    """Log the guild, end reason and remaining queue size when a track ends."""
    player = payload.player
    reason = payload.reason

    guild_id = player.guild.id if player.guild else 'Unknown'
    remaining = player.queue.count

    wavelink_logger.info(
        f"Track ended - Guild: {guild_id}, "
        f"Reason: {reason}, Queue: {remaining} tracks"
    )

@bot.event
async def on_wavelink_track_exception(payload):
    """Log the message and severity of a track exception at error level."""
    player = payload.player
    exception = payload.exception

    guild_id = player.guild.id if player.guild else 'Unknown'
    # Missing keys fall back to 'Unknown'.
    message = exception.get('message', 'Unknown')
    severity = exception.get('severity', 'Unknown')

    wavelink_logger.error(
        f"Track exception - Guild: {guild_id}, "
        f"Exception: {message}, "
        f"Severity: {severity}"
    )

# Command to view recent logs
@bot.command()
@commands.has_permissions(administrator=True)
async def view_logs(ctx, lines: int = 20):
    """Point an administrator at the console for recent wavelink events.

    ``lines`` only appears in the reply; no log file is read here.
    """
    # A production bot would read from actual log files instead.
    notice = f"Check the console for the last {lines} wavelink events."
    await ctx.send(notice)

Install with Tessl CLI

npx tessl i tessl/pypi-wavelink

docs

audio-filters.md

events-exceptions.md

index.md

node-management.md

player-control.md

queue-system.md

track-search.md

tile.json