CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dagster-databricks

Package for Databricks-specific Dagster framework op and resource components.

Pending
Overview
Eval results
Files

op-factories.mddocs/

Op Factories

Factory functions for creating pre-configured ops that handle common Databricks workflows including running existing jobs and submitting one-time tasks. These factories provide standardized patterns for integrating Databricks job execution into Dagster pipelines.

Capabilities

Run Existing Job Op Factory

Factory function that creates an op for running existing Databricks jobs by job ID.

def create_databricks_run_now_op(
    databricks_job_id: int,
    databricks_job_configuration: Optional[dict] = None,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition:
    """
    Creates an op that launches an existing databricks job.
    
    Parameters:
    - databricks_job_id: The ID of the Databricks Job to be executed
    - databricks_job_configuration: Configuration for triggering a new job run (job parameters, etc.)
    - poll_interval_seconds: How often to poll the Databricks API to check job status
    - max_wait_time_seconds: How long to wait for the job to finish before raising an error
    - name: The name of the op (defaults to _databricks_run_now_op)
    - databricks_resource_key: The name of the resource key used by this op
    
    Returns:
    OpDefinition: An op definition to run the Databricks Job
    """

Submit New Job Op Factory

Factory function that creates an op for submitting one-time Databricks job runs with full task configuration.

def create_databricks_submit_run_op(
    databricks_job_configuration: dict,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition:
    """
    Creates an op that submits a one-time run of a set of tasks on Databricks.
    
    Parameters:
    - databricks_job_configuration: Configuration for submitting a one-time run (cluster, task, etc.)
    - poll_interval_seconds: How often to poll the Databricks API to check job status
    - max_wait_time_seconds: How long to wait for the job to finish before raising an error
    - name: The name of the op (defaults to _databricks_submit_run_op)
    - databricks_resource_key: The name of the resource key used by this op
    
    Returns:
    OpDefinition: An op definition to submit a one-time run on Databricks
    """

Configuration Options

Both op factories support runtime configuration through their generated ops:

Polling Configuration

class DatabricksRunNowOpConfig:
    """Runtime configuration for run_now ops."""
    poll_interval_seconds: float = 10
    max_wait_time_seconds: float = 86400

class DatabricksSubmitRunOpConfig:
    """Runtime configuration for submit_run ops."""
    poll_interval_seconds: float = 10
    max_wait_time_seconds: float = 86400

Usage Examples

Running Existing Databricks Job

from dagster import job
from dagster_databricks import create_databricks_run_now_op, DatabricksClientResource

# Define the resource
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token={"env": "DATABRICKS_TOKEN"}
)

# Create op for existing job
run_etl_job = create_databricks_run_now_op(
    databricks_job_id=12345,
    databricks_job_configuration={
        "python_params": [
            "--input", "raw_data_table",
            "--output", "processed_data_table",
            "--date", "2024-01-15"
        ],
        "jar_params": [],
        "notebook_params": {
            "environment": "production"
        }
    },
    poll_interval_seconds=30,
    max_wait_time_seconds=7200,  # 2 hours
    name="run_daily_etl"
)

@job(resource_defs={"databricks": databricks_resource})
def daily_etl_pipeline():
    run_etl_job()

Submitting One-Time Job

from dagster import job
from dagster_databricks import create_databricks_submit_run_op, DatabricksClientResource

# Create op for one-time job submission
submit_ml_training = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "ML Model Training",
        "new_cluster": {
            "spark_version": "11.3.x-cpu-ml-scala2.12",
            "node_type_id": "m5d.xlarge",
            "num_workers": 4,
            "custom_tags": {
                "project": "ml-pipeline",
                "environment": "production"
            }
        },
        "libraries": [
            {"pypi": {"package": "scikit-learn==1.1.0"}},
            {"pypi": {"package": "mlflow>=2.0.0"}},
            {"pypi": {"package": "pandas>=1.5.0"}}
        ],
        "spark_python_task": {
            "python_file": "s3://ml-scripts/train_model.py",
            "parameters": [
                "--model-type", "random-forest",
                "--data-path", "s3://data-bucket/training-data/",
                "--output-path", "s3://model-bucket/models/",
                "--max-depth", "10",
                "--n-estimators", "100"
            ]
        },
        "timeout_seconds": 14400,  # 4 hours
        "email_notifications": {
            "on_success": ["ml-team@company.com"],
            "on_failure": ["ml-team@company.com", "oncall@company.com"]
        }
    },
    poll_interval_seconds=60,
    max_wait_time_seconds=18000,  # 5 hours
    name="train_ml_model"
)

@job(resource_defs={"databricks": databricks_resource})
def ml_training_pipeline():
    submit_ml_training()

Notebook-Based Workflow

# Create op for notebook execution
run_analysis_notebook = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "Daily Analysis Report",
        "existing_cluster_id": "analysis-cluster-id",
        "notebook_task": {
            "notebook_path": "/Workspace/Users/analyst@company.com/DailyAnalysis",
            "base_parameters": {
                "report_date": "{{ ds }}",  # Can use templating
                "output_format": "html",
                "include_charts": "true"
            }
        },
        "libraries": [
            {"pypi": {"package": "plotly>=5.0.0"}},
            {"pypi": {"package": "seaborn>=0.11.0"}}
        ]
    },
    name="daily_analysis"
)

@job(resource_defs={"databricks": databricks_resource})
def reporting_pipeline():
    run_analysis_notebook()

JAR-Based Job

# Create op for Scala/Java JAR execution
run_spark_jar = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "Spark JAR Processing",
        "new_cluster": {
            "spark_version": "11.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 8
        },
        "spark_jar_task": {
            "main_class_name": "com.company.DataProcessor",
            "parameters": [
                "--input-path", "s3://input-bucket/data/",
                "--output-path", "s3://output-bucket/processed/",
                "--partition-date", "2024-01-15"
            ]
        },
        "libraries": [
            {"jar": "s3://jars-bucket/data-processor-1.0.jar"},
            {"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}}
        ]
    },
    name="spark_jar_processor"
)

Multi-Op Pipeline

from dagster import job, op

# Create multiple ops for different stages
extract_data_op = create_databricks_run_now_op(
    databricks_job_id=100,  # Existing extraction job
    name="extract_data"
)

transform_data_op = create_databricks_submit_run_op(
    databricks_job_configuration={
        "existing_cluster_id": "transform-cluster",
        "notebook_task": {
            "notebook_path": "/ETL/Transform",
            "base_parameters": {"stage": "transform"}
        }
    },
    name="transform_data"
)

load_data_op = create_databricks_run_now_op(
    databricks_job_id=101,  # Existing loading job
    name="load_data"
)

@job(resource_defs={"databricks": databricks_resource})
def etl_pipeline():
    # Chain the operations
    extract_result = extract_data_op()
    transform_result = transform_data_op(extract_result)
    load_data_op(transform_result)

Runtime Configuration

from dagster import job, RunConfig

# Op with runtime configuration
flexible_job_op = create_databricks_run_now_op(
    databricks_job_id=200,
    name="flexible_databricks_job"
)

# Job that accepts runtime config
@job(resource_defs={"databricks": databricks_resource})
def configurable_job():
    flexible_job_op()

# Execute with custom polling settings
if __name__ == "__main__":
    run_config = RunConfig(
        ops={
            "flexible_databricks_job": {
                "config": {
                    "poll_interval_seconds": 5,  # Poll every 5 seconds
                    "max_wait_time_seconds": 1800  # 30 minute timeout
                }
            }
        }
    )
    
    result = configurable_job.execute_in_process(run_config=run_config)

Error Handling and Monitoring

from dagster import job, op, In, Out, OpExecutionContext

# Custom op that uses the factory with additional logic
def create_monitored_databricks_op(job_id: int, name: str):
    base_op = create_databricks_run_now_op(
        databricks_job_id=job_id,
        name=name
    )
    
    @op(
        name=f"monitored_{name}",
        ins={"start_after": In(Nothing)},
        out=Out(int),
    )
    def monitored_op(context: OpExecutionContext):
        try:
            # Log start
            context.log.info(f"Starting Databricks job {job_id}")
            
            # Execute the base op
            result = base_op(context)
            
            # Additional monitoring/logging
            context.log.info(f"Databricks job {job_id} completed successfully")
            
            return result
            
        except Exception as e:
            context.log.error(f"Databricks job {job_id} failed: {str(e)}")
            # Could add alerting, retry logic, etc.
            raise
    
    return monitored_op

# Use the enhanced op
enhanced_op = create_monitored_databricks_op(300, "critical_job")

@job(resource_defs={"databricks": databricks_resource})
def monitored_pipeline():
    enhanced_op()

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-databricks

docs

core-client.md

index.md

job-management.md

op-factories.md

pipes-integration.md

pyspark-step-launcher.md

resource-management.md

tile.json