Provider package for Apache Spark integration with Apache Airflow, offering operators, hooks, and decorators for distributed data processing workflows.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-spark@5.3.00
# Apache Airflow Providers Apache Spark

A comprehensive provider package that enables seamless integration between Apache Airflow and the Apache Spark distributed computing framework. This package provides operators, hooks, and decorators for orchestrating Spark jobs within Airflow workflows, supporting multiple Spark interfaces: spark-submit, Spark SQL, Spark JDBC transfers, and the modern Spark Connect protocol.

## Package Information

- **Package Name**: apache-airflow-providers-apache-spark
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-spark`
- **Requires**: Apache Airflow >= 2.10.0, PySpark >= 3.5.2, grpcio-status >= 1.59.0

## Core Imports

```python
# Operators for executing Spark jobs
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

# Hooks for Spark connections
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook
from airflow.providers.apache.spark.hooks.spark_jdbc import SparkJDBCHook
from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook

# Task decorator for PySpark functions
from airflow.providers.apache.spark.decorators.pyspark import pyspark_task
```

## Basic Usage

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.decorators.pyspark import pyspark_task

# Define the DAG
dag = DAG(
    'spark_example',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Example Spark workflow',
    schedule=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
)

# Submit a Spark application
spark_job = SparkSubmitOperator(
    task_id='run_spark_job',
    application='/path/to/spark_app.py',
    conn_id='spark_default',
    conf={
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
    },
    dag=dag,
)

# Execute a SQL query with Spark
spark_sql = SparkSqlOperator(
    task_id='run_spark_sql',
    sql='SELECT COUNT(*) FROM users WHERE active = true',
    conn_id='spark_sql_default',
    dag=dag,
)

# PySpark task decorator example
@pyspark_task(task_id='process_data', dag=dag)
def process_user_data(spark):
    df = spark.read.parquet('/data/users.parquet')
    result = df.filter(df.active == True).groupBy('region').count()
    result.write.mode('overwrite').parquet('/data/user_counts.parquet')
    return result.count()

process_task = process_user_data()

# Set task dependencies
spark_job >> spark_sql >> process_task
```

## Architecture

The Apache Spark provider follows Airflow's standard provider pattern with distinct layers:

- **Operators**: Task-level components that execute Spark jobs within Airflow workflows
- **Hooks**: Connection management for the various Spark interfaces (Submit, SQL, JDBC, Connect)
- **Decorators**: Pythonic task decorators that automatically inject Spark sessions
- **Connection Types**: Pre-configured connection interfaces for different Spark deployment modes

This design enables flexible integration with a range of Spark deployment architectures, including local mode, YARN, Kubernetes, standalone clusters, and cloud-managed Spark services, all through consistent Airflow abstractions.

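For illustration, here is a minimal sketch (the non-default connection id is hypothetical) of how the same operator definition can target different deployment modes simply by pointing at a different Airflow connection:

```python
# Minimal sketch: which cluster a job runs on is determined by the Airflow connection,
# not by the DAG code, so the same operator can be repointed between deployments.
# The 'spark_yarn' connection id is hypothetical; 'spark_default' is the provider default.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

local_job = SparkSubmitOperator(
    task_id='etl_local',
    application='/path/to/spark_app.py',
    conn_id='spark_default',   # e.g. a local or standalone master
)

yarn_job = SparkSubmitOperator(
    task_id='etl_on_yarn',
    application='/path/to/spark_app.py',
    conn_id='spark_yarn',      # hypothetical connection pointing at a YARN cluster
)
```
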
## Capabilities

### Spark Job Execution

Execute Spark applications using the spark-submit binary with comprehensive configuration support. The operator handles application submission, monitoring, and resource management across different cluster managers.

```python { .api }
class SparkSubmitOperator(BaseOperator):
    def __init__(
        self,
        application: str | None = None,
        conf: dict | None = None,
        conn_id: str = 'spark_default',
        files: str | None = None,
        py_files: str | None = None,
        archives: str | None = None,
        driver_class_path: str | None = None,
        jars: str | None = None,
        java_class: str | None = None,
        packages: str | None = None,
        exclude_packages: str | None = None,
        repositories: str | None = None,
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        proxy_user: str | None = None,
        name: str | None = None,
        num_executors: int | None = None,
        application_args: list | None = None,
        env_vars: dict | None = None,
        verbose: bool = False,
        spark_binary: str = 'spark-submit',
        properties_file: str | None = None,
        **kwargs
    ): ...

    def execute(self, context): ...
    def on_kill(self): ...
```

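The `keytab`, `principal`, `packages`, and `java_class` parameters above cover common cluster scenarios. As a hedged sketch (paths, class name, and Maven coordinates are placeholders), a Kerberos-authenticated JAR submission might look like this:

```python
# Sketch built only from parameters in the signature above; all values are placeholders.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

secure_job = SparkSubmitOperator(
    task_id='run_scala_job',
    application='/jobs/analytics-assembly.jar',          # placeholder application JAR
    java_class='com.example.AnalyticsJob',               # placeholder main class
    conn_id='spark_default',
    packages='org.apache.spark:spark-avro_2.12:3.5.1',   # extra Maven dependency
    keytab='/security/airflow.keytab',                   # Kerberos keytab (placeholder)
    principal='airflow@EXAMPLE.COM',                     # Kerberos principal (placeholder)
    executor_memory='4g',
    num_executors=10,
    application_args=['--mode', 'daily'],                # forwarded to the application
)
```
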
[Spark Operators](./spark-operators.md)

### Spark Connection Management

Manage connections to the various Spark interfaces, including traditional spark-submit, Spark SQL, JDBC operations, and the modern Spark Connect protocol. These hooks provide connection configuration, authentication, and cluster communication.

```python { .api }
class SparkSubmitHook(BaseHook):
    conn_name_attr = "conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark"
    hook_name = "Spark"

    def submit(self, application: str, **kwargs) -> None: ...
    def on_kill(self) -> None: ...
    def get_conn(self): ...

class SparkConnectHook(BaseHook):
    conn_name_attr = "conn_id"
    default_conn_name = "spark_connect_default"
    conn_type = "spark_connect"
    hook_name = "Spark Connect"

    def get_connection_url(self) -> str: ...
```

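Outside of an operator (for example, in a plain Python callable), the hooks can be driven directly. A minimal sketch, assuming the default connections exist and using a placeholder application path:

```python
# Minimal sketch of direct hook usage; the application path is a placeholder.
from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

# Submit an application through the 'spark_default' connection
hook = SparkSubmitHook(conn_id='spark_default', verbose=True)
hook.submit(application='/path/to/spark_app.py')

# Resolve the endpoint URL for a Spark Connect session from its Airflow connection
connect_url = SparkConnectHook(conn_id='spark_connect_default').get_connection_url()
```
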
[Spark Hooks](./spark-hooks.md)

### PySpark Task Integration

Create PySpark tasks that automatically receive Spark session objects, enabling seamless integration of PySpark code within Airflow workflows, with automatic session management and cleanup.

```python { .api }
def pyspark_task(
    python_callable: Callable | None = None,
    multiple_outputs: bool | None = None,
    **kwargs,
) -> TaskDecorator: ...

# Usage example:
@pyspark_task
def my_spark_function(spark):
    """Function receives SparkSession as 'spark' parameter"""
    df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'value'])
    return df.count()
```

[PySpark Decorators](./pyspark-decorators.md)
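
Recent provider versions also let the decorator accept operator-level keyword arguments; for instance, a connection id is commonly passed so the injected SparkSession is built against a configured cluster or Spark Connect endpoint. Treat the exact parameter set as version-dependent and confirm it in the decorator documentation linked above. A hedged sketch:

```python
# Hedged sketch: conn_id is assumed to be forwarded to the decorated operator and used
# when building the SparkSession; verify against your provider version before relying on it.
from airflow.providers.apache.spark.decorators.pyspark import pyspark_task

@pyspark_task(conn_id='spark_connect_default')
def summarize(spark):
    df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'value'])
    return df.groupBy('value').count().count()
```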

## Connection Types

The provider registers these connection types in Airflow:

1. **`spark`** - For SparkSubmitHook connections to Spark clusters
2. **`spark_sql`** - For SparkSqlHook connections to execute SQL queries
3. **`spark_jdbc`** - For SparkJDBCHook connections that transfer data between Spark and JDBC databases
4. **`spark_connect`** - For SparkConnectHook connections using the Spark Connect protocol

Each connection type provides custom UI fields for configuration, including cluster URLs, authentication credentials, SSL settings, and deployment-specific parameters.

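As one illustrative way to supply these connections, Airflow resolves `AIRFLOW_CONN_<CONN_ID>` environment variables; the hosts, ports, and extra fields below are placeholders, and the exact extras each hook honors are described on the linked hook pages.

```python
# Sketch of supplying Spark connections via environment variables using Airflow's
# JSON connection format; hosts, ports, and extras are placeholder values.
import json
import os

os.environ['AIRFLOW_CONN_SPARK_DEFAULT'] = json.dumps({
    "conn_type": "spark",
    "host": "spark://spark-master",
    "port": 7077,
    "extra": json.dumps({"deploy-mode": "client"}),   # assumed extra field; see hook docs
})

os.environ['AIRFLOW_CONN_SPARK_CONNECT_DEFAULT'] = json.dumps({
    "conn_type": "spark_connect",
    "host": "spark-connect-server",                   # placeholder Spark Connect endpoint
    "port": 15002,
})
```
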
## Error Handling

Common exceptions that may be raised:

- **`AirflowException`** - General Spark job execution failures, configuration errors
- **`AirflowNotFoundException`** - Missing Spark applications, connection configurations
- **Connection errors** - Cluster connectivity issues, authentication failures
- **Spark application errors** - Application-specific failures, resource constraints

Handle these exceptions in your DAG error handling and retry logic as appropriate for your use case.
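
As a minimal sketch of the retry-and-alerting side (the callback below is a placeholder, not part of this provider), failures can be retried at the task level and surfaced through a failure callback, while direct hook calls raise `AirflowException`:

```python
# Minimal sketch: task-level retries plus a failure callback, and catching
# AirflowException around a direct hook call. Paths and the callback are placeholders.
from datetime import timedelta

from airflow.exceptions import AirflowException
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

def notify_on_failure(context):
    # Placeholder: forward the failure to your alerting system of choice.
    print(f"Spark task {context['task_instance'].task_id} failed")

spark_job = SparkSubmitOperator(
    task_id='run_spark_job_with_retries',
    application='/path/to/spark_app.py',
    conn_id='spark_default',
    retries=3,                               # retry transient cluster/connectivity errors
    retry_delay=timedelta(minutes=10),
    on_failure_callback=notify_on_failure,   # invoked when the task ultimately fails
)

# When driving a hook directly, submission failures surface as AirflowException.
try:
    SparkSubmitHook(conn_id='spark_default').submit(application='/path/to/spark_app.py')
except AirflowException as err:
    print(f"spark-submit failed: {err}")
```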