Comprehensive Databricks integration for Apache Airflow with operators, hooks, sensors, and triggers for orchestrating data workflows
npx @tessl/cli install tessl/pypi-apache-airflow-providers-databricks@7.7.00
# Apache Airflow Databricks Provider

The Apache Airflow Databricks Provider offers comprehensive integration with the Databricks platform, enabling you to orchestrate data engineering and machine learning workflows through Airflow DAGs. The provider supports job execution, notebook runs, SQL operations, repository management, and advanced workflow orchestration on Databricks clusters and SQL endpoints.

## Package Information

- **Name**: apache-airflow-providers-databricks
- **Type**: Airflow Provider Package
- **Language**: Python 3.8+
- **Installation**: `pip install apache-airflow-providers-databricks`
- **Databricks API**: Supports Databricks REST API 2.0/2.1
- **Dependencies**: databricks-sql-connector, requests

## Core Imports

The provider is organized into several modules, one for each type of operation:

```python { .api }
# Job Management - Submit and run Databricks jobs
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
    DatabricksRunNowOperator,
    DatabricksNotebookOperator,
)

# SQL Operations - Execute SQL on Databricks SQL endpoints
from airflow.providers.databricks.operators.databricks_sql import (
    DatabricksSqlOperator,
    DatabricksCopyIntoOperator,
)

# Repository Management - Git repository operations
from airflow.providers.databricks.operators.databricks_repos import (
    DatabricksReposCreateOperator,
    DatabricksReposUpdateOperator,
    DatabricksReposDeleteOperator,
)

# Workflow Orchestration - Complex multi-task workflows
from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup

# Connection and Authentication - API connectivity
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# Monitoring and Sensing - Job and data monitoring
from airflow.providers.databricks.sensors.databricks import DatabricksSQLStatementsSensor
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor
from airflow.providers.databricks.sensors.databricks_partition import DatabricksPartitionSensor

# Async Triggers - Deferrable task support
from airflow.providers.databricks.triggers.databricks import (
    DatabricksExecutionTrigger,
    DatabricksSQLStatementExecutionTrigger,
)
```

## Basic Usage Example

Here's a simple example that demonstrates running a Databricks notebook:

```python { .api }
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'databricks_notebook_example',
    default_args=default_args,
    description='Execute a Databricks notebook',
    schedule=timedelta(hours=1),
    catchup=False,
)

# Run a notebook on an existing cluster
notebook_task = DatabricksNotebookOperator(
    task_id='run_analysis_notebook',
    databricks_conn_id='databricks_default',
    notebook_path='/Shared/analysis/daily_report',
    source='WORKSPACE',
    existing_cluster_id='0123-456789-test123',
    notebook_params={
        'input_date': '{{ ds }}',
        'output_path': '/tmp/reports/{{ ds }}',
    },
    dag=dag,
)
```

## Architecture

The Databricks provider follows a layered architecture designed for flexibility and scalability:

### Connection Layer
- **DatabricksHook**: Core API client for Databricks REST API operations
- **DatabricksSqlHook**: SQL-specific client for Databricks SQL endpoints and clusters (see the sketch below)
- **BaseDatabricksHook**: Base functionality including authentication and error handling

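As a quick illustration of the hook layer, here is a minimal sketch of running a query through `DatabricksSqlHook`; the connection id and warehouse HTTP path are placeholder assumptions, not values defined elsewhere in this guide:

```python
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# Placeholder connection id and SQL warehouse HTTP path, for illustration only
hook = DatabricksSqlHook(
    databricks_conn_id='databricks_sql',
    http_path='/sql/1.0/warehouses/abc1234567890def',
)
rows = hook.get_records("SELECT current_date() AS today")
```
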
### Execution Layer
- **Submit Operators**: Create and execute one-time runs (`DatabricksSubmitRunOperator`)
- **Job Operators**: Trigger existing job definitions (`DatabricksRunNowOperator`; see the sketch below)
- **Notebook Operators**: Execute notebooks with parameters (`DatabricksNotebookOperator`)
- **SQL Operators**: Execute SQL queries and data operations (`DatabricksSqlOperator`)

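For example, triggering an existing job definition looks roughly like this; the job id and parameter values are assumptions for illustration:

```python
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

# Trigger an existing Databricks job by id (placeholder id and parameters)
run_nightly_job = DatabricksRunNowOperator(
    task_id='run_nightly_job',
    databricks_conn_id='databricks_default',
    job_id=12345,
    notebook_params={'run_date': '{{ ds }}'},
)
```
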
### Management Layer
- **Repository Operators**: Manage Git repositories in Databricks Repos
- **Workflow Groups**: Orchestrate complex multi-task workflows
- **Resource Management**: Handle clusters, libraries, and job configurations

### Monitoring Layer
- **Sensors**: Monitor job completion, data availability, and SQL query results
- **Triggers**: Async monitoring for deferrable task execution (see the sketch below)
- **Status Tracking**: Real-time job state monitoring and error handling

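As a sketch of how the trigger layer is used, setting `deferrable=True` on a run operator hands state polling off to `DatabricksExecutionTrigger` on the triggerer; the cluster id and notebook path below are placeholders:

```python
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# The worker slot is released while the run executes; the trigger polls the
# run state asynchronously and resumes the task when the run finishes.
submit_deferred = DatabricksSubmitRunOperator(
    task_id='submit_deferrable_run',
    databricks_conn_id='databricks_default',
    notebook_task={'notebook_path': '/Shared/analysis/daily_report'},
    existing_cluster_id='0123-456789-test123',
    deferrable=True,
)
```
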
## Capabilities

### 1. Job Management
Execute various types of Databricks jobs including JAR tasks, Python scripts, notebooks, and SQL queries.

```python { .api }
# Submit a Spark job with custom cluster configuration
job_run = DatabricksSubmitRunOperator(
    task_id='submit_spark_job',
    spark_python_task={
        'python_file': 'dbfs:/mnt/scripts/etl_job.py',
        'parameters': ['--input', '/data/raw', '--output', '/data/processed']
    },
    new_cluster={
        'spark_version': '11.3.x-scala2.12',
        'node_type_id': 'i3.xlarge',
        'num_workers': 2
    }
)
```
**Learn more**: [Job Management](job-management.md)

### 2. SQL Operations
Execute SQL queries on Databricks SQL endpoints with support for multiple data formats and bulk operations.

```python { .api }
# Execute SQL query with results export
sql_task = DatabricksSqlOperator(
    task_id='run_analytics_query',
    sql="""
        SELECT customer_id, COUNT(*) as order_count
        FROM orders
        WHERE order_date = '{{ ds }}'
        GROUP BY customer_id
    """,
    databricks_conn_id='databricks_sql',
    output_path='/tmp/daily_analytics_{{ ds }}.csv',
    output_format='csv'
)
```
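
For bulk loads, `DatabricksCopyIntoOperator` wraps the Databricks `COPY INTO` command. A minimal sketch; the table name and file location are assumptions for illustration, and the SQL warehouse is taken from the connection settings:

```python
# Load staged files into a Delta table via COPY INTO (illustrative values)
copy_orders = DatabricksCopyIntoOperator(
    task_id='copy_orders_into_staging',
    databricks_conn_id='databricks_sql',
    table_name='analytics.orders_staging',
    file_location='s3://company-data-lake/orders/{{ ds }}/',
    file_format='CSV',
)
```
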
**Learn more**: [SQL Operations](sql-operations.md)

### 3. Repository Management
Manage Git repositories in Databricks Repos for version-controlled notebook and code deployment.

```python { .api }
# Create a repository for notebook deployment
create_repo = DatabricksReposCreateOperator(
    task_id='create_analytics_repo',
    git_url='https://github.com/company/analytics-notebooks.git',
    repo_path='/Repos/production/analytics'
)
```
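
To move the checkout to a specific branch after creation, a hedged sketch using `DatabricksReposUpdateOperator` (the branch name is an assumption):

```python
# Point the Databricks repo at a branch (placeholder branch name)
update_repo = DatabricksReposUpdateOperator(
    task_id='update_analytics_repo',
    repo_path='/Repos/production/analytics',
    branch='main',
)
```
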
**Learn more**: [Repository Management](repositories.md)

### 4. Workflow Orchestration
Create complex multi-task workflows that run as coordinated Databricks jobs with dependency management.

```python { .api }
# Define workflow with multiple dependent tasks
with DatabricksWorkflowTaskGroup(group_id='data_pipeline') as workflow:
    extract_task = DatabricksTaskOperator(
        task_id='extract_data',
        task_config={
            'notebook_task': {
                'notebook_path': '/pipelines/extract',
                'base_parameters': {'date': '{{ ds }}'}
            }
        }
    )
```
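
A hedged sketch of extending this pattern with a second, dependent task; the task ids, notebook paths, and connection id are assumptions for illustration. Both tasks run inside the single Databricks job created by the task group, and the Airflow dependency becomes a task dependency in that job:

```python
with DatabricksWorkflowTaskGroup(
    group_id='data_pipeline_with_transform',
    databricks_conn_id='databricks_default',
) as workflow:
    extract_task = DatabricksTaskOperator(
        task_id='extract_data',
        task_config={
            'notebook_task': {
                'notebook_path': '/pipelines/extract',
                'base_parameters': {'date': '{{ ds }}'},
            }
        },
    )
    transform_task = DatabricksTaskOperator(
        task_id='transform_data',
        task_config={
            'notebook_task': {'notebook_path': '/pipelines/transform'},
        },
    )
    # Dependency inside the task group maps to a task dependency in the Databricks job
    extract_task >> transform_task
```
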
**Learn more**: [Workflow Orchestration](workflows.md)

### 5. Connection & Authentication
Connect using personal access tokens, Azure AD tokens, or service principal authentication, configured through Airflow connections.

```python { .api }
# Custom hook usage with specific connection settings
hook = DatabricksHook(
    databricks_conn_id='databricks_production',
    timeout_seconds=3600,
    retry_limit=3
)
# submit_run takes a Jobs API run specification dict and returns the run id
run_id = hook.submit_run(job_config)
```
**Learn more**: [Connection & Authentication](connections.md)

### 6. Monitoring & Sensing
Monitor data availability, table partitions, and SQL statement execution with sensors, including support for deferrable execution.

```python { .api }
# Wait until data for the current execution date is available
data_sensor = DatabricksSqlSensor(
    task_id='wait_for_daily_orders',
    databricks_conn_id='databricks_default',
    sql_warehouse_name='analytics-warehouse',
    sql="SELECT 1 FROM orders WHERE order_date = '{{ ds }}' LIMIT 1",
)
```
**Learn more**: [Monitoring & Sensing](monitoring.md)

## Key Features

- **Deferrable Execution**: Job-run operators and key sensors support async/deferrable mode for improved resource efficiency
- **Template Support**: Comprehensive Jinja2 templating for dynamic parameter configuration
- **XCom Integration**: Automatic XCom pushing for run IDs, job URLs, and query results (see the sketch after this list)
- **Error Handling**: Robust error parsing and retry mechanisms with configurable timeouts
- **Multi-Format Support**: CSV and JSON output formats for query results
- **Resource Management**: Automatic cluster management with support for job clusters and existing clusters
- **OpenLineage Integration**: Built-in data lineage tracking for data governance
- **UI Integration**: Direct links to Databricks job runs and workflows from the Airflow UI

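For instance, a downstream TaskFlow task can pull the run id and run page URL that the run operators push to XCom; the upstream task id here is an assumption matching the job management example above:

```python
from airflow.decorators import task

@task
def report_databricks_run(ti=None):
    # DatabricksSubmitRunOperator pushes 'run_id' and 'run_page_url' to XCom
    run_id = ti.xcom_pull(task_ids='submit_spark_job', key='run_id')
    run_url = ti.xcom_pull(task_ids='submit_spark_job', key='run_page_url')
    print(f"Databricks run {run_id}: {run_url}")
```
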
## Getting Started

1. **Install the provider**: `pip install apache-airflow-providers-databricks`
2. **Configure connection**: Set up a Databricks connection in Airflow with your workspace URL and authentication (a configuration sketch follows this list)
3. **Choose your use case**: Start with the relevant capability guide above
4. **Build incrementally**: Begin with simple operations and expand to complex workflows

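One way to handle step 2 is a JSON-serialized connection in an environment variable; a minimal sketch with a placeholder workspace URL and token (a personal access token can equally be stored in the connection's Password field via the Airflow UI or CLI):

```python
import json
import os

# Placeholder workspace URL and personal access token, for illustration only
os.environ["AIRFLOW_CONN_DATABRICKS_DEFAULT"] = json.dumps({
    "conn_type": "databricks",
    "host": "https://dbc-1234567890abcdef.cloud.databricks.com",
    "password": "<personal-access-token>",
})
```
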
The Databricks provider enables you to leverage the full power of Databricks within your Airflow orchestration, from simple notebook execution to complex multi-stage data processing pipelines.