A Feature-rich Discord Bot Framework for Python with comprehensive API coverage and modern interfaces
—
Extension system, utilities, converters, tasks, and additional functionality for organizing and enhancing Discord bots.
from interactions import Extension, slash_command, SlashContext, listen, events
class BasicExtension(Extension):
"""A basic extension example"""
@slash_command(name="ext_hello", description="Hello from extension")
async def hello_command(self, ctx: SlashContext):
await ctx.send("Hello from extension!")
@listen(events.Ready)
async def on_ready(self, event: events.Ready):
print(f"Extension loaded! Bot: {self.bot.user}")
# Load the extension
bot.load_extension("extensions.basic") # Loads from extensions/basic.py# extensions/moderation.py
from interactions import Extension, slash_command, SlashContext, Client
class ModerationExtension(Extension):
def __init__(self, bot: Client):
self.banned_words = ["spam", "inappropriate"]
@slash_command(name="ban", description="Ban a user")
async def ban_command(self, ctx: SlashContext, user: Member, reason: str = None):
"""Ban a user from the guild"""
if not ctx.author.guild_permissions.BAN_MEMBERS:
await ctx.send("You don't have permission to ban members!", ephemeral=True)
return
await user.ban(reason=reason)
await ctx.send(f"Banned {user.mention}. Reason: {reason or 'No reason provided'}")
@slash_command(name="kick", description="Kick a user")
async def kick_command(self, ctx: SlashContext, user: Member, reason: str = None):
"""Kick a user from the guild"""
if not ctx.author.guild_permissions.KICK_MEMBERS:
await ctx.send("You don't have permission to kick members!", ephemeral=True)
return
await user.kick(reason=reason)
await ctx.send(f"Kicked {user.mention}. Reason: {reason or 'No reason provided'}")
@listen(events.MessageCreate)
async def check_banned_words(self, event: events.MessageCreate):
"""Auto-moderation for banned words"""
message = event.message
if message.author.bot:
return
content_lower = message.content.lower()
for word in self.banned_words:
if word in content_lower:
await message.delete()
await message.author.timeout(duration=300, reason=f"Used banned word: {word}")
break
def setup(bot):
"""Setup function called when extension is loaded"""
ModerationExtension(bot)class LifecycleExtension(Extension):
def __init__(self, bot: Client):
print("Extension initialized")
def drop(self):
"""Called when extension is unloaded"""
print("Extension unloaded - cleaning up resources")
# Clean up resources, close connections, etc.
@Extension.listener
async def on_extension_load(self, event: events.ExtensionLoad):
"""Called when ANY extension loads"""
print(f"Extension loaded: {event.extension.name}")
@Extension.listener
async def on_extension_unload(self, event: events.ExtensionUnload):
"""Called when ANY extension unloads"""
print(f"Extension unloaded: {event.extension.name}")
def setup(bot):
LifecycleExtension(bot)# Load single extension
bot.load_extension("extensions.moderation")
# Load multiple extensions
extensions = [
"extensions.moderation",
"extensions.fun",
"extensions.admin",
"extensions.utility"
]
for ext in extensions:
try:
bot.load_extension(ext)
print(f"Loaded {ext}")
except Exception as e:
print(f"Failed to load {ext}: {e}")
# Reload extension (unload then load)
bot.reload_extension("extensions.moderation")
# Unload extension
bot.unload_extension("extensions.moderation")from interactions import Task, IntervalTrigger, Extension
import asyncio
class TaskExtension(Extension):
def __init__(self, bot: Client):
# Create task that runs every 60 seconds
self.status_task = Task(
coro=self.update_status,
trigger=IntervalTrigger(seconds=60)
)
self.status_task.start()
async def update_status(self):
"""Update bot status every minute"""
guild_count = len(self.bot.guilds)
activity = Activity(
name=f"Watching {guild_count} servers",
type=ActivityType.WATCHING
)
await self.bot.change_presence(activity=activity)
def drop(self):
"""Clean up when extension unloads"""
self.status_task.stop()
def setup(bot):
TaskExtension(bot)from interactions import Task, TimeTrigger, DateTrigger, OrTrigger
from datetime import time, date, datetime
class ScheduledTaskExtension(Extension):
def __init__(self, bot: Client):
# Daily reminder at 9:00 AM
self.daily_reminder = Task(
coro=self.send_daily_reminder,
trigger=TimeTrigger(hour=9, minute=0)
)
# Specific date task
self.special_event = Task(
coro=self.special_event_reminder,
trigger=DateTrigger(year=2024, month=12, day=25, hour=12)
)
# Multiple time triggers
self.hourly_check = Task(
coro=self.hourly_maintenance,
trigger=OrTrigger(
TimeTrigger(minute=0), # Every hour at :00
TimeTrigger(minute=30) # Every hour at :30
)
)
# Start all tasks
self.daily_reminder.start()
self.special_event.start()
self.hourly_check.start()
async def send_daily_reminder(self):
"""Send daily reminder to all guilds"""
for guild in self.bot.guilds:
if guild.system_channel:
await guild.system_channel.send("📅 Daily reminder: Don't forget to check the announcements!")
async def special_event_reminder(self):
"""Special event reminder"""
for guild in self.bot.guilds:
if guild.system_channel:
await guild.system_channel.send("🎄 Special event today!")
async def hourly_maintenance(self):
"""Perform maintenance tasks"""
print("Running hourly maintenance...")
# Clean up expired data, update caches, etc.
def setup(bot):
ScheduledTaskExtension(bot)from interactions import Task, IntervalTrigger
class DecoratorTaskExtension(Extension):
@Task.create(IntervalTrigger(minutes=5))
async def cleanup_task(self):
"""Cleanup task using decorator syntax"""
print("Running cleanup...")
# Cleanup logic here
@Task.create(IntervalTrigger(hours=1))
async def backup_data(self):
"""Hourly data backup"""
print("Backing up data...")
# Backup logic here
def setup(bot):
DecoratorTaskExtension(bot)from interactions import cooldown, Buckets, SlashContext
class CooldownExtension(Extension):
@slash_command(name="limited", description="Command with cooldown")
@cooldown(bucket=Buckets.USER, rate=1, per=30) # 1 use per 30 seconds per user
async def limited_command(self, ctx: SlashContext):
await ctx.send("This command has a 30-second cooldown per user!")
@slash_command(name="guild_limited", description="Guild-wide cooldown")
@cooldown(bucket=Buckets.GUILD, rate=3, per=60) # 3 uses per minute per guild
async def guild_limited_command(self, ctx: SlashContext):
await ctx.send("This command has a guild-wide cooldown!")
@slash_command(name="channel_limited", description="Channel cooldown")
@cooldown(bucket=Buckets.CHANNEL, rate=2, per=45) # 2 uses per 45 seconds per channel
async def channel_limited_command(self, ctx: SlashContext):
await ctx.send("This command has a per-channel cooldown!")
def setup(bot):
CooldownExtension(bot)from interactions import CooldownSystem, MaxConcurrency
class AdvancedCooldownExtension(Extension):
@slash_command(name="complex", description="Complex rate limiting")
@cooldown(bucket=Buckets.USER, rate=5, per=300) # 5 uses per 5 minutes
@cooldown(bucket=Buckets.GUILD, rate=20, per=300) # 20 uses per 5 minutes per guild
@max_concurrency(limit=1, per=Buckets.USER) # Only 1 concurrent use per user
async def complex_limited_command(self, ctx: SlashContext):
await ctx.defer() # This might take a while
# Simulate long operation
await asyncio.sleep(10)
await ctx.send("Complex operation completed!")
def setup(bot):
AdvancedCooldownExtension(bot)@listen(events.CommandError)
async def cooldown_error_handler(event: events.CommandError):
"""Handle cooldown errors"""
if isinstance(event.error, interactions.errors.CommandOnCooldown):
ctx = event.ctx
error = event.error
retry_after = round(error.retry_after, 2)
await ctx.send(
f"Command is on cooldown! Try again in {retry_after} seconds.",
ephemeral=True
)from interactions import check, guild_only, dm_only, has_role, has_any_role, is_owner
class CheckExtension(Extension):
@slash_command(name="admin_only", description="Admin only command")
@check(lambda ctx: ctx.author.guild_permissions.ADMINISTRATOR)
async def admin_only_command(self, ctx: SlashContext):
await ctx.send("You are an administrator!")
@slash_command(name="guild_only", description="Guild only command")
@guild_only()
async def guild_only_command(self, ctx: SlashContext):
await ctx.send("This only works in servers!")
@slash_command(name="dm_only", description="DM only command")
@dm_only()
async def dm_only_command(self, ctx: SlashContext):
await ctx.send("This only works in DMs!")
@slash_command(name="role_check", description="Requires specific role")
@has_role("Moderator")
async def role_check_command(self, ctx: SlashContext):
await ctx.send("You have the Moderator role!")
@slash_command(name="any_role_check", description="Requires one of several roles")
@has_any_role("Admin", "Moderator", "Helper")
async def any_role_check_command(self, ctx: SlashContext):
await ctx.send("You have at least one of the required roles!")
@slash_command(name="owner_only", description="Bot owner only")
@is_owner()
async def owner_only_command(self, ctx: SlashContext):
await ctx.send("You are the bot owner!")
def setup(bot):
CheckExtension(bot)def is_premium_user():
"""Custom check for premium users"""
async def predicate(ctx: SlashContext):
# Check if user is premium (from database, etc.)
return await check_premium_status(ctx.author.id)
return check(predicate)
def in_allowed_channel(*channel_ids):
"""Check if command is used in allowed channels"""
async def predicate(ctx: SlashContext):
return ctx.channel.id in channel_ids
return check(predicate)
class CustomCheckExtension(Extension):
@slash_command(name="premium", description="Premium users only")
@is_premium_user()
async def premium_command(self, ctx: SlashContext):
await ctx.send("Welcome, premium user! 🌟")
@slash_command(name="channel_restricted", description="Only in specific channels")
@in_allowed_channel(123456789, 987654321) # Replace with actual channel IDs
async def channel_restricted_command(self, ctx: SlashContext):
await ctx.send("This command works in this channel!")
def setup(bot):
CustomCheckExtension(bot)The library provides many built-in converters for automatic argument conversion:
from interactions import (
UserConverter, MemberConverter, RoleConverter, GuildConverter,
ChannelConverter, MessageConverter, CustomEmojiConverter,
SnowflakeConverter, Converter
)
# Converters are used automatically based on parameter types
@slash_command(name="info", description="Get info about various objects")
async def info_command(
ctx: SlashContext,
user: User, # Automatically converted using UserConverter
channel: GuildText, # Automatically converted using ChannelConverter
role: Role # Automatically converted using RoleConverter
):
await ctx.send(f"User: {user}, Channel: {channel.mention}, Role: {role.mention}")from interactions import Converter
class TemperatureConverter(Converter):
"""Convert temperature strings to celsius"""
async def convert(self, ctx: SlashContext, argument: str) -> float:
"""Convert temperature to celsius"""
argument = argument.lower().strip()
if argument.endswith('f') or argument.endswith('°f'):
# Convert Fahrenheit to Celsius
temp = float(argument.rstrip('f°'))
return (temp - 32) * 5/9
elif argument.endswith('k'):
# Convert Kelvin to Celsius
temp = float(argument.rstrip('k'))
return temp - 273.15
elif argument.endswith('c') or argument.endswith('°c'):
# Already Celsius
return float(argument.rstrip('c°'))
else:
# Assume Celsius if no unit
try:
return float(argument)
except ValueError:
raise ValueError(f"Invalid temperature format: {argument}")
# Use custom converter
@slash_command(name="temperature", description="Convert temperature")
async def temperature_command(ctx: SlashContext, temp: TemperatureConverter):
"""Command using custom temperature converter"""
fahrenheit = (temp * 9/5) + 32
kelvin = temp + 273.15
await ctx.send(f"{temp}°C = {fahrenheit}°F = {kelvin}K")from interactions import Wait
class WaitExtension(Extension):
@slash_command(name="wait_demo", description="Demonstrate wait functionality")
async def wait_demo_command(self, ctx: SlashContext):
await ctx.send("Say something in the next 30 seconds...")
try:
# Wait for a message from the same user in the same channel
message_event = await Wait.for_message(
self.bot,
check=lambda m: m.author.id == ctx.author.id and m.channel.id == ctx.channel.id,
timeout=30
)
message = message_event.message
await ctx.followup(f"You said: {message.content}")
except asyncio.TimeoutError:
await ctx.followup("You didn't say anything in time!")
@slash_command(name="wait_reaction", description="Wait for reaction")
async def wait_reaction_command(self, ctx: SlashContext):
msg = await ctx.send("React with 👍 or 👎")
# Add reactions
await msg.add_reaction("👍")
await msg.add_reaction("👎")
try:
# Wait for reaction from command user
reaction_event = await Wait.for_reaction(
self.bot,
message=msg,
check=lambda r, u: u.id == ctx.author.id and str(r.emoji) in ["👍", "👎"],
timeout=60
)
reaction, user = reaction_event.reaction, reaction_event.user
if str(reaction.emoji) == "👍":
await ctx.followup("You liked it!")
else:
await ctx.followup("You didn't like it.")
except asyncio.TimeoutError:
await ctx.followup("No reaction received!")
def setup(bot):
WaitExtension(bot)@slash_command(name="wait_button", description="Wait for button press")
async def wait_button_command(self, ctx: SlashContext):
button = Button(
style=ButtonStyle.PRIMARY,
label="Click Me!",
custom_id="wait_button"
)
await ctx.send("Click the button:", components=[ActionRow(button)])
try:
# Wait for component interaction
component_event = await Wait.for_component(
self.bot,
custom_id="wait_button",
timeout=30
)
component_ctx = component_event.ctx
await component_ctx.send("Button clicked!", ephemeral=True)
except asyncio.TimeoutError:
# Remove button after timeout
await ctx.edit_origin("Button expired.", components=[])from interactions import auto_defer, AutoDefer
class AutoDeferExtension(Extension):
@slash_command(name="slow_command", description="A slow command")
@auto_defer() # Automatically defer after 2 seconds
async def slow_command(self, ctx: SlashContext):
# Simulate slow operation
await asyncio.sleep(5)
await ctx.send("Slow operation completed!")
@slash_command(name="custom_defer", description="Custom defer timing")
@auto_defer(time_until_defer=1.0, ephemeral=True) # Defer after 1 second, ephemeral
async def custom_defer_command(self, ctx: SlashContext):
await asyncio.sleep(3)
await ctx.send("Custom defer completed!")
def setup(bot):
AutoDeferExtension(bot)# Configure auto-defer globally when creating the client
bot = Client(
token="TOKEN",
auto_defer=AutoDefer(enabled=True, time_until_defer=2.0, ephemeral=False)
)from interactions import Greedy
@slash_command(name="ban_multiple", description="Ban multiple users")
async def ban_multiple_command(ctx: SlashContext, users: Greedy[Member], reason: str = None):
"""Ban multiple users with greedy parsing"""
if not users:
await ctx.send("No users specified!")
return
banned_count = 0
for user in users:
try:
await user.ban(reason=reason)
banned_count += 1
except Exception as e:
print(f"Failed to ban {user}: {e}")
await ctx.send(f"Successfully banned {banned_count} users.")from interactions import AsyncIterator
class UtilityExtension(Extension):
@slash_command(name="list_messages", description="List recent messages")
async def list_messages_command(self, ctx: SlashContext, limit: int = 10):
"""List recent messages using async iterator"""
messages = []
# AsyncIterator for channel history
async for message in AsyncIterator(ctx.channel.history(limit=limit)):
messages.append(f"{message.author.display_name}: {message.content[:50]}")
if messages:
message_list = "\n".join(messages[:10]) # Limit display
await ctx.send(f"Recent messages:\n```\n{message_list}\n```")
else:
await ctx.send("No messages found.")
def setup(bot):
UtilityExtension(bot)from interactions import Typing
class TypingExtension(Extension):
@slash_command(name="typing_demo", description="Show typing indicator")
async def typing_demo_command(self, ctx: SlashContext):
await ctx.defer()
# Show typing indicator while processing
async with Typing(ctx.channel):
# Simulate work
await asyncio.sleep(3)
await ctx.send("Done processing!")
def setup(bot):
TypingExtension(bot)# extensions/database.py
class DatabaseExtension(Extension):
def __init__(self, bot: Client):
self.connection = None
async def connect_database(self):
"""Connect to database"""
# Database connection logic
pass
def drop(self):
"""Close database connection"""
if self.connection:
self.connection.close()
def setup(bot):
DatabaseExtension(bot)
# extensions/user_management.py
class UserManagementExtension(Extension):
def __init__(self, bot: Client):
# Get database extension
self.db_ext = bot.get_extension("DatabaseExtension")
if not self.db_ext:
raise Exception("DatabaseExtension is required!")
@slash_command(name="user_info", description="Get user info from database")
async def user_info_command(self, ctx: SlashContext, user: User):
# Use database extension
user_data = await self.db_ext.get_user_data(user.id)
await ctx.send(f"User data: {user_data}")
def setup(bot):
UserManagementExtension(bot)import json
import os
class ConfigurableExtension(Extension):
def __init__(self, bot: Client):
# Load configuration
config_path = "extensions/config/moderation.json"
self.config = self.load_config(config_path)
# Use config values
self.auto_mod_enabled = self.config.get("auto_mod_enabled", True)
self.banned_words = self.config.get("banned_words", [])
self.warning_threshold = self.config.get("warning_threshold", 3)
def load_config(self, path: str) -> dict:
"""Load configuration from JSON file"""
if os.path.exists(path):
with open(path, 'r') as f:
return json.load(f)
return {}
def save_config(self, path: str):
"""Save configuration to JSON file"""
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'w') as f:
json.dump(self.config, f, indent=4)
@slash_command(name="config", description="Configure extension")
async def config_command(self, ctx: SlashContext, setting: str, value: str):
"""Update configuration setting"""
if setting in self.config:
# Type conversion based on current value
current_value = self.config[setting]
if isinstance(current_value, bool):
self.config[setting] = value.lower() in ('true', '1', 'yes', 'on')
elif isinstance(current_value, int):
self.config[setting] = int(value)
elif isinstance(current_value, list):
self.config[setting] = value.split(',')
else:
self.config[setting] = value
self.save_config("extensions/config/moderation.json")
await ctx.send(f"Updated {setting} to {value}")
else:
await ctx.send(f"Unknown setting: {setting}")
def setup(bot):
ConfigurableExtension(bot)Install with Tessl CLI
npx tessl i tessl/pypi-discord-py-interactions