0
# Database Scheduler
1
2
Celery beat scheduler implementation that reads periodic tasks from the database instead of configuration files, enabling dynamic task management and real-time schedule updates.
3
4
## Capabilities
5
6
### Database Scheduler Class
7
8
Main scheduler class that replaces Celery's default file-based scheduler.
9
10
```python { .api }
11
class DatabaseScheduler(Scheduler):
12
"""
13
Database-backed beat scheduler for Celery.
14
15
Reads periodic tasks from Django database instead of configuration files,
16
allowing dynamic task management through Django ORM and Admin interface.
17
"""
18
# Class attributes
19
Entry = ModelEntry # Entry class for database-backed entries
20
Model = PeriodicTask # Django model for periodic tasks
21
Changes = PeriodicTasks # Model for tracking changes
22
23
# Configuration constants
24
DEFAULT_MAX_INTERVAL = 5 # Default max interval between database checks
25
SCHEDULE_SYNC_MAX_INTERVAL = 300 # Max interval for schedule synchronization
26
27
def setup_schedule(self): ...
28
29
def all_as_schedule(self) -> dict: ...
30
31
def enabled_models(self) -> list[PeriodicTask]: ...
32
33
def enabled_models_qs(self) -> QuerySet[PeriodicTask]: ...
34
35
def schedule_changed(self) -> bool: ...
36
37
def sync(self): ...
38
39
def update_from_dict(self, mapping: dict): ...
40
41
def install_default_entries(self, data: dict): ...
42
43
def reserve(self, entry: ModelEntry) -> ModelEntry: ...
44
45
def schedules_equal(self, *args, **kwargs) -> bool: ...
46
```
47
48
**Usage Examples:**
49
50
```python
51
# Configure Celery to use database scheduler
52
from celery import Celery
53
54
app = Celery('myapp')
55
app.conf.update(
56
beat_scheduler='django_celery_beat.schedulers:DatabaseScheduler',
57
# or use the shorthand:
58
# beat_scheduler='django',
59
)
60
61
# Start beat service with database scheduler
62
# Command line:
63
# celery -A myapp beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
64
# or:
65
# celery -A myapp beat -l info -S django
66
67
# Programmatic scheduler usage (advanced)
68
from django_celery_beat.schedulers import DatabaseScheduler
69
70
scheduler = DatabaseScheduler(app=app)
71
scheduler.setup_schedule()
72
73
# Check if schedule has changed
74
if scheduler.schedule_changed():
75
scheduler.sync()
76
77
# Get all enabled tasks as schedule dict
78
schedule_dict = scheduler.all_as_schedule()
79
print(f"Found {len(schedule_dict)} enabled tasks")
80
```
81
82
### Model Entry Class
83
84
Database-backed schedule entry that represents individual periodic tasks.
85
86
```python { .api }
87
class ModelEntry(ScheduleEntry):
88
"""
89
Database-backed schedule entry for periodic tasks.
90
91
Represents a single periodic task with its schedule and configuration,
92
managing execution state and database synchronization.
93
"""
94
model_schedules = {
95
schedules.crontab: 'crontab',
96
schedules.schedule: 'interval',
97
schedules.solar: 'solar',
98
clocked: 'clocked',
99
}
100
101
save_fields = [
102
'last_run_at', 'total_run_count', 'no_changes',
103
]
104
105
def __init__(self, model: PeriodicTask, app: Celery = None): ...
106
107
def is_due(self) -> tuple[bool, float]: ...
108
109
def save(self): ...
110
111
@classmethod
112
def to_model_schedule(cls, schedule: schedules.BaseSchedule) -> dict: ...
113
114
@classmethod
115
def from_entry(cls, name: str, app: Celery = None, **entry_kwargs) -> 'ModelEntry': ...
116
```
117
118
**Usage Examples:**
119
120
```python
121
from django_celery_beat.schedulers import ModelEntry
122
from django_celery_beat.models import PeriodicTask
123
124
# Create model entry from database task
125
task = PeriodicTask.objects.get(name='my_task')
126
entry = ModelEntry(task)
127
128
# Check if task is due
129
is_due, next_time = entry.is_due()
130
if is_due:
131
print(f"Task {entry.name} is due for execution")
132
entry.save() # Update last_run_at and total_run_count
133
else:
134
print(f"Task {entry.name} next due in {next_time} seconds")
135
136
# Create entry from configuration dict (advanced usage)
137
entry_config = {
138
'task': 'myapp.tasks.example_task',
139
'schedule': schedules.schedule(run_every=30), # Every 30 seconds
140
'args': (),
141
'kwargs': {},
142
'options': {'queue': 'default'}
143
}
144
entry = ModelEntry.from_entry('example_task', **entry_config)
145
```
146
147
## Scheduler Configuration
148
149
### Beat Service Setup
150
151
Configure and run the Celery beat service with database scheduler.
152
153
```python
154
# settings.py or celery.py
155
from celery import Celery
156
157
app = Celery('myproject')
158
159
# Method 1: Full scheduler path
160
app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
161
162
# Method 2: Short form (requires django-celery-beat entry point)
163
app.conf.beat_scheduler = 'django'
164
165
# Additional beat scheduler configuration
166
app.conf.update(
167
beat_max_loop_interval=60, # Max seconds between database checks
168
beat_sync_every=0, # Sync immediately on changes (0 = immediate)
169
)
170
```
171
172
### Django Integration
173
174
```python
175
# settings.py
176
INSTALLED_APPS = [
177
# ... other apps
178
'django_celery_beat',
179
]
180
181
# Optional: Configure timezone for cron schedules
182
CELERY_TIMEZONE = 'America/New_York'
183
184
# Database configuration (standard Django)
185
DATABASES = {
186
'default': {
187
'ENGINE': 'django.db.backends.postgresql',
188
'NAME': 'myproject',
189
# ... other database settings
190
}
191
}
192
```
193
194
### Command Line Usage
195
196
```bash
197
# Start beat service with database scheduler
198
celery -A myproject beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
199
200
# Short form using entry point
201
celery -A myproject beat -l info -S django
202
203
# Combined worker and beat (development only)
204
celery -A myproject worker --beat --scheduler django --loglevel=info
205
206
# Beat service with custom settings
207
celery -A myproject beat -l info -S django --max-interval=30
208
```
209
210
## Advanced Scheduler Operations
211
212
### Schedule Synchronization
213
214
The scheduler automatically detects and synchronizes changes from the database.
215
216
```python
217
from django_celery_beat.schedulers import DatabaseScheduler
218
from django_celery_beat.models import PeriodicTasks
219
220
# Manual schedule sync trigger
221
scheduler = DatabaseScheduler(app)
222
223
# Check if database has changes
224
if scheduler.schedule_changed():
225
print("Database schedule has changed, syncing...")
226
scheduler.sync()
227
228
# Force schedule reload
229
scheduler.setup_schedule()
230
231
# Manually trigger change detection
232
PeriodicTasks.update_changed()
233
```
234
235
### Custom Scheduler Extensions
236
237
```python
238
from django_celery_beat.schedulers import DatabaseScheduler
239
from django_celery_beat.models import PeriodicTask
240
241
class CustomDatabaseScheduler(DatabaseScheduler):
242
"""
243
Extended database scheduler with custom behavior.
244
"""
245
246
def enabled_models_qs(self):
247
"""Override to add custom filtering."""
248
qs = super().enabled_models_qs()
249
# Add custom filtering, e.g., by environment
250
return qs.filter(description__contains='production')
251
252
def sync(self):
253
"""Override to add custom sync behavior."""
254
print("Performing custom sync operations...")
255
super().sync()
256
print("Custom sync completed")
257
258
def install_default_entries(self, data):
259
"""Override to customize default task installation."""
260
# Add custom default tasks
261
custom_defaults = {
262
'custom-cleanup': {
263
'task': 'myapp.tasks.custom_cleanup',
264
'schedule': schedules.crontab(hour=1, minute=0),
265
'options': {'queue': 'maintenance'}
266
}
267
}
268
data.update(custom_defaults)
269
super().install_default_entries(data)
270
271
# Use custom scheduler
272
app.conf.beat_scheduler = 'myproject.schedulers:CustomDatabaseScheduler'
273
```
274
275
### Monitoring and Debugging
276
277
```python
278
from django_celery_beat.schedulers import DatabaseScheduler
279
from django_celery_beat.models import PeriodicTask, PeriodicTasks
280
281
def debug_scheduler_state():
282
"""Debug function to inspect scheduler state."""
283
284
# Check enabled tasks
285
enabled_tasks = PeriodicTask.objects.filter(enabled=True)
286
print(f"Enabled tasks: {enabled_tasks.count()}")
287
288
for task in enabled_tasks:
289
print(f" - {task.name}: {task.task}")
290
print(f" Schedule: {task.scheduler}")
291
print(f" Last run: {task.last_run_at}")
292
print(f" Run count: {task.total_run_count}")
293
294
# Check change tracking
295
last_change = PeriodicTasks.last_change()
296
print(f"Last schedule change: {last_change}")
297
298
# Test scheduler detection
299
scheduler = DatabaseScheduler(app)
300
changed = scheduler.schedule_changed()
301
print(f"Scheduler detects changes: {changed}")
302
303
# Performance monitoring
304
def monitor_scheduler_performance():
305
"""Monitor scheduler database query performance."""
306
from django.db import connection
307
from django.conf import settings
308
309
# Enable query logging
310
settings.LOGGING = {
311
'version': 1,
312
'handlers': {
313
'console': {
314
'class': 'logging.StreamHandler',
315
},
316
},
317
'loggers': {
318
'django.db.backends': {
319
'handlers': ['console'],
320
'level': 'DEBUG',
321
},
322
},
323
}
324
325
scheduler = DatabaseScheduler(app)
326
327
# Monitor enabled_models_qs performance
328
with connection.cursor() as cursor:
329
initial_queries = len(connection.queries)
330
tasks = scheduler.enabled_models()
331
final_queries = len(connection.queries)
332
print(f"Scheduler loaded {len(tasks)} tasks with {final_queries - initial_queries} queries")
333
```
334
335
## Error Handling
336
337
### Common Scheduler Errors
338
339
```python
340
from django_celery_beat.schedulers import DatabaseScheduler
341
from django.db import OperationalError
342
import logging
343
344
logger = logging.getLogger(__name__)
345
346
def robust_scheduler_setup():
347
"""Set up scheduler with error handling."""
348
try:
349
scheduler = DatabaseScheduler(app)
350
scheduler.setup_schedule()
351
return scheduler
352
except OperationalError as e:
353
logger.error(f"Database connection error in scheduler: {e}")
354
# Fallback to empty schedule or retry logic
355
return None
356
except Exception as e:
357
logger.error(f"Unexpected scheduler error: {e}")
358
return None
359
360
def safe_schedule_sync(scheduler):
361
"""Safely sync schedule with error handling."""
362
try:
363
if scheduler and scheduler.schedule_changed():
364
scheduler.sync()
365
logger.info("Schedule synchronized successfully")
366
except Exception as e:
367
logger.error(f"Error syncing schedule: {e}")
368
# Continue with current schedule
369
```
370
371
### Scheduler Health Checks
372
373
```python
374
from django_celery_beat.models import PeriodicTask, PeriodicTasks
375
from django.core.management.base import BaseCommand
376
377
class Command(BaseCommand):
378
"""Management command to check scheduler health."""
379
380
def handle(self, *args, **options):
381
# Check database connectivity
382
try:
383
task_count = PeriodicTask.objects.count()
384
self.stdout.write(f"✓ Database accessible, {task_count} tasks found")
385
except Exception as e:
386
self.stdout.write(f"✗ Database error: {e}")
387
return
388
389
# Check for tasks with invalid schedules
390
invalid_tasks = []
391
for task in PeriodicTask.objects.filter(enabled=True):
392
try:
393
_ = task.schedule # This will raise if schedule is invalid
394
except Exception as e:
395
invalid_tasks.append((task.name, str(e)))
396
397
if invalid_tasks:
398
self.stdout.write("✗ Invalid schedules found:")
399
for name, error in invalid_tasks:
400
self.stdout.write(f" - {name}: {error}")
401
else:
402
self.stdout.write("✓ All enabled tasks have valid schedules")
403
404
# Check change tracking
405
try:
406
last_change = PeriodicTasks.last_change()
407
self.stdout.write(f"✓ Change tracking working, last change: {last_change}")
408
except Exception as e:
409
self.stdout.write(f"✗ Change tracking error: {e}")
410
```