A Dagster integration for the celery-k8s executor that provides a Kubernetes-native distributed execution system, combining Celery task queuing with Kubernetes job orchestration.
npx @tessl/cli install tessl/pypi-dagster-celery-k8s@0.27.00
# Dagster Celery K8s

A Dagster integration that provides a Kubernetes-native distributed execution system by combining Celery task queuing with Kubernetes job orchestration. It enables scalable data pipeline execution through a two-tier architecture where a run worker Kubernetes Job traverses the execution plan and submits individual steps to Celery queues, while Celery workers spawn separate Kubernetes Jobs for each step execution.

## Package Information

- **Package Name**: dagster-celery-k8s
- **Language**: Python
- **Installation**: `pip install dagster-celery-k8s`
- **Version**: 0.27.9

## Core Imports

```python
from dagster_celery_k8s import CeleryK8sRunLauncher, celery_k8s_job_executor
```

Import version information:

```python
from dagster_celery_k8s import __version__
```

Import types for full type annotations:

```python
from typing import Optional, List, Dict, Any
from dagster import DagsterRun
from dagster._core.launcher.base import CheckRunHealthResult, LaunchRunContext
from dagster._serdes import ConfigurableClassData
from dagster_k8s import DagsterK8sJobConfig
```

## Basic Usage

```python
from dagster import job
from dagster_celery_k8s import celery_k8s_job_executor

# Define a job using the Celery K8s executor
@job(executor_def=celery_k8s_job_executor)
def my_distributed_job():
    # Your job operations here
    pass
```

Configure the run launcher in the Dagster instance (`dagster.yaml`) and the executor in run config:

```yaml
# Instance configuration (dagster.yaml)
run_launcher:
  module: dagster_celery_k8s
  class: CeleryK8sRunLauncher
  config:
    instance_config_map: "dagster-k8s-instance-config-map"
    dagster_home: "/opt/dagster/dagster_home"
    postgres_password_secret: "dagster-k8s-pg-password"
    broker: "pyamqp://guest@localhost//"
    backend: "rpc://"

# Executor configuration (run config, supplied per run)
execution:
  config:
    job_image: 'my_repo.com/image_name:latest'
    job_namespace: 'dagster-execution'
    broker: 'pyamqp://guest@localhost//'
    backend: 'rpc://'
```

## Architecture

The dagster-celery-k8s package implements a two-tier distributed execution architecture:

1. **Run Worker Tier**: A Kubernetes Job that traverses the Dagster execution plan and submits individual steps to Celery queues
2. **Step Execution Tier**: Celery workers that pick up tasks and spawn separate Kubernetes Jobs for each step execution

This design provides:
- **Scalability**: Dynamic resource allocation through Kubernetes
- **Reliability**: Container orchestration with Kubernetes job management
- **Flexibility**: Celery's distributed task processing capabilities
- **Cloud-native**: Optimized for cloud environments with automatic resource management

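The two tiers described above can be sketched as a toy, in-memory simulation. This only illustrates the control flow and is not how the package is implemented: `ToyCluster`, `ToyStepJob`, `run_worker`, `celery_worker`, and the job naming scheme are hypothetical names, a `deque` stands in for the Celery queue, and steps are enqueued in plan order with no dependency tracking or result handling.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, List


@dataclass
class ToyStepJob:
    """Stand-in for the Kubernetes Job created for a single step (hypothetical)."""
    name: str
    step_key: str


@dataclass
class ToyCluster:
    """Records the toy Jobs that were created, in creation order (hypothetical)."""
    jobs: List[ToyStepJob] = field(default_factory=list)

    def create_job(self, name: str, step_key: str) -> ToyStepJob:
        job = ToyStepJob(name=name, step_key=step_key)
        self.jobs.append(job)
        return job


def run_worker(execution_plan: List[str], queue: Deque[str]) -> None:
    """Tier 1: walk the plan in order and enqueue one task per step."""
    for step_key in execution_plan:
        queue.append(step_key)


def celery_worker(queue: Deque[str], cluster: ToyCluster, run_id: str) -> None:
    """Tier 2: drain the queue, creating one toy Job per step."""
    index = 0
    while queue:
        step_key = queue.popleft()
        # Assumed naming scheme, for illustration only
        cluster.create_job(name=f"step-job-{run_id}-{index}", step_key=step_key)
        index += 1


plan = ["process_data", "save_results"]
queue: Deque[str] = deque()
cluster = ToyCluster()

run_worker(plan, queue)
celery_worker(queue, cluster, run_id="abc123")

created = [job.step_key for job in cluster.jobs]
```

The point of the split is that the run worker only schedules, while each step gets its own isolated Job, so a step's resource needs do not constrain the run worker's pod.
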
## Capabilities

### Run Launcher

The primary component for launching Dagster runs as Kubernetes jobs with Celery task distribution.

```python { .api }
class CeleryK8sRunLauncher(RunLauncher, ConfigurableClass):
    """
    Run launcher for Kubernetes-based execution with Celery task queuing.

    Launches dagster runs as Kubernetes Jobs that traverse execution plans
    and submit steps to Celery queues for distributed execution.
    """

    def __init__(
        self,
        instance_config_map,
        dagster_home,
        postgres_password_secret,
        load_incluster_config=True,
        kubeconfig_file=None,
        broker=None,
        backend=None,
        include=None,
        config_source=None,
        retries=None,
        inst_data: Optional[ConfigurableClassData] = None,
        k8s_client_batch_api=None,
        env_config_maps=None,
        env_secrets=None,
        volume_mounts=None,
        volumes=None,
        service_account_name=None,
        image_pull_policy=None,
        image_pull_secrets=None,
        labels=None,
        fail_pod_on_run_failure=None,
        job_namespace=None,
    ):
        """
        Initialize the CeleryK8sRunLauncher.

        Args:
            instance_config_map: Name of the ConfigMap containing instance configuration
            dagster_home: Path to Dagster home directory
            postgres_password_secret: Name of secret containing PostgreSQL password
            load_incluster_config: Whether to load Kubernetes config from within cluster
            kubeconfig_file: Path to kubeconfig file if not using in-cluster config
            broker: Celery broker URL
            backend: Celery backend URL
            include: List of modules for Celery workers to import
            config_source: Additional Celery configuration
            retries: Retry configuration
            inst_data: Configurable class instance data
            k8s_client_batch_api: Override for Kubernetes batch API client
            env_config_maps: List of ConfigMaps to mount as environment variables
            env_secrets: List of Secrets to mount as environment variables
            volume_mounts: List of volume mounts for pods
            volumes: List of volumes for pods
            service_account_name: Kubernetes service account name
            image_pull_policy: Image pull policy for containers
            image_pull_secrets: List of image pull secrets
            labels: Labels to apply to Kubernetes resources
            fail_pod_on_run_failure: Whether to fail pod on run failure
            job_namespace: Kubernetes namespace for jobs
        """

    def launch_run(self, context: LaunchRunContext) -> None:
        """
        Launch a Dagster run as a Kubernetes job.

        Args:
            context: Launch context containing run information and code origin
        """

    def terminate(self, run_id):
        """
        Terminate a running Dagster job.

        Args:
            run_id: ID of the run to terminate

        Returns:
            True if termination was successful, False if run is already finished,
            None if an exception occurred during termination
        """

    def check_run_worker_health(self, run: DagsterRun) -> CheckRunHealthResult:
        """
        Check the health of a run worker.

        Args:
            run: The Dagster run to check

        Returns:
            Health check result with worker status
        """

    @classmethod
    def config_type(cls):
        """Return the configuration schema for this run launcher."""

    @classmethod
    def from_config_value(cls, inst_data, config_value):
        """Create instance from configuration values."""

    def get_k8s_job_config(self, job_image, exc_config) -> DagsterK8sJobConfig:
        """
        Get Kubernetes job configuration.

        Args:
            job_image: Docker image for the job
            exc_config: Executor configuration dictionary

        Returns:
            Kubernetes job configuration object
        """

    def get_namespace_from_run_config(self, run_id: str) -> str:
        """
        Extract namespace from run configuration.

        Args:
            run_id: Run identifier

        Returns:
            Kubernetes namespace name
        """

    @property
    def supports_check_run_worker_health(self) -> bool:
        """Whether this launcher supports health checks."""

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        """Configuration instance data."""
```

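Because `terminate` is documented above as returning one of three values (`True`, `False`, or `None`), a plain truthiness check would treat "already finished" and "an exception occurred" as the same outcome. The following is a minimal sketch of a caller-side helper that keeps the three cases apart; `describe_termination` and its message strings are hypothetical and not part of the package.

```python
from typing import Optional


def describe_termination(result: Optional[bool]) -> str:
    """Map the documented tri-state return value of terminate() to a message."""
    if result is True:
        return "terminated"
    if result is False:
        return "already finished"
    # None: an exception occurred while terminating
    return "termination failed"


# Stand-in values; in practice these would come from launcher.terminate(run_id)
messages = [describe_termination(value) for value in (True, False, None)]
```

Comparing with `is` rather than relying on truthiness is what keeps `False` and `None` distinct here.
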
### Executor

The executor function that creates the distributed execution engine.

```python { .api }
@executor(
    name="celery-k8s",
    config_schema=celery_k8s_executor_config(),
    requirements=multiple_process_executor_requirements(),
)
def celery_k8s_job_executor(init_context):
    """
    Celery-based executor which launches tasks as Kubernetes Jobs.

    The executor exposes config settings for the underlying Celery app and
    Kubernetes job configuration. It works in concert with CeleryK8sRunLauncher
    to provide distributed execution where:

    1. A run worker Kubernetes Job traverses the dagster run execution plan
       and submits steps to Celery queues for execution
    2. Celery workers pick up step executions and spawn Kubernetes Jobs
       for each step

    Args:
        init_context: Executor initialization context containing configuration

    Returns:
        CeleryK8sJobExecutor instance configured for distributed execution
    """
```

### Configuration

Configuration utilities for setting up the executor and launcher.

```python { .api }
def celery_k8s_executor_config():
    """
    Return configuration schema for celery-k8s executor.

    Merges Celery configuration, Kubernetes job configuration, and
    additional celery-k8s specific options.

    Returns:
        Configuration dictionary with all available options
    """

def get_celery_engine_config(
    image_pull_policy=None,
    additional_env_config_maps=None
):
    """
    Get engine configuration for Celery K8s execution.

    Args:
        image_pull_policy: Kubernetes image pull policy
        additional_env_config_maps: Additional ConfigMaps for environment variables

    Returns:
        Engine configuration dictionary
    """

def get_celery_engine_job_config(
    image_pull_policy=None,
    additional_env_config_maps=None
):
    """
    Get job configuration for Celery K8s execution.

    Args:
        image_pull_policy: Kubernetes image pull policy
        additional_env_config_maps: Additional ConfigMaps for environment variables

    Returns:
        Job configuration dictionary
    """
```

### CeleryK8sJobExecutor

The executor implementation class that handles distributed step execution.

```python { .api }
class CeleryK8sJobExecutor(Executor):
    """
    Executor that runs steps as Kubernetes Jobs via Celery task queuing.

    This executor works in concert with CeleryK8sRunLauncher to provide
    distributed execution of Dagster steps.
    """

    def __init__(
        self,
        retries,
        broker=None,
        backend=None,
        include=None,
        config_source=None,
        job_config=None,
        job_namespace=None,
        load_incluster_config=False,
        kubeconfig_file=None,
        repo_location_name=None,
        job_wait_timeout=None,
        per_step_k8s_config=None,
    ):
        """
        Initialize the CeleryK8sJobExecutor.

        Args:
            retries: Retry mode configuration
            broker: Celery broker URL
            backend: Celery backend URL
            include: List of modules for Celery workers to import
            config_source: Additional Celery configuration dictionary
            job_config: Kubernetes job configuration object
            job_namespace: Kubernetes namespace for jobs
            load_incluster_config: Whether to load Kubernetes config from within cluster
            kubeconfig_file: Path to kubeconfig file
            repo_location_name: Repository location name for execution
            job_wait_timeout: Timeout for job completion in seconds
            per_step_k8s_config: Per-step Kubernetes configuration overrides
        """

    def execute(self, plan_context, execution_plan):
        """
        Execute the execution plan using Celery task distribution.

        Args:
            plan_context: Pipeline execution context
            execution_plan: Execution plan to run

        Returns:
            Generator of execution events
        """

    def app_args(self):
        """
        Return arguments for Celery app configuration.

        Returns:
            Dictionary with broker, backend, include, config_source, and retries
        """

    @property
    def retries(self):
        """Return retry mode configuration."""
```

### Celery Application

Pre-configured Celery application for K8s job execution.

```python { .api }
app: celery.Celery
"""Pre-configured Celery application with task routes for K8s job execution."""

execute_step_k8s_job: celery.Task
"""Celery task instance for executing steps as K8s jobs."""
```

### Constants

```python { .api }
CELERY_K8S_CONFIG_KEY = "celery-k8s"
"""Configuration key for celery-k8s executor."""

__version__ = "0.27.9"
"""Package version."""
```

## Configuration Options

The executor supports extensive configuration for both Celery and Kubernetes:

### Kubernetes Configuration
- `job_image`: Docker image for step execution jobs
- `job_namespace`: Kubernetes namespace for jobs (default: "default")
- `load_incluster_config`: Load K8s config from within cluster (default: True)
- `kubeconfig_file`: Path to kubeconfig file
- `job_wait_timeout`: Seconds to wait for a step job to complete before the run is marked as failed (default: 86400, i.e. 24 hours)
- `env_config_maps`: ConfigMaps to mount as environment variables
- `env_secrets`: Secrets to mount as environment variables
- `volume_mounts`: Volume mounts for job pods
- `volumes`: Volumes for job pods
- `service_account_name`: Kubernetes service account
- `image_pull_policy`: Image pull policy (default: "IfNotPresent")
- `image_pull_secrets`: Image pull secrets
- `labels`: Labels for Kubernetes resources

### Celery Configuration
- `broker`: Celery broker URL (e.g., "pyamqp://guest@localhost//")
- `backend`: Celery results backend URL (e.g., "rpc://")
- `include`: Modules for Celery workers to import
- `config_source`: Additional Celery configuration dictionary
- `retries`: Retry configuration for failed tasks

### Per-Step Configuration
- `per_step_k8s_config`: Per-operation Kubernetes configuration overrides

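To show how these options nest, here is a minimal sketch that assembles an `execution` config dictionary from plain Python values. `build_execution_config` and its parameters are hypothetical helpers for illustration, not part of the package; the nesting under `per_step_k8s_config` (op name, then `container_config`, then `resources`) is an assumption about the expected shape and should be checked against the executor's config schema.

```python
from typing import Any, Dict, Optional


def build_execution_config(
    job_image: str,
    broker: str,
    backend: str,
    job_namespace: str = "default",
    per_step_resources: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Assemble an `execution` run-config section (hypothetical helper)."""
    config: Dict[str, Any] = {
        "job_image": job_image,
        "job_namespace": job_namespace,
        "broker": broker,
        "backend": backend,
    }
    if per_step_resources:
        # One entry per op name; each entry overrides that step's container resources
        config["per_step_k8s_config"] = {
            op_name: {"container_config": {"resources": resources}}
            for op_name, resources in per_step_resources.items()
        }
    return {"execution": {"config": config}}


run_config = build_execution_config(
    job_image="my-registry/my-image:v1.0.0",
    broker="redis://redis-service:6379/0",
    backend="redis://redis-service:6379/1",
    job_namespace="data-processing",
    per_step_resources={
        "heavy_computation": {
            "requests": {"cpu": "2", "memory": "4Gi"},
            "limits": {"cpu": "4", "memory": "8Gi"},
        }
    },
)
```

Keeping the per-step overrides in a separate argument makes it easy to leave `per_step_k8s_config` out entirely when no op needs special resources.
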
## Usage Examples

### Basic Job Definition

```python
from dagster import op, job
from dagster_celery_k8s import celery_k8s_job_executor

@op
def process_data():
    return "processed"

@op
def save_results(data: str):
    print(f"Saving: {data}")

@job(executor_def=celery_k8s_job_executor)
def my_pipeline():
    save_results(process_data())
```

### Advanced Configuration

```python
from dagster import job, RunConfig
from dagster_celery_k8s import celery_k8s_job_executor

@job(executor_def=celery_k8s_job_executor)
def advanced_pipeline():
    # Job operations
    pass

# Run with specific configuration
run_config = RunConfig(
    execution={
        "config": {
            "job_image": "my-registry/my-image:v1.0.0",
            "job_namespace": "data-processing",
            "broker": "redis://redis-service:6379/0",
            "backend": "redis://redis-service:6379/1",
            "job_wait_timeout": 3600,  # 1 hour
            "per_step_k8s_config": {
                "heavy_computation": {
                    "container_config": {
                        "resources": {
                            "requests": {"cpu": "2", "memory": "4Gi"},
                            "limits": {"cpu": "4", "memory": "8Gi"}
                        }
                    }
                }
            }
        }
    }
)
```

### Instance Configuration

Configure in your `dagster.yaml`:

```yaml
run_launcher:
  module: dagster_celery_k8s
  class: CeleryK8sRunLauncher
  config:
    # Required configuration
    instance_config_map: "dagster-instance-config"
    dagster_home: "/opt/dagster/dagster_home"
    postgres_password_secret: "dagster-postgres-secret"

    # Celery configuration
    broker: "redis://redis-service:6379/0"
    backend: "redis://redis-service:6379/1"

    # Kubernetes configuration
    job_namespace: "dagster-execution"
    service_account_name: "dagster-service-account"
    image_pull_policy: "Always"

    # Resource configuration
    env_config_maps:
      - "dagster-env-config"
    env_secrets:
      - "dagster-secrets"

    labels:
      team: "data-engineering"
      environment: "production"
```

## Error Handling

The package provides comprehensive error handling for:

### Kubernetes Errors
- Job creation failures and conflicts
- Pod scheduling issues
- Resource allocation problems
- Network connectivity issues
- Timeout scenarios

### Celery Errors
- Broker connection failures
- Task serialization issues
- Worker availability problems
- Message queue overflow

### Common Error Patterns

```python
from dagster import DagsterInvariantViolationError
from dagster._core.errors import DagsterUnmetExecutorRequirementsError
from dagster_celery_k8s import CeleryK8sRunLauncher


# Illustrative wrapper (not part of the package API) around the two checks
def validate_executor_setup(job_image, run_launcher):
    # Image configuration error: every step job needs an image to run
    if not job_image:
        raise DagsterInvariantViolationError(
            "You have not specified a job_image in your executor configuration. "
            "Specify the job_image in the executor config section."
        )

    # Compatibility error: the executor requires the matching run launcher
    if not isinstance(run_launcher, CeleryK8sRunLauncher):
        raise DagsterUnmetExecutorRequirementsError(
            "This executor is only compatible with a CeleryK8sRunLauncher; "
            "configure the CeleryK8sRunLauncher on your instance to use it."
        )
```

## Integration Requirements

### Dependencies
- dagster==1.11.9
- dagster-k8s==0.27.9
- dagster-celery==0.27.9
- kubernetes (Python client)

### Environment Setup
1. **Kubernetes Cluster**: Access to a Kubernetes cluster
2. **Celery Broker**: Redis or RabbitMQ broker service
3. **Dagster Instance**: PostgreSQL-backed Dagster instance
4. **Container Registry**: Access to push/pull Docker images
5. **RBAC**: Kubernetes permissions for job creation and management

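For the RBAC item above, the sketch below builds a namespaced Kubernetes `Role` manifest as a Python dictionary and prints it as JSON, which `kubectl apply -f` also accepts. The helper name `step_job_role`, the role name, and the exact set of verbs and resources are assumptions for illustration; the permissions your deployment actually needs may differ.

```python
import json


def step_job_role(namespace: str, name: str = "dagster-step-jobs") -> dict:
    """Build a Role manifest with assumed permissions for managing Jobs."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": name, "namespace": namespace},
        "rules": [
            {
                # Jobs live in the "batch" API group
                "apiGroups": ["batch"],
                "resources": ["jobs"],
                "verbs": ["create", "get", "list", "watch", "delete"],
            },
            {
                # Pods and their logs live in the core ("") API group
                "apiGroups": [""],
                "resources": ["pods", "pods/log"],
                "verbs": ["get", "list", "watch"],
            },
        ],
    }


role = step_job_role("dagster-execution")
print(json.dumps(role, indent=2))
```

A `Role` only takes effect once it is bound to the service account used by the workers through a `RoleBinding`, which is omitted here.
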
### Deployment Considerations
- Celery workers must be deployed with `-A dagster_celery_k8s.app` argument
- Kubernetes Jobs require appropriate resource quotas and limits
- Network policies must allow communication between components
- Persistent volumes may be needed for data sharing between steps
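
As a rough illustration of the first point, the sketch below assembles a worker start command using the standard Celery command-line options (`-A`, `worker`, `--loglevel`, `-Q`). The helper `celery_worker_command` is hypothetical, the queue name `dagster` is assumed to be the default queue, and `heavy` is a made-up extra queue; your deployment may start workers through a different entry point.

```python
import shlex
from typing import List, Sequence


def celery_worker_command(
    queues: Sequence[str] = ("dagster",),
    loglevel: str = "INFO",
) -> List[str]:
    """Build an argv list for a Celery worker that loads the package's app."""
    if not queues:
        raise ValueError("at least one queue name is required")
    return [
        "celery",
        "-A", "dagster_celery_k8s.app",  # the app module named in the list above
        "worker",
        "--loglevel", loglevel,
        "-Q", ",".join(queues),  # comma-separated queues to consume from
    ]


command = celery_worker_command(queues=["dagster", "heavy"])
print(shlex.join(command))
```

Returning a list rather than a single string lets the command be passed directly to `subprocess.run` or used as a container `args` array without shell quoting issues.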