# Apache Airflow Databricks Provider

The Apache Airflow Databricks Provider offers comprehensive integration with the Databricks platform, enabling you to orchestrate data engineering and machine learning workflows from Airflow DAGs. The provider supports job execution, notebook runs, SQL operations, repository management, and advanced workflow orchestration on Databricks clusters and SQL endpoints.

## Package Information

- **Name**: apache-airflow-providers-databricks
- **Type**: Airflow Provider Package
- **Language**: Python 3.8+
- **Installation**: `pip install apache-airflow-providers-databricks`
- **Databricks API**: Supports Databricks REST API 2.0/2.1
- **Dependencies**: databricks-sql-connector, requests

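As a quick sanity check after installation, you can confirm that the provider resolves inside your Airflow environment. This is a minimal sketch using only the standard library and the package name listed above; it is not part of the provider's API.

```python
from importlib.metadata import version

# Print the installed provider version
print(version("apache-airflow-providers-databricks"))

# Importing an operator confirms the provider modules are importable
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator  # noqa: F401
```
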
## Core Imports

The provider is organized into several main modules for different types of operations:

```python { .api }
# Job Management - Submit and run Databricks jobs
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
    DatabricksRunNowOperator,
    DatabricksNotebookOperator
)

# SQL Operations - Execute SQL on Databricks SQL endpoints
from airflow.providers.databricks.operators.databricks_sql import (
    DatabricksSqlOperator,
    DatabricksCopyIntoOperator
)

# Repository Management - Git repository operations
from airflow.providers.databricks.operators.databricks_repos import (
    DatabricksReposCreateOperator,
    DatabricksReposUpdateOperator,
    DatabricksReposDeleteOperator
)

# Workflow Orchestration - Complex multi-task workflows
from airflow.providers.databricks.operators.databricks_workflow import (
    DatabricksWorkflowTaskGroup,
    DatabricksTaskOperator
)

# Connection and Authentication - API connectivity
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# Monitoring and Sensing - Job and data monitoring
from airflow.providers.databricks.sensors.databricks import DatabricksSQLStatementsSensor
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor
from airflow.providers.databricks.sensors.databricks_partition import DatabricksPartitionSensor

# Async Triggers - Deferrable task support
from airflow.providers.databricks.triggers.databricks import (
    DatabricksExecutionTrigger,
    DatabricksSQLStatementExecutionTrigger
)
```

## Basic Usage Example

Here's a simple example that demonstrates running a Databricks notebook:

```python { .api }
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'databricks_notebook_example',
    default_args=default_args,
    description='Execute a Databricks notebook',
    schedule_interval=timedelta(hours=1),
    catchup=False
)

# Run a notebook on an existing cluster
notebook_task = DatabricksNotebookOperator(
    task_id='run_analysis_notebook',
    databricks_conn_id='databricks_default',
    notebook_path='/Shared/analysis/daily_report',
    source='WORKSPACE',  # the notebook lives in the workspace rather than a Git repo
    existing_cluster_id='0123-456789-test123',
    notebook_params={
        'input_date': '{{ ds }}',
        'output_path': '/tmp/reports/{{ ds }}'
    },
    dag=dag
)
```

## Architecture

The Databricks provider follows a layered architecture designed for flexibility and scalability:

### Connection Layer

- **DatabricksHook**: Core API client for Databricks REST API operations
- **DatabricksSqlHook**: SQL-specific client for Databricks SQL endpoints and clusters
- **BaseDatabricksHook**: Base functionality including authentication and error handling

### Execution Layer

- **Submit Operators**: Create and execute one-time runs (`DatabricksSubmitRunOperator`)
- **Job Operators**: Trigger existing job definitions (`DatabricksRunNowOperator`)
- **Notebook Operators**: Execute notebooks with parameters (`DatabricksNotebookOperator`)
- **SQL Operators**: Execute SQL queries and data operations (`DatabricksSqlOperator`)

### Management Layer

- **Repository Operators**: Manage Git repositories in Databricks Repos
- **Workflow Groups**: Orchestrate complex multi-task workflows
- **Resource Management**: Handle clusters, libraries, and job configurations

### Monitoring Layer

- **Sensors**: Monitor job completion, data availability, and SQL query results
- **Triggers**: Async monitoring for deferrable task execution
- **Status Tracking**: Real-time job state monitoring and error handling

## Capabilities

### 1. Job Management

Execute various types of Databricks jobs including JAR tasks, Python scripts, notebooks, and SQL queries.

```python { .api }
# Submit a Spark job with custom cluster configuration
job_run = DatabricksSubmitRunOperator(
    task_id='submit_spark_job',
    spark_python_task={
        'python_file': 'dbfs:/mnt/scripts/etl_job.py',
        'parameters': ['--input', '/data/raw', '--output', '/data/processed']
    },
    new_cluster={
        'spark_version': '11.3.x-scala2.12',
        'node_type_id': 'i3.xlarge',
        'num_workers': 2
    }
)
```

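When the job is already defined in the Databricks workspace, you can trigger it by ID rather than submitting a one-time run. A minimal sketch of `DatabricksRunNowOperator`; the `job_id` value and the `notebook_params` keys are placeholders.

```python
# Trigger an existing Databricks job definition
run_existing_job = DatabricksRunNowOperator(
    task_id='trigger_existing_job',
    databricks_conn_id='databricks_default',
    job_id=12345,  # placeholder: ID of a job already defined in the workspace
    notebook_params={'run_date': '{{ ds }}'}
)
```
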
**Learn more**: [Job Management](job-management.md)

### 2. SQL Operations

Execute SQL queries on Databricks SQL endpoints with support for multiple data formats and bulk operations.

```python { .api }
# Execute SQL query with results export
sql_task = DatabricksSqlOperator(
    task_id='run_analytics_query',
    sql="""
        SELECT customer_id, COUNT(*) as order_count
        FROM orders
        WHERE order_date = '{{ ds }}'
        GROUP BY customer_id
    """,
    databricks_conn_id='databricks_sql',
    output_path='/tmp/daily_analytics_{{ ds }}.csv',
    output_format='csv'
)
```

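For bulk loads, `DatabricksCopyIntoOperator` (imported above) wraps the SQL `COPY INTO` command. A minimal sketch assuming a pre-existing SQL endpoint, target table, and source location; all three names are placeholders, and the available options vary by provider version.

```python
# Bulk-load CSV files into a Delta table with COPY INTO
copy_task = DatabricksCopyIntoOperator(
    task_id='load_orders',
    databricks_conn_id='databricks_sql',
    sql_endpoint_name='analytics-endpoint',     # placeholder SQL endpoint name
    table_name='analytics.orders',              # placeholder target table
    file_location='s3://company-data/orders/',  # placeholder source location
    file_format='CSV',
    format_options={'header': 'true'}
)
```
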
**Learn more**: [SQL Operations](sql-operations.md)

### 3. Repository Management

Manage Git repositories in Databricks Repos for version-controlled notebook and code deployment.

```python { .api }
# Create and update repository for notebook deployment
create_repo = DatabricksReposCreateOperator(
    task_id='create_analytics_repo',
    git_url='https://github.com/company/analytics-notebooks.git',
    repo_path='/Repos/production/analytics'
)
```

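Once the repository exists, `DatabricksReposUpdateOperator` can move the checkout to a specific branch or tag, for example as a deployment step. A minimal sketch assuming the repository path created above and a `main` branch.

```python
# Check out the main branch in the existing Databricks repo
update_repo = DatabricksReposUpdateOperator(
    task_id='update_analytics_repo',
    repo_path='/Repos/production/analytics',
    branch='main'
)
```
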
**Learn more**: [Repository Management](repositories.md)

### 4. Workflow Orchestration

Create complex multi-task workflows that run as coordinated Databricks jobs with dependency management.

```python { .api }
# Define workflow with multiple dependent tasks
with DatabricksWorkflowTaskGroup(group_id='data_pipeline') as workflow:
    extract_task = DatabricksTaskOperator(
        task_id='extract_data',
        task_config={
            'notebook_task': {
                'notebook_path': '/pipelines/extract',
                'base_parameters': {'date': '{{ ds }}'}
            }
        }
    )
```

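Tasks created inside the group run as a single Databricks Workflows job, and Airflow dependencies between them are carried over into that job. A slightly fuller sketch of the same pattern with a second, dependent task; the `/pipelines/...` notebook paths are placeholders and cluster configuration is omitted for brevity.

```python
with DatabricksWorkflowTaskGroup(group_id='data_pipeline') as workflow:
    extract_task = DatabricksTaskOperator(
        task_id='extract_data',
        task_config={
            'notebook_task': {
                'notebook_path': '/pipelines/extract',
                'base_parameters': {'date': '{{ ds }}'}
            }
        }
    )
    transform_task = DatabricksTaskOperator(
        task_id='transform_data',
        task_config={
            'notebook_task': {'notebook_path': '/pipelines/transform'}
        }
    )
    # The Airflow dependency becomes a task dependency inside the Databricks job
    extract_task >> transform_task
```
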
**Learn more**: [Workflow Orchestration](workflows.md)

### 5. Connection & Authentication

Flexible authentication methods including personal access tokens, Azure AD, and service principal authentication.

```python { .api }
# Custom hook usage with specific connection settings
hook = DatabricksHook(
    databricks_conn_id='databricks_production',
    timeout_seconds=3600,
    retry_limit=3
)

# job_config is a Jobs API run payload; submit_run returns the new run's ID
run_id = hook.submit_run(job_config)
```

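`submit_run` returns the numeric run ID, which the hook can use to poll status or build a link to the run. A short follow-up sketch using the hook's `get_run_state` and `get_run_page_url` methods.

```python
# Poll the run's state and fetch a deep link to it in the Databricks UI
state = hook.get_run_state(run_id)
print(state.life_cycle_state, state.result_state)
print(hook.get_run_page_url(run_id))
```
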
**Learn more**: [Connection & Authentication](connections.md)

### 6. Monitoring & Sensing

Monitor job completion, data availability, and query results with support for deferrable execution.

```python { .api }
# Monitor job completion with sensor
job_sensor = DatabricksSensor(
    task_id='wait_for_job_completion',
    run_id="{{ task_instance.xcom_pull(task_ids='submit_job', key='run_id') }}",
    databricks_conn_id='databricks_default',
    deferrable=True
)
```

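For data-availability checks, the provider also ships `DatabricksSqlSensor`, which pokes a SQL warehouse until a query returns rows. A minimal sketch assuming a warehouse named `analytics-endpoint` and an `orders` table; treat the exact parameter set as version-dependent.

```python
# Wait until today's orders have landed before downstream processing
data_sensor = DatabricksSqlSensor(
    task_id='wait_for_orders',
    databricks_conn_id='databricks_sql',
    sql_warehouse_name='analytics-endpoint',  # placeholder warehouse name
    sql="SELECT 1 FROM orders WHERE order_date = '{{ ds }}' LIMIT 1"
)
```
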
**Learn more**: [Monitoring & Sensing](monitoring.md)

## Key Features

- **Deferrable Execution**: Job and notebook operators support an async/deferrable mode for improved resource efficiency (see the sketch after this list)
- **Template Support**: Comprehensive Jinja2 templating for dynamic parameter configuration
- **XCom Integration**: Automatic XCom pushing for run IDs, job URLs, and query results
- **Error Handling**: Robust error parsing and retry mechanisms with configurable timeouts
- **Multi-Format Support**: CSV, JSON, and Parquet output formats for query results
- **Resource Management**: Automatic cluster management with support for job clusters and existing clusters
- **OpenLineage Integration**: Built-in data lineage tracking for data governance
- **UI Integration**: Direct links to Databricks job runs and workflows from the Airflow UI

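The deferrable mode mentioned above frees the worker slot while Databricks executes the run; monitoring is handed off to the triggerer via `DatabricksExecutionTrigger`. A minimal sketch of a deferrable submit-run; the script path and cluster ID are placeholders.

```python
# Submit a run and defer until Databricks reports completion
deferred_run = DatabricksSubmitRunOperator(
    task_id='submit_etl_deferred',
    databricks_conn_id='databricks_default',
    spark_python_task={'python_file': 'dbfs:/mnt/scripts/etl_job.py'},
    existing_cluster_id='0123-456789-test123',  # placeholder cluster ID
    deferrable=True,
    polling_period_seconds=60
)
```
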
## Getting Started

1. **Install the provider**: `pip install apache-airflow-providers-databricks`
2. **Configure a connection**: Set up a Databricks connection in Airflow with your workspace URL and authentication (a minimal example follows this list)
3. **Choose your use case**: Start with the relevant capability guide above
4. **Build incrementally**: Begin with simple operations and expand to complex workflows

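For step 2, one option is to define the connection as an `AIRFLOW_CONN_<CONN_ID>` environment variable; Airflow accepts a JSON-serialized connection there. A minimal sketch assuming personal-access-token authentication; the workspace URL and token are placeholders, and in practice the variable is set in the deployment environment rather than in Python.

```python
import json
import os

# JSON form of the 'databricks_default' connection (the PAT goes in the password field)
os.environ["AIRFLOW_CONN_DATABRICKS_DEFAULT"] = json.dumps({
    "conn_type": "databricks",
    "host": "https://<your-workspace>.cloud.databricks.com",  # placeholder workspace URL
    "password": "<personal-access-token>",                    # placeholder token
})
```
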
The Databricks provider enables you to leverage the full power of Databricks within your Airflow orchestration, from simple notebook execution to complex multi-stage data processing pipelines.