0
# Task Management
1
2
Core models and utilities for defining, configuring, and tracking periodic tasks including task arguments, routing, scheduling, and execution management.
3
4
## Capabilities
5
6
### Periodic Task Model
7
8
Central model for defining periodic tasks with full configuration options.
9
10
```python { .api }
11
class PeriodicTask(models.Model):
12
"""
13
Model representing a periodic task to be executed by Celery.
14
15
Must be associated with exactly one schedule type (interval, crontab, solar, or clocked).
16
"""
17
# Task identification
18
name: str # Unique task name, max 200 chars
19
task: str # Celery task name to execute, max 200 chars
20
21
# Schedule associations (exactly one must be set)
22
interval: Optional[ForeignKey[IntervalSchedule]]
23
crontab: Optional[ForeignKey[CrontabSchedule]]
24
solar: Optional[ForeignKey[SolarSchedule]]
25
clocked: Optional[ForeignKey[ClockedSchedule]]
26
27
# Task arguments
28
args: str # JSON encoded positional arguments, default '[]'
29
kwargs: str # JSON encoded keyword arguments, default '{}'
30
31
# Routing and delivery
32
queue: Optional[str] # Queue override, max 200 chars
33
exchange: Optional[str] # AMQP exchange override, max 200 chars
34
routing_key: Optional[str] # AMQP routing key override, max 200 chars
35
headers: str # JSON encoded AMQP headers, default '{}'
36
priority: Optional[int] # Task priority 0-255
37
38
# Execution control
39
expires: Optional[datetime.datetime] # Absolute expiry time
40
expire_seconds: Optional[int] # Expiry in seconds from now
41
one_off: bool # Execute only once, default False
42
start_time: Optional[datetime.datetime] # When to start executing
43
enabled: bool # Whether task is enabled, default True
44
45
# Execution tracking (auto-managed)
46
last_run_at: Optional[datetime.datetime] # Last execution time
47
total_run_count: int # Number of times executed, default 0
48
date_changed: datetime.datetime # Last modified timestamp
49
50
# Documentation
51
description: str # Task description
52
53
# Manager and internal attributes
54
objects: PeriodicTaskQuerySet # Custom QuerySet manager
55
no_changes: bool # Internal flag for change tracking
56
57
@property
58
def scheduler(self) -> Union[IntervalSchedule, CrontabSchedule, SolarSchedule, ClockedSchedule]: ...
59
60
@property
61
def schedule(self) -> schedules.BaseSchedule: ...
62
63
@property
64
def expires_(self) -> Optional[datetime.datetime]: ...
65
66
def due_start_time(self, tz: Optional[tzinfo.tzinfo] = None) -> datetime.datetime: ...
67
68
def validate_unique(self): ...
69
```
70
71
**Usage Examples:**
72
73
```python
74
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
75
import json
76
from datetime import datetime, timedelta
77
78
# Create task with interval schedule
79
schedule = IntervalSchedule.objects.create(every=30, period=IntervalSchedule.MINUTES)
80
task = PeriodicTask.objects.create(
81
name='Process queue every 30 minutes',
82
task='myapp.tasks.process_queue',
83
interval=schedule,
84
args=json.dumps([]),
85
kwargs=json.dumps({'priority': 'high'}),
86
enabled=True
87
)
88
89
# Create task with cron schedule and expiry
90
cron_schedule = CrontabSchedule.objects.create(
91
minute='0',
92
hour='2',
93
day_of_week='*',
94
day_of_month='*',
95
month_of_year='*'
96
)
97
task = PeriodicTask.objects.create(
98
name='Daily backup at 2 AM',
99
task='myapp.tasks.backup_database',
100
crontab=cron_schedule,
101
expires=datetime.utcnow() + timedelta(days=30),
102
queue='backup_queue',
103
description='Automated daily database backup'
104
)
105
106
# Create one-off task
107
task = PeriodicTask.objects.create(
108
name='One-time data migration',
109
task='myapp.tasks.migrate_data',
110
interval=IntervalSchedule.objects.create(every=1, period=IntervalSchedule.SECONDS),
111
one_off=True,
112
args=json.dumps(['migration_v2'])
113
)
114
```
115
116
### Task Change Tracking
117
118
Model for tracking when periodic tasks are modified to trigger scheduler updates.
119
120
```python { .api }
121
class PeriodicTasks(models.Model):
122
"""
123
Helper model to track when periodic task schedules change.
124
125
Contains a single row with ID=1 that gets updated whenever tasks are modified.
126
"""
127
ident: int # Primary key, always 1
128
last_update: datetime.datetime # Timestamp of last change
129
130
@classmethod
131
def changed(cls, instance: PeriodicTask, **kwargs): ...
132
133
@classmethod
134
def update_changed(cls, **kwargs): ...
135
136
@classmethod
137
def last_change(cls) -> Optional[datetime.datetime]: ...
138
```
139
140
**Usage Examples:**
141
142
```python
143
from django_celery_beat.models import PeriodicTasks, PeriodicTask
144
145
# Check when tasks were last changed
146
last_change = PeriodicTasks.last_change()
147
print(f"Tasks last modified: {last_change}")
148
149
# Manually update change timestamp (for bulk operations)
150
# Note: This is automatically called when tasks are saved/deleted
151
PeriodicTask.objects.filter(enabled=False).update(enabled=True)
152
PeriodicTasks.update_changed() # Notify scheduler of bulk changes
153
154
# The changed() method is automatically called by Django signals
155
# when individual tasks are saved or deleted
156
```
157
158
### Custom QuerySet
159
160
Optimized queryset for periodic task queries with prefetch relationships.
161
162
```python { .api }
163
class PeriodicTaskQuerySet(QuerySet):
164
"""
165
Custom queryset for PeriodicTask with optimizations.
166
"""
167
def enabled(self) -> 'PeriodicTaskQuerySet': ...
168
```
169
170
**Usage Examples:**
171
172
```python
173
from django_celery_beat.models import PeriodicTask
174
175
# Get enabled tasks with optimized queries
176
enabled_tasks = PeriodicTask.objects.enabled()
177
178
# The enabled() method automatically includes:
179
# - prefetch_related('interval', 'crontab', 'solar', 'clocked')
180
# - filter(enabled=True)
181
182
for task in enabled_tasks:
183
print(f"Task: {task.name}, Schedule: {task.scheduler}")
184
# No additional queries needed due to prefetch_related
185
```
186
187
## Task Configuration Patterns
188
189
### JSON Argument Handling
190
191
Tasks use JSON encoding for arguments to ensure database compatibility.
192
193
```python
194
import json
195
196
# Positional arguments
197
args = json.dumps(['arg1', 'arg2', 123])
198
199
# Keyword arguments
200
kwargs = json.dumps({
201
'param1': 'value1',
202
'param2': True,
203
'param3': {'nested': 'dict'}
204
})
205
206
# AMQP headers
207
headers = json.dumps({
208
'priority': 9,
209
'retry_policy': {'max_retries': 3}
210
})
211
212
task = PeriodicTask.objects.create(
213
name='Complex task',
214
task='myapp.tasks.complex_task',
215
interval=schedule,
216
args=args,
217
kwargs=kwargs,
218
headers=headers
219
)
220
```
221
222
### Routing Configuration
223
224
Tasks can be routed to specific queues, exchanges, and routing keys.
225
226
```python
227
# Route to specific queue
228
task = PeriodicTask.objects.create(
229
name='High priority task',
230
task='myapp.tasks.urgent_task',
231
interval=schedule,
232
queue='high_priority',
233
priority=9
234
)
235
236
# Use custom exchange and routing key
237
task = PeriodicTask.objects.create(
238
name='Custom routing task',
239
task='myapp.tasks.custom_task',
240
interval=schedule,
241
exchange='custom_exchange',
242
routing_key='custom.routing.key'
243
)
244
```
245
246
### Task Lifecycle Management
247
248
```python
249
from datetime import datetime, timedelta
250
251
# Temporarily disable task
252
task = PeriodicTask.objects.get(name='my_task')
253
task.enabled = False
254
task.save()
255
256
# Set task expiry
257
task.expires = datetime.utcnow() + timedelta(hours=24)
258
task.save()
259
260
# Create task that starts in the future
261
task = PeriodicTask.objects.create(
262
name='Future task',
263
task='myapp.tasks.future_task',
264
interval=schedule,
265
start_time=datetime.utcnow() + timedelta(hours=2)
266
)
267
268
# Reset task execution history
269
task.last_run_at = None
270
task.total_run_count = 0
271
task.save()
272
```
273
274
## Error Handling
275
276
### Validation Errors
277
278
The PeriodicTask model enforces several validation rules:
279
280
- Exactly one schedule type must be specified
281
- Task name must be unique
282
- JSON fields must contain valid JSON
283
- Priority must be between 0-255 if specified
284
285
```python
286
from django.core.exceptions import ValidationError
287
288
try:
289
# This will raise ValidationError - no schedule specified
290
task = PeriodicTask(
291
name='Invalid task',
292
task='myapp.tasks.test'
293
)
294
task.full_clean() # Triggers validation
295
except ValidationError as e:
296
print(f"Validation error: {e}")
297
298
try:
299
# This will raise ValidationError - multiple schedules
300
task = PeriodicTask(
301
name='Invalid task',
302
task='myapp.tasks.test',
303
interval=interval_schedule,
304
crontab=crontab_schedule # Can't have both
305
)
306
task.full_clean()
307
except ValidationError as e:
308
print(f"Validation error: {e}")
309
```
310
311
### Common Patterns
312
313
```python
314
# Safe task creation with error handling
315
def create_periodic_task(name, task_name, schedule, **kwargs):
316
try:
317
task = PeriodicTask.objects.create(
318
name=name,
319
task=task_name,
320
**{schedule_type: schedule for schedule_type in ['interval', 'crontab', 'solar', 'clocked']
321
if schedule_type in kwargs},
322
**{k: v for k, v in kwargs.items()
323
if k not in ['interval', 'crontab', 'solar', 'clocked']}
324
)
325
return task
326
except ValidationError as e:
327
print(f"Failed to create task {name}: {e}")
328
return None
329
except Exception as e:
330
print(f"Unexpected error creating task {name}: {e}")
331
return None
332
333
# Get or create pattern with proper error handling
334
def get_or_create_task(name, task_name, schedule_kwargs, task_kwargs=None):
335
try:
336
task, created = PeriodicTask.objects.get_or_create(
337
name=name,
338
defaults={
339
'task': task_name,
340
**schedule_kwargs,
341
**(task_kwargs or {})
342
}
343
)
344
return task, created
345
except Exception as e:
346
print(f"Error getting/creating task {name}: {e}")
347
return None, False
348
```