# Dagster Celery Docker

A Dagster executor that enables running Dagster steps within Docker containers orchestrated by Celery. This integration combines the distributed task execution capabilities of Celery with Docker containerization for isolated, scalable pipeline execution. The executor allows configuration of Docker images, registries, environment variables, and container networking while maintaining integration with Dagster's event and logging systems.

## Package Information

- **Package Name**: dagster-celery-docker
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install dagster-celery-docker`

## Core Imports

```python
from dagster_celery_docker import celery_docker_executor
```

For accessing version information:

```python
from dagster_celery_docker import __version__
```

For the Celery app (used in worker deployment):

```python
from dagster_celery_docker import app
```

## Basic Usage

```python
from dagster import job, op
from dagster_celery_docker import celery_docker_executor

@op
def my_op():
    return "Hello from Docker container!"

@job(executor_def=celery_docker_executor)
def my_docker_job():
    my_op()

# Execute with configuration
if __name__ == "__main__":
    result = my_docker_job.execute_in_process(
        run_config={
            "execution": {
                "config": {
                    "docker": {
                        "image": "python:3.9-slim",
                        "env_vars": ["DAGSTER_HOME"],
                        "container_kwargs": {
                            "auto_remove": True
                        }
                    },
                    "broker": "redis://localhost:6379/0",
                    "backend": "redis://localhost:6379/0"
                }
            }
        }
    )
```

## Architecture

The dagster-celery-docker integration extends Dagster's execution model with containerized distributed computing:

- **CeleryDockerExecutor**: Main executor class that orchestrates Docker container execution via Celery workers
- **Task Submission**: Steps are packaged as Celery tasks with Docker execution parameters
- **Container Management**: Automatic Docker image pulling, container lifecycle management, and cleanup
- **Event Integration**: Container execution events are reported back to Dagster's event system
- **Error Handling**: Docker container errors are captured and reported through Dagster's standard error handling

This design enables scalable pipeline execution across multiple worker nodes while providing the isolation and consistency of containerized environments.
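
As a rough mental model of this flow (a sketch under assumed names, not the package's actual internals), each step submission resembles a Celery task that asks the Docker daemon to run a container:

```python
# Conceptual sketch only: illustrates the Celery-task-runs-a-container
# pattern the executor is built on. The task name, signature, and body
# are assumptions, not dagster-celery-docker's real implementation.
import docker
from celery import Celery

sketch_app = Celery(
    "sketch",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

@sketch_app.task(name="execute_step_in_container")
def execute_step_in_container(image: str, command: list, env: dict) -> str:
    # With detach=False, `run` blocks, returns the container logs as bytes,
    # and remove=True deletes the finished container.
    client = docker.from_env()
    output = client.containers.run(image, command, environment=env, remove=True)
    return output.decode("utf-8")
```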

## Capabilities

### Executor Definition

The main executor function that creates a Celery-based executor for running Dagster steps in Docker containers.

```python { .api }
@executor(
    name="celery-docker",
    config_schema=celery_docker_config(),
    requirements=multiple_process_executor_requirements(),
)
def celery_docker_executor(init_context):
    """
    Celery-based executor which launches tasks in docker containers.

    The Celery executor exposes config settings for the underlying Celery app under
    the ``config_source`` key. This config corresponds to the "new lowercase settings"
    introduced in Celery version 4.0, and the object constructed from config will be
    passed to the :py:class:`celery.Celery` constructor as its ``config_source`` argument.

    Parameters:
    - init_context: InitExecutorContext provided by Dagster

    Returns:
    - CeleryDockerExecutor instance configured with the provided settings

    Configuration Schema:
    - docker: Dict[str, Any] (required) - Docker configuration
      - image: StringSource (optional) - Docker image for step execution
      - registry: Dict[str, StringSource] (optional) - Registry configuration
        - url: StringSource - Registry URL
        - username: StringSource - Registry username
        - password: StringSource - Registry password
      - env_vars: List[str] (optional) - Environment variables to forward to the container
      - network: str (optional) - Docker network name for the container
      - container_kwargs: Permissive (optional) - Additional Docker container arguments
    - broker: str (optional) - Celery broker URL
    - backend: str (optional) - Celery results backend URL
    - include: List[str] (optional) - Modules for workers to import
    - config_source: Dict[str, Any] (optional) - Additional Celery configuration
    - retries: RetryMode config (optional) - Retry configuration
    """
```
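
Settings can also be bound ahead of time with Dagster's `ExecutorDefinition.configured` API, so jobs don't repeat execution config at launch. A sketch (the image and broker URLs are placeholders):

```python
from dagster import job, op
from dagster_celery_docker import celery_docker_executor

@op
def say_hello():
    return "hello"

# Bind executor config up front via Dagster's `configured` API
# (values below are placeholder assumptions).
preconfigured_executor = celery_docker_executor.configured(
    {
        "docker": {"image": "python:3.9-slim"},
        "broker": "redis://localhost:6379/0",
        "backend": "redis://localhost:6379/0",
    }
)

@job(executor_def=preconfigured_executor)
def preconfigured_job():
    say_hello()
```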

### Version Information

Package version constant for version checking and compatibility verification.

```python { .api }
__version__: str
```

The current version string for the dagster-celery-docker package.
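
A minimal sketch of a compatibility guard built on this constant (the expected `0.27.` prefix is an assumption based on the release line this document describes):

```python
from dagster_celery_docker import __version__

# Fail fast if the installed release is outside the expected line.
if not __version__.startswith("0.27."):
    raise RuntimeError(f"Unexpected dagster-celery-docker version: {__version__}")
```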

### Celery App

The configured Celery application instance used by workers for task execution.

```python { .api }
app: celery.Celery
```

Celery app instance configured with task routes for docker step execution. Used when starting Celery workers with the `-A dagster_celery_docker.app` argument.
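
A trivial sanity check that the worker environment can import the app:

```bash
# Verify the Celery app module is importable where workers will run
python -c "from dagster_celery_docker import app; print(type(app))"
```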

### Executor Class

The main executor class that orchestrates Docker container execution via Celery workers.

```python { .api }
class CeleryDockerExecutor(Executor):
    def __init__(self, retries, docker_config, broker=None, backend=None, include=None, config_source=None):
        """
        Initialize the Celery Docker executor.

        Args:
            retries: RetryMode instance for retry configuration
            docker_config: Dict containing Docker configuration
            broker: Optional Celery broker URL
            backend: Optional Celery results backend URL
            include: Optional list of modules for workers to import
            config_source: Optional additional Celery configuration
        """

    def execute(self, plan_context, execution_plan):
        """Execute the given execution plan using Celery workers and Docker containers."""

    def app_args(self):
        """Return arguments for Celery app configuration."""
```

## Configuration Schema

### Docker Configuration (Required)

The executor requires Docker configuration to specify how containers should be created and managed:

```yaml
execution:
  config:
    docker:
      image: 'my-repo.com/my-image:latest'  # Docker image for step execution
      registry:  # Optional registry authentication
        url: 'my-repo.com'
        username: 'my-user'
        password: {env: 'DOCKER_PASSWORD'}
      env_vars: ["DAGSTER_HOME", "AWS_PROFILE"]  # Environment variables to pass
      network: 'my-network'  # Docker network to connect container
      container_kwargs:  # Additional Docker container arguments
        volumes: ['/host/path:/container/path']
        memory: '1g'
        cpu_count: 2
```

### Celery Configuration (Optional)

Standard Celery configuration options inherited from dagster-celery:

```yaml
execution:
  config:
    broker: 'redis://localhost:6379/0'  # Celery message broker
    backend: 'redis://localhost:6379/0'  # Celery results backend
    include: ['my_module']  # Modules for workers to import
    config_source:  # Additional Celery worker configuration
      task_serializer: 'json'
      result_serializer: 'json'
      task_routes:
        'my_task': {'queue': 'priority'}
```

### Retry Configuration (Optional)

Configure retry behavior for failed step executions:

```yaml
execution:
  config:
    retries:
      enabled: true
      max_retries: 3
      retry_delay: 60  # seconds
```

## Usage Examples

### Basic Docker Execution

```python
from dagster import job, op
from dagster_celery_docker import celery_docker_executor

@op
def process_data():
    import pandas as pd
    # Data processing logic
    return {"status": "processed"}

@job(executor_def=celery_docker_executor)
def data_pipeline():
    process_data()

# Run with minimal configuration
result = data_pipeline.execute_in_process(
    run_config={
        "execution": {
            "config": {
                "docker": {
                    "image": "python:3.9-slim"
                }
            }
        }
    }
)
```

### Advanced Configuration with Registry and Environment

```python
from dagster import job, op, Config
from dagster_celery_docker import celery_docker_executor

class ProcessingConfig(Config):
    input_path: str
    output_path: str

@op
def secure_processing(config: ProcessingConfig):
    # Processing that requires specific environment and credentials
    return f"Processed {config.input_path} -> {config.output_path}"

@job(executor_def=celery_docker_executor)
def secure_pipeline():
    secure_processing()

# Run with full configuration
result = secure_pipeline.execute_in_process(
    run_config={
        "ops": {
            "secure_processing": {
                "config": {
                    "input_path": "/data/input.csv",
                    "output_path": "/data/output.csv"
                }
            }
        },
        "execution": {
            "config": {
                "docker": {
                    "image": "my-company.com/data-processor:v1.2.3",
                    "registry": {
                        "url": "my-company.com",
                        "username": "deploy-user",
                        "password": {"env": "REGISTRY_PASSWORD"}
                    },
                    "env_vars": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "DAGSTER_HOME"],
                    "network": "data-processing-network",
                    "container_kwargs": {
                        "volumes": ["/host/data:/data", "/host/tmp:/tmp"],
                        "memory": "4g",
                        "cpu_count": 4,
                        "auto_remove": True
                    }
                },
                "broker": "redis://redis-cluster:6379/0",
                "backend": "redis://redis-cluster:6379/1",
                "retries": {
                    "enabled": True,
                    "max_retries": 3,
                    "retry_delay": 120
                }
            }
        }
    }
)
```

### Multi-Worker Distributed Execution

```python
from dagster import job, op, DynamicOut, DynamicOutput
from dagster_celery_docker import celery_docker_executor
from typing import List

@op(out=DynamicOut())
def split_work():
    # Yield one DynamicOutput per work item so steps can fan out across workers
    for i in range(10):
        yield DynamicOutput(f"task_{i}", mapping_key=str(i))

@op
def process_item(item: str) -> str:
    # Each item processed in its own Docker container across workers
    import time
    time.sleep(5)  # Simulate processing
    return f"processed_{item}"

@op
def combine_results(results: List[str]) -> str:
    return f"Combined {len(results)} results: {', '.join(results[:3])}..."

@job(executor_def=celery_docker_executor)
def distributed_pipeline():
    results = split_work().map(process_item)
    combine_results(results.collect())

# Execute with worker scaling configuration
result = distributed_pipeline.execute_in_process(
    run_config={
        "execution": {
            "config": {
                "docker": {
                    "image": "python:3.9-slim",
                    "container_kwargs": {
                        "cpu_count": 2,
                        "memory": "2g"
                    }
                },
                "broker": "amqp://guest@rabbitmq:5672//",
                "backend": "rpc://",
                "config_source": {
                    "worker_prefetch_multiplier": 1,
                    "task_acks_late": True,
                    "worker_max_tasks_per_child": 100
                }
            }
        }
    }
)
```

## Error Handling

The executor handles various error conditions and reports them through Dagster's event system:

- **Container Creation Errors**: Issues with Docker image pulling, registry authentication, or container configuration
- **Container Execution Errors**: Runtime errors within containers, including non-zero exit codes
- **Network Errors**: Container networking issues or communication failures
- **Resource Errors**: Insufficient memory, CPU, or disk space for container execution
- **Celery Worker Errors**: Worker disconnections, task routing failures, or broker communication issues

All errors are captured with detailed metadata and reported as Dagster engine events, maintaining full observability of distributed execution.

## Dependencies

- **dagster**: Core Dagster framework (==1.11.9)
- **dagster-celery**: Celery integration for Dagster (==0.27.9)
- **dagster-graphql**: GraphQL support for Dagster (==1.11.9)
- **docker**: Docker Python client for container management

## Deployment Considerations

### Celery Worker Setup

Workers must be started with the dagster-celery-docker app:

```bash
celery -A dagster_celery_docker.app worker --loglevel=info --queues=dagster
```
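
A minimal Docker Compose sketch of this deployment, assuming a Redis broker and a custom worker image with dagster-celery-docker installed (service names, image, and paths are placeholders):

```yaml
services:
  redis:
    image: redis:7  # serves as both Celery broker and results backend
  worker:
    image: my-company.com/dagster-worker:latest  # placeholder; must include dagster-celery-docker
    command: celery -A dagster_celery_docker.app worker --loglevel=info --queues=dagster
    environment:
      - DAGSTER_HOME=/opt/dagster/dagster_home
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock  # give the worker Docker daemon access
    depends_on:
      - redis
```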

### Docker Access

Workers need Docker daemon access:

- Docker socket mounted: `-v /var/run/docker.sock:/var/run/docker.sock` (see the sketch below)
- Or Docker-in-Docker setup for containerized workers
- Appropriate permissions for Docker operations
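
For the socket-mount approach, a hedged one-off equivalent of the Compose setup above (the worker image name is a placeholder):

```bash
# Start a single worker container that can reach the host Docker daemon.
# The image is a placeholder and must have dagster-celery-docker installed.
docker run -d \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -e DAGSTER_HOME=/opt/dagster/dagster_home \
  my-company.com/dagster-worker:latest \
  celery -A dagster_celery_docker.app worker --loglevel=info --queues=dagster
```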

### Network Configuration

- Containers need network access to the Dagster instance
- Consider container networking for inter-step communication
- Weigh the security implications of granting containers network access

### Resource Management

- Configure appropriate container resource limits (a sketch follows this list)
- Monitor Docker disk usage for container images and volumes
- Plan for concurrent container execution resource requirements
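
A short sketch of resource limits expressed through `container_kwargs`, reusing keys from the configuration examples above (their exact pass-through behavior to the Docker client is assumed, not verified here):

```python
# Resource limits via container_kwargs; keys mirror this document's YAML
# examples, and their pass-through semantics are an assumption.
run_config = {
    "execution": {
        "config": {
            "docker": {
                "image": "python:3.9-slim",
                "container_kwargs": {
                    "memory": "2g",      # cap container memory
                    "cpu_count": 2,      # cap usable CPUs
                    "auto_remove": True  # clean up finished containers to save disk
                },
            }
        }
    }
}
```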