# Spark Integration

Utilities for integrating with Spark applications, including automatic injection of OpenLineage configuration into Spark properties for comprehensive lineage tracking across Spark jobs executed from Airflow.

## Capabilities

### Parent Job Information Injection

Function for injecting OpenLineage parent job information into Spark application properties.
```python { .api }
def inject_parent_job_information_into_spark_properties(properties: dict, context: Context) -> dict:
    """
    Inject OpenLineage parent job information into Spark application properties.

    Automatically injects parent job details (namespace, job name, run ID) from the
    current Airflow task context into Spark application properties. This enables
    Spark applications to emit lineage events that are properly linked to their
    parent Airflow jobs.

    Args:
        properties: Existing Spark application properties dictionary
        context: Airflow task context containing lineage information

    Returns:
        dict: Updated properties dictionary with injected parent job information

    Injected Properties:
        - spark.openlineage.parentJobNamespace: OpenLineage namespace
        - spark.openlineage.parentJobName: Parent job name
        - spark.openlineage.parentRunId: Parent run identifier
    """
```
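
The function expects the live Airflow task context, so it is meant to be called from code that runs at task execution time. A minimal sketch of a direct call (the helper name and property values are illustrative):

```python
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
)


def build_properties_with_parent_info(context) -> dict:
    """Run inside a task, where `context` is the live Airflow task context."""
    properties = {"spark.app.name": "data-processing-job"}
    properties = inject_parent_job_information_into_spark_properties(properties, context)
    # `properties` now also carries spark.openlineage.parentJobNamespace,
    # spark.openlineage.parentJobName and spark.openlineage.parentRunId.
    return properties
```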
### Transport Information Injection

Function for injecting OpenLineage transport configuration into Spark application properties.
```python { .api }
def inject_transport_information_into_spark_properties(properties: dict, context: Context) -> dict:
    """
    Inject OpenLineage transport information into Spark application properties.

    Automatically injects transport configuration from Airflow's OpenLineage
    settings into Spark application properties. This allows Spark applications
    to emit lineage events to the same backend as Airflow without requiring
    separate configuration.

    Args:
        properties: Existing Spark application properties dictionary
        context: Airflow task context (used for configuration access)

    Returns:
        dict: Updated properties dictionary with injected transport configuration

    Injected Properties:
        - spark.openlineage.transport.type: Transport type (http, kafka, etc.)
        - spark.openlineage.transport.url: Transport URL (for HTTP transport)
        - spark.openlineage.transport.endpoint: API endpoint (for HTTP transport)
        - spark.openlineage.transport.*: Additional transport-specific properties
        - spark.openlineage.namespace: OpenLineage namespace
    """
```
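
In practice the two injections are usually applied together. A minimal sketch of a combined helper (the name `with_openlineage_properties` is hypothetical); the transport values come from Airflow's `[openlineage]` configuration, so the Spark side needs no separate backend settings:

```python
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)


def with_openlineage_properties(properties: dict, context) -> dict:
    """Apply both OpenLineage injections to a Spark properties dict."""
    properties = inject_parent_job_information_into_spark_properties(properties, context)
    properties = inject_transport_information_into_spark_properties(properties, context)
    return properties
```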
## Usage Examples

### Basic Spark Integration
```python
from airflow.decorators import task
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)


@task
def create_spark_task_with_lineage(**context):
    """Build Spark properties with automatic OpenLineage integration."""

    # Base Spark configuration
    spark_properties = {
        'spark.app.name': 'data-processing-job',
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.sql.adaptive.enabled': 'true',
    }

    # Inject parent job information
    spark_properties = inject_parent_job_information_into_spark_properties(
        properties=spark_properties,
        context=context,
    )

    # Inject transport information
    spark_properties = inject_transport_information_into_spark_properties(
        properties=spark_properties,
        context=context,
    )

    print("Spark properties with OpenLineage integration:")
    for key, value in spark_properties.items():
        if 'openlineage' in key.lower():
            print(f"  {key}: {value}")

    return spark_properties


# Use in a Spark operator: `conf` is a templated field, so the XComArg returned by
# the task call below resolves to the dict built at runtime.
with dag:
    spark_task = SparkSubmitOperator(
        task_id='process_data_with_lineage',
        application='/path/to/spark_job.py',
        conf=create_spark_task_with_lineage(),
    )
```
### Spark Submit with Automatic Lineage
```python
from airflow.decorators import task
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)


@task
def spark_job_with_openlineage(**context):
    """Build the Spark configuration for a job with OpenLineage integration."""

    # Start with basic configuration
    base_conf = {
        'spark.app.name': f"airflow-job-{context['task_instance'].task_id}",
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
        'spark.executor.memory': '8g',
        'spark.executor.cores': '4',
    }

    # Add OpenLineage parent job information
    conf_with_parent = inject_parent_job_information_into_spark_properties(base_conf, context)

    # Add OpenLineage transport configuration
    final_conf = inject_transport_information_into_spark_properties(conf_with_parent, context)

    return final_conf


# Create the Spark task; the templated `conf` field resolves the XCom value at runtime.
with dag:
    spark_analytics = SparkSubmitOperator(
        task_id='spark_analytics_job',
        application='/opt/spark/jobs/analytics.py',
        application_args=['--input', '/data/raw/', '--output', '/data/processed/'],
        conf=spark_job_with_openlineage(),
    )
```
### PySpark with Custom Configuration
```python
from airflow.decorators import task
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)


@task
def create_pyspark_config(**context):
    """Create PySpark configuration with OpenLineage integration."""

    # PySpark-specific configuration
    pyspark_conf = {
        'spark.app.name': 'pyspark-data-pipeline',
        'spark.submit.deployMode': 'cluster',
        'spark.executor.instances': '10',
        'spark.executor.memory': '6g',
        'spark.executor.cores': '3',
        'spark.driver.memory': '2g',
        'spark.driver.cores': '1',

        # Python configuration
        'spark.pyspark.python': '/opt/python3.9/bin/python',
        'spark.pyspark.driver.python': '/opt/python3.9/bin/python',

        # Serialization
        'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',

        # Dynamic allocation
        'spark.dynamicAllocation.enabled': 'true',
        'spark.dynamicAllocation.minExecutors': '2',
        'spark.dynamicAllocation.maxExecutors': '20',
    }

    # Inject OpenLineage configuration
    pyspark_conf = inject_parent_job_information_into_spark_properties(pyspark_conf, context)
    pyspark_conf = inject_transport_information_into_spark_properties(pyspark_conf, context)

    return pyspark_conf


with dag:
    pyspark_task = SparkSubmitOperator(
        task_id='pyspark_etl',
        application='/opt/spark/jobs/etl_pipeline.py',
        py_files='/opt/spark/libs/common_functions.py',  # comma-separated string
        files='/opt/spark/config/database.conf',
        conf=create_pyspark_config(),  # templated field, resolved from XCom at runtime
    )
```
### Conditional Spark Integration
```python
from airflow.decorators import task
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.conf import spark_inject_parent_job_info, spark_inject_transport_info
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)


@task
def conditional_spark_config(**context):
    """Conditionally inject OpenLineage configuration based on provider settings."""

    base_conf = {
        'spark.app.name': 'conditional-lineage-job',
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
    }

    # Check configuration flags from the [openlineage] section
    inject_parent = spark_inject_parent_job_info()
    inject_transport = spark_inject_transport_info()

    print(f"Parent job injection enabled: {inject_parent}")
    print(f"Transport injection enabled: {inject_transport}")

    if inject_parent:
        base_conf = inject_parent_job_information_into_spark_properties(base_conf, context)
        print("Injected parent job information")

    if inject_transport:
        base_conf = inject_transport_information_into_spark_properties(base_conf, context)
        print("Injected transport information")

    return base_conf


with dag:
    conditional_spark_task = SparkSubmitOperator(
        task_id='conditional_spark_job',
        application='/opt/spark/jobs/conditional_job.py',
        conf=conditional_spark_config(),  # templated field, resolved from XCom at runtime
    )
```
### EMR Spark Integration
```python
from airflow.decorators import task
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)


@task
def create_emr_spark_step(**context):
    """Create an EMR Spark step with OpenLineage integration."""

    # Base Spark configuration for EMR
    spark_conf = {
        'spark.app.name': 'emr-openlineage-job',
        'spark.executor.memory': '8g',
        'spark.executor.cores': '4',
        'spark.driver.memory': '2g',
        'spark.sql.adaptive.enabled': 'true',
    }

    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)

    # Convert to spark-submit --conf arguments
    spark_conf_args = []
    for key, value in spark_conf.items():
        spark_conf_args.extend(['--conf', f'{key}={value}'])

    # EMR step configuration
    step = {
        'Name': 'Spark Job with OpenLineage',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
            ] + spark_conf_args + [
                's3://my-bucket/spark-jobs/data-processing.py',
                '--input', 's3://my-bucket/input/',
                '--output', 's3://my-bucket/output/',
            ],
        },
    }

    return [step]


with dag:
    emr_spark_task = EmrAddStepsOperator(
        task_id='emr_spark_with_lineage',
        job_flow_id='{{ ti.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
        steps=create_emr_spark_step(),  # templated field, resolved from XCom at runtime
    )
```
### Databricks Integration
```python
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)


def create_databricks_config(**context):
    """Create Databricks run configuration with OpenLineage integration."""

    # Base Databricks Spark configuration
    spark_conf = {
        'spark.app.name': 'databricks-openlineage-job',
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    }

    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)

    # Databricks job configuration
    databricks_config = {
        'spark_python_task': {
            'python_file': 'dbfs:/mnt/jobs/data_processing.py',
            'parameters': ['--input', 'dbfs:/mnt/data/input', '--output', 'dbfs:/mnt/data/output'],
        },
        'new_cluster': {
            'spark_version': '11.3.x-scala2.12',
            'node_type_id': 'i3.xlarge',
            'num_workers': 4,
            'spark_conf': spark_conf,
        },
    }

    return databricks_config


# The run payload needs the runtime context, so it is built in an upstream task and
# pulled into the templated `json` field. Passing the dict through natively requires
# render_template_as_native_obj=True on the DAG.
build_databricks_config = PythonOperator(
    task_id='build_databricks_config',
    python_callable=create_databricks_config,
    dag=dag,
)

databricks_task = DatabricksSubmitRunOperator(
    task_id='databricks_with_lineage',
    json="{{ ti.xcom_pull(task_ids='build_databricks_config') }}",
    dag=dag,
)

build_databricks_config >> databricks_task
```
### Kubernetes Spark Operator Integration
```python
from airflow.decorators import task
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)


@task
def create_k8s_spark_config(**context):
    """Build the sparkConf section of a SparkApplication manifest with OpenLineage."""

    # Base Kubernetes Spark configuration
    spark_conf = {
        'spark.app.name': 'k8s-openlineage-job',
        'spark.kubernetes.container.image': 'my-registry/spark:3.4.0',
        'spark.kubernetes.driver.pod.name': f"spark-driver-{context['task_instance'].task_id}",
        'spark.executor.instances': '3',
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
        'spark.driver.cores': '1',
    }

    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)

    return spark_conf


with dag:
    build_spark_conf = create_k8s_spark_config()

    # SparkKubernetesOperator submits a SparkApplication manifest rather than taking
    # a `conf` dict; place the properties built above under `spec.sparkConf` in the
    # (templated) YAML file, for example by pulling the upstream task's XCom there.
    k8s_spark_task = SparkKubernetesOperator(
        task_id='k8s_spark_with_lineage',
        application_file='/opt/spark/jobs/k8s_spark_application.yaml',
        kubernetes_conn_id='kubernetes_default',
    )

    build_spark_conf >> k8s_spark_task
```
### Custom Spark Configuration Template
```python
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)


@task
def create_templated_spark_config(**context):
    """Create templated Spark configuration with environment-specific settings."""

    # Get environment from an Airflow Variable
    environment = Variable.get('environment', default_var='development')

    # Environment-specific base configuration
    base_configs = {
        'development': {
            'spark.executor.memory': '2g',
            'spark.executor.cores': '1',
            'spark.executor.instances': '2',
        },
        'staging': {
            'spark.executor.memory': '4g',
            'spark.executor.cores': '2',
            'spark.executor.instances': '5',
        },
        'production': {
            'spark.executor.memory': '8g',
            'spark.executor.cores': '4',
            'spark.executor.instances': '10',
        },
    }

    # Start with environment-specific config
    spark_conf = base_configs.get(environment, base_configs['development']).copy()

    # Add common configuration
    spark_conf.update({
        'spark.app.name': f"{environment}-{context['task_instance'].task_id}",
        'spark.sql.adaptive.enabled': 'true',
        'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
    })

    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)

    # Add environment-specific OpenLineage namespace override
    if 'spark.openlineage.namespace' in spark_conf:
        spark_conf['spark.openlineage.namespace'] = f"{environment}_{spark_conf['spark.openlineage.namespace']}"

    return spark_conf


with dag:
    templated_spark_task = SparkSubmitOperator(
        task_id='templated_spark_job',
        application='/opt/spark/jobs/templated_job.py',
        conf=create_templated_spark_config(),  # templated field, resolved from XCom at runtime
    )
```
## Configuration Examples

### Airflow Configuration for Spark Integration
```ini
# airflow.cfg
[openlineage]
spark_inject_parent_job_info = true
spark_inject_transport_info = true
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = production_airflow
```
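
The same settings can be read back from code through the provider's `conf` accessors. A small sketch, assuming a `namespace` accessor exists alongside the two `spark_inject_*` helpers used in the conditional example above (the matching `AIRFLOW__OPENLINEAGE__*` environment variables work as well):

```python
from airflow.providers.openlineage.conf import (
    namespace,
    spark_inject_parent_job_info,
    spark_inject_transport_info,
)

# Values resolved from the [openlineage] section shown above.
print(spark_inject_parent_job_info())  # True
print(spark_inject_transport_info())   # True
print(namespace())                     # 'production_airflow'
```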
### Injected Spark Properties

When the injection functions are used, the following properties are automatically added to Spark configurations:
```python
# Parent job information
{
    'spark.openlineage.parentJobNamespace': 'production_airflow',
    'spark.openlineage.parentJobName': 'my_dag.my_task',
    'spark.openlineage.parentRunId': '01234567-89ab-cdef-0123-456789abcdef',  # deterministic UUID identifying the parent task instance run
}

# Transport information
{
    'spark.openlineage.transport.type': 'http',
    'spark.openlineage.transport.url': 'http://marquez:5000',
    'spark.openlineage.transport.endpoint': '/api/v1/lineage',
    'spark.openlineage.namespace': 'production_airflow',
}
```
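
These properties only take effect if the Spark application itself runs with the OpenLineage Spark integration loaded. A minimal sketch of the Spark-side setup, assuming the standard OpenLineage listener class; the artifact coordinates and version are placeholders:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("data-processing-job")
    # Put the openlineage-spark artifact on the classpath (placeholder coordinates/version).
    .config("spark.jars.packages", "io.openlineage:openlineage-spark_2.12:<version>")
    # Register the OpenLineage listener that reads the spark.openlineage.* properties.
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
    .getOrCreate()
)
```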
## Integration Benefits

1. **Automatic Lineage Linking**: Spark jobs automatically link to their parent Airflow tasks
2. **Unified Configuration**: Single OpenLineage configuration shared between Airflow and Spark
3. **Complete Data Flow Tracking**: End-to-end lineage from Airflow through Spark transformations
4. **Simplified Setup**: No need for separate Spark OpenLineage configuration
5. **Environment Consistency**: Same lineage backend for both orchestration and processing layers