0
# Core Application
1
2
Core Celery application classes and task creation mechanisms that form the foundation of distributed task processing. These components provide the primary interface for creating, configuring, and managing Celery applications and tasks.
3
4
## Capabilities
5
6
### Celery Application Class
7
8
Main application class that serves as the central coordination point for task management, configuration, and worker communication.
9
10
```python { .api }
11
class Celery:
12
def __init__(
13
self,
14
main=None,
15
loader=None,
16
backend=None,
17
amqp=None,
18
events=None,
19
log=None,
20
control=None,
21
set_as_current=True,
22
tasks=None,
23
broker=None,
24
include=None,
25
changes=None,
26
config_source=None,
27
fixups=None,
28
task_cls=None,
29
autofinalize=True,
30
namespace=None,
31
strict_typing=True,
32
**kwargs
33
):
34
"""
35
Create a Celery application instance.
36
37
Args:
38
main (str): Name of main module if running as __main__
39
loader: Custom loader class for configuration
40
backend (str): Result backend URL or class
41
broker (str): Message broker URL
42
include (list): Modules to import when worker starts
43
set_as_current (bool): Make this the current app
44
autofinalize (bool): Auto-finalize app on first use
45
namespace (str): Configuration key namespace
46
"""
47
48
def task(self, *args, **opts):
49
"""
50
Decorator to create task from any callable.
51
52
Args:
53
bind (bool): Create bound task with self parameter
54
name (str): Custom task name
55
base (class): Custom task base class
56
serializer (str): Task argument serializer
57
max_retries (int): Maximum retry attempts
58
default_retry_delay (int): Default retry delay in seconds
59
rate_limit (str): Rate limit (e.g., '100/m' for 100/minute)
60
time_limit (int): Hard time limit in seconds
61
soft_time_limit (int): Soft time limit in seconds
62
ignore_result (bool): Don't store task results
63
store_errors_even_if_ignored (bool): Store errors even when ignoring results
64
65
Returns:
66
Task class instance
67
"""
68
69
def send_task(
70
self,
71
name,
72
args=None,
73
kwargs=None,
74
countdown=None,
75
eta=None,
76
task_id=None,
77
producer=None,
78
connection=None,
79
router=None,
80
result_cls=None,
81
expires=None,
82
publisher=None,
83
link=None,
84
link_error=None,
85
add_to_parent=True,
86
group_id=None,
87
group_index=None,
88
retries=0,
89
chord=None,
90
reply_to=None,
91
time_limit=None,
92
soft_time_limit=None,
93
root_id=None,
94
parent_id=None,
95
route_name=None,
96
shadow=None,
97
chain=None,
98
task_type=None,
99
**options
100
):
101
"""
102
Send task by name without having the task function imported.
103
104
Args:
105
name (str): Task name to execute
106
args (tuple): Positional arguments for task
107
kwargs (dict): Keyword arguments for task
108
countdown (int): Delay execution for N seconds
109
eta (datetime): Specific execution time
110
task_id (str): Custom task ID
111
expires (datetime|int): Task expiration time
112
link (Signature): Success callback
113
link_error (Signature): Failure callback
114
115
Returns:
116
AsyncResult instance
117
"""
118
119
def signature(self, *args, **kwargs):
120
"""
121
Create signature bound to this app.
122
123
Returns:
124
Signature instance
125
"""
126
127
def start(self, argv=None):
128
"""
129
Run celery using command line arguments.
130
131
Args:
132
argv (list): Command line arguments
133
"""
134
135
def worker_main(self, argv=None):
136
"""
137
Run celery worker using command line arguments.
138
139
Args:
140
argv (list): Command line arguments
141
"""
142
143
def config_from_object(self, obj, silent=False, force=False, namespace=None):
144
"""
145
Load configuration from object.
146
147
Args:
148
obj: Configuration object, module, or string
149
silent (bool): Don't raise on import errors
150
force (bool): Force update even if finalized
151
namespace (str): Only load keys with this prefix
152
"""
153
154
def config_from_envvar(self, variable_name, silent=False, force=False):
155
"""
156
Load configuration from the module named by an environment variable.
157
158
Args:
159
variable_name (str): Name of the environment variable whose value is the configuration module to import
160
silent (bool): Don't raise if variable not found
161
force (bool): Force update even if finalized
162
"""
163
164
def autodiscover_tasks(self, packages=None, related_name='tasks', force=False):
165
"""
166
Automatically discover tasks from packages.
167
168
Args:
169
packages (list): Packages to search (defaults to Django's INSTALLED_APPS when the Django fixup is active)
170
related_name (str): Module name to search for tasks
171
force (bool): Force discovery even if already done
172
"""
173
174
def finalize(self, auto=False):
175
"""
176
Finalize the app configuration.
177
178
Args:
179
auto (bool): Called automatically during first use
180
"""
181
182
def close(self):
183
"""Clean up after the application."""
184
185
def connection_for_read(self, url=None, **kwargs):
186
"""
187
Get connection for consuming messages.
188
189
Args:
190
url (str): Broker URL override
191
192
Returns:
193
Connection instance
194
"""
195
196
def connection_for_write(self, url=None, **kwargs):
197
"""
198
Get connection for producing messages.
199
200
Args:
201
url (str): Broker URL override
202
203
Returns:
204
Connection instance
205
"""
206
207
def add_periodic_task(
208
self,
209
schedule,
210
sig,
211
args=(),
212
kwargs=(),
213
name=None,
214
**opts
215
):
216
"""
217
Add periodic task to beat schedule.
218
219
Args:
220
schedule: Schedule instance (crontab, schedule)
221
sig (Signature): Task to execute
222
args (tuple): Arguments for task
223
kwargs (dict): Keyword arguments for task
224
name (str): Schedule entry name
225
"""
226
227
@property
228
def conf(self):
229
"""Current configuration namespace."""
230
231
@property
232
def tasks(self):
233
"""Task registry containing all registered tasks."""
234
235
@property
236
def backend(self):
237
"""Current result backend instance."""
238
239
@property
240
def control(self):
241
"""Remote control interface for workers."""
242
243
@property
244
def events(self):
245
"""Events interface for monitoring."""
246
247
@property
248
def current_task(self):
249
"""Currently executing task."""
250
```
251
252
### Task Base Class
253
254
Base class for all Celery tasks, providing execution methods and task context access.
255
256
```python { .api }
257
class Task:
258
def __init__(self):
259
"""Initialize task instance."""
260
261
def delay(self, *args, **kwargs):
262
"""
263
Shortcut to apply_async that takes the task's arguments directly (*args, **kwargs); execution options such as countdown or eta are not supported.
264
265
Args:
266
*args: Positional arguments for task
267
**kwargs: Keyword arguments for task
268
269
Returns:
270
AsyncResult instance
271
"""
272
273
def apply_async(
274
self,
275
args=None,
276
kwargs=None,
277
task_id=None,
278
producer=None,
279
link=None,
280
link_error=None,
281
shadow=None,
282
**options
283
):
284
"""
285
Apply task asynchronously.
286
287
Args:
288
args (tuple): Positional arguments
289
kwargs (dict): Keyword arguments
290
task_id (str): Custom task ID
291
producer: Message producer
292
link (Signature): Success callback
293
link_error (Signature): Error callback
294
shadow (str): Override task name in logs
295
countdown (int): Delay execution N seconds
296
eta (datetime): Execute at specific time
297
expires (datetime|int): Task expiration
298
retry (bool): Enable retries
299
retry_policy (dict): Retry configuration
300
301
Returns:
302
AsyncResult instance
303
"""
304
305
def apply(self, args=None, kwargs=None, **options):
306
"""
307
Execute task synchronously in current process.
308
309
Args:
310
args (tuple): Positional arguments
311
kwargs (dict): Keyword arguments
312
313
Returns:
314
EagerResult instance (call .get() to obtain the return value)
315
"""
316
317
def retry(
318
self,
319
args=None,
320
kwargs=None,
321
exc=None,
322
throw=True,
323
eta=None,
324
countdown=None,
325
max_retries=None,
326
**options
327
):
328
"""
329
Retry current task.
330
331
Args:
332
args (tuple): New positional arguments
333
kwargs (dict): New keyword arguments
334
exc (Exception): Exception that caused retry
335
throw (bool): Re-raise Retry exception
336
eta (datetime): Retry at specific time
337
countdown (int): Retry after N seconds
338
max_retries (int): Override max retries
339
340
Raises:
341
Retry: To trigger task retry
342
"""
343
344
def signature(self, args=None, kwargs=None, **options):
345
"""
346
Create signature for this task.
347
348
Args:
349
args (tuple): Positional arguments
350
kwargs (dict): Keyword arguments
351
352
Returns:
353
Signature instance
354
"""
355
356
def s(self, *args, **kwargs):
357
"""
358
Shortcut for signature creation.
359
360
Args:
361
*args: Positional arguments
362
**kwargs: Keyword arguments
363
364
Returns:
365
Signature instance
366
"""
367
368
def si(self, *args, **kwargs):
369
"""
370
Create immutable signature.
371
372
Args:
373
*args: Positional arguments
374
**kwargs: Keyword arguments
375
376
Returns:
377
Immutable signature instance
378
"""
379
380
def chunks(self, it, n):
381
"""
382
Split iterator into chunks for parallel processing.
383
384
Args:
385
it: Iterator to chunk
386
n (int): Chunk size
387
388
Returns:
389
Chunks instance
390
"""
391
392
@property
393
def name(self):
394
"""Task name."""
395
396
@property
397
def app(self):
398
"""Celery app instance this task is bound to."""
399
400
@property
401
def request(self):
402
"""Current task request context."""
403
```
404
405
### Shared Task Decorator
406
407
Decorator for creating tasks that work with any Celery app, useful for reusable libraries and Django integration.
408
409
```python { .api }
410
def shared_task(*args, **kwargs):
411
"""
412
Create task that works with any Celery app instance.
413
414
Args:
415
bind (bool): Create bound task with self parameter
416
name (str): Custom task name
417
base (class): Custom task base class
418
serializer (str): Argument serializer
419
max_retries (int): Maximum retry attempts
420
default_retry_delay (int): Default retry delay
421
rate_limit (str): Task rate limit
422
ignore_result (bool): Don't store results
423
424
Returns:
425
Task decorator function
426
"""
427
428
def current_app():
429
"""
430
Get the current Celery application instance.
431
432
Returns:
433
Celery: Current application instance
434
435
Raises:
436
RuntimeError: If no current app is set
437
"""
438
439
def current_task():
440
"""
441
Get the currently executing task.
442
443
Returns:
444
Task: Current task instance or None if not in task context
445
"""
446
```
447
448
### Task Request Context
449
450
Context object providing access to current task metadata and execution information.
451
452
```python { .api }
453
class Context:
454
"""
455
Task execution context available via Task.request.
456
457
Attributes:
458
id (str): Unique task ID
459
args (tuple): Task positional arguments
460
kwargs (dict): Task keyword arguments
461
retries (int): Number of retries attempted
462
is_eager (bool): True if executed locally in the client (e.g. via apply() or task_always_eager) rather than by a worker
463
eta (datetime): Scheduled execution time
464
expires (datetime): Task expiration time
465
headers (dict): Message headers
466
delivery_info (dict): Message delivery information
467
reply_to (str): Reply queue name
468
correlation_id (str): Message correlation ID
469
root_id (str): ID of the first task in the workflow this task belongs to
470
parent_id (str): Parent task ID
471
group (str): Group ID if part of group
472
group_index (int): Position in group
473
chord (str): Chord ID if part of chord
474
chain (list): Chain information
475
hostname (str): Worker hostname
476
logfile (str): Worker log file
477
loglevel (int): Worker log level
478
utc (bool): Use UTC times
479
called_directly (bool): True if the task was called directly as a plain function rather than executed by a worker or via apply()
480
callbacks (list): Success callbacks
481
errbacks (list): Error callbacks
482
timelimit (tuple): Time limits (hard, soft)
483
origin (str): Message origin
484
"""
485
```
486
487
## Usage Examples
488
489
### Basic Application Setup
490
491
```python
492
from celery import Celery
493
494
# Create application with Redis broker and backend
495
app = Celery(
496
'myapp',
497
broker='redis://localhost:6379/0',
498
backend='redis://localhost:6379/1'
499
)
500
501
# Configure from object
502
app.config_from_object({
503
'task_serializer': 'json',
504
'accept_content': ['json'],
505
'result_serializer': 'json',
506
'timezone': 'UTC',
507
'enable_utc': True,
508
})
509
510
# Auto-discover tasks
511
app.autodiscover_tasks(['myapp.tasks', 'myapp.utils'])
512
```
513
514
### Task Creation Patterns
515
516
```python
517
# Basic task
518
@app.task
519
def add(x, y):
520
return x + y
521
522
# Bound task with retry logic
523
@app.task(bind=True, max_retries=3)
524
def process_data(self, data_id):
525
try:
526
# Process data
527
return process(data_id)
528
except Exception as exc:
529
# Retry with exponential backoff
530
self.retry(countdown=2 ** self.request.retries, exc=exc)
531
532
# Shared task for libraries
533
@shared_task
534
def send_email(recipient, subject, body):
535
# Email sending logic
536
pass
537
538
# Custom task class
539
class DatabaseTask(app.Task):
540
def on_failure(self, exc, task_id, args, kwargs, einfo):
541
# Custom failure handling
542
logger.error(f"Task {task_id} failed: {exc}")
543
544
@app.task(base=DatabaseTask)
545
def update_user(user_id, data):
546
# Database operation
547
pass
548
```
549
550
### Task Execution
551
552
```python
553
# Synchronous execution
554
result = add.apply(args=(4, 4))
555
print(result.get())  # 8 (apply() returns an EagerResult)
556
557
# Asynchronous execution
558
result = add.delay(4, 4)
559
print(result.get()) # Wait for result: 8
560
561
# Advanced async execution
562
result = add.apply_async(
563
args=(4, 4),
564
countdown=10, # Execute in 10 seconds
565
expires=60, # Expire after 60 seconds
566
retry=True,
567
retry_policy={
568
'max_retries': 3,
569
'interval_start': 0,
570
'interval_step': 0.2,
571
'interval_max': 0.2,
572
}
573
)
574
575
# Send task by name
576
result = app.send_task('myapp.tasks.add', args=(4, 4))
577
```