A Python wrapper for the Discord API forked from discord.py
—
Task scheduling and background operations using the tasks extension, which provides robust async management of periodic and scheduled tasks.
The primary decorator for creating background tasks with flexible scheduling options.
import asyncio
from datetime import datetime, time, timedelta, timezone
from typing import Any, Callable, Optional, Sequence, Union

import nextcord
from nextcord.ext import commands, tasks
def loop(
    *,
    seconds: float = MISSING,
    minutes: float = MISSING,
    hours: float = MISSING,
    time: Union[time, Sequence[time]] = MISSING,
    count: Optional[int] = None,
    reconnect: bool = True
) -> Callable:
    """Decorator that turns a coroutine function into a background task loop.

    Parameters
    ----------
    seconds: float
        Number of seconds between iterations.
    minutes: float
        Number of minutes between iterations.
    hours: float
        Number of hours between iterations.
    time: Union[time, Sequence[time]]
        Exact time(s) of day at which to run the task. A time without
        ``tzinfo`` is interpreted as UTC. This cannot be combined with the
        relative ``seconds``/``minutes``/``hours`` parameters.
    count: Optional[int]
        Number of times to run the task. None for infinite.
    reconnect: bool
        Whether connection-related errors raised inside the task are handled
        by retrying the iteration with exponential back-off. This does not
        reconnect the bot itself.

    Returns
    -------
    Callable
        A decorator that converts the coroutine function into a Loop object.
    """
    ...
# Basic periodic tasks
@tasks.loop(seconds=30)
async def status_updater():
    """Update the bot's presence with server and member totals every 30 seconds."""
    guild_count = len(bot.guilds)
    # Guild.member_count is Optional and may be None when the count is not
    # available; treat that as 0 so sum() does not raise TypeError.
    member_count = sum(guild.member_count or 0 for guild in bot.guilds)
    activity = nextcord.Activity(
        type=nextcord.ActivityType.watching,
        name=f"{guild_count} servers • {member_count} members"
    )
    await bot.change_presence(activity=activity)
@tasks.loop(minutes=5)
async def backup_data():
    """Run the user-data and guild-settings backups every 5 minutes.

    A timestamped line is printed before the first backup step and after the
    last one.
    """
    started_at = datetime.now()
    print(f"[{started_at}] Starting data backup...")

    # Simulated backup steps, run one after the other.
    await backup_user_data()
    await backup_guild_settings()

    finished_at = datetime.now()
    print(f"[{finished_at}] Data backup completed!")
@tasks.loop(hours=1)
async def cleanup_temp_files():
    """Delete files in ./temp/ that were last modified more than an hour ago.

    Runs hourly. Sub-directories are left alone, and a file that cannot be
    inspected or removed is reported and skipped so the remaining files are
    still processed.
    """
    import glob
    import os

    temp_dir = "./temp/"
    if not os.path.isdir(temp_dir):
        return

    # Compute the age cutoff once so every file is judged against the same instant.
    cutoff = datetime.now().timestamp() - 3600

    for file_path in glob.glob(os.path.join(temp_dir, "*")):
        try:
            # glob also returns directories, which os.remove() cannot delete.
            if not os.path.isfile(file_path):
                continue
            # Delete files older than 1 hour
            if os.path.getmtime(file_path) < cutoff:
                os.remove(file_path)
                print(f"Deleted old temp file: {file_path}")
        except OSError as exc:
            # Best effort: the file may have vanished or be locked.
            print(f"Could not clean temp file {file_path}: {exc}")
# Daily tasks at specific times
@tasks.loop(time=time(hour=0, minute=0))  # Midnight UTC
async def daily_reset():
    """Report the finished day's statistics to the admin channel, then reset them.

    The loop fires at midnight UTC, so the report is labelled with the UTC
    date of the day that has just ended.
    """
    print("Performing daily reset...")

    # Snapshot the statistics BEFORE resetting them; presumably
    # get_daily_statistics() reads the counters that reset_daily_stats()
    # clears, so the original order would report an empty day.
    stats = await get_daily_statistics()

    # Reset daily user stats
    await reset_daily_stats()

    # Send daily report to admin channel
    admin_channel = bot.get_channel(ADMIN_CHANNEL_ID)
    if not admin_channel:
        return

    # The schedule is in UTC, so derive the date in UTC too; the loop runs
    # just after midnight, making "yesterday" the day being reported.
    report_day = datetime.now(timezone.utc) - timedelta(days=1)

    embed = nextcord.Embed(
        title="📊 Daily Report",
        description=f"Statistics for {report_day.strftime('%Y-%m-%d')}",
        color=nextcord.Color.blue()
    )
    embed.add_field(name="Messages Processed", value=stats['messages'], inline=True)
    embed.add_field(name="Commands Used", value=stats['commands'], inline=True)
    embed.add_field(name="New Users", value=stats['new_users'], inline=True)
    await admin_channel.send(embed=embed)
# Multiple daily times
@tasks.loop(time=[time(hour=9), time(hour=15), time(hour=21)])  # 9 AM, 3 PM, 9 PM
async def reminder_check():
    """Deliver due reminders by DM three times daily.

    A reminder is marked completed only after its DM was sent. A failure for
    one user never prevents the remaining reminders from being processed.
    """
    due_reminders = await get_due_reminders()

    for reminder in due_reminders:
        user = bot.get_user(reminder['user_id'])
        if not user:
            # User is not available; leave the reminder pending.
            continue

        embed = nextcord.Embed(
            title="⏰ Reminder",
            description=reminder['message'],
            color=nextcord.Color.orange()
        )
        embed.set_footer(text=f"Set on {reminder['created_at']}")

        try:
            await user.send(embed=embed)
        except nextcord.Forbidden:
            # User has DMs disabled
            continue
        except nextcord.HTTPException as exc:
            # Any other delivery failure: report it and keep going instead of
            # aborting the whole batch.
            print(f"Failed to deliver reminder {reminder['id']}: {exc}")
            continue

        await mark_reminder_completed(reminder['id'])
# Limited count tasks
@tasks.loop(seconds=10, count=5)
async def startup_checks():
"""Perform startup checks 5 times with 10-second intervals."""
check_number = startup_checks.current_loop + 1
print(f"Startup check {check_number}/5")
# Check database connection
if await check_database_connection():
print("✅ Database connection OK")
else:
print("❌ Database connection failed")
# Check API endpoints
if await check_external_apis():
print("✅ External APIs OK")
else:
print("❌ External API check failed")The Loop class provides control over background tasks with start, stop, and restart functionality.
class Loop:
    """Represents a background task loop.

    Attributes
    ----------
    coro: Callable
        The coroutine function being looped.
    seconds: Optional[float]
        Seconds between each iteration.
    minutes: Optional[float]
        Minutes between each iteration.
    hours: Optional[float]
        Hours between each iteration.
    time: Optional[Sequence[time]]
        Specific times to run daily.
    count: Optional[int]
        Maximum number of iterations.
    current_loop: int
        Current iteration number (0-indexed).
    next_iteration: Optional[datetime]
        When the next iteration will run.
    """

    def start(self, *args, **kwargs) -> asyncio.Task:
        """Start the loop.

        Parameters
        ----------
        *args, **kwargs
            Arguments to pass to the loop function.

        Returns
        -------
        asyncio.Task
            The task that was created to run the loop.

        Raises
        ------
        RuntimeError
            If the loop has already been started and is still running.
        """
        ...

    def stop(self) -> None:
        """Gracefully stop the loop.

        The current iteration is allowed to finish before the loop exits.
        """
        ...

    def restart(self, *args, **kwargs) -> None:
        """Restart the loop with new arguments.

        Parameters
        ----------
        *args, **kwargs
            New arguments to pass to the loop function.
        """
        ...

    def cancel(self) -> None:
        """Cancel the loop's internal task.

        Unlike stop(), this does not wait for the current iteration to finish.
        """
        ...

    def change_interval(
        self,
        *,
        seconds: float = MISSING,
        minutes: float = MISSING,
        hours: float = MISSING,
        time: Union[time, Sequence[time]] = MISSING
    ) -> None:
        """Change the loop interval.

        Parameters
        ----------
        seconds: float
            New seconds interval.
        minutes: float
            New minutes interval.
        hours: float
            New hours interval.
        time: Union[time, Sequence[time]]
            New time(s) for daily execution.
        """
        ...

    def is_running(self) -> bool:
        """bool: Whether the loop is currently running."""
        ...

    def is_being_cancelled(self) -> bool:
        """bool: Whether the loop is being cancelled."""
        ...

    def failed(self) -> bool:
        """bool: Whether the loop has failed and stopped."""
        ...

    def get_task(self) -> Optional[asyncio.Task]:
        """Optional[asyncio.Task]: The underlying asyncio task."""
        ...

    # Hook registration. These are decorators applied to a coroutine function
    # (e.g. ``@my_loop.before_loop``), not methods to override.
    def before_loop(self, coro: Callable) -> Callable:
        """Register a coroutine to be called before the loop starts running.

        Parameters
        ----------
        coro: Callable
            The coroutine function to register.

        Returns
        -------
        Callable
            The coroutine function that was passed in.
        """
        ...

    def after_loop(self, coro: Callable) -> Callable:
        """Register a coroutine to be called after the loop has finished.

        Parameters
        ----------
        coro: Callable
            The coroutine function to register.

        Returns
        -------
        Callable
            The coroutine function that was passed in.
        """
        ...

    def error(self, coro: Callable) -> Callable:
        """Register a coroutine to be called on an unhandled exception.

        The registered coroutine receives the exception that occurred.

        Parameters
        ----------
        coro: Callable
            The coroutine function to register.

        Returns
        -------
        Callable
            The coroutine function that was passed in.
        """
        ...
# Advanced loop control example
class DatabaseManager:
    """Runs a per-minute database health check and a 24-hourly maintenance loop.

    Both loops are started as soon as the manager is constructed.
    """

    def __init__(self, bot):
        self.bot = bot
        # Initialise here so the attribute exists even if it is read before
        # the before_loop hook has run.
        self.is_healthy = True
        self.connection_check.start()
        self.maintenance_task.start()

    @tasks.loop(minutes=1)
    async def connection_check(self):
        """Monitor database connection health.

        Health transitions are printed, and administrators are notified when
        the connection is lost. A CriticalDatabaseError is re-raised after
        being recorded so that the registered error handler can restart the loop.
        """
        try:
            # Test database connection
            await self.test_connection()
            if not self.is_healthy:
                print("Database connection restored!")
                self.is_healthy = True
        except Exception as e:
            if self.is_healthy:
                print(f"Database connection lost: {e}")
                self.is_healthy = False
                # Notify administrators
                await self.notify_admins("🚨 Database connection lost!")
            # Without this re-raise the broad except above swallows critical
            # errors and the restart branch of the error handler is unreachable.
            if isinstance(e, CriticalDatabaseError):
                raise

    @connection_check.before_loop
    async def before_connection_check(self):
        """Wait for bot to be ready before starting connection checks."""
        await self.bot.wait_until_ready()
        self.is_healthy = True
        print("Starting database connection monitoring...")

    @connection_check.error
    async def connection_check_error(self, exception):
        """Handle errors that escape the connection check loop."""
        print(f"Error in connection check: {exception}")
        # If the error is critical, restart the loop
        if isinstance(exception, CriticalDatabaseError):
            print("Critical database error - restarting connection check...")
            self.connection_check.restart()

    @tasks.loop(hours=24)
    async def maintenance_task(self):
        """Daily database maintenance.

        Failures are printed and then re-raised.
        """
        print("Starting daily database maintenance...")
        try:
            await self.optimize_tables()
            await self.clean_old_records()
            await self.update_statistics()
            print("Database maintenance completed successfully")
        except Exception as e:
            print(f"Maintenance task failed: {e}")
            raise
# Dynamic loop management
class TaskManager:
    """Creates and tracks one personal reminder loop per user.

    ``active_loops`` maps a user id to that user's running reminder loop.
    """

    def __init__(self, bot):
        self.bot = bot
        self.active_loops = {}

    def create_reminder_loop(self, user_id: int, message: str, interval: int):
        """Create, register and start a reminder loop for ``user_id``.

        Parameters
        ----------
        user_id: int
            The user who receives the reminder by DM.
        message: str
            The reminder text.
        interval: int
            Seconds between reminders.

        Returns
        -------
        The started loop object.
        """
        @tasks.loop(seconds=interval)
        async def user_reminder():
            user = self.bot.get_user(user_id)
            if not user:
                return
            try:
                await user.send(f"⏰ Reminder: {message}")
            except nextcord.Forbidden:
                # Stop loop if user blocks bot
                user_reminder.stop()
                # Only drop the registry entry if it still refers to THIS
                # loop; it may already be gone or belong to a newer reminder.
                if self.active_loops.get(user_id) is user_reminder:
                    del self.active_loops[user_id]

        # Never leave an orphaned loop running when a user already has one.
        previous = self.active_loops.pop(user_id, None)
        if previous is not None:
            previous.cancel()

        # Store and start the loop
        self.active_loops[user_id] = user_reminder
        user_reminder.start()
        return user_reminder

    def stop_user_reminder(self, user_id: int):
        """Stop a user's reminder loop.

        Returns
        -------
        bool
            True if a loop was registered for the user and has been ended,
            False if the user had none.
        """
        reminder_loop = self.active_loops.pop(user_id, None)
        if reminder_loop is None:
            return False
        # cancel() ends the loop immediately. stop() only takes effect at an
        # iteration boundary, which for a sleeping loop can be a full
        # interval away.
        reminder_loop.cancel()
        return True

    def get_user_reminder_status(self, user_id: int) -> dict:
        """Get status of a user's reminder loop.

        Returns
        -------
        dict
            ``{"active": False}`` when the user has no registered loop;
            otherwise the loop's ``active``, ``current_loop`` and
            ``next_iteration`` values.
        """
        reminder_loop = self.active_loops.get(user_id)
        if reminder_loop is None:
            return {"active": False}
        return {
            "active": reminder_loop.is_running(),
            "current_loop": reminder_loop.current_loop,
            "next_iteration": reminder_loop.next_iteration
        }
# Task manager usage
task_manager = TaskManager(bot)
@bot.command()
async def set_reminder(ctx, interval: int, *, message: str):
    """Set a personal reminder that repeats every X seconds.

    The interval must be between 60 seconds and 24 hours. Any reminder the
    caller already has is replaced.
    """
    if interval < 60:
        await ctx.send("❌ Interval must be at least 60 seconds.")
        return
    if interval > 86400:  # 24 hours
        await ctx.send("❌ Interval cannot exceed 24 hours.")
        return

    # Stop existing reminder if any
    task_manager.stop_user_reminder(ctx.author.id)

    # Create new reminder (the returned loop is tracked by the task manager)
    task_manager.create_reminder_loop(ctx.author.id, message, interval)

    # The confirmation echoes user-supplied text, so suppress mentions to
    # keep the message from pinging roles or @everyone.
    await ctx.send(
        f"✅ Reminder set! Will remind you every {interval} seconds: {message}",
        allowed_mentions=nextcord.AllowedMentions.none()
    )
@bot.command()
async def stop_reminder(ctx):
    """Stop the caller's personal reminder and report whether one existed."""
    was_stopped = task_manager.stop_user_reminder(ctx.author.id)
    if not was_stopped:
        await ctx.send("❌ You don't have an active reminder.")
        return
    await ctx.send("✅ Your reminder has been stopped.")
@bot.command()
async def reminder_status(ctx):
"""Check your reminder status."""
status = task_manager.get_user_reminder_status(ctx.author.id)
if not status["active"]:
await ctx.send("You don't have an active reminder.")
return
embed = nextcord.Embed(title="Reminder Status", color=nextcord.Color.blue())
embed.add_field(name="Status", value="Active ✅", inline=True)
embed.add_field(name="Iteration", value=status["current_loop"] + 1, inline=True)
if status["next_iteration"]:
next_time = status["next_iteration"].strftime("%H:%M:%S UTC")
embed.add_field(name="Next Reminder", value=next_time, inline=True)
await ctx.send(embed=embed)Advanced scheduling patterns for complex timing requirements.
from datetime import time, datetime, timezone, timedelta
import asyncio
# Multiple specific times
@tasks.loop(time=[
    time(hour=8, minute=0),   # 8:00 AM
    time(hour=12, minute=0),  # 12:00 PM
    time(hour=18, minute=0),  # 6:00 PM
    time(hour=22, minute=0)   # 10:00 PM
])
async def quarterly_announcements():
    """Send pending announcements to every guild's announcement channel.

    Runs four times daily. The times above carry no tzinfo, so they are
    interpreted as UTC.
    """
    announcements = await get_pending_announcements()
    if not announcements:
        return

    # The embeds do not depend on the guild, so build them once.
    embeds = []
    for announcement in announcements:
        embed = nextcord.Embed(
            title="📢 Announcement",
            description=announcement['message'],
            color=nextcord.Color.gold()
        )
        embed.set_footer(text=announcement['author'])
        embeds.append(embed)

    for guild in bot.guilds:
        announcement_channel = get_guild_announcement_channel(guild.id)
        if not announcement_channel:
            continue
        for embed in embeds:
            try:
                await announcement_channel.send(embed=embed)
            except nextcord.Forbidden:
                # No permission to send messages here; the remaining embeds
                # would fail the same way, so move on to the next guild.
                break
# Timezone-aware scheduling
import pytz
from zoneinfo import ZoneInfo

# A pytz zone must not be passed straight to ``tzinfo=``: pytz zones need
# ``localize()`` and otherwise yield a wrong (local-mean-time) offset.
# zoneinfo zones are designed to be used as tzinfo directly and follow DST.
_EASTERN = ZoneInfo("America/New_York")


@tasks.loop(time=time(hour=9, minute=0, tzinfo=_EASTERN))
async def eastern_morning_report():
    """Send morning report at 9 AM Eastern Time to each guild's report channel."""
    current_time = datetime.now(_EASTERN)

    embed = nextcord.Embed(
        title="🌅 Morning Report",
        description=f"Good morning! It's {current_time.strftime('%A, %B %d, %Y')}",
        color=nextcord.Color.orange()
    )

    # Add daily statistics
    stats = await get_overnight_statistics()
    embed.add_field(name="Overnight Activity", value=f"{stats['messages']} messages", inline=True)
    embed.add_field(name="New Members", value=stats['new_members'], inline=True)
    embed.add_field(name="Server Status", value="🟢 Online", inline=True)

    # Send to all configured channels
    for guild in bot.guilds:
        channel = get_guild_report_channel(guild.id)
        if not channel:
            continue
        try:
            await channel.send(embed=embed)
        except nextcord.Forbidden:
            # Missing permission in this channel; skip it.
            pass
# Weekly scheduling
@tasks.loop(time=time(hour=10, minute=0))  # Daily at 10 AM (UTC)
async def weekly_summary():
    """Send weekly summary every Sunday.

    The loop fires daily and returns early on every other weekday. The
    scheduled time is naive and therefore UTC, so the weekday is evaluated
    in UTC as well.
    """
    # Sample the clock once, in the same timezone the schedule uses.
    today = datetime.now(timezone.utc)
    if today.weekday() != 6:  # Sunday is 6
        return

    # Calculate week range
    week_start = today - timedelta(days=7)

    embed = nextcord.Embed(
        title="📊 Weekly Summary",
        description=f"Week of {week_start.strftime('%B %d')} - {today.strftime('%B %d, %Y')}",
        color=nextcord.Color.purple()
    )

    # Gather weekly statistics
    weekly_stats = await get_weekly_statistics(week_start, today)
    embed.add_field(name="Total Messages", value=f"{weekly_stats['messages']:,}", inline=True)
    embed.add_field(name="Active Users", value=f"{weekly_stats['active_users']:,}", inline=True)
    embed.add_field(name="Commands Used", value=f"{weekly_stats['commands']:,}", inline=True)
    embed.add_field(name="New Members", value=f"{weekly_stats['new_members']:,}", inline=True)
    embed.add_field(name="Peak Hour", value=weekly_stats['peak_hour'], inline=True)
    embed.add_field(name="Top Channel", value=weekly_stats['top_channel'], inline=True)

    # Send to admin channels
    for guild in bot.guilds:
        admin_channel = get_guild_admin_channel(guild.id)
        if not admin_channel:
            continue
        try:
            await admin_channel.send(embed=embed)
        except nextcord.Forbidden:
            # Missing permission in this channel; skip it.
            pass
# Monthly scheduling
@tasks.loop(time=time(hour=0, minute=0)) # Daily at midnight
async def monthly_cleanup():
"""Perform monthly cleanup on the first day of each month."""
today = datetime.now()
if today.day != 1: # Only run on first day of month
return
print(f"Starting monthly cleanup for {today.strftime('%B %Y')}")
try:
# Archive old messages
await archive_old_messages(days=90)
# Clean up inactive users
inactive_users = await cleanup_inactive_users(days=30)
# Generate monthly report
report = await generate_monthly_report(today.year, today.month - 1)
# Send admin notification
admin_channel = bot.get_channel(ADMIN_CHANNEL_ID)
if admin_channel:
embed = nextcord.Embed(
title="🗂️ Monthly Cleanup Complete",
description=f"Cleanup completed for {(today - timedelta(days=1)).strftime('%B %Y')}",
color=nextcord.Color.green()
)
embed.add_field(name="Messages Archived", value=f"{report['archived_messages']:,}", inline=True)
embed.add_field(name="Inactive Users Removed", value=len(inactive_users), inline=True)
embed.add_field(name="Storage Freed", value=f"{report['storage_freed']} MB", inline=True)
await admin_channel.send(embed=embed)
print("Monthly cleanup completed successfully")
except Exception as e:
print(f"Monthly cleanup failed: {e}")
# Notify admins of failure
if admin_channel:
error_embed = nextcord.Embed(
title="❌ Monthly Cleanup Failed",
description=f"Cleanup process encountered an error: {str(e)}",
color=nextcord.Color.red()
)
await admin_channel.send(embed=error_embed)Robust error handling and automatic recovery mechanisms for background tasks.
# Comprehensive error handling
class RobustTaskManager:
    """Background loops with error counting, alerting and delayed restarts.

    Attributes
    ----------
    error_counts: dict
        Consecutive error count per task name.
    max_retries: int
        Errors tolerated before a task is given up on.
    retry_delay: int
        Base delay in seconds for the exponential back-off.
    """

    def __init__(self, bot):
        self.bot = bot
        self.error_counts = {}
        self.max_retries = 3
        self.retry_delay = 60  # seconds
        # Keeps a reference to the pending restart so it is not
        # garbage-collected while it sleeps.
        self._api_sync_restart = None

    @tasks.loop(minutes=5)
    async def health_monitor(self):
        """Monitor bot health and performance."""
        try:
            # Check memory usage
            memory_usage = await self.get_memory_usage()

            # Check database connection
            db_status = await self.check_database()

            # Check API rate limits
            rate_limit_status = await self.check_rate_limits()

            # Log health metrics
            await self.log_health_metrics({
                'memory': memory_usage,
                'database': db_status,
                'rate_limits': rate_limit_status,
                'timestamp': datetime.now()
            })

            # Alert if thresholds exceeded
            if memory_usage > 0.8:  # 80% memory usage
                await self.send_alert("High memory usage detected", "warning")
            if not db_status:
                await self.send_alert("Database connection failed", "error")
        except Exception as e:
            await self.handle_task_error('health_monitor', e)

    @health_monitor.error
    async def health_monitor_error(self, exception):
        """Handle errors in health monitoring."""
        print(f"Health monitor error: {exception}")

        # Log the error
        await self.log_error('health_monitor', exception)

        # Increment error count
        task_name = 'health_monitor'
        self.error_counts[task_name] = self.error_counts.get(task_name, 0) + 1

        # If too many errors, stop the task and alert admins
        if self.error_counts[task_name] >= self.max_retries:
            await self.send_alert(
                f"Health monitor has failed {self.max_retries} times and has been stopped",
                "critical"
            )
            self.health_monitor.stop()

    @tasks.loop(seconds=30)
    async def api_sync(self):
        """Sync data with external APIs."""
        try:
            # Perform API synchronization
            await self.sync_user_data()
            await self.sync_server_stats()
        except asyncio.TimeoutError:
            # Temporary network issue - don't count as error.
            # (aiohttp.ClientTimeout is a settings object, not an exception,
            # so it cannot be used in an except clause.)
            print("API sync timed out - retrying next cycle")
        except aiohttp.ClientError as e:
            # Network error - retry with backoff
            await self.handle_network_error('api_sync', e)
        except Exception as e:
            await self.handle_task_error('api_sync', e)
        else:
            # Reset error count only after a successful run, so consecutive
            # failures can accumulate towards max_retries.
            self.error_counts.pop('api_sync', None)

    @api_sync.error
    async def api_sync_error(self, exception):
        """Handle API sync errors with exponential backoff."""
        task_name = 'api_sync'
        error_count = self.error_counts.get(task_name, 0) + 1
        self.error_counts[task_name] = error_count
        print(f"API sync error #{error_count}: {exception}")

        if error_count < self.max_retries:
            # Exponential backoff: 60s, 120s, 240s
            delay = self.retry_delay * (2 ** (error_count - 1))
            print(f"Retrying API sync in {delay} seconds...")
            # The failing loop is still winding down while this handler runs,
            # so an is_running() check made here can never pass. Restart from
            # a separate task once the delay has elapsed.
            self._api_sync_restart = asyncio.create_task(
                self._restart_api_sync_after(delay)
            )
        else:
            await self.send_alert(
                "API sync has failed repeatedly and requires manual intervention",
                "critical"
            )
            self.api_sync.stop()

    async def _restart_api_sync_after(self, delay: float):
        """Start ``api_sync`` again after ``delay`` seconds if it is not running."""
        await asyncio.sleep(delay)
        if not self.api_sync.is_running():
            self.api_sync.start()

    async def handle_task_error(self, task_name: str, exception: Exception):
        """Generic task error handler: print, log, and alert on critical errors."""
        print(f"Task {task_name} error: {exception}")

        # Log to file/database
        await self.log_error(task_name, exception)

        # Send alert for critical errors
        if isinstance(exception, (DatabaseError, CriticalError)):
            await self.send_alert(
                f"Critical error in {task_name}: {exception}",
                "critical"
            )

    async def send_alert(self, message: str, level: str):
        """Send alert to administrators.

        Parameters
        ----------
        message: str
            The alert text.
        level: str
            One of "info", "warning", "error" or "critical". Unknown levels
            fall back to a default colour and a question-mark icon.
        """
        admin_channel = self.bot.get_channel(ADMIN_CHANNEL_ID)
        if not admin_channel:
            return

        colors = {
            "info": nextcord.Color.blue(),
            "warning": nextcord.Color.orange(),
            "error": nextcord.Color.red(),
            "critical": nextcord.Color.from_rgb(139, 0, 0)  # Dark red
        }
        icons = {
            "info": "ℹ️",
            "warning": "⚠️",
            "error": "❌",
            "critical": "🚨"
        }

        embed = nextcord.Embed(
            title=f"{icons.get(level, '❓')} {level.title()} Alert",
            description=message,
            color=colors.get(level, nextcord.Color.default()),
            timestamp=datetime.now()
        )

        try:
            await admin_channel.send(embed=embed)
        except nextcord.Forbidden:
            print(f"Cannot send alert to admin channel: {message}")
# Graceful shutdown handling
class GracefulTaskShutdown:
    """Tracks task loops so they can all be stopped together on shutdown."""

    def __init__(self):
        self.tasks = []
        self.shutdown_event = asyncio.Event()

    def register_task(self, task_loop):
        """Register a task for graceful shutdown."""
        self.tasks.append(task_loop)

    async def shutdown_all_tasks(self, timeout: float = 2.0):
        """Gracefully shutdown all registered tasks.

        Every running loop is asked to stop, then its underlying task is
        awaited for at most ``timeout`` seconds. Loops still running after
        that are cancelled.

        Parameters
        ----------
        timeout: float
            Maximum number of seconds to wait for current iterations to finish.
        """
        print("Initiating graceful task shutdown...")

        # Set shutdown event
        self.shutdown_event.set()

        # Stop all tasks, remembering which asyncio task backs each loop.
        waiting = {}
        for task in self.tasks:
            if task.is_running():
                task.stop()
                inner = task.get_task()
                if inner is not None:
                    waiting[inner] = task

        # Wait for the current iterations instead of sleeping a fixed time.
        if waiting:
            _, unfinished = await asyncio.wait(list(waiting), timeout=timeout)
            for inner in unfinished:
                # A loop sleeping between iterations will not notice stop()
                # until it wakes up, so cancel it.
                waiting[inner].cancel()

        print("All tasks shut down gracefully")

    @tasks.loop(seconds=10)
    async def example_task(self):
        """Example task with shutdown check."""
        if self.shutdown_event.is_set():
            print("Shutdown requested - stopping task")
            self.example_task.stop()
            return

        # Normal task work here
        await self.do_task_work()

    async def do_task_work(self):
        """Simulate task work."""
        await asyncio.sleep(1)
# Usage with bot events
shutdown_manager = GracefulTaskShutdown()
@bot.event
async def on_ready():
    """Start all background tasks when bot is ready."""
    print(f'{bot.user} is ready!')

    # on_ready is not guaranteed to be called only once. Registered tasks
    # mean the loops were already started, so do not start a second set.
    if shutdown_manager.tasks:
        return

    # Start task manager (named so it does not shadow the module-level
    # ``task_manager``).
    robust_manager = RobustTaskManager(bot)
    robust_manager.health_monitor.start()
    robust_manager.api_sync.start()

    # Register tasks for graceful shutdown
    shutdown_manager.register_task(robust_manager.health_monitor)
    shutdown_manager.register_task(robust_manager.api_sync)

    print("All background tasks started")
@bot.event
async def on_disconnect():
    """Log a disconnect; background tasks are deliberately left untouched."""
    print("Bot disconnected - maintaining tasks")
# Graceful shutdown on SIGINT/SIGTERM
import signal
async def _shutdown_bot():
    """Stop the background tasks first, then close the bot connection."""
    await shutdown_manager.shutdown_all_tasks()
    await bot.close()


def signal_handler(signum, frame):
    """Handle shutdown signals.

    A handler registered with signal.signal() must not interact with the
    event loop directly, so the shutdown coroutine is handed over with
    call_soon_threadsafe(). Tasks are shut down before the bot is closed
    rather than racing the two as separate tasks.

    NOTE(review): Client.run() may install its own SIGINT/SIGTERM handlers
    on the event loop, which would take precedence over this one - confirm
    for the nextcord version in use.
    """
    print(f"Received signal {signum}")
    bot.loop.call_soon_threadsafe(bot.loop.create_task, _shutdown_bot())
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Start the bot
if __name__ == "__main__":
bot.run(TOKEN)Sophisticated task patterns for complex scheduling and coordination requirements.
# Task coordination and dependencies
class TaskCoordinator:
    """Coordinates loops that depend on each other's recent success.

    ``task_states`` maps a task name to its latest outcome: ``status`` is
    'success' or 'error', with ``last_success`` or ``last_error`` and the
    produced payload or error text.
    """

    def __init__(self, bot):
        self.bot = bot
        self.task_states = {}
        self.dependencies = {}

    @tasks.loop(minutes=1)
    async def data_collector(self):
        """Collect data - prerequisite for other tasks.

        Failures are recorded in ``task_states`` and then re-raised.
        """
        try:
            # Collect various metrics
            data = {
                'timestamp': datetime.now(),
                'guild_count': len(self.bot.guilds),
                # member_count is Optional and may be None; count it as 0 so
                # sum() does not raise TypeError.
                'user_count': sum(g.member_count or 0 for g in self.bot.guilds),
                'message_rate': await self.get_message_rate(),
                'memory_usage': await self.get_memory_usage()
            }

            # Store collected data
            await self.store_metrics(data)

            # Mark task as successful
            self.task_states['data_collector'] = {
                'last_success': datetime.now(),
                'status': 'success',
                'data': data
            }
        except Exception as e:
            self.task_states['data_collector'] = {
                'last_error': datetime.now(),
                'status': 'error',
                'error': str(e)
            }
            raise

    @tasks.loop(minutes=5)
    async def data_analyzer(self):
        """Analyze collected data - depends on data_collector.

        Skipped when the collector has not succeeded within the last 2 minutes.
        """
        # Check if dependency is met
        if not self.check_dependency('data_analyzer', 'data_collector', max_age_minutes=2):
            print("Data analyzer skipped - waiting for fresh data")
            return

        try:
            # Get the latest data
            collector_state = self.task_states.get('data_collector', {})
            if 'data' not in collector_state:
                return
            data = collector_state['data']

            # Perform analysis
            analysis = {
                'growth_rate': await self.calculate_growth_rate(data),
                'performance_score': await self.calculate_performance(data),
                'alerts': await self.check_thresholds(data),
                'timestamp': datetime.now()
            }

            # Store analysis results
            await self.store_analysis(analysis)

            # Update task state
            self.task_states['data_analyzer'] = {
                'last_success': datetime.now(),
                'status': 'success',
                'analysis': analysis
            }

            # Trigger alerts if necessary
            if analysis['alerts']:
                await self.send_analysis_alerts(analysis['alerts'])
        except Exception as e:
            self.task_states['data_analyzer'] = {
                'last_error': datetime.now(),
                'status': 'error',
                'error': str(e)
            }
            print(f"Data analysis failed: {e}")

    @tasks.loop(hours=1)
    async def report_generator(self):
        """Generate reports - depends on both collector and analyzer.

        Skipped unless both succeeded within the last 10 minutes.
        """
        # Check multiple dependencies
        dependencies = ['data_collector', 'data_analyzer']
        if not all(self.check_dependency('report_generator', dep, max_age_minutes=10) for dep in dependencies):
            print("Report generation skipped - dependencies not met")
            return

        try:
            # Generate comprehensive report
            report = await self.generate_hourly_report()

            # Send to configured channels
            await self.distribute_report(report)

            # Update task state
            self.task_states['report_generator'] = {
                'last_success': datetime.now(),
                'status': 'success',
                'report_id': report['id']
            }
        except Exception as e:
            self.task_states['report_generator'] = {
                'last_error': datetime.now(),
                'status': 'error',
                'error': str(e)
            }
            print(f"Report generation failed: {e}")

    def check_dependency(self, task_name: str, dependency: str, max_age_minutes: int = 5) -> bool:
        """Check if a task dependency is satisfied.

        Parameters
        ----------
        task_name: str
            The task asking; not used by the check itself.
        dependency: str
            The task whose recorded state is inspected.
        max_age_minutes: int
            Maximum age, in minutes, of the dependency's last success.

        Returns
        -------
        bool
            True if the dependency's latest state is a success that is no
            older than ``max_age_minutes``.
        """
        dep_state = self.task_states.get(dependency, {})

        # Check if dependency task has succeeded recently
        if dep_state.get('status') != 'success':
            return False
        last_success = dep_state.get('last_success')
        if not last_success:
            return False

        # Check if the success is recent enough
        age = (datetime.now() - last_success).total_seconds() / 60
        return age <= max_age_minutes

    def start_all_tasks(self):
        """Start all coordinated tasks."""
        self.data_collector.start()
        self.data_analyzer.start()
        self.report_generator.start()
# Conditional execution based on external factors
class ConditionalTasks:
    """Loops that fire on a schedule but only act when conditions are met."""

    def __init__(self, bot):
        self.bot = bot

    @tasks.loop(minutes=10)
    async def weather_alerts(self):
        """Send weather alerts only during severe weather."""
        # Check if any guilds have weather alerts enabled
        enabled_guilds = await self.get_weather_enabled_guilds()
        if not enabled_guilds:
            return

        # Get current weather conditions
        severe_weather = await self.check_severe_weather()
        if not severe_weather:
            return  # No severe weather, skip this iteration

        # Create weather alert embed (identical for every guild, so build once)
        embed = nextcord.Embed(
            title="🌪️ Severe Weather Alert",
            description=severe_weather['description'],
            color=nextcord.Color.red()
        )
        embed.add_field(name="Type", value=severe_weather['type'], inline=True)
        embed.add_field(name="Severity", value=severe_weather['severity'], inline=True)
        embed.add_field(name="Location", value=severe_weather['location'], inline=True)
        if severe_weather.get('expires'):
            embed.add_field(name="Expires", value=severe_weather['expires'], inline=False)

        # Send alerts to enabled guilds
        for guild_id in enabled_guilds:
            guild = self.bot.get_guild(guild_id)
            if not guild:
                continue

            weather_channel = await self.get_guild_weather_channel(guild_id)
            if not weather_channel:
                continue

            try:
                await weather_channel.send(embed=embed)
            except nextcord.Forbidden:
                # Remove channel if bot can't send messages
                await self.remove_weather_channel(guild_id)

    @tasks.loop(time=time(hour=6))  # 6 AM daily (UTC)
    async def school_announcements(self):
        """Send school announcements only on weekdays during school year.

        The scheduled time is naive and therefore UTC, so the calendar checks
        use the UTC date as well.
        """
        today = datetime.now(timezone.utc).date()

        # Skip weekends
        if today.weekday() >= 5:  # Saturday = 5, Sunday = 6
            return

        # Skip during summer break (customize dates as needed). Comparing
        # dates rather than datetimes keeps the last day, August 25, inside
        # the break; a midnight datetime bound would exclude it.
        summer_start = today.replace(month=6, day=15)  # June 15
        summer_end = today.replace(month=8, day=25)    # August 25
        if summer_start <= today <= summer_end:
            return

        # Skip during winter break
        if today.month == 12 and today.day >= 20:
            return
        if today.month == 1 and today.day <= 5:
            return

        # Send school announcements
        announcements = await self.get_school_announcements()
        if not announcements:
            return

        # The embeds are the same for every guild, so build them once.
        embeds = []
        for announcement in announcements:
            embed = nextcord.Embed(
                title="🏫 School Announcement",
                description=announcement['content'],
                color=nextcord.Color.blue()
            )
            embed.set_footer(text=f"From: {announcement['source']}")
            embeds.append(embed)

        for guild_id in await self.get_school_guilds():
            guild = self.bot.get_guild(guild_id)
            if not guild:
                continue

            school_channel = await self.get_school_channel(guild_id)
            if not school_channel:
                continue

            for embed in embeds:
                try:
                    await school_channel.send(embed=embed)
                except nextcord.Forbidden:
                    # Missing permission in this channel; skip this send.
                    pass
# Resource-aware task scheduling
class ResourceAwareScheduler:
def __init__(self, bot):
self.bot = bot
self.high_load_threshold = 0.8 # 80% resource usage
@tasks.loop(seconds=30)
async def resource_monitor(self):
"""Monitor system resources and adjust task frequency."""
resources = await self.get_system_resources()
if resources['cpu_percent'] > self.high_load_threshold:
# Reduce task frequency during high load
await self.reduce_task_frequency()
elif resources['cpu_percent'] < 0.3: # Low load
# Restore normal frequency during low load
await self.restore_task_frequency()
async def reduce_task_frequency(self):
"""Reduce frequency of non-critical tasks during high load."""
# Change intervals for resource-intensive tasks
if hasattr(self, 'data_backup') and self.data_backup.is_running():
self.data_backup.change_interval(minutes=15) # Reduce from 5 to 15 minutes
if hasattr(self, 'statistics_update') and self.statistics_update.is_running():
self.statistics_update.change_interval(minutes=30) # Reduce frequency
async def restore_task_frequency(self):
"""Restore normal task frequency when resources are available."""
if hasattr(self, 'data_backup') and self.data_backup.is_running():
self.data_backup.change_interval(minutes=5) # Back to normal
if hasattr(self, 'statistics_update') and self.statistics_update.is_running():
self.statistics_update.change_interval(minutes=10) # Back to normalThis comprehensive documentation covers all aspects of nextcord's task system, providing developers with robust tools for creating sophisticated background operations and scheduled tasks.
Install with Tessl CLI
npx tessl i tessl/pypi-nextcord