Package for Databricks-specific Dagster framework op and resource components.

Factory functions for creating pre-configured ops that handle common Databricks workflows, including running existing jobs and submitting one-time tasks. These factories provide standardized patterns for integrating Databricks job execution into Dagster pipelines.

Factory function that creates an op for running existing Databricks jobs by job ID:
def create_databricks_run_now_op(
    databricks_job_id: int,
    databricks_job_configuration: Optional[dict] = None,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition:
    """
    Creates an op that launches an existing databricks job.

    Parameters:
    - databricks_job_id: The ID of the Databricks Job to be executed
    - databricks_job_configuration: Configuration for triggering a new job run (job parameters, etc.)
    - poll_interval_seconds: How often to poll the Databricks API to check job status
    - max_wait_time_seconds: How long to wait for the job to finish before raising an error
    - name: The name of the op (defaults to _databricks_run_now_op)
    - databricks_resource_key: The name of the resource key used by this op

    Returns:
    OpDefinition: An op definition to run the Databricks Job
    """

# Factory function that creates an op for submitting one-time Databricks job runs with full task configuration.
def create_databricks_submit_run_op(
    databricks_job_configuration: dict,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition:
    """
    Creates an op that submits a one-time run of a set of tasks on Databricks.

    Parameters:
    - databricks_job_configuration: Configuration for submitting a one-time run (cluster, task, etc.)
    - poll_interval_seconds: How often to poll the Databricks API to check job status
    - max_wait_time_seconds: How long to wait for the job to finish before raising an error
    - name: The name of the op (defaults to _databricks_submit_run_op)
    - databricks_resource_key: The name of the resource key used by this op

    Returns:
    OpDefinition: An op definition to submit a one-time run on Databricks
    """

# Both op factories support runtime configuration through their generated ops:
class DatabricksRunNowOpConfig:
    """Per-run settings accepted by ops built with create_databricks_run_now_op."""

    # Seconds between successive status polls of the Databricks API.
    poll_interval_seconds: float = 10
    # Overall timeout (seconds) before the run is treated as failed.
    max_wait_time_seconds: float = 86400
class DatabricksSubmitRunOpConfig:
    """Runtime configuration for submit_run ops."""

    # Seconds between successive status polls of the Databricks API.
    poll_interval_seconds: float = 10
    # Overall timeout (seconds) before the run is treated as failed.
    # (Original line was garbled: the value and the next example's import
    # were fused onto one line; split apart below.)
    max_wait_time_seconds: float = 86400

from dagster import job
from dagster_databricks import create_databricks_run_now_op, DatabricksClientResource

# Define the Databricks client resource shared by the example jobs below.
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token={"env": "DATABRICKS_TOKEN"},
)

# Create an op that triggers an existing (pre-defined) Databricks job by ID.
run_etl_job = create_databricks_run_now_op(
    databricks_job_id=12345,
    databricks_job_configuration={
        "python_params": [
            "--input", "raw_data_table",
            "--output", "processed_data_table",
            "--date", "2024-01-15",
        ],
        "jar_params": [],
        "notebook_params": {
            "environment": "production",
        },
    },
    poll_interval_seconds=30,
    max_wait_time_seconds=7200,  # 2 hours
    name="run_daily_etl",
)


@job(resource_defs={"databricks": databricks_resource})
def daily_etl_pipeline():
    run_etl_job()


from dagster import job
from dagster_databricks import create_databricks_submit_run_op, DatabricksClientResource

# Create an op that submits a one-time run with a full cluster/task spec.
submit_ml_training = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "ML Model Training",
        "new_cluster": {
            "spark_version": "11.3.x-cpu-ml-scala2.12",
            "node_type_id": "m5d.xlarge",
            "num_workers": 4,
            "custom_tags": {
                "project": "ml-pipeline",
                "environment": "production",
            },
        },
        "libraries": [
            {"pypi": {"package": "scikit-learn==1.1.0"}},
            {"pypi": {"package": "mlflow>=2.0.0"}},
            {"pypi": {"package": "pandas>=1.5.0"}},
        ],
        "spark_python_task": {
            "python_file": "s3://ml-scripts/train_model.py",
            "parameters": [
                "--model-type", "random-forest",
                "--data-path", "s3://data-bucket/training-data/",
                "--output-path", "s3://model-bucket/models/",
                "--max-depth", "10",
                "--n-estimators", "100",
            ],
        },
        "timeout_seconds": 14400,  # 4 hours
        "email_notifications": {
            "on_success": ["ml-team@company.com"],
            "on_failure": ["ml-team@company.com", "oncall@company.com"],
        },
    },
    poll_interval_seconds=60,
    max_wait_time_seconds=18000,  # 5 hours
    name="train_ml_model",
)


@job(resource_defs={"databricks": databricks_resource})
def ml_training_pipeline():
    submit_ml_training()


# Create op for notebook execution
run_analysis_notebook = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "Daily Analysis Report",
        "existing_cluster_id": "analysis-cluster-id",
        "notebook_task": {
            "notebook_path": "/Workspace/Users/analyst@company.com/DailyAnalysis",
            "base_parameters": {
                "report_date": "{{ ds }}",  # Can use templating
                "output_format": "html",
                "include_charts": "true",
            },
        },
        "libraries": [
            {"pypi": {"package": "plotly>=5.0.0"}},
            {"pypi": {"package": "seaborn>=0.11.0"}},
        ],
    },
    name="daily_analysis",
)


@job(resource_defs={"databricks": databricks_resource})
def reporting_pipeline():
    run_analysis_notebook()


# Create op for Scala/Java JAR execution
run_spark_jar = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "Spark JAR Processing",
        "new_cluster": {
            "spark_version": "11.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 8,
        },
        "spark_jar_task": {
            "main_class_name": "com.company.DataProcessor",
            "parameters": [
                "--input-path", "s3://input-bucket/data/",
                "--output-path", "s3://output-bucket/processed/",
                "--partition-date", "2024-01-15",
            ],
        },
        "libraries": [
            {"jar": "s3://jars-bucket/data-processor-1.0.jar"},
            {"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}},
        ],
    },
    name="spark_jar_processor",
)

from dagster import job, op
# Create multiple ops for the distinct stages of an ETL pipeline.
extract_data_op = create_databricks_run_now_op(
    databricks_job_id=100,  # Existing extraction job
    name="extract_data",
)
transform_data_op = create_databricks_submit_run_op(
    databricks_job_configuration={
        "existing_cluster_id": "transform-cluster",
        "notebook_task": {
            "notebook_path": "/ETL/Transform",
            "base_parameters": {"stage": "transform"},
        },
    },
    name="transform_data",
)
load_data_op = create_databricks_run_now_op(
    databricks_job_id=101,  # Existing loading job
    name="load_data",
)


@job(resource_defs={"databricks": databricks_resource})
def etl_pipeline():
    # Chain the operations so each stage starts only after the previous one.
    extract_result = extract_data_op()
    transform_result = transform_data_op(extract_result)
    load_data_op(transform_result)


from dagster import job, RunConfig
# Op with runtime configuration
flexible_job_op = create_databricks_run_now_op(
    databricks_job_id=200,
    name="flexible_databricks_job",
)


# Job that accepts runtime config
@job(resource_defs={"databricks": databricks_resource})
def configurable_job():
    flexible_job_op()


# Execute with custom polling settings
if __name__ == "__main__":
    run_config = RunConfig(
        ops={
            "flexible_databricks_job": {
                "config": {
                    "poll_interval_seconds": 5,  # Poll every 5 seconds
                    "max_wait_time_seconds": 1800,  # 30 minute timeout
                }
            }
        }
    )
    result = configurable_job.execute_in_process(run_config=run_config)

# NOTE(review): `Nothing` added to this import — the monitored-op example
# that follows uses In(Nothing), but the original import list omitted it
# and would have raised a NameError.
from dagster import job, op, In, Out, Nothing, OpExecutionContext
# Custom op factory that wraps a Databricks run_now op with extra logging.
def create_monitored_databricks_op(job_id: int, name: str):
    base_op = create_databricks_run_now_op(
        databricks_job_id=job_id,
        name=name,
    )

    @op(
        name=f"monitored_{name}",
        ins={"start_after": In(Nothing)},
        out=Out(int),
    )
    def monitored_op(context: OpExecutionContext):
        try:
            # Log start
            context.log.info(f"Starting Databricks job {job_id}")
            # Execute the base op.
            # NOTE(review): invoking one op directly from another is the
            # test-style direct-invocation pattern — confirm this is intended
            # against current dagster guidance before relying on it in a job.
            result = base_op(context)
            # Additional monitoring/logging
            context.log.info(f"Databricks job {job_id} completed successfully")
            return result
        except Exception as e:
            context.log.error(f"Databricks job {job_id} failed: {str(e)}")
            # Could add alerting, retry logic, etc.
            raise

    return monitored_op


# Use the enhanced op
enhanced_op = create_monitored_databricks_op(300, "critical_job")


@job(resource_defs={"databricks": databricks_resource})
def monitored_pipeline():
    enhanced_op()

# Install with Tessl CLI
npx tessl i tessl/pypi-dagster-databricks