Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines
—
Quality: Pending — a best-practices review ("Does it follow best practices?") has not yet been completed.
Impact: Pending — no eval scenarios have been run.
Celery configuration system for brokers, backends, serialization, routing, and runtime behavior. Configuration provides comprehensive control over distributed task execution, message handling, result storage, and worker behavior across different environments.
Core configuration interface and methods for loading and managing Celery application settings.
class Celery:
    """Sketch of the Celery application's configuration interface.

    NOTE(review): method bodies are documentation stubs (docstring only) —
    the real implementations live in the ``celery`` package.
    """

    def config_from_object(self, obj, silent=False, force=False, namespace=None):
        """
        Load configuration from object, module, or class.

        Args:
            obj: Configuration object, module name, or class
            silent (bool): Don't raise exceptions on import errors
            force (bool): Force update even if app is finalized
            namespace (str): Only load keys with this prefix
        """

    def config_from_envvar(self, variable_name, silent=False, force=False):
        """
        Load configuration from environment variable pointing to config file.

        Args:
            variable_name (str): Environment variable name
            silent (bool): Don't raise if variable doesn't exist
            force (bool): Force update even if finalized
        """

    @property
    def conf(self):
        """
        Configuration namespace for accessing and setting options.

        Returns:
            Configuration object with attribute access
        """


# Message broker settings for connecting to RabbitMQ, Redis, and other
# message transport systems.
# Broker connection settings
broker_url = 'redis://localhost:6379/0'        # primary broker URL
broker_read_url = 'redis://replica:6379/0'     # read-only broker URL
broker_write_url = 'redis://primary:6379/0'    # write-only broker URL

# Connection options
broker_connection_timeout = 4.0           # connection timeout in seconds
broker_connection_retry = True            # retry connections on failure
broker_connection_max_retries = 100       # maximum connection retries
broker_failover_strategy = 'round-robin'  # failover strategy
broker_heartbeat = 120                    # heartbeat interval
broker_pool_limit = 10                    # connection pool size

# Transport options (example: AWS SQS)
broker_transport_options = {
    'region': 'us-east-1',       # AWS SQS region
    'predefined_queues': {       # SQS queue definitions
        'my-queue': {
            'url': 'https://sqs.us-east-1.amazonaws.com/123/my-queue'
        }
    }
}

# SSL/TLS material for the broker connection (the original "Login
# credentials" heading was inaccurate — these are certificate paths).
# NOTE(review): 'cert_reqs' would normally be the ssl.CERT_REQUIRED
# constant, not the string 'ssl.CERT_REQUIRED'; kept as-is for the example.
broker_use_ssl = {
    'keyfile': '/var/ssl/private/client-key.pem',
    'certfile': '/var/ssl/certs/client-cert.pem',
    'ca_certs': '/var/ssl/certs/ca-cert.pem',
    'cert_reqs': 'ssl.CERT_REQUIRED'
}

# Result storage settings for persisting task results and metadata across
# different backend systems.
# Backend selection — exactly one result_backend can be active.  The
# original listed four consecutive assignments (only the last would win),
# so the alternatives are shown commented out.
result_backend = 'redis://localhost:6379/1'                      # Redis backend
# result_backend = 'db+postgresql://user:pass@localhost/celery'  # Database backend
# result_backend = 'cache+memcached://127.0.0.1:11211/'          # Memcached backend
# result_backend = 'mongodb://localhost:27017/celery'            # MongoDB backend

# Result options
result_serializer = 'json'          # result serialization format
result_compression = 'gzip'         # result compression
result_expires = 3600               # result expiration in seconds
result_persistent = True            # persist results to disk
result_backend_always_retry = True  # retry backend operations

# Backend-specific options
result_backend_transport_options = {
    'region': 'us-west-2',      # AWS region for S3/DynamoDB
    'mysql_engine': 'InnoDB'    # MySQL engine for database backend
}

# Cache backend options
cache_backend_options = {
    'binary': True,             # use binary protocol
    'behaviors': {
        'tcp_nodelay': True,
        'ketama': True
    }
}

# Serialization formats and content type handling for secure and efficient
# message transport.
# Serialization settings
task_serializer = 'json'    # Task argument serializer
result_serializer = 'json'    # Result serializer
accept_content = ['json']    # Accepted content types
result_accept_content = ['json']    # Accepted result content
# Available serializers: 'json', 'pickle', 'yaml', 'msgpack'
# Security note: avoid 'pickle' in production due to security risks
# Compression options
task_compression = 'gzip'    # Task compression method
result_compression = 'gzip'    # Result compression method
# Message properties
# NOTE(review): 'task_message_max_retries' is not a documented Celery
# setting name — verify against the Celery configuration reference.
task_message_max_retries = 3    # Max message retries
task_reject_on_worker_lost = None    # Reject tasks when worker lost
# Task routing configuration for distributing work across different queues
# and workers based on task type, priority, or custom logic.
# Default queue settings
task_default_queue = 'celery'          # default queue name
task_default_exchange = 'celery'       # default exchange name
task_default_exchange_type = 'direct'  # exchange type
task_default_routing_key = 'celery'    # default routing key

# Static routing table: task-name (glob patterns allowed) -> queue options
task_routes = {
    'myapp.tasks.heavy_task': {'queue': 'heavy'},
    'myapp.tasks.priority_task': {'queue': 'priority', 'routing_key': 'priority'},
    'myapp.tasks.*': {'queue': 'default'}
}


# Advanced routing with functions
def route_task(name, args, kwargs, options, task=None, **kwds):
    """
    Custom task routing function.

    Args:
        name (str): Task name
        args (tuple): Task arguments
        kwargs (dict): Task keyword arguments
        options (dict): Task options
        task: Task class

    Returns:
        dict: Routing information {'queue': 'name', 'routing_key': 'key'}
    """
    # kwargs.get avoids the double lookup of `'priority' in kwargs and ...`
    if kwargs.get('priority') == 'high':
        return {'queue': 'priority'}
    return {'queue': 'default'}


# NOTE(review): this replaces the static routing table above — the two
# forms are alternatives; keep only the one you need.
task_routes = [route_task]

# Task annotations for per-task settings ('*' matches every task)
task_annotations = {
    'tasks.add': {'rate_limit': '10/s'},
    '*': {'rate_limit': '10/s'}
}

# Worker process behavior, concurrency settings, and performance tuning options.
# Concurrency and prefetch
worker_concurrency = 4    # Number of worker processes/threads
worker_prefetch_multiplier = 1    # Tasks to prefetch per worker
worker_max_tasks_per_child = 1000    # Restart workers after N tasks
# NOTE(review): the original comment said "after N MB"; Celery documents
# worker_max_memory_per_child in kilobytes (KiB) — so 12000 is ~12 MB.
worker_max_memory_per_child = 12000    # Restart workers after N KiB resident memory
# Task execution
task_acks_late = False    # Acknowledge tasks after completion (False = on receipt)
task_reject_on_worker_lost = None    # Reject tasks when worker dies
worker_disable_rate_limits = False    # Disable rate limiting
task_ignore_result = False    # Don't store task results
# Time limits
task_time_limit = 30 * 60    # Hard time limit (30 minutes)
task_soft_time_limit = 25 * 60    # Soft time limit (25 minutes)
worker_send_task_events = False    # Send task events for monitoring
# Pool settings
worker_pool = 'prefork'    # Pool implementation
worker_pool_restarts = True    # Allow pool restarts
# Security settings for message signing, SSL/TLS, and authentication mechanisms.
# Message security
# NOTE(review): the three "eager" flags control synchronous execution for
# development/testing; they are not security settings despite the heading.
task_always_eager = False    # Execute tasks synchronously (dev only)
task_eager_propagates = True    # Propagate exceptions in eager mode
task_store_eager_result = True    # Store results in eager mode
# SSL/TLS configuration
# NOTE(review): 'cert_reqs' would normally be the ssl.CERT_REQUIRED
# constant rather than the string 'ssl.CERT_REQUIRED'.
broker_use_ssl = {
    'keyfile': '/path/to/key.pem',
    'certfile': '/path/to/cert.pem',
    'ca_certs': '/path/to/ca.pem',
    'cert_reqs': 'ssl.CERT_REQUIRED'
}
# Authentication
security_key = 'secret-key'    # Message signing key
security_certificate = '/path/to/cert.pem'    # Security certificate
security_digest = 'sha256'    # Digest algorithm
# Configuration for task monitoring, event collection, and performance tracking.
# Event monitoring
worker_send_task_events = True    # enable task events
task_send_sent_event = True       # send task-sent events
event_queue_expires = 60.0        # event queue expiration (seconds)
event_queue_ttl = 5.0             # event queue message TTL (seconds)

# Monitoring options
worker_enable_remote_control = True    # enable remote control commands
task_track_started = False             # track when tasks start
task_publish_retry = True              # retry failed publishes

# Retry policy applied when publishing a task message fails
task_publish_retry_policy = {
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}

# Periodic task scheduler settings and schedule definitions for recurring jobs.
# Beat scheduler settings
# NOTE(review): `crontab` must be imported from celery.schedules for this
# snippet to run — it is not defined in this fragment.
beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,    # run every 30 seconds
        'args': (16, 16)
    },
    'daily-report': {
        'task': 'tasks.generate_report',
        'schedule': crontab(hour=4, minute=0),    # 04:00 every day
        'kwargs': {'report_type': 'daily'}
    }
}
beat_scheduler = 'celery.beat:PersistentScheduler'    # Scheduler class
beat_schedule_filename = 'celerybeat-schedule'    # Schedule persistence file
beat_max_loop_interval = 5    # Max seconds between schedule checks
# Timezone handling and datetime configuration for consistent time handling
# across distributed systems.
# Timezone settings
timezone = 'UTC'    # Default timezone
enable_utc = True    # Use UTC internally
result_expires = 3600    # Result expiration (seconds)
# Datetime handling
task_always_eager = False    # Execute immediately (testing)
task_eager_propagates = True    # Propagate exceptions when eager
# Settings specific to database result backends including connection
# pooling and table configuration.
# Database backend options
database_url = 'postgresql://user:pass@localhost/celery'

# SQLAlchemy engine options for the database result backend
database_engine_options = {
    'pool_size': 20,
    'max_overflow': 0,        # no extra connections beyond the pool
    'pool_pre_ping': True,    # validate connections before use
    'pool_recycle': -1,       # never recycle connections
    'echo': False             # no SQL statement logging
}

# Table configuration
database_table_schemas = {
    'task': 'celery_tasks',
    'group': 'celery_groups'
}
database_table_names = {
    'task': 'celery_taskmeta',
    'group': 'celery_groupmeta'
}

# Example: basic application configuration
from celery import Celery
# Create app with basic configuration
app = Celery('myapp')
# Configure via dictionary (bulk update of the conf namespace)
app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/1',
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)
# Alternative: configure individual settings via attribute assignment
app.conf.broker_url = 'redis://localhost:6379/0'
app.conf.result_backend = 'redis://localhost:6379/1'
app.conf.task_serializer = 'json'
# celeryconfig.py — standalone configuration module example follows
# Contents of a standalone celeryconfig.py module
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True

# Task routing
task_routes = {
    'myapp.tasks.heavy_computation': {'queue': 'heavy'},
    'myapp.tasks.send_email': {'queue': 'email'},
}

# Worker configuration
worker_concurrency = 4
worker_prefetch_multiplier = 1
task_acks_late = True

# Load the module above into an application
app = Celery('myapp')
app.config_from_object('celeryconfig')

import os
# Production configuration
class ProductionConfig:
    """Settings for production deployments.

    Broker/backend endpoints and concurrency are overridable through
    environment variables; serialization is locked to JSON.
    """

    # Connection endpoints (environment-overridable)
    broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://redis:6379/0')
    result_backend = os.environ.get('CELERY_RESULT_BACKEND', 'redis://redis:6379/1')

    # Serialization locked to JSON for safety
    task_serializer = 'json'
    accept_content = ['json']
    result_serializer = 'json'

    # Throughput tuning
    worker_concurrency = int(os.environ.get('WORKER_CONCURRENCY', '4'))
    worker_prefetch_multiplier = 1
    task_acks_late = True

    # TLS material for the broker connection
    broker_use_ssl = {
        'keyfile': '/etc/ssl/private/worker-key.pem',
        'certfile': '/etc/ssl/certs/worker-cert.pem',
        'ca_certs': '/etc/ssl/certs/ca-bundle.pem',
        'cert_reqs': 'ssl.CERT_REQUIRED'
    }


# Development configuration
class DevelopmentConfig:
    """Settings for local development: tasks run eagerly, in-process."""

    broker_url = 'redis://localhost:6379/0'
    result_backend = 'redis://localhost:6379/1'

    # Run tasks synchronously so no worker is needed
    task_always_eager = True
    task_eager_propagates = True
    task_store_eager_result = True
# Load configuration based on environment
config_class = ProductionConfig if os.environ.get('FLASK_ENV') == 'production' else DevelopmentConfig
app.config_from_object(config_class)

# Routing examples below use kombu primitives
from kombu import Queue, Exchange
# Define exchanges and queues
# Pattern-based routing: a glob match on the task name selects the queue.
app.conf.task_routes = {
    # Route by task name patterns
    'myapp.tasks.email.*': {'queue': 'email'},
    'myapp.tasks.report.*': {'queue': 'reports'},
    'myapp.tasks.urgent.*': {'queue': 'priority'},
}
# Advanced routing with custom function
def route_task(name, args, kwargs, options, task=None, **kwds):
    """Pick a destination queue from the task's name and keyword arguments."""
    priority = kwargs.get('priority')
    estimated = kwargs.get('estimated_time', 0)

    # Explicitly flagged high-priority work goes to the priority queue.
    if priority == 'high':
        return {'queue': 'priority', 'routing_key': 'priority.high'}

    # Long-running or known-heavy tasks get dedicated workers.
    if 'heavy' in name or estimated > 300:
        return {'queue': 'heavy', 'routing_key': 'heavy.long'}

    # Premium users get their own queue.
    user_id = kwargs.get('user_id')
    if user_id and is_premium_user(user_id):
        return {'queue': 'premium', 'routing_key': 'premium.user'}

    # Everything else falls through to the default queue.
    return {'queue': 'default'}
# NOTE(review): assigning a list of router functions replaces the pattern
# table configured above — these are alternative styles, not additive.
app.conf.task_routes = [route_task]
# Queue definitions with specific settings (again replaces the previous value)
app.conf.task_routes = {
    'myapp.tasks.process_image': {
        'queue': 'media',
        'routing_key': 'media.image',
        'priority': 5
    }
}
def is_premium_user(user_id):
    """Placeholder premium check: even user ids count as premium."""
    remainder = user_id % 2  # example logic only -- replace with a real lookup
    return remainder == 0

# Different backends for different purposes
app.conf.update(
    # Main result backend
    result_backend='redis://localhost:6379/1',
    # Cache backend for temporary data
    cache_backend='memcached://127.0.0.1:11211/',
    # Database backend for persistent results
    database_url='postgresql://user:pass@localhost/celery_results',
)
# Backend-specific options (Redis socket tuning)
app.conf.result_backend_transport_options = {
    'retry_on_timeout': True,
    'socket_keepalive': True,
    'socket_keepalive_options': {
        'TCP_KEEPINTVL': 1,
        'TCP_KEEPCNT': 3,
        'TCP_KEEPIDLE': 1,
    }
}
# Database backend specific settings (SQLAlchemy pool tuning)
app.conf.database_engine_options = {
    'pool_size': 10,
    'max_overflow': 20,
    'pool_pre_ping': True,
    'pool_recycle': 300
}
# Message signing for secure transport
app.conf.update(
    # Enable message signing ('auth' serializer signs task messages)
    task_serializer='auth',
    result_serializer='json',
    accept_content=['auth', 'json'],
    # Security keys
    security_key='your-secret-key-here',
    security_certificate='/path/to/certificate.pem',
    security_digest='sha256',
)
# SSL/TLS configuration for broker
# NOTE(review): 'cert_reqs' and 'ssl_version' would normally be the ssl
# module constants, not these placeholder strings.
app.conf.broker_use_ssl = {
    'keyfile': '/etc/ssl/private/client-key.pem',
    'certfile': '/etc/ssl/certs/client-cert.pem',
    'ca_certs': '/etc/ssl/certs/ca-cert.pem',
    'cert_reqs': 'ssl.CERT_REQUIRED',
    'ssl_version': 'ssl.PROTOCOL_TLSv1_2',
    'ciphers': 'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS'
}
# Result backend SSL (Redis over TLS)
app.conf.redis_backend_use_ssl = {
    'ssl_cert_reqs': 'ssl.CERT_REQUIRED',
    'ssl_ca_certs': '/etc/ssl/certs/ca-cert.pem',
    'ssl_certfile': '/etc/ssl/certs/client-cert.pem',
    'ssl_keyfile': '/etc/ssl/private/client-key.pem',
}
# High-performance configuration
# High-throughput worker configuration
app.conf.update(
    # Worker performance
    worker_concurrency=8,                # match CPU cores
    worker_prefetch_multiplier=1,        # prevent memory issues
    worker_max_tasks_per_child=1000,     # prevent memory leaks
    worker_max_memory_per_child=200000,  # ~200 MB limit (value is in KiB)
    # Task execution
    task_acks_late=True,                 # acknowledge after completion
    task_reject_on_worker_lost=True,     # requeue on worker death
    worker_disable_rate_limits=False,    # keep rate limits
    # Connection pooling
    broker_pool_limit=10,                # connection pool size
    broker_connection_retry=True,
    broker_connection_max_retries=100,
    # Result backend optimization
    result_expires=3600,                 # 1 hour expiration
    result_compression='gzip',           # compress results
    result_serializer='json',            # fast serialization
    # Task routing optimization
    task_default_delivery_mode=2,        # persistent messages
    task_compression='gzip',             # compress task data
    # Monitoring (disable in production for performance)
    worker_send_task_events=False,
    task_send_sent_event=False,
)

# Scheduling examples below require these imports
from celery.schedules import crontab, schedule
from datetime import timedelta

# Comprehensive beat schedule
# NOTE(review): `os` is used below but not imported in this fragment.
app.conf.beat_schedule = {
    # Simple interval task
    'heartbeat': {
        'task': 'myapp.tasks.heartbeat',
        'schedule': 30.0,  # Every 30 seconds
    },
    # Cron-style scheduling
    'daily-report': {
        'task': 'myapp.tasks.generate_daily_report',
        'schedule': crontab(hour=6, minute=0),  # 6:00 AM daily
        'args': ('daily',),
        'kwargs': {'email_recipients': ['admin@example.com']}
    },
    # Complex cron schedule: every 15 minutes, 09-17h, Monday-Friday
    'business-hours-check': {
        'task': 'myapp.tasks.health_check',
        'schedule': crontab(minute='*/15', hour='9-17', day_of_week='1-5'),
        'options': {'expires': 60}  # Expire after 1 minute
    },
    # Timedelta schedule
    'cleanup-temp': {
        'task': 'myapp.tasks.cleanup_temp_files',
        'schedule': timedelta(hours=1),
        'options': {
            'queue': 'maintenance',
            'routing_key': 'maintenance.cleanup'
        }
    },
    # Dynamic schedule based on configuration
    'dynamic-task': {
        'task': 'myapp.tasks.dynamic_processor',
        'schedule': float(os.environ.get('DYNAMIC_INTERVAL', '300')),  # 5 minutes default
        'kwargs': {
            'batch_size': int(os.environ.get('BATCH_SIZE', '100'))
        }
    }
}
# Beat scheduler settings
app.conf.update(
    beat_scheduler='celery.beat:PersistentScheduler',
    beat_schedule_filename='celerybeat-schedule.db',
    beat_max_loop_interval=30,  # Check schedule every 30 seconds max
)


def validate_config(app):
    """Validate Celery configuration for common issues.

    Args:
        app: Celery application; only ``app.conf`` attributes are read.

    Returns:
        bool: True when no issues were found; otherwise prints each issue
        and returns False.
    """
    conf = app.conf
    issues = []
    # Check broker URL
    if not conf.broker_url:
        issues.append("broker_url is not configured")
    # Check serialization security
    if 'pickle' in conf.accept_content:
        issues.append("pickle serialization is insecure, use json instead")
    # Check for production readiness
    if conf.task_always_eager and os.environ.get('ENVIRONMENT') == 'production':
        issues.append("task_always_eager should not be True in production")
    # Check worker settings
    if conf.worker_prefetch_multiplier > 4:
        issues.append("worker_prefetch_multiplier > 4 may cause memory issues")
    # Check SSL in production.
    # FIX: guard broker_url with `or ''` — the original did
    # `'rediss://' not in conf.broker_url`, which raises TypeError when
    # broker_url is None/unset (exactly the case flagged above).
    if (os.environ.get('ENVIRONMENT') == 'production' and
            not conf.broker_use_ssl and
            'rediss://' not in (conf.broker_url or '')):
        issues.append("SSL should be enabled in production")
    if issues:
        print("Configuration Issues:")
        for issue in issues:
            print(f" - {issue}")
        return False
    print("Configuration validation passed")
    return True
# Validate after configuration
validate_config(app)

# Install with Tessl CLI
# npx tessl i tessl/pypi-celery