# Dagster Celery Docker

A Dagster executor that enables running Dagster steps within Docker containers orchestrated by Celery. This integration combines the distributed task execution capabilities of Celery with Docker containerization for isolated, scalable pipeline execution. The executor allows configuration of Docker images, registries, environment variables, and container networking while maintaining integration with Dagster's event and logging systems.

## Package Information

- **Package Name**: dagster-celery-docker
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install dagster-celery-docker`

## Core Imports

```python
from dagster_celery_docker import celery_docker_executor
```

For accessing version information:

```python
from dagster_celery_docker import __version__
```

For the Celery app (used in worker deployment):

```python
from dagster_celery_docker import app
```

## Basic Usage

```python
from dagster import DagsterInstance, execute_job, job, op, reconstructable
from dagster_celery_docker import celery_docker_executor

@op
def my_op():
    return "Hello from Docker container!"

@job(executor_def=celery_docker_executor)
def my_docker_job():
    my_op()

# Execute with configuration
if __name__ == "__main__":
    # execute_in_process ignores executor_def and always runs in process.
    # execute_job with a reconstructable job and a persistent instance
    # (DAGSTER_HOME must be set) is what goes through Celery and Docker.
    result = execute_job(
        reconstructable(my_docker_job),
        instance=DagsterInstance.get(),
        run_config={
            "execution": {
                "config": {
                    "docker": {
                        # Placeholder image; it must have dagster and this code installed
                        "image": "python:3.9-slim",
                        "env_vars": ["DAGSTER_HOME"],
                        "container_kwargs": {
                            "auto_remove": True
                        }
                    },
                    "broker": "redis://localhost:6379/0",
                    "backend": "redis://localhost:6379/0"
                }
            }
        }
    )
```

## Architecture

The dagster-celery-docker integration extends Dagster's execution model with containerized distributed computing:

- **CeleryDockerExecutor**: Main executor class that orchestrates Docker container execution via Celery workers
- **Task Submission**: Steps are packaged as Celery tasks with Docker execution parameters
- **Container Management**: Automatic Docker image pulling, container lifecycle management, and cleanup
- **Event Integration**: Container execution events are reported back to Dagster's event system
- **Error Handling**: Docker container errors are captured and reported through Dagster's standard error handling

This design enables scalable pipeline execution across multiple worker nodes while providing the isolation and consistency of containerized environments.

## Capabilities

### Executor Definition

The main executor function that creates a Celery-based executor for running Dagster steps in Docker containers.

```python { .api }
@executor(
    name="celery-docker",
    config_schema=celery_docker_config(),
    requirements=multiple_process_executor_requirements(),
)
def celery_docker_executor(init_context):
    """
    Celery-based executor which launches tasks in docker containers.

    The Celery executor exposes config settings for the underlying Celery app under
    the ``config_source`` key. This config corresponds to the "new lowercase settings" introduced
    in Celery version 4.0 and the object constructed from config will be passed to the
    :py:class:`celery.Celery` constructor as its ``config_source`` argument.

    Parameters:
    - init_context: ExecutorInitContext from Dagster

    Returns:
    CeleryDockerExecutor instance configured with the provided settings

    Configuration Schema:
    - docker: Dict[str, Any] (required) - Docker configuration
      - image: StringSource (optional) - Docker image for step execution
      - registry: Dict[str, StringSource] (optional) - Registry configuration
        - url: StringSource - Registry URL
        - username: StringSource - Registry username
        - password: StringSource - Registry password
      - env_vars: List[str] (optional) - Environment variables to forward to container
      - network: str (optional) - Docker network name for container
      - container_kwargs: Permissive (optional) - Additional Docker container arguments
    - broker: str (optional) - Celery broker URL
    - backend: str (optional) - Celery results backend URL
    - include: List[str] (optional) - Modules for workers to import
    - config_source: Dict[str, Any] (optional) - Additional Celery configuration
    - retries: RetryMode config (optional) - Retry configuration
    """
```

### Version Information

Package version constant for version checking and compatibility verification.

```python { .api }
__version__: str
```

The current version string for the dagster-celery-docker package.
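
A compatibility check against this string can be done with plain tuple comparison. The following is only a minimal sketch: `parse_version` and `meets_minimum` are hypothetical helpers, not part of the package, and the literal `"0.27.9"` stands in for `dagster_celery_docker.__version__`.

```python
def parse_version(version: str) -> tuple:
    """Turn '0.27.9' into (0, 27, 9), keeping only the leading digits of each segment."""
    parts = []
    for segment in version.split("."):
        digits = ""
        for ch in segment:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    return tuple(parts)


def meets_minimum(installed: str, minimum: str) -> bool:
    """Compare two version strings numerically, padding the shorter one with zeros."""
    a, b = parse_version(installed), parse_version(minimum)
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


installed = "0.27.9"  # stand-in for dagster_celery_docker.__version__
print(meets_minimum(installed, "0.27.0"))
```

Pre-release suffixes such as `rc1` are ignored by this sketch, so a stricter check would need a real version parser.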

### Celery App

The configured Celery application instance used by workers for task execution.

```python { .api }
app: celery.Celery
```

Celery app instance configured with task routes for docker step execution. Used when starting Celery workers with the `-A dagster_celery_docker.app` argument.

### Executor Class

The main executor class that orchestrates Docker container execution via Celery workers.

```python { .api }
class CeleryDockerExecutor(Executor):
    def __init__(self, retries, docker_config, broker=None, backend=None, include=None, config_source=None):
        """
        Initialize the Celery Docker executor.

        Args:
            retries: RetryMode instance for retry configuration
            docker_config: Dict containing Docker configuration
            broker: Optional Celery broker URL
            backend: Optional Celery results backend URL
            include: Optional list of modules for workers to import
            config_source: Optional additional Celery configuration
        """

    def execute(self, plan_context, execution_plan):
        """Execute the given execution plan using Celery workers and Docker containers."""

    def app_args(self):
        """Return arguments for Celery app configuration."""
```

## Configuration Schema

### Docker Configuration (Required)

The executor requires Docker configuration to specify how containers should be created and managed:

```yaml
execution:
  config:
    docker:
      image: 'my-repo.com/my-image:latest'  # Docker image for step execution
      registry:  # Optional registry authentication
        url: 'my-repo.com'
        username: 'my-user'
        password: {env: 'DOCKER_PASSWORD'}
      env_vars: ["DAGSTER_HOME", "AWS_PROFILE"]  # Environment variables to pass
      network: 'my-network'  # Docker network to connect container
      container_kwargs:  # Additional Docker container arguments (Docker Python client names)
        volumes: ['/host/path:/container/path']
        mem_limit: '1g'
        cpu_count: 2
```
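
Typos in the `docker` block are easier to catch before a run is submitted. The sketch below checks a config mapping against the keys listed in the schema above. `check_docker_config` is a hypothetical helper, not part of dagster-celery-docker, and it assumes that all three registry fields are needed whenever `registry` is present.

```python
ALLOWED_DOCKER_KEYS = {"image", "registry", "env_vars", "network", "container_kwargs"}
REGISTRY_KEYS = {"url", "username", "password"}


def check_docker_config(docker: dict) -> list:
    """Return a list of readable problems; an empty list means none were found."""
    problems = []

    # Flag keys that the documented schema does not list.
    unknown = sorted(set(docker) - ALLOWED_DOCKER_KEYS)
    if unknown:
        problems.append(f"unknown keys: {', '.join(unknown)}")

    # Assumption: url, username, and password are all needed when registry is set.
    registry = docker.get("registry")
    if registry is not None:
        missing = sorted(REGISTRY_KEYS - set(registry))
        if missing:
            problems.append(f"registry is missing: {', '.join(missing)}")

    # env_vars is documented as a list of variable names.
    env_vars = docker.get("env_vars", [])
    if not all(isinstance(name, str) for name in env_vars):
        problems.append("env_vars must be a list of variable names")

    return problems


print(check_docker_config({"image": "my-repo.com/my-image:latest", "netwrk": "my-network"}))
```

Dagster validates run config itself, so a helper like this is only useful for earlier, friendlier feedback in scripts that assemble config programmatically.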

### Celery Configuration (Optional)

Standard Celery configuration options inherited from dagster-celery:

```yaml
execution:
  config:
    broker: 'redis://localhost:6379/0'  # Celery message broker
    backend: 'redis://localhost:6379/0'  # Celery results backend
    include: ['my_module']  # Modules for workers to import
    config_source:  # Additional Celery worker configuration
      task_serializer: 'json'
      result_serializer: 'json'
      task_routes:
        'my_task': {'queue': 'priority'}
```
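
The Docker and Celery settings live side by side under the same `execution.config` mapping. As a minimal sketch, a hypothetical helper `build_execution_config` (an illustration, not a library function) could assemble that mapping and leave out any optional Celery key that was not supplied:

```python
def build_execution_config(docker, broker=None, backend=None, include=None, config_source=None):
    """Assemble the run_config fragment for the executor from its documented keys."""
    config = {"docker": dict(docker)}  # docker is the only required key
    optional = {
        "broker": broker,
        "backend": backend,
        "include": include,
        "config_source": config_source,
    }
    # Only emit optional keys that were actually provided.
    config.update({key: value for key, value in optional.items() if value is not None})
    return {"execution": {"config": config}}


run_config = build_execution_config(
    {"image": "my-repo.com/my-image:latest"},
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)
print(run_config)
```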

### Retry Configuration (Optional)

Configure retry behavior for failed step executions. The `retries` key is a selector with two options, `enabled` (the default) and `disabled`, each taking an empty mapping. Retry limits and delays are set per op with a `RetryPolicy` rather than in executor config.

```yaml
execution:
  config:
    retries:
      enabled: {}
```

## Usage Examples

### Basic Docker Execution

```python
from dagster import DagsterInstance, execute_job, job, op, reconstructable
from dagster_celery_docker import celery_docker_executor

@op
def process_data():
    import pandas as pd  # must be installed in the step image
    # Data processing logic
    return {"status": "processed"}

@job(executor_def=celery_docker_executor)
def data_pipeline():
    process_data()

# Run with minimal configuration. The guard keeps the launch from re-running
# when workers and step containers import this module.
if __name__ == "__main__":
    # execute_in_process would ignore executor_def, so use execute_job
    result = execute_job(
        reconstructable(data_pipeline),
        instance=DagsterInstance.get(),
        run_config={
            "execution": {
                "config": {
                    "docker": {
                        "image": "python:3.9-slim"
                    }
                }
            }
        }
    )
```

### Advanced Configuration with Registry and Environment

```python
from dagster import Config, DagsterInstance, RetryPolicy, execute_job, job, op, reconstructable
from dagster_celery_docker import celery_docker_executor

class ProcessingConfig(Config):
    input_path: str
    output_path: str

# Retry limits and delays are set per op, not in executor config
@op(retry_policy=RetryPolicy(max_retries=3, delay=120))
def secure_processing(config: ProcessingConfig):
    # Processing that requires specific environment and credentials
    return f"Processed {config.input_path} -> {config.output_path}"

@job(executor_def=celery_docker_executor)
def secure_pipeline():
    secure_processing()

# Run with full configuration
if __name__ == "__main__":
    result = execute_job(
        reconstructable(secure_pipeline),
        instance=DagsterInstance.get(),
        run_config={
            "ops": {
                "secure_processing": {
                    "config": {
                        "input_path": "/data/input.csv",
                        "output_path": "/data/output.csv"
                    }
                }
            },
            "execution": {
                "config": {
                    "docker": {
                        "image": "my-company.com/data-processor:v1.2.3",
                        "registry": {
                            "url": "my-company.com",
                            "username": "deploy-user",
                            "password": {"env": "REGISTRY_PASSWORD"}
                        },
                        "env_vars": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "DAGSTER_HOME"],
                        "network": "data-processing-network",
                        "container_kwargs": {
                            "volumes": ["/host/data:/data", "/host/tmp:/tmp"],
                            "mem_limit": "4g",
                            "cpu_count": 4,
                            "auto_remove": True
                        }
                    },
                    "broker": "redis://redis-cluster:6379/0",
                    "backend": "redis://redis-cluster:6379/1",
                    # Selector: {"enabled": {}} or {"disabled": {}}
                    "retries": {"enabled": {}}
                }
            }
        }
    )
```

### Multi-Worker Distributed Execution

```python
from dagster import (
    DagsterInstance, DynamicOut, DynamicOutput, execute_job, job, op, reconstructable,
)
from dagster_celery_docker import celery_docker_executor
from typing import List

@op(out=DynamicOut())
def split_work() -> List[DynamicOutput]:
    # Create multiple work items that can be distributed
    work_items = [f"task_{i}" for i in range(10)]
    return [DynamicOutput(item, mapping_key=str(i)) for i, item in enumerate(work_items)]

@op
def process_item(item: str) -> str:
    # Each item processed in its own Docker container across workers
    import time
    time.sleep(5)  # Simulate processing
    return f"processed_{item}"

@op
def combine_results(results: List[str]) -> str:
    return f"Combined {len(results)} results: {', '.join(results[:3])}..."

@job(executor_def=celery_docker_executor)
def distributed_pipeline():
    results = split_work().map(process_item)
    combine_results(results.collect())

# Execute with worker scaling configuration
if __name__ == "__main__":
    result = execute_job(
        reconstructable(distributed_pipeline),
        instance=DagsterInstance.get(),
        run_config={
            "execution": {
                "config": {
                    "docker": {
                        "image": "python:3.9-slim",
                        "container_kwargs": {
                            "cpu_count": 2,
                            "mem_limit": "2g"
                        }
                    },
                    "broker": "amqp://guest@rabbitmq:5672//",
                    "backend": "rpc://",
                    "config_source": {
                        "worker_prefetch_multiplier": 1,
                        "task_acks_late": True,
                        "worker_max_tasks_per_child": 100
                    }
                }
            }
        }
    )
```

## Error Handling

The executor handles various error conditions and reports them through Dagster's event system:

- **Container Creation Errors**: Issues with Docker image pulling, registry authentication, or container configuration
- **Container Execution Errors**: Runtime errors within containers, including non-zero exit codes
- **Network Errors**: Container networking issues or communication failures
- **Resource Errors**: Insufficient memory, CPU, or disk space for container execution
- **Celery Worker Errors**: Worker disconnections, task routing failures, or broker communication issues

All errors are captured with detailed metadata and reported as Dagster engine events, maintaining full observability of distributed execution.

## Dependencies

- **dagster**: Core Dagster framework (==1.11.9)
- **dagster-celery**: Celery integration for Dagster (==0.27.9)
- **dagster-graphql**: GraphQL support for Dagster (==1.11.9)
- **docker**: Docker Python client for container management

## Deployment Considerations

### Celery Worker Setup

Workers must be started with the dagster-celery-docker app:

```bash
celery -A dagster_celery_docker.app worker --loglevel=info --queues=dagster
```
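
When workers are launched from a supervisor script rather than typed by hand, the same command line can be assembled programmatically. This is a minimal sketch using only the standard library: `worker_command` is a hypothetical helper, and the queue name `priority` in the usage line is an illustrative assumption.

```python
import shlex


def worker_command(queues=("dagster",), loglevel="info"):
    """Build the worker start command shown above as a shell-safe string."""
    args = [
        "celery",
        "-A", "dagster_celery_docker.app",
        "worker",
        f"--loglevel={loglevel}",
        f"--queues={','.join(queues)}",
    ]
    # shlex.join quotes any argument that would need it in a POSIX shell.
    return shlex.join(args)


print(worker_command())
print(worker_command(queues=("dagster", "priority"), loglevel="debug"))
```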

### Docker Access

Workers need Docker daemon access:
- Docker socket mounted: `-v /var/run/docker.sock:/var/run/docker.sock`
- Or Docker-in-Docker setup for containerized workers
- Appropriate permissions for Docker operations
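
A quick preflight check on a worker host can confirm that the socket from the first bullet is present and usable. The sketch below relies only on the standard library: `docker_socket_status` is a hypothetical helper, and it checks file-level access only, not whether the daemon actually responds.

```python
import os
import stat


def docker_socket_status(path="/var/run/docker.sock"):
    """Report whether the Docker socket exists, is a socket, and is readable and writable."""
    if not os.path.exists(path):
        return "missing"
    if not stat.S_ISSOCK(os.stat(path).st_mode):
        return "not a socket"
    if not os.access(path, os.R_OK | os.W_OK):
        return "no permission"
    return "ok"


print(docker_socket_status())
```

A result of `no permission` usually points at the third bullet: the worker's user lacks rights on the socket.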

### Network Configuration

- Containers need network access to Dagster instance
- Consider container networking for inter-step communication
- Security implications of container network access

### Resource Management

- Configure appropriate container resource limits
- Monitor Docker disk usage for container images and volumes
- Plan for concurrent container execution resource requirements
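
Planning for concurrent containers comes down to simple arithmetic: the tighter of the memory and CPU budgets decides how many step containers a host can run at once. The following is a rough sketch under stated assumptions: `max_concurrent_containers` is a hypothetical helper, every container is assumed to use its full limit, and the 1 GiB reserve for the worker process and the OS is an illustrative default.

```python
def max_concurrent_containers(
    host_memory_gib,
    host_cpus,
    container_memory_gib,
    container_cpus,
    reserve_memory_gib=1.0,
):
    """Estimate how many step containers fit on one worker host at the same time."""
    usable_memory = max(host_memory_gib - reserve_memory_gib, 0)
    by_memory = int(usable_memory // container_memory_gib)
    by_cpu = int(host_cpus // container_cpus)
    # The scarcer resource sets the ceiling.
    return min(by_memory, by_cpu)


# A 16 GiB, 8-CPU host with 2 GiB / 2-CPU containers is CPU-bound at 4.
print(max_concurrent_containers(16, 8, 2, 2))
```

The estimate can inform the Celery worker's concurrency setting so that a worker does not accept more tasks than its host can run.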