tessl/pypi-apache-airflow-providers-databricks

Comprehensive Databricks integration for Apache Airflow with operators, hooks, sensors, and triggers for orchestrating data workflows

Describes: pkg:pypi/apache-airflow-providers-databricks@7.7.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-databricks@7.7.0


Apache Airflow Databricks Provider

The Apache Airflow Databricks Provider offers comprehensive integration with Databricks platforms, enabling you to orchestrate data engineering and machine learning workflows through Airflow DAGs. This provider supports job execution, notebook runs, SQL operations, repository management, and advanced workflow orchestration on Databricks clusters and SQL endpoints.

Package Information

  • Name: apache-airflow-providers-databricks
  • Type: Airflow Provider Package
  • Language: Python 3.8+
  • Installation: pip install apache-airflow-providers-databricks
  • Databricks API: Supports Databricks REST API 2.0/2.1
  • Dependencies: databricks-sql-connector, requests

Core Imports

The provider is organized into several main modules for different types of operations:

# Job Management - Submit and run Databricks jobs
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
    DatabricksRunNowOperator,
    DatabricksNotebookOperator
)

# SQL Operations - Execute SQL on Databricks SQL endpoints
from airflow.providers.databricks.operators.databricks_sql import (
    DatabricksSqlOperator,
    DatabricksCopyIntoOperator
)

# Repository Management - Git repository operations
from airflow.providers.databricks.operators.databricks_repos import (
    DatabricksReposCreateOperator,
    DatabricksReposUpdateOperator,
    DatabricksReposDeleteOperator
)

# Workflow Orchestration - Complex multi-task workflows
from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup

# Connection and Authentication - API connectivity
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# Monitoring and Sensing - Job and data monitoring
from airflow.providers.databricks.sensors.databricks import DatabricksSQLStatementsSensor
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor
from airflow.providers.databricks.sensors.databricks_partition import DatabricksPartitionSensor

# Async Triggers - Deferrable task support
from airflow.providers.databricks.triggers.databricks import (
    DatabricksExecutionTrigger,
    DatabricksSQLStatementExecutionTrigger
)

Basic Usage Example

Here's a simple example that demonstrates running a Databricks notebook:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'databricks_notebook_example',
    default_args=default_args,
    description='Execute a Databricks notebook',
    schedule=timedelta(hours=1),
    catchup=False
)

# Run a notebook on an existing cluster
notebook_task = DatabricksNotebookOperator(
    task_id='run_analysis_notebook',
    databricks_conn_id='databricks_default',
    notebook_path='/Shared/analysis/daily_report',
    source='WORKSPACE',
    existing_cluster_id='0123-456789-test123',
    notebook_params={
        'input_date': '{{ ds }}',
        'output_path': '/tmp/reports/{{ ds }}'
    },
    dag=dag
)

Architecture

The Databricks provider follows a layered architecture designed for flexibility and scalability:

Connection Layer

  • DatabricksHook: Core API client for Databricks REST API operations
  • DatabricksSqlHook: SQL-specific client for Databricks SQL endpoints and clusters
  • BaseDatabricksHook: Base functionality including authentication and error handling

Execution Layer

  • Submit Operators: Create and execute one-time runs (DatabricksSubmitRunOperator)
  • Job Operators: Trigger existing job definitions (DatabricksRunNowOperator)
  • Notebook Operators: Execute notebooks with parameters (DatabricksNotebookOperator)
  • SQL Operators: Execute SQL queries and data operations (DatabricksSqlOperator)

Management Layer

  • Repository Operators: Manage Git repositories in Databricks Repos
  • Workflow Groups: Orchestrate complex multi-task workflows
  • Resource Management: Handle clusters, libraries, and job configurations

Monitoring Layer

  • Sensors: Monitor data availability, table partitions, and SQL statement results
  • Triggers: Async monitoring for deferrable task execution (sketched after this list)
  • Status Tracking: Real-time job state monitoring and error handling
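
As a small illustration of the trigger layer, deferrable mode is enabled per operator: with deferrable=True the operator hands polling off to an async trigger (DatabricksExecutionTrigger) instead of occupying a worker slot. A minimal sketch, reusing the placeholder cluster ID from the example above and a hypothetical script path:

# Deferrable submit: while the run executes, polling happens in the triggerer
deferred_job = DatabricksSubmitRunOperator(
    task_id='submit_job_deferrable',
    databricks_conn_id='databricks_default',
    spark_python_task={'python_file': 'dbfs:/mnt/scripts/nightly_job.py'},  # hypothetical path
    existing_cluster_id='0123-456789-test123',  # placeholder cluster ID
    deferrable=True  # frees the worker slot until the run reaches a terminal state
)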

Capabilities

1. Job Management

Execute various types of Databricks jobs including JAR tasks, Python scripts, notebooks, and SQL queries.

# Submit a Spark job with custom cluster configuration
job_run = DatabricksSubmitRunOperator(
    task_id='submit_spark_job',
    spark_python_task={
        'python_file': 'dbfs:/mnt/scripts/etl_job.py',
        'parameters': ['--input', '/data/raw', '--output', '/data/processed']
    },
    new_cluster={
        'spark_version': '11.3.x-scala2.12',
        'node_type_id': 'i3.xlarge',
        'num_workers': 2
    }
)
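
Existing job definitions can be triggered as well with DatabricksRunNowOperator; the job ID and parameter names below are placeholders:

# Trigger a job that is already defined in the Databricks workspace
trigger_job = DatabricksRunNowOperator(
    task_id='trigger_existing_job',
    databricks_conn_id='databricks_default',
    job_id=1234,  # placeholder: ID of an existing Databricks job
    notebook_params={'run_date': '{{ ds }}'}
)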

Learn more: Job Management

2. SQL Operations

Execute SQL queries on Databricks SQL endpoints with support for multiple data formats and bulk operations.

# Execute SQL query with results export
sql_task = DatabricksSqlOperator(
    task_id='run_analytics_query',
    sql="""
        SELECT customer_id, COUNT(*) as order_count
        FROM orders 
        WHERE order_date = '{{ ds }}'
        GROUP BY customer_id
    """,
    databricks_conn_id='databricks_sql',
    output_path='/tmp/daily_analytics_{{ ds }}.csv',
    output_format='csv'
)
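
For bulk loads, DatabricksCopyIntoOperator wraps the COPY INTO command; the warehouse, table, and storage path below are placeholders:

# Bulk-load files from cloud storage into a Delta table using COPY INTO
copy_orders = DatabricksCopyIntoOperator(
    task_id='load_raw_orders',
    databricks_conn_id='databricks_sql',
    sql_endpoint_name='analytics_endpoint',  # placeholder SQL warehouse name
    table_name='raw.orders',                 # placeholder target table
    file_location='s3://company-data/orders/{{ ds }}/',  # placeholder source path
    file_format='CSV',
    format_options={'header': 'true'}
)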

Learn more: SQL Operations

3. Repository Management

Manage Git repositories in Databricks Repos for version-controlled notebook and code deployment.

# Create and update repository for notebook deployment
create_repo = DatabricksReposCreateOperator(
    task_id='create_analytics_repo',
    git_url='https://github.com/company/analytics-notebooks.git',
    repo_path='/Repos/production/analytics'
)
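
Once a repo exists, DatabricksReposUpdateOperator checks out a given branch or tag before notebooks are run from it; the branch name is illustrative:

# Pull the latest main branch so downstream tasks run current notebook code
update_repo = DatabricksReposUpdateOperator(
    task_id='update_analytics_repo',
    repo_path='/Repos/production/analytics',
    branch='main'  # illustrative branch name
)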

Learn more: Repository Management

4. Workflow Orchestration

Create complex multi-task workflows that run as coordinated Databricks jobs with dependency management.

# Define workflow with multiple dependent tasks
with DatabricksWorkflowTaskGroup(
    group_id='data_pipeline',
    databricks_conn_id='databricks_default'
) as workflow:
    extract_task = DatabricksTaskOperator(
        task_id='extract_data',
        task_config={
            'notebook_task': {
                'notebook_path': '/pipelines/extract',
                'base_parameters': {'date': '{{ ds }}'}
            }
        }
    )
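
    # Further tasks declared in the same 'with' block become tasks of the same
    # Databricks job; the transform notebook path below is illustrative.
    transform_task = DatabricksTaskOperator(
        task_id='transform_data',
        task_config={
            'notebook_task': {
                'notebook_path': '/pipelines/transform',
                'base_parameters': {'date': '{{ ds }}'}
            }
        }
    )

    # Dependencies inside the group map to task dependencies in the Databricks job
    extract_task >> transform_task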

Learn more: Workflow Orchestration

5. Connection & Authentication

Flexible authentication methods including personal access tokens, Azure AD, and service principal authentication.

# Custom hook usage with specific connection settings
hook = DatabricksHook(
    databricks_conn_id='databricks_production',
    timeout_seconds=3600,
    retry_limit=3
)
run_id = hook.submit_run(job_config)  # returns the run_id of the new run
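
The databricks_conn_id values used throughout these examples refer to an Airflow connection whose host is the workspace URL and whose password holds a personal access token. One way to define such a connection, sketched here with placeholder values, is the standard AIRFLOW_CONN_* environment variable in JSON form:

# Placeholder workspace URL and token; export before starting Airflow components
export AIRFLOW_CONN_DATABRICKS_PRODUCTION='{"conn_type": "databricks", "host": "https://dbc-1234abcd-5678.cloud.databricks.com", "password": "<personal-access-token>"}'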

Learn more: Connection & Authentication

6. Monitoring & Sensing

Monitor data availability, table partitions, and SQL statement execution, with support for deferrable execution where available.

# Wait for the day's data to land before downstream processing
data_sensor = DatabricksSqlSensor(
    task_id='wait_for_daily_orders',
    databricks_conn_id='databricks_default',
    sql_warehouse_name='analytics_warehouse',  # placeholder warehouse name
    sql="SELECT 1 FROM orders WHERE order_date = '{{ ds }}' LIMIT 1"
)

Learn more: Monitoring & Sensing

Key Features

  • Deferrable Execution: Job-run operators and sensors support async/deferrable mode for improved resource efficiency
  • Template Support: Comprehensive Jinja2 templating for dynamic parameter configuration
  • XCom Integration: Automatic XCom pushing for run IDs, job URLs, and query results (see the sketch after this list)
  • Error Handling: Robust error parsing and retry mechanisms with configurable timeouts
  • Multi-Format Support: CSV, JSON, and Parquet output formats for query results
  • Resource Management: Automatic cluster management with support for job clusters and existing clusters
  • OpenLineage Integration: Built-in data lineage tracking for data governance
  • UI Integration: Direct links to Databricks job runs and workflows from Airflow UI
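
As a sketch of the XCom integration, a downstream task can read the run page URL that DatabricksSubmitRunOperator pushes under the run_page_url key; the notification task itself is hypothetical:

from airflow.operators.python import PythonOperator

def log_run_url(ti):
    # The submit operator pushes 'run_id' and 'run_page_url' to XCom
    url = ti.xcom_pull(task_ids='submit_spark_job', key='run_page_url')
    print(f'Databricks run page: {url}')

notify = PythonOperator(task_id='log_run_url', python_callable=log_run_url)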

Getting Started

  1. Install the provider: pip install apache-airflow-providers-databricks
  2. Configure connection: Set up Databricks connection in Airflow with your workspace URL and authentication
  3. Choose your use case: Start with the relevant capability guide above
  4. Build incrementally: Begin with simple operations and expand to complex workflows

The Databricks provider enables you to leverage the full power of Databricks within your Airflow orchestration, from simple notebook execution to complex multi-stage data processing pipelines.