A Dagster executor that enables running Dagster steps within Docker containers orchestrated by Celery for scalable distributed pipeline execution
npx @tessl/cli install tessl/pypi-dagster--celery--docker@0.27.0

A Dagster executor that enables running Dagster steps within Docker containers orchestrated by Celery. This integration combines the distributed task execution capabilities of Celery with Docker containerization for isolated, scalable pipeline execution. The executor allows configuration of Docker images, registries, environment variables, and container networking while maintaining integration with Dagster's event and logging systems.
pip install dagster-celery-docker

from dagster_celery_docker import celery_docker_executor

For accessing version information:

from dagster_celery_docker import __version__

For the Celery app (used in worker deployment):

from dagster_celery_docker import app

from dagster import job, op
from dagster_celery_docker import celery_docker_executor


@op
def my_op():
    return "Hello from Docker container!"


@job(executor_def=celery_docker_executor)
def my_docker_job():
    my_op()


# Execute with configuration
if __name__ == "__main__":
    result = my_docker_job.execute_in_process(
        run_config={
            "execution": {
                "config": {
                    "docker": {
                        "image": "python:3.9-slim",
                        "env_vars": ["DAGSTER_HOME"],
                        "container_kwargs": {
                            "auto_remove": True
                        }
                    },
                    "broker": "redis://localhost:6379/0",
                    "backend": "redis://localhost:6379/0"
                }
            }
        }
    )

The dagster-celery-docker integration extends Dagster's execution model with containerized distributed computing.
This design enables scalable pipeline execution across multiple worker nodes while providing the isolation and consistency of containerized environments.
The main executor function that creates a Celery-based executor for running Dagster steps in Docker containers.
@executor(
    name="celery-docker",
    config_schema=celery_docker_config(),
    requirements=multiple_process_executor_requirements(),
)
def celery_docker_executor(init_context):
    """
    Celery-based executor which launches tasks in docker containers.

    The Celery executor exposes config settings for the underlying Celery app under
    the ``config_source`` key. This config corresponds to the "new lowercase settings" introduced
    in Celery version 4.0, and the object constructed from this config will be passed to the
    :py:class:`celery.Celery` constructor as its ``config_source`` argument.

    Parameters:
        - init_context: ExecutorInitContext from Dagster

    Returns:
        CeleryDockerExecutor instance configured with the provided settings

    Configuration Schema:
        - docker: Dict[str, Any] (required) - Docker configuration
            - image: StringSource (optional) - Docker image for step execution
            - registry: Dict[str, StringSource] (optional) - Registry configuration
                - url: StringSource - Registry URL
                - username: StringSource - Registry username
                - password: StringSource - Registry password
            - env_vars: List[str] (optional) - Environment variables to forward to the container
            - network: str (optional) - Docker network name for the container
            - container_kwargs: Permissive (optional) - Additional Docker container arguments
        - broker: str (optional) - Celery broker URL
        - backend: str (optional) - Celery results backend URL
        - include: List[str] (optional) - Modules for workers to import
        - config_source: Dict[str, Any] (optional) - Additional Celery configuration
        - retries: RetryMode config (optional) - Retry configuration
"""Package version constant for version checking and compatibility verification.
__version__: strThe current version string for the dagster-celery-docker package.
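A quick check of the installed integration version (a minimal sketch using the documented constant):

from dagster_celery_docker import __version__

print(f"dagster-celery-docker version: {__version__}")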
The configured Celery application instance used by workers for task execution.
app: celery.Celery

Celery app instance configured with task routes for docker step execution. Used when starting Celery workers with the -A dagster_celery_docker.app argument.
The main executor class that orchestrates Docker container execution via Celery workers.
class CeleryDockerExecutor(Executor):
    def __init__(self, retries, docker_config, broker=None, backend=None, include=None, config_source=None):
        """
        Initialize the Celery Docker executor.

        Args:
            retries: RetryMode instance for retry configuration
            docker_config: Dict containing Docker configuration
            broker: Optional Celery broker URL
            backend: Optional Celery results backend URL
            include: Optional list of modules for workers to import
            config_source: Optional additional Celery configuration
        """

    def execute(self, plan_context, execution_plan):
        """Execute the given execution plan using Celery workers and Docker containers."""

    def app_args(self):
        """Return arguments for Celery app configuration."""

The executor requires Docker configuration to specify how containers should be created and managed:
execution:
  config:
    docker:
      image: 'my-repo.com/my-image:latest'       # Docker image for step execution
      registry:                                  # Optional registry authentication
        url: 'my-repo.com'
        username: 'my-user'
        password: {env: 'DOCKER_PASSWORD'}
      env_vars: ["DAGSTER_HOME", "AWS_PROFILE"]  # Environment variables to pass
      network: 'my-network'                      # Docker network to connect container
      container_kwargs:                          # Additional Docker container arguments
        volumes: ['/host/path:/container/path']
        memory: '1g'
        cpu_count: 2

Standard Celery configuration options inherited from dagster-celery:
execution:
  config:
    broker: 'redis://localhost:6379/0'   # Celery message broker
    backend: 'redis://localhost:6379/0'  # Celery results backend
    include: ['my_module']               # Modules for workers to import
    config_source:                       # Additional Celery worker configuration
      task_serializer: 'json'
      result_serializer: 'json'
      task_routes:
        'my_task': {'queue': 'priority'}

Configure retry behavior for failed step executions:
execution:
  config:
    retries:
      enabled: true
      max_retries: 3
      retry_delay: 60  # seconds

from dagster import job, op
from dagster_celery_docker import celery_docker_executor


@op
def process_data():
    import pandas as pd
    # Data processing logic
    return {"status": "processed"}


@job(executor_def=celery_docker_executor)
def data_pipeline():
    process_data()


# Run with minimal configuration
result = data_pipeline.execute_in_process(
    run_config={
        "execution": {
            "config": {
                "docker": {
                    "image": "python:3.9-slim"
                }
            }
        }
    }
)

from dagster import job, op, Config
from dagster_celery_docker import celery_docker_executor


class ProcessingConfig(Config):
    input_path: str
    output_path: str


@op
def secure_processing(config: ProcessingConfig):
    # Processing that requires specific environment and credentials
    return f"Processed {config.input_path} -> {config.output_path}"


@job(executor_def=celery_docker_executor)
def secure_pipeline():
    secure_processing()


# Run with full configuration
result = secure_pipeline.execute_in_process(
    run_config={
        "ops": {
            "secure_processing": {
                "config": {
                    "input_path": "/data/input.csv",
                    "output_path": "/data/output.csv"
                }
            }
        },
        "execution": {
            "config": {
                "docker": {
                    "image": "my-company.com/data-processor:v1.2.3",
                    "registry": {
                        "url": "my-company.com",
                        "username": "deploy-user",
                        "password": {"env": "REGISTRY_PASSWORD"}
                    },
                    "env_vars": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "DAGSTER_HOME"],
                    "network": "data-processing-network",
                    "container_kwargs": {
                        "volumes": ["/host/data:/data", "/host/tmp:/tmp"],
                        "memory": "4g",
                        "cpu_count": 4,
                        "auto_remove": True
                    }
                },
                "broker": "redis://redis-cluster:6379/0",
                "backend": "redis://redis-cluster:6379/1",
                "retries": {
                    "enabled": True,
                    "max_retries": 3,
                    "retry_delay": 120
                }
            }
        }
    }
)

from dagster import job, op, DynamicOut, DynamicOutput
from dagster_celery_docker import celery_docker_executor
from typing import List


@op(out=DynamicOut())
def split_work() -> List[DynamicOutput]:
    # Create multiple work items that can be distributed
    work_items = [f"task_{i}" for i in range(10)]
    return [DynamicOutput(item, mapping_key=str(i)) for i, item in enumerate(work_items)]


@op
def process_item(item: str) -> str:
    # Each item processed in its own Docker container across workers
    import time
    time.sleep(5)  # Simulate processing
    return f"processed_{item}"


@op
def combine_results(results: List[str]) -> str:
    return f"Combined {len(results)} results: {', '.join(results[:3])}..."


@job(executor_def=celery_docker_executor)
def distributed_pipeline():
    results = split_work().map(process_item)
    combine_results(results.collect())


# Execute with worker scaling configuration
result = distributed_pipeline.execute_in_process(
    run_config={
        "execution": {
            "config": {
                "docker": {
                    "image": "python:3.9-slim",
                    "container_kwargs": {
                        "cpu_count": 2,
                        "memory": "2g"
                    }
                },
                "broker": "amqp://guest@rabbitmq:5672//",
                "backend": "rpc://",
                "config_source": {
                    "worker_prefetch_multiplier": 1,
                    "task_acks_late": True,
                    "worker_max_tasks_per_child": 100
                }
            }
        }
    }
)

The executor handles various error conditions and reports them through Dagster's event system.
All errors are captured with detailed metadata and reported as Dagster engine events, maintaining full observability of distributed execution.
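As a minimal sketch (assuming result is the ExecuteInProcessResult returned by one of the execute_in_process calls above), engine and step-failure events can be inspected directly from the result:

from dagster import DagsterEventType

# Collect engine events and step failures reported during the run
engine_events = [e for e in result.all_events if e.event_type == DagsterEventType.ENGINE_EVENT]
step_failures = [e for e in result.all_events if e.event_type == DagsterEventType.STEP_FAILURE]

for event in engine_events + step_failures:
    print(event.event_type_value, event.message)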
Workers must be started with the dagster-celery-docker app:
celery -A dagster_celery_docker.app worker --loglevel=info --queues=dagster

Workers need Docker daemon access:
-v /var/run/docker.sock:/var/run/docker.sock
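For example (a sketch; my-celery-worker-image is a placeholder and must have dagster-celery-docker and your job code installed), a containerized worker can be started with the host socket mounted:

docker run -d \
  -v /var/run/docker.sock:/var/run/docker.sock \
  my-celery-worker-image \
  celery -A dagster_celery_docker.app worker --loglevel=info --queues=dagster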