or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin-interface.md database-scheduler.md index.md schedule-models.md task-management.md validation-utilities.md

docs/database-scheduler.md

0

# Database Scheduler

1

2

Celery beat scheduler implementation that reads periodic tasks from the database instead of configuration files, enabling dynamic task management and real-time schedule updates.

3

4

## Capabilities

5

6

### Database Scheduler Class

7

8

Main scheduler class that replaces Celery's default file-based scheduler.

9

10

```python { .api }

11

class DatabaseScheduler(Scheduler):

12

"""

13

Database-backed beat scheduler for Celery.

14

15

Reads periodic tasks from Django database instead of configuration files,

16

allowing dynamic task management through Django ORM and Admin interface.

17

"""

18

# Class attributes

19

Entry = ModelEntry # Entry class for database-backed entries

20

Model = PeriodicTask # Django model for periodic tasks

21

Changes = PeriodicTasks # Model for tracking changes

22

23

# Configuration constants

24

DEFAULT_MAX_INTERVAL = 5 # Default max interval between database checks, in seconds

25

SCHEDULE_SYNC_MAX_INTERVAL = 300 # Max interval for schedule synchronization, in seconds (5 minutes)

26

27

def setup_schedule(self): ...

28

29

def all_as_schedule(self) -> dict: ...

30

31

def enabled_models(self) -> list[PeriodicTask]: ...

32

33

def enabled_models_qs(self) -> QuerySet[PeriodicTask]: ...

34

35

def schedule_changed(self) -> bool: ...

36

37

def sync(self): ...

38

39

def update_from_dict(self, mapping: dict): ...

40

41

def install_default_entries(self, data: dict): ...

42

43

def reserve(self, entry: ModelEntry) -> ModelEntry: ...

44

45

def schedules_equal(self, *args, **kwargs) -> bool: ...

46

```

47

48

**Usage Examples:**

49

50

```python

51

# Configure Celery to use database scheduler

52

from celery import Celery

53

54

app = Celery('myapp')

55

app.conf.update(

56

beat_scheduler='django_celery_beat.schedulers:DatabaseScheduler',

57

# or use the shorthand:

58

# beat_scheduler='django',

59

)

60

61

# Start beat service with database scheduler

62

# Command line:

63

# celery -A myapp beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

64

# or:

65

# celery -A myapp beat -l info -S django

66

67

# Programmatic scheduler usage (advanced)

68

from django_celery_beat.schedulers import DatabaseScheduler

69

70

scheduler = DatabaseScheduler(app=app)

71

scheduler.setup_schedule()

72

73

# Check if schedule has changed

74

if scheduler.schedule_changed():

75

scheduler.sync()

76

77

# Get all enabled tasks as schedule dict

78

schedule_dict = scheduler.all_as_schedule()

79

print(f"Found {len(schedule_dict)} enabled tasks")

80

```

81

82

### Model Entry Class

83

84

Database-backed schedule entry that represents individual periodic tasks.

85

86

```python { .api }

87

class ModelEntry(ScheduleEntry):

88

"""

89

Database-backed schedule entry for periodic tasks.

90

91

Represents a single periodic task with its schedule and configuration,

92

managing execution state and database synchronization.

93

"""

94

model_schedules = {

95

schedules.crontab: 'crontab',

96

schedules.schedule: 'interval',

97

schedules.solar: 'solar',

98

clocked: 'clocked',

99

}

100

101

save_fields = [

102

'last_run_at', 'total_run_count', 'no_changes',

103

]

104

105

def __init__(self, model: PeriodicTask, app: Celery = None): ...

106

107

def is_due(self) -> tuple[bool, float]: ...

108

109

def save(self): ...

110

111

@classmethod

112

def to_model_schedule(cls, schedule: schedules.BaseSchedule) -> dict: ...

113

114

@classmethod

115

def from_entry(cls, name: str, app: Celery = None, **entry_kwargs) -> 'ModelEntry': ...

116

```

117

118

**Usage Examples:**

119

120

```python

121

from django_celery_beat.schedulers import ModelEntry

122

from django_celery_beat.models import PeriodicTask

123

124

# Create model entry from database task

125

task = PeriodicTask.objects.get(name='my_task')

126

entry = ModelEntry(task)

127

128

# Check if task is due

129

is_due, next_time = entry.is_due()

130

if is_due:

131

print(f"Task {entry.name} is due for execution")

132

entry.save() # Update last_run_at and total_run_count

133

else:

134

print(f"Task {entry.name} next due in {next_time} seconds")

135

136

# Create entry from configuration dict (advanced usage)

137

entry_config = {

138

'task': 'myapp.tasks.example_task',

139

'schedule': schedules.schedule(run_every=30), # Every 30 seconds

140

'args': (),

141

'kwargs': {},

142

'options': {'queue': 'default'}

143

}

144

entry = ModelEntry.from_entry('example_task', **entry_config)

145

```

146

147

## Scheduler Configuration

148

149

### Beat Service Setup

150

151

Configure and run the Celery beat service with database scheduler.

152

153

```python

154

# settings.py or celery.py

155

from celery import Celery

156

157

app = Celery('myproject')

158

159

# Method 1: Full scheduler path

160

app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'

161

162

# Method 2: Short form (requires django-celery-beat entry point)

163

app.conf.beat_scheduler = 'django'

164

165

# Additional beat scheduler configuration

166

app.conf.update(

167

beat_max_loop_interval=60, # Max seconds between database checks

168

beat_sync_every=0, # Sync immediately on changes (0 = immediate)

169

)

170

```

171

172

### Django Integration

173

174

```python

175

# settings.py

176

INSTALLED_APPS = [

177

# ... other apps

178

'django_celery_beat',

179

]

180

181

# Optional: Configure timezone for cron schedules

182

CELERY_TIMEZONE = 'America/New_York'

183

184

# Database configuration (standard Django)

185

DATABASES = {

186

'default': {

187

'ENGINE': 'django.db.backends.postgresql',

188

'NAME': 'myproject',

189

# ... other database settings

190

}

191

}

192

```

193

194

### Command Line Usage

195

196

```bash

197

# Start beat service with database scheduler

198

celery -A myproject beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

199

200

# Short form using entry point

201

celery -A myproject beat -l info -S django

202

203

# Combined worker and beat (development only)

204

celery -A myproject worker --beat --scheduler django --loglevel=info

205

206

# Beat service with custom settings

207

celery -A myproject beat -l info -S django --max-interval=30

208

```

209

210

## Advanced Scheduler Operations

211

212

### Schedule Synchronization

213

214

The scheduler automatically detects and synchronizes changes from the database.

215

216

```python

217

from django_celery_beat.schedulers import DatabaseScheduler

218

from django_celery_beat.models import PeriodicTasks

219

220

# Manual schedule sync trigger

221

scheduler = DatabaseScheduler(app)

222

223

# Check if database has changes

224

if scheduler.schedule_changed():

225

print("Database schedule has changed, syncing...")

226

scheduler.sync()

227

228

# Force schedule reload

229

scheduler.setup_schedule()

230

231

# Manually trigger change detection

232

PeriodicTasks.update_changed()

233

```

234

235

### Custom Scheduler Extensions

236

237

```python

238

from django_celery_beat.schedulers import DatabaseScheduler

239

from django_celery_beat.models import PeriodicTask

240

241

class CustomDatabaseScheduler(DatabaseScheduler):

242

"""

243

Extended database scheduler with custom behavior.

244

"""

245

246

def enabled_models_qs(self):

247

"""Override to add custom filtering."""

248

qs = super().enabled_models_qs()

249

# Add custom filtering, e.g., by environment

250

return qs.filter(description__contains='production')

251

252

def sync(self):

253

"""Override to add custom sync behavior."""

254

print("Performing custom sync operations...")

255

super().sync()

256

print("Custom sync completed")

257

258

def install_default_entries(self, data):

259

"""Override to customize default task installation."""

260

# Add custom default tasks

261

custom_defaults = {

262

'custom-cleanup': {

263

'task': 'myapp.tasks.custom_cleanup',

264

'schedule': schedules.crontab(hour=1, minute=0),

265

'options': {'queue': 'maintenance'}

266

}

267

}

268

data.update(custom_defaults)

269

super().install_default_entries(data)

270

271

# Use custom scheduler

272

app.conf.beat_scheduler = 'myproject.schedulers:CustomDatabaseScheduler'

273

```

274

275

### Monitoring and Debugging

276

277

```python

278

from django_celery_beat.schedulers import DatabaseScheduler

279

from django_celery_beat.models import PeriodicTask, PeriodicTasks

280

281

def debug_scheduler_state():

282

"""Debug function to inspect scheduler state."""

283

284

# Check enabled tasks

285

enabled_tasks = PeriodicTask.objects.filter(enabled=True)

286

print(f"Enabled tasks: {enabled_tasks.count()}")

287

288

for task in enabled_tasks:

289

print(f" - {task.name}: {task.task}")

290

print(f" Schedule: {task.scheduler}")

291

print(f" Last run: {task.last_run_at}")

292

print(f" Run count: {task.total_run_count}")

293

294

# Check change tracking

295

last_change = PeriodicTasks.last_change()

296

print(f"Last schedule change: {last_change}")

297

298

# Test scheduler detection

299

scheduler = DatabaseScheduler(app)

300

changed = scheduler.schedule_changed()

301

print(f"Scheduler detects changes: {changed}")

302

303

# Performance monitoring

304

def monitor_scheduler_performance():

305

"""Monitor scheduler database query performance."""

306

from django.db import connection

307

from django.conf import settings

308

309

# Enable query logging

310

settings.LOGGING = {

311

'version': 1,

312

'handlers': {

313

'console': {

314

'class': 'logging.StreamHandler',

315

},

316

},

317

'loggers': {

318

'django.db.backends': {

319

'handlers': ['console'],

320

'level': 'DEBUG',

321

},

322

},

323

}

324

325

scheduler = DatabaseScheduler(app)

326

327

# Monitor enabled_models_qs performance

328

with connection.cursor() as cursor:

329

initial_queries = len(connection.queries)

330

tasks = scheduler.enabled_models()

331

final_queries = len(connection.queries)

332

print(f"Scheduler loaded {len(tasks)} tasks with {final_queries - initial_queries} queries")

333

```

334

335

## Error Handling

336

337

### Common Scheduler Errors

338

339

```python

340

from django_celery_beat.schedulers import DatabaseScheduler

341

from django.db import OperationalError

342

import logging

343

344

logger = logging.getLogger(__name__)

345

346

def robust_scheduler_setup():

347

"""Set up scheduler with error handling."""

348

try:

349

scheduler = DatabaseScheduler(app)

350

scheduler.setup_schedule()

351

return scheduler

352

except OperationalError as e:

353

logger.error(f"Database connection error in scheduler: {e}")

354

# Fallback to empty schedule or retry logic

355

return None

356

except Exception as e:

357

logger.error(f"Unexpected scheduler error: {e}")

358

return None

359

360

def safe_schedule_sync(scheduler):

361

"""Safely sync schedule with error handling."""

362

try:

363

if scheduler and scheduler.schedule_changed():

364

scheduler.sync()

365

logger.info("Schedule synchronized successfully")

366

except Exception as e:

367

logger.error(f"Error syncing schedule: {e}")

368

# Continue with current schedule

369

```

370

371

### Scheduler Health Checks

372

373

```python

374

from django_celery_beat.models import PeriodicTask, PeriodicTasks

375

from django.core.management.base import BaseCommand

376

377

class Command(BaseCommand):

378

"""Management command to check scheduler health."""

379

380

def handle(self, *args, **options):

381

# Check database connectivity

382

try:

383

task_count = PeriodicTask.objects.count()

384

self.stdout.write(f"✓ Database accessible, {task_count} tasks found")

385

except Exception as e:

386

self.stdout.write(f"✗ Database error: {e}")

387

return

388

389

# Check for tasks with invalid schedules

390

invalid_tasks = []

391

for task in PeriodicTask.objects.filter(enabled=True):

392

try:

393

_ = task.schedule # This will raise if schedule is invalid

394

except Exception as e:

395

invalid_tasks.append((task.name, str(e)))

396

397

if invalid_tasks:

398

self.stdout.write("✗ Invalid schedules found:")

399

for name, error in invalid_tasks:

400

self.stdout.write(f" - {name}: {error}")

401

else:

402

self.stdout.write("✓ All enabled tasks have valid schedules")

403

404

# Check change tracking

405

try:

406

last_change = PeriodicTasks.last_change()

407

self.stdout.write(f"✓ Change tracking working, last change: {last_change}")

408

except Exception as e:

409

self.stdout.write(f"✗ Change tracking error: {e}")

410

```