
# Configuration

Celery's configuration system controls brokers, backends, serialization, routing, and runtime behavior. It provides comprehensive control over distributed task execution, message handling, result storage, and worker behavior across different environments.


## Capabilities


### Configuration Management

Core configuration interface and methods for loading and managing Celery application settings.

```python { .api }
class Celery:
    def config_from_object(self, obj, silent=False, force=False, namespace=None):
        """
        Load configuration from an object, module, or class.

        Args:
            obj: Configuration object, module name, or class
            silent (bool): Don't raise exceptions on import errors
            force (bool): Force update even if the app is finalized
            namespace (str): Only load keys with this prefix
        """

    def config_from_envvar(self, variable_name, silent=False, force=False):
        """
        Load configuration from an environment variable naming a config module.

        Args:
            variable_name (str): Environment variable name
            silent (bool): Don't raise if the variable doesn't exist
            force (bool): Force update even if finalized
        """

    @property
    def conf(self):
        """
        Configuration namespace for accessing and setting options.

        Returns:
            Configuration object with attribute access
        """
```
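
A minimal usage sketch of these entry points; the module name `myapp.celeryconfig` and the `CELERY_CONFIG_MODULE` variable are illustrative:

```python
from celery import Celery

app = Celery('myapp')

# Load settings from a module path (illustrative name)
app.config_from_object('myapp.celeryconfig')

# Or from an environment variable naming a config module,
# e.g. CELERY_CONFIG_MODULE=myapp.celeryconfig
app.config_from_envvar('CELERY_CONFIG_MODULE', silent=True)

# Read and write individual options through the conf namespace
app.conf.task_serializer = 'json'
print(app.conf.broker_url)
```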


### Broker Configuration

Message broker settings for connecting to RabbitMQ, Redis, and other message transport systems.

```python { .api }
import ssl

# Broker connection settings
broker_url = 'redis://localhost:6379/0'      # Primary broker URL
broker_read_url = 'redis://replica:6379/0'   # Read-only broker URL
broker_write_url = 'redis://primary:6379/0'  # Write-only broker URL

# Connection options
broker_connection_timeout = 4.0           # Connection timeout in seconds
broker_connection_retry = True            # Retry connections on failure
broker_connection_max_retries = 100       # Maximum connection retries
broker_failover_strategy = 'round-robin'  # Failover strategy
broker_heartbeat = 120                    # Heartbeat interval in seconds
broker_pool_limit = 10                    # Connection pool size

# Transport options
broker_transport_options = {
    'region': 'us-east-1',    # AWS SQS region
    'predefined_queues': {    # SQS queue definitions
        'my-queue': {
            'url': 'https://sqs.us-east-1.amazonaws.com/123/my-queue'
        }
    }
}

# SSL/TLS options
broker_use_ssl = {
    'keyfile': '/var/ssl/private/client-key.pem',
    'certfile': '/var/ssl/certs/client-cert.pem',
    'ca_certs': '/var/ssl/certs/ca-cert.pem',
    'cert_reqs': ssl.CERT_REQUIRED
}
```
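
For high availability, `broker_url` also accepts a list of URLs; on connection failure Celery fails over to the next candidate according to `broker_failover_strategy`. A brief sketch with hypothetical hostnames:

```python
from celery import Celery

app = Celery('myapp')

# Hypothetical broker hostnames; the order of attempts follows
# the configured failover strategy
app.conf.broker_url = [
    'amqp://user:pass@broker1.example.com:5672//',
    'amqp://user:pass@broker2.example.com:5672//',
]
app.conf.broker_failover_strategy = 'round-robin'
```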


### Result Backend Configuration

Result storage settings for persisting task results and metadata across different backend systems.

```python { .api }
# Backend selection (alternatives; set exactly one)
result_backend = 'redis://localhost:6379/1'                    # Redis backend
result_backend = 'db+postgresql://user:pass@localhost/celery'  # Database backend
result_backend = 'cache+memcached://127.0.0.1:11211/'          # Memcached backend
result_backend = 'mongodb://localhost:27017/celery'            # MongoDB backend

# Result options
result_serializer = 'json'          # Result serialization format
result_compression = 'gzip'         # Result compression
result_expires = 3600               # Result expiration in seconds
result_persistent = True            # Persist results to disk
result_backend_always_retry = True  # Retry backend operations

# Backend-specific options
result_backend_transport_options = {
    'region': 'us-west-2',    # AWS region for S3/DynamoDB
    'mysql_engine': 'InnoDB'  # MySQL engine for database backend
}

# Cache backend options
cache_backend_options = {
    'binary': True,  # Use binary protocol
    'behaviors': {
        'tcp_nodelay': True,
        'ketama': True
    }
}
```
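
What the backend buys you on the calling side, as a short sketch (URLs match the Redis examples above):

```python
from celery import Celery

app = Celery('myapp',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task
def add(x, y):
    return x + y

# The backend stores task state and the return value;
# get() blocks until the result is ready
result = add.delay(2, 3)
print(result.status)           # e.g. 'PENDING', then 'SUCCESS'
print(result.get(timeout=10))  # 5
```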


### Task Serialization and Content

Serialization formats and content type handling for secure and efficient message transport.

```python { .api }
# Serialization settings
task_serializer = 'json'          # Task argument serializer
result_serializer = 'json'        # Result serializer
accept_content = ['json']         # Accepted content types
result_accept_content = ['json']  # Accepted result content

# Available serializers: 'json', 'pickle', 'yaml', 'msgpack'
# Security note: avoid 'pickle' in production; it allows arbitrary code execution

# Compression options
task_compression = 'gzip'    # Task compression method
result_compression = 'gzip'  # Result compression method

# Message properties
task_message_max_retries = 3       # Max message retries
task_reject_on_worker_lost = None  # Reject tasks when the worker is lost
```
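
Custom serializers can be registered through kombu's serialization registry and then referenced by name in these settings; a sketch with a hypothetical `myjson` serializer:

```python
import json

from celery import Celery
from kombu.serialization import register

def dumps(obj):
    # Deterministic JSON encoding for the hypothetical serializer
    return json.dumps(obj, sort_keys=True)

def loads(data):
    return json.loads(data)

register('myjson', dumps, loads,
         content_type='application/x-myjson',
         content_encoding='utf-8')

app = Celery('myapp')
app.conf.task_serializer = 'myjson'
app.conf.accept_content = ['myjson', 'json']
```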


### Task Routing and Queues

Task routing configuration for distributing work across different queues and workers based on task type, priority, or custom logic.

```python { .api }
# Default queue settings
task_default_queue = 'celery'          # Default queue name
task_default_exchange = 'celery'       # Default exchange name
task_default_exchange_type = 'direct'  # Exchange type
task_default_routing_key = 'celery'    # Default routing key

# Route by task name (exact names and glob patterns)
task_routes = {
    'myapp.tasks.heavy_task': {'queue': 'heavy'},
    'myapp.tasks.priority_task': {'queue': 'priority', 'routing_key': 'priority'},
    'myapp.tasks.*': {'queue': 'default'}
}

# Alternative: advanced routing with a function
def route_task(name, args, kwargs, options, task=None, **kwds):
    """
    Custom task routing function.

    Args:
        name (str): Task name
        args (tuple): Task arguments
        kwargs (dict): Task keyword arguments
        options (dict): Task options
        task: Task class

    Returns:
        dict: Routing information {'queue': 'name', 'routing_key': 'key'}
    """
    if kwargs.get('priority') == 'high':
        return {'queue': 'priority'}
    return {'queue': 'default'}

task_routes = [route_task]

# Task annotations for per-task settings ('*' matches all tasks)
task_annotations = {
    'tasks.add': {'rate_limit': '10/s'},
    '*': {'rate_limit': '10/s'}
}
```
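
Queues can also be declared explicitly with kombu `Queue`/`Exchange` objects via the `task_queues` setting, which the routes above then target; a sketch with illustrative queue names:

```python
from celery import Celery
from kombu import Exchange, Queue

app = Celery('myapp')

# Explicit declarations control exchange type and routing keys
app.conf.task_queues = (
    Queue('default', Exchange('default', type='direct'), routing_key='default'),
    Queue('priority', Exchange('priority', type='direct'), routing_key='priority'),
    Queue('heavy', Exchange('heavy', type='direct'), routing_key='heavy'),
)
app.conf.task_default_queue = 'default'
```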


### Worker Configuration

Worker process behavior, concurrency settings, and performance tuning options.

```python { .api }
# Concurrency and prefetch
worker_concurrency = 4               # Number of worker processes/threads
worker_prefetch_multiplier = 1       # Tasks to prefetch per worker process
worker_max_tasks_per_child = 1000    # Restart a worker process after N tasks
worker_max_memory_per_child = 12000  # Restart after N KiB of resident memory (~12 MB)

# Task execution
task_acks_late = False              # Acknowledge tasks after completion
task_reject_on_worker_lost = None   # Reject tasks when the worker dies
worker_disable_rate_limits = False  # Disable rate limiting
task_ignore_result = False          # Don't store task results

# Time limits
task_time_limit = 30 * 60        # Hard time limit (30 minutes)
task_soft_time_limit = 25 * 60   # Soft time limit (25 minutes)
worker_send_task_events = False  # Send task events for monitoring

# Pool settings
worker_pool = 'prefork'      # Pool implementation
worker_pool_restarts = True  # Allow pool restarts via remote control
```
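
Many of these settings can be overridden per worker at startup; a sketch that starts a worker programmatically with `worker_main` (queue names illustrative):

```python
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379/0')

# Equivalent command line:
#   celery -A myapp worker --loglevel=INFO --concurrency=4 -Q default,heavy
if __name__ == '__main__':
    app.worker_main(argv=[
        'worker',
        '--loglevel=INFO',
        '--concurrency=4',
        '-Q', 'default,heavy',
    ])
```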


### Security Configuration

Security settings for message signing, SSL/TLS, and authentication mechanisms.

```python { .api }
import ssl

# Eager execution (development/testing only; not a security control)
task_always_eager = False       # Execute tasks synchronously (dev only)
task_eager_propagates = True    # Propagate exceptions in eager mode
task_store_eager_result = True  # Store results in eager mode

# SSL/TLS configuration
broker_use_ssl = {
    'keyfile': '/path/to/key.pem',
    'certfile': '/path/to/cert.pem',
    'ca_certs': '/path/to/ca.pem',
    'cert_reqs': ssl.CERT_REQUIRED
}

# Message signing
security_key = 'secret-key'                 # Message signing key
security_certificate = '/path/to/cert.pem'  # Security certificate
security_digest = 'sha256'                  # Digest algorithm
```
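
Message signing is activated with `app.setup_security()`, which wires the `auth` serializer to the `security_*` settings; a sketch with illustrative certificate paths (signing requires Celery's security dependencies, e.g. the `cryptography` package):

```python
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379/0')

app.setup_security(
    allowed_serializers=['json'],
    key='/etc/ssl/private/worker-key.pem',  # illustrative paths
    cert='/etc/ssl/certs/worker-cert.pem',
    store='/etc/ssl/certs',                 # directory of trusted certs
    digest='sha256',
    serializer='json',
)
```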


### Monitoring and Events

Configuration for task monitoring, event collection, and performance tracking.

```python { .api }
# Event monitoring
worker_send_task_events = True  # Enable task events
task_send_sent_event = True     # Send task-sent events
event_queue_expires = 60.0      # Event queue expiration (seconds)
event_queue_ttl = 5.0           # Event message TTL (seconds)

# Monitoring options
worker_enable_remote_control = True  # Enable remote control commands
task_track_started = False           # Track when tasks start
task_publish_retry = True            # Retry failed publishes
task_publish_retry_policy = {
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}
```
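
With events enabled, a monitor process can consume them through `app.events.Receiver`; a minimal capture-loop sketch:

```python
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379/0')

def on_task_succeeded(event):
    # Events are plain dicts; 'uuid' identifies the task instance
    print('task succeeded: {uuid} in {runtime}s'.format(**event))

with app.connection() as connection:
    receiver = app.events.Receiver(connection, handlers={
        'task-succeeded': on_task_succeeded,
        '*': lambda event: None,  # ignore everything else
    })
    receiver.capture(limit=None, timeout=None, wakeup=True)
```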


### Beat Scheduler Configuration

Periodic task scheduler settings and schedule definitions for recurring jobs.

```python { .api }
from celery.schedules import crontab

# Beat scheduler settings
beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
    'daily-report': {
        'task': 'tasks.generate_report',
        'schedule': crontab(hour=4, minute=0),
        'kwargs': {'report_type': 'daily'}
    }
}

beat_scheduler = 'celery.beat:PersistentScheduler'  # Scheduler class
beat_schedule_filename = 'celerybeat-schedule'      # Schedule persistence file
beat_max_loop_interval = 5                          # Max seconds between schedule checks
```
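
Schedule entries can also be registered in code with `add_periodic_task`, which avoids editing `beat_schedule` by hand; a sketch with illustrative task names:

```python
from celery import Celery
from celery.schedules import crontab

app = Celery('myapp', broker='redis://localhost:6379/0')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Every 30 seconds
    sender.add_periodic_task(30.0, heartbeat.s(), name='heartbeat every 30s')
    # Mondays at 7:30
    sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),
                             weekly_report.s('monday'))

@app.task
def heartbeat():
    pass

@app.task
def weekly_report(day):
    pass
```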


### Timezone and Localization

Timezone handling and datetime configuration for consistent time handling across distributed systems.

```python { .api }
# Timezone settings
timezone = 'UTC'       # Default timezone (affects crontab schedules)
enable_utc = True      # Use UTC internally
result_expires = 3600  # Result expiration (seconds)

# Eager execution (testing)
task_always_eager = False     # Execute immediately (testing)
task_eager_propagates = True  # Propagate exceptions when eager
```
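
The configured timezone determines how `crontab` entries are interpreted; a brief sketch (task name illustrative):

```python
from celery import Celery
from celery.schedules import crontab

app = Celery('myapp')
app.conf.timezone = 'Europe/London'  # crontab times use this zone
app.conf.enable_utc = True

app.conf.beat_schedule = {
    'morning-job': {
        'task': 'tasks.morning_job',
        'schedule': crontab(hour=7, minute=0),  # 07:00 London time
    }
}
```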


### Database Backend Configuration

Settings specific to database result backends, including connection pooling and table configuration.

```python { .api }
# Database backend options
database_url = 'postgresql://user:pass@localhost/celery'
database_engine_options = {
    'pool_size': 20,
    'max_overflow': 0,
    'pool_pre_ping': True,
    'pool_recycle': -1,
    'echo': False
}

# Table configuration
database_table_schemas = {
    'task': 'celery_tasks',
    'group': 'celery_groups'
}
database_table_names = {
    'task': 'celery_taskmeta',
    'group': 'celery_groupmeta'
}
```
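
To select the SQLAlchemy-backed result backend, point `result_backend` at a `db+` URL; `database_engine_options` is then passed through to the underlying engine. A brief sketch:

```python
from celery import Celery

app = Celery('myapp')

# The 'db+' prefix selects the SQLAlchemy result backend
app.conf.result_backend = 'db+postgresql://user:pass@localhost/celery'
app.conf.database_engine_options = {
    'pool_pre_ping': True,  # validate pooled connections before use
    'pool_recycle': 3600,   # recycle connections hourly
}
```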


## Usage Examples


### Basic Configuration Setup

```python
from celery import Celery

# Create app with basic configuration
app = Celery('myapp')

# Configure via dictionary
app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/1',
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

# Alternative: configure individual settings
app.conf.broker_url = 'redis://localhost:6379/0'
app.conf.result_backend = 'redis://localhost:6379/1'
app.conf.task_serializer = 'json'
```


### Configuration from Module

```python
# celeryconfig.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True

# Task routing
task_routes = {
    'myapp.tasks.heavy_computation': {'queue': 'heavy'},
    'myapp.tasks.send_email': {'queue': 'email'},
}

# Worker configuration
worker_concurrency = 4
worker_prefetch_multiplier = 1
task_acks_late = True

# In your application module, load the configuration by name:
from celery import Celery

app = Celery('myapp')
app.config_from_object('celeryconfig')
```


### Environment-Based Configuration

```python
import os
import ssl

from celery import Celery

# Production configuration
class ProductionConfig:
    broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://redis:6379/0')
    result_backend = os.environ.get('CELERY_RESULT_BACKEND', 'redis://redis:6379/1')

    # Security settings
    task_serializer = 'json'
    accept_content = ['json']
    result_serializer = 'json'

    # Performance settings
    worker_concurrency = int(os.environ.get('WORKER_CONCURRENCY', '4'))
    worker_prefetch_multiplier = 1
    task_acks_late = True

    # SSL configuration for production
    broker_use_ssl = {
        'keyfile': '/etc/ssl/private/worker-key.pem',
        'certfile': '/etc/ssl/certs/worker-cert.pem',
        'ca_certs': '/etc/ssl/certs/ca-bundle.pem',
        'cert_reqs': ssl.CERT_REQUIRED
    }

# Development configuration
class DevelopmentConfig:
    broker_url = 'redis://localhost:6379/0'
    result_backend = 'redis://localhost:6379/1'

    # Development-friendly settings
    task_always_eager = True  # Execute synchronously
    task_eager_propagates = True
    task_store_eager_result = True

# Load configuration based on environment
app = Celery('myapp')
config_class = ProductionConfig if os.environ.get('FLASK_ENV') == 'production' else DevelopmentConfig
app.config_from_object(config_class)
```


### Advanced Routing Configuration

```python
# The task_routes assignments below are alternatives; each replaces the previous.

# Route by task name patterns
app.conf.task_routes = {
    'myapp.tasks.email.*': {'queue': 'email'},
    'myapp.tasks.report.*': {'queue': 'reports'},
    'myapp.tasks.urgent.*': {'queue': 'priority'},
}

def is_premium_user(user_id):
    # Check if the user is premium (example logic)
    return user_id % 2 == 0

# Alternative: routing with a custom function
def route_task(name, args, kwargs, options, task=None, **kwds):
    """Custom routing based on task parameters."""

    # Route by priority argument
    if kwargs.get('priority') == 'high':
        return {'queue': 'priority', 'routing_key': 'priority.high'}

    # Route heavy tasks to dedicated workers
    if 'heavy' in name or kwargs.get('estimated_time', 0) > 300:
        return {'queue': 'heavy', 'routing_key': 'heavy.long'}

    # Route by user type
    user_id = kwargs.get('user_id')
    if user_id and is_premium_user(user_id):
        return {'queue': 'premium', 'routing_key': 'premium.user'}

    # Default routing
    return {'queue': 'default'}

app.conf.task_routes = [route_task]

# Route entries may also carry per-task delivery options
app.conf.task_routes = {
    'myapp.tasks.process_image': {
        'queue': 'media',
        'routing_key': 'media.image',
        'priority': 5
    }
}
```


### Multi-Backend Configuration

```python
# Different backends for different purposes
app.conf.update(
    # Main result backend
    result_backend='redis://localhost:6379/1',

    # Cache backend for temporary data
    cache_backend='memcached://127.0.0.1:11211/',

    # Database backend for persistent results
    database_url='postgresql://user:pass@localhost/celery_results',
)

# Backend-specific options
app.conf.result_backend_transport_options = {
    'retry_on_timeout': True,
    'socket_keepalive': True,
    'socket_keepalive_options': {
        'TCP_KEEPINTVL': 1,
        'TCP_KEEPCNT': 3,
        'TCP_KEEPIDLE': 1,
    }
}

# Database backend specific settings
app.conf.database_engine_options = {
    'pool_size': 10,
    'max_overflow': 20,
    'pool_pre_ping': True,
    'pool_recycle': 300
}
```


### Security Configuration

```python
import ssl

# Message signing for secure transport
app.conf.update(
    # Enable message signing
    task_serializer='auth',
    result_serializer='json',
    accept_content=['auth', 'json'],

    # Security keys
    security_key='your-secret-key-here',
    security_certificate='/path/to/certificate.pem',
    security_digest='sha256',
)

# SSL/TLS configuration for broker
app.conf.broker_use_ssl = {
    'keyfile': '/etc/ssl/private/client-key.pem',
    'certfile': '/etc/ssl/certs/client-cert.pem',
    'ca_certs': '/etc/ssl/certs/ca-cert.pem',
    'cert_reqs': ssl.CERT_REQUIRED,
    'ssl_version': ssl.PROTOCOL_TLSv1_2,
    'ciphers': 'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS'
}

# Result backend SSL (Redis)
app.conf.redis_backend_use_ssl = {
    'ssl_cert_reqs': ssl.CERT_REQUIRED,
    'ssl_ca_certs': '/etc/ssl/certs/ca-cert.pem',
    'ssl_certfile': '/etc/ssl/certs/client-cert.pem',
    'ssl_keyfile': '/etc/ssl/private/client-key.pem',
}
```


### Performance Tuning Configuration

```python
# High-performance configuration
app.conf.update(
    # Worker performance
    worker_concurrency=8,                # Match CPU cores
    worker_prefetch_multiplier=1,        # Prevent memory issues
    worker_max_tasks_per_child=1000,     # Prevent memory leaks
    worker_max_memory_per_child=200000,  # ~200 MB limit (value is in KiB)

    # Task execution
    task_acks_late=True,              # Acknowledge after completion
    task_reject_on_worker_lost=True,  # Requeue on worker death
    worker_disable_rate_limits=False, # Keep rate limits

    # Connection pooling
    broker_pool_limit=10,  # Connection pool size
    broker_connection_retry=True,
    broker_connection_max_retries=100,

    # Result backend optimization
    result_expires=3600,         # 1 hour expiration
    result_compression='gzip',   # Compress results
    result_serializer='json',    # Fast serialization

    # Task routing optimization
    task_default_delivery_mode=2,  # Persistent messages
    task_compression='gzip',       # Compress task data

    # Monitoring (disable in production for performance)
    worker_send_task_events=False,
    task_send_sent_event=False,
)
```


### Beat Schedule Configuration

```python
import os
from datetime import timedelta

from celery.schedules import crontab

# Comprehensive beat schedule
app.conf.beat_schedule = {
    # Simple interval task
    'heartbeat': {
        'task': 'myapp.tasks.heartbeat',
        'schedule': 30.0,  # Every 30 seconds
    },

    # Cron-style scheduling
    'daily-report': {
        'task': 'myapp.tasks.generate_daily_report',
        'schedule': crontab(hour=6, minute=0),  # 6:00 AM daily
        'args': ('daily',),
        'kwargs': {'email_recipients': ['admin@example.com']}
    },

    # Complex cron schedule
    'business-hours-check': {
        'task': 'myapp.tasks.health_check',
        'schedule': crontab(minute='*/15', hour='9-17', day_of_week='1-5'),
        'options': {'expires': 60}  # Expire after 1 minute
    },

    # Timedelta schedule
    'cleanup-temp': {
        'task': 'myapp.tasks.cleanup_temp_files',
        'schedule': timedelta(hours=1),
        'options': {
            'queue': 'maintenance',
            'routing_key': 'maintenance.cleanup'
        }
    },

    # Dynamic schedule based on environment variables
    'dynamic-task': {
        'task': 'myapp.tasks.dynamic_processor',
        'schedule': float(os.environ.get('DYNAMIC_INTERVAL', '300')),  # 5 minutes default
        'kwargs': {
            'batch_size': int(os.environ.get('BATCH_SIZE', '100'))
        }
    }
}

# Beat scheduler settings
app.conf.update(
    beat_scheduler='celery.beat:PersistentScheduler',
    beat_schedule_filename='celerybeat-schedule.db',
    beat_max_loop_interval=30,  # Check the schedule at least every 30 seconds
)
```


### Configuration Validation

```python
import os

def validate_config(app):
    """Validate Celery configuration for common issues."""

    conf = app.conf
    issues = []

    # Check broker URL
    if not conf.broker_url:
        issues.append("broker_url is not configured")

    # Check serialization security
    if 'pickle' in conf.accept_content:
        issues.append("pickle serialization is insecure, use json instead")

    # Check for production readiness
    if conf.task_always_eager and os.environ.get('ENVIRONMENT') == 'production':
        issues.append("task_always_eager should not be True in production")

    # Check worker settings
    if conf.worker_prefetch_multiplier > 4:
        issues.append("worker_prefetch_multiplier > 4 may cause memory issues")

    # Check SSL in production
    if (os.environ.get('ENVIRONMENT') == 'production' and
            not conf.broker_use_ssl and
            not conf.broker_url.startswith('rediss://')):
        issues.append("SSL should be enabled in production")

    if issues:
        print("Configuration issues:")
        for issue in issues:
            print(f" - {issue}")
        return False

    print("Configuration validation passed")
    return True

# Validate after configuration
validate_config(app)
```