# Spark Hooks

Connection management hooks for the various Spark interfaces, including spark-submit, Spark SQL, JDBC operations, and the Spark Connect protocol. These hooks handle authentication, connection configuration, and communication with the different Spark deployment modes.

## Capabilities

### Spark Submit Connection Management

Manage connections for spark-submit operations across different cluster managers, with support for comprehensive Spark configuration and resource management.
```python { .api }
class SparkSubmitHook(BaseHook):
    """
    Hook for spark-submit binary execution with extensive configuration support.

    Connection Information:
    - conn_name_attr: "conn_id"
    - default_conn_name: "spark_default"
    - conn_type: "spark"
    - hook_name: "Spark"

    Parameters:
    - conf (dict): Spark configuration properties
    - conn_id (str): Connection ID (default: 'spark_default')
    - files (str): Files to place in the working directory
    - py_files (str): Python files to add to the Python path
    - archives (str): Archives to extract
    - driver_class_path (str): Extra classpath for the driver
    - jars (str): JAR files to include
    - java_class (str): Main class for Java/Scala applications
    - packages (str): Maven packages to include
    - exclude_packages (str): Maven packages to exclude
    - repositories (str): Additional repositories
    - total_executor_cores (int): Total cores for all executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - driver_memory (str): Driver memory
    - keytab (str): Kerberos keytab path
    - principal (str): Kerberos principal
    - proxy_user (str): User to impersonate
    - name (str): Application name
    - num_executors (int): Number of executors
    - status_poll_interval (int): Poll interval in seconds (default: 1)
    - application_args (list): Application arguments
    - env_vars (dict): Environment variables
    - verbose (bool): Verbose output (default: False)
    - spark_binary (str): Spark binary (default: 'spark-submit')
    - properties_file (str): Properties file path
    - yarn_queue (str): YARN queue name
    - deploy_mode (str): Deploy mode ('client' or 'cluster')
    - use_krb5ccache (bool): Use the Kerberos credential cache (default: False)
    """

    conn_name_attr = "conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark"
    hook_name = "Spark"

    def __init__(
        self,
        conf: dict | None = None,
        conn_id: str = 'spark_default',
        files: str | None = None,
        py_files: str | None = None,
        archives: str | None = None,
        driver_class_path: str | None = None,
        jars: str | None = None,
        java_class: str | None = None,
        packages: str | None = None,
        exclude_packages: str | None = None,
        repositories: str | None = None,
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        proxy_user: str | None = None,
        name: str = 'default-name',
        num_executors: int | None = None,
        status_poll_interval: int = 1,
        application_args: list | None = None,
        env_vars: dict | None = None,
        verbose: bool = False,
        spark_binary: str = 'spark-submit',
        properties_file: str | None = None,
        yarn_queue: str | None = None,
        deploy_mode: str | None = None,
        *,
        use_krb5ccache: bool = False,
    ): ...

    def submit(self, application: str, **kwargs) -> None:
        """
        Submit a Spark application for execution.

        Parameters:
        - application (str): Path to the Spark application file
        - **kwargs: Extra keyword arguments passed to the underlying process call
        """

    def on_kill(self) -> None:
        """Kill the running Spark job."""

    def get_conn(self):
        """Get connection (no-op for Spark submit)."""

    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for the connection form."""

    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""
```
#### Usage Example

```python
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

# Create hook with configuration
hook = SparkSubmitHook(
    conn_id='spark_cluster',
    conf={
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
    },
    packages='org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0',
    application_args=['--input', 'hdfs://data/input', '--output', 'hdfs://data/output'],
    verbose=True,
)

# Submit the application
hook.submit(application='/path/to/app.py')
```
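When a task fails or is cancelled partway through, any driver started by `submit()` should be cleaned up with `on_kill()`. A minimal sketch of that pattern, assuming the hook raises `AirflowException` when spark-submit exits non-zero (the connection ID and application path are placeholders):

```python
from airflow.exceptions import AirflowException
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

hook = SparkSubmitHook(conn_id='spark_cluster')  # placeholder connection ID

try:
    # submit() blocks while the hook polls the job status.
    hook.submit(application='/path/to/app.py')
except AirflowException:
    # Best-effort cleanup of the driver / cluster-manager application.
    hook.on_kill()
    raise
```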
### Spark SQL Connection Management

Manage connections for Spark SQL operations with support for Hive integration and distributed SQL execution.
```python { .api }
class SparkSqlHook(BaseHook):
    """
    Hook for spark-sql binary execution with SQL query support.

    Connection Information:
    - conn_name_attr: "conn_id"
    - default_conn_name: "spark_sql_default"
    - conn_type: "spark_sql"
    - hook_name: "Spark SQL"

    Parameters:
    - sql (str): SQL query to execute
    - conf (dict): Spark configuration properties
    - conn_id (str): Connection ID (default: 'spark_sql_default')
    - total_executor_cores (int): Total cores for all executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - keytab (str): Kerberos keytab path
    - principal (str): Kerberos principal
    - master (str): Cluster manager URL (default: 'yarn')
    - name (str): Application name (default: 'default')
    - num_executors (int): Number of executors
    - verbose (bool): Verbose output (default: True)
    - yarn_queue (str): YARN queue (default: 'default')
    - properties_file (str): Properties file path
    - application_args (list): Additional arguments
    """

    conn_name_attr = "conn_id"
    default_conn_name = "spark_sql_default"
    conn_type = "spark_sql"
    hook_name = "Spark SQL"

    def __init__(
        self,
        sql: str | None = None,
        conf: dict | None = None,
        conn_id: str = 'spark_sql_default',
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        master: str = 'yarn',
        name: str = 'default',
        num_executors: int | None = None,
        verbose: bool = True,
        yarn_queue: str = 'default',
        properties_file: str | None = None,
        application_args: list | None = None,
    ): ...

    def run_query(self, cmd: str = '', **kwargs) -> None:
        """
        Execute the configured Spark SQL query.

        Parameters:
        - cmd (str): Extra command-line arguments appended to the generated spark-sql command
        - **kwargs: Additional execution parameters
        """

    def kill(self) -> None:
        """Kill the running SQL query."""

    def get_conn(self):
        """Get connection (no-op for Spark SQL)."""

    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for the connection form."""

    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""
```
#### Usage Example

```python
from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook

# Create hook for SQL execution
hook = SparkSqlHook(
    conn_id='spark_sql_cluster',
    sql="""
        CREATE TABLE daily_summary AS
        SELECT date, region, COUNT(*) AS transactions
        FROM sales
        WHERE date = current_date()
        GROUP BY date, region
    """,
    conf={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    },
    master='yarn',
    yarn_queue='analytics',
)

# Execute the SQL query
hook.run_query()
```
### Spark JDBC Connection Management

Manage connections for Spark JDBC operations that move data between Spark and relational databases, with support for authentication and partitioned reads and writes.
```python { .api }
class SparkJDBCHook(SparkSubmitHook):
    """
    Hook for Spark JDBC operations; extends SparkSubmitHook.

    Connection Information:
    - conn_name_attr: "spark_conn_id"
    - default_conn_name: "spark_default"
    - conn_type: "spark_jdbc"
    - hook_name: "Spark JDBC"

    Parameters:
    - spark_app_name (str): Spark application name (default: 'airflow-spark-jdbc')
    - spark_conn_id (str): Spark connection ID (default: 'spark_default')
    - spark_conf (dict): Spark configuration
    - spark_py_files (str): Python files
    - spark_files (str): Additional files
    - spark_jars (str): JAR files (include the JDBC driver here)
    - num_executors (int): Number of executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - driver_memory (str): Driver memory
    - verbose (bool): Verbose output
    - keytab (str): Kerberos keytab
    - principal (str): Kerberos principal
    - cmd_type (str): Operation type ('spark_to_jdbc' or 'jdbc_to_spark')
    - jdbc_table (str): JDBC table name
    - jdbc_conn_id (str): JDBC connection ID (default: 'jdbc_default')
    - jdbc_driver (str): JDBC driver class
    - metastore_table (str): Spark metastore table
    - jdbc_truncate (bool): Truncate the table before writing
    - save_mode (str): Save mode for writes
    - save_format (str): Data format
    - batch_size (int): JDBC batch size
    - fetch_size (int): JDBC fetch size
    - num_partitions (int): Number of partitions
    - partition_column (str): Partitioning column
    - lower_bound (str): Partition lower bound
    - upper_bound (str): Partition upper bound
    - create_table_column_types (str): Column types for table creation
    - use_krb5ccache (bool): Use the Kerberos credential cache
    """

    conn_name_attr = "spark_conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark_jdbc"
    hook_name = "Spark JDBC"

    def __init__(
        self,
        spark_app_name: str = 'airflow-spark-jdbc',
        spark_conn_id: str = 'spark_default',
        spark_conf: dict | None = None,
        spark_py_files: str | None = None,
        spark_files: str | None = None,
        spark_jars: str | None = None,
        num_executors: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        verbose: bool = False,
        keytab: str | None = None,
        principal: str | None = None,
        cmd_type: str = 'spark_to_jdbc',
        jdbc_table: str | None = None,
        jdbc_conn_id: str = 'jdbc_default',
        jdbc_driver: str | None = None,
        metastore_table: str | None = None,
        jdbc_truncate: bool = False,
        save_mode: str | None = None,
        save_format: str | None = None,
        batch_size: int | None = None,
        fetch_size: int | None = None,
        num_partitions: int | None = None,
        partition_column: str | None = None,
        lower_bound: str | None = None,
        upper_bound: str | None = None,
        create_table_column_types: str | None = None,
        use_krb5ccache: bool = False,
    ): ...

    def submit_jdbc_job(self) -> None:
        """Submit the Spark JDBC transfer job."""

    def get_conn(self):
        """Get connection (no-op for Spark JDBC)."""
```
#### Usage Example

```python
from airflow.providers.apache.spark.hooks.spark_jdbc import SparkJDBCHook

# Create hook for database transfer
hook = SparkJDBCHook(
    spark_conn_id='spark_cluster',
    jdbc_conn_id='postgres_warehouse',
    cmd_type='jdbc_to_spark',
    jdbc_table='customer_data',
    metastore_table='customers',
    jdbc_driver='org.postgresql.Driver',
    spark_jars='postgresql-42.2.18.jar',
    num_partitions=8,
    partition_column='customer_id',
    lower_bound='1',
    upper_bound='1000000',
)

# Execute the transfer
hook.submit_jdbc_job()
```
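The reverse direction, `spark_to_jdbc`, writes a Spark metastore table out to the JDBC target. A minimal sketch reusing the same placeholder connection IDs; `save_mode` and `batch_size` are the documented knobs for controlling the write:

```python
from airflow.providers.apache.spark.hooks.spark_jdbc import SparkJDBCHook

# Write a Spark metastore table back to the database
hook = SparkJDBCHook(
    spark_conn_id='spark_cluster',
    jdbc_conn_id='postgres_warehouse',
    cmd_type='spark_to_jdbc',
    metastore_table='daily_summary',
    jdbc_table='daily_summary',
    jdbc_driver='org.postgresql.Driver',
    spark_jars='postgresql-42.2.18.jar',
    save_mode='append',
    batch_size=10000,
)

hook.submit_jdbc_job()
```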
### Spark Connect Connection Management

Manage connections that use the Spark Connect protocol, which lets lightweight clients drive Spark workloads on a remote cluster over a decoupled, gRPC-based client-server connection.
```python { .api }
class SparkConnectHook(BaseHook):
    """
    Hook for Spark Connect protocol connections.

    Connection Information:
    - conn_name_attr: "conn_id"
    - default_conn_name: "spark_connect_default"
    - conn_type: "spark_connect"
    - hook_name: "Spark Connect"

    Constants:
    - PARAM_USE_SSL: "use_ssl"
    - PARAM_TOKEN: "token"
    - PARAM_USER_ID: "user_id"

    Parameters:
    - conn_id (str): Connection ID (default: 'spark_connect_default')
    """

    conn_name_attr = "conn_id"
    default_conn_name = "spark_connect_default"
    conn_type = "spark_connect"
    hook_name = "Spark Connect"

    PARAM_USE_SSL = "use_ssl"
    PARAM_TOKEN = "token"
    PARAM_USER_ID = "user_id"

    def __init__(self, conn_id: str = 'spark_connect_default'): ...

    def get_connection_url(self) -> str:
        """
        Build Spark Connect connection URL.

        Returns:
            str: Complete Spark Connect URL for client connections
        """

    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for the connection form."""

    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""
```
#### Usage Example

```python
from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook

# Create Spark Connect hook
hook = SparkConnectHook(conn_id='spark_connect_cluster')

# Get connection URL for Spark Connect client
connect_url = hook.get_connection_url()
# Returns: sc://hostname:15002/;token=abc123;user_id=airflow

# Use with Spark Connect client
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .remote(connect_url)
    .appName("Airflow Spark Connect")
    .getOrCreate()
)
```
## Connection Configuration

### Spark Connection (`spark`)
- **Host**: Cluster manager URL (e.g., `yarn`, `spark://master:7077`, `k8s://api-server`)
- **Extra**: JSON with additional Spark configuration
- **Login/Password**: Cluster authentication credentials, if required

### Spark SQL Connection (`spark_sql`)
- **Host**: Cluster manager URL
- **Extra**: JSON with Spark SQL specific configuration
- **Schema**: Default database/schema

### Spark JDBC Connection (`spark_jdbc`)
- **Inherits**: Spark connection configuration
- **Extra**: JDBC-specific settings and driver configuration

### Spark Connect Connection (`spark_connect`)
- **Host**: Spark Connect server hostname
- **Port**: Spark Connect server port (default: 15002)
- **Extra**: JSON with SSL settings, tokens, and user authentication (see the sketch below)
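
The field mappings above translate directly into Airflow `Connection` objects, which can also be supplied as `AIRFLOW_CONN_<CONN_ID>` environment variables in URI form. A minimal sketch follows; the `spark` extra keys (`deploy-mode`, `queue`) are illustrative assumptions to check against the provider documentation, while `use_ssl`, `token`, and `user_id` match the `SparkConnectHook` constants listed earlier.

```python
import json

from airflow.models.connection import Connection

# spark-submit style connection: host is the cluster manager URL.
spark_conn = Connection(
    conn_id='spark_cluster',
    conn_type='spark',
    host='spark://master',
    port=7077,
    extra=json.dumps({'deploy-mode': 'cluster', 'queue': 'default'}),  # assumed extra keys
)

# Spark Connect connection: server host/port plus authentication extras.
connect_conn = Connection(
    conn_id='spark_connect_cluster',
    conn_type='spark_connect',
    host='connect.example.com',
    port=15002,
    extra=json.dumps({'use_ssl': True, 'token': 'my-token', 'user_id': 'airflow'}),
)

# URI form, suitable for AIRFLOW_CONN_SPARK_CLUSTER / AIRFLOW_CONN_SPARK_CONNECT_CLUSTER
print(spark_conn.get_uri())
print(connect_conn.get_uri())
```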
## Error Handling and Best Practices

### Connection Management
- **JDBC batching**: Tune `batch_size` and `fetch_size` for JDBC transfers instead of relying on defaults
- **Authentication**: Ensure proper Kerberos or token-based authentication setup (see the sketch below)
- **SSL/TLS**: Use secure connections for production environments
- **Timeouts**: Configure appropriate connection and query timeouts
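
For Kerberos-secured clusters, the authentication bullet above maps directly onto `SparkSubmitHook` parameters. A minimal sketch, assuming a keytab is available on the Airflow worker (paths, principal, and connection ID are placeholders):

```python
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

# Kerberized submission: keytab and principal are forwarded to spark-submit.
hook = SparkSubmitHook(
    conn_id='spark_cluster',
    keytab='/etc/security/keytabs/airflow.keytab',
    principal='airflow@EXAMPLE.COM',
    # Alternatively, reuse an existing ticket cache instead of a keytab:
    # use_krb5ccache=True,
)

hook.submit(application='/path/to/app.py')
```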
### Resource Management
- **Memory allocation**: Balance driver and executor memory based on workload
- **Core allocation**: Optimize core allocation for cluster utilization
- **Dynamic allocation**: Use Spark's dynamic allocation for variable workloads (see the sketch below)
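
Dynamic allocation is plain Spark configuration, so it can be passed through the `conf` dictionary of any hook above. A sketch with commonly used settings; the executor bounds are workload-dependent placeholders:

```python
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

# Let Spark scale executors instead of pinning num_executors.
hook = SparkSubmitHook(
    conn_id='spark_cluster',
    conf={
        'spark.dynamicAllocation.enabled': 'true',
        'spark.dynamicAllocation.minExecutors': '2',
        'spark.dynamicAllocation.maxExecutors': '20',
        # Shuffle tracking avoids requiring an external shuffle service.
        'spark.dynamicAllocation.shuffleTracking.enabled': 'true',
    },
)
```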
### Error Recovery
- **Retry logic**: Implement retry mechanisms for transient failures (see the sketch below)
- **Graceful degradation**: Handle cluster unavailability scenarios
- **Resource monitoring**: Monitor cluster resources and job progress
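
Transient failures (a lost cluster-manager connection, a preempted driver) are usually handled by retrying the whole submission. A minimal sketch using Airflow's built-in task retries; the task and connection names are placeholders:

```python
from datetime import timedelta

from airflow.decorators import task
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook


@task(retries=3, retry_delay=timedelta(minutes=5), retry_exponential_backoff=True)
def submit_spark_job():
    # Each retry re-runs spark-submit from scratch, so the application
    # itself should be idempotent.
    hook = SparkSubmitHook(conn_id='spark_cluster', name='retryable-job')
    hook.submit(application='/path/to/app.py')
```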