# Spark Operators

Operators for executing various types of Spark jobs within Airflow workflows. These operators provide task-level components that handle Spark application submission, SQL query execution, and JDBC database operations with comprehensive configuration and monitoring capabilities.

## Capabilities

### Spark Application Submission

Execute Spark applications using the spark-submit binary with full support for cluster managers, resource configuration, authentication, and dependency management.

```python { .api }
class SparkSubmitOperator(BaseOperator):
    """
    Execute Spark applications via spark-submit binary.

    Parameters:
    - application (str): Path to Spark application file (.py, .jar, .R) (default: "")
    - conf (dict[Any, Any] | None): Spark configuration properties as key-value pairs
    - conn_id (str): Airflow connection ID for Spark cluster (default: "spark_default")
    - files (str | None): Comma-separated list of files to place in working directory
    - py_files (str | None): Comma-separated list of .zip, .egg, .py files for Python path
    - archives (str | None): Comma-separated list of archives to extract in working directory
    - driver_class_path (str | None): Extra classpath entries for driver
    - jars (str | None): Comma-separated list of JAR files to include
    - java_class (str | None): Main class for Java/Scala applications
    - packages (str | None): Maven coordinates of packages to include
    - exclude_packages (str | None): Maven coordinates of packages to exclude
    - repositories (str | None): Additional remote repositories for dependency resolution
    - total_executor_cores (int | None): Total cores for all executors
    - executor_cores (int | None): Number of cores per executor
    - executor_memory (str | None): Memory per executor (e.g., '4g', '2048m')
    - driver_memory (str | None): Memory for driver (e.g., '2g', '1024m')
    - keytab (str | None): Path to Kerberos keytab file
    - principal (str | None): Kerberos principal
    - proxy_user (str | None): User to impersonate when running job
    - name (str): Name for Spark application (default: "arrow-spark")
    - num_executors (int | None): Number of executors to launch
    - application_args (list[Any] | None): Arguments passed to main method of application
    - env_vars (dict[str, Any] | None): Environment variables for Spark application
    - verbose (bool): Enable verbose output (default: False)
    - spark_binary (str | None): Spark binary to use (uses connection setting if not specified)
    - properties_file (str | None): Path to properties file with Spark configuration
    - yarn_queue (str | None): YARN queue to submit to
    - deploy_mode (str | None): Deploy mode (client or cluster)
    - status_poll_interval (int): Seconds between polls of driver status (default: 1)
    - use_krb5ccache (bool): Use Kerberos credential cache (default: False)
    - openlineage_inject_parent_job_info (bool): Inject OpenLineage parent job info (default: False)
    - openlineage_inject_transport_info (bool): Inject OpenLineage transport info (default: False)

    Template Fields: application, conf, files, py_files, jars, driver_class_path,
                     packages, exclude_packages, keytab, principal, proxy_user, name,
                     application_args, env_vars, properties_file
    """

    template_fields = (
        "application", "conf", "files", "py_files", "jars",
        "driver_class_path", "packages", "exclude_packages",
        "keytab", "principal", "proxy_user", "name",
        "application_args", "env_vars", "properties_file",
    )

    def __init__(
        self,
        *,
        application: str = "",
        conf: dict[Any, Any] | None = None,
        conn_id: str = "spark_default",
        files: str | None = None,
        py_files: str | None = None,
        archives: str | None = None,
        driver_class_path: str | None = None,
        jars: str | None = None,
        java_class: str | None = None,
        packages: str | None = None,
        exclude_packages: str | None = None,
        repositories: str | None = None,
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        proxy_user: str | None = None,
        name: str = "arrow-spark",
        num_executors: int | None = None,
        status_poll_interval: int = 1,
        application_args: list[Any] | None = None,
        env_vars: dict[str, Any] | None = None,
        verbose: bool = False,
        spark_binary: str | None = None,
        properties_file: str | None = None,
        yarn_queue: str | None = None,
        deploy_mode: str | None = None,
        use_krb5ccache: bool = False,
        openlineage_inject_parent_job_info: bool = False,
        openlineage_inject_transport_info: bool = False,
        **kwargs,
    ): ...

    def execute(self, context) -> None:
        """Execute Spark application using SparkSubmitHook."""

    def on_kill(self) -> None:
        """Kill running Spark job."""
```
#### Usage Example

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Submit PySpark application
pyspark_job = SparkSubmitOperator(
    task_id='process_data',
    application='/path/to/data_processing.py',
    conn_id='spark_cluster',
    conf={
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    },
    py_files='s3://bucket/dependencies.zip',
    application_args=['--input', 's3://bucket/input/', '--output', 's3://bucket/output/'],
    env_vars={'SPARK_ENV': 'production'},
    dag=dag,
)

# Submit Scala/Java application
scala_job = SparkSubmitOperator(
    task_id='run_scala_job',
    application='/path/to/app.jar',
    java_class='com.example.SparkApplication',
    jars='/path/to/additional.jar',
    packages='org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0',
    conf={
        'spark.executor.instances': '10',
        'spark.executor.memory': '8g',
    },
    dag=dag,
)
```
### Spark SQL Execution

Execute Spark SQL queries using the spark-sql binary with support for Hive integration, query templating, and various output formats.

```python { .api }
class SparkSqlOperator(BaseOperator):
    """
    Execute Spark SQL queries via spark-sql binary.

    Parameters:
    - sql (str): SQL query to execute (can be templated)
    - conf (dict[str, Any] | str | None): Spark configuration properties
    - conn_id (str): Airflow connection ID (default: "spark_sql_default")
    - total_executor_cores (int | None): Total cores for all executors
    - executor_cores (int | None): Number of cores per executor
    - executor_memory (str | None): Memory per executor
    - keytab (str | None): Path to Kerberos keytab file
    - principal (str | None): Kerberos principal
    - master (str | None): Cluster manager URL (default: None, uses connection)
    - name (str): Name for Spark application (default: "default-name")
    - num_executors (int | None): Number of executors
    - verbose (bool): Enable verbose output (default: True)
    - yarn_queue (str | None): YARN queue name

    Template Fields: sql
    Template Extensions: .sql, .hql
    """

    template_fields = ("sql",)
    template_ext = (".sql", ".hql")
    template_fields_renderers = {"sql": "sql"}

    def __init__(
        self,
        *,
        sql: str,
        conf: dict[str, Any] | str | None = None,
        conn_id: str = "spark_sql_default",
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        master: str | None = None,
        name: str = "default-name",
        num_executors: int | None = None,
        verbose: bool = True,
        yarn_queue: str | None = None,
        **kwargs,
    ): ...

    def execute(self, context) -> None:
        """Execute SQL query using SparkSqlHook."""

    def on_kill(self) -> None:
        """Kill running SQL query."""
```
#### Usage Example

```python
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

# Execute SQL query from string
sql_analysis = SparkSqlOperator(
    task_id='analyze_user_data',
    sql="""
        CREATE TABLE user_summary AS
        SELECT region,
               COUNT(*) as user_count,
               AVG(age) as avg_age,
               SUM(total_purchases) as total_revenue
        FROM users
        WHERE active = true
          AND registration_date >= '{{ ds }}'
        GROUP BY region
    """,
    conn_id='spark_sql_cluster',
    conf={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    },
    dag=dag,
)

# Execute SQL from file
sql_from_file = SparkSqlOperator(
    task_id='run_etl_script',
    sql='etl_queries.sql',  # File in DAG folder or templated path
    conn_id='spark_sql_default',
    dag=dag,
)
```
### Spark JDBC Operations

Transfer data between Spark and JDBC databases with support for batch processing, partitioning, and various save modes.

```python { .api }
class SparkJDBCOperator(SparkSubmitOperator):
    """
    Execute Spark JDBC operations to transfer data between Spark and databases.
    Inherits from SparkSubmitOperator for Spark configuration.

    Parameters:
    - spark_app_name (str): Name for Spark application (default: 'airflow-spark-jdbc')
    - spark_conn_id (str): Spark connection ID (default: 'spark_default')
    - spark_conf (dict): Spark configuration properties
    - spark_py_files (str): Python files for Spark
    - spark_files (str): Files for Spark
    - spark_jars (str): JAR files for Spark (include JDBC drivers)
    - num_executors (int): Number of Spark executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - driver_memory (str): Driver memory
    - verbose (bool): Enable verbose output
    - principal (str): Kerberos principal
    - keytab (str): Kerberos keytab path
    - cmd_type (str): Operation type ('spark_to_jdbc' or 'jdbc_to_spark')
    - jdbc_table (str): JDBC table name
    - jdbc_conn_id (str): JDBC connection ID (default: 'jdbc_default')
    - jdbc_driver (str): JDBC driver class name
    - metastore_table (str): Spark metastore table name
    - jdbc_truncate (bool): Truncate JDBC table before write (default: False)
    - save_mode (str): Save mode ('append', 'overwrite', 'ignore', 'error')
    - save_format (str): Save format ('json', 'parquet', 'csv', etc.)
    - batch_size (int): JDBC batch size for writes
    - fetch_size (int): JDBC fetch size for reads
    - num_partitions (int): Number of partitions for parallel reads/writes
    - partition_column (str): Column for partitioning JDBC reads
    - lower_bound (str): Lower bound for partition column
    - upper_bound (str): Upper bound for partition column
    - create_table_column_types (str): Column types for table creation
    - use_krb5ccache (bool): Use Kerberos credential cache
    """

    def __init__(
        self,
        spark_app_name: str = 'airflow-spark-jdbc',
        spark_conn_id: str = 'spark_default',
        spark_conf: dict[str, Any] | None = None,
        spark_py_files: str | None = None,
        spark_files: str | None = None,
        spark_jars: str | None = None,
        num_executors: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        verbose: bool = False,
        principal: str | None = None,
        keytab: str | None = None,
        cmd_type: str = 'spark_to_jdbc',
        jdbc_table: str | None = None,
        jdbc_conn_id: str = 'jdbc_default',
        jdbc_driver: str | None = None,
        metastore_table: str | None = None,
        jdbc_truncate: bool = False,
        save_mode: str | None = None,
        save_format: str | None = None,
        batch_size: int | None = None,
        fetch_size: int | None = None,
        num_partitions: int | None = None,
        partition_column: str | None = None,
        lower_bound: str | None = None,
        upper_bound: str | None = None,
        create_table_column_types: str | None = None,
        use_krb5ccache: bool = False,
        **kwargs,
    ): ...

    def execute(self, context) -> None:
        """Execute JDBC transfer using SparkJDBCHook."""

    def on_kill(self) -> None:
        """Kill running JDBC job."""
```
#### Usage Example

```python
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

# Transfer data from Spark to database
spark_to_db = SparkJDBCOperator(
    task_id='export_results_to_db',
    spark_conn_id='spark_cluster',
    jdbc_conn_id='postgres_default',
    cmd_type='spark_to_jdbc',
    metastore_table='processed_data',
    jdbc_table='analytics_results',
    jdbc_driver='org.postgresql.Driver',
    spark_jars='postgresql-42.2.18.jar',
    save_mode='overwrite',
    jdbc_truncate=True,
    batch_size=10000,
    dag=dag,
)

# Transfer data from database to Spark with partitioning
db_to_spark = SparkJDBCOperator(
    task_id='load_data_from_db',
    spark_conn_id='spark_cluster',
    jdbc_conn_id='mysql_warehouse',
    cmd_type='jdbc_to_spark',
    jdbc_table='large_table',
    metastore_table='imported_data',
    jdbc_driver='com.mysql.cj.jdbc.Driver',
    spark_jars='mysql-connector-java-8.0.23.jar',
    num_partitions=10,
    partition_column='id',
    lower_bound='1',
    upper_bound='1000000',
    fetch_size=50000,
    dag=dag,
)
```
## Error Handling

Common exceptions and error scenarios:

### Application Errors
- **Application not found**: Verify the application path is accessible to the Spark cluster
- **Class not found**: Ensure the main class exists and dependencies are included
- **Resource allocation failures**: Check cluster capacity and resource requirements

### Configuration Errors
- **Invalid Spark configuration**: Validate conf parameters against the Spark documentation
- **Connection failures**: Verify connection configurations and cluster accessibility (see the connection sketch below)
- **Authentication errors**: Check Kerberos settings, keytab files, and principals
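
As a minimal sketch of the connection point above, the snippet below shows one way to define a Spark connection programmatically. The `spark_cluster` name matches the earlier examples; the host, port, and extra fields (such as `deploy-mode` and `queue`) are assumptions to adapt to your cluster and provider version.

```python
import json

from airflow.models.connection import Connection

# Hypothetical standalone-cluster connection; adjust host/port/extras as needed.
spark_conn = Connection(
    conn_id='spark_cluster',
    conn_type='spark',
    host='spark://spark-master',
    port=7077,
    extra=json.dumps({'deploy-mode': 'cluster', 'queue': 'default'}),
)

# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment
# variables; printing the URI makes it easy to export one for testing.
print(f"AIRFLOW_CONN_{spark_conn.conn_id.upper()}='{spark_conn.get_uri()}'")
```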
### JDBC Errors
- **Driver not found**: Include the JDBC driver JAR in the spark_jars parameter
- **Connection failures**: Verify JDBC connection settings and database accessibility
- **Table/column errors**: Ensure target tables exist and column types are compatible

### Best Practices
- Use connection pooling for JDBC operations
- Configure appropriate batch sizes for data transfers
- Monitor cluster resources and adjust executor settings
- Use partitioning for large dataset transfers
- Implement proper error handling and retry logic in DAGs (see the retry sketch below)
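
The sketch below illustrates the last point, reusing the `spark_cluster` connection and application path from the earlier examples; the retry counts, timeout, and callback are illustrative values rather than recommendations. Retries, delays, and callbacks here are standard `BaseOperator` arguments, not Spark-specific settings.

```python
from datetime import timedelta

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator


def notify_failure(context):
    # Hypothetical failure hook; in practice this might alert an on-call
    # channel or emit a metric.
    print(f"Spark task {context['task_instance'].task_id} failed")


resilient_job = SparkSubmitOperator(
    task_id='process_data_with_retries',
    application='/path/to/data_processing.py',
    conn_id='spark_cluster',
    # Standard retry controls: re-submit the application up to three times,
    # backing off between attempts.
    retries=3,
    retry_delay=timedelta(minutes=10),
    retry_exponential_backoff=True,
    # Bound how long a single submission may run before it is killed.
    execution_timeout=timedelta(hours=2),
    on_failure_callback=notify_failure,
    dag=dag,
)
```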