tessl/pypi-dagster--celery--docker

A Dagster executor that enables running Dagster steps within Docker containers orchestrated by Celery for scalable distributed pipeline execution

Describes: pypipkg:pypi/dagster-celery-docker@0.27.x

To install, run

npx @tessl/cli install tessl/pypi-dagster--celery--docker@0.27.0

Dagster Celery Docker

A Dagster executor that enables running Dagster steps within Docker containers orchestrated by Celery. This integration combines the distributed task execution capabilities of Celery with Docker containerization for isolated, scalable pipeline execution. The executor allows configuration of Docker images, registries, environment variables, and container networking while maintaining integration with Dagster's event and logging systems.

Package Information

  • Package Name: dagster-celery-docker
  • Package Type: pypi
  • Language: Python
  • Installation: pip install dagster-celery-docker

Core Imports

from dagster_celery_docker import celery_docker_executor

For accessing version information:

from dagster_celery_docker import __version__

For the Celery app (used in worker deployment):

from dagster_celery_docker import app

Basic Usage

from dagster import job, op
from dagster_celery_docker import celery_docker_executor

@op
def my_op():
    return "Hello from Docker container!"

@job(executor_def=celery_docker_executor)
def my_docker_job():
    my_op()

# Execute with configuration
if __name__ == "__main__":
    result = my_docker_job.execute_in_process(
        run_config={
            "execution": {
                "config": {
                    "docker": {
                        "image": "python:3.9-slim",
                        "env_vars": ["DAGSTER_HOME"],
                        "container_kwargs": {
                            "auto_remove": True
                        }
                    },
                    "broker": "redis://localhost:6379/0",
                    "backend": "redis://localhost:6379/0"
                }
            }
        }
    )

Architecture

The dagster-celery-docker integration extends Dagster's execution model with containerized distributed computing:

  • CeleryDockerExecutor: Main executor class that orchestrates Docker container execution via Celery workers
  • Task Submission: Steps are packaged as Celery tasks with Docker execution parameters
  • Container Management: Automatic Docker image pulling, container lifecycle management, and cleanup
  • Event Integration: Container execution events are reported back to Dagster's event system
  • Error Handling: Docker container errors are captured and reported through Dagster's standard error handling

This design enables scalable pipeline execution across multiple worker nodes while providing the isolation and consistency of containerized environments.
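
For orientation, here is a heavily simplified sketch of the submission flow. All names in it (such as the task name) are hypothetical; the real wiring lives inside the package:

# Illustrative sketch only -- the task name and argument shapes here are
# hypothetical simplifications of what dagster-celery-docker does internally.
def submit_step(celery_app, step_key, run_config, docker_config):
    # Each Dagster step becomes one Celery task invocation that carries the
    # Docker parameters the worker needs to launch the container.
    return celery_app.send_task(
        "execute_step_docker",               # hypothetical task name
        kwargs={
            "step_key": step_key,            # which step to run
            "run_config": run_config,        # serialized run configuration
            "docker_config": docker_config,  # image, env_vars, kwargs, ...
        },
        queue="dagster",                     # default queue used by workers
    )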

Capabilities

Executor Definition

The main executor function that creates a Celery-based executor for running Dagster steps in Docker containers.

@executor(
    name="celery-docker",
    config_schema=celery_docker_config(),
    requirements=multiple_process_executor_requirements(),
)
def celery_docker_executor(init_context):
    """
    Celery-based executor which launches tasks in docker containers.
    
    The Celery executor exposes config settings for the underlying Celery app under
    the ``config_source`` key. This config corresponds to the "new lowercase settings" introduced
    in Celery version 4.0 and the object constructed from config will be passed to the
    :py:class:`celery.Celery` constructor as its ``config_source`` argument.
    
    Parameters:
    - init_context: InitExecutorContext from Dagster
    
    Returns:
    CeleryDockerExecutor instance configured with the provided settings
    
    Configuration Schema:
    - docker: Dict[str, Any] (required) - Docker configuration
        - image: StringSource (optional) - Docker image for step execution
        - registry: Dict[str, StringSource] (optional) - Registry configuration
            - url: StringSource - Registry URL
            - username: StringSource - Registry username  
            - password: StringSource - Registry password
        - env_vars: List[str] (optional) - Environment variables to forward to container
        - network: str (optional) - Docker network name for container
        - container_kwargs: Permissive (optional) - Additional Docker container arguments
    - broker: str (optional) - Celery broker URL
    - backend: str (optional) - Celery results backend URL  
    - include: List[str] (optional) - Modules for workers to import
    - config_source: Dict[str, Any] (optional) - Additional Celery configuration
    - retries: RetryMode config (optional) - Retry configuration
    """

Version Information

Package version constant for version checking and compatibility verification.

__version__: str

The current version string for the dagster-celery-docker package.
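
A quick sanity check against the 0.27.x line this page describes (the exact pin is illustrative):

from dagster_celery_docker import __version__

# Fail fast if an unexpected release is installed.
print(f"dagster-celery-docker version: {__version__}")
assert __version__.startswith("0.27"), "expected a 0.27.x release"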

Celery App

The configured Celery application instance used by workers for task execution.

app: celery.Celery

Celery app instance configured with task routes for docker step execution. Used when starting Celery workers with the -A dagster_celery_docker.app argument.
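
Since app is an ordinary celery.Celery instance, Celery's standard introspection applies. A small sketch for sanity-checking the app before starting workers:

from dagster_celery_docker import app

# Standard Celery introspection -- useful before deploying workers.
print(app.main)              # application name
print(app.conf.broker_url)   # configured broker
print(app.conf.task_routes)  # routing for docker step tasks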

Executor Class

The main executor class that orchestrates Docker container execution via Celery workers.

class CeleryDockerExecutor(Executor):
    def __init__(self, retries, docker_config, broker=None, backend=None, include=None, config_source=None):
        """
        Initialize the Celery Docker executor.
        
        Args:
            retries: RetryMode instance for retry configuration
            docker_config: Dict containing Docker configuration
            broker: Optional Celery broker URL
            backend: Optional Celery results backend URL
            include: Optional list of modules for workers to import
            config_source: Optional additional Celery configuration
        """
    
    def execute(self, plan_context, execution_plan):
        """Execute the given execution plan using Celery workers and Docker containers."""
    
    def app_args(self):
        """Return arguments for Celery app configuration."""

Configuration Schema

Docker Configuration (Required)

The executor requires Docker configuration to specify how containers should be created and managed:

execution:
  config:
    docker:
      image: 'my-repo.com/my-image:latest'  # Docker image for step execution
      registry:  # Optional registry authentication
        url: 'my-repo.com'
        username: 'my-user'
        password: {env: 'DOCKER_PASSWORD'}
      env_vars: ["DAGSTER_HOME", "AWS_PROFILE"]  # Environment variables to pass
      network: 'my-network'  # Docker network to connect container
      container_kwargs:  # Additional Docker container arguments
        volumes: ['/host/path:/container/path']
        memory: '1g'
        cpu_count: 2
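
The same YAML can be kept in a file and loaded as the run_config dict. A minimal sketch, assuming PyYAML is installed and the config above is saved as run_config.yaml (both are assumptions, not part of the package):

import yaml  # assumes PyYAML is available

# Load the execution config shown above and reuse it across runs; the
# file name is a placeholder.
with open("run_config.yaml") as f:
    run_config = yaml.safe_load(f)

result = my_docker_job.execute_in_process(run_config=run_config)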

Celery Configuration (Optional)

Standard Celery configuration options inherited from dagster-celery:

execution:
  config:
    broker: 'redis://localhost:6379/0'  # Celery message broker
    backend: 'redis://localhost:6379/0'  # Celery results backend
    include: ['my_module']  # Modules for workers to import
    config_source:  # Additional Celery worker configuration
      task_serializer: 'json'
      result_serializer: 'json'
      task_routes:
        'my_task': {'queue': 'priority'}

Retry Configuration (Optional)

Configure whether failed steps are retried. Dagster's retries config is a selector between enabled and disabled modes:

execution:
  config:
    retries:
      enabled: {}
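
Per-step retry counts and delays live on the op itself via Dagster's standard RetryPolicy, which the executor's retry mode then honors. A minimal sketch:

from dagster import RetryPolicy, job, op
from dagster_celery_docker import celery_docker_executor

# Retry this op up to 3 times, waiting 60 seconds between attempts.
@op(retry_policy=RetryPolicy(max_retries=3, delay=60))
def flaky_op():
    ...

@job(executor_def=celery_docker_executor)
def retrying_job():
    flaky_op()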

Usage Examples

Basic Docker Execution

from dagster import job, op
from dagster_celery_docker import celery_docker_executor

@op
def process_data():
    import pandas as pd
    # Data processing logic
    return {"status": "processed"}

@job(executor_def=celery_docker_executor)
def data_pipeline():
    process_data()

# Run with minimal configuration
result = data_pipeline.execute_in_process(
    run_config={
        "execution": {
            "config": {
                "docker": {
                    "image": "python:3.9-slim"
                }
            }
        }
    }
)

Advanced Configuration with Registry and Environment

from dagster import job, op, Config
from dagster_celery_docker import celery_docker_executor

class ProcessingConfig(Config):
    input_path: str
    output_path: str

@op
def secure_processing(config: ProcessingConfig):
    # Processing that requires specific environment and credentials
    return f"Processed {config.input_path} -> {config.output_path}"

@job(executor_def=celery_docker_executor)
def secure_pipeline():
    secure_processing()

# Run with full configuration
result = secure_pipeline.execute_in_process(
    run_config={
        "ops": {
            "secure_processing": {
                "config": {
                    "input_path": "/data/input.csv",
                    "output_path": "/data/output.csv"
                }
            }
        },
        "execution": {
            "config": {
                "docker": {
                    "image": "my-company.com/data-processor:v1.2.3",
                    "registry": {
                        "url": "my-company.com",
                        "username": "deploy-user",
                        "password": {"env": "REGISTRY_PASSWORD"}
                    },
                    "env_vars": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "DAGSTER_HOME"],
                    "network": "data-processing-network",
                    "container_kwargs": {
                        "volumes": ["/host/data:/data", "/host/tmp:/tmp"],
                        "memory": "4g",
                        "cpu_count": 4,
                        "auto_remove": True
                    }
                },
                "broker": "redis://redis-cluster:6379/0",
                "backend": "redis://redis-cluster:6379/1",
                "retries": {
                    "enabled": True,
                    "max_retries": 3,
                    "retry_delay": 120
                }
            }
        }
    }
)

Multi-Worker Distributed Execution

from dagster import job, op, DynamicOut, DynamicOutput
from dagster_celery_docker import celery_docker_executor
from typing import List

@op(out=DynamicOut())
def split_work() -> List[DynamicOutput]:
    # Create multiple work items that can be distributed
    work_items = [f"task_{i}" for i in range(10)]
    return [DynamicOutput(item, mapping_key=str(i)) for i, item in enumerate(work_items)]

@op
def process_item(item: str) -> str:
    # Each item processed in its own Docker container across workers
    import time
    time.sleep(5)  # Simulate processing
    return f"processed_{item}"

@op
def combine_results(results: List[str]) -> str:
    return f"Combined {len(results)} results: {', '.join(results[:3])}..."

@job(executor_def=celery_docker_executor)
def distributed_pipeline():
    results = split_work().map(process_item)
    combine_results(results.collect())

# Execute with worker scaling configuration
result = distributed_pipeline.execute_in_process(
    run_config={
        "execution": {
            "config": {
                "docker": {
                    "image": "python:3.9-slim",
                    "container_kwargs": {
                        "cpu_count": 2,
                        "memory": "2g"
                    }
                },
                "broker": "amqp://guest@rabbitmq:5672//",
                "backend": "rpc://",
                "config_source": {
                    "worker_prefetch_multiplier": 1,
                    "task_acks_late": True,
                    "worker_max_tasks_per_child": 100
                }
            }
        }
    }
)

Error Handling

The executor handles various error conditions and reports them through Dagster's event system:

  • Container Creation Errors: Issues with Docker image pulling, registry authentication, or container configuration
  • Container Execution Errors: Runtime errors within containers, including non-zero exit codes
  • Network Errors: Container networking issues or communication failures
  • Resource Errors: Insufficient memory, CPU, or disk space for container execution
  • Celery Worker Errors: Worker disconnections, task routing failures, or broker communication issues

All errors are captured with detailed metadata and reported as Dagster engine events, maintaining full observability of distributed execution.
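
For in-process runs, those events can be inspected directly on the result object rather than letting the failure raise. A sketch using Dagster's standard result APIs (the minimal docker config mirrors the basic example above):

# Keep failures as events instead of raising, then inspect them.
result = data_pipeline.execute_in_process(
    run_config={"execution": {"config": {"docker": {"image": "python:3.9-slim"}}}},
    raise_on_error=False,
)

if not result.success:
    for event in result.all_events:
        if event.is_failure:
            print(event.event_type_value, event.message)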

Dependencies

  • dagster: Core Dagster framework (==1.11.9)
  • dagster-celery: Celery integration for Dagster (==0.27.9)
  • dagster-graphql: GraphQL support for Dagster (==1.11.9)
  • docker: Docker Python client for container management

Deployment Considerations

Celery Worker Setup

Workers must be started with the dagster-celery-docker app:

celery -A dagster_celery_docker.app worker --loglevel=info --queues=dagster

Docker Access

Workers need Docker daemon access:

  • Docker socket mounted: -v /var/run/docker.sock:/var/run/docker.sock
  • Or Docker-in-Docker setup for containerized workers
  • Appropriate permissions for Docker operations

Network Configuration

  • Containers need network access to Dagster instance
  • Consider container networking for inter-step communication
  • Security implications of container network access

Resource Management

  • Configure appropriate container resource limits
  • Monitor Docker disk usage for container images and volumes
  • Plan for concurrent container execution resource requirements