tessl/pypi-apache-airflow-providers-databricks

Comprehensive Databricks integration for Apache Airflow with operators, hooks, sensors, and triggers for orchestrating data workflows

docs/job-management.md

Job Management

The Databricks provider offers job management operators for running a wide range of task types on Databricks clusters: submitting one-time runs, triggering pre-configured jobs, and executing notebooks with full parameter support.
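
All of the examples in this document assume the operators are instantiated inside a DAG. The sketch below shows one way to wire that up; the DAG ID, schedule, notebook path, and cluster ID are placeholders, not values prescribed by the provider:

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

with DAG(
    dag_id='databricks_job_management_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:
    # Any of the operators shown below can be declared inside this context;
    # here a single notebook task runs on an existing cluster.
    example_run = DatabricksSubmitRunOperator(
        task_id='example_run',
        notebook_task={'notebook_path': '/Shared/example'},
        existing_cluster_id='0123-456789-example',
        databricks_conn_id='databricks_default',
    )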

Core Operators

DatabricksSubmitRunOperator

Submit one-time runs to Databricks with flexible task configurations and cluster management.

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

class DatabricksSubmitRunOperator(BaseOperator):
    def __init__(
        self,
        *,
        tasks: list[dict[str, Any]] | None = None,
        spark_jar_task: dict[str, Any] | None = None,
        notebook_task: dict[str, Any] | None = None,
        spark_python_task: dict[str, Any] | None = None,
        spark_submit_task: dict[str, Any] | None = None,
        pipeline_task: dict[str, Any] | None = None,
        python_wheel_task: dict[str, Any] | None = None,
        dbt_task: dict[str, Any] | None = None,
        sql_task: dict[str, Any] | None = None,
        new_cluster: dict[str, Any] | None = None,
        existing_cluster_id: str | None = None,
        job_clusters: list[dict[str, Any]] | None = None,
        libraries: list[dict[str, Any]] | None = None,
        run_name: str | None = None,
        timeout_seconds: int | None = None,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        databricks_retry_args: dict[str, Any] | None = None,
        do_xcom_push: bool = True,
        idempotency_token: str | None = None,
        access_control_list: list[dict[str, Any]] | None = None,
        wait_for_termination: bool = True,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        git_source: dict[str, Any] | None = None,
        **kwargs
    ) -> None:
        """
        Submit a one-time run to Databricks.
        
        Args:
            tasks: List of tasks to execute in the run
            spark_jar_task: Configuration for Spark JAR task
            notebook_task: Configuration for notebook task  
            spark_python_task: Configuration for Spark Python task
            spark_submit_task: Configuration for Spark submit task
            pipeline_task: Configuration for Delta Live Tables pipeline task
            python_wheel_task: Configuration for Python wheel task
            dbt_task: Configuration for dbt task
            sql_task: Configuration for SQL task
            new_cluster: New cluster configuration for the run
            existing_cluster_id: ID of existing cluster to use
            job_clusters: Job cluster configurations
            libraries: Libraries to install on the cluster
            run_name: Name for the run (defaults to Airflow task name)
            timeout_seconds: Maximum time to wait for job completion
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status polls
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
            databricks_retry_args: Additional retry configuration
            do_xcom_push: Whether to push run metadata to XCom
            idempotency_token: Token to ensure idempotent execution
            access_control_list: Access control permissions for the run
            wait_for_termination: Whether to wait for run completion
            deferrable: Whether to use deferrable execution
            git_source: Git source configuration for code
        """

DatabricksRunNowOperator

Trigger existing Databricks jobs with parameter overrides and monitoring.

from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

class DatabricksRunNowOperator(BaseOperator):
    def __init__(
        self,
        *,
        job_id: int | None = None,
        job_name: str | None = None,
        notebook_params: dict[str, str] | None = None,
        python_params: list[str] | None = None,
        spark_submit_params: list[str] | None = None,
        jar_params: list[str] | None = None,
        sql_params: dict[str, str] | None = None,
        dbt_commands: list[str] | None = None,
        python_named_params: dict[str, str] | None = None,
        pipeline_params: dict[str, str] | None = None,
        wait_for_termination: bool = True,
        timeout_seconds: int | None = None,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        do_xcom_push: bool = True,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs
    ) -> None:
        """
        Trigger an existing Databricks job.
        
        Args:
            job_id: Databricks job ID to trigger
            job_name: Databricks job name to trigger (alternative to job_id)
            notebook_params: Parameters for notebook tasks
            python_params: Parameters for Python tasks
            spark_submit_params: Parameters for Spark submit tasks
            jar_params: Parameters for JAR tasks
            sql_params: Parameters for SQL tasks
            dbt_commands: Commands for dbt tasks
            python_named_params: Named parameters for Python tasks
            pipeline_params: Parameters for pipeline tasks
            wait_for_termination: Whether to wait for job completion
            timeout_seconds: Maximum time to wait for job completion
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status polls
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
            do_xcom_push: Whether to push run metadata to XCom
            deferrable: Whether to use deferrable execution
        """

DatabricksNotebookOperator

Execute Databricks notebooks with parameter support and source management.

from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

class DatabricksNotebookOperator(BaseOperator):
    def __init__(
        self,
        *,
        notebook_path: str,
        source: str = "WORKSPACE",
        base_parameters: dict[str, str] | None = None,
        new_cluster: dict[str, Any] | None = None,
        existing_cluster_id: str | None = None,
        job_cluster_key: str | None = None,
        libraries: list[dict[str, Any]] | None = None,
        run_name: str | None = None,
        timeout_seconds: int | None = None,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        do_xcom_push: bool = True,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        wait_for_termination: bool = True,
        **kwargs
    ) -> None:
        """
        Execute a Databricks notebook.
        
        Args:
            notebook_path: Path to the notebook in Databricks workspace or repo
            source: Source type - "WORKSPACE" or "GIT"
            base_parameters: Parameters to pass to the notebook
            new_cluster: New cluster configuration for notebook execution
            existing_cluster_id: ID of existing cluster to use
            job_cluster_key: Key of job cluster to use (for workflow contexts)
            libraries: Libraries to install on the cluster
            run_name: Name for the notebook run
            timeout_seconds: Maximum time to wait for notebook completion
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status polls
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
            do_xcom_push: Whether to push run metadata to XCom
            deferrable: Whether to use deferrable execution
            wait_for_termination: Whether to wait for notebook completion
        """

Usage Examples

Basic Spark Job Submission

Execute a Python script on a new cluster:

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

spark_job = DatabricksSubmitRunOperator(
    task_id='run_data_processing',
    spark_python_task={
        'python_file': 'dbfs:/mnt/scripts/process_data.py',
        'parameters': [
            '--input-path', '/data/raw/{{ ds }}',
            '--output-path', '/data/processed/{{ ds }}',
            '--partition-count', '10'
        ]
    },
    new_cluster={
        'spark_version': '12.2.x-scala2.12',
        'node_type_id': 'i3.xlarge',
        'num_workers': 5,
        'spark_conf': {
            'spark.sql.adaptive.enabled': 'true',
            'spark.sql.adaptive.coalescePartitions.enabled': 'true'
        }
    },
    libraries=[
        {'pypi': {'package': 'pandas==1.5.0'}},
        {'pypi': {'package': 'numpy>=1.21.0'}}
    ],
    timeout_seconds=7200,
    databricks_conn_id='databricks_production'
)

JAR Task with Custom Configuration

Run a Scala JAR with specific parameters:

jar_job = DatabricksSubmitRunOperator(
    task_id='run_scala_etl',
    spark_jar_task={
        'main_class_name': 'com.company.etl.DataProcessor',
        'parameters': [
            '--config', 'production.conf',
            '--date', '{{ ds }}',
            '--batch-size', '1000'
        ]
    },
    libraries=[
        {'jar': 'dbfs:/mnt/jars/data-processor-1.2.3.jar'},
        {'maven': {'coordinates': 'org.apache.kafka:kafka-clients:3.0.0'}}
    ],
    existing_cluster_id='0123-456789-etl001',
    run_name='ETL Process {{ ds }}',
    idempotency_token='etl_{{ ds }}_{{ dag_run.run_id }}'
)

Notebook Execution with Parameters

Execute a notebook with dynamic parameters:

notebook_task = DatabricksNotebookOperator(
    task_id='run_analysis_notebook',
    notebook_path='/Shared/Analytics/Daily Report',
    base_parameters={
        'report_date': '{{ ds }}',
        'customer_segment': 'premium',
        'output_format': 'parquet',
        'include_charts': 'true'
    },
    existing_cluster_id='0123-456789-analytics',
    libraries=[
        {'pypi': {'package': 'matplotlib'}},
        {'pypi': {'package': 'seaborn'}}
    ],
    timeout_seconds=1800
)
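
Inside the notebook itself, base_parameters typically surface as widget values. A short sketch of the consuming side (this code runs in Databricks, where dbutils is provided by the runtime; the widget names match the parameters above):

# Read the parameters passed from Airflow inside the Databricks notebook
report_date = dbutils.widgets.get('report_date')
customer_segment = dbutils.widgets.get('customer_segment')
output_format = dbutils.widgets.get('output_format')
include_charts = dbutils.widgets.get('include_charts') == 'true'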

Triggering Existing Jobs

Trigger a pre-configured Databricks job:

trigger_job = DatabricksRunNowOperator(
    task_id='trigger_daily_pipeline',
    job_id=123456,
    notebook_params={
        'input_date': '{{ ds }}',
        'refresh_mode': 'incremental'
    },
    python_params=['--verbose', '--config=prod'],
    wait_for_termination=True,
    timeout_seconds=3600
)
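
If the numeric job ID is not known up front, the job can instead be referenced by name via job_name, as the signature above allows. Here 'daily_pipeline' is a hypothetical name and must resolve to exactly one job in the workspace:

trigger_job_by_name = DatabricksRunNowOperator(
    task_id='trigger_daily_pipeline_by_name',
    job_name='daily_pipeline',  # hypothetical job name, must be unique in the workspace
    notebook_params={'input_date': '{{ ds }}'},
    wait_for_termination=True
)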

SQL Task Execution

Submit a SQL task as part of a job run:

sql_job = DatabricksSubmitRunOperator(
    task_id='run_sql_aggregation',
    sql_task={
        'query': {
            'query_id': 'abc123-def456-789'  # Reference to saved query
        },
        'warehouse_id': 'warehouse-xyz789',
        'parameters': {
            'start_date': '{{ ds }}',
            'end_date': '{{ next_ds }}'
        }
    },
    run_name='SQL Aggregation {{ ds }}',
    timeout_seconds=1200
)

Deferrable Execution

Use deferrable mode for long-running jobs:

long_running_job = DatabricksSubmitRunOperator(
    task_id='long_ml_training',
    spark_python_task={
        'python_file': 'dbfs:/mnt/ml/train_model.py',
        'parameters': ['--epochs', '100', '--data-path', '/data/training']
    },
    new_cluster={
        'spark_version': '12.2.x-cpu-ml-scala2.12',
        'node_type_id': 'i3.2xlarge',
        'num_workers': 8
    },
    timeout_seconds=14400,  # 4 hours
    deferrable=True,  # Use async execution
    polling_period_seconds=60  # Check every minute
)

Advanced Features

Job Clusters

Define reusable cluster configurations within job submissions:

job_with_clusters = DatabricksSubmitRunOperator(
    task_id='multi_cluster_job',
    tasks=[
        {
            'task_key': 'extract',
            'job_cluster_key': 'extract_cluster',
            'spark_python_task': {
                'python_file': 'dbfs:/scripts/extract.py'
            }
        },
        {
            'task_key': 'transform',
            'job_cluster_key': 'transform_cluster',
            'depends_on': [{'task_key': 'extract'}],
            'spark_python_task': {
                'python_file': 'dbfs:/scripts/transform.py'
            }
        }
    ],
    job_clusters=[
        {
            'job_cluster_key': 'extract_cluster',
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.large',
                'num_workers': 2
            }
        },
        {
            'job_cluster_key': 'transform_cluster', 
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.xlarge',
                'num_workers': 8
            }
        }
    ]
)

Access Control

Set permissions for job runs:

secured_job = DatabricksSubmitRunOperator(
    task_id='secured_data_job',
    notebook_task={
        'notebook_path': '/Secure/Financial Analysis'
    },
    existing_cluster_id='secure-cluster-001',
    access_control_list=[
        {
            'user_name': 'analyst@company.com',
            'permission_level': 'CAN_VIEW'
        },
        {
            'group_name': 'data-engineers',
            'permission_level': 'CAN_MANAGE_RUN'
        }
    ]
)

Git Source Integration

Execute code directly from Git repositories:

git_job = DatabricksSubmitRunOperator(
    task_id='run_from_git',
    notebook_task={
        'notebook_path': 'notebooks/data_pipeline.py',
        'source': 'GIT'
    },
    git_source={
        'git_url': 'https://github.com/company/data-pipelines.git',
        'git_branch': 'main',
        'git_provider': 'gitHub'
    },
    existing_cluster_id='dev-cluster-001'
)

Error Handling and Monitoring

XCom Integration

Automatically push job metadata to XCom for downstream tasks:

# The run pushes metadata such as run_id and run_page_url to XCom
job_run = DatabricksSubmitRunOperator(
    task_id='data_processing',
    spark_python_task={'python_file': 'dbfs:/scripts/process.py'},
    existing_cluster_id='cluster-001',
    do_xcom_push=True  # Default is True
)

# Downstream task can access run information
def get_job_results(**context):
    run_id = context['ti'].xcom_pull(task_ids='data_processing', key='run_id')
    run_url = context['ti'].xcom_pull(task_ids='data_processing', key='run_page_url')
    print(f"Job {run_id} completed. View at: {run_url}")

Timeout and Retry Configuration

Configure robust error handling:

from tenacity import stop_after_attempt, wait_exponential

resilient_job = DatabricksSubmitRunOperator(
    task_id='resilient_processing',
    spark_python_task={'python_file': 'dbfs:/scripts/process.py'},
    existing_cluster_id='cluster-001',
    timeout_seconds=3600,  # 1 hour job timeout
    databricks_retry_limit=5,  # Retry API calls 5 times
    databricks_retry_delay=30,  # Wait 30 seconds between retries
    databricks_retry_args={
        # Passed through to tenacity.Retrying as keyword arguments
        'stop': stop_after_attempt(3),
        'wait': wait_exponential(multiplier=1)
    }
)

The job management operators provide comprehensive control over Databricks job execution with support for all major task types, cluster configurations, and monitoring capabilities. They integrate seamlessly with Airflow's templating, XCom, and error handling systems.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-databricks
