CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-nextcord

A Python wrapper for the Discord API forked from discord.py

Pending
Overview
Eval results
Files

tasks.mddocs/

Nextcord Background Tasks

Task scheduling and background operations with the tasks extension for periodic and scheduled operations, providing robust async task management.

Loop Decorator

The primary decorator for creating background tasks with flexible scheduling options.

Task Loop Decorator { .api }

import nextcord
from nextcord.ext import tasks, commands
from datetime import datetime, time, timedelta
from typing import Optional, Union, Sequence, Any, Callable

def loop(
    *,
    seconds: float = MISSING,
    minutes: float = MISSING,
    hours: float = MISSING,
    time: Union[time, Sequence[time]] = MISSING,
    count: Optional[int] = None,
    reconnect: bool = True
) -> Callable:
    """Decorator to create a task loop.
    
    Parameters
    ----------
    seconds: float
        Number of seconds between iterations.
    minutes: float  
        Number of minutes between iterations.
    hours: float
        Number of hours between iterations.
    time: Union[time, Sequence[time]]
        Specific time(s) to run the task daily.
    count: Optional[int]
        Number of times to run the task. None for infinite.
    reconnect: bool
        Whether to reconnect the bot if it disconnects during the task.
    
    Returns
    -------
    Callable
        The decorated function as a Loop object.
    """
    ...

# Basic periodic tasks
@tasks.loop(seconds=30)
async def status_updater():
    """Update bot status every 30 seconds."""
    guild_count = len(bot.guilds)
    member_count = sum(guild.member_count for guild in bot.guilds)
    
    activity = nextcord.Activity(
        type=nextcord.ActivityType.watching,
        name=f"{guild_count} servers • {member_count} members"
    )
    
    await bot.change_presence(activity=activity)

@tasks.loop(minutes=5)
async def backup_data():
    """Backup important data every 5 minutes."""
    print(f"[{datetime.now()}] Starting data backup...")
    
    # Simulate backup process
    await backup_user_data()
    await backup_guild_settings()
    
    print(f"[{datetime.now()}] Data backup completed!")

@tasks.loop(hours=1)
async def cleanup_temp_files():
    """Clean up temporary files hourly."""
    import os
    import glob
    
    temp_dir = "./temp/"
    if os.path.exists(temp_dir):
        old_files = glob.glob(os.path.join(temp_dir, "*"))
        
        for file_path in old_files:
            try:
                # Delete files older than 1 hour
                if os.path.getmtime(file_path) < (datetime.now().timestamp() - 3600):
                    os.remove(file_path)
                    print(f"Deleted old temp file: {file_path}")
            except OSError:
                pass

# Daily tasks at specific times
@tasks.loop(time=time(hour=0, minute=0))  # Midnight UTC
async def daily_reset():
    """Reset daily statistics at midnight."""
    print("Performing daily reset...")
    
    # Reset daily user stats
    await reset_daily_stats()
    
    # Send daily report to admin channel
    admin_channel = bot.get_channel(ADMIN_CHANNEL_ID)
    if admin_channel:
        stats = await get_daily_statistics()
        
        embed = nextcord.Embed(
            title="📊 Daily Report",
            description=f"Statistics for {datetime.now().strftime('%Y-%m-%d')}",
            color=nextcord.Color.blue()
        )
        
        embed.add_field(name="Messages Processed", value=stats['messages'], inline=True)
        embed.add_field(name="Commands Used", value=stats['commands'], inline=True)
        embed.add_field(name="New Users", value=stats['new_users'], inline=True)
        
        await admin_channel.send(embed=embed)

# Multiple daily times
@tasks.loop(time=[time(hour=9), time(hour=15), time(hour=21)])  # 9 AM, 3 PM, 9 PM
async def reminder_check():
    """Check for due reminders three times daily."""
    due_reminders = await get_due_reminders()
    
    for reminder in due_reminders:
        try:
            user = bot.get_user(reminder['user_id'])
            if user:
                embed = nextcord.Embed(
                    title="⏰ Reminder",
                    description=reminder['message'],
                    color=nextcord.Color.orange()
                )
                embed.set_footer(text=f"Set on {reminder['created_at']}")
                
                await user.send(embed=embed)
                await mark_reminder_completed(reminder['id'])
        except nextcord.Forbidden:
            # User has DMs disabled
            pass

# Limited count tasks
@tasks.loop(seconds=10, count=5)
async def startup_checks():
    """Perform startup checks 5 times with 10-second intervals."""
    check_number = startup_checks.current_loop + 1
    print(f"Startup check {check_number}/5")
    
    # Check database connection
    if await check_database_connection():
        print("✅ Database connection OK")
    else:
        print("❌ Database connection failed")
    
    # Check API endpoints
    if await check_external_apis():
        print("✅ External APIs OK")
    else:
        print("❌ External API check failed")

Loop Class

The Loop class provides control over background tasks with start, stop, and restart functionality.

Loop Control Methods { .api }

class Loop:
    """Represents a background task loop.
    
    Attributes
    ----------
    coro: Callable
        The coroutine function being looped.
    seconds: Optional[float]
        Seconds between each iteration.
    minutes: Optional[float]
        Minutes between each iteration.
    hours: Optional[float]
        Hours between each iteration.
    time: Optional[Sequence[time]]
        Specific times to run daily.
    count: Optional[int]
        Maximum number of iterations.
    current_loop: int
        Current iteration number (0-indexed).
    next_iteration: Optional[datetime]
        When the next iteration will run.
    """
    
    def start(self, *args, **kwargs) -> None:
        """Start the loop.
        
        Parameters
        ----------
        *args, **kwargs
            Arguments to pass to the loop function.
        """
        ...
    
    def stop(self) -> None:
        """Stop the loop."""
        ...
    
    def restart(self, *args, **kwargs) -> None:
        """Restart the loop with new arguments.
        
        Parameters
        ----------
        *args, **kwargs
            New arguments to pass to the loop function.
        """
        ...
    
    def cancel(self) -> None:
        """Cancel the loop (alias for stop)."""
        ...
    
    def change_interval(
        self,
        *,
        seconds: float = MISSING,
        minutes: float = MISSING,
        hours: float = MISSING,
        time: Union[time, Sequence[time]] = MISSING
    ) -> None:
        """Change the loop interval.
        
        Parameters
        ----------
        seconds: float
            New seconds interval.
        minutes: float
            New minutes interval.
        hours: float
            New hours interval.
        time: Union[time, Sequence[time]]
            New time(s) for daily execution.
        """
        ...
    
    def is_running(self) -> bool:
        """bool: Whether the loop is currently running."""
        ...
    
    def is_being_cancelled(self) -> bool:
        """bool: Whether the loop is being cancelled."""
        ...
    
    def failed(self) -> bool:
        """bool: Whether the loop has failed and stopped."""
        ...
    
    def get_task(self) -> Optional[asyncio.Task]:
        """Optional[asyncio.Task]: The underlying asyncio task."""
        ...
    
    # Event handlers that can be overridden
    async def before_loop(self) -> None:
        """Called before the loop starts."""
        ...
    
    async def after_loop(self) -> None:
        """Called after the loop ends."""
        ...
    
    def error(self, exception: Exception) -> None:
        """Called when an exception occurs in the loop.
        
        Parameters
        ----------
        exception: Exception
            The exception that occurred.
        """
        ...

# Advanced loop control example
class DatabaseManager:
    def __init__(self, bot):
        self.bot = bot
        self.connection_check.start()
        self.maintenance_task.start()
    
    @tasks.loop(minutes=1)
    async def connection_check(self):
        """Monitor database connection health."""
        try:
            # Test database connection
            await self.test_connection()
            
            if not self.is_healthy:
                print("Database connection restored!")
                self.is_healthy = True
                
        except Exception as e:
            if self.is_healthy:
                print(f"Database connection lost: {e}")
                self.is_healthy = False
                
                # Notify administrators
                await self.notify_admins("🚨 Database connection lost!")
    
    @connection_check.before_loop
    async def before_connection_check(self):
        """Wait for bot to be ready before starting connection checks."""
        await self.bot.wait_until_ready()
        self.is_healthy = True
        print("Starting database connection monitoring...")
    
    @connection_check.error
    async def connection_check_error(self, exception):
        """Handle errors in connection check loop."""
        print(f"Error in connection check: {exception}")
        
        # If the error is critical, restart the loop
        if isinstance(exception, CriticalDatabaseError):
            print("Critical database error - restarting connection check...")
            self.connection_check.restart()
    
    @tasks.loop(hours=24)
    async def maintenance_task(self):
        """Daily database maintenance."""
        print("Starting daily database maintenance...")
        
        try:
            await self.optimize_tables()
            await self.clean_old_records()
            await self.update_statistics()
            
            print("Database maintenance completed successfully")
            
        except Exception as e:
            print(f"Maintenance task failed: {e}")
            raise

# Dynamic loop management
class TaskManager:
    def __init__(self, bot):
        self.bot = bot
        self.active_loops = {}
    
    def create_reminder_loop(self, user_id: int, message: str, interval: int):
        """Create a personalized reminder loop."""
        
        @tasks.loop(seconds=interval)
        async def user_reminder():
            user = self.bot.get_user(user_id)
            if user:
                try:
                    await user.send(f"⏰ Reminder: {message}")
                except nextcord.Forbidden:
                    # Stop loop if user blocks bot
                    user_reminder.stop()
                    del self.active_loops[user_id]
        
        # Store and start the loop
        self.active_loops[user_id] = user_reminder
        user_reminder.start()
        
        return user_reminder
    
    def stop_user_reminder(self, user_id: int):
        """Stop a user's reminder loop."""
        if user_id in self.active_loops:
            self.active_loops[user_id].stop()
            del self.active_loops[user_id]
            return True
        return False
    
    def get_user_reminder_status(self, user_id: int) -> dict:
        """Get status of a user's reminder loop."""
        if user_id not in self.active_loops:
            return {"active": False}
        
        loop = self.active_loops[user_id]
        return {
            "active": loop.is_running(),
            "current_loop": loop.current_loop,
            "next_iteration": loop.next_iteration
        }

# Task manager usage
task_manager = TaskManager(bot)

@bot.command()
async def set_reminder(ctx, interval: int, *, message: str):
    """Set a personal reminder that repeats every X seconds."""
    if interval < 60:
        await ctx.send("❌ Interval must be at least 60 seconds.")
        return
    
    if interval > 86400:  # 24 hours
        await ctx.send("❌ Interval cannot exceed 24 hours.")
        return
    
    # Stop existing reminder if any
    task_manager.stop_user_reminder(ctx.author.id)
    
    # Create new reminder
    loop = task_manager.create_reminder_loop(ctx.author.id, message, interval)
    
    await ctx.send(f"✅ Reminder set! Will remind you every {interval} seconds: {message}")

@bot.command()
async def stop_reminder(ctx):
    """Stop your personal reminder."""
    if task_manager.stop_user_reminder(ctx.author.id):
        await ctx.send("✅ Your reminder has been stopped.")
    else:
        await ctx.send("❌ You don't have an active reminder.")

@bot.command()
async def reminder_status(ctx):
    """Check your reminder status."""
    status = task_manager.get_user_reminder_status(ctx.author.id)
    
    if not status["active"]:
        await ctx.send("You don't have an active reminder.")
        return
    
    embed = nextcord.Embed(title="Reminder Status", color=nextcord.Color.blue())
    embed.add_field(name="Status", value="Active ✅", inline=True)
    embed.add_field(name="Iteration", value=status["current_loop"] + 1, inline=True)
    
    if status["next_iteration"]:
        next_time = status["next_iteration"].strftime("%H:%M:%S UTC")
        embed.add_field(name="Next Reminder", value=next_time, inline=True)
    
    await ctx.send(embed=embed)

Task Scheduling

Advanced scheduling patterns for complex timing requirements.

Time-based Scheduling { .api }

from datetime import time, datetime, timezone, timedelta
import asyncio

# Multiple specific times
@tasks.loop(time=[
    time(hour=8, minute=0),   # 8:00 AM
    time(hour=12, minute=0),  # 12:00 PM  
    time(hour=18, minute=0),  # 6:00 PM
    time(hour=22, minute=0)   # 10:00 PM
])
async def quarterly_announcements():
    """Send announcements four times daily."""
    announcements = await get_pending_announcements()
    
    for guild in bot.guilds:
        announcement_channel = get_guild_announcement_channel(guild.id)
        if announcement_channel and announcements:
            for announcement in announcements:
                embed = nextcord.Embed(
                    title="📢 Announcement",
                    description=announcement['message'],
                    color=nextcord.Color.gold()
                )
                embed.set_footer(text=announcement['author'])
                
                try:
                    await announcement_channel.send(embed=embed)
                except nextcord.Forbidden:
                    pass  # No permission to send messages

# Timezone-aware scheduling
import pytz

@tasks.loop(time=time(hour=9, minute=0, tzinfo=pytz.timezone('US/Eastern')))
async def eastern_morning_report():
    """Send morning report at 9 AM Eastern Time."""
    eastern = pytz.timezone('US/Eastern')
    current_time = datetime.now(eastern)
    
    embed = nextcord.Embed(
        title="🌅 Morning Report",
        description=f"Good morning! It's {current_time.strftime('%A, %B %d, %Y')}",
        color=nextcord.Color.orange()
    )
    
    # Add daily statistics
    stats = await get_overnight_statistics()
    embed.add_field(name="Overnight Activity", value=f"{stats['messages']} messages", inline=True)
    embed.add_field(name="New Members", value=stats['new_members'], inline=True)
    embed.add_field(name="Server Status", value="🟢 Online", inline=True)
    
    # Send to all configured channels
    for guild in bot.guilds:
        channel = get_guild_report_channel(guild.id)
        if channel:
            try:
                await channel.send(embed=embed)
            except nextcord.Forbidden:
                pass

# Weekly scheduling
@tasks.loop(time=time(hour=10, minute=0))  # Daily at 10 AM
async def weekly_summary():
    """Send weekly summary every Sunday."""
    if datetime.now().weekday() != 6:  # Sunday is 6
        return
    
    # Calculate week range
    today = datetime.now()
    week_start = today - timedelta(days=7)
    
    embed = nextcord.Embed(
        title="📊 Weekly Summary",
        description=f"Week of {week_start.strftime('%B %d')} - {today.strftime('%B %d, %Y')}",
        color=nextcord.Color.purple()
    )
    
    # Gather weekly statistics
    weekly_stats = await get_weekly_statistics(week_start, today)
    
    embed.add_field(name="Total Messages", value=f"{weekly_stats['messages']:,}", inline=True)
    embed.add_field(name="Active Users", value=f"{weekly_stats['active_users']:,}", inline=True)
    embed.add_field(name="Commands Used", value=f"{weekly_stats['commands']:,}", inline=True)
    embed.add_field(name="New Members", value=f"{weekly_stats['new_members']:,}", inline=True)
    embed.add_field(name="Peak Hour", value=weekly_stats['peak_hour'], inline=True)
    embed.add_field(name="Top Channel", value=weekly_stats['top_channel'], inline=True)
    
    # Send to admin channels
    for guild in bot.guilds:
        admin_channel = get_guild_admin_channel(guild.id)
        if admin_channel:
            try:
                await admin_channel.send(embed=embed)
            except nextcord.Forbidden:
                pass

# Monthly scheduling
@tasks.loop(time=time(hour=0, minute=0))  # Daily at midnight
async def monthly_cleanup():
    """Perform monthly cleanup on the first day of each month."""
    today = datetime.now()
    if today.day != 1:  # Only run on first day of month
        return
    
    print(f"Starting monthly cleanup for {today.strftime('%B %Y')}")
    
    try:
        # Archive old messages
        await archive_old_messages(days=90)
        
        # Clean up inactive users
        inactive_users = await cleanup_inactive_users(days=30)
        
        # Generate monthly report
        report = await generate_monthly_report(today.year, today.month - 1)
        
        # Send admin notification
        admin_channel = bot.get_channel(ADMIN_CHANNEL_ID)
        if admin_channel:
            embed = nextcord.Embed(
                title="🗂️ Monthly Cleanup Complete",
                description=f"Cleanup completed for {(today - timedelta(days=1)).strftime('%B %Y')}",
                color=nextcord.Color.green()
            )
            
            embed.add_field(name="Messages Archived", value=f"{report['archived_messages']:,}", inline=True)
            embed.add_field(name="Inactive Users Removed", value=len(inactive_users), inline=True)
            embed.add_field(name="Storage Freed", value=f"{report['storage_freed']} MB", inline=True)
            
            await admin_channel.send(embed=embed)
            
        print("Monthly cleanup completed successfully")
        
    except Exception as e:
        print(f"Monthly cleanup failed: {e}")
        
        # Notify admins of failure
        if admin_channel:
            error_embed = nextcord.Embed(
                title="❌ Monthly Cleanup Failed",
                description=f"Cleanup process encountered an error: {str(e)}",
                color=nextcord.Color.red()
            )
            await admin_channel.send(embed=error_embed)

Error Handling and Recovery

Robust error handling and automatic recovery mechanisms for background tasks.

Task Error Management { .api }

# Comprehensive error handling
class RobustTaskManager:
    def __init__(self, bot):
        self.bot = bot
        self.error_counts = {}
        self.max_retries = 3
        self.retry_delay = 60  # seconds
        
    @tasks.loop(minutes=5)
    async def health_monitor(self):
        """Monitor bot health and performance."""
        try:
            # Check memory usage
            memory_usage = await self.get_memory_usage()
            
            # Check database connection
            db_status = await self.check_database()
            
            # Check API rate limits
            rate_limit_status = await self.check_rate_limits()
            
            # Log health metrics
            await self.log_health_metrics({
                'memory': memory_usage,
                'database': db_status,
                'rate_limits': rate_limit_status,
                'timestamp': datetime.now()
            })
            
            # Alert if thresholds exceeded
            if memory_usage > 0.8:  # 80% memory usage
                await self.send_alert("High memory usage detected", "warning")
            
            if not db_status:
                await self.send_alert("Database connection failed", "error")
                
        except Exception as e:
            await self.handle_task_error('health_monitor', e)
    
    @health_monitor.error
    async def health_monitor_error(self, exception):
        """Handle errors in health monitoring."""
        print(f"Health monitor error: {exception}")
        
        # Log the error
        await self.log_error('health_monitor', exception)
        
        # Increment error count
        task_name = 'health_monitor'
        self.error_counts[task_name] = self.error_counts.get(task_name, 0) + 1
        
        # If too many errors, stop the task and alert admins
        if self.error_counts[task_name] >= self.max_retries:
            await self.send_alert(
                f"Health monitor has failed {self.max_retries} times and has been stopped",
                "critical"
            )
            self.health_monitor.stop()
    
    @tasks.loop(seconds=30)
    async def api_sync(self):
        """Sync data with external APIs."""
        try:
            # Reset error count on successful run
            self.error_counts.pop('api_sync', None)
            
            # Perform API synchronization
            await self.sync_user_data()
            await self.sync_server_stats()
            
        except aiohttp.ClientTimeout:
            # Temporary network issue - don't count as error
            print("API sync timed out - retrying next cycle")
            
        except aiohttp.ClientError as e:
            # Network error - retry with backoff
            await self.handle_network_error('api_sync', e)
            
        except Exception as e:
            await self.handle_task_error('api_sync', e)
    
    @api_sync.error
    async def api_sync_error(self, exception):
        """Handle API sync errors with exponential backoff."""
        task_name = 'api_sync'
        error_count = self.error_counts.get(task_name, 0) + 1
        self.error_counts[task_name] = error_count
        
        print(f"API sync error #{error_count}: {exception}")
        
        if error_count < self.max_retries:
            # Exponential backoff: 60s, 120s, 240s
            delay = self.retry_delay * (2 ** (error_count - 1))
            print(f"Retrying API sync in {delay} seconds...")
            
            # Restart with delay
            await asyncio.sleep(delay)
            if not self.api_sync.is_running():
                self.api_sync.start()
        else:
            await self.send_alert(
                "API sync has failed repeatedly and requires manual intervention",
                "critical"
            )
            self.api_sync.stop()
    
    async def handle_task_error(self, task_name: str, exception: Exception):
        """Generic task error handler."""
        print(f"Task {task_name} error: {exception}")
        
        # Log to file/database
        await self.log_error(task_name, exception)
        
        # Send alert for critical errors
        if isinstance(exception, (DatabaseError, CriticalError)):
            await self.send_alert(
                f"Critical error in {task_name}: {exception}",
                "critical"
            )
    
    async def send_alert(self, message: str, level: str):
        """Send alert to administrators."""
        admin_channel = self.bot.get_channel(ADMIN_CHANNEL_ID)
        if not admin_channel:
            return
        
        colors = {
            "info": nextcord.Color.blue(),
            "warning": nextcord.Color.orange(),
            "error": nextcord.Color.red(),
            "critical": nextcord.Color.from_rgb(139, 0, 0)  # Dark red
        }
        
        icons = {
            "info": "ℹ️",
            "warning": "⚠️", 
            "error": "❌",
            "critical": "🚨"
        }
        
        embed = nextcord.Embed(
            title=f"{icons.get(level, '❓')} {level.title()} Alert",
            description=message,
            color=colors.get(level, nextcord.Color.default()),
            timestamp=datetime.now()
        )
        
        try:
            await admin_channel.send(embed=embed)
        except nextcord.Forbidden:
            print(f"Cannot send alert to admin channel: {message}")

# Graceful shutdown handling
class GracefulTaskShutdown:
    def __init__(self):
        self.tasks = []
        self.shutdown_event = asyncio.Event()
    
    def register_task(self, task_loop):
        """Register a task for graceful shutdown."""
        self.tasks.append(task_loop)
    
    async def shutdown_all_tasks(self):
        """Gracefully shutdown all registered tasks."""
        print("Initiating graceful task shutdown...")
        
        # Set shutdown event
        self.shutdown_event.set()
        
        # Stop all tasks
        for task in self.tasks:
            if task.is_running():
                task.stop()
        
        # Wait for tasks to complete current iteration
        await asyncio.sleep(2)
        
        print("All tasks shut down gracefully")
    
    @tasks.loop(seconds=10)
    async def example_task(self):
        """Example task with shutdown check."""
        if self.shutdown_event.is_set():
            print("Shutdown requested - stopping task")
            self.example_task.stop()
            return
        
        # Normal task work here
        await self.do_task_work()
    
    async def do_task_work(self):
        """Simulate task work."""
        await asyncio.sleep(1)

# Usage with bot events
shutdown_manager = GracefulTaskShutdown()

@bot.event
async def on_ready():
    """Start all background tasks when bot is ready."""
    print(f'{bot.user} is ready!')
    
    # Start task manager
    task_manager = RobustTaskManager(bot)
    task_manager.health_monitor.start()
    task_manager.api_sync.start()
    
    # Register tasks for graceful shutdown
    shutdown_manager.register_task(task_manager.health_monitor)
    shutdown_manager.register_task(task_manager.api_sync)
    
    print("All background tasks started")

@bot.event
async def on_disconnect():
    """Handle bot disconnection."""
    print("Bot disconnected - maintaining tasks")

# Graceful shutdown on SIGINT/SIGTERM
import signal

def signal_handler(signum, frame):
    """Handle shutdown signals."""
    print(f"Received signal {signum}")
    asyncio.create_task(shutdown_manager.shutdown_all_tasks())
    asyncio.create_task(bot.close())

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Start the bot
if __name__ == "__main__":
    bot.run(TOKEN)

Advanced Task Patterns

Sophisticated task patterns for complex scheduling and coordination requirements.

Conditional and Dependent Tasks { .api }

# Task coordination and dependencies
class TaskCoordinator:
    def __init__(self, bot):
        self.bot = bot
        self.task_states = {}
        self.dependencies = {}
        
    @tasks.loop(minutes=1)
    async def data_collector(self):
        """Collect data - prerequisite for other tasks."""
        try:
            # Collect various metrics
            data = {
                'timestamp': datetime.now(),
                'guild_count': len(self.bot.guilds),
                'user_count': sum(g.member_count for g in self.bot.guilds),
                'message_rate': await self.get_message_rate(),
                'memory_usage': await self.get_memory_usage()
            }
            
            # Store collected data
            await self.store_metrics(data)
            
            # Mark task as successful
            self.task_states['data_collector'] = {
                'last_success': datetime.now(),
                'status': 'success',
                'data': data
            }
            
        except Exception as e:
            self.task_states['data_collector'] = {
                'last_error': datetime.now(),
                'status': 'error',
                'error': str(e)
            }
            raise
    
    @tasks.loop(minutes=5)
    async def data_analyzer(self):
        """Analyze collected data - depends on data_collector."""
        # Check if dependency is met
        if not self.check_dependency('data_analyzer', 'data_collector', max_age_minutes=2):
            print("Data analyzer skipped - waiting for fresh data")
            return
        
        try:
            # Get the latest data
            collector_state = self.task_states.get('data_collector', {})
            if 'data' not in collector_state:
                return
            
            data = collector_state['data']
            
            # Perform analysis
            analysis = {
                'growth_rate': await self.calculate_growth_rate(data),
                'performance_score': await self.calculate_performance(data),
                'alerts': await self.check_thresholds(data),
                'timestamp': datetime.now()
            }
            
            # Store analysis results
            await self.store_analysis(analysis)
            
            # Update task state
            self.task_states['data_analyzer'] = {
                'last_success': datetime.now(),
                'status': 'success',
                'analysis': analysis
            }
            
            # Trigger alerts if necessary
            if analysis['alerts']:
                await self.send_analysis_alerts(analysis['alerts'])
                
        except Exception as e:
            self.task_states['data_analyzer'] = {
                'last_error': datetime.now(),
                'status': 'error',
                'error': str(e)
            }
            print(f"Data analysis failed: {e}")
    
    @tasks.loop(hours=1)
    async def report_generator(self):
        """Generate reports - depends on both collector and analyzer."""
        # Check multiple dependencies
        dependencies = ['data_collector', 'data_analyzer']
        
        if not all(self.check_dependency('report_generator', dep, max_age_minutes=10) for dep in dependencies):
            print("Report generation skipped - dependencies not met")
            return
        
        try:
            # Generate comprehensive report
            report = await self.generate_hourly_report()
            
            # Send to configured channels
            await self.distribute_report(report)
            
            # Update task state
            self.task_states['report_generator'] = {
                'last_success': datetime.now(),
                'status': 'success',
                'report_id': report['id']
            }
            
        except Exception as e:
            self.task_states['report_generator'] = {
                'last_error': datetime.now(),
                'status': 'error',
                'error': str(e)
            }
            print(f"Report generation failed: {e}")
    
    def check_dependency(self, task_name: str, dependency: str, max_age_minutes: int = 5) -> bool:
        """Check if a task dependency is satisfied."""
        dep_state = self.task_states.get(dependency, {})
        
        # Check if dependency task has succeeded recently
        if dep_state.get('status') != 'success':
            return False
        
        last_success = dep_state.get('last_success')
        if not last_success:
            return False
        
        # Check if the success is recent enough
        age = (datetime.now() - last_success).total_seconds() / 60
        return age <= max_age_minutes
    
    def start_all_tasks(self):
        """Start all coordinated tasks."""
        self.data_collector.start()
        self.data_analyzer.start()
        self.report_generator.start()

# Conditional execution based on external factors
class ConditionalTasks:
    def __init__(self, bot):
        self.bot = bot
        
    @tasks.loop(minutes=10)
    async def weather_alerts(self):
        """Send weather alerts only during severe weather."""
        # Check if any guilds have weather alerts enabled
        enabled_guilds = await self.get_weather_enabled_guilds()
        if not enabled_guilds:
            return
        
        # Get current weather conditions
        severe_weather = await self.check_severe_weather()
        
        if not severe_weather:
            return  # No severe weather, skip this iteration
        
        # Send alerts to enabled guilds
        for guild_id in enabled_guilds:
            guild = self.bot.get_guild(guild_id)
            if not guild:
                continue
            
            weather_channel = await self.get_guild_weather_channel(guild_id)
            if not weather_channel:
                continue
            
            # Create weather alert embed
            embed = nextcord.Embed(
                title="🌪️ Severe Weather Alert",
                description=severe_weather['description'],
                color=nextcord.Color.red()
            )
            
            embed.add_field(name="Type", value=severe_weather['type'], inline=True)
            embed.add_field(name="Severity", value=severe_weather['severity'], inline=True)
            embed.add_field(name="Location", value=severe_weather['location'], inline=True)
            
            if severe_weather.get('expires'):
                embed.add_field(name="Expires", value=severe_weather['expires'], inline=False)
            
            try:
                await weather_channel.send(embed=embed)
            except nextcord.Forbidden:
                # Remove channel if bot can't send messages
                await self.remove_weather_channel(guild_id)
    
    @tasks.loop(time=time(hour=6))  # 6 AM daily
    async def school_announcements(self):
        """Send school announcements only on weekdays during school year."""
        now = datetime.now()
        
        # Skip weekends
        if now.weekday() >= 5:  # Saturday = 5, Sunday = 6
            return
        
        # Skip during summer break (customize dates as needed)
        summer_start = datetime(now.year, 6, 15)  # June 15
        summer_end = datetime(now.year, 8, 25)    # August 25
        
        if summer_start <= now <= summer_end:
            return
        
        # Skip during winter break
        if now.month == 12 and now.day >= 20:
            return
        if now.month == 1 and now.day <= 5:
            return
        
        # Send school announcements
        announcements = await self.get_school_announcements()
        
        for guild_id in await self.get_school_guilds():
            guild = self.bot.get_guild(guild_id)
            if not guild:
                continue
                
            school_channel = await self.get_school_channel(guild_id)
            if school_channel and announcements:
                for announcement in announcements:
                    embed = nextcord.Embed(
                        title="🏫 School Announcement",
                        description=announcement['content'],
                        color=nextcord.Color.blue()
                    )
                    
                    embed.set_footer(text=f"From: {announcement['source']}")
                    
                    try:
                        await school_channel.send(embed=embed)
                    except nextcord.Forbidden:
                        pass

# Resource-aware task scheduling
class ResourceAwareScheduler:
    def __init__(self, bot):
        self.bot = bot
        self.high_load_threshold = 0.8  # 80% resource usage
        
    @tasks.loop(seconds=30)
    async def resource_monitor(self):
        """Monitor system resources and adjust task frequency."""
        resources = await self.get_system_resources()
        
        if resources['cpu_percent'] > self.high_load_threshold:
            # Reduce task frequency during high load
            await self.reduce_task_frequency()
        elif resources['cpu_percent'] < 0.3:  # Low load
            # Restore normal frequency during low load
            await self.restore_task_frequency()
    
    async def reduce_task_frequency(self):
        """Reduce frequency of non-critical tasks during high load."""
        # Change intervals for resource-intensive tasks
        if hasattr(self, 'data_backup') and self.data_backup.is_running():
            self.data_backup.change_interval(minutes=15)  # Reduce from 5 to 15 minutes
        
        if hasattr(self, 'statistics_update') and self.statistics_update.is_running():
            self.statistics_update.change_interval(minutes=30)  # Reduce frequency
    
    async def restore_task_frequency(self):
        """Restore normal task frequency when resources are available."""
        if hasattr(self, 'data_backup') and self.data_backup.is_running():
            self.data_backup.change_interval(minutes=5)  # Back to normal
        
        if hasattr(self, 'statistics_update') and self.statistics_update.is_running():
            self.statistics_update.change_interval(minutes=10)  # Back to normal

This comprehensive documentation covers all aspects of nextcord's task system, providing developers with robust tools for creating sophisticated background operations and scheduled tasks.

Install with Tessl CLI

npx tessl i tessl/pypi-nextcord

docs

application-commands.md

channels.md

client.md

commands.md

errors.md

events.md

guild.md

index.md

messages.md

permissions.md

tasks.md

ui.md

users.md

utilities.md

voice.md

webhooks.md

tile.json