A Python wrapper for the Discord API forked from discord.py
—
The nextcord library provides robust client classes for creating Discord bots with sophisticated connection management, event handling, and sharding capabilities. This documentation covers the core client classes and their connection management features.
The main bot client for Discord interactions, providing lifecycle management, event handling, and Discord API operations.
import nextcord
from nextcord import Intents
# Basic client initialization
intents = Intents.default()
intents.message_content = True # Required for message content access
client = nextcord.Client(intents=intents)
@client.event
async def on_ready():
print(f'{client.user} has connected to Discord!')
# Run the client
client.run('your_bot_token')

class Client:
def __init__(
self,
*,
max_messages: Optional[int] = 1000,
connector: Optional[aiohttp.BaseConnector] = None,
proxy: Optional[str] = None,
proxy_auth: Optional[aiohttp.BasicAuth] = None,
shard_id: Optional[int] = None,
shard_count: Optional[int] = None,
application_id: Optional[int] = None,
intents: Intents = Intents.default(),
member_cache_flags: MemberCacheFlags = MISSING,
chunk_guilds_at_startup: bool = MISSING,
status: Optional[Status] = None,
activity: Optional[BaseActivity] = None,
allowed_mentions: Optional[AllowedMentions] = None,
heartbeat_timeout: float = 60.0,
guild_ready_timeout: float = 2.0,
assume_unsync_clock: bool = True,
enable_debug_events: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None,
lazy_load_commands: bool = True,
rollout_associate_known: bool = True,
rollout_delete_unknown: bool = True,
rollout_register_new: bool = True,
rollout_update_known: bool = True,
rollout_all_guilds: bool = False,
default_guild_ids: Optional[List[int]] = None,
) -> None:

Parameters:
- max_messages: Maximum number of messages to store in cache (default: 1000, None disables)
- intents: Gateway intents controlling which events the bot receives
- heartbeat_timeout: Maximum seconds before timing out heartbeat (default: 60.0)
- guild_ready_timeout: Maximum seconds to wait for GUILD_CREATE stream (default: 2.0)
- status: Initial status upon login (online, idle, dnd, invisible)
- activity: Initial activity/presence upon login
- allowed_mentions: Global mention configuration for all messages
- enable_debug_events: Enable debug events like socket receive/send
- shard_id/shard_count: Manual sharding configuration (use AutoShardedClient instead)

# Login with bot token
async def login(self, token: str) -> None:
"""Logs in the client with the specified credentials."""
# Connect to Discord gateway
async def connect(self, *, reconnect: bool = True) -> None:
"""Creates websocket connection and handles events."""
# Combined login + connect
async def start(self, token: str, *, reconnect: bool = True) -> None:
"""Shorthand for login() followed by connect()."""
# Blocking run method
def run(self, token: str, *, reconnect: bool = True) -> None:
"""Blocking call that handles the event loop automatically."""

# Close connection
async def close(self) -> None:
"""Closes the connection to Discord."""
# Clear internal state for reconnection
def clear(self) -> None:
"""Clears internal state - bot can be considered 're-opened'."""
# Check connection status
def is_closed(self) -> bool:
"""Returns True if the websocket connection is closed."""
def is_ready(self) -> bool:
"""Returns True if the client's internal cache is ready."""
# Wait for ready state
async def wait_until_ready(self) -> None:
"""Waits until the client's internal cache is ready."""

@property
def latency(self) -> float:
"""Latency between HEARTBEAT and HEARTBEAT_ACK in seconds."""
@property
def user(self) -> Optional[ClientUser]:
"""The connected client user. None if not logged in."""
@property
def guilds(self) -> List[Guild]:
"""Guilds the connected client is a member of."""
@property
def voice_clients(self) -> List[VoiceProtocol]:
"""List of voice connections (usually VoiceClient instances)."""
def is_ws_ratelimited(self) -> bool:
"""Whether the websocket is currently rate limited."""

Core methods for fetching Discord objects by ID, bypassing the local cache to get fresh data from Discord's API.
async def fetch_user(self, user_id: int, /) -> User:
"""
Retrieves a User based on their ID from Discord's API.
Parameters:
- user_id: The ID of the user to fetch
Returns:
User: The user object
Raises:
NotFound: User not found
HTTPException: Fetching failed
"""
async def fetch_guild(self, guild_id: int, /, *, with_counts: bool = True) -> Guild:
"""
Retrieves a Guild by ID from Discord's API.
Parameters:
- guild_id: The ID of the guild to fetch
- with_counts: Whether to include approximate member/presence counts
Returns:
Guild: The guild object
"""
async def fetch_channel(self, channel_id: int, /) -> Union[GuildChannel, PrivateChannel, Thread]:
"""
Retrieves a Channel by ID from Discord's API.
Parameters:
- channel_id: The ID of the channel to fetch
Returns:
Union[GuildChannel, PrivateChannel, Thread]: The channel object
"""
async def fetch_guild_preview(self, guild_id: int, /) -> GuildPreview:
"""
Retrieves a GuildPreview for a public guild.
Parameters:
- guild_id: The ID of the guild to preview
Returns:
GuildPreview: The guild preview
"""

Methods for managing and syncing application commands across Discord.
async def sync_all_application_commands(
self,
data: Optional[Dict[Optional[int], List[ApplicationCommandPayload]]] = None,
*,
use_rollout: bool = True,
associate_known: bool = True,
delete_unknown: bool = True,
) -> None:
"""
Syncs all application commands with Discord.
Parameters:
- data: Optional command data to sync
- use_rollout: Whether to use command rollout features
- associate_known: Whether to associate known commands
- delete_unknown: Whether to delete unknown commands from Discord
"""
async def deploy_application_commands(
self,
*,
guild_id: Optional[int] = None,
use_rollout: bool = True,
) -> None:
"""
Deploys application commands to Discord.
Parameters:
- guild_id: Optional guild ID to deploy to (None for global)
- use_rollout: Whether to use rollout features
"""

# Decorator for event registration
def event(self, coro: Callable) -> Callable:
"""Decorator to register an event listener."""
@client.event
async def on_message(message):
if message.author == client.user:
return
if message.content.startswith('!hello'):
await message.channel.send('Hello!')
# Wait for specific events
async def wait_for(
self,
event: str,
*,
check: Optional[Callable[..., bool]] = None,
timeout: Optional[float] = None,
) -> Any:
"""Waits for a WebSocket event to be dispatched."""
# Example: Wait for user response
@client.event
async def on_message(message):
if message.content == '!question':
await message.channel.send('What is your favorite color?')
def check(m):
return m.author == message.author and m.channel == message.channel
try:
response = await client.wait_for('message', check=check, timeout=30.0)
await message.channel.send(f'Your favorite color is {response.content}!')
except asyncio.TimeoutError:
await message.channel.send('You took too long to respond!')

def dispatch(self, event: str, *args: Any, **kwargs: Any) -> None:
"""Dispatches an event to registered listeners."""
# Error handling
async def on_error(self, event_method: str, *args: Any, **kwargs: Any) -> None:
"""Default error handler - override for custom error handling."""

AutoShardedClient automatically manages multiple shards for large bots (1000+ guilds).
import nextcord
from nextcord import AutoShardedClient, Intents
intents = Intents.default()
intents.message_content = True
# Automatic sharding - library determines shard count
client = AutoShardedClient(intents=intents)
# Manual shard configuration
client = AutoShardedClient(
intents=intents,
shard_count=4,
shard_ids=[0, 1] # Only run shards 0 and 1
)
@client.event
async def on_ready():
print(f'Logged in as {client.user}')
print(f'Shard count: {len(client.shards)}')
for shard_id, shard in client.shards.items():
print(f'Shard {shard_id}: {shard.latency:.2f}s latency')
client.run('your_bot_token')

class AutoShardedClient(Client):
def __init__(
self,
*,
shard_ids: Optional[List[int]] = None,
shard_count: Optional[int] = None,
# ... all Client parameters ...
) -> None:

Additional Parameters:
- shard_ids: Specific shard IDs to launch (requires shard_count)
- shard_count: Total number of shards (auto-detected if None)

# Shard information access
@property
def shards(self) -> Dict[int, ShardInfo]:
"""Mapping of shard IDs to ShardInfo objects."""
def get_shard(self, shard_id: int) -> Optional[ShardInfo]:
"""Gets shard information for a specific shard ID."""
@property
def latency(self) -> float:
"""Average latency across all shards."""
@property
def latencies(self) -> List[Tuple[int, float]]:
"""List of (shard_id, latency) tuples for all shards."""
# Per-shard presence changes
async def change_presence(
self,
*,
activity: Optional[BaseActivity] = None,
status: Optional[Status] = None,
shard_id: Optional[int] = None, # None = all shards
) -> None:
"""Changes presence for specific shard or all shards."""

class ShardInfo:
"""Information and control over a specific shard."""
@property
def id(self) -> int:
"""The shard ID."""
@property
def shard_count(self) -> Optional[int]:
"""Total shard count for this cluster."""
@property
def latency(self) -> float:
"""Latency for this specific shard."""
def is_closed(self) -> bool:
"""Whether the shard connection is closed."""
def is_ws_ratelimited(self) -> bool:
"""Whether this shard's websocket is rate limited."""
async def disconnect(self) -> None:
"""Disconnects this shard."""
async def reconnect(self) -> None:
"""Disconnects and reconnects this shard."""
async def connect(self) -> None:
"""Connects this shard if disconnected."""

The client uses WebSocket connections to Discord's gateway for real-time events.
# Connection properties
@property
def ws(self) -> Optional[DiscordWebSocket]:
"""Current websocket gateway connection."""
# Rate limiting
def is_ws_ratelimited(self) -> bool:
"""Check if websocket is rate limited."""
# Reconnection handling
async def connect(self, *, reconnect: bool = True) -> None:
"""
Connect with automatic reconnection handling.
Parameters:
- reconnect: Whether to attempt reconnection on failure
"""

# Connection lifecycle events
@client.event
async def on_connect():
"""Called when client connects to Discord."""
@client.event
async def on_ready():
"""Called when client is ready to receive events."""
@client.event
async def on_disconnect():
"""Called when client disconnects from Discord."""
@client.event
async def on_resume():
"""Called when websocket resumes a previous connection."""
# Shard-specific events (AutoShardedClient only)
@client.event
async def on_shard_connect(shard_id: int):
"""Called when a shard connects."""
@client.event
async def on_shard_ready(shard_id: int):
"""Called when a shard is ready."""
@client.event
async def on_shard_disconnect(shard_id: int):
"""Called when a shard disconnects."""
@client.event
async def on_shard_resume(shard_id: int):
"""Called when a shard resumes."""

Intents control which gateway events your bot receives. Configure them based on your bot's needs.
from nextcord import Intents
# Default intents (recommended starting point)
intents = Intents.default() # Excludes privileged intents
# All intents (requires privileged intents approval)
intents = Intents.all()
# No intents
intents = Intents.none()
# Custom intent configuration
intents = Intents.default()
intents.members = True # Privileged - member events
intents.presences = True # Privileged - presence updates
intents.message_content = True # Privileged - message content access
intents.guilds = True # Guild events (recommended)
intents.guild_messages = True # Guild message events
intents.dm_messages = True # DM message events
intents.guild_reactions = True # Reaction events
intents.voice_states = True # Required for voice features

class Intents:
# Basic intents
guilds: bool # Guild events (join, leave, updates)
guild_messages: bool # Guild message events
dm_messages: bool # Direct message events
guild_reactions: bool # Guild reaction events
dm_reactions: bool # DM reaction events
guild_typing: bool # Guild typing events
dm_typing: bool # DM typing events
# Privileged intents (require approval for verified bots)
members: bool # Member join/leave/update events
presences: bool # Member presence/status updates
message_content: bool # Access to message content
# Other intents
moderation: bool # Moderation events (bans, audit logs)
emojis_and_stickers: bool # Emoji and sticker events
integrations: bool # Integration events
webhooks: bool # Webhook events
invites: bool # Invite create/delete events
voice_states: bool # Voice state updates (required for voice)
guild_scheduled_events: bool # Scheduled event updates
auto_moderation_configuration: bool # Auto-mod rule changes
auto_moderation_execution: bool # Auto-mod actions
# Convenience properties
messages: bool # Both guild_messages and dm_messages
reactions: bool # Both guild_reactions and dm_reactions
typing: bool # Both guild_typing and dm_typing
auto_moderation: bool # Both auto_moderation intents

from nextcord import Status, Activity, ActivityType
# Status configuration
client = nextcord.Client(
status=Status.idle, # online, idle, dnd, invisible
activity=Activity(type=ActivityType.playing, name="with the Discord API")
)
# Change presence after connection
await client.change_presence(
status=Status.dnd,
activity=Activity(type=ActivityType.listening, name="user commands")
)
# Activity types
ActivityType.playing # "Playing {name}"
ActivityType.listening # "Listening to {name}"
ActivityType.watching # "Watching {name}"
ActivityType.competing # "Competing in {name}"
ActivityType.streaming # "Streaming {name}" (requires url)
ActivityType.custom # Custom status

# Configure message caching
client = nextcord.Client(
max_messages=5000, # Cache up to 5000 messages
# max_messages=None, # Disable message caching
# max_messages=0, # Values <= 0 fall back to the default of 1000; use None to disable caching
)
# Access cached messages
@client.event
async def on_ready():
print(f"Cached {len(client.cached_messages)} messages")
for message in client.cached_messages:
print(f"{message.author}: {message.content}")

import aiohttp
# Advanced client configuration
connector = aiohttp.TCPConnector(limit=100)
client = nextcord.Client(
connector=connector, # Custom aiohttp connector
proxy="http://proxy.example.com:8080", # HTTP proxy
proxy_auth=aiohttp.BasicAuth("user", "pass"), # Proxy authentication
heartbeat_timeout=90.0, # Heartbeat timeout (seconds)
guild_ready_timeout=5.0, # Guild ready timeout (seconds)
assume_unsync_clock=False, # Use system clock for rate limits
enable_debug_events=True, # Enable socket debug events
chunk_guilds_at_startup=False, # Don't chunk members at startup
)
# Member cache configuration
from nextcord import MemberCacheFlags
member_cache = MemberCacheFlags.all()
member_cache.joined = False # Don't cache join times
client = nextcord.Client(
intents=intents,
member_cache_flags=member_cache
)

# Customize identify behavior (useful for large bots)
async def before_identify_hook(self, shard_id: Optional[int], *, initial: bool = False) -> None:
"""
Called before IDENTIFY for each shard.
Parameters:
- shard_id: The shard ID being identified
- initial: Whether this is the first IDENTIFY
"""
if not initial:
await asyncio.sleep(5.0) # Default: wait 5 seconds between identifies
# Custom logic for identify timing
if shard_id is not None and shard_id > 0:
await asyncio.sleep(shard_id * 2) # Staggered identification

from nextcord import ConnectionClosed, GatewayNotFound, PrivilegedIntentsRequired
try:
await client.start(token)
except ConnectionClosed as e:
print(f"Connection closed: {e.code} - {e.reason}")
except GatewayNotFound:
print("Could not find Discord gateway (API outage?)")
except PrivilegedIntentsRequired as e:
print(f"Privileged intents required for shard {e.shard_id}")
# Custom error handling
@client.event
async def on_error(event, *args, **kwargs):
import traceback
print(f'Error in {event}:')
traceback.print_exc()

import logging
import nextcord
from nextcord import Intents
# Configure logging
logging.basicConfig(level=logging.INFO)
# Production client setup
intents = Intents.default()
intents.message_content = True # Only if needed
client = nextcord.Client(
intents=intents,
max_messages=1000, # Reasonable message cache
heartbeat_timeout=60.0, # Default heartbeat timeout
chunk_guilds_at_startup=False, # Faster startup for large bots
)
@client.event
async def on_ready():
logging.info(f'{client.user} is ready!')
logging.info(f'Latency: {client.latency:.2f}s')
logging.info(f'Guilds: {len(client.guilds)}')
@client.event
async def on_error(event, *args, **kwargs):
logging.exception(f'Error in event {event}')
# Graceful shutdown
import signal
def signal_handler(signum, frame):
logging.info('Received shutdown signal')
client.loop.create_task(client.close())
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
client.run(token)

# For bots in 1000+ guilds
from nextcord import AutoShardedClient, Intents, MemberCacheFlags
intents = Intents.default()
intents.members = False # Disable if not needed (reduces memory)
intents.presences = False # Disable if not needed (reduces memory)
intents.message_content = True # Only if needed
client = AutoShardedClient(
intents=intents,
max_messages=500, # Lower message cache per shard
chunk_guilds_at_startup=False, # Essential for large bots
member_cache_flags=MemberCacheFlags.none(), # Minimize memory usage
)
@client.event
async def on_ready():
total_guilds = len(client.guilds)
shard_count = len(client.shards)
avg_latency = client.latency
print(f'Bot ready: {total_guilds} guilds across {shard_count} shards')
print(f'Average latency: {avg_latency:.2f}s')
# Log per-shard info
for shard_id, shard in client.shards.items():
guilds_on_shard = len([g for g in client.guilds if g.shard_id == shard_id])
print(f'Shard {shard_id}: {guilds_on_shard} guilds, {shard.latency:.2f}s latency')
client.run(token)

This comprehensive guide covers all aspects of nextcord's client and connection management capabilities, providing both basic usage examples and advanced configuration options for production deployments.
Install with Tessl CLI
npx tessl i tessl/pypi-nextcord