tessl/pypi-dagster-databricks

Package for Databricks-specific Dagster framework op and resource components.


PySpark Step Launcher

Step launcher that executes individual Dagster ops on Databricks clusters using PySpark. It handles cluster provisioning, code packaging, dependency management, and result collection for ops that run on Databricks compute.

Capabilities

DatabricksPySparkStepLauncher

Step launcher resource that runs individual Dagster ops on Databricks clusters with automatic code packaging and dependency management.

class DatabricksPySparkStepLauncher:
    """Step launcher for running PySpark steps on Databricks clusters."""

Resource Factory

Resource function that creates and configures the Databricks PySpark step launcher.

def databricks_pyspark_step_launcher(init_context: InitResourceContext) -> DatabricksPySparkStepLauncher:
    """
    Create a DatabricksPySparkStepLauncher resource from configuration.
    
    Parameters:
    - init_context: Dagster resource initialization context
    
    Returns:
    DatabricksPySparkStepLauncher: Configured step launcher instance
    """

Configuration Schema

Configuration class defining all available options for the step launcher.

class DatabricksConfig:
    """Configuration schema for Databricks step launcher."""

Configuration Options

The step launcher is configured through its resource definition. The options fall into run, authentication, storage, and environment settings:

Basic Configuration

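from dagster import job, op
from dagster_databricks import databricks_pyspark_step_launcher

# Placeholder op. Requiring the "step_launcher" resource is what lets
# Dagster hand this step to the launcher instead of running it in-process.
@op(required_resource_keys={"step_launcher"})
def my_op():
    return 1

@job(
    resource_defs={
        "step_launcher": databricks_pyspark_step_launcher.configured({
            "run_config": {
                "cluster": {
                    "existing": "your-cluster-id"
                }
            },
            "databricks_host": "https://your-workspace.cloud.databricks.com",
            "databricks_token": {"env": "DATABRICKS_TOKEN"}
        })
    }
)
def my_databricks_job():
    my_op()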
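
The token in this configuration is not written inline; it is referenced through the `{"env": "DATABRICKS_TOKEN"}` form. A minimal sketch of a pre-flight check that finds such references in a config dictionary and reports which variables are unset. `referenced_env_vars` and `missing_env_vars` are hypothetical helper names, not part of dagster-databricks, and the sketch only inspects plain dictionaries:

```python
import os
from typing import Any, List, Mapping, Optional


def referenced_env_vars(config: Any) -> List[str]:
    """Collect NAME from every {"env": NAME} mapping nested inside config."""
    if isinstance(config, Mapping):
        if set(config) == {"env"} and isinstance(config["env"], str):
            return [config["env"]]
        values = list(config.values())
    elif isinstance(config, (list, tuple)):
        values = list(config)
    else:
        return []
    names: List[str] = []
    for value in values:
        names.extend(referenced_env_vars(value))
    return names


def missing_env_vars(config: Any, environ: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return referenced variable names that are absent from environ."""
    environ = os.environ if environ is None else environ
    return sorted({name for name in referenced_env_vars(config) if name not in environ})


launcher_config = {
    "run_config": {"cluster": {"existing": "your-cluster-id"}},
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "databricks_token": {"env": "DATABRICKS_TOKEN"},
}

# An explicit empty mapping keeps the example independent of the real environment
print(missing_env_vars(launcher_config, environ={}))  # ['DATABRICKS_TOKEN']
```

Running a check like this before launching a job surfaces a missing token locally, rather than after a cluster has been requested.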

Run Configuration Structure

The run_config section defines cluster and job execution parameters:

{
    "run_config": {
        # Cluster configuration (required)
        "cluster": {
            "existing": "cluster-id"  # Use existing cluster
            # OR create new cluster:
            # "new": {
            #     "nodes": {
            #         "node_types": {
            #             "node_type_id": "i3.xlarge"
            #         }
            #     },
            #     "size": {"num_workers": 2},
            #     "spark_version": "11.3.x-scala2.12"
            # }
        },
        
        # Additional libraries to install
        "libraries": [
            {"pypi": {"package": "pandas==1.5.0"}},
            {"pypi": {"package": "scikit-learn>=1.0.0"}}
        ],
        
        # Whether to install default Dagster libraries
        "install_default_libraries": True,
        
        # Job timeout and naming
        "timeout_seconds": 3600,
        "run_name": "Dagster Step Execution",
        
        # Notifications
        "email_notifications": {
            "on_failure": ["admin@company.com"]
        }
    }
}
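
The `cluster` section takes either an `existing` cluster ID or a `new` cluster specification. A minimal sketch of a helper that builds that section and rejects ambiguous input. `build_cluster_section` is a hypothetical name, not part of dagster-databricks, and the key layout simply mirrors the structure shown above:

```python
from typing import Any, Dict, Optional


def build_cluster_section(
    existing_cluster_id: Optional[str] = None,
    new_cluster: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return the "cluster" mapping for run_config (hypothetical helper).

    Exactly one argument must be given, mirroring the
    "existing" OR "new" choice in the structure above.
    """
    if (existing_cluster_id is None) == (new_cluster is None):
        raise ValueError("Provide exactly one of existing_cluster_id or new_cluster")
    if existing_cluster_id is not None:
        return {"existing": existing_cluster_id}
    # Copy so later edits to the caller's dict do not leak into the config
    return {"new": dict(new_cluster)}


run_config = {
    "cluster": build_cluster_section(existing_cluster_id="cluster-id"),
    "timeout_seconds": 3600,
    "run_name": "Dagster Step Execution",
}
```

Centralizing the choice in one function keeps the "existing or new, never both" rule out of each job definition.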

Authentication Configuration

Multiple authentication methods are supported:

# Personal Access Token
{
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "databricks_token": {"env": "DATABRICKS_TOKEN"}
}

# OAuth Service Principal
{
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "oauth_credentials": {
        "client_id": {"env": "DATABRICKS_CLIENT_ID"},
        "client_secret": {"env": "DATABRICKS_CLIENT_SECRET"}
    }
}

# Azure Service Principal
{
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "azure_credentials": {
        "azure_client_id": {"env": "AZURE_CLIENT_ID"},
        "azure_client_secret": {"env": "AZURE_CLIENT_SECRET"},
        "azure_tenant_id": {"env": "AZURE_TENANT_ID"}
    }
}
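
The three fragments above differ only in which credential key accompanies `databricks_host`. A minimal sketch of a check that a config dictionary names a host and a single credential key. It assumes only one method should be set at a time, which this page does not state explicitly, and `selected_auth_method` is a hypothetical helper rather than a library function:

```python
from typing import Any, Dict

# Key names taken from the three fragments above
AUTH_KEYS = ("databricks_token", "oauth_credentials", "azure_credentials")


def selected_auth_method(config: Dict[str, Any]) -> str:
    """Return the single credential key present in config (hypothetical helper)."""
    if not config.get("databricks_host"):
        raise ValueError("databricks_host is missing")
    present = [key for key in AUTH_KEYS if key in config]
    if len(present) != 1:
        raise ValueError(f"Expected exactly one auth method, found: {present}")
    return present[0]


token_config = {
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "databricks_token": {"env": "DATABRICKS_TOKEN"},
}
print(selected_auth_method(token_config))  # databricks_token
```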

Storage Configuration

Configure where code and data are stored during execution:

{
    "storage": {
        "s3": {
            "s3_bucket": "my-dagster-bucket",
            "s3_prefix": "dagster-runs/"
        }
        # OR DBFS storage:
        # "dbfs": {
        #     "dbfs_prefix": "/dagster-runs/"
        # }
    }
}

Environment and Secrets

Configure environment variables and secret injection:

{
    "env_variables": {
        "SPARK_CONF_DIR": "/opt/spark/conf",
        "PYTHONPATH": "/custom/python/path"
    },
    "secrets_to_env_variables": [
        # Each entry maps a Databricks secret (scope + key) to an environment variable name
        {"name": "API_KEY", "scope": "my-secret-scope", "key": "api-key"}
    ]
}
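
Values configured this way are meant to be read inside an op like any other environment variable. A minimal sketch, assuming the secret ends up exposed under the name `API_KEY` as in the fragment above; `read_required_env` is a hypothetical helper, not part of dagster-databricks:

```python
import os
from typing import Mapping, Optional


def read_required_env(name: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Return the value of an environment variable or raise a clear error."""
    environ = os.environ if environ is None else environ
    value = environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name!r} is not set")
    return value


# Explicit mapping used here so the example does not depend on the real environment
api_key = read_required_env("API_KEY", {"API_KEY": "example-value"})
```

Inside an op, calling `read_required_env("API_KEY")` without the second argument reads from `os.environ` and fails early with a readable message if the variable was not injected.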

Usage Examples

Basic Step Launcher Setup

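from dagster import job, op
from dagster_databricks import databricks_pyspark_step_launcher

# Requiring the "step_launcher" resource routes this op through the launcher
@op(required_resource_keys={"step_launcher"})
def process_data():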
    import pandas as pd
    
    # This code runs on Databricks cluster
    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    result = df.sum().to_dict()
    return result

@job(
    resource_defs={
        "step_launcher": databricks_pyspark_step_launcher.configured({
            "run_config": {
                "cluster": {"existing": "your-cluster-id"}
            },
            "databricks_host": "https://your-workspace.cloud.databricks.com",
            "databricks_token": {"env": "DATABRICKS_TOKEN"}
        })
    }
)
def my_databricks_job():
    process_data()

Advanced Configuration with New Cluster

step_launcher_config = {
    "run_config": {
        "cluster": {
            "new": {
                "nodes": {
                    "node_types": {
                        "node_type_id": "i3.xlarge",
                        "driver_node_type_id": "i3.2xlarge"
                    }
                },
                "size": {
                    "autoscale": {"min_workers": 1, "max_workers": 5}
                },
                "spark_version": "11.3.x-scala2.12",
                "custom_tags": {
                    "project": "ml-pipeline",
                    "cost-center": "data-science"
                }
            }
        },
        "libraries": [
            {"pypi": {"package": "scikit-learn==1.1.0"}},
            {"pypi": {"package": "mlflow>=2.0.0"}},
            {"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}}
        ],
        "timeout_seconds": 7200,
        "email_notifications": {
            "on_failure": ["data-team@company.com"]
        }
    },
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "oauth_credentials": {
        "client_id": {"env": "DATABRICKS_CLIENT_ID"},
        "client_secret": {"env": "DATABRICKS_CLIENT_SECRET"}
    },
    "storage": {
        "s3": {
            "s3_bucket": "my-dagster-storage",
            "s3_prefix": "step-launcher-runs/"
        }
    },
    "env_variables": {
        "MLFLOW_TRACKING_URI": "databricks"
    }
}

@job(
    resource_defs={
        "step_launcher": databricks_pyspark_step_launcher.configured(step_launcher_config)
    }
)
def advanced_ml_pipeline():
    # train_model, evaluate_model, and deploy_model are ops assumed to be defined elsewhere
    train_model()
    evaluate_model()
    deploy_model()
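
A configuration dictionary of this size is often reused with small per-environment differences, such as a shorter timeout during development. A minimal sketch of a recursive merge over plain dictionaries; `merge_config` is a hypothetical helper and the override values are illustrative only:

```python
import copy
from typing import Any, Dict, Mapping


def merge_config(base: Mapping[str, Any], overrides: Mapping[str, Any]) -> Dict[str, Any]:
    """Recursively merge overrides into a deep copy of base (hypothetical helper)."""
    merged = copy.deepcopy(dict(base))
    for key, value in overrides.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            # Both sides are mappings: merge key by key instead of replacing
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


base_config = {
    "run_config": {"cluster": {"existing": "your-cluster-id"}, "timeout_seconds": 7200},
    "databricks_host": "https://your-workspace.cloud.databricks.com",
}

# Only the timeout changes; the cluster and host settings carry over untouched
dev_config = merge_config(base_config, {"run_config": {"timeout_seconds": 600}})
```

The merged dictionary can then be passed to `databricks_pyspark_step_launcher.configured(...)` in the same way as `step_launcher_config`.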

PySpark Operations

@op
def spark_data_processing():
    from pyspark.sql import SparkSession
    
    # SparkSession is automatically available on Databricks
    spark = SparkSession.builder.getOrCreate()
    
    # Read data from various sources
    df = spark.read.format("delta").load("/path/to/delta/table")
    
    # Perform transformations
    result_df = df.groupBy("category").agg({"amount": "sum"})
    
    # Write results
    result_df.write.format("delta").mode("overwrite").save("/path/to/output")
    
    return result_df.count()

@op
def ml_training():
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    import mlflow
    
    # Load data
    df = pd.read_parquet("/dbfs/data/training-data.parquet")
    
    # Prepare features
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    
    # Train model with MLflow tracking
    with mlflow.start_run():
        model = RandomForestClassifier(n_estimators=100)
        model.fit(X_train, y_train)
        
        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")
    
    return accuracy

Local Development Considerations

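When developing ops that will run on Databricks, consider falling back to pandas when PySpark is not installed locally:

from dagster import op

@op
def data_processing_op():
    try:
        # PySpark is importable on Databricks; it may be missing locally
        from pyspark.sql import SparkSession
    except ImportError:
        # Fallback for local development; process_with_pandas is a placeholder helper
        return process_with_pandas()

    # The session is obtained through the builder, not directly from the class
    spark = SparkSession.builder.getOrCreate()
    # Spark-based processing; process_with_spark is a placeholder helper
    return process_with_spark(spark)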
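
The same local-versus-Databricks decision can be made without importing PySpark at all. A minimal sketch using the standard library's `importlib.util.find_spec`; `pyspark_available` and `choose_backend` are hypothetical names introduced for illustration:

```python
import importlib.util


def pyspark_available() -> bool:
    """Report whether the pyspark package can be imported in this environment."""
    return importlib.util.find_spec("pyspark") is not None


def choose_backend() -> str:
    """Pick a processing backend name based on what is installed."""
    return "spark" if pyspark_available() else "pandas"


backend = choose_backend()
```

Checking availability up front keeps the op body free of nested try/except blocks and makes the chosen backend easy to log or assert on in tests.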

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-databricks
