0
# Nextcord Background Tasks
1
2
Task scheduling and background operations with the tasks extension for periodic and scheduled operations, providing robust async task management.
3
4
## Loop Decorator
5
6
The primary decorator for creating background tasks with flexible scheduling options.
7
8
### Task Loop Decorator { .api }
9
10
```python
11
import nextcord
12
from nextcord.ext import tasks, commands
13
from datetime import datetime, time, timedelta
14
from typing import Optional, Union, Sequence, Any, Callable
15
16
def loop(
17
*,
18
seconds: float = MISSING,
19
minutes: float = MISSING,
20
hours: float = MISSING,
21
time: Union[time, Sequence[time]] = MISSING,
22
count: Optional[int] = None,
23
reconnect: bool = True
24
) -> Callable:
25
"""Decorator to create a task loop.
26
27
Parameters
28
----------
29
seconds: float
30
Number of seconds between iterations.
31
minutes: float
32
Number of minutes between iterations.
33
hours: float
34
Number of hours between iterations.
35
time: Union[time, Sequence[time]]
36
Specific time(s) to run the task daily.
37
count: Optional[int]
38
Number of times to run the task. None for infinite.
39
reconnect: bool
40
Whether to reconnect the bot if it disconnects during the task.
41
42
Returns
43
-------
44
Callable
45
The decorated function as a Loop object.
46
"""
47
...
48
49
# Basic periodic tasks
50
@tasks.loop(seconds=30)
51
async def status_updater():
52
"""Update bot status every 30 seconds."""
53
guild_count = len(bot.guilds)
54
member_count = sum(guild.member_count for guild in bot.guilds)
55
56
activity = nextcord.Activity(
57
type=nextcord.ActivityType.watching,
58
name=f"{guild_count} servers β’ {member_count} members"
59
)
60
61
await bot.change_presence(activity=activity)
62
63
@tasks.loop(minutes=5)
64
async def backup_data():
65
"""Backup important data every 5 minutes."""
66
print(f"[{datetime.now()}] Starting data backup...")
67
68
# Simulate backup process
69
await backup_user_data()
70
await backup_guild_settings()
71
72
print(f"[{datetime.now()}] Data backup completed!")
73
74
@tasks.loop(hours=1)
75
async def cleanup_temp_files():
76
"""Clean up temporary files hourly."""
77
import os
78
import glob
79
80
temp_dir = "./temp/"
81
if os.path.exists(temp_dir):
82
old_files = glob.glob(os.path.join(temp_dir, "*"))
83
84
for file_path in old_files:
85
try:
86
# Delete files older than 1 hour
87
if os.path.getmtime(file_path) < (datetime.now().timestamp() - 3600):
88
os.remove(file_path)
89
print(f"Deleted old temp file: {file_path}")
90
except OSError:
91
pass
92
93
# Daily tasks at specific times
94
@tasks.loop(time=time(hour=0, minute=0)) # Midnight UTC
95
async def daily_reset():
96
"""Reset daily statistics at midnight."""
97
print("Performing daily reset...")
98
99
# Reset daily user stats
100
await reset_daily_stats()
101
102
# Send daily report to admin channel
103
admin_channel = bot.get_channel(ADMIN_CHANNEL_ID)
104
if admin_channel:
105
stats = await get_daily_statistics()
106
107
embed = nextcord.Embed(
108
title="π Daily Report",
109
description=f"Statistics for {datetime.now().strftime('%Y-%m-%d')}",
110
color=nextcord.Color.blue()
111
)
112
113
embed.add_field(name="Messages Processed", value=stats['messages'], inline=True)
114
embed.add_field(name="Commands Used", value=stats['commands'], inline=True)
115
embed.add_field(name="New Users", value=stats['new_users'], inline=True)
116
117
await admin_channel.send(embed=embed)
118
119
# Multiple daily times
120
@tasks.loop(time=[time(hour=9), time(hour=15), time(hour=21)]) # 9 AM, 3 PM, 9 PM
121
async def reminder_check():
122
"""Check for due reminders three times daily."""
123
due_reminders = await get_due_reminders()
124
125
for reminder in due_reminders:
126
try:
127
user = bot.get_user(reminder['user_id'])
128
if user:
129
embed = nextcord.Embed(
130
title="β° Reminder",
131
description=reminder['message'],
132
color=nextcord.Color.orange()
133
)
134
embed.set_footer(text=f"Set on {reminder['created_at']}")
135
136
await user.send(embed=embed)
137
await mark_reminder_completed(reminder['id'])
138
except nextcord.Forbidden:
139
# User has DMs disabled
140
pass
141
142
# Limited count tasks
143
@tasks.loop(seconds=10, count=5)
144
async def startup_checks():
145
"""Perform startup checks 5 times with 10-second intervals."""
146
check_number = startup_checks.current_loop + 1
147
print(f"Startup check {check_number}/5")
148
149
# Check database connection
150
if await check_database_connection():
151
print("β Database connection OK")
152
else:
153
print("β Database connection failed")
154
155
# Check API endpoints
156
if await check_external_apis():
157
print("β External APIs OK")
158
else:
159
print("β External API check failed")
160
```
161
162
## Loop Class
163
164
The Loop class provides control over background tasks with start, stop, and restart functionality.
165
166
### Loop Control Methods { .api }
167
168
```python
169
class Loop:
170
"""Represents a background task loop.
171
172
Attributes
173
----------
174
coro: Callable
175
The coroutine function being looped.
176
seconds: Optional[float]
177
Seconds between each iteration.
178
minutes: Optional[float]
179
Minutes between each iteration.
180
hours: Optional[float]
181
Hours between each iteration.
182
time: Optional[Sequence[time]]
183
Specific times to run daily.
184
count: Optional[int]
185
Maximum number of iterations.
186
current_loop: int
187
Current iteration number (0-indexed).
188
next_iteration: Optional[datetime]
189
When the next iteration will run.
190
"""
191
192
def start(self, *args, **kwargs) -> None:
193
"""Start the loop.
194
195
Parameters
196
----------
197
*args, **kwargs
198
Arguments to pass to the loop function.
199
"""
200
...
201
202
def stop(self) -> None:
203
"""Stop the loop."""
204
...
205
206
def restart(self, *args, **kwargs) -> None:
207
"""Restart the loop with new arguments.
208
209
Parameters
210
----------
211
*args, **kwargs
212
New arguments to pass to the loop function.
213
"""
214
...
215
216
def cancel(self) -> None:
217
"""Cancel the loop (alias for stop)."""
218
...
219
220
def change_interval(
221
self,
222
*,
223
seconds: float = MISSING,
224
minutes: float = MISSING,
225
hours: float = MISSING,
226
time: Union[time, Sequence[time]] = MISSING
227
) -> None:
228
"""Change the loop interval.
229
230
Parameters
231
----------
232
seconds: float
233
New seconds interval.
234
minutes: float
235
New minutes interval.
236
hours: float
237
New hours interval.
238
time: Union[time, Sequence[time]]
239
New time(s) for daily execution.
240
"""
241
...
242
243
def is_running(self) -> bool:
244
"""bool: Whether the loop is currently running."""
245
...
246
247
def is_being_cancelled(self) -> bool:
248
"""bool: Whether the loop is being cancelled."""
249
...
250
251
def failed(self) -> bool:
252
"""bool: Whether the loop has failed and stopped."""
253
...
254
255
def get_task(self) -> Optional[asyncio.Task]:
256
"""Optional[asyncio.Task]: The underlying asyncio task."""
257
...
258
259
# Event handlers that can be overridden
260
async def before_loop(self) -> None:
261
"""Called before the loop starts."""
262
...
263
264
async def after_loop(self) -> None:
265
"""Called after the loop ends."""
266
...
267
268
def error(self, exception: Exception) -> None:
269
"""Called when an exception occurs in the loop.
270
271
Parameters
272
----------
273
exception: Exception
274
The exception that occurred.
275
"""
276
...
277
278
# Advanced loop control example
279
class DatabaseManager:
280
def __init__(self, bot):
281
self.bot = bot
282
self.connection_check.start()
283
self.maintenance_task.start()
284
285
@tasks.loop(minutes=1)
286
async def connection_check(self):
287
"""Monitor database connection health."""
288
try:
289
# Test database connection
290
await self.test_connection()
291
292
if not self.is_healthy:
293
print("Database connection restored!")
294
self.is_healthy = True
295
296
except Exception as e:
297
if self.is_healthy:
298
print(f"Database connection lost: {e}")
299
self.is_healthy = False
300
301
# Notify administrators
302
await self.notify_admins("π¨ Database connection lost!")
303
304
@connection_check.before_loop
305
async def before_connection_check(self):
306
"""Wait for bot to be ready before starting connection checks."""
307
await self.bot.wait_until_ready()
308
self.is_healthy = True
309
print("Starting database connection monitoring...")
310
311
@connection_check.error
312
async def connection_check_error(self, exception):
313
"""Handle errors in connection check loop."""
314
print(f"Error in connection check: {exception}")
315
316
# If the error is critical, restart the loop
317
if isinstance(exception, CriticalDatabaseError):
318
print("Critical database error - restarting connection check...")
319
self.connection_check.restart()
320
321
@tasks.loop(hours=24)
322
async def maintenance_task(self):
323
"""Daily database maintenance."""
324
print("Starting daily database maintenance...")
325
326
try:
327
await self.optimize_tables()
328
await self.clean_old_records()
329
await self.update_statistics()
330
331
print("Database maintenance completed successfully")
332
333
except Exception as e:
334
print(f"Maintenance task failed: {e}")
335
raise
336
337
# Dynamic loop management
338
class TaskManager:
339
def __init__(self, bot):
340
self.bot = bot
341
self.active_loops = {}
342
343
def create_reminder_loop(self, user_id: int, message: str, interval: int):
344
"""Create a personalized reminder loop."""
345
346
@tasks.loop(seconds=interval)
347
async def user_reminder():
348
user = self.bot.get_user(user_id)
349
if user:
350
try:
351
await user.send(f"β° Reminder: {message}")
352
except nextcord.Forbidden:
353
# Stop loop if user blocks bot
354
user_reminder.stop()
355
del self.active_loops[user_id]
356
357
# Store and start the loop
358
self.active_loops[user_id] = user_reminder
359
user_reminder.start()
360
361
return user_reminder
362
363
def stop_user_reminder(self, user_id: int):
364
"""Stop a user's reminder loop."""
365
if user_id in self.active_loops:
366
self.active_loops[user_id].stop()
367
del self.active_loops[user_id]
368
return True
369
return False
370
371
def get_user_reminder_status(self, user_id: int) -> dict:
372
"""Get status of a user's reminder loop."""
373
if user_id not in self.active_loops:
374
return {"active": False}
375
376
loop = self.active_loops[user_id]
377
return {
378
"active": loop.is_running(),
379
"current_loop": loop.current_loop,
380
"next_iteration": loop.next_iteration
381
}
382
383
# Task manager usage
384
task_manager = TaskManager(bot)
385
386
@bot.command()
387
async def set_reminder(ctx, interval: int, *, message: str):
388
"""Set a personal reminder that repeats every X seconds."""
389
if interval < 60:
390
await ctx.send("β Interval must be at least 60 seconds.")
391
return
392
393
if interval > 86400: # 24 hours
394
await ctx.send("β Interval cannot exceed 24 hours.")
395
return
396
397
# Stop existing reminder if any
398
task_manager.stop_user_reminder(ctx.author.id)
399
400
# Create new reminder
401
loop = task_manager.create_reminder_loop(ctx.author.id, message, interval)
402
403
await ctx.send(f"β Reminder set! Will remind you every {interval} seconds: {message}")
404
405
@bot.command()
406
async def stop_reminder(ctx):
407
"""Stop your personal reminder."""
408
if task_manager.stop_user_reminder(ctx.author.id):
409
await ctx.send("β Your reminder has been stopped.")
410
else:
411
await ctx.send("β You don't have an active reminder.")
412
413
@bot.command()
414
async def reminder_status(ctx):
415
"""Check your reminder status."""
416
status = task_manager.get_user_reminder_status(ctx.author.id)
417
418
if not status["active"]:
419
await ctx.send("You don't have an active reminder.")
420
return
421
422
embed = nextcord.Embed(title="Reminder Status", color=nextcord.Color.blue())
423
embed.add_field(name="Status", value="Active β ", inline=True)
424
embed.add_field(name="Iteration", value=status["current_loop"] + 1, inline=True)
425
426
if status["next_iteration"]:
427
next_time = status["next_iteration"].strftime("%H:%M:%S UTC")
428
embed.add_field(name="Next Reminder", value=next_time, inline=True)
429
430
await ctx.send(embed=embed)
431
```
432
433
## Task Scheduling
434
435
Advanced scheduling patterns for complex timing requirements.
436
437
### Time-based Scheduling { .api }
438
439
```python
440
from datetime import time, datetime, timezone, timedelta
441
import asyncio
442
443
# Multiple specific times
444
@tasks.loop(time=[
445
time(hour=8, minute=0), # 8:00 AM
446
time(hour=12, minute=0), # 12:00 PM
447
time(hour=18, minute=0), # 6:00 PM
448
time(hour=22, minute=0) # 10:00 PM
449
])
450
async def quarterly_announcements():
451
"""Send announcements four times daily."""
452
announcements = await get_pending_announcements()
453
454
for guild in bot.guilds:
455
announcement_channel = get_guild_announcement_channel(guild.id)
456
if announcement_channel and announcements:
457
for announcement in announcements:
458
embed = nextcord.Embed(
459
title="π’ Announcement",
460
description=announcement['message'],
461
color=nextcord.Color.gold()
462
)
463
embed.set_footer(text=announcement['author'])
464
465
try:
466
await announcement_channel.send(embed=embed)
467
except nextcord.Forbidden:
468
pass # No permission to send messages
469
470
# Timezone-aware scheduling
471
import pytz
472
473
@tasks.loop(time=time(hour=9, minute=0, tzinfo=pytz.timezone('US/Eastern')))
474
async def eastern_morning_report():
475
"""Send morning report at 9 AM Eastern Time."""
476
eastern = pytz.timezone('US/Eastern')
477
current_time = datetime.now(eastern)
478
479
embed = nextcord.Embed(
480
title="π Morning Report",
481
description=f"Good morning! It's {current_time.strftime('%A, %B %d, %Y')}",
482
color=nextcord.Color.orange()
483
)
484
485
# Add daily statistics
486
stats = await get_overnight_statistics()
487
embed.add_field(name="Overnight Activity", value=f"{stats['messages']} messages", inline=True)
488
embed.add_field(name="New Members", value=stats['new_members'], inline=True)
489
embed.add_field(name="Server Status", value="π’ Online", inline=True)
490
491
# Send to all configured channels
492
for guild in bot.guilds:
493
channel = get_guild_report_channel(guild.id)
494
if channel:
495
try:
496
await channel.send(embed=embed)
497
except nextcord.Forbidden:
498
pass
499
500
# Weekly scheduling
501
@tasks.loop(time=time(hour=10, minute=0)) # Daily at 10 AM
502
async def weekly_summary():
503
"""Send weekly summary every Sunday."""
504
if datetime.now().weekday() != 6: # Sunday is 6
505
return
506
507
# Calculate week range
508
today = datetime.now()
509
week_start = today - timedelta(days=7)
510
511
embed = nextcord.Embed(
512
title="π Weekly Summary",
513
description=f"Week of {week_start.strftime('%B %d')} - {today.strftime('%B %d, %Y')}",
514
color=nextcord.Color.purple()
515
)
516
517
# Gather weekly statistics
518
weekly_stats = await get_weekly_statistics(week_start, today)
519
520
embed.add_field(name="Total Messages", value=f"{weekly_stats['messages']:,}", inline=True)
521
embed.add_field(name="Active Users", value=f"{weekly_stats['active_users']:,}", inline=True)
522
embed.add_field(name="Commands Used", value=f"{weekly_stats['commands']:,}", inline=True)
523
embed.add_field(name="New Members", value=f"{weekly_stats['new_members']:,}", inline=True)
524
embed.add_field(name="Peak Hour", value=weekly_stats['peak_hour'], inline=True)
525
embed.add_field(name="Top Channel", value=weekly_stats['top_channel'], inline=True)
526
527
# Send to admin channels
528
for guild in bot.guilds:
529
admin_channel = get_guild_admin_channel(guild.id)
530
if admin_channel:
531
try:
532
await admin_channel.send(embed=embed)
533
except nextcord.Forbidden:
534
pass
535
536
# Monthly scheduling
537
@tasks.loop(time=time(hour=0, minute=0)) # Daily at midnight
538
async def monthly_cleanup():
539
"""Perform monthly cleanup on the first day of each month."""
540
today = datetime.now()
541
if today.day != 1: # Only run on first day of month
542
return
543
544
print(f"Starting monthly cleanup for {today.strftime('%B %Y')}")
545
546
try:
547
# Archive old messages
548
await archive_old_messages(days=90)
549
550
# Clean up inactive users
551
inactive_users = await cleanup_inactive_users(days=30)
552
553
# Generate monthly report
554
report = await generate_monthly_report(today.year, today.month - 1)
555
556
# Send admin notification
557
admin_channel = bot.get_channel(ADMIN_CHANNEL_ID)
558
if admin_channel:
559
embed = nextcord.Embed(
560
title="ποΈ Monthly Cleanup Complete",
561
description=f"Cleanup completed for {(today - timedelta(days=1)).strftime('%B %Y')}",
562
color=nextcord.Color.green()
563
)
564
565
embed.add_field(name="Messages Archived", value=f"{report['archived_messages']:,}", inline=True)
566
embed.add_field(name="Inactive Users Removed", value=len(inactive_users), inline=True)
567
embed.add_field(name="Storage Freed", value=f"{report['storage_freed']} MB", inline=True)
568
569
await admin_channel.send(embed=embed)
570
571
print("Monthly cleanup completed successfully")
572
573
except Exception as e:
574
print(f"Monthly cleanup failed: {e}")
575
576
# Notify admins of failure
577
if admin_channel:
578
error_embed = nextcord.Embed(
579
title="β Monthly Cleanup Failed",
580
description=f"Cleanup process encountered an error: {str(e)}",
581
color=nextcord.Color.red()
582
)
583
await admin_channel.send(embed=error_embed)
584
```
585
586
## Error Handling and Recovery
587
588
Robust error handling and automatic recovery mechanisms for background tasks.
589
590
### Task Error Management { .api }
591
592
```python
593
# Comprehensive error handling
594
class RobustTaskManager:
595
def __init__(self, bot):
596
self.bot = bot
597
self.error_counts = {}
598
self.max_retries = 3
599
self.retry_delay = 60 # seconds
600
601
@tasks.loop(minutes=5)
602
async def health_monitor(self):
603
"""Monitor bot health and performance."""
604
try:
605
# Check memory usage
606
memory_usage = await self.get_memory_usage()
607
608
# Check database connection
609
db_status = await self.check_database()
610
611
# Check API rate limits
612
rate_limit_status = await self.check_rate_limits()
613
614
# Log health metrics
615
await self.log_health_metrics({
616
'memory': memory_usage,
617
'database': db_status,
618
'rate_limits': rate_limit_status,
619
'timestamp': datetime.now()
620
})
621
622
# Alert if thresholds exceeded
623
if memory_usage > 0.8: # 80% memory usage
624
await self.send_alert("High memory usage detected", "warning")
625
626
if not db_status:
627
await self.send_alert("Database connection failed", "error")
628
629
except Exception as e:
630
await self.handle_task_error('health_monitor', e)
631
632
@health_monitor.error
633
async def health_monitor_error(self, exception):
634
"""Handle errors in health monitoring."""
635
print(f"Health monitor error: {exception}")
636
637
# Log the error
638
await self.log_error('health_monitor', exception)
639
640
# Increment error count
641
task_name = 'health_monitor'
642
self.error_counts[task_name] = self.error_counts.get(task_name, 0) + 1
643
644
# If too many errors, stop the task and alert admins
645
if self.error_counts[task_name] >= self.max_retries:
646
await self.send_alert(
647
f"Health monitor has failed {self.max_retries} times and has been stopped",
648
"critical"
649
)
650
self.health_monitor.stop()
651
652
@tasks.loop(seconds=30)
653
async def api_sync(self):
654
"""Sync data with external APIs."""
655
try:
656
# Reset error count on successful run
657
self.error_counts.pop('api_sync', None)
658
659
# Perform API synchronization
660
await self.sync_user_data()
661
await self.sync_server_stats()
662
663
except aiohttp.ClientTimeout:
664
# Temporary network issue - don't count as error
665
print("API sync timed out - retrying next cycle")
666
667
except aiohttp.ClientError as e:
668
# Network error - retry with backoff
669
await self.handle_network_error('api_sync', e)
670
671
except Exception as e:
672
await self.handle_task_error('api_sync', e)
673
674
@api_sync.error
675
async def api_sync_error(self, exception):
676
"""Handle API sync errors with exponential backoff."""
677
task_name = 'api_sync'
678
error_count = self.error_counts.get(task_name, 0) + 1
679
self.error_counts[task_name] = error_count
680
681
print(f"API sync error #{error_count}: {exception}")
682
683
if error_count < self.max_retries:
684
# Exponential backoff: 60s, 120s, 240s
685
delay = self.retry_delay * (2 ** (error_count - 1))
686
print(f"Retrying API sync in {delay} seconds...")
687
688
# Restart with delay
689
await asyncio.sleep(delay)
690
if not self.api_sync.is_running():
691
self.api_sync.start()
692
else:
693
await self.send_alert(
694
"API sync has failed repeatedly and requires manual intervention",
695
"critical"
696
)
697
self.api_sync.stop()
698
699
async def handle_task_error(self, task_name: str, exception: Exception):
700
"""Generic task error handler."""
701
print(f"Task {task_name} error: {exception}")
702
703
# Log to file/database
704
await self.log_error(task_name, exception)
705
706
# Send alert for critical errors
707
if isinstance(exception, (DatabaseError, CriticalError)):
708
await self.send_alert(
709
f"Critical error in {task_name}: {exception}",
710
"critical"
711
)
712
713
async def send_alert(self, message: str, level: str):
714
"""Send alert to administrators."""
715
admin_channel = self.bot.get_channel(ADMIN_CHANNEL_ID)
716
if not admin_channel:
717
return
718
719
colors = {
720
"info": nextcord.Color.blue(),
721
"warning": nextcord.Color.orange(),
722
"error": nextcord.Color.red(),
723
"critical": nextcord.Color.from_rgb(139, 0, 0) # Dark red
724
}
725
726
icons = {
727
"info": "βΉοΈ",
728
"warning": "β οΈ",
729
"error": "β",
730
"critical": "π¨"
731
}
732
733
embed = nextcord.Embed(
734
title=f"{icons.get(level, 'β')} {level.title()} Alert",
735
description=message,
736
color=colors.get(level, nextcord.Color.default()),
737
timestamp=datetime.now()
738
)
739
740
try:
741
await admin_channel.send(embed=embed)
742
except nextcord.Forbidden:
743
print(f"Cannot send alert to admin channel: {message}")
744
745
# Graceful shutdown handling
746
class GracefulTaskShutdown:
747
def __init__(self):
748
self.tasks = []
749
self.shutdown_event = asyncio.Event()
750
751
def register_task(self, task_loop):
752
"""Register a task for graceful shutdown."""
753
self.tasks.append(task_loop)
754
755
async def shutdown_all_tasks(self):
756
"""Gracefully shutdown all registered tasks."""
757
print("Initiating graceful task shutdown...")
758
759
# Set shutdown event
760
self.shutdown_event.set()
761
762
# Stop all tasks
763
for task in self.tasks:
764
if task.is_running():
765
task.stop()
766
767
# Wait for tasks to complete current iteration
768
await asyncio.sleep(2)
769
770
print("All tasks shut down gracefully")
771
772
@tasks.loop(seconds=10)
773
async def example_task(self):
774
"""Example task with shutdown check."""
775
if self.shutdown_event.is_set():
776
print("Shutdown requested - stopping task")
777
self.example_task.stop()
778
return
779
780
# Normal task work here
781
await self.do_task_work()
782
783
async def do_task_work(self):
784
"""Simulate task work."""
785
await asyncio.sleep(1)
786
787
# Usage with bot events
788
shutdown_manager = GracefulTaskShutdown()
789
790
@bot.event
791
async def on_ready():
792
"""Start all background tasks when bot is ready."""
793
print(f'{bot.user} is ready!')
794
795
# Start task manager
796
task_manager = RobustTaskManager(bot)
797
task_manager.health_monitor.start()
798
task_manager.api_sync.start()
799
800
# Register tasks for graceful shutdown
801
shutdown_manager.register_task(task_manager.health_monitor)
802
shutdown_manager.register_task(task_manager.api_sync)
803
804
print("All background tasks started")
805
806
@bot.event
807
async def on_disconnect():
808
"""Handle bot disconnection."""
809
print("Bot disconnected - maintaining tasks")
810
811
# Graceful shutdown on SIGINT/SIGTERM
812
import signal
813
814
def signal_handler(signum, frame):
815
"""Handle shutdown signals."""
816
print(f"Received signal {signum}")
817
asyncio.create_task(shutdown_manager.shutdown_all_tasks())
818
asyncio.create_task(bot.close())
819
820
signal.signal(signal.SIGINT, signal_handler)
821
signal.signal(signal.SIGTERM, signal_handler)
822
823
# Start the bot
824
if __name__ == "__main__":
825
bot.run(TOKEN)
826
```
827
828
## Advanced Task Patterns
829
830
Sophisticated task patterns for complex scheduling and coordination requirements.
831
832
### Conditional and Dependent Tasks { .api }
833
834
```python
835
# Task coordination and dependencies
836
class TaskCoordinator:
837
def __init__(self, bot):
838
self.bot = bot
839
self.task_states = {}
840
self.dependencies = {}
841
842
@tasks.loop(minutes=1)
843
async def data_collector(self):
844
"""Collect data - prerequisite for other tasks."""
845
try:
846
# Collect various metrics
847
data = {
848
'timestamp': datetime.now(),
849
'guild_count': len(self.bot.guilds),
850
'user_count': sum(g.member_count for g in self.bot.guilds),
851
'message_rate': await self.get_message_rate(),
852
'memory_usage': await self.get_memory_usage()
853
}
854
855
# Store collected data
856
await self.store_metrics(data)
857
858
# Mark task as successful
859
self.task_states['data_collector'] = {
860
'last_success': datetime.now(),
861
'status': 'success',
862
'data': data
863
}
864
865
except Exception as e:
866
self.task_states['data_collector'] = {
867
'last_error': datetime.now(),
868
'status': 'error',
869
'error': str(e)
870
}
871
raise
872
873
@tasks.loop(minutes=5)
874
async def data_analyzer(self):
875
"""Analyze collected data - depends on data_collector."""
876
# Check if dependency is met
877
if not self.check_dependency('data_analyzer', 'data_collector', max_age_minutes=2):
878
print("Data analyzer skipped - waiting for fresh data")
879
return
880
881
try:
882
# Get the latest data
883
collector_state = self.task_states.get('data_collector', {})
884
if 'data' not in collector_state:
885
return
886
887
data = collector_state['data']
888
889
# Perform analysis
890
analysis = {
891
'growth_rate': await self.calculate_growth_rate(data),
892
'performance_score': await self.calculate_performance(data),
893
'alerts': await self.check_thresholds(data),
894
'timestamp': datetime.now()
895
}
896
897
# Store analysis results
898
await self.store_analysis(analysis)
899
900
# Update task state
901
self.task_states['data_analyzer'] = {
902
'last_success': datetime.now(),
903
'status': 'success',
904
'analysis': analysis
905
}
906
907
# Trigger alerts if necessary
908
if analysis['alerts']:
909
await self.send_analysis_alerts(analysis['alerts'])
910
911
except Exception as e:
912
self.task_states['data_analyzer'] = {
913
'last_error': datetime.now(),
914
'status': 'error',
915
'error': str(e)
916
}
917
print(f"Data analysis failed: {e}")
918
919
@tasks.loop(hours=1)
920
async def report_generator(self):
921
"""Generate reports - depends on both collector and analyzer."""
922
# Check multiple dependencies
923
dependencies = ['data_collector', 'data_analyzer']
924
925
if not all(self.check_dependency('report_generator', dep, max_age_minutes=10) for dep in dependencies):
926
print("Report generation skipped - dependencies not met")
927
return
928
929
try:
930
# Generate comprehensive report
931
report = await self.generate_hourly_report()
932
933
# Send to configured channels
934
await self.distribute_report(report)
935
936
# Update task state
937
self.task_states['report_generator'] = {
938
'last_success': datetime.now(),
939
'status': 'success',
940
'report_id': report['id']
941
}
942
943
except Exception as e:
944
self.task_states['report_generator'] = {
945
'last_error': datetime.now(),
946
'status': 'error',
947
'error': str(e)
948
}
949
print(f"Report generation failed: {e}")
950
951
def check_dependency(self, task_name: str, dependency: str, max_age_minutes: int = 5) -> bool:
952
"""Check if a task dependency is satisfied."""
953
dep_state = self.task_states.get(dependency, {})
954
955
# Check if dependency task has succeeded recently
956
if dep_state.get('status') != 'success':
957
return False
958
959
last_success = dep_state.get('last_success')
960
if not last_success:
961
return False
962
963
# Check if the success is recent enough
964
age = (datetime.now() - last_success).total_seconds() / 60
965
return age <= max_age_minutes
966
967
def start_all_tasks(self):
968
"""Start all coordinated tasks."""
969
self.data_collector.start()
970
self.data_analyzer.start()
971
self.report_generator.start()
972
973
# Conditional execution based on external factors
974
class ConditionalTasks:
975
def __init__(self, bot):
976
self.bot = bot
977
978
@tasks.loop(minutes=10)
979
async def weather_alerts(self):
980
"""Send weather alerts only during severe weather."""
981
# Check if any guilds have weather alerts enabled
982
enabled_guilds = await self.get_weather_enabled_guilds()
983
if not enabled_guilds:
984
return
985
986
# Get current weather conditions
987
severe_weather = await self.check_severe_weather()
988
989
if not severe_weather:
990
return # No severe weather, skip this iteration
991
992
# Send alerts to enabled guilds
993
for guild_id in enabled_guilds:
994
guild = self.bot.get_guild(guild_id)
995
if not guild:
996
continue
997
998
weather_channel = await self.get_guild_weather_channel(guild_id)
999
if not weather_channel:
1000
continue
1001
1002
# Create weather alert embed
1003
embed = nextcord.Embed(
1004
title="πͺοΈ Severe Weather Alert",
1005
description=severe_weather['description'],
1006
color=nextcord.Color.red()
1007
)
1008
1009
embed.add_field(name="Type", value=severe_weather['type'], inline=True)
1010
embed.add_field(name="Severity", value=severe_weather['severity'], inline=True)
1011
embed.add_field(name="Location", value=severe_weather['location'], inline=True)
1012
1013
if severe_weather.get('expires'):
1014
embed.add_field(name="Expires", value=severe_weather['expires'], inline=False)
1015
1016
try:
1017
await weather_channel.send(embed=embed)
1018
except nextcord.Forbidden:
1019
# Remove channel if bot can't send messages
1020
await self.remove_weather_channel(guild_id)
1021
1022
@tasks.loop(time=time(hour=6)) # 6 AM daily
1023
async def school_announcements(self):
1024
"""Send school announcements only on weekdays during school year."""
1025
now = datetime.now()
1026
1027
# Skip weekends
1028
if now.weekday() >= 5: # Saturday = 5, Sunday = 6
1029
return
1030
1031
# Skip during summer break (customize dates as needed)
1032
summer_start = datetime(now.year, 6, 15) # June 15
1033
summer_end = datetime(now.year, 8, 25) # August 25
1034
1035
if summer_start <= now <= summer_end:
1036
return
1037
1038
# Skip during winter break
1039
if now.month == 12 and now.day >= 20:
1040
return
1041
if now.month == 1 and now.day <= 5:
1042
return
1043
1044
# Send school announcements
1045
announcements = await self.get_school_announcements()
1046
1047
for guild_id in await self.get_school_guilds():
1048
guild = self.bot.get_guild(guild_id)
1049
if not guild:
1050
continue
1051
1052
school_channel = await self.get_school_channel(guild_id)
1053
if school_channel and announcements:
1054
for announcement in announcements:
1055
embed = nextcord.Embed(
1056
title="π« School Announcement",
1057
description=announcement['content'],
1058
color=nextcord.Color.blue()
1059
)
1060
1061
embed.set_footer(text=f"From: {announcement['source']}")
1062
1063
try:
1064
await school_channel.send(embed=embed)
1065
except nextcord.Forbidden:
1066
pass
1067
1068
# Resource-aware task scheduling
1069
class ResourceAwareScheduler:
1070
def __init__(self, bot):
1071
self.bot = bot
1072
self.high_load_threshold = 0.8 # 80% resource usage
1073
1074
@tasks.loop(seconds=30)
1075
async def resource_monitor(self):
1076
"""Monitor system resources and adjust task frequency."""
1077
resources = await self.get_system_resources()
1078
1079
if resources['cpu_percent'] > self.high_load_threshold:
1080
# Reduce task frequency during high load
1081
await self.reduce_task_frequency()
1082
elif resources['cpu_percent'] < 0.3: # Low load
1083
# Restore normal frequency during low load
1084
await self.restore_task_frequency()
1085
1086
async def reduce_task_frequency(self):
1087
"""Reduce frequency of non-critical tasks during high load."""
1088
# Change intervals for resource-intensive tasks
1089
if hasattr(self, 'data_backup') and self.data_backup.is_running():
1090
self.data_backup.change_interval(minutes=15) # Reduce from 5 to 15 minutes
1091
1092
if hasattr(self, 'statistics_update') and self.statistics_update.is_running():
1093
self.statistics_update.change_interval(minutes=30) # Reduce frequency
1094
1095
async def restore_task_frequency(self):
1096
"""Restore normal task frequency when resources are available."""
1097
if hasattr(self, 'data_backup') and self.data_backup.is_running():
1098
self.data_backup.change_interval(minutes=5) # Back to normal
1099
1100
if hasattr(self, 'statistics_update') and self.statistics_update.is_running():
1101
self.statistics_update.change_interval(minutes=10) # Back to normal
1102
```
1103
1104
This comprehensive documentation covers all aspects of nextcord's task system, providing developers with robust tools for creating sophisticated background operations and scheduled tasks.