0
# Scheduling and Beat
1
2
Periodic task scheduling capabilities including cron-like scheduling, interval-based schedules, solar event timing, and beat scheduler management for running recurring tasks in distributed environments.
3
4
## Capabilities
5
6
### Crontab Scheduling
7
8
Unix cron-style scheduling with minute, hour, day-of-week, day-of-month, and month-of-year fields for precise timing control.
9
10
```python { .api }
11
class crontab:
12
def __init__(
13
self,
14
minute='*',
15
hour='*',
16
day_of_week='*',
17
day_of_month='*',
18
month_of_year='*',
19
nowfun=None,
20
app=None
21
):
22
"""
23
Create crontab schedule.
24
25
Args:
26
minute (str): Minute field (0-59, '*', '*/5', '10,20,30')
27
hour (str): Hour field (0-23, '*', '*/2', '9-17')
28
day_of_week (str): Day of week (0-6, mon-sun, '*')
29
day_of_month (str): Day of month (1-31, '*', '1,15')
30
month_of_year (str): Month (1-12, jan-dec, '*')
31
nowfun (callable): Function returning current time
32
app: Celery app instance
33
"""
34
35
def is_due(self, last_run_at):
36
"""
37
Check if schedule is due for execution.
38
39
Args:
40
last_run_at (datetime): Last execution time
41
42
Returns:
43
tuple: (is_due, next_time_to_run)
44
"""
45
46
def remaining_estimate(self, last_run_at):
47
"""
48
Estimate time until next execution.
49
50
Args:
51
last_run_at (datetime): Last execution time
52
53
Returns:
54
timedelta: Time remaining until next run
55
"""
56
57
def crontab(minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*'):
58
"""
59
Create cron-style schedule.
60
61
Args:
62
minute (str): Minutes (0-59, *, */N, ranges, lists)
63
hour (str): Hours (0-23, *, */N, ranges, lists)
64
day_of_week (str): Days of week (0-6 or names, *, */N, ranges, lists)
65
day_of_month (str): Days of month (1-31, *, */N, ranges, lists)
66
month_of_year (str): Months (1-12 or names, *, */N, ranges, lists)
67
68
Returns:
69
crontab instance
70
"""
71
```
72
73
### Interval Scheduling
74
75
Simple interval-based scheduling using timedelta objects for regular recurring execution.
76
77
```python { .api }
78
class schedule:
79
def __init__(self, run_every, relative=False, nowfun=None, app=None):
80
"""
81
Create interval schedule.
82
83
Args:
84
run_every (timedelta): Interval between executions
85
relative (bool): If True, the run time is rounded to the resolution of the interval
86
nowfun (callable): Function returning current time
87
app: Celery app instance
88
"""
89
90
def is_due(self, last_run_at):
91
"""
92
Check if schedule is due.
93
94
Args:
95
last_run_at (datetime): Last execution time
96
97
Returns:
98
tuple: (is_due, next_time_to_run)
99
"""
100
101
def remaining_estimate(self, last_run_at):
102
"""
103
Estimate time until next execution.
104
105
Args:
106
last_run_at (datetime): Last execution time
107
108
Returns:
109
timedelta: Time remaining
110
"""
111
112
def schedule(run_every, relative=False):
113
"""
114
Create interval-based schedule.
115
116
Args:
117
run_every (timedelta): Time between executions
118
relative (bool): If True, round the run time to the resolution of the interval
119
120
Returns:
121
schedule instance
122
"""
123
```
124
125
### Solar Event Scheduling
126
127
Astronomical event-based scheduling for sunrise, sunset, and other solar events at specific geographic locations.
128
129
```python { .api }
130
class solar:
131
def __init__(self, event, lat, lon, nowfun=None, app=None):
132
"""
133
Create solar event schedule.
134
135
Args:
136
event (str): Solar event name, e.g. 'sunrise', 'sunset', 'solar_noon', 'dawn_civil', 'dusk_civil' (nautical and astronomical dawn/dusk variants are also accepted)
137
lat (float): Latitude coordinate
138
lon (float): Longitude coordinate
139
nowfun (callable): Function returning current time
140
app: Celery app instance
141
"""
142
143
def is_due(self, last_run_at):
144
"""
145
Check if solar event schedule is due.
146
147
Args:
148
last_run_at (datetime): Last execution time
149
150
Returns:
151
tuple: (is_due, next_time_to_run)
152
"""
153
154
def remaining_estimate(self, last_run_at):
155
"""
156
Estimate time until next solar event.
157
158
Args:
159
last_run_at (datetime): Last execution time
160
161
Returns:
162
timedelta: Time remaining
163
"""
164
165
def solar(event, lat, lon):
166
"""
167
Create solar event schedule.
168
169
Args:
170
event (str): Solar event type
171
lat (float): Latitude (-90 to 90)
172
lon (float): Longitude (-180 to 180)
173
174
Returns:
175
solar instance
176
"""
177
```
178
179
### Beat Scheduler Classes
180
181
Core beat scheduler components for managing and executing periodic tasks.
182
183
```python { .api }
184
class Scheduler:
185
def __init__(self, app, max_interval=None, Producer=None, lazy=False, **kwargs):
186
"""
187
Base scheduler class.
188
189
Args:
190
app: Celery application instance
191
max_interval (int): Maximum number of seconds to sleep between schedule checks
192
Producer: Message producer class
193
lazy (bool): If True, do not set up the schedule on construction
194
"""
195
196
def setup_schedule(self):
197
"""Initialize schedule from configuration."""
198
199
def sync(self):
200
"""Sync schedule with storage backend."""
201
202
def close(self):
203
"""Close scheduler and cleanup resources."""
204
205
def add(self, **kwargs):
206
"""
207
Add schedule entry.
208
209
Args:
210
**kwargs: Schedule entry parameters
211
"""
212
213
def tick(self):
214
"""
215
Check schedule and run due tasks.
216
217
Returns:
218
float: Seconds until next check needed
219
"""
220
221
@property
222
def schedule(self):
223
"""Current schedule dictionary."""
224
225
class PersistentScheduler(Scheduler):
226
def __init__(self, *args, **kwargs):
227
"""Scheduler that persists schedule to disk."""
228
229
def setup_schedule(self):
230
"""Load schedule from persistent storage."""
231
232
def sync(self):
233
"""Save schedule to persistent storage."""
234
235
class ScheduleEntry:
236
def __init__(
237
self,
238
task=None,
239
schedule=None,
240
args=None,
241
kwargs=None,
242
options=None,
243
name=None,
244
**kw
245
):
246
"""
247
Individual schedule entry.
248
249
Args:
250
task (str): Task name to execute
251
schedule: Schedule object (crontab, schedule, solar)
252
args (tuple): Task arguments
253
kwargs (dict): Task keyword arguments
254
options (dict): Task execution options
255
name (str): Entry name/identifier
256
"""
257
258
def is_due(self):
259
"""
260
Check if entry is due for execution.
261
262
Returns:
263
tuple: (is_due, next_time_to_run)
264
"""
265
266
def __call__(self, sender=None):
267
"""Execute the scheduled task."""
268
269
def update(self, other):
270
"""
271
Update entry with values from another entry.
272
273
Args:
274
other (ScheduleEntry): Entry to copy from
275
"""
276
277
class Service:
278
def __init__(self, app, max_interval=None, schedule_filename=None, scheduler_cls=None):
279
"""
280
Beat service for running periodic tasks.
281
282
Args:
283
app: Celery application instance
284
max_interval (int): Maximum number of seconds to sleep between schedule checks
285
schedule_filename (str): Persistent schedule file path
286
scheduler_cls: Custom scheduler class
287
"""
288
289
def start(self, embedded_process=False):
290
"""
291
Start beat service.
292
293
Args:
294
embedded_process (bool): Run as embedded process
295
"""
296
297
def sync(self):
298
"""Sync scheduler state."""
299
300
def stop(self, graceful=True):
301
"""
302
Stop beat service.
303
304
Args:
305
graceful (bool): Allow graceful shutdown
306
"""
307
```
308
309
### Beat Configuration
310
311
Configuration options and utilities for periodic task management.
312
313
```python { .api }
314
# Configuration via app.conf.beat_schedule
315
BEAT_SCHEDULE = {
316
'task-name': {
317
'task': 'myapp.tasks.my_task',
318
'schedule': crontab(minute=0, hour=4), # Execute daily at 4:00 AM
319
'args': (16, 16),
320
'kwargs': {'verbose': True},
321
'options': {'expires': 15.0}
322
}
323
}
324
325
# Add periodic task programmatically
326
app.add_periodic_task(
327
crontab(hour=7, minute=30, day_of_week=1),
328
send_weekly_report.s('Weekly Report'),
329
name='weekly report'
330
)
331
```
332
333
## Usage Examples
334
335
### Crontab Scheduling
336
337
```python
338
from celery import Celery
339
from celery.schedules import crontab
340
341
app = Celery('scheduler_example')
342
343
@app.task
344
def cleanup_temp_files():
345
# Cleanup logic here
346
return "Cleanup completed"
347
348
@app.task
349
def generate_report():
350
return "Report generated"
351
352
@app.task
353
def backup_database():
354
return "Database backed up"
355
356
# Configure periodic tasks
357
app.conf.beat_schedule = {
358
# Run every day at 2:30 AM
359
'daily-cleanup': {
360
'task': 'cleanup_temp_files',
361
'schedule': crontab(hour=2, minute=30),
362
},
363
364
# Run every Monday at 7:30 AM
365
'weekly-report': {
366
'task': 'generate_report',
367
'schedule': crontab(hour=7, minute=30, day_of_week=1),
368
},
369
370
# Run every 15 minutes during business hours
371
'frequent-check': {
372
'task': 'cleanup_temp_files',
373
'schedule': crontab(minute='*/15', hour='9-17'),
374
},
375
376
# Run on specific days of month
377
'monthly-backup': {
378
'task': 'backup_database',
379
'schedule': crontab(hour=1, minute=0, day_of_month='1,15'),
380
},
381
382
# Run multiple times per day
383
'multiple-daily': {
384
'task': 'cleanup_temp_files',
385
'schedule': crontab(hour='6,12,18', minute=0),
386
}
387
}
388
389
app.conf.timezone = 'UTC'
390
```
391
392
### Interval Scheduling
393
394
```python
395
from datetime import timedelta
396
from celery.schedules import schedule
397
398
@app.task
399
def periodic_health_check():
400
return "Health check completed"
401
402
@app.task
403
def process_queue():
404
return "Queue processed"
405
406
# Interval-based scheduling
407
app.conf.beat_schedule = {
408
# Run every 30 seconds
409
'health-check': {
410
'task': 'periodic_health_check',
411
'schedule': schedule(run_every=timedelta(seconds=30)),
412
},
413
414
# Run every 5 minutes
415
'process-queue': {
416
'task': 'process_queue',
417
'schedule': schedule(run_every=timedelta(minutes=5)),
418
'args': ('high_priority',),
419
'options': {'expires': 60} # Task expires after 60 seconds
420
},
421
422
# Run every hour, with run times rounded to the resolution of the interval (relative=True)
423
'hourly-task': {
424
'task': 'cleanup_temp_files',
425
'schedule': schedule(run_every=timedelta(hours=1), relative=True),
426
}
427
}
428
```
429
430
### Solar Event Scheduling
431
432
```python
433
from celery.schedules import solar
434
435
@app.task
436
def morning_activation():
437
return "Morning systems activated"
438
439
@app.task
440
def evening_shutdown():
441
return "Evening systems shutdown"
442
443
# Solar event scheduling (San Francisco coordinates)
444
app.conf.beat_schedule = {
445
# Run at sunrise
446
'morning-startup': {
447
'task': 'morning_activation',
448
'schedule': solar('sunrise', 37.7749, -122.4194),
449
},
450
451
# Run at sunset
452
'evening-shutdown': {
453
'task': 'evening_shutdown',
454
'schedule': solar('sunset', 37.7749, -122.4194),
455
}
456
}
457
```
458
459
### Programmatic Schedule Management
460
461
```python
462
from celery.schedules import crontab
463
from datetime import timedelta
464
465
@app.task
466
def dynamic_task(message):
467
return f"Executed: {message}"
468
469
# Add periodic tasks programmatically
470
def setup_dynamic_schedule():
471
# Add daily task
472
app.add_periodic_task(
473
crontab(hour=8, minute=0),
474
dynamic_task.s('Daily morning task'),
475
name='morning task'
476
)
477
478
# Add interval task
479
app.add_periodic_task(
480
timedelta(minutes=10),
481
dynamic_task.s('Every 10 minutes'),
482
name='frequent task'
483
)
484
485
# Add conditional task (the weekday check is evaluated once, when this function runs)
486
import datetime
487
if datetime.datetime.now().weekday() == 0: # Monday
488
app.add_periodic_task(
489
crontab(hour=9, minute=0, day_of_week=1),
490
dynamic_task.s('Monday special task'),
491
name='monday task'
492
)
493
494
# Call during app initialization
495
setup_dynamic_schedule()
496
```
497
498
### Custom Scheduler
499
500
```python
501
from celery.beat import Scheduler, ScheduleEntry
502
import json
503
import os
504
505
class DatabaseScheduler(Scheduler):
506
"""Custom scheduler that stores schedule in database."""
507
508
def __init__(self, *args, **kwargs):
509
self.database_url = kwargs.pop('database_url', 'sqlite:///schedule.db')
510
super().__init__(*args, **kwargs)
511
512
def setup_schedule(self):
513
"""Load schedule from database."""
514
# Implementation would load from database
515
self.schedule = self.load_schedule_from_db()
516
517
def sync(self):
518
"""Save schedule to database."""
519
# Implementation would save to database
520
self.save_schedule_to_db()
521
522
def load_schedule_from_db(self):
523
"""Load schedule entries from database."""
524
# Database loading logic
525
return {}
526
527
def save_schedule_to_db(self):
528
"""Save schedule entries to database."""
529
# Database saving logic
530
pass
531
532
# Use custom scheduler
533
app.conf.beat_scheduler = 'myapp.schedulers:DatabaseScheduler'
534
```
535
536
### Beat Service Management
537
538
```python
539
from celery.beat import Service
540
541
def run_beat_service():
542
"""Run beat service programmatically."""
543
544
# Create beat service
545
beat_service = Service(
546
app=app,
547
max_interval=60, # Check schedule every 60 seconds max
548
schedule_filename='celerybeat-schedule'
549
)
550
551
try:
552
# Start beat service (blocking)
553
beat_service.start()
554
except KeyboardInterrupt:
555
print("Beat service interrupted")
556
finally:
557
# Cleanup
558
beat_service.stop()
559
560
# Run beat as embedded process
561
def run_embedded_beat():
562
"""Run beat embedded in worker process."""
563
564
beat_service = Service(app=app)
565
beat_service.start(embedded_process=True)
566
567
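# NOTE: start() blocks until the service is shut down, even with embedded_process=True; run it in a separate thread or process to continue with other work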
568
569
# Stop when needed
570
beat_service.stop()
571
```
572
573
### Advanced Scheduling Patterns
574
575
```python
576
from celery.schedules import crontab, schedule
577
from datetime import datetime, timedelta
578
579
@app.task
580
def conditional_task():
581
# Task that only runs under certain conditions
582
current_hour = datetime.now().hour
583
if 9 <= current_hour <= 17: # Business hours
584
return "Task executed during business hours"
585
else:
586
return "Task skipped outside business hours"
587
588
@app.task
589
def escalating_task(attempt=1):
590
# Task with escalating retry intervals
591
try:
592
# Some operation that might fail
593
if attempt < 3:
594
raise Exception(f"Simulated failure, attempt {attempt}")
595
return f"Success on attempt {attempt}"
596
except Exception:
597
# Retry with increasing delay
598
escalating_task.apply_async(
599
args=(attempt + 1,),
600
countdown=attempt * 60 # 1min, 2min, 3min delays
601
)
602
603
# Complex scheduling configuration
604
app.conf.beat_schedule = {
605
# Task with complex cron expression
606
'business-hours-only': {
607
'task': 'conditional_task',
608
'schedule': crontab(minute='*/30', hour='9-17', day_of_week='1-5'),
609
'options': {
610
'expires': 15 * 60, # Expire after 15 minutes
611
'retry': True,
612
'retry_policy': {
613
'max_retries': 3,
614
'interval_start': 0,
615
'interval_step': 0.2,
616
'interval_max': 0.2,
617
}
618
}
619
},
620
621
# Weekend-only task
622
'weekend-maintenance': {
623
'task': 'cleanup_temp_files',
624
'schedule': crontab(hour=3, minute=0, day_of_week='6,0'), # Saturday and Sunday
625
'kwargs': {'deep_clean': True}
626
},
627
628
# End-of-month reporting (fires on every day from the 28th through the 31st that exists, not only the last day)
629
'month-end-report': {
630
'task': 'generate_report',
631
'schedule': crontab(hour=23, minute=59, day_of_month='28-31'), # Last days of month
632
'kwargs': {'report_type': 'monthly'}
633
}
634
}
635
```
636
637
### Monitoring and Debugging
638
639
```python
640
@app.task(bind=True)
641
def monitored_periodic_task(self):
642
"""Periodic task with built-in monitoring."""
643
644
try:
645
# Task logic here
646
result = "Task completed successfully"
647
648
# Update task state
649
self.update_state(
650
state='SUCCESS',
651
meta={'result': result, 'timestamp': datetime.now().isoformat()}
652
)
653
654
return result
655
656
except Exception as exc:
657
# Handle errors
658
self.update_state(
659
state='FAILURE',
660
meta={'error': str(exc), 'timestamp': datetime.now().isoformat()}
661
)
662
raise exc
663
664
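# Configure with detailed options (NOTE: 'options' are forwarded to apply_async, so use message options such as 'queue', 'serializer' or 'expires'; task_track_started, task_serializer and task_routes are app-level settings)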
665
app.conf.beat_schedule = {
666
'monitored-task': {
667
'task': 'monitored_periodic_task',
668
'schedule': crontab(minute='*/5'),
669
'options': {
670
'task_track_started': True,
671
'task_serializer': 'json',
672
'task_routes': {'monitored_periodic_task': {'queue': 'periodic'}}
673
}
674
}
675
}
676
```