CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-nextcord

A Python wrapper for the Discord API forked from discord.py

Pending
Overview
Eval results
Files

client.md · docs/

Nextcord Client and Connection Management

Overview

The nextcord library provides robust client classes for creating Discord bots with sophisticated connection management, event handling, and sharding capabilities. This documentation covers the core client classes and their connection management features.

Client Class

The main bot client for Discord interactions, providing lifecycle management, event handling, and Discord API operations.

Basic Client Setup { .api }

import nextcord
from nextcord import Intents

# Basic client initialization
intents = Intents.default()
intents.message_content = True  # Required for message content access

client = nextcord.Client(intents=intents)

@client.event
async def on_ready():
    print(f'{client.user} has connected to Discord!')

# Run the client
client.run('your_bot_token')

Client Constructor { .api }

class Client:
    def __init__(
        self,
        *,
        max_messages: Optional[int] = 1000,
        connector: Optional[aiohttp.BaseConnector] = None,
        proxy: Optional[str] = None,
        proxy_auth: Optional[aiohttp.BasicAuth] = None,
        shard_id: Optional[int] = None,
        shard_count: Optional[int] = None,
        application_id: Optional[int] = None,
        intents: Intents = Intents.default(),
        member_cache_flags: MemberCacheFlags = MISSING,
        chunk_guilds_at_startup: bool = MISSING,
        status: Optional[Status] = None,
        activity: Optional[BaseActivity] = None,
        allowed_mentions: Optional[AllowedMentions] = None,
        heartbeat_timeout: float = 60.0,
        guild_ready_timeout: float = 2.0,
        assume_unsync_clock: bool = True,
        enable_debug_events: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        lazy_load_commands: bool = True,
        rollout_associate_known: bool = True,
        rollout_delete_unknown: bool = True,
        rollout_register_new: bool = True,
        rollout_update_known: bool = True,
        rollout_all_guilds: bool = False,
        default_guild_ids: Optional[List[int]] = None,
    ) -> None:

Parameters:

  • max_messages: Maximum number of messages to store in cache (default: 1000, None disables)
  • intents: Gateway intents controlling which events the bot receives
  • heartbeat_timeout: Maximum seconds before timing out heartbeat (default: 60.0)
  • guild_ready_timeout: Maximum seconds to wait for GUILD_CREATE stream (default: 2.0)
  • status: Initial status upon login (online, idle, dnd, invisible)
  • activity: Initial activity/presence upon login
  • allowed_mentions: Global mention configuration for all messages
  • enable_debug_events: Enable debug events like socket receive/send
  • shard_id/shard_count: Manual sharding configuration (use AutoShardedClient instead)

Lifecycle Methods

Authentication and Connection { .api }

# Login with bot token
async def login(self, token: str) -> None:
    """Logs in the client with the specified credentials."""

# Connect to Discord gateway
async def connect(self, *, reconnect: bool = True) -> None:
    """Creates websocket connection and handles events."""

# Combined login + connect
async def start(self, token: str, *, reconnect: bool = True) -> None:
    """Shorthand for login() followed by connect()."""

# Blocking run method
def run(self, token: str, *, reconnect: bool = True) -> None:
    """Blocking call that handles the event loop automatically."""

Connection Management { .api }

# Close connection
async def close(self) -> None:
    """Closes the connection to Discord."""

# Clear internal state for reconnection
def clear(self) -> None:
    """Clears internal state - bot can be considered 're-opened'."""

# Check connection status
def is_closed(self) -> bool:
    """Returns True if the websocket connection is closed."""

def is_ready(self) -> bool:
    """Returns True if the client's internal cache is ready."""

# Wait for ready state
async def wait_until_ready(self) -> None:
    """Waits until the client's internal cache is ready."""

Connection State Properties { .api }

@property
def latency(self) -> float:
    """Latency between HEARTBEAT and HEARTBEAT_ACK in seconds."""

@property
def user(self) -> Optional[ClientUser]:
    """The connected client user. None if not logged in."""

@property
def guilds(self) -> List[Guild]:
    """Guilds the connected client is a member of."""

@property
def voice_clients(self) -> List[VoiceProtocol]:
    """List of voice connections (usually VoiceClient instances)."""

def is_ws_ratelimited(self) -> bool:
    """Whether the websocket is currently rate limited."""

Data Fetching Methods { .api }

Core methods for fetching Discord objects by ID, bypassing local cache for fresh data from Discord's API.

async def fetch_user(self, user_id: int, /) -> User:
    """
    Retrieves a User based on their ID from Discord's API.
    
    Parameters:
    - user_id: The ID of the user to fetch
    
    Returns:
    User: The user object
    
    Raises:
    NotFound: User not found
    HTTPException: Fetching failed
    """

async def fetch_guild(self, guild_id: int, /, *, with_counts: bool = True) -> Guild:
    """
    Retrieves a Guild by ID from Discord's API.
    
    Parameters:
    - guild_id: The ID of the guild to fetch
    - with_counts: Whether to include approximate member/presence counts
    
    Returns:
    Guild: The guild object
    """

async def fetch_channel(self, channel_id: int, /) -> Union[GuildChannel, PrivateChannel, Thread]:
    """
    Retrieves a Channel by ID from Discord's API.
    
    Parameters:
    - channel_id: The ID of the channel to fetch
    
    Returns:
    Union[GuildChannel, PrivateChannel, Thread]: The channel object
    """

async def fetch_guild_preview(self, guild_id: int, /) -> GuildPreview:
    """
    Retrieves a GuildPreview for a public guild.
    
    Parameters:
    - guild_id: The ID of the guild to preview
    
    Returns:
    GuildPreview: The guild preview
    """

Application Command Management { .api }

Methods for managing and syncing application commands across Discord.

async def sync_all_application_commands(
    self,
    data: Optional[Dict[Optional[int], List[ApplicationCommandPayload]]] = None,
    *,
    use_rollout: bool = True,
    associate_known: bool = True,
    delete_unknown: bool = True,
) -> None:
    """
    Syncs all application commands with Discord.
    
    Parameters:
    - data: Optional command data to sync
    - use_rollout: Whether to use command rollout features
    - associate_known: Whether to associate known commands
    - delete_unknown: Whether to delete unknown commands from Discord
    """

async def deploy_application_commands(
    self,
    *,
    guild_id: Optional[int] = None,
    use_rollout: bool = True,
) -> None:
    """
    Deploys application commands to Discord.
    
    Parameters:
    - guild_id: Optional guild ID to deploy to (None for global)
    - use_rollout: Whether to use rollout features
    """

Event System

Event Registration { .api }

# Decorator for event registration
def event(self, coro: Callable) -> Callable:
    """Decorator to register an event listener."""

@client.event
async def on_message(message):
    if message.author == client.user:
        return
    if message.content.startswith('!hello'):
        await message.channel.send('Hello!')

# Wait for specific events
async def wait_for(
    self,
    event: str,
    *,
    check: Optional[Callable[..., bool]] = None,
    timeout: Optional[float] = None,
) -> Any:
    """Waits for a WebSocket event to be dispatched."""

# Example: Wait for user response
@client.event
async def on_message(message):
    if message.content == '!question':
        await message.channel.send('What is your favorite color?')
        
        def check(m):
            return m.author == message.author and m.channel == message.channel
            
        try:
            response = await client.wait_for('message', check=check, timeout=30.0)
            await message.channel.send(f'Your favorite color is {response.content}!')
        except asyncio.TimeoutError:
            await message.channel.send('You took too long to respond!')

Event Dispatch { .api }

def dispatch(self, event: str, *args: Any, **kwargs: Any) -> None:
    """Dispatches an event to registered listeners."""

# Error handling
async def on_error(self, event_method: str, *args: Any, **kwargs: Any) -> None:
    """Default error handler - override for custom error handling."""

AutoShardedClient

Automatically manages multiple shards for large bots (1000+ guilds).

AutoShardedClient Setup { .api }

import nextcord
from nextcord import AutoShardedClient, Intents

intents = Intents.default()
intents.message_content = True

# Automatic sharding - library determines shard count
client = AutoShardedClient(intents=intents)

# Manual shard configuration
client = AutoShardedClient(
    intents=intents,
    shard_count=4,
    shard_ids=[0, 1]  # Only run shards 0 and 1
)

@client.event
async def on_ready():
    print(f'Logged in as {client.user}')
    print(f'Shard count: {len(client.shards)}')
    for shard_id, shard in client.shards.items():
        print(f'Shard {shard_id}: {shard.latency:.2f}s latency')

client.run('your_bot_token')

AutoShardedClient Constructor { .api }

class AutoShardedClient(Client):
    def __init__(
        self,
        *,
        shard_ids: Optional[List[int]] = None,
        shard_count: Optional[int] = None,
        # ... all Client parameters ...
    ) -> None:

Additional Parameters:

  • shard_ids: Specific shard IDs to launch (requires shard_count)
  • shard_count: Total number of shards (auto-detected if None)

Shard Management { .api }

# Shard information access
@property
def shards(self) -> Dict[int, ShardInfo]:
    """Mapping of shard IDs to ShardInfo objects."""

def get_shard(self, shard_id: int) -> Optional[ShardInfo]:
    """Gets shard information for a specific shard ID."""

@property
def latency(self) -> float:
    """Average latency across all shards."""

@property
def latencies(self) -> List[Tuple[int, float]]:
    """List of (shard_id, latency) tuples for all shards."""

# Per-shard presence changes
async def change_presence(
    self,
    *,
    activity: Optional[BaseActivity] = None,
    status: Optional[Status] = None,
    shard_id: Optional[int] = None,  # None = all shards
) -> None:
    """Changes presence for specific shard or all shards."""

ShardInfo Class { .api }

class ShardInfo:
    """Information and control over a specific shard."""
    
    @property
    def id(self) -> int:
        """The shard ID."""
    
    @property
    def shard_count(self) -> Optional[int]:
        """Total shard count for this cluster."""
    
    @property
    def latency(self) -> float:
        """Latency for this specific shard."""
    
    def is_closed(self) -> bool:
        """Whether the shard connection is closed."""
    
    def is_ws_ratelimited(self) -> bool:
        """Whether this shard's websocket is rate limited."""
    
    async def disconnect(self) -> None:
        """Disconnects this shard."""
    
    async def reconnect(self) -> None:
        """Disconnects and reconnects this shard."""
    
    async def connect(self) -> None:
        """Connects this shard if disconnected."""

Connection Management

Gateway Connection { .api }

The client uses WebSocket connections to Discord's gateway for real-time events.

# Connection properties
@property
def ws(self) -> Optional[DiscordWebSocket]:
    """Current websocket gateway connection."""

# Rate limiting
def is_ws_ratelimited(self) -> bool:
    """Check if websocket is rate limited."""

# Reconnection handling
async def connect(self, *, reconnect: bool = True) -> None:
    """
    Connect with automatic reconnection handling.
    
    Parameters:
    - reconnect: Whether to attempt reconnection on failure
    """

Connection Events { .api }

# Connection lifecycle events
@client.event
async def on_connect():
    """Called when client connects to Discord."""

@client.event  
async def on_ready():
    """Called when the client has finished preparing its internal cache (may fire more than once)."""

@client.event
async def on_disconnect():
    """Called when client disconnects from Discord."""

@client.event
async def on_resume():
    """Called when websocket resumes a previous connection."""

# Shard-specific events (AutoShardedClient only)
@client.event
async def on_shard_connect(shard_id: int):
    """Called when a shard connects."""

@client.event
async def on_shard_ready(shard_id: int):
    """Called when a shard is ready."""

@client.event
async def on_shard_disconnect(shard_id: int):
    """Called when a shard disconnects."""

@client.event
async def on_shard_resume(shard_id: int):
    """Called when a shard resumes."""

Configuration Options

Intents Configuration { .api }

Intents control which gateway events your bot receives. Configure them based on your bot's needs.

from nextcord import Intents

# Default intents (recommended starting point)
intents = Intents.default()  # Excludes privileged intents

# All intents (requires privileged intents approval)
intents = Intents.all()

# No intents
intents = Intents.none()

# Custom intent configuration
intents = Intents.default()
intents.members = True        # Privileged - member events
intents.presences = True      # Privileged - presence updates  
intents.message_content = True # Privileged - message content access
intents.guilds = True         # Guild events (recommended)
intents.guild_messages = True # Guild message events
intents.dm_messages = True    # DM message events
intents.guild_reactions = True # Reaction events
intents.voice_states = True   # Required for voice features

Intent Flags Reference { .api }

class Intents:
    # Basic intents
    guilds: bool                    # Guild events (join, leave, updates)
    guild_messages: bool            # Guild message events  
    dm_messages: bool              # Direct message events
    guild_reactions: bool          # Guild reaction events
    dm_reactions: bool             # DM reaction events
    guild_typing: bool             # Guild typing events
    dm_typing: bool                # DM typing events
    
    # Privileged intents (must be enabled in the Developer Portal; verified bots also need Discord's approval)
    members: bool                  # Member join/leave/update events
    presences: bool                # Member presence/status updates  
    message_content: bool          # Access to message content
    
    # Other intents
    moderation: bool               # Moderation events (bans, audit logs)
    emojis_and_stickers: bool      # Emoji and sticker events
    integrations: bool             # Integration events
    webhooks: bool                 # Webhook events
    invites: bool                  # Invite create/delete events
    voice_states: bool             # Voice state updates (required for voice)
    guild_scheduled_events: bool   # Scheduled event updates
    auto_moderation_configuration: bool  # Auto-mod rule changes
    auto_moderation_execution: bool      # Auto-mod actions
    
    # Convenience properties
    messages: bool                 # Both guild_messages and dm_messages
    reactions: bool                # Both guild_reactions and dm_reactions  
    typing: bool                   # Both guild_typing and dm_typing
    auto_moderation: bool          # Both auto_moderation intents

Presence Configuration { .api }

from nextcord import Status, Activity, ActivityType

# Status configuration
client = nextcord.Client(
    status=Status.idle,  # online, idle, dnd, invisible
    activity=Activity(type=ActivityType.playing, name="with the Discord API")
)

# Change presence after connection
await client.change_presence(
    status=Status.dnd,
    activity=Activity(type=ActivityType.listening, name="user commands")
)

# Activity types
ActivityType.playing     # "Playing {name}"
ActivityType.listening   # "Listening to {name}" 
ActivityType.watching    # "Watching {name}"
ActivityType.competing   # "Competing in {name}"
ActivityType.streaming   # "Streaming {name}" (requires url)
ActivityType.custom      # Custom status

Message Cache Configuration { .api }

# Configure message caching
client = nextcord.Client(
    max_messages=5000,     # Cache up to 5000 messages
    # max_messages=None,   # Disable message caching
    # max_messages=0,      # Does NOT disable caching: values <= 0 fall back to the default of 1000
)

# Access cached messages
@client.event
async def on_ready():
    print(f"Cached {len(client.cached_messages)} messages")
    for message in client.cached_messages:
        print(f"{message.author}: {message.content}")

Advanced Configuration { .api }

import aiohttp

# Advanced client configuration
connector = aiohttp.TCPConnector(limit=100)

client = nextcord.Client(
    connector=connector,                    # Custom aiohttp connector
    proxy="http://proxy.example.com:8080", # HTTP proxy
    proxy_auth=aiohttp.BasicAuth("user", "pass"),  # Proxy authentication
    heartbeat_timeout=90.0,                 # Heartbeat timeout (seconds)
    guild_ready_timeout=5.0,                # Guild ready timeout (seconds)
    assume_unsync_clock=False,              # Use system clock for rate limits
    enable_debug_events=True,               # Enable socket debug events
    chunk_guilds_at_startup=False,          # Don't chunk members at startup
)

# Member cache configuration  
from nextcord import MemberCacheFlags

member_cache = MemberCacheFlags.all()
member_cache.joined = False  # Don't cache members just because they joined or were chunked at startup

client = nextcord.Client(
    intents=intents,
    member_cache_flags=member_cache
)

Connection Hooks

Before Identify Hook { .api }

# Customize identify behavior (useful for large bots)
async def before_identify_hook(self, shard_id: Optional[int], *, initial: bool = False) -> None:
    """
    Called before IDENTIFY for each shard.
    
    Parameters:
    - shard_id: The shard ID being identified
    - initial: Whether this is the first IDENTIFY
    """
    if not initial:
        await asyncio.sleep(5.0)  # Default: wait 5 seconds between identifies
    
    # Custom logic for identify timing
    if shard_id is not None and shard_id > 0:
        await asyncio.sleep(shard_id * 2)  # Staggered identification

Error Handling

Connection Errors { .api }

from nextcord import ConnectionClosed, GatewayNotFound, PrivilegedIntentsRequired

try:
    await client.start(token)
except ConnectionClosed as e:
    print(f"Connection closed: {e.code} - {e.reason}")
except GatewayNotFound:
    print("Could not find Discord gateway (API outage?)")
except PrivilegedIntentsRequired as e:
    print(f"Privileged intents required for shard {e.shard_id}")

# Custom error handling
@client.event  
async def on_error(event, *args, **kwargs):
    import traceback
    print(f'Error in {event}:')
    traceback.print_exc()

Best Practices

Production Setup { .api }

import logging
import nextcord
from nextcord import Intents

# Configure logging
logging.basicConfig(level=logging.INFO)

# Production client setup
intents = Intents.default()
intents.message_content = True  # Only if needed

client = nextcord.Client(
    intents=intents,
    max_messages=1000,              # Reasonable message cache
    heartbeat_timeout=60.0,         # Default heartbeat timeout
    chunk_guilds_at_startup=False,  # Faster startup for large bots
)

@client.event
async def on_ready():
    logging.info(f'{client.user} is ready!')
    logging.info(f'Latency: {client.latency:.2f}s')
    logging.info(f'Guilds: {len(client.guilds)}')

@client.event
async def on_error(event, *args, **kwargs):
    logging.exception(f'Error in event {event}')

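# Graceful shutdown
# NOTE: where the event loop supports add_signal_handler (e.g. on Unix),
# client.run() installs its own SIGINT/SIGTERM handlers that stop the loop and
# run cleanup, replacing the handlers registered below; these mainly matter on
# platforms without that support.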
import signal

def signal_handler(signum, frame):
    logging.info('Received shutdown signal')
    client.loop.create_task(client.close())

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

client.run(token)

Large Bot Configuration { .api }

# For bots in 1000+ guilds
from nextcord import AutoShardedClient, Intents, MemberCacheFlags

intents = Intents.default()
intents.members = False          # Disable if not needed (reduces memory)
intents.presences = False        # Disable if not needed (reduces memory)
intents.message_content = True   # Only if needed

client = AutoShardedClient(
    intents=intents,
    max_messages=500,              # Lower message cache per shard
    chunk_guilds_at_startup=False, # Essential for large bots
    member_cache_flags=MemberCacheFlags.none(),  # Minimize memory usage
)

@client.event
async def on_ready():
    total_guilds = len(client.guilds)
    shard_count = len(client.shards)
    avg_latency = client.latency
    
    print(f'Bot ready: {total_guilds} guilds across {shard_count} shards')
    print(f'Average latency: {avg_latency:.2f}s')
    
    # Log per-shard info
    for shard_id, shard in client.shards.items():
        guilds_on_shard = len([g for g in client.guilds if g.shard_id == shard_id])
        print(f'Shard {shard_id}: {guilds_on_shard} guilds, {shard.latency:.2f}s latency')

client.run(token)

This comprehensive guide covers all aspects of nextcord's client and connection management capabilities, providing both basic usage examples and advanced configuration options for production deployments.

Install with Tessl CLI

npx tessl i tessl/pypi-nextcord

docs

application-commands.md

channels.md

client.md

commands.md

errors.md

events.md

guild.md

index.md

messages.md

permissions.md

tasks.md

ui.md

users.md

utilities.md

voice.md

webhooks.md

tile.json