# Dagster Celery K8s

A Dagster integration that provides a Kubernetes-native distributed execution system by combining Celery task queuing with Kubernetes job orchestration. It enables scalable data pipeline execution through a two-tier architecture: a run worker Kubernetes Job traverses the execution plan and submits individual steps to Celery queues, and Celery workers spawn a separate Kubernetes Job for each step.

## Package Information

- **Package Name**: dagster-celery-k8s
- **Language**: Python
- **Installation**: `pip install dagster-celery-k8s`
- **Version**: 0.27.9

## Core Imports

```python
from dagster_celery_k8s import CeleryK8sRunLauncher, celery_k8s_job_executor
```

Import version information:

```python
from dagster_celery_k8s import __version__
```

Import types for full type annotations:

```python
from typing import Optional, List, Dict, Any
from dagster import DagsterRun, LaunchRunContext
from dagster._core.launcher.base import CheckRunHealthResult
from dagster._serdes import ConfigurableClassData
from dagster_k8s import DagsterK8sJobConfig
```

## Basic Usage

```python
from dagster import job
from dagster_celery_k8s import celery_k8s_job_executor

# Define a job using the Celery K8s executor
@job(executor_def=celery_k8s_job_executor)
def my_distributed_job():
    # Your job operations here
    pass
```

Configure the run launcher in your Dagster instance (`dagster.yaml`):

```yaml
run_launcher:
  module: dagster_celery_k8s.launcher
  class: CeleryK8sRunLauncher
  config:
    instance_config_map: "dagster-k8s-instance-config-map"
    dagster_home: "/opt/dagster/dagster_home"
    postgres_password_secret: "dagster-k8s-pg-password"
    broker: "pyamqp://guest@localhost//"
    backend: "rpc://"
```

Then supply the executor configuration in your run config:

```yaml
execution:
  config:
    job_image: 'my_repo.com/image_name:latest'
    job_namespace: 'dagster-execution'
    broker: 'pyamqp://guest@localhost//'
    backend: 'rpc://'
```

## Architecture

The dagster-celery-k8s package implements a two-tier distributed execution architecture:

1. **Run Worker Tier**: A Kubernetes Job that traverses the Dagster execution plan and submits individual steps to Celery queues
2. **Step Execution Tier**: Celery workers that pick up tasks and spawn separate Kubernetes Jobs for each step execution

This design provides:

- **Scalability**: Dynamic resource allocation through Kubernetes
- **Reliability**: Container orchestration with Kubernetes job management
- **Flexibility**: Celery's distributed task processing capabilities
- **Cloud-native**: Optimized for cloud environments with automatic resource management

## Capabilities

### Run Launcher

The primary component for launching Dagster runs as Kubernetes jobs with Celery task distribution.

```python { .api }
class CeleryK8sRunLauncher(RunLauncher, ConfigurableClass):
    """
    Run launcher for Kubernetes-based execution with Celery task queuing.

    Launches Dagster runs as Kubernetes Jobs that traverse execution plans
    and submit steps to Celery queues for distributed execution.
    """

    def __init__(
        self,
        instance_config_map,
        dagster_home,
        postgres_password_secret,
        load_incluster_config=True,
        kubeconfig_file=None,
        broker=None,
        backend=None,
        include=None,
        config_source=None,
        retries=None,
        inst_data: Optional[ConfigurableClassData] = None,
        k8s_client_batch_api=None,
        env_config_maps=None,
        env_secrets=None,
        volume_mounts=None,
        volumes=None,
        service_account_name=None,
        image_pull_policy=None,
        image_pull_secrets=None,
        labels=None,
        fail_pod_on_run_failure=None,
        job_namespace=None,
    ):
        """
        Initialize the CeleryK8sRunLauncher.

        Args:
            instance_config_map: Name of the ConfigMap containing instance configuration
            dagster_home: Path to Dagster home directory
            postgres_password_secret: Name of secret containing PostgreSQL password
            load_incluster_config: Whether to load Kubernetes config from within cluster
            kubeconfig_file: Path to kubeconfig file if not using in-cluster config
            broker: Celery broker URL
            backend: Celery backend URL
            include: List of modules for Celery workers to import
            config_source: Additional Celery configuration
            retries: Retry configuration
            inst_data: Configurable class instance data
            k8s_client_batch_api: Override for Kubernetes batch API client
            env_config_maps: List of ConfigMaps to mount as environment variables
            env_secrets: List of Secrets to mount as environment variables
            volume_mounts: List of volume mounts for pods
            volumes: List of volumes for pods
            service_account_name: Kubernetes service account name
            image_pull_policy: Image pull policy for containers
            image_pull_secrets: List of image pull secrets
            labels: Labels to apply to Kubernetes resources
            fail_pod_on_run_failure: Whether to fail pod on run failure
            job_namespace: Kubernetes namespace for jobs
        """

    def launch_run(self, context: LaunchRunContext) -> None:
        """
        Launch a Dagster run as a Kubernetes job.

        Args:
            context: Launch context containing run information and code origin
        """

    def terminate(self, run_id):
        """
        Terminate a running Dagster job.

        Args:
            run_id: ID of the run to terminate

        Returns:
            True if termination was successful, False if run is already finished,
            None if an exception occurred during termination
        """

    def check_run_worker_health(self, run: DagsterRun) -> CheckRunHealthResult:
        """
        Check the health of a run worker.

        Args:
            run: The Dagster run to check

        Returns:
            Health check result with worker status
        """

    @classmethod
    def config_type(cls):
        """Return the configuration schema for this run launcher."""

    @classmethod
    def from_config_value(cls, inst_data, config_value):
        """Create instance from configuration values."""

    def get_k8s_job_config(self, job_image, exc_config) -> DagsterK8sJobConfig:
        """
        Get Kubernetes job configuration.

        Args:
            job_image: Docker image for the job
            exc_config: Executor configuration dictionary

        Returns:
            Kubernetes job configuration object
        """

    def get_namespace_from_run_config(self, run_id: str) -> str:
        """
        Extract namespace from run configuration.

        Args:
            run_id: Run identifier

        Returns:
            Kubernetes namespace name
        """

    @property
    def supports_check_run_worker_health(self) -> bool:
        """Whether this launcher supports health checks."""

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        """Configuration instance data."""
```
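
In practice the launcher is constructed by Dagster from `dagster.yaml`, but constructing it directly shows which arguments are required. A minimal sketch, assuming in-cluster Kubernetes config and illustrative resource names:

```python
from dagster_celery_k8s import CeleryK8sRunLauncher

# Illustrative values; per the signature above, only the first three
# arguments are required, the rest fall back to defaults.
launcher = CeleryK8sRunLauncher(
    instance_config_map="dagster-instance-config",
    dagster_home="/opt/dagster/dagster_home",
    postgres_password_secret="dagster-postgres-secret",
    broker="pyamqp://guest@localhost//",
    backend="rpc://",
    job_namespace="dagster-execution",
)
```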

### Executor

The executor function that creates the distributed execution engine.

```python { .api }
@executor(
    name="celery-k8s",
    config_schema=celery_k8s_executor_config(),
    requirements=multiple_process_executor_requirements(),
)
def celery_k8s_job_executor(init_context):
    """
    Celery-based executor which launches tasks as Kubernetes Jobs.

    The executor exposes config settings for the underlying Celery app and
    Kubernetes job configuration. It works in concert with CeleryK8sRunLauncher
    to provide distributed execution where:

    1. A run worker Kubernetes Job traverses the dagster run execution plan
       and submits steps to Celery queues for execution
    2. Celery workers pick up step executions and spawn Kubernetes Jobs
       for each step

    Args:
        init_context: Executor initialization context containing configuration

    Returns:
        CeleryK8sJobExecutor instance configured for distributed execution
    """
```
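
To avoid passing execution config on every run, the executor definition can be pre-configured. A minimal sketch using Dagster's generic `ExecutorDefinition.configured` API; the image and broker values are illustrative:

```python
from dagster import job
from dagster_celery_k8s import celery_k8s_job_executor

# Illustrative defaults; any option from the executor's config schema works here.
preconfigured_executor = celery_k8s_job_executor.configured(
    {
        "job_image": "my_repo.com/image_name:latest",
        "job_namespace": "dagster-execution",
        "broker": "pyamqp://guest@localhost//",
        "backend": "rpc://",
    }
)

@job(executor_def=preconfigured_executor)
def preconfigured_job():
    pass
```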

### Configuration

Configuration utilities for setting up the executor and launcher.

```python { .api }
def celery_k8s_executor_config():
    """
    Return configuration schema for celery-k8s executor.

    Merges Celery configuration, Kubernetes job configuration, and
    additional celery-k8s specific options.

    Returns:
        Configuration dictionary with all available options
    """

def get_celery_engine_config(
    image_pull_policy=None,
    additional_env_config_maps=None,
):
    """
    Get engine configuration for Celery K8s execution.

    Args:
        image_pull_policy: Kubernetes image pull policy
        additional_env_config_maps: Additional ConfigMaps for environment variables

    Returns:
        Engine configuration dictionary
    """

def get_celery_engine_job_config(
    image_pull_policy=None,
    additional_env_config_maps=None,
):
    """
    Get job configuration for Celery K8s execution.

    Args:
        image_pull_policy: Kubernetes image pull policy
        additional_env_config_maps: Additional ConfigMaps for environment variables

    Returns:
        Job configuration dictionary
    """
```
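
A sketch of how `get_celery_engine_config` might be used to assemble run config. The top-level import path is an assumption (the helper may live in a submodule of your installed version), and the returned dictionary is merged as-is rather than assuming a particular shape:

```python
# Assumed import path; adjust if the helper lives in a submodule
# of your installed dagster-celery-k8s version.
from dagster_celery_k8s import get_celery_engine_config

engine_config = get_celery_engine_config(
    image_pull_policy="IfNotPresent",
    additional_env_config_maps=["dagster-env-config"],
)

# Merge the generated execution settings with job-specific config.
run_config = {**engine_config, "ops": {}}
```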

### CeleryK8sJobExecutor

The executor implementation class that handles distributed step execution.

```python { .api }
class CeleryK8sJobExecutor(Executor):
    """
    Executor that runs steps as Kubernetes Jobs via Celery task queuing.

    This executor works in concert with CeleryK8sRunLauncher to provide
    distributed execution of Dagster steps.
    """

    def __init__(
        self,
        retries,
        broker=None,
        backend=None,
        include=None,
        config_source=None,
        job_config=None,
        job_namespace=None,
        load_incluster_config=False,
        kubeconfig_file=None,
        repo_location_name=None,
        job_wait_timeout=None,
        per_step_k8s_config=None,
    ):
        """
        Initialize the CeleryK8sJobExecutor.

        Args:
            retries: Retry mode configuration
            broker: Celery broker URL
            backend: Celery backend URL
            include: List of modules for Celery workers to import
            config_source: Additional Celery configuration dictionary
            job_config: Kubernetes job configuration object
            job_namespace: Kubernetes namespace for jobs
            load_incluster_config: Whether to load Kubernetes config from within cluster
            kubeconfig_file: Path to kubeconfig file
            repo_location_name: Repository location name for execution
            job_wait_timeout: Timeout for job completion in seconds
            per_step_k8s_config: Per-step Kubernetes configuration overrides
        """

    def execute(self, plan_context, execution_plan):
        """
        Execute the execution plan using Celery task distribution.

        Args:
            plan_context: Pipeline execution context
            execution_plan: Execution plan to run

        Returns:
            Generator of execution events
        """

    def app_args(self):
        """
        Return arguments for Celery app configuration.

        Returns:
            Dictionary with broker, backend, include, config_source, and retries
        """

    @property
    def retries(self):
        """Return retry mode configuration."""
```
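
Executor instances are normally built for you by `celery_k8s_job_executor`, so direct construction is rarely needed. A hedged sketch, assuming the class is importable from `dagster_celery_k8s.executor` and that `retries` accepts Dagster's `RetryMode`:

```python
from dagster._core.execution.retries import RetryMode
from dagster_celery_k8s.executor import CeleryK8sJobExecutor  # assumed module path

executor = CeleryK8sJobExecutor(
    retries=RetryMode.ENABLED,
    broker="pyamqp://guest@localhost//",
    backend="rpc://",
    job_namespace="dagster-execution",
)

# app_args() bundles the Celery settings the executor was given.
print(executor.app_args())
```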

### Celery Application

Pre-configured Celery application for K8s job execution.

```python { .api }
app: celery.Celery
"""Pre-configured Celery application with task routes for K8s job execution."""

execute_step_k8s_job: celery.Task
"""Celery task instance for executing steps as K8s jobs."""
```
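
Workers consume steps from this app. A minimal sketch of starting a worker programmatically, assuming the module exposes the application as `app` (as the listing above suggests); the CLI equivalent noted under Deployment Considerations is `celery -A dagster_celery_k8s.app worker`:

```python
from dagster_celery_k8s.app import app  # assumed module path for the app object

if __name__ == "__main__":
    # Equivalent to: celery -A dagster_celery_k8s.app worker --loglevel=INFO
    # worker_main is a standard Celery 5 API for in-process worker startup.
    app.worker_main(["worker", "--loglevel=INFO"])
```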

### Constants

```python { .api }
CELERY_K8S_CONFIG_KEY = "celery-k8s"
"""Configuration key for celery-k8s executor."""

__version__ = "0.27.9"
"""Package version."""
```

## Configuration Options

The executor supports extensive configuration for both Celery and Kubernetes:

### Kubernetes Configuration

- `job_image`: Docker image for step execution jobs
- `job_namespace`: Kubernetes namespace for jobs (default: "default")
- `load_incluster_config`: Load K8s config from within cluster (default: True)
- `kubeconfig_file`: Path to kubeconfig file
- `job_wait_timeout`: Timeout for job completion (default: 4 hours)
- `env_config_maps`: ConfigMaps to mount as environment variables
- `env_secrets`: Secrets to mount as environment variables
- `volume_mounts`: Volume mounts for job pods
- `volumes`: Volumes for job pods
- `service_account_name`: Kubernetes service account
- `image_pull_policy`: Image pull policy (default: "IfNotPresent")
- `image_pull_secrets`: Image pull secrets
- `labels`: Labels for Kubernetes resources

### Celery Configuration

- `broker`: Celery broker URL (e.g., "pyamqp://guest@localhost//")
- `backend`: Celery results backend URL (e.g., "rpc://")
- `include`: Modules for Celery workers to import
- `config_source`: Additional Celery configuration dictionary
- `retries`: Retry configuration for failed tasks

### Per-Step Configuration

- `per_step_k8s_config`: Per-op Kubernetes configuration overrides, keyed by op name (see the advanced example below)

## Usage Examples

### Basic Job Definition

```python
from dagster import op, job
from dagster_celery_k8s import celery_k8s_job_executor

@op
def process_data():
    return "processed"

@op
def save_results(data: str):
    print(f"Saving: {data}")

@job(executor_def=celery_k8s_job_executor)
def my_pipeline():
    save_results(process_data())
```

### Advanced Configuration

```python
from dagster import job, RunConfig
from dagster_celery_k8s import celery_k8s_job_executor

@job(executor_def=celery_k8s_job_executor)
def advanced_pipeline():
    # Job operations
    pass

# Run with specific configuration
run_config = RunConfig(
    execution={
        "config": {
            "job_image": "my-registry/my-image:v1.0.0",
            "job_namespace": "data-processing",
            "broker": "redis://redis-service:6379/0",
            "backend": "redis://redis-service:6379/1",
            "job_wait_timeout": 3600,  # 1 hour
            "per_step_k8s_config": {
                "heavy_computation": {
                    "container_config": {
                        "resources": {
                            "requests": {"cpu": "2", "memory": "4Gi"},
                            "limits": {"cpu": "4", "memory": "8Gi"},
                        }
                    }
                }
            },
        }
    }
)
```

### Instance Configuration

Configure in your `dagster.yaml`:

```yaml
run_launcher:
  module: dagster_celery_k8s.launcher
  class: CeleryK8sRunLauncher
  config:
    # Required configuration
    instance_config_map: "dagster-instance-config"
    dagster_home: "/opt/dagster/dagster_home"
    postgres_password_secret: "dagster-postgres-secret"

    # Celery configuration
    broker: "redis://redis-service:6379/0"
    backend: "redis://redis-service:6379/1"

    # Kubernetes configuration
    job_namespace: "dagster-execution"
    service_account_name: "dagster-service-account"
    image_pull_policy: "Always"

    # Resource configuration
    env_config_maps:
      - "dagster-env-config"
    env_secrets:
      - "dagster-secrets"

    labels:
      team: "data-engineering"
      environment: "production"
```

## Error Handling

The package provides comprehensive error handling for:

### Kubernetes Errors

- Job creation failures and conflicts
- Pod scheduling issues
- Resource allocation problems
- Network connectivity issues
- Timeout scenarios

### Celery Errors

- Broker connection failures
- Task serialization issues
- Worker availability problems
- Message queue overflow

### Common Error Patterns

```python
from dagster import DagsterInvariantViolationError
from dagster import DagsterUnmetExecutorRequirementsError

# Image configuration error: job_image must be set in the executor config
if not job_image:
    raise DagsterInvariantViolationError(
        "You have not specified a job_image in your executor configuration. "
        "Specify the job_image in the executor config section."
    )

# Compatibility error: the executor requires the matching run launcher
if not isinstance(run_launcher, CeleryK8sRunLauncher):
    raise DagsterUnmetExecutorRequirementsError(
        "This executor is only compatible with a CeleryK8sRunLauncher; "
        "configure the CeleryK8sRunLauncher on your instance to use it."
    )
```

## Integration Requirements

### Dependencies

- dagster==1.11.9
- dagster-k8s==0.27.9
- dagster-celery==0.27.9
- kubernetes (Python client)

### Environment Setup

1. **Kubernetes Cluster**: Access to a Kubernetes cluster
2. **Celery Broker**: Redis or RabbitMQ broker service
3. **Dagster Instance**: PostgreSQL-backed Dagster instance
4. **Container Registry**: Access to push/pull Docker images
5. **RBAC**: Kubernetes permissions for job creation and management

### Deployment Considerations

- Celery workers must be started with the `-A dagster_celery_k8s.app` argument (see the worker sketch in the Celery Application section)
- Kubernetes Jobs require appropriate resource quotas and limits
- Network policies must allow communication between components
- Persistent volumes may be needed for data sharing between steps