A Dagster integration for celery-k8s-executor that provides a Kubernetes-native distributed execution system combining Celery task queuing with Kubernetes job orchestration
npx @tessl/cli install tessl/pypi-dagster-celery-k8s@0.27.0
A Dagster integration that provides a Kubernetes-native distributed execution system by combining Celery task queuing with Kubernetes job orchestration. It enables scalable data pipeline execution through a two-tier architecture where a run worker Kubernetes Job traverses the execution plan and submits individual steps to Celery queues, while Celery workers spawn separate Kubernetes Jobs for each step execution.
pip install dagster-celery-k8s
from dagster_celery_k8s import CeleryK8sRunLauncher, celery_k8s_job_executor
Import version information:
from dagster_celery_k8s import __version__
Import types for full type annotations:
from typing import Optional, List, Dict, Any
from dagster import DagsterRun, LaunchRunContext
from dagster._core.launcher.base import CheckRunHealthResult
from dagster._serdes import ConfigurableClassData
from dagster_k8s import DagsterK8sJobConfig
from dagster import job
from dagster_celery_k8s import celery_k8s_job_executor
# Define a job using the Celery K8s executor
@job(executor_def=celery_k8s_job_executor)
def my_distributed_job():
# Your job operations here
pass
Configure in the Dagster instance (dagster.yaml):
run_launcher:
module: dagster_celery_k8s.launcher
class: CeleryK8sRunLauncher
config:
instance_config_map: "dagster-k8s-instance-config-map"
dagster_home: "/opt/dagster/dagster_home"
postgres_password_secret: "dagster-k8s-pg-password"
broker: "pyamqp://guest@localhost//"
backend: "rpc://"
execution:
config:
job_image: 'my_repo.com/image_name:latest'
job_namespace: 'dagster-execution'
broker: 'pyamqp://guest@localhost//'
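backend: 'rpc://'
The dagster-celery-k8s package implements a two-tier distributed execution architecture:
1. A run worker Kubernetes Job traverses the execution plan and submits individual steps to Celery queues.
2. Celery workers pick up the queued steps and spawn a separate Kubernetes Job for each step execution.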
This design provides:
The primary component for launching Dagster runs as Kubernetes jobs with Celery task distribution.
class CeleryK8sRunLauncher(RunLauncher, ConfigurableClass):
"""
Run launcher for Kubernetes-based execution with Celery task queuing.
Launches dagster runs as Kubernetes Jobs that traverse execution plans
and submit steps to Celery queues for distributed execution.
"""
def __init__(
self,
instance_config_map,
dagster_home,
postgres_password_secret,
load_incluster_config=True,
kubeconfig_file=None,
broker=None,
backend=None,
include=None,
config_source=None,
retries=None,
inst_data: Optional[ConfigurableClassData] = None,
k8s_client_batch_api=None,
env_config_maps=None,
env_secrets=None,
volume_mounts=None,
volumes=None,
service_account_name=None,
image_pull_policy=None,
image_pull_secrets=None,
labels=None,
fail_pod_on_run_failure=None,
job_namespace=None,
):
"""
Initialize the CeleryK8sRunLauncher.
Args:
instance_config_map: Name of the ConfigMap containing instance configuration
dagster_home: Path to Dagster home directory
postgres_password_secret: Name of secret containing PostgreSQL password
load_incluster_config: Whether to load Kubernetes config from within cluster
kubeconfig_file: Path to kubeconfig file if not using in-cluster config
broker: Celery broker URL
backend: Celery backend URL
include: List of modules for Celery workers to import
config_source: Additional Celery configuration
retries: Retry configuration
inst_data: Configurable class instance data
k8s_client_batch_api: Override for Kubernetes batch API client
env_config_maps: List of ConfigMaps to mount as environment variables
env_secrets: List of Secrets to mount as environment variables
volume_mounts: List of volume mounts for pods
volumes: List of volumes for pods
service_account_name: Kubernetes service account name
image_pull_policy: Image pull policy for containers
image_pull_secrets: List of image pull secrets
labels: Labels to apply to Kubernetes resources
fail_pod_on_run_failure: Whether to fail pod on run failure
job_namespace: Kubernetes namespace for jobs
"""
def launch_run(self, context: LaunchRunContext) -> None:
"""
Launch a Dagster run as a Kubernetes job.
Args:
context: Launch context containing run information and code origin
"""
def terminate(self, run_id):
"""
Terminate a running Dagster job.
Args:
run_id: ID of the run to terminate
Returns:
True if termination was successful, False if run is already finished,
None if an exception occurred during termination
"""
def check_run_worker_health(self, run: DagsterRun) -> CheckRunHealthResult:
"""
Check the health of a run worker.
Args:
run: The Dagster run to check
Returns:
Health check result with worker status
"""
@classmethod
def config_type(cls):
"""Return the configuration schema for this run launcher."""
@classmethod
def from_config_value(cls, inst_data, config_value):
"""Create instance from configuration values."""
def get_k8s_job_config(self, job_image, exc_config) -> DagsterK8sJobConfig:
"""
Get Kubernetes job configuration.
Args:
job_image: Docker image for the job
exc_config: Executor configuration dictionary
Returns:
Kubernetes job configuration object
"""
def get_namespace_from_run_config(self, run_id: str) -> str:
"""
Extract namespace from run configuration.
Args:
run_id: Run identifier
Returns:
Kubernetes namespace name
"""
@property
def supports_check_run_worker_health(self) -> bool:
"""Whether this launcher supports health checks."""
@property
def inst_data(self) -> Optional[ConfigurableClassData]:
"""Configuration instance data."""
The executor function that creates the distributed execution engine.
@executor(
name="celery-k8s",
config_schema=celery_k8s_executor_config(),
requirements=multiple_process_executor_requirements(),
)
def celery_k8s_job_executor(init_context):
"""
Celery-based executor which launches tasks as Kubernetes Jobs.
The executor exposes config settings for the underlying Celery app and
Kubernetes job configuration. It works in concert with CeleryK8sRunLauncher
to provide distributed execution where:
1. A run worker Kubernetes Job traverses the dagster run execution plan
and submits steps to Celery queues for execution
2. Celery workers pick up step executions and spawn Kubernetes Jobs
for each step
Args:
init_context: Executor initialization context containing configuration
Returns:
CeleryK8sJobExecutor instance configured for distributed execution
"""
Configuration utilities for setting up the executor and launcher.
def celery_k8s_executor_config():
"""
Return configuration schema for celery-k8s executor.
Merges Celery configuration, Kubernetes job configuration, and
additional celery-k8s specific options.
Returns:
Configuration dictionary with all available options
"""
def get_celery_engine_config(
image_pull_policy=None,
additional_env_config_maps=None
):
"""
Get engine configuration for Celery K8s execution.
Args:
image_pull_policy: Kubernetes image pull policy
additional_env_config_maps: Additional ConfigMaps for environment variables
Returns:
Engine configuration dictionary
"""
def get_celery_engine_job_config(
image_pull_policy=None,
additional_env_config_maps=None
):
"""
Get job configuration for Celery K8s execution.
Args:
image_pull_policy: Kubernetes image pull policy
additional_env_config_maps: Additional ConfigMaps for environment variables
Returns:
Job configuration dictionary
"""
The executor implementation class that handles distributed step execution.
class CeleryK8sJobExecutor(Executor):
"""
Executor that runs steps as Kubernetes Jobs via Celery task queuing.
This executor works in concert with CeleryK8sRunLauncher to provide
distributed execution of Dagster steps.
"""
def __init__(
self,
retries,
broker=None,
backend=None,
include=None,
config_source=None,
job_config=None,
job_namespace=None,
load_incluster_config=False,
kubeconfig_file=None,
repo_location_name=None,
job_wait_timeout=None,
per_step_k8s_config=None,
):
"""
Initialize the CeleryK8sJobExecutor.
Args:
retries: Retry mode configuration
broker: Celery broker URL
backend: Celery backend URL
include: List of modules for Celery workers to import
config_source: Additional Celery configuration dictionary
job_config: Kubernetes job configuration object
job_namespace: Kubernetes namespace for jobs
load_incluster_config: Whether to load Kubernetes config from within cluster
kubeconfig_file: Path to kubeconfig file
repo_location_name: Repository location name for execution
job_wait_timeout: Timeout for job completion in seconds
per_step_k8s_config: Per-step Kubernetes configuration overrides
"""
def execute(self, plan_context, execution_plan):
"""
Execute the execution plan using Celery task distribution.
Args:
plan_context: Pipeline execution context
execution_plan: Execution plan to run
Returns:
Generator of execution events
"""
def app_args(self):
"""
Return arguments for Celery app configuration.
Returns:
Dictionary with broker, backend, include, config_source, and retries
"""
@property
def retries(self):
"""Return retry mode configuration."""
Pre-configured Celery application for K8s job execution.
app: celery.Celery
"""Pre-configured Celery application with task routes for K8s job execution."""
execute_step_k8s_job: celery.Task
"""Celery task instance for executing steps as K8s jobs."""
CELERY_K8S_CONFIG_KEY = "celery-k8s"
"""Configuration key for celery-k8s executor."""
__version__ = "0.27.9"
"""Package version."""
The executor supports extensive configuration for both Celery and Kubernetes:
- job_image: Docker image for step execution jobs
- job_namespace: Kubernetes namespace for jobs (default: "default")
- load_incluster_config: Load K8s config from within cluster (default: True)
- kubeconfig_file: Path to kubeconfig file
- job_wait_timeout: Timeout for job completion, in seconds (default: 86400, i.e. 24 hours)
- env_config_maps: ConfigMaps to mount as environment variables
- env_secrets: Secrets to mount as environment variables
- volume_mounts: Volume mounts for job pods
- volumes: Volumes for job pods
- service_account_name: Kubernetes service account
- image_pull_policy: Image pull policy (default: "IfNotPresent")
- image_pull_secrets: Image pull secrets
- labels: Labels for Kubernetes resources
- broker: Celery broker URL (e.g., "pyamqp://guest@localhost//")
- backend: Celery results backend URL (e.g., "rpc://")
- include: Modules for Celery workers to import
- config_source: Additional Celery configuration dictionary
- retries: Retry configuration for failed tasks
- per_step_k8s_config: Per-operation Kubernetes configuration overrides
from dagster import op, job
from dagster_celery_k8s import celery_k8s_job_executor
@op
def process_data():
return "processed"
@op
def save_results(data: str):
print(f"Saving: {data}")
@job(executor_def=celery_k8s_job_executor)
def my_pipeline():
save_results(process_data())
from dagster import job, RunConfig
from dagster_celery_k8s import celery_k8s_job_executor
@job(executor_def=celery_k8s_job_executor)
def advanced_pipeline():
# Job operations
pass
# Run with specific configuration
run_config = RunConfig(
execution={
"config": {
"job_image": "my-registry/my-image:v1.0.0",
"job_namespace": "data-processing",
"broker": "redis://redis-service:6379/0",
"backend": "redis://redis-service:6379/1",
"job_wait_timeout": 3600, # 1 hour
"per_step_k8s_config": {
"heavy_computation": {
"container_config": {
"resources": {
"requests": {"cpu": "2", "memory": "4Gi"},
"limits": {"cpu": "4", "memory": "8Gi"}
}
}
}
}
}
}
)
Configure in your dagster.yaml:
run_launcher:
module: dagster_celery_k8s.launcher
class: CeleryK8sRunLauncher
config:
# Required configuration
instance_config_map: "dagster-instance-config"
dagster_home: "/opt/dagster/dagster_home"
postgres_password_secret: "dagster-postgres-secret"
# Celery configuration
broker: "redis://redis-service:6379/0"
backend: "redis://redis-service:6379/1"
# Kubernetes configuration
job_namespace: "dagster-execution"
service_account_name: "dagster-service-account"
image_pull_policy: "Always"
# Resource configuration
env_config_maps:
- "dagster-env-config"
env_secrets:
- "dagster-secrets"
labels:
team: "data-engineering"
environment: "production"
The package provides comprehensive error handling for:
from dagster import DagsterInvariantViolationError, DagsterUnmetExecutorRequirementsError
# Image configuration error
if not job_image:
raise DagsterInvariantViolationError(
"You have not specified a job_image in your executor configuration. "
"Specify the job_image in the executor config section."
)
# Compatibility error
if not isinstance(run_launcher, CeleryK8sRunLauncher):
raise DagsterUnmetExecutorRequirementsError(
"This executor is only compatible with a CeleryK8sRunLauncher; "
"configure the CeleryK8sRunLauncher on your instance to use it."
)
All celery and dagster_celery commands must be invoked with the -A dagster_celery_k8s.app argument.