# Spark Integration

Utilities for integrating with Spark applications, including automatic injection of OpenLineage configuration into Spark properties for comprehensive lineage tracking across Spark jobs executed from Airflow.

## Capabilities

### Parent Job Information Injection

Function for injecting OpenLineage parent job information into Spark application properties.

```python { .api }
def inject_parent_job_information_into_spark_properties(properties: dict, context: Context) -> dict:
    """
    Inject OpenLineage parent job information into Spark application properties.

    Automatically injects parent job details (namespace, job name, run ID) from the
    current Airflow task context into Spark application properties. This enables
    Spark applications to emit lineage events that are properly linked to their
    parent Airflow jobs.

    Args:
        properties: Existing Spark application properties dictionary
        context: Airflow task context containing lineage information

    Returns:
        dict: Updated properties dictionary with injected parent job information

    Injected Properties:
        - spark.openlineage.parentJobNamespace: OpenLineage namespace
        - spark.openlineage.parentJobName: Parent job name
        - spark.openlineage.parentRunId: Parent run identifier
    """
```

### Transport Information Injection

Function for injecting OpenLineage transport configuration into Spark application properties.

```python { .api }
def inject_transport_information_into_spark_properties(properties: dict, context: Context) -> dict:
    """
    Inject OpenLineage transport information into Spark application properties.

    Automatically injects transport configuration from Airflow's OpenLineage
    settings into Spark application properties. This allows Spark applications
    to emit lineage events to the same backend as Airflow without requiring
    separate configuration.

    Args:
        properties: Existing Spark application properties dictionary
        context: Airflow task context (used for configuration access)

    Returns:
        dict: Updated properties dictionary with injected transport configuration

    Injected Properties:
        - spark.openlineage.transport.type: Transport type (http, kafka, etc.)
        - spark.openlineage.transport.url: Transport URL (for HTTP transport)
        - spark.openlineage.transport.endpoint: API endpoint (for HTTP transport)
        - spark.openlineage.transport.*: Additional transport-specific properties
        - spark.openlineage.namespace: OpenLineage namespace
    """
```
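
Both helpers require the live Airflow task context, which only exists while a task is running. The sketch below is a minimal illustration, not part of the provider API, of calling them from a custom operator's `execute()`; the operator name and the final submission step are assumptions made for the example.

```python
from airflow.models.baseoperator import BaseOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)


class MySparkLaunchOperator(BaseOperator):
    """Hypothetical operator that submits a Spark job built from a properties dict."""

    def __init__(self, base_properties: dict, **kwargs):
        super().__init__(**kwargs)
        self.base_properties = base_properties

    def execute(self, context):
        # The runtime context carries the parent job and run identifiers.
        properties = dict(self.base_properties)
        properties = inject_parent_job_information_into_spark_properties(properties, context)
        properties = inject_transport_information_into_spark_properties(properties, context)
        self.log.info("Submitting Spark job with properties: %s", properties)
        # ... hand `properties` to whatever mechanism actually submits the job ...
        return properties
```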

## Usage Examples

### Basic Spark Integration

```python
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

def create_spark_task_with_lineage(**context):
    """Create Spark properties with automatic OpenLineage integration."""

    # Base Spark configuration
    spark_properties = {
        'spark.app.name': 'data-processing-job',
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.sql.adaptive.enabled': 'true'
    }

    # Inject parent job information
    spark_properties = inject_parent_job_information_into_spark_properties(
        properties=spark_properties,
        context=context
    )

    # Inject transport information
    spark_properties = inject_transport_information_into_spark_properties(
        properties=spark_properties,
        context=context
    )

    print("Spark properties with OpenLineage integration:")
    for key, value in spark_properties.items():
        if 'openlineage' in key.lower():
            print(f"  {key}: {value}")

    return spark_properties

# Use in a Spark operator. NOTE: `conf` expects a concrete dict; in a real DAG,
# call the builder with the runtime task context (for example from a pre_execute
# callback or a custom operator's execute()) instead of passing the function
# object as done here for brevity.
spark_task = SparkSubmitOperator(
    task_id='process_data_with_lineage',
    application='/path/to/spark_job.py',
    conf=create_spark_task_with_lineage,
    dag=dag
)
```

### Spark Submit with Automatic Lineage

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def spark_job_with_openlineage(**context):
    """Configure Spark job with OpenLineage integration."""

    # Start with basic configuration
    base_conf = {
        'spark.app.name': f"airflow-job-{context['task_instance'].task_id}",
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
        'spark.executor.memory': '8g',
        'spark.executor.cores': '4'
    }

    # Add OpenLineage parent job information
    conf_with_parent = inject_parent_job_information_into_spark_properties(base_conf, context)

    # Add OpenLineage transport configuration
    final_conf = inject_transport_information_into_spark_properties(conf_with_parent, context)

    return final_conf

# Create Spark task
spark_analytics = SparkSubmitOperator(
    task_id='spark_analytics_job',
    application='/opt/spark/jobs/analytics.py',
    application_args=['--input', '/data/raw/', '--output', '/data/processed/'],
    conf=spark_job_with_openlineage,
    dag=dag
)
```

### PySpark with Custom Configuration

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def create_pyspark_config(**context):
    """Create PySpark configuration with OpenLineage integration."""

    # PySpark-specific configuration
    pyspark_conf = {
        'spark.app.name': 'pyspark-data-pipeline',
        'spark.submit.deployMode': 'cluster',
        'spark.executor.instances': '10',
        'spark.executor.memory': '6g',
        'spark.executor.cores': '3',
        'spark.driver.memory': '2g',
        'spark.driver.cores': '1',

        # Python configuration
        'spark.pyspark.python': '/opt/python3.9/bin/python',
        'spark.pyspark.driver.python': '/opt/python3.9/bin/python',

        # Serialization
        'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',

        # Dynamic allocation
        'spark.dynamicAllocation.enabled': 'true',
        'spark.dynamicAllocation.minExecutors': '2',
        'spark.dynamicAllocation.maxExecutors': '20'
    }

    # Inject OpenLineage configuration
    pyspark_conf = inject_parent_job_information_into_spark_properties(pyspark_conf, context)
    pyspark_conf = inject_transport_information_into_spark_properties(pyspark_conf, context)

    return pyspark_conf

pyspark_task = SparkSubmitOperator(
    task_id='pyspark_etl',
    application='/opt/spark/jobs/etl_pipeline.py',
    py_files=['/opt/spark/libs/common_functions.py'],
    files=['/opt/spark/config/database.conf'],
    conf=create_pyspark_config,
    dag=dag
)
```

### Conditional Spark Integration

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.conf import spark_inject_parent_job_info, spark_inject_transport_info
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def conditional_spark_config(**context):
    """Conditionally inject OpenLineage configuration based on settings."""

    base_conf = {
        'spark.app.name': 'conditional-lineage-job',
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2'
    }

    # Check configuration flags
    inject_parent = spark_inject_parent_job_info()
    inject_transport = spark_inject_transport_info()

    print(f"Parent job injection enabled: {inject_parent}")
    print(f"Transport injection enabled: {inject_transport}")

    if inject_parent:
        base_conf = inject_parent_job_information_into_spark_properties(base_conf, context)
        print("Injected parent job information")

    if inject_transport:
        base_conf = inject_transport_information_into_spark_properties(base_conf, context)
        print("Injected transport information")

    return base_conf

conditional_spark_task = SparkSubmitOperator(
    task_id='conditional_spark_job',
    application='/opt/spark/jobs/conditional_job.py',
    conf=conditional_spark_config,
    dag=dag
)
```

### EMR Spark Integration

```python
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def create_emr_spark_step(**context):
    """Create EMR Spark step with OpenLineage integration."""

    # Base Spark configuration for EMR
    spark_conf = {
        'spark.app.name': 'emr-openlineage-job',
        'spark.executor.memory': '8g',
        'spark.executor.cores': '4',
        'spark.driver.memory': '2g',
        'spark.sql.adaptive.enabled': 'true'
    }

    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)

    # Convert to EMR spark-submit format
    spark_conf_args = []
    for key, value in spark_conf.items():
        spark_conf_args.extend(['--conf', f'{key}={value}'])

    # EMR step configuration (the application here is a Python script,
    # so no --class argument is needed)
    step = {
        'Name': 'Spark Job with OpenLineage',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster'
            ] + spark_conf_args + [
                's3://my-bucket/spark-jobs/data-processing.py',
                '--input', 's3://my-bucket/input/',
                '--output', 's3://my-bucket/output/'
            ]
        }
    }

    return [step]

emr_spark_task = EmrAddStepsOperator(
    task_id='emr_spark_with_lineage',
    job_flow_id='{{ ti.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
    steps=create_emr_spark_step,
    dag=dag
)
```

### Databricks Integration

```python
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def create_databricks_config(**context):
    """Create Databricks configuration with OpenLineage integration."""

    # Base Databricks Spark configuration
    spark_conf = {
        'spark.app.name': 'databricks-openlineage-job',
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true'
    }

    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)

    # Databricks job configuration
    databricks_config = {
        'spark_python_task': {
            'python_file': 'dbfs:/mnt/jobs/data_processing.py',
            'parameters': ['--input', 'dbfs:/mnt/data/input', '--output', 'dbfs:/mnt/data/output']
        },
        'new_cluster': {
            'spark_version': '11.3.x-scala2.12',
            'node_type_id': 'i3.xlarge',
            'num_workers': 4,
            'spark_conf': spark_conf
        }
    }

    return databricks_config

databricks_task = DatabricksSubmitRunOperator(
    task_id='databricks_with_lineage',
    json=create_databricks_config,
    dag=dag
)
```

### Kubernetes Spark Operator Integration

```python
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def create_k8s_spark_config(**context):
    """Create Kubernetes Spark configuration with OpenLineage."""

    # Base Kubernetes Spark configuration
    spark_conf = {
        'spark.app.name': 'k8s-openlineage-job',
        'spark.kubernetes.container.image': 'my-registry/spark:3.4.0',
        'spark.kubernetes.driver.pod.name': f"spark-driver-{context['task_instance'].task_id}",
        'spark.executor.instances': '3',
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
        'spark.driver.cores': '1'
    }

    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)

    return spark_conf

# NOTE: SparkKubernetesOperator submits a SparkApplication manifest; in practice
# the properties built above belong under the spec.sparkConf section of the
# manifest referenced by application_file.
k8s_spark_task = SparkKubernetesOperator(
    task_id='k8s_spark_with_lineage',
    application_file='/opt/spark/jobs/k8s_job.py',
    kubernetes_conn_id='kubernetes_default',
    conf=create_k8s_spark_config,
    dag=dag
)
```

### Custom Spark Configuration Template

```python
from airflow.models import Variable
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def create_templated_spark_config(**context):
    """Create templated Spark configuration with environment-specific settings."""

    # Get environment from Airflow Variable
    environment = Variable.get('environment', default_var='development')

    # Environment-specific base configuration
    base_configs = {
        'development': {
            'spark.executor.memory': '2g',
            'spark.executor.cores': '1',
            'spark.executor.instances': '2'
        },
        'staging': {
            'spark.executor.memory': '4g',
            'spark.executor.cores': '2',
            'spark.executor.instances': '5'
        },
        'production': {
            'spark.executor.memory': '8g',
            'spark.executor.cores': '4',
            'spark.executor.instances': '10'
        }
    }

    # Start with environment-specific config
    spark_conf = base_configs.get(environment, base_configs['development']).copy()

    # Add common configuration
    spark_conf.update({
        'spark.app.name': f"{environment}-{context['task_instance'].task_id}",
        'spark.sql.adaptive.enabled': 'true',
        'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'
    })

    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)

    # Add environment-specific OpenLineage namespace override
    if 'spark.openlineage.namespace' in spark_conf:
        spark_conf['spark.openlineage.namespace'] = f"{environment}_{spark_conf['spark.openlineage.namespace']}"

    return spark_conf

templated_spark_task = SparkSubmitOperator(
    task_id='templated_spark_job',
    application='/opt/spark/jobs/templated_job.py',
    conf=create_templated_spark_config,
    dag=dag
)
```

## Configuration Examples

### Airflow Configuration for Spark Integration

```ini
# airflow.cfg
[openlineage]
spark_inject_parent_job_info = true
spark_inject_transport_info = true
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = production_airflow
```
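
The same settings can also be supplied through Airflow's standard `AIRFLOW__SECTION__KEY` environment variables, which is often more convenient in containerized deployments. A sketch mirroring the `airflow.cfg` example above (values are illustrative):

```ini
# Environment-variable equivalent (.env style) of the [openlineage] section above
AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true
AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
AIRFLOW__OPENLINEAGE__TRANSPORT={"type": "http", "url": "http://marquez:5000"}
AIRFLOW__OPENLINEAGE__NAMESPACE=production_airflow
```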

### Injected Spark Properties

When the injection functions are used, the following properties are automatically added to the Spark configuration:

```python
# Parent job information
{
    'spark.openlineage.parentJobNamespace': 'production_airflow',
    'spark.openlineage.parentJobName': 'my_dag.my_task',
    'spark.openlineage.parentRunId': '9c4b3c1a-5a2e-4f0d-9b2a-7f3f2d1e0c4b'  # UUID of the parent task run (illustrative value)
}

# Transport information
{
    'spark.openlineage.transport.type': 'http',
    'spark.openlineage.transport.url': 'http://marquez:5000',
    'spark.openlineage.transport.endpoint': '/api/v1/lineage',
    'spark.openlineage.namespace': 'production_airflow'
}
```
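
These properties only take effect if the Spark application itself loads the OpenLineage Spark integration. A hedged sketch of the additional Spark properties typically required follows; the package coordinate and version are illustrative, so check the openlineage-spark documentation for the right artifact for your Spark/Scala version:

```python
# Spark-side prerequisites (illustrative values; verify for your Spark/Scala version)
{
    'spark.jars.packages': 'io.openlineage:openlineage-spark_2.12:1.9.0',
    'spark.extraListeners': 'io.openlineage.spark.agent.OpenLineageSparkListener'
}
```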

## Integration Benefits

1. **Automatic Lineage Linking**: Spark jobs automatically link to their parent Airflow tasks
2. **Unified Configuration**: Single OpenLineage configuration shared between Airflow and Spark
3. **Complete Data Flow Tracking**: End-to-end lineage from Airflow through Spark transformations
4. **Simplified Setup**: No need for separate Spark OpenLineage configuration
5. **Environment Consistency**: Same lineage backend for both orchestration and processing layers