tessl/pypi-dagster-celery-k8s

A Dagster integration for the celery-k8s executor that provides a Kubernetes-native distributed execution system, combining Celery task queuing with Kubernetes job orchestration

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/dagster-celery-k8s@0.27.x

To install, run

npx @tessl/cli install tessl/pypi-dagster-celery-k8s@0.27.0


Dagster Celery K8s

A Dagster integration that provides a Kubernetes-native distributed execution system by combining Celery task queuing with Kubernetes job orchestration. It enables scalable data pipeline execution through a two-tier architecture where a run worker Kubernetes Job traverses the execution plan and submits individual steps to Celery queues, while Celery workers spawn separate Kubernetes Jobs for each step execution.

Package Information

  • Package Name: dagster-celery-k8s
  • Language: Python
  • Installation: pip install dagster-celery-k8s
  • Version: 0.27.9

Core Imports

from dagster_celery_k8s import CeleryK8sRunLauncher, celery_k8s_job_executor

Import version information:

from dagster_celery_k8s import __version__

Import types for full type annotations:

from typing import Optional, List, Dict, Any
from dagster import DagsterRun, LaunchRunContext
from dagster._core.launcher.base import CheckRunHealthResult
from dagster._serdes import ConfigurableClassData
from dagster_k8s import DagsterK8sJobConfig

Basic Usage

from dagster import job
from dagster_celery_k8s import celery_k8s_job_executor

# Define a job using the Celery K8s executor
@job(executor_def=celery_k8s_job_executor)
def my_distributed_job():
    # Your job operations here
    pass

Configure the run launcher in your Dagster instance (dagster.yaml):

run_launcher:
  module: dagster_celery_k8s
  class: CeleryK8sRunLauncher
  config:
    instance_config_map: "dagster-k8s-instance-config-map"
    dagster_home: "/opt/dagster/dagster_home"
    postgres_password_secret: "dagster-k8s-pg-password"
    broker: "pyamqp://guest@localhost//"
    backend: "rpc://"

Then supply the executor settings in the run config for the job (they do not belong in dagster.yaml):

execution:
  config:
    job_image: 'my_repo.com/image_name:latest'
    job_namespace: 'dagster-execution'
    broker: 'pyamqp://guest@localhost//'
    backend: 'rpc://'

Architecture

The dagster-celery-k8s package implements a two-tier distributed execution architecture:

  1. Run Worker Tier: A Kubernetes Job that traverses the Dagster execution plan and submits individual steps to Celery queues
  2. Step Execution Tier: Celery workers that pick up tasks and spawn separate Kubernetes Jobs for each step execution

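This design provides:

  • Scalability: each step runs in its own Kubernetes Job, so resource requests can be sized per step (see per_step_k8s_config)
  • Reliability: Kubernetes manages each step's Job, and a step that crashes affects only its own pod
  • Flexibility: Celery queues and worker concurrency control where and how many steps run at once
  • Cloud-native: execution is built from standard Kubernetes primitives (Jobs, ConfigMaps, Secrets, service accounts)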
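To make the division of labor between the two tiers concrete, here is a toy, in-process simulation in plain Python. Nothing in it comes from dagster-celery-k8s: the plan, `launch_k8s_job`, and the synchronous hand-off from run worker to Celery worker are hypothetical stand-ins for the real Celery queue and Kubernetes API calls. It shows the key ordering property: a step is submitted only after its upstream steps have finished, and each submitted step gets its own Job.

```python
from collections import deque
from graphlib import TopologicalSorter
from typing import Deque, Dict, Iterator, List, Set

# Hypothetical execution plan: step key -> upstream step keys it depends on.
PLAN: Dict[str, Set[str]] = {
    "process_data": set(),
    "save_results": {"process_data"},
}

launched_jobs: List[str] = []


def launch_k8s_job(step_key: str) -> None:
    # Stand-in for creating a Kubernetes Job that runs exactly one step.
    launched_jobs.append(f"step-job-{step_key}")


def celery_worker(task_queue: Deque[str]) -> Iterator[str]:
    # Tier 2: pick up queued steps and launch one Job per step.
    while task_queue:
        step_key = task_queue.popleft()
        launch_k8s_job(step_key)
        yield step_key  # report the step as finished


def run_worker(plan: Dict[str, Set[str]]) -> None:
    # Tier 1: traverse the plan, submitting steps only once their upstreams finish.
    sorter = TopologicalSorter(plan)
    sorter.prepare()
    task_queue: Deque[str] = deque()
    while sorter.is_active():
        task_queue.extend(sorter.get_ready())
        for finished in celery_worker(task_queue):
            sorter.done(finished)


run_worker(PLAN)
print(launched_jobs)  # ['step-job-process_data', 'step-job-save_results']
```

In the real system the run worker and the Celery workers are separate processes, so several ready steps can run concurrently; the simulation handles them one at a time for simplicity.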

Capabilities

Run Launcher

The primary component for launching Dagster runs as Kubernetes jobs with Celery task distribution.

class CeleryK8sRunLauncher(RunLauncher, ConfigurableClass):
    """
    Run launcher for Kubernetes-based execution with Celery task queuing.
    
    Launches dagster runs as Kubernetes Jobs that traverse execution plans
    and submit steps to Celery queues for distributed execution.
    """
    
    def __init__(
        self,
        instance_config_map,
        dagster_home,
        postgres_password_secret,
        load_incluster_config=True,
        kubeconfig_file=None,
        broker=None,
        backend=None,
        include=None,
        config_source=None,
        retries=None,
        inst_data: Optional[ConfigurableClassData] = None,
        k8s_client_batch_api=None,
        env_config_maps=None,
        env_secrets=None,
        volume_mounts=None,
        volumes=None,
        service_account_name=None,
        image_pull_policy=None,
        image_pull_secrets=None,
        labels=None,
        fail_pod_on_run_failure=None,
        job_namespace=None,
    ):
        """
        Initialize the CeleryK8sRunLauncher.
        
        Args:
            instance_config_map: Name of the ConfigMap containing instance configuration
            dagster_home: Path to Dagster home directory
            postgres_password_secret: Name of secret containing PostgreSQL password
            load_incluster_config: Whether to load Kubernetes config from within cluster
            kubeconfig_file: Path to kubeconfig file if not using in-cluster config
            broker: Celery broker URL
            backend: Celery backend URL
            include: List of modules for Celery workers to import
            config_source: Additional Celery configuration
            retries: Retry configuration
            inst_data: Configurable class instance data
            k8s_client_batch_api: Override for Kubernetes batch API client
            env_config_maps: List of ConfigMaps to mount as environment variables
            env_secrets: List of Secrets to mount as environment variables
            volume_mounts: List of volume mounts for pods
            volumes: List of volumes for pods
            service_account_name: Kubernetes service account name
            image_pull_policy: Image pull policy for containers
            image_pull_secrets: List of image pull secrets
            labels: Labels to apply to Kubernetes resources
            fail_pod_on_run_failure: Whether to fail pod on run failure
            job_namespace: Kubernetes namespace for jobs
        """
    
    def launch_run(self, context: LaunchRunContext) -> None:
        """
        Launch a Dagster run as a Kubernetes job.
        
        Args:
            context: Launch context containing run information and code origin
        """
    
    def terminate(self, run_id):
        """
        Terminate a running Dagster job.
        
        Args:
            run_id: ID of the run to terminate
            
        Returns:
            True if termination was successful, False if run is already finished,
            None if an exception occurred during termination
        """
    
    def check_run_worker_health(self, run: DagsterRun) -> CheckRunHealthResult:
        """
        Check the health of a run worker.
        
        Args:
            run: The Dagster run to check
            
        Returns:
            Health check result with worker status
        """
    
    @classmethod
    def config_type(cls):
        """Return the configuration schema for this run launcher."""
    
    @classmethod
    def from_config_value(cls, inst_data, config_value):
        """Create instance from configuration values."""
    
    def get_k8s_job_config(self, job_image, exc_config) -> DagsterK8sJobConfig:
        """
        Get Kubernetes job configuration.
        
        Args:
            job_image: Docker image for the job
            exc_config: Executor configuration dictionary
            
        Returns:
            Kubernetes job configuration object
        """
    
    def get_namespace_from_run_config(self, run_id: str) -> str:
        """
        Extract namespace from run configuration.
        
        Args:
            run_id: Run identifier
            
        Returns:
            Kubernetes namespace name
        """
    
    @property
    def supports_check_run_worker_health(self) -> bool:
        """Whether this launcher supports health checks."""
    
    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        """Configuration instance data."""

Executor

The executor function that creates the distributed execution engine.

@executor(
    name="celery-k8s",
    config_schema=celery_k8s_executor_config(),
    requirements=multiple_process_executor_requirements(),
)
def celery_k8s_job_executor(init_context):
    """
    Celery-based executor which launches tasks as Kubernetes Jobs.
    
    The executor exposes config settings for the underlying Celery app and
    Kubernetes job configuration. It works in concert with CeleryK8sRunLauncher
    to provide distributed execution where:
    
    1. A run worker Kubernetes Job traverses the dagster run execution plan
       and submits steps to Celery queues for execution
    2. Celery workers pick up step executions and spawn Kubernetes Jobs
       for each step
    
    Args:
        init_context: Executor initialization context containing configuration
        
    Returns:
        CeleryK8sJobExecutor instance configured for distributed execution
    """

Configuration

Configuration utilities for setting up the executor and launcher.

def celery_k8s_executor_config():
    """
    Return configuration schema for celery-k8s executor.
    
    Merges Celery configuration, Kubernetes job configuration, and
    additional celery-k8s specific options.
    
    Returns:
        Configuration dictionary with all available options
    """

def get_celery_engine_config(
    image_pull_policy=None,
    additional_env_config_maps=None
):
    """
    Get engine configuration for Celery K8s execution.
    
    Args:
        image_pull_policy: Kubernetes image pull policy
        additional_env_config_maps: Additional ConfigMaps for environment variables
        
    Returns:
        Engine configuration dictionary
    """

def get_celery_engine_job_config(
    image_pull_policy=None,
    additional_env_config_maps=None
):
    """
    Get job configuration for Celery K8s execution.
    
    Args:
        image_pull_policy: Kubernetes image pull policy
        additional_env_config_maps: Additional ConfigMaps for environment variables
        
    Returns:
        Job configuration dictionary
    """

CeleryK8sJobExecutor

The executor implementation class that handles distributed step execution.

class CeleryK8sJobExecutor(Executor):
    """
    Executor that runs steps as Kubernetes Jobs via Celery task queuing.
    
    This executor works in concert with CeleryK8sRunLauncher to provide
    distributed execution of Dagster steps.
    """
    
    def __init__(
        self,
        retries,
        broker=None,
        backend=None,
        include=None,
        config_source=None,
        job_config=None,
        job_namespace=None,
        load_incluster_config=False,
        kubeconfig_file=None,
        repo_location_name=None,
        job_wait_timeout=None,
        per_step_k8s_config=None,
    ):
        """
        Initialize the CeleryK8sJobExecutor.
        
        Args:
            retries: Retry mode configuration
            broker: Celery broker URL
            backend: Celery backend URL
            include: List of modules for Celery workers to import
            config_source: Additional Celery configuration dictionary
            job_config: Kubernetes job configuration object
            job_namespace: Kubernetes namespace for jobs
            load_incluster_config: Whether to load Kubernetes config from within cluster
            kubeconfig_file: Path to kubeconfig file
            repo_location_name: Repository location name for execution
            job_wait_timeout: Timeout for job completion in seconds
            per_step_k8s_config: Per-step Kubernetes configuration overrides
        """
    
    def execute(self, plan_context, execution_plan):
        """
        Execute the execution plan using Celery task distribution.
        
        Args:
            plan_context: Pipeline execution context
            execution_plan: Execution plan to run
            
        Returns:
            Generator of execution events
        """
    
    def app_args(self):
        """
        Return arguments for Celery app configuration.
        
        Returns:
            Dictionary with broker, backend, include, config_source, and retries
        """
    
    @property
    def retries(self):
        """Return retry mode configuration."""

Celery Application

Pre-configured Celery application for K8s job execution.

app: celery.Celery
"""Pre-configured Celery application with task routes for K8s job execution."""

execute_step_k8s_job: celery.Task
"""Celery task instance for executing steps as K8s jobs."""

Constants

CELERY_K8S_CONFIG_KEY = "celery-k8s"
"""Configuration key for celery-k8s executor."""

__version__ = "0.27.9"
"""Package version."""

Configuration Options

The executor supports extensive configuration for both Celery and Kubernetes:

Kubernetes Configuration

  • job_image: Docker image for step execution jobs
  • job_namespace: Kubernetes namespace for jobs (default: "default")
  • load_incluster_config: Load K8s config from within cluster (default: True)
  • kubeconfig_file: Path to kubeconfig file
  • job_wait_timeout: Timeout for job completion (default: 4 hours)
  • env_config_maps: ConfigMaps to mount as environment variables
  • env_secrets: Secrets to mount as environment variables
  • volume_mounts: Volume mounts for job pods
  • volumes: Volumes for job pods
  • service_account_name: Kubernetes service account
  • image_pull_policy: Image pull policy (default: "IfNotPresent")
  • image_pull_secrets: Image pull secrets
  • labels: Labels for Kubernetes resources
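`job_wait_timeout` bounds how long the executor waits for a step's Kubernetes Job to finish. The generic poll-until-deadline loop below illustrates that semantics; it is not the dagster-k8s wait implementation, and the `get_status` callable and its status strings are hypothetical. The clock and sleep functions are injectable so the logic can be exercised without real waiting.

```python
import time
from typing import Callable

TERMINAL_STATUSES = ("succeeded", "failed")  # hypothetical status strings


def wait_for_job(
    get_status: Callable[[], str],
    job_wait_timeout: float,
    poll_interval: float = 10.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it reports a terminal status or the timeout elapses."""
    deadline = clock() + job_wait_timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job did not finish within {job_wait_timeout} seconds")
        sleep(poll_interval)


# Usage with a fake clock so no real time passes.
now = [0.0]


def fake_sleep(seconds: float) -> None:
    now[0] += seconds


statuses = iter(["running", "running", "succeeded"])
print(wait_for_job(lambda: next(statuses), job_wait_timeout=3600, clock=lambda: now[0], sleep=fake_sleep))  # succeeded
```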

Celery Configuration

  • broker: Celery broker URL (e.g., "pyamqp://guest@localhost//")
  • backend: Celery results backend URL (e.g., "rpc://")
  • include: Modules for Celery workers to import
  • config_source: Additional Celery configuration dictionary
  • retries: Retry configuration for failed tasks

Per-Step Configuration

  • per_step_k8s_config: Per-operation Kubernetes configuration overrides
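One way to picture how `per_step_k8s_config` interacts with job-wide settings is a per-step overlay keyed by op name: steps with an entry get their overrides merged in, and all other steps keep the base config. The helper below is a hypothetical sketch that merges each top-level section (such as `container_config`) one level deep; the library's actual merge rules may differ, particularly for nested keys.

```python
import copy
from typing import Any, Dict


def resolve_step_k8s_config(
    base: Dict[str, Any],
    per_step_k8s_config: Dict[str, Dict[str, Any]],
    step_key: str,
) -> Dict[str, Any]:
    # Start from a copy so the job-wide base config is never mutated.
    resolved = copy.deepcopy(base)
    for section, override in per_step_k8s_config.get(step_key, {}).items():
        override = copy.deepcopy(override)
        if isinstance(override, dict) and isinstance(resolved.get(section), dict):
            # Merge one level deep: keys in the override replace keys in the base section.
            resolved[section] = {**resolved[section], **override}
        else:
            resolved[section] = override
    return resolved


base = {"container_config": {"image_pull_policy": "IfNotPresent"}}
per_step = {
    "heavy_computation": {
        "container_config": {"resources": {"requests": {"cpu": "2", "memory": "4Gi"}}}
    }
}

heavy = resolve_step_k8s_config(base, per_step, "heavy_computation")
light = resolve_step_k8s_config(base, per_step, "light_step")
print(heavy["container_config"]["resources"])  # {'requests': {'cpu': '2', 'memory': '4Gi'}}
print(light)  # {'container_config': {'image_pull_policy': 'IfNotPresent'}}
```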

Usage Examples

Basic Job Definition

from dagster import op, job
from dagster_celery_k8s import celery_k8s_job_executor

@op
def process_data():
    return "processed"

@op 
def save_results(data: str):
    print(f"Saving: {data}")

@job(executor_def=celery_k8s_job_executor)
def my_pipeline():
    save_results(process_data())

Advanced Configuration

from dagster import job, RunConfig
from dagster_celery_k8s import celery_k8s_job_executor

@job(executor_def=celery_k8s_job_executor)
def advanced_pipeline():
    # Job operations
    pass

# Run config to supply when launching advanced_pipeline (for example, from the Dagster UI launchpad)
run_config = RunConfig(
    execution={
        "config": {
            "job_image": "my-registry/my-image:v1.0.0",
            "job_namespace": "data-processing",
            "broker": "redis://redis-service:6379/0",
            "backend": "redis://redis-service:6379/1",
            "job_wait_timeout": 3600,  # seconds (1 hour)
            "per_step_k8s_config": {
                # Keyed by op name; "heavy_computation" is a placeholder op
                "heavy_computation": {
                    "container_config": {
                        "resources": {
                            "requests": {"cpu": "2", "memory": "4Gi"},
                            "limits": {"cpu": "4", "memory": "8Gi"}
                        }
                    }
                }
            }
        }
    }
)

Instance Configuration

Configure in your dagster.yaml:

run_launcher:
  module: dagster_celery_k8s
  class: CeleryK8sRunLauncher
  config:
    # Required configuration
    instance_config_map: "dagster-instance-config"
    dagster_home: "/opt/dagster/dagster_home"
    postgres_password_secret: "dagster-postgres-secret"
    
    # Celery configuration
    broker: "redis://redis-service:6379/0"
    backend: "redis://redis-service:6379/1"
    
    # Kubernetes configuration
    job_namespace: "dagster-execution"
    service_account_name: "dagster-service-account"
    image_pull_policy: "Always"
    
    # Resource configuration
    env_config_maps:
      - "dagster-env-config"
    env_secrets:
      - "dagster-secrets"
    
    labels:
      team: "data-engineering"
      environment: "production"

Error Handling

Failures can originate in either layer of the architecture. Typical sources include:

Kubernetes Errors

  • Job creation failures and conflicts
  • Pod scheduling issues
  • Resource allocation problems
  • Network connectivity issues
  • Timeout scenarios

Celery Errors

  • Broker connection failures
  • Task serialization issues
  • Worker availability problems
  • Message queue overflow

Common Error Patterns

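from typing import Optional

from dagster import DagsterInvariantViolationError, DagsterUnmetExecutorRequirementsError
from dagster_celery_k8s import CeleryK8sRunLauncher


# Illustrative checks (hypothetical helper) mirroring the two most common setup errors
def check_celery_k8s_setup(job_image: Optional[str], run_launcher: object) -> None:
    # Image configuration error: every step Job needs an image to run
    if not job_image:
        raise DagsterInvariantViolationError(
            "You have not specified a job_image in your executor configuration. "
            "Specify the job_image in the executor config section."
        )

    # Compatibility error: the executor requires the matching run launcher
    if not isinstance(run_launcher, CeleryK8sRunLauncher):
        raise DagsterUnmetExecutorRequirementsError(
            "This executor is only compatible with a CeleryK8sRunLauncher; "
            "configure the CeleryK8sRunLauncher on your instance to use it."
        )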

Integration Requirements

Dependencies

  • dagster==1.11.9
  • dagster-k8s==0.27.9
  • dagster-celery==0.27.9
  • kubernetes (Python client)

Environment Setup

  1. Kubernetes Cluster: Access to a Kubernetes cluster
  2. Celery Broker: Redis or RabbitMQ broker service
  3. Dagster Instance: PostgreSQL-backed Dagster instance
  4. Container Registry: Access to push/pull Docker images
  5. RBAC: Kubernetes permissions for job creation and management

Deployment Considerations

  • Celery workers must load the dagster_celery_k8s.app Celery application, for example by passing -A dagster_celery_k8s.app when starting the worker
  • Kubernetes Jobs require appropriate resource quotas and limits
  • Network policies must allow communication between components
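  • Because each step runs in its own pod, outputs passed between steps need shared storage (for example an object-store-backed IO manager or a persistent volume); files written to one pod's local filesystem are not visible to the next step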