Package for Databricks-specific Dagster framework op and resource components.
—
Step launcher that executes individual Dagster ops on Databricks clusters using PySpark. This component provides cluster provisioning, code packaging, dependency management, and result collection for seamless integration of Dagster ops with Databricks compute resources.
Step launcher resource that runs individual Dagster ops on Databricks clusters with automatic code packaging and dependency management.
class DatabricksPySparkStepLauncher:
"""Step launcher for running PySpark steps on Databricks clusters."""Resource function that creates and configures the Databricks PySpark step launcher.
def databricks_pyspark_step_launcher(init_context: InitResourceContext) -> DatabricksPySparkStepLauncher:
"""
Create a DatabricksPySparkStepLauncher resource from configuration.
Parameters:
- init_context: Dagster resource initialization context
Returns:
DatabricksPySparkStepLauncher: Configured step launcher instance
"""Configuration class defining all available options for the step launcher.
class DatabricksConfig:
"""Configuration schema for Databricks step launcher."""The step launcher supports comprehensive configuration through the resource definition:
from dagster import job
from dagster_databricks import databricks_pyspark_step_launcher
@job(
resource_defs={
"step_launcher": databricks_pyspark_step_launcher.configured({
"run_config": {
"cluster": {
"existing": "your-cluster-id"
}
},
"databricks_host": "https://your-workspace.cloud.databricks.com",
"databricks_token": {"env": "DATABRICKS_TOKEN"}
})
}
)
def my_databricks_job():
my_op()

The run_config section defines cluster and job execution parameters:
{
"run_config": {
# Cluster configuration (required)
"cluster": {
"existing": "cluster-id" # Use existing cluster
# OR create new cluster:
# "new": {
# "nodes": {
# "node_types": {
# "node_type_id": "i3.xlarge"
# }
# },
# "size": {"num_workers": 2},
# "spark_version": "11.3.x-scala2.12"
# }
},
# Additional libraries to install
"libraries": [
{"pypi": {"package": "pandas==1.5.0"}},
{"pypi": {"package": "scikit-learn>=1.0.0"}}
],
# Whether to install default Dagster libraries
"install_default_libraries": True,
# Job timeout and naming
"timeout_seconds": 3600,
"run_name": "Dagster Step Execution",
# Notifications
"email_notifications": {
"on_failure": ["admin@company.com"]
}
}
}

Multiple authentication methods are supported:
# Personal Access Token
{
"databricks_host": "https://your-workspace.cloud.databricks.com",
"databricks_token": {"env": "DATABRICKS_TOKEN"}
}
# OAuth Service Principal
{
"databricks_host": "https://your-workspace.cloud.databricks.com",
"oauth_credentials": {
"client_id": {"env": "DATABRICKS_CLIENT_ID"},
"client_secret": {"env": "DATABRICKS_CLIENT_SECRET"}
}
}
# Azure Service Principal
{
"databricks_host": "https://your-workspace.cloud.databricks.com",
"azure_credentials": {
"azure_client_id": {"env": "AZURE_CLIENT_ID"},
"azure_client_secret": {"env": "AZURE_CLIENT_SECRET"},
"azure_tenant_id": {"env": "AZURE_TENANT_ID"}
}
}

Configure where code and data are stored during execution:
{
"storage": {
"s3": {
"s3_bucket": "my-dagster-bucket",
"s3_prefix": "dagster-runs/"
}
# OR DBFS storage:
# "dbfs": {
# "dbfs_prefix": "/dagster-runs/"
# }
}
}

Configure environment variables and secret injection:
{
"env_variables": {
"SPARK_CONF_DIR": "/opt/spark/conf",
"PYTHONPATH": "/custom/python/path"
},
"secrets_to_env_variables": {
"API_KEY": {
"scope": "my-secret-scope",
"key": "api-key"
}
}
}

from dagster import job, op, Config
from dagster_databricks import databricks_pyspark_step_launcher
@op
def process_data():
    """Build a small demo frame and return its column sums as a dict.

    Runs remotely on the Databricks cluster when the step launcher
    resource is attached to the job.
    """
    # pandas is imported lazily so it is only required where the op executes
    import pandas as pd

    frame = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    # Column-wise totals, converted to a plain dict for transport back to Dagster
    return frame.sum().to_dict()
@job(
resource_defs={
"step_launcher": databricks_pyspark_step_launcher.configured({
"run_config": {
"cluster": {"existing": "your-cluster-id"}
},
"databricks_host": "https://your-workspace.cloud.databricks.com",
"databricks_token": {"env": "DATABRICKS_TOKEN"}
})
}
)
def my_databricks_job():
process_data()

step_launcher_config = {
"run_config": {
"cluster": {
"new": {
"nodes": {
"node_types": {
"node_type_id": "i3.xlarge",
"driver_node_type_id": "i3.2xlarge"
}
},
"size": {
"autoscale": {"min_workers": 1, "max_workers": 5}
},
"spark_version": "11.3.x-scala2.12",
"custom_tags": {
"project": "ml-pipeline",
"cost-center": "data-science"
}
}
},
"libraries": [
{"pypi": {"package": "scikit-learn==1.1.0"}},
{"pypi": {"package": "mlflow>=2.0.0"}},
{"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}}
],
"timeout_seconds": 7200,
"email_notifications": {
"on_failure": ["data-team@company.com"]
}
},
"databricks_host": "https://your-workspace.cloud.databricks.com",
"oauth_credentials": {
"client_id": {"env": "DATABRICKS_CLIENT_ID"},
"client_secret": {"env": "DATABRICKS_CLIENT_SECRET"}
},
"storage": {
"s3": {
"s3_bucket": "my-dagster-storage",
"s3_prefix": "step-launcher-runs/"
}
},
"env_variables": {
"MLFLOW_TRACKING_URI": "databricks"
}
}
@job(
resource_defs={
"step_launcher": databricks_pyspark_step_launcher.configured(step_launcher_config)
}
)
def advanced_ml_pipeline():
train_model()
evaluate_model()
deploy_model()

@op
def spark_data_processing():
    """Aggregate a Delta table by category and persist the result.

    Returns the number of aggregated rows. On Databricks an active
    SparkSession already exists, so getOrCreate() simply reuses it.
    """
    from pyspark.sql import SparkSession

    session = SparkSession.builder.getOrCreate()

    # Load the source Delta table
    source = session.read.format("delta").load("/path/to/delta/table")

    # Sum amounts per category
    aggregated = source.groupBy("category").agg({"amount": "sum"})

    # Overwrite the output location with the fresh aggregate
    writer = aggregated.write.format("delta").mode("overwrite")
    writer.save("/path/to/output")

    return aggregated.count()
@op
def ml_training():
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import mlflow
# Load data
df = pd.read_parquet("/dbfs/data/training-data.parquet")
# Prepare features
X = df.drop("target", axis=1)
y = df["target"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# Train model with MLflow tracking
with mlflow.start_run():
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)
accuracy = model.score(X_test, y_test)
mlflow.log_metric("accuracy", accuracy)
mlflow.sklearn.log_model(model, "model")
return accuracyWhen developing ops that will run on Databricks, consider:
@op
def data_processing_op():
try:
# Try to use Spark if available (on Databricks)
from pyspark.sql import SparkSession
spark = SparkSession.getOrCreate()
# Spark-based processing
return process_with_spark(spark)
except ImportError:
# Fallback for local development
import pandas as pd
return process_with_pandas()Install with Tessl CLI
npx tessl i tessl/pypi-dagster-databricks