or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-commands.mdchannels.mdclient.mdcommands.mderrors.mdevents.mdguild.mdindex.mdmessages.mdpermissions.mdtasks.mdui.mdusers.mdutilities.mdvoice.mdwebhooks.md

tasks.mddocs/

0

# Nextcord Background Tasks

1

2

Task scheduling and background operations with the tasks extension for periodic and scheduled operations, providing robust async task management.

3

4

## Loop Decorator

5

6

The primary decorator for creating background tasks with flexible scheduling options.

7

8

### Task Loop Decorator { .api }

9

10

```python

11

import nextcord

12

from nextcord.ext import tasks, commands

13

from datetime import datetime, time, timedelta

14

from typing import Optional, Union, Sequence, Any, Callable

15

16

def loop(

17

*,

18

seconds: float = MISSING,

19

minutes: float = MISSING,

20

hours: float = MISSING,

21

time: Union[time, Sequence[time]] = MISSING,

22

count: Optional[int] = None,

23

reconnect: bool = True

24

) -> Callable:

25

"""Decorator to create a task loop.

26

27

Parameters

28

----------

29

seconds: float

30

Number of seconds between iterations.

31

minutes: float

32

Number of minutes between iterations.

33

hours: float

34

Number of hours between iterations.

35

time: Union[time, Sequence[time]]

36

Specific time(s) to run the task daily.

37

count: Optional[int]

38

Number of times to run the task. None for infinite.

39

reconnect: bool

40

Whether to reconnect the bot if it disconnects during the task.

41

42

Returns

43

-------

44

Callable

45

The decorated function as a Loop object.

46

"""

47

...

48

49

# Basic periodic tasks

50

@tasks.loop(seconds=30)

51

async def status_updater():

52

"""Update bot status every 30 seconds."""

53

guild_count = len(bot.guilds)

54

member_count = sum(guild.member_count for guild in bot.guilds)

55

56

activity = nextcord.Activity(

57

type=nextcord.ActivityType.watching,

58

name=f"{guild_count} servers β€’ {member_count} members"

59

)

60

61

await bot.change_presence(activity=activity)

62

63

@tasks.loop(minutes=5)

64

async def backup_data():

65

"""Backup important data every 5 minutes."""

66

print(f"[{datetime.now()}] Starting data backup...")

67

68

# Simulate backup process

69

await backup_user_data()

70

await backup_guild_settings()

71

72

print(f"[{datetime.now()}] Data backup completed!")

73

74

@tasks.loop(hours=1)

75

async def cleanup_temp_files():

76

"""Clean up temporary files hourly."""

77

import os

78

import glob

79

80

temp_dir = "./temp/"

81

if os.path.exists(temp_dir):

82

old_files = glob.glob(os.path.join(temp_dir, "*"))

83

84

for file_path in old_files:

85

try:

86

# Delete files older than 1 hour

87

if os.path.getmtime(file_path) < (datetime.now().timestamp() - 3600):

88

os.remove(file_path)

89

print(f"Deleted old temp file: {file_path}")

90

except OSError:

91

pass

92

93

# Daily tasks at specific times

94

@tasks.loop(time=time(hour=0, minute=0)) # Midnight UTC

95

async def daily_reset():

96

"""Reset daily statistics at midnight."""

97

print("Performing daily reset...")

98

99

# Reset daily user stats

100

await reset_daily_stats()

101

102

# Send daily report to admin channel

103

admin_channel = bot.get_channel(ADMIN_CHANNEL_ID)

104

if admin_channel:

105

stats = await get_daily_statistics()

106

107

embed = nextcord.Embed(

108

title="πŸ“Š Daily Report",

109

description=f"Statistics for {datetime.now().strftime('%Y-%m-%d')}",

110

color=nextcord.Color.blue()

111

)

112

113

embed.add_field(name="Messages Processed", value=stats['messages'], inline=True)

114

embed.add_field(name="Commands Used", value=stats['commands'], inline=True)

115

embed.add_field(name="New Users", value=stats['new_users'], inline=True)

116

117

await admin_channel.send(embed=embed)

118

119

# Multiple daily times

120

@tasks.loop(time=[time(hour=9), time(hour=15), time(hour=21)]) # 9 AM, 3 PM, 9 PM

121

async def reminder_check():

122

"""Check for due reminders three times daily."""

123

due_reminders = await get_due_reminders()

124

125

for reminder in due_reminders:

126

try:

127

user = bot.get_user(reminder['user_id'])

128

if user:

129

embed = nextcord.Embed(

130

title="⏰ Reminder",

131

description=reminder['message'],

132

color=nextcord.Color.orange()

133

)

134

embed.set_footer(text=f"Set on {reminder['created_at']}")

135

136

await user.send(embed=embed)

137

await mark_reminder_completed(reminder['id'])

138

except nextcord.Forbidden:

139

# User has DMs disabled

140

pass

141

142

# Limited count tasks

143

@tasks.loop(seconds=10, count=5)

144

async def startup_checks():

145

"""Perform startup checks 5 times with 10-second intervals."""

146

check_number = startup_checks.current_loop + 1

147

print(f"Startup check {check_number}/5")

148

149

# Check database connection

150

if await check_database_connection():

151

print("βœ… Database connection OK")

152

else:

153

print("❌ Database connection failed")

154

155

# Check API endpoints

156

if await check_external_apis():

157

print("βœ… External APIs OK")

158

else:

159

print("❌ External API check failed")

160

```

161

162

## Loop Class

163

164

The Loop class provides control over background tasks with start, stop, and restart functionality.

165

166

### Loop Control Methods { .api }

167

168

```python

169

class Loop:

170

"""Represents a background task loop.

171

172

Attributes

173

----------

174

coro: Callable

175

The coroutine function being looped.

176

seconds: Optional[float]

177

Seconds between each iteration.

178

minutes: Optional[float]

179

Minutes between each iteration.

180

hours: Optional[float]

181

Hours between each iteration.

182

time: Optional[Sequence[time]]

183

Specific times to run daily.

184

count: Optional[int]

185

Maximum number of iterations.

186

current_loop: int

187

Current iteration number (0-indexed).

188

next_iteration: Optional[datetime]

189

When the next iteration will run.

190

"""

191

192

def start(self, *args, **kwargs) -> None:

193

"""Start the loop.

194

195

Parameters

196

----------

197

*args, **kwargs

198

Arguments to pass to the loop function.

199

"""

200

...

201

202

def stop(self) -> None:

203

"""Stop the loop."""

204

...

205

206

def restart(self, *args, **kwargs) -> None:

207

"""Restart the loop with new arguments.

208

209

Parameters

210

----------

211

*args, **kwargs

212

New arguments to pass to the loop function.

213

"""

214

...

215

216

def cancel(self) -> None:

217

"""Cancel the loop (alias for stop)."""

218

...

219

220

def change_interval(

221

self,

222

*,

223

seconds: float = MISSING,

224

minutes: float = MISSING,

225

hours: float = MISSING,

226

time: Union[time, Sequence[time]] = MISSING

227

) -> None:

228

"""Change the loop interval.

229

230

Parameters

231

----------

232

seconds: float

233

New seconds interval.

234

minutes: float

235

New minutes interval.

236

hours: float

237

New hours interval.

238

time: Union[time, Sequence[time]]

239

New time(s) for daily execution.

240

"""

241

...

242

243

def is_running(self) -> bool:

244

"""bool: Whether the loop is currently running."""

245

...

246

247

def is_being_cancelled(self) -> bool:

248

"""bool: Whether the loop is being cancelled."""

249

...

250

251

def failed(self) -> bool:

252

"""bool: Whether the loop has failed and stopped."""

253

...

254

255

def get_task(self) -> Optional[asyncio.Task]:

256

"""Optional[asyncio.Task]: The underlying asyncio task."""

257

...

258

259

# Event handlers that can be overridden

260

async def before_loop(self) -> None:

261

"""Called before the loop starts."""

262

...

263

264

async def after_loop(self) -> None:

265

"""Called after the loop ends."""

266

...

267

268

def error(self, exception: Exception) -> None:

269

"""Called when an exception occurs in the loop.

270

271

Parameters

272

----------

273

exception: Exception

274

The exception that occurred.

275

"""

276

...

277

278

# Advanced loop control example

279

class DatabaseManager:

280

def __init__(self, bot):

281

self.bot = bot

282

self.connection_check.start()

283

self.maintenance_task.start()

284

285

@tasks.loop(minutes=1)

286

async def connection_check(self):

287

"""Monitor database connection health."""

288

try:

289

# Test database connection

290

await self.test_connection()

291

292

if not self.is_healthy:

293

print("Database connection restored!")

294

self.is_healthy = True

295

296

except Exception as e:

297

if self.is_healthy:

298

print(f"Database connection lost: {e}")

299

self.is_healthy = False

300

301

# Notify administrators

302

await self.notify_admins("🚨 Database connection lost!")

303

304

@connection_check.before_loop

305

async def before_connection_check(self):

306

"""Wait for bot to be ready before starting connection checks."""

307

await self.bot.wait_until_ready()

308

self.is_healthy = True

309

print("Starting database connection monitoring...")

310

311

@connection_check.error

312

async def connection_check_error(self, exception):

313

"""Handle errors in connection check loop."""

314

print(f"Error in connection check: {exception}")

315

316

# If the error is critical, restart the loop

317

if isinstance(exception, CriticalDatabaseError):

318

print("Critical database error - restarting connection check...")

319

self.connection_check.restart()

320

321

@tasks.loop(hours=24)

322

async def maintenance_task(self):

323

"""Daily database maintenance."""

324

print("Starting daily database maintenance...")

325

326

try:

327

await self.optimize_tables()

328

await self.clean_old_records()

329

await self.update_statistics()

330

331

print("Database maintenance completed successfully")

332

333

except Exception as e:

334

print(f"Maintenance task failed: {e}")

335

raise

336

337

# Dynamic loop management

338

class TaskManager:

339

def __init__(self, bot):

340

self.bot = bot

341

self.active_loops = {}

342

343

def create_reminder_loop(self, user_id: int, message: str, interval: int):

344

"""Create a personalized reminder loop."""

345

346

@tasks.loop(seconds=interval)

347

async def user_reminder():

348

user = self.bot.get_user(user_id)

349

if user:

350

try:

351

await user.send(f"⏰ Reminder: {message}")

352

except nextcord.Forbidden:

353

# Stop loop if user blocks bot

354

user_reminder.stop()

355

del self.active_loops[user_id]

356

357

# Store and start the loop

358

self.active_loops[user_id] = user_reminder

359

user_reminder.start()

360

361

return user_reminder

362

363

def stop_user_reminder(self, user_id: int):

364

"""Stop a user's reminder loop."""

365

if user_id in self.active_loops:

366

self.active_loops[user_id].stop()

367

del self.active_loops[user_id]

368

return True

369

return False

370

371

def get_user_reminder_status(self, user_id: int) -> dict:

372

"""Get status of a user's reminder loop."""

373

if user_id not in self.active_loops:

374

return {"active": False}

375

376

loop = self.active_loops[user_id]

377

return {

378

"active": loop.is_running(),

379

"current_loop": loop.current_loop,

380

"next_iteration": loop.next_iteration

381

}

382

383

# Task manager usage

384

task_manager = TaskManager(bot)

385

386

@bot.command()

387

async def set_reminder(ctx, interval: int, *, message: str):

388

"""Set a personal reminder that repeats every X seconds."""

389

if interval < 60:

390

await ctx.send("❌ Interval must be at least 60 seconds.")

391

return

392

393

if interval > 86400: # 24 hours

394

await ctx.send("❌ Interval cannot exceed 24 hours.")

395

return

396

397

# Stop existing reminder if any

398

task_manager.stop_user_reminder(ctx.author.id)

399

400

# Create new reminder

401

loop = task_manager.create_reminder_loop(ctx.author.id, message, interval)

402

403

await ctx.send(f"βœ… Reminder set! Will remind you every {interval} seconds: {message}")

404

405

@bot.command()

406

async def stop_reminder(ctx):

407

"""Stop your personal reminder."""

408

if task_manager.stop_user_reminder(ctx.author.id):

409

await ctx.send("βœ… Your reminder has been stopped.")

410

else:

411

await ctx.send("❌ You don't have an active reminder.")

412

413

@bot.command()

414

async def reminder_status(ctx):

415

"""Check your reminder status."""

416

status = task_manager.get_user_reminder_status(ctx.author.id)

417

418

if not status["active"]:

419

await ctx.send("You don't have an active reminder.")

420

return

421

422

embed = nextcord.Embed(title="Reminder Status", color=nextcord.Color.blue())

423

embed.add_field(name="Status", value="Active βœ…", inline=True)

424

embed.add_field(name="Iteration", value=status["current_loop"] + 1, inline=True)

425

426

if status["next_iteration"]:

427

next_time = status["next_iteration"].strftime("%H:%M:%S UTC")

428

embed.add_field(name="Next Reminder", value=next_time, inline=True)

429

430

await ctx.send(embed=embed)

431

```

432

433

## Task Scheduling

434

435

Advanced scheduling patterns for complex timing requirements.

436

437

### Time-based Scheduling { .api }

438

439

```python

440

from datetime import time, datetime, timezone, timedelta

441

import asyncio

442

443

# Multiple specific times

444

@tasks.loop(time=[

445

time(hour=8, minute=0), # 8:00 AM

446

time(hour=12, minute=0), # 12:00 PM

447

time(hour=18, minute=0), # 6:00 PM

448

time(hour=22, minute=0) # 10:00 PM

449

])

450

async def quarterly_announcements():

451

"""Send announcements four times daily."""

452

announcements = await get_pending_announcements()

453

454

for guild in bot.guilds:

455

announcement_channel = get_guild_announcement_channel(guild.id)

456

if announcement_channel and announcements:

457

for announcement in announcements:

458

embed = nextcord.Embed(

459

title="πŸ“’ Announcement",

460

description=announcement['message'],

461

color=nextcord.Color.gold()

462

)

463

embed.set_footer(text=announcement['author'])

464

465

try:

466

await announcement_channel.send(embed=embed)

467

except nextcord.Forbidden:

468

pass # No permission to send messages

469

470

# Timezone-aware scheduling

471

import pytz

472

473

@tasks.loop(time=time(hour=9, minute=0, tzinfo=pytz.timezone('US/Eastern')))

474

async def eastern_morning_report():

475

"""Send morning report at 9 AM Eastern Time."""

476

eastern = pytz.timezone('US/Eastern')

477

current_time = datetime.now(eastern)

478

479

embed = nextcord.Embed(

480

title="πŸŒ… Morning Report",

481

description=f"Good morning! It's {current_time.strftime('%A, %B %d, %Y')}",

482

color=nextcord.Color.orange()

483

)

484

485

# Add daily statistics

486

stats = await get_overnight_statistics()

487

embed.add_field(name="Overnight Activity", value=f"{stats['messages']} messages", inline=True)

488

embed.add_field(name="New Members", value=stats['new_members'], inline=True)

489

embed.add_field(name="Server Status", value="🟒 Online", inline=True)

490

491

# Send to all configured channels

492

for guild in bot.guilds:

493

channel = get_guild_report_channel(guild.id)

494

if channel:

495

try:

496

await channel.send(embed=embed)

497

except nextcord.Forbidden:

498

pass

499

500

# Weekly scheduling

501

@tasks.loop(time=time(hour=10, minute=0)) # Daily at 10 AM

502

async def weekly_summary():

503

"""Send weekly summary every Sunday."""

504

if datetime.now().weekday() != 6: # Sunday is 6

505

return

506

507

# Calculate week range

508

today = datetime.now()

509

week_start = today - timedelta(days=7)

510

511

embed = nextcord.Embed(

512

title="πŸ“Š Weekly Summary",

513

description=f"Week of {week_start.strftime('%B %d')} - {today.strftime('%B %d, %Y')}",

514

color=nextcord.Color.purple()

515

)

516

517

# Gather weekly statistics

518

weekly_stats = await get_weekly_statistics(week_start, today)

519

520

embed.add_field(name="Total Messages", value=f"{weekly_stats['messages']:,}", inline=True)

521

embed.add_field(name="Active Users", value=f"{weekly_stats['active_users']:,}", inline=True)

522

embed.add_field(name="Commands Used", value=f"{weekly_stats['commands']:,}", inline=True)

523

embed.add_field(name="New Members", value=f"{weekly_stats['new_members']:,}", inline=True)

524

embed.add_field(name="Peak Hour", value=weekly_stats['peak_hour'], inline=True)

525

embed.add_field(name="Top Channel", value=weekly_stats['top_channel'], inline=True)

526

527

# Send to admin channels

528

for guild in bot.guilds:

529

admin_channel = get_guild_admin_channel(guild.id)

530

if admin_channel:

531

try:

532

await admin_channel.send(embed=embed)

533

except nextcord.Forbidden:

534

pass

535

536

# Monthly scheduling

537

@tasks.loop(time=time(hour=0, minute=0)) # Daily at midnight

538

async def monthly_cleanup():

539

"""Perform monthly cleanup on the first day of each month."""

540

today = datetime.now()

541

if today.day != 1: # Only run on first day of month

542

return

543

544

print(f"Starting monthly cleanup for {today.strftime('%B %Y')}")

545

546

try:

547

# Archive old messages

548

await archive_old_messages(days=90)

549

550

# Clean up inactive users

551

inactive_users = await cleanup_inactive_users(days=30)

552

553

# Generate monthly report

554

report = await generate_monthly_report(today.year, today.month - 1)

555

556

# Send admin notification

557

admin_channel = bot.get_channel(ADMIN_CHANNEL_ID)

558

if admin_channel:

559

embed = nextcord.Embed(

560

title="πŸ—‚οΈ Monthly Cleanup Complete",

561

description=f"Cleanup completed for {(today - timedelta(days=1)).strftime('%B %Y')}",

562

color=nextcord.Color.green()

563

)

564

565

embed.add_field(name="Messages Archived", value=f"{report['archived_messages']:,}", inline=True)

566

embed.add_field(name="Inactive Users Removed", value=len(inactive_users), inline=True)

567

embed.add_field(name="Storage Freed", value=f"{report['storage_freed']} MB", inline=True)

568

569

await admin_channel.send(embed=embed)

570

571

print("Monthly cleanup completed successfully")

572

573

except Exception as e:

574

print(f"Monthly cleanup failed: {e}")

575

576

# Notify admins of failure

577

if admin_channel:

578

error_embed = nextcord.Embed(

579

title="❌ Monthly Cleanup Failed",

580

description=f"Cleanup process encountered an error: {str(e)}",

581

color=nextcord.Color.red()

582

)

583

await admin_channel.send(embed=error_embed)

584

```

585

586

## Error Handling and Recovery

587

588

Robust error handling and automatic recovery mechanisms for background tasks.

589

590

### Task Error Management { .api }

591

592

```python

593

# Comprehensive error handling

594

class RobustTaskManager:

595

def __init__(self, bot):

596

self.bot = bot

597

self.error_counts = {}

598

self.max_retries = 3

599

self.retry_delay = 60 # seconds

600

601

@tasks.loop(minutes=5)

602

async def health_monitor(self):

603

"""Monitor bot health and performance."""

604

try:

605

# Check memory usage

606

memory_usage = await self.get_memory_usage()

607

608

# Check database connection

609

db_status = await self.check_database()

610

611

# Check API rate limits

612

rate_limit_status = await self.check_rate_limits()

613

614

# Log health metrics

615

await self.log_health_metrics({

616

'memory': memory_usage,

617

'database': db_status,

618

'rate_limits': rate_limit_status,

619

'timestamp': datetime.now()

620

})

621

622

# Alert if thresholds exceeded

623

if memory_usage > 0.8: # 80% memory usage

624

await self.send_alert("High memory usage detected", "warning")

625

626

if not db_status:

627

await self.send_alert("Database connection failed", "error")

628

629

except Exception as e:

630

await self.handle_task_error('health_monitor', e)

631

632

@health_monitor.error

633

async def health_monitor_error(self, exception):

634

"""Handle errors in health monitoring."""

635

print(f"Health monitor error: {exception}")

636

637

# Log the error

638

await self.log_error('health_monitor', exception)

639

640

# Increment error count

641

task_name = 'health_monitor'

642

self.error_counts[task_name] = self.error_counts.get(task_name, 0) + 1

643

644

# If too many errors, stop the task and alert admins

645

if self.error_counts[task_name] >= self.max_retries:

646

await self.send_alert(

647

f"Health monitor has failed {self.max_retries} times and has been stopped",

648

"critical"

649

)

650

self.health_monitor.stop()

651

652

@tasks.loop(seconds=30)

653

async def api_sync(self):

654

"""Sync data with external APIs."""

655

try:

656

# Reset error count on successful run

657

self.error_counts.pop('api_sync', None)

658

659

# Perform API synchronization

660

await self.sync_user_data()

661

await self.sync_server_stats()

662

663

except aiohttp.ClientTimeout:

664

# Temporary network issue - don't count as error

665

print("API sync timed out - retrying next cycle")

666

667

except aiohttp.ClientError as e:

668

# Network error - retry with backoff

669

await self.handle_network_error('api_sync', e)

670

671

except Exception as e:

672

await self.handle_task_error('api_sync', e)

673

674

@api_sync.error

675

async def api_sync_error(self, exception):

676

"""Handle API sync errors with exponential backoff."""

677

task_name = 'api_sync'

678

error_count = self.error_counts.get(task_name, 0) + 1

679

self.error_counts[task_name] = error_count

680

681

print(f"API sync error #{error_count}: {exception}")

682

683

if error_count < self.max_retries:

684

# Exponential backoff: 60s, 120s, 240s

685

delay = self.retry_delay * (2 ** (error_count - 1))

686

print(f"Retrying API sync in {delay} seconds...")

687

688

# Restart with delay

689

await asyncio.sleep(delay)

690

if not self.api_sync.is_running():

691

self.api_sync.start()

692

else:

693

await self.send_alert(

694

"API sync has failed repeatedly and requires manual intervention",

695

"critical"

696

)

697

self.api_sync.stop()

698

699

async def handle_task_error(self, task_name: str, exception: Exception):

700

"""Generic task error handler."""

701

print(f"Task {task_name} error: {exception}")

702

703

# Log to file/database

704

await self.log_error(task_name, exception)

705

706

# Send alert for critical errors

707

if isinstance(exception, (DatabaseError, CriticalError)):

708

await self.send_alert(

709

f"Critical error in {task_name}: {exception}",

710

"critical"

711

)

712

713

async def send_alert(self, message: str, level: str):

714

"""Send alert to administrators."""

715

admin_channel = self.bot.get_channel(ADMIN_CHANNEL_ID)

716

if not admin_channel:

717

return

718

719

colors = {

720

"info": nextcord.Color.blue(),

721

"warning": nextcord.Color.orange(),

722

"error": nextcord.Color.red(),

723

"critical": nextcord.Color.from_rgb(139, 0, 0) # Dark red

724

}

725

726

icons = {

727

"info": "ℹ️",

728

"warning": "⚠️",

729

"error": "❌",

730

"critical": "🚨"

731

}

732

733

embed = nextcord.Embed(

734

title=f"{icons.get(level, '❓')} {level.title()} Alert",

735

description=message,

736

color=colors.get(level, nextcord.Color.default()),

737

timestamp=datetime.now()

738

)

739

740

try:

741

await admin_channel.send(embed=embed)

742

except nextcord.Forbidden:

743

print(f"Cannot send alert to admin channel: {message}")

744

745

# Graceful shutdown handling

746

class GracefulTaskShutdown:

747

def __init__(self):

748

self.tasks = []

749

self.shutdown_event = asyncio.Event()

750

751

def register_task(self, task_loop):

752

"""Register a task for graceful shutdown."""

753

self.tasks.append(task_loop)

754

755

async def shutdown_all_tasks(self):

756

"""Gracefully shutdown all registered tasks."""

757

print("Initiating graceful task shutdown...")

758

759

# Set shutdown event

760

self.shutdown_event.set()

761

762

# Stop all tasks

763

for task in self.tasks:

764

if task.is_running():

765

task.stop()

766

767

# Wait for tasks to complete current iteration

768

await asyncio.sleep(2)

769

770

print("All tasks shut down gracefully")

771

772

@tasks.loop(seconds=10)

773

async def example_task(self):

774

"""Example task with shutdown check."""

775

if self.shutdown_event.is_set():

776

print("Shutdown requested - stopping task")

777

self.example_task.stop()

778

return

779

780

# Normal task work here

781

await self.do_task_work()

782

783

async def do_task_work(self):

784

"""Simulate task work."""

785

await asyncio.sleep(1)

786

787

# Usage with bot events

788

shutdown_manager = GracefulTaskShutdown()

789

790

@bot.event

791

async def on_ready():

792

"""Start all background tasks when bot is ready."""

793

print(f'{bot.user} is ready!')

794

795

# Start task manager

796

task_manager = RobustTaskManager(bot)

797

task_manager.health_monitor.start()

798

task_manager.api_sync.start()

799

800

# Register tasks for graceful shutdown

801

shutdown_manager.register_task(task_manager.health_monitor)

802

shutdown_manager.register_task(task_manager.api_sync)

803

804

print("All background tasks started")

805

806

@bot.event

807

async def on_disconnect():

808

"""Handle bot disconnection."""

809

print("Bot disconnected - maintaining tasks")

810

811

# Graceful shutdown on SIGINT/SIGTERM

812

import signal

813

814

def signal_handler(signum, frame):

815

"""Handle shutdown signals."""

816

print(f"Received signal {signum}")

817

asyncio.create_task(shutdown_manager.shutdown_all_tasks())

818

asyncio.create_task(bot.close())

819

820

signal.signal(signal.SIGINT, signal_handler)

821

signal.signal(signal.SIGTERM, signal_handler)

822

823

# Start the bot

824

if __name__ == "__main__":

825

bot.run(TOKEN)

826

```

827

828

## Advanced Task Patterns

829

830

Sophisticated task patterns for complex scheduling and coordination requirements.

831

832

### Conditional and Dependent Tasks { .api }

833

834

```python

835

# Task coordination and dependencies

836

class TaskCoordinator:

837

def __init__(self, bot):

838

self.bot = bot

839

self.task_states = {}

840

self.dependencies = {}

841

842

@tasks.loop(minutes=1)

843

async def data_collector(self):

844

"""Collect data - prerequisite for other tasks."""

845

try:

846

# Collect various metrics

847

data = {

848

'timestamp': datetime.now(),

849

'guild_count': len(self.bot.guilds),

850

'user_count': sum(g.member_count for g in self.bot.guilds),

851

'message_rate': await self.get_message_rate(),

852

'memory_usage': await self.get_memory_usage()

853

}

854

855

# Store collected data

856

await self.store_metrics(data)

857

858

# Mark task as successful

859

self.task_states['data_collector'] = {

860

'last_success': datetime.now(),

861

'status': 'success',

862

'data': data

863

}

864

865

except Exception as e:

866

self.task_states['data_collector'] = {

867

'last_error': datetime.now(),

868

'status': 'error',

869

'error': str(e)

870

}

871

raise

872

873

@tasks.loop(minutes=5)

874

async def data_analyzer(self):

875

"""Analyze collected data - depends on data_collector."""

876

# Check if dependency is met

877

if not self.check_dependency('data_analyzer', 'data_collector', max_age_minutes=2):

878

print("Data analyzer skipped - waiting for fresh data")

879

return

880

881

try:

882

# Get the latest data

883

collector_state = self.task_states.get('data_collector', {})

884

if 'data' not in collector_state:

885

return

886

887

data = collector_state['data']

888

889

# Perform analysis

890

analysis = {

891

'growth_rate': await self.calculate_growth_rate(data),

892

'performance_score': await self.calculate_performance(data),

893

'alerts': await self.check_thresholds(data),

894

'timestamp': datetime.now()

895

}

896

897

# Store analysis results

898

await self.store_analysis(analysis)

899

900

# Update task state

901

self.task_states['data_analyzer'] = {

902

'last_success': datetime.now(),

903

'status': 'success',

904

'analysis': analysis

905

}

906

907

# Trigger alerts if necessary

908

if analysis['alerts']:

909

await self.send_analysis_alerts(analysis['alerts'])

910

911

except Exception as e:

912

self.task_states['data_analyzer'] = {

913

'last_error': datetime.now(),

914

'status': 'error',

915

'error': str(e)

916

}

917

print(f"Data analysis failed: {e}")

918

919

@tasks.loop(hours=1)

920

async def report_generator(self):

921

"""Generate reports - depends on both collector and analyzer."""

922

# Check multiple dependencies

923

dependencies = ['data_collector', 'data_analyzer']

924

925

if not all(self.check_dependency('report_generator', dep, max_age_minutes=10) for dep in dependencies):

926

print("Report generation skipped - dependencies not met")

927

return

928

929

try:

930

# Generate comprehensive report

931

report = await self.generate_hourly_report()

932

933

# Send to configured channels

934

await self.distribute_report(report)

935

936

# Update task state

937

self.task_states['report_generator'] = {

938

'last_success': datetime.now(),

939

'status': 'success',

940

'report_id': report['id']

941

}

942

943

except Exception as e:

944

self.task_states['report_generator'] = {

945

'last_error': datetime.now(),

946

'status': 'error',

947

'error': str(e)

948

}

949

print(f"Report generation failed: {e}")

950

951

def check_dependency(self, task_name: str, dependency: str, max_age_minutes: int = 5) -> bool:

952

"""Check if a task dependency is satisfied."""

953

dep_state = self.task_states.get(dependency, {})

954

955

# Check if dependency task has succeeded recently

956

if dep_state.get('status') != 'success':

957

return False

958

959

last_success = dep_state.get('last_success')

960

if not last_success:

961

return False

962

963

# Check if the success is recent enough

964

age = (datetime.now() - last_success).total_seconds() / 60

965

return age <= max_age_minutes

966

967

def start_all_tasks(self):

968

"""Start all coordinated tasks."""

969

self.data_collector.start()

970

self.data_analyzer.start()

971

self.report_generator.start()

972

973

# Conditional execution based on external factors

974

class ConditionalTasks:

975

def __init__(self, bot):

976

self.bot = bot

977

978

@tasks.loop(minutes=10)

979

async def weather_alerts(self):

980

"""Send weather alerts only during severe weather."""

981

# Check if any guilds have weather alerts enabled

982

enabled_guilds = await self.get_weather_enabled_guilds()

983

if not enabled_guilds:

984

return

985

986

# Get current weather conditions

987

severe_weather = await self.check_severe_weather()

988

989

if not severe_weather:

990

return # No severe weather, skip this iteration

991

992

# Send alerts to enabled guilds

993

for guild_id in enabled_guilds:

994

guild = self.bot.get_guild(guild_id)

995

if not guild:

996

continue

997

998

weather_channel = await self.get_guild_weather_channel(guild_id)

999

if not weather_channel:

1000

continue

1001

1002

# Create weather alert embed

1003

embed = nextcord.Embed(

1004

title="πŸŒͺ️ Severe Weather Alert",

1005

description=severe_weather['description'],

1006

color=nextcord.Color.red()

1007

)

1008

1009

embed.add_field(name="Type", value=severe_weather['type'], inline=True)

1010

embed.add_field(name="Severity", value=severe_weather['severity'], inline=True)

1011

embed.add_field(name="Location", value=severe_weather['location'], inline=True)

1012

1013

if severe_weather.get('expires'):

1014

embed.add_field(name="Expires", value=severe_weather['expires'], inline=False)

1015

1016

try:

1017

await weather_channel.send(embed=embed)

1018

except nextcord.Forbidden:

1019

# Remove channel if bot can't send messages

1020

await self.remove_weather_channel(guild_id)

1021

1022

@tasks.loop(time=time(hour=6)) # 6 AM daily

1023

async def school_announcements(self):

1024

"""Send school announcements only on weekdays during school year."""

1025

now = datetime.now()

1026

1027

# Skip weekends

1028

if now.weekday() >= 5: # Saturday = 5, Sunday = 6

1029

return

1030

1031

# Skip during summer break (customize dates as needed)

1032

summer_start = datetime(now.year, 6, 15) # June 15

1033

summer_end = datetime(now.year, 8, 25) # August 25

1034

1035

if summer_start <= now <= summer_end:

1036

return

1037

1038

# Skip during winter break

1039

if now.month == 12 and now.day >= 20:

1040

return

1041

if now.month == 1 and now.day <= 5:

1042

return

1043

1044

# Send school announcements

1045

announcements = await self.get_school_announcements()

1046

1047

for guild_id in await self.get_school_guilds():

1048

guild = self.bot.get_guild(guild_id)

1049

if not guild:

1050

continue

1051

1052

school_channel = await self.get_school_channel(guild_id)

1053

if school_channel and announcements:

1054

for announcement in announcements:

1055

embed = nextcord.Embed(

1056

title="🏫 School Announcement",

1057

description=announcement['content'],

1058

color=nextcord.Color.blue()

1059

)

1060

1061

embed.set_footer(text=f"From: {announcement['source']}")

1062

1063

try:

1064

await school_channel.send(embed=embed)

1065

except nextcord.Forbidden:

1066

pass

1067

1068

# Resource-aware task scheduling

1069

class ResourceAwareScheduler:

1070

def __init__(self, bot):

1071

self.bot = bot

1072

self.high_load_threshold = 0.8 # 80% resource usage

1073

1074

@tasks.loop(seconds=30)

1075

async def resource_monitor(self):

1076

"""Monitor system resources and adjust task frequency."""

1077

resources = await self.get_system_resources()

1078

1079

if resources['cpu_percent'] > self.high_load_threshold:

1080

# Reduce task frequency during high load

1081

await self.reduce_task_frequency()

1082

elif resources['cpu_percent'] < 0.3: # Low load

1083

# Restore normal frequency during low load

1084

await self.restore_task_frequency()

1085

1086

async def reduce_task_frequency(self):

1087

"""Reduce frequency of non-critical tasks during high load."""

1088

# Change intervals for resource-intensive tasks

1089

if hasattr(self, 'data_backup') and self.data_backup.is_running():

1090

self.data_backup.change_interval(minutes=15) # Reduce from 5 to 15 minutes

1091

1092

if hasattr(self, 'statistics_update') and self.statistics_update.is_running():

1093

self.statistics_update.change_interval(minutes=30) # Reduce frequency

1094

1095

async def restore_task_frequency(self):

1096

"""Restore normal task frequency when resources are available."""

1097

if hasattr(self, 'data_backup') and self.data_backup.is_running():

1098

self.data_backup.change_interval(minutes=5) # Back to normal

1099

1100

if hasattr(self, 'statistics_update') and self.statistics_update.is_running():

1101

self.statistics_update.change_interval(minutes=10) # Back to normal

1102

```

1103

1104

This comprehensive documentation covers all aspects of nextcord's task system, providing developers with robust tools for creating sophisticated background operations and scheduled tasks.