0
# Configuration
1
2
Celery configuration system for brokers, backends, serialization, routing, and runtime behavior. Configuration provides comprehensive control over distributed task execution, message handling, result storage, and worker behavior across different environments.
3
4
## Capabilities
5
6
### Configuration Management
7
8
Core configuration interface and methods for loading and managing Celery application settings.
9
10
```python { .api }
11
class Celery:
12
def config_from_object(self, obj, silent=False, force=False, namespace=None):
13
"""
14
Load configuration from object, module, or class.
15
16
Args:
17
obj: Configuration object, module name, or class
18
silent (bool): Don't raise exceptions on import errors
19
force (bool): Read the configuration immediately instead of lazily on first access
20
namespace (str): Only load keys with this prefix
21
"""
22
23
def config_from_envvar(self, variable_name, silent=False, force=False):
24
"""
25
Load configuration from the module named by an environment variable.
26
27
Args:
28
variable_name (str): Environment variable name
29
silent (bool): Don't raise if variable doesn't exist
30
force (bool): Read the configuration immediately instead of lazily on first access
31
"""
32
33
@property
34
def conf(self):
35
"""
36
Configuration namespace for accessing and setting options.
37
38
Returns:
39
Configuration object with attribute access
40
"""
41
```
42
43
### Broker Configuration
44
45
Message broker settings for connecting to RabbitMQ, Redis, and other message transport systems.
46
47
```python { .api }
48
# Broker connection settings
49
broker_url = 'redis://localhost:6379/0' # Primary broker URL
50
broker_read_url = 'redis://replica:6379/0' # Read-only broker URL
51
broker_write_url = 'redis://primary:6379/0' # Write-only broker URL
52
53
# Connection options
54
broker_connection_timeout = 4.0 # Connection timeout in seconds
55
broker_connection_retry = True # Retry connections on failure
56
broker_connection_max_retries = 100 # Maximum connection retries
57
broker_failover_strategy = 'round-robin' # Failover strategy
58
broker_heartbeat = 120 # Heartbeat interval
59
broker_pool_limit = 10 # Connection pool size
60
61
# Transport options
62
broker_transport_options = {
63
'region': 'us-east-1', # AWS SQS region
64
'predefined_queues': { # SQS queue definitions
65
'my-queue': {
66
'url': 'https://sqs.us-east-1.amazonaws.com/123/my-queue'
67
}
68
}
69
}
70
71
# SSL/TLS settings
import ssl

broker_use_ssl = {
    'keyfile': '/var/ssl/private/client-key.pem',   # Client private key
    'certfile': '/var/ssl/certs/client-cert.pem',   # Client certificate
    'ca_certs': '/var/ssl/certs/ca-cert.pem',       # CA certificates used to verify the broker
    # Must be the constant from the ssl module, not the string 'ssl.CERT_REQUIRED'
    'cert_reqs': ssl.CERT_REQUIRED,
}
78
```
79
80
### Result Backend Configuration
81
82
Result storage settings for persisting task results and metadata across different backend systems.
83
84
```python { .api }
85
# Backend selection (alternatives shown; set exactly one, later assignments override earlier ones)
86
result_backend = 'redis://localhost:6379/1' # Redis backend
87
result_backend = 'db+postgresql://user:pass@localhost/celery' # Database backend
88
result_backend = 'cache+memcached://127.0.0.1:11211/' # Memcached backend
89
result_backend = 'mongodb://localhost:27017/celery' # MongoDB backend
90
91
# Result options
92
result_serializer = 'json' # Result serialization format
93
result_compression = 'gzip' # Result compression
94
result_expires = 3600 # Result expiration in seconds
95
result_persistent = True # Make result messages persistent so they survive a broker restart (RPC backend)
96
result_backend_always_retry = True # Retry backend operations
97
98
# Backend-specific options
99
result_backend_transport_options = {
100
'region': 'us-west-2', # AWS region for S3/DynamoDB
101
'mysql_engine': 'InnoDB' # MySQL engine for database backend
102
}
103
104
# Cache backend options
105
cache_backend_options = {
106
'binary': True, # Use binary protocol
107
'behaviors': {
108
'tcp_nodelay': True,
109
'ketama': True
110
}
111
}
112
```
113
114
### Task Serialization and Content
115
116
Serialization formats and content type handling for secure and efficient message transport.
117
118
```python { .api }
119
# Serialization settings
120
task_serializer = 'json' # Task argument serializer
121
result_serializer = 'json' # Result serializer
122
accept_content = ['json'] # Accepted content types
123
result_accept_content = ['json'] # Accepted result content
124
125
# Available serializers: 'json', 'pickle', 'yaml', 'msgpack'
126
# Security note: avoid 'pickle' in production due to security risks
127
128
# Compression options
129
task_compression = 'gzip' # Task compression method
130
result_compression = 'gzip' # Result compression method
131
132
# Message properties
133
task_message_max_retries = 3 # Max message retries
134
task_reject_on_worker_lost = None # Reject tasks when worker lost
135
```
136
137
### Task Routing and Queues
138
139
Task routing configuration for distributing work across different queues and workers based on task type, priority, or custom logic.
140
141
```python { .api }
142
# Default queue settings
143
task_default_queue = 'celery' # Default queue name
144
task_default_exchange = 'celery' # Default exchange name
145
task_default_exchange_type = 'direct' # Exchange type
146
task_default_routing_key = 'celery' # Default routing key
147
148
# Queue definitions
149
task_routes = {
150
'myapp.tasks.heavy_task': {'queue': 'heavy'},
151
'myapp.tasks.priority_task': {'queue': 'priority', 'routing_key': 'priority'},
152
'myapp.tasks.*': {'queue': 'default'}
153
}
154
155
# Advanced routing with functions
156
def route_task(name, args, kwargs, options, task=None, **kwds):
157
"""
158
Custom task routing function.
159
160
Args:
161
name (str): Task name
162
args (tuple): Task arguments
163
kwargs (dict): Task keyword arguments
164
options (dict): Task options
165
task: Task class
166
167
Returns:
168
dict: Routing information {'queue': 'name', 'routing_key': 'key'}
169
"""
170
if 'priority' in kwargs and kwargs['priority'] == 'high':
171
return {'queue': 'priority'}
172
return {'queue': 'default'}
173
174
task_routes = [route_task]
175
176
# Task annotations for per-task settings ('*' applies to every task)
177
task_annotations = {
178
'tasks.add': {'rate_limit': '10/s'},
179
'*': {'rate_limit': '10/s'}
180
}
181
```
182
183
### Worker Configuration
184
185
Worker process behavior, concurrency settings, and performance tuning options.
186
187
```python { .api }
188
# Concurrency and prefetch
189
worker_concurrency = 4 # Number of worker processes/threads
190
worker_prefetch_multiplier = 1 # Tasks to prefetch per worker
191
worker_max_tasks_per_child = 1000 # Restart workers after N tasks
192
worker_max_memory_per_child = 12000 # Restart a worker process after N kilobytes of resident memory (12000 ≈ 12 MB)
193
194
# Task execution
195
task_acks_late = False # Acknowledge tasks after completion
196
task_reject_on_worker_lost = None # Reject tasks when worker dies
197
worker_disable_rate_limits = False # Disable rate limiting
198
task_ignore_result = False # Don't store task results
199
200
# Time limits
201
task_time_limit = 30 * 60 # Hard time limit (30 minutes)
202
task_soft_time_limit = 25 * 60 # Soft time limit (25 minutes)
203
worker_send_task_events = False # Send task events for monitoring
204
205
# Pool settings
206
worker_pool = 'prefork' # Pool implementation
207
worker_pool_restarts = True # Allow pool restarts
208
```
209
210
### Security Configuration
211
212
Security settings for message signing, SSL/TLS, and authentication mechanisms.
213
214
```python { .api }
215
# Eager execution (runs tasks locally and synchronously; development/testing only)
216
task_always_eager = False # Execute tasks synchronously (dev only)
217
task_eager_propagates = True # Propagate exceptions in eager mode
218
task_store_eager_result = True # Store results in eager mode
219
220
# SSL/TLS configuration
221
import ssl

broker_use_ssl = {
    'keyfile': '/path/to/key.pem',     # Client private key
    'certfile': '/path/to/cert.pem',   # Client certificate
    'ca_certs': '/path/to/ca.pem',     # CA certificates used to verify the broker
    # Must be the constant from the ssl module, not the string 'ssl.CERT_REQUIRED'
    'cert_reqs': ssl.CERT_REQUIRED,
}
227
228
# Authentication
229
security_key = 'secret-key' # Path to the private key file used to sign messages (a file path, not the key material itself)
230
security_certificate = '/path/to/cert.pem' # Security certificate
231
security_digest = 'sha256' # Digest algorithm
232
```
233
234
### Monitoring and Events
235
236
Configuration for task monitoring, event collection, and performance tracking.
237
238
```python { .api }
239
# Event monitoring
240
worker_send_task_events = True # Enable task events
241
task_send_sent_event = True # Send task-sent events
242
event_queue_expires = 60.0 # Event queue expiration
243
event_queue_ttl = 5.0 # Event queue TTL
244
245
# Monitoring options
246
worker_enable_remote_control = True # Enable remote control commands
247
task_track_started = False # Track when tasks start
248
task_publish_retry = True # Retry failed publishes
249
task_publish_retry_policy = {
250
'max_retries': 3,
251
'interval_start': 0,
252
'interval_step': 0.2,
253
'interval_max': 0.2,
254
}
255
```
256
257
### Beat Scheduler Configuration
258
259
Periodic task scheduler settings and schedule definitions for recurring jobs.
260
261
```python { .api }
262
# Beat scheduler settings
263
beat_schedule = {
264
'add-every-30-seconds': {
265
'task': 'tasks.add',
266
'schedule': 30.0,
267
'args': (16, 16)
268
},
269
'daily-report': {
270
'task': 'tasks.generate_report',
271
'schedule': crontab(hour=4, minute=0),
272
'kwargs': {'report_type': 'daily'}
273
}
274
}
275
276
beat_scheduler = 'celery.beat:PersistentScheduler' # Scheduler class
277
beat_schedule_filename = 'celerybeat-schedule' # Schedule persistence file
278
beat_max_loop_interval = 5 # Max seconds between schedule checks
279
```
280
281
### Timezone and Localization
282
283
Timezone handling and datetime configuration for consistent time handling across distributed systems.
284
285
```python { .api }
286
# Timezone settings
287
timezone = 'UTC' # Default timezone
288
enable_utc = True # Use UTC internally
289
result_expires = 3600 # Result expiration (seconds)
290
291
# Eager execution (unrelated to timezones; shown here for testing setups)
292
task_always_eager = False # Execute immediately (testing)
293
task_eager_propagates = True # Propagate exceptions when eager
294
```
295
296
### Database Backend Configuration
297
298
Settings specific to database result backends including connection pooling and table configuration.
299
300
```python { .api }
301
# Database backend options
302
database_url = 'postgresql://user:pass@localhost/celery'
303
database_engine_options = {
304
'pool_size': 20,
305
'max_overflow': 0,
306
'pool_pre_ping': True,
307
'pool_recycle': -1,
308
'echo': False
309
}
310
311
# Table configuration
312
database_table_schemas = {
313
'task': 'celery_tasks',
314
'group': 'celery_groups'
315
}
316
database_table_names = {
317
'task': 'celery_taskmeta',
318
'group': 'celery_groupmeta'
319
}
320
```
321
322
## Usage Examples
323
324
### Basic Configuration Setup
325
326
```python
327
from celery import Celery
328
329
# Create app with basic configuration
330
app = Celery('myapp')
331
332
# Configure via dictionary
333
app.conf.update(
334
broker_url='redis://localhost:6379/0',
335
result_backend='redis://localhost:6379/1',
336
task_serializer='json',
337
accept_content=['json'],
338
result_serializer='json',
339
timezone='UTC',
340
enable_utc=True,
341
)
342
343
# Alternative: configure individual settings
344
app.conf.broker_url = 'redis://localhost:6379/0'
345
app.conf.result_backend = 'redis://localhost:6379/1'
346
app.conf.task_serializer = 'json'
347
```
348
349
### Configuration from Module
350
351
```python
352
# celeryconfig.py
353
broker_url = 'redis://localhost:6379/0'
354
result_backend = 'redis://localhost:6379/1'
355
356
task_serializer = 'json'
357
result_serializer = 'json'
358
accept_content = ['json']
359
timezone = 'UTC'
360
enable_utc = True
361
362
# Task routing
363
task_routes = {
364
'myapp.tasks.heavy_computation': {'queue': 'heavy'},
365
'myapp.tasks.send_email': {'queue': 'email'},
366
}
367
368
# Worker configuration
369
worker_concurrency = 4
370
worker_prefetch_multiplier = 1
371
task_acks_late = True
372
373
# Load configuration in app
374
app = Celery('myapp')
375
app.config_from_object('celeryconfig')
376
```
377
378
### Environment-Based Configuration
379
380
```python
381
import os
import ssl


# Production configuration
class ProductionConfig:
    """Production settings; broker/backend URLs and concurrency can be overridden via environment variables."""

    broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://redis:6379/0')
    result_backend = os.environ.get('CELERY_RESULT_BACKEND', 'redis://redis:6379/1')

    # Security settings: JSON only, so untrusted pickle payloads are never accepted
    task_serializer = 'json'
    accept_content = ['json']
    result_serializer = 'json'

    # Performance settings
    worker_concurrency = int(os.environ.get('WORKER_CONCURRENCY', '4'))
    worker_prefetch_multiplier = 1
    task_acks_late = True

    # SSL configuration for production
    broker_use_ssl = {
        'keyfile': '/etc/ssl/private/worker-key.pem',
        'certfile': '/etc/ssl/certs/worker-cert.pem',
        'ca_certs': '/etc/ssl/certs/ca-bundle.pem',
        # Must be the constant from the ssl module, not the string 'ssl.CERT_REQUIRED'
        'cert_reqs': ssl.CERT_REQUIRED,
    }
405
406
# Development configuration
407
class DevelopmentConfig:
408
broker_url = 'redis://localhost:6379/0'
409
result_backend = 'redis://localhost:6379/1'
410
411
# Development-friendly settings
412
task_always_eager = True # Execute synchronously
413
task_eager_propagates = True
414
task_store_eager_result = True
415
416
# Load configuration based on environment
417
config_class = ProductionConfig if os.environ.get('FLASK_ENV') == 'production' else DevelopmentConfig
418
app.config_from_object(config_class)
419
```
420
421
### Advanced Routing Configuration
422
423
```python
424
from kombu import Queue, Exchange
425
426
# Define exchanges and queues
427
app.conf.task_routes = {
428
# Route by task name patterns
429
'myapp.tasks.email.*': {'queue': 'email'},
430
'myapp.tasks.report.*': {'queue': 'reports'},
431
'myapp.tasks.urgent.*': {'queue': 'priority'},
432
}
433
434
# Advanced routing with custom function
435
def route_task(name, args, kwargs, options, task=None, **kwds):
436
"""Custom routing based on task parameters."""
437
438
# Route by priority argument
439
if kwargs.get('priority') == 'high':
440
return {'queue': 'priority', 'routing_key': 'priority.high'}
441
442
# Route heavy tasks to dedicated workers
443
if 'heavy' in name or kwargs.get('estimated_time', 0) > 300:
444
return {'queue': 'heavy', 'routing_key': 'heavy.long'}
445
446
# Route by user type
447
user_id = kwargs.get('user_id')
448
if user_id and is_premium_user(user_id):
449
return {'queue': 'premium', 'routing_key': 'premium.user'}
450
451
# Default routing
452
return {'queue': 'default'}
453
454
app.conf.task_routes = [route_task]
455
456
# Per-task route with explicit settings (note: each assignment to task_routes replaces the previous one)
457
app.conf.task_routes = {
458
'myapp.tasks.process_image': {
459
'queue': 'media',
460
'routing_key': 'media.image',
461
'priority': 5
462
}
463
}
464
465
def is_premium_user(user_id):
466
# Check if user is premium
467
return user_id % 2 == 0 # Example logic
468
```
469
470
### Multi-Backend Configuration
471
472
```python
473
# Different backends for different purposes
474
app.conf.update(
475
# Main result backend
476
result_backend='redis://localhost:6379/1',
477
478
# Cache backend for temporary data
479
cache_backend='memcached://127.0.0.1:11211/',
480
481
# Database backend for persistent results
482
database_url='postgresql://user:pass@localhost/celery_results',
483
)
484
485
# Backend-specific options
486
app.conf.result_backend_transport_options = {
487
'retry_on_timeout': True,
488
'socket_keepalive': True,
489
'socket_keepalive_options': {
490
'TCP_KEEPINTVL': 1,
491
'TCP_KEEPCNT': 3,
492
'TCP_KEEPIDLE': 1,
493
}
494
}
495
496
# Database backend specific settings
497
app.conf.database_engine_options = {
498
'pool_size': 10,
499
'max_overflow': 20,
500
'pool_pre_ping': True,
501
'pool_recycle': 300
502
}
503
```
504
505
### Security Configuration
506
507
```python
508
import ssl

# Message signing for secure transport
# NOTE(review): the 'auth' serializer appears to also need security_cert_store
# and a call to app.setup_security() -- confirm against the Celery security guide.
app.conf.update(
    # Enable message signing
    task_serializer='auth',
    result_serializer='json',
    accept_content=['auth', 'json'],

    # Security keys: both settings are file paths, not the key material itself
    security_key='/path/to/private-key.pem',
    security_certificate='/path/to/certificate.pem',
    security_digest='sha256',
)

# SSL/TLS configuration for broker
# cert_reqs and ssl_version must be constants from the ssl module, not strings.
app.conf.broker_use_ssl = {
    'keyfile': '/etc/ssl/private/client-key.pem',
    'certfile': '/etc/ssl/certs/client-cert.pem',
    'ca_certs': '/etc/ssl/certs/ca-cert.pem',
    'cert_reqs': ssl.CERT_REQUIRED,
    'ssl_version': ssl.PROTOCOL_TLSv1_2,
    'ciphers': 'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS',
}

# Result backend SSL (Redis backend uses ssl_-prefixed keys)
app.conf.redis_backend_use_ssl = {
    'ssl_cert_reqs': ssl.CERT_REQUIRED,
    'ssl_ca_certs': '/etc/ssl/certs/ca-cert.pem',
    'ssl_certfile': '/etc/ssl/certs/client-cert.pem',
    'ssl_keyfile': '/etc/ssl/private/client-key.pem',
}
538
```
539
540
### Performance Tuning Configuration
541
542
```python
543
# High-performance configuration
544
app.conf.update(
545
# Worker performance
546
worker_concurrency=8, # Match CPU cores
547
worker_prefetch_multiplier=1, # Prevent memory issues
548
worker_max_tasks_per_child=1000, # Prevent memory leaks
549
worker_max_memory_per_child=200000, # ~200 MB limit (value is in kilobytes)
550
551
# Task execution
552
task_acks_late=True, # Acknowledge after completion
553
task_reject_on_worker_lost=True, # Requeue on worker death
554
worker_disable_rate_limits=False, # Keep rate limits
555
556
# Connection pooling
557
broker_pool_limit=10, # Connection pool size
558
broker_connection_retry=True,
559
broker_connection_max_retries=100,
560
561
# Result backend optimization
562
result_expires=3600, # 1 hour expiration
563
result_compression='gzip', # Compress results
564
result_serializer='json', # Fast serialization
565
566
# Task routing optimization
567
task_default_delivery_mode=2, # Persistent messages
568
task_compression='gzip', # Compress task data
569
570
# Monitoring (disable in production for performance)
571
worker_send_task_events=False,
572
task_send_sent_event=False,
573
)
574
```
575
576
### Beat Schedule Configuration
577
578
```python
579
from celery.schedules import crontab, schedule
580
from datetime import timedelta
581
582
# Comprehensive beat schedule
583
app.conf.beat_schedule = {
584
# Simple interval task
585
'heartbeat': {
586
'task': 'myapp.tasks.heartbeat',
587
'schedule': 30.0, # Every 30 seconds
588
},
589
590
# Cron-style scheduling
591
'daily-report': {
592
'task': 'myapp.tasks.generate_daily_report',
593
'schedule': crontab(hour=6, minute=0), # 6:00 AM daily
594
'args': ('daily',),
595
'kwargs': {'email_recipients': ['admin@example.com']}
596
},
597
598
# Complex cron schedule
599
'business-hours-check': {
600
'task': 'myapp.tasks.health_check',
601
'schedule': crontab(minute='*/15', hour='9-17', day_of_week='1-5'),
602
'options': {'expires': 60} # Expire after 1 minute
603
},
604
605
# Timedelta schedule
606
'cleanup-temp': {
607
'task': 'myapp.tasks.cleanup_temp_files',
608
'schedule': timedelta(hours=1),
609
'options': {
610
'queue': 'maintenance',
611
'routing_key': 'maintenance.cleanup'
612
}
613
},
614
615
# Dynamic schedule based on configuration
616
'dynamic-task': {
617
'task': 'myapp.tasks.dynamic_processor',
618
'schedule': float(os.environ.get('DYNAMIC_INTERVAL', '300')), # 5 minutes default
619
'kwargs': {
620
'batch_size': int(os.environ.get('BATCH_SIZE', '100'))
621
}
622
}
623
}
624
625
# Beat scheduler settings
626
app.conf.update(
627
beat_scheduler='celery.beat:PersistentScheduler',
628
beat_schedule_filename='celerybeat-schedule.db',
629
beat_max_loop_interval=30, # Check schedule every 30 seconds max
630
)
631
```
632
633
### Configuration Validation
634
635
```python
636
def validate_config(app):
    """Validate Celery configuration for common issues.

    Args:
        app: Celery application (any object exposing a ``conf`` attribute
            with the settings read below).

    Returns:
        bool: True when no issues were found; False otherwise, after
        printing every detected issue.
    """
    import os  # local import keeps this snippet self-contained

    conf = app.conf
    in_production = os.environ.get('ENVIRONMENT') == 'production'
    # An unset broker_url is None; normalise it so the substring check
    # below cannot raise TypeError after the missing-URL issue is recorded.
    broker_url = conf.broker_url or ''
    issues = []

    # Check broker URL
    if not broker_url:
        issues.append("broker_url is not configured")

    # Check serialization security
    if 'pickle' in (conf.accept_content or ()):
        issues.append("pickle serialization is insecure, use json instead")

    # Check for production readiness
    if conf.task_always_eager and in_production:
        issues.append("task_always_eager should not be True in production")

    # Check worker settings
    if conf.worker_prefetch_multiplier > 4:
        issues.append("worker_prefetch_multiplier > 4 may cause memory issues")

    # Check SSL in production
    if in_production and not conf.broker_use_ssl and 'rediss://' not in broker_url:
        issues.append("SSL should be enabled in production")

    if issues:
        print("Configuration Issues:")
        for issue in issues:
            print(f" - {issue}")
        return False

    print("Configuration validation passed")
    return True
672
673
# Validate after configuration
674
validate_config(app)
675
```