Comprehensive Databricks integration for Apache Airflow with operators, hooks, sensors, and triggers for orchestrating data workflows
```
npx @tessl/cli install tessl/pypi-apache-airflow-providers-databricks@7.7.0
```

The Apache Airflow Databricks Provider offers comprehensive integration with the Databricks platform, enabling you to orchestrate data engineering and machine learning workflows through Airflow DAGs. This provider supports job execution, notebook runs, SQL operations, repository management, and advanced workflow orchestration on Databricks clusters and SQL endpoints.
```
pip install apache-airflow-providers-databricks
```

The provider is organized into several main modules for different types of operations:
```python
# Job Management - Submit and run Databricks jobs
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
    DatabricksRunNowOperator,
    DatabricksNotebookOperator,
)

# SQL Operations - Execute SQL on Databricks SQL endpoints
from airflow.providers.databricks.operators.databricks_sql import (
    DatabricksSqlOperator,
    DatabricksCopyIntoOperator,
)

# Repository Management - Git repository operations
from airflow.providers.databricks.operators.databricks_repos import (
    DatabricksReposCreateOperator,
    DatabricksReposUpdateOperator,
    DatabricksReposDeleteOperator,
)

# Workflow Orchestration - Complex multi-task workflows
from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator
from airflow.providers.databricks.operators.databricks_workflow import (
    DatabricksWorkflowTaskGroup,
)

# Connection and Authentication - API connectivity
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# Monitoring and Sensing - Job and data monitoring
from airflow.providers.databricks.sensors.databricks import DatabricksSQLStatementsSensor
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor
from airflow.providers.databricks.sensors.databricks_partition import DatabricksPartitionSensor

# Async Triggers - Deferrable task support
from airflow.providers.databricks.triggers.databricks import (
    DatabricksExecutionTrigger,
    DatabricksSQLStatementExecutionTrigger,
)
```

Here's a simple example that demonstrates running a Databricks notebook:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'databricks_notebook_example',
    default_args=default_args,
    description='Execute a Databricks notebook',
    schedule=timedelta(hours=1),
    catchup=False,
)

# Run a notebook on an existing cluster
notebook_task = DatabricksNotebookOperator(
    task_id='run_analysis_notebook',
    databricks_conn_id='databricks_default',
    notebook_path='/Shared/analysis/daily_report',
    source='WORKSPACE',
    existing_cluster_id='0123-456789-test123',
    notebook_params={
        'input_date': '{{ ds }}',
        'output_path': '/tmp/reports/{{ ds }}',
    },
    dag=dag,
)
```

The Databricks provider follows a layered architecture designed for flexibility and scalability:
- Submitting one-time runs (`DatabricksSubmitRunOperator`)
- Triggering existing jobs (`DatabricksRunNowOperator`)
- Running notebooks as tasks (`DatabricksNotebookOperator`)
- Executing SQL (`DatabricksSqlOperator`)

Execute various types of Databricks jobs including JAR tasks, Python scripts, notebooks, and SQL queries.
```python
# Submit a Spark job with custom cluster configuration
job_run = DatabricksSubmitRunOperator(
    task_id='submit_spark_job',
    spark_python_task={
        'python_file': 'dbfs:/mnt/scripts/etl_job.py',
        'parameters': ['--input', '/data/raw', '--output', '/data/processed'],
    },
    new_cluster={
        'spark_version': '11.3.x-scala2.12',
        'node_type_id': 'i3.xlarge',
        'num_workers': 2,
    },
)
```

Learn more: Job Management
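
When the job is already defined in the Databricks workspace, you can trigger it instead of submitting a new run specification. A minimal sketch using `DatabricksRunNowOperator`; the job name and notebook parameters below are illustrative placeholders:

```python
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

# Trigger an existing Databricks job by name (placeholder job name and parameters)
run_existing_job = DatabricksRunNowOperator(
    task_id='run_nightly_etl_job',
    databricks_conn_id='databricks_default',
    job_name='nightly-etl',
    notebook_params={'run_date': '{{ ds }}'},
)
```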
Execute SQL queries on Databricks SQL endpoints with support for multiple data formats and bulk operations.
```python
# Execute SQL query with results export
sql_task = DatabricksSqlOperator(
    task_id='run_analytics_query',
    sql="""
        SELECT customer_id, COUNT(*) AS order_count
        FROM orders
        WHERE order_date = '{{ ds }}'
        GROUP BY customer_id
    """,
    databricks_conn_id='databricks_sql',
    output_path='/tmp/daily_analytics_{{ ds }}.csv',
    output_format='csv',
)
```

Learn more: SQL Operations
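
For the bulk operations mentioned above, `DatabricksCopyIntoOperator` wraps the SQL `COPY INTO` command. A sketch with assumed names; the target table and staging location are placeholders:

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

# Bulk-load staged CSV files into a table via COPY INTO (placeholder table and path)
copy_orders = DatabricksCopyIntoOperator(
    task_id='load_raw_orders',
    databricks_conn_id='databricks_sql',
    table_name='raw.orders',
    file_location='s3://company-data-lake/orders/{{ ds }}/',
    file_format='CSV',
    format_options={'header': 'true', 'inferSchema': 'true'},
)
```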
Manage Git repositories in Databricks Repos for version-controlled notebook and code deployment.
```python
# Create and update repository for notebook deployment
create_repo = DatabricksReposCreateOperator(
    task_id='create_analytics_repo',
    git_url='https://github.com/company/analytics-notebooks.git',
    repo_path='/Repos/production/analytics',
)
```

Learn more: Repository Management
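
Keeping a deployed repo in sync works the same way; a brief sketch that assumes the repository created above tracks a `main` branch:

```python
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator

# Check out the latest main branch before running notebooks from the repo
update_repo = DatabricksReposUpdateOperator(
    task_id='update_analytics_repo',
    repo_path='/Repos/production/analytics',
    branch='main',
)
```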
Create complex multi-task workflows that run as coordinated Databricks jobs with dependency management.
```python
# Define workflow with multiple dependent tasks
with DatabricksWorkflowTaskGroup(
    group_id='data_pipeline',
    databricks_conn_id='databricks_default',
) as workflow:
    extract_task = DatabricksTaskOperator(
        task_id='extract_data',
        task_config={
            'notebook_task': {
                'notebook_path': '/pipelines/extract',
                'base_parameters': {'date': '{{ ds }}'},
            }
        },
    )
```

Learn more: Workflow Orchestration
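
To illustrate the dependency management the task group provides, here is a self-contained sketch with two tasks chained inside one workflow; the notebook paths are placeholders and cluster configuration is omitted for brevity:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup

with DAG('databricks_workflow_sketch', start_date=datetime(2024, 1, 1), schedule=None) as dag:
    with DatabricksWorkflowTaskGroup(
        group_id='data_pipeline',
        databricks_conn_id='databricks_default',
    ) as workflow:
        extract_task = DatabricksTaskOperator(
            task_id='extract_data',
            task_config={'notebook_task': {'notebook_path': '/pipelines/extract'}},
        )
        transform_task = DatabricksTaskOperator(
            task_id='transform_data',
            task_config={'notebook_task': {'notebook_path': '/pipelines/transform'}},
        )

        # Both tasks are submitted as a single Databricks job; this sets the in-job dependency
        extract_task >> transform_task
```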
Flexible authentication methods including personal access tokens, Azure AD, and service principal authentication.
```python
# Custom hook usage with specific connection settings
hook = DatabricksHook(
    databricks_conn_id='databricks_production',
    timeout_seconds=3600,
    retry_limit=3,
)

# submit_run() posts the run specification to the Jobs API and returns the run_id
run_id = hook.submit_run(job_config)
```

Learn more: Connection & Authentication
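
Connections are standard Airflow connections of type `databricks`. As one possible setup (the workspace URL and token below are placeholders), a personal access token can be supplied through a JSON-style connection environment variable, with the token stored in the password field:

```python
import json
import os

# Define the 'databricks_default' connection via an environment variable.
# The host is the workspace URL; the personal access token goes in the password field.
os.environ['AIRFLOW_CONN_DATABRICKS_DEFAULT'] = json.dumps({
    'conn_type': 'databricks',
    'host': 'https://adb-1234567890123456.7.azuredatabricks.net',  # placeholder workspace URL
    'password': 'dapiXXXXXXXXXXXXXXXX',                            # placeholder personal access token
})
```

In practice you would set this variable in the scheduler and worker environment, or manage the connection through the Airflow UI or a secrets backend, rather than in DAG code.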
Monitor job completion, data availability, and query results with support for deferrable execution.
```python
# Wait for the current run date's data to land, using the Databricks SQL sensor
data_sensor = DatabricksSqlSensor(
    task_id='wait_for_daily_orders',
    databricks_conn_id='databricks_default',
    sql_warehouse_name='analytics_warehouse',
    sql="SELECT 1 FROM orders WHERE order_date = '{{ ds }}' LIMIT 1",
    timeout=3600,
)
```

Learn more: Monitoring & Sensing
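
Job completion itself is monitored by the run operators: with `deferrable=True`, polling is handed off to `DatabricksExecutionTrigger` on the triggerer so no worker slot is held while the run executes. A sketch reusing the placeholder job name from the earlier example:

```python
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

# Deferrable run: the worker slot is released while the trigger polls the run state
deferred_run = DatabricksRunNowOperator(
    task_id='run_nightly_etl_deferrable',
    databricks_conn_id='databricks_default',
    job_name='nightly-etl',          # placeholder job name
    deferrable=True,
    polling_period_seconds=60,       # how often the run state is checked
)
```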
```
pip install apache-airflow-providers-databricks
```

The Databricks provider enables you to leverage the full power of Databricks within your Airflow orchestration, from simple notebook execution to complex multi-stage data processing pipelines.