or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, facets-metadata.md, index.md, lineage-extraction.md, plugin-integration.md, selective-control.md, spark-integration.md, sql-parsing.md, sql-utilities.md, template-macros.md, utility-functions.md

docs/plugin-integration.md

0

# Plugin Integration

1

2

Core plugin components for Airflow integration: the event listener, the adapter, automatic event emission, and OpenLineage client management. Together they track data lineage for Airflow DAG runs and task instances without changes to DAG code.

3

4

## Capabilities

5

6

### OpenLineage Adapter

7

8

Core adapter that creates and emits OpenLineage events, manages the OpenLineage client, and coordinates the lifecycle of run events (start, complete, fail).

9

10

```python { .api }

11

class OpenLineageAdapter:

12

"""

13

Core adapter for OpenLineage event creation and emission.

14

15

Manages OpenLineage client instances, event building, and emission to

16

configured transport backends.

17

"""

18

19

def __init__(self, client: OpenLineageClient | None = None, secrets_masker: SecretsMasker | None = None):

20

"""

21

Initialize adapter with optional client and secrets masker.

22

23

Args:

24

client: Pre-configured OpenLineage client

25

secrets_masker: Secrets masking utility for sensitive data

26

"""

27

28

def get_or_create_openlineage_client(self) -> OpenLineageClient:

29

"""

30

Get existing or create new OpenLineage client from configuration.

31

32

Returns:

33

OpenLineageClient: Configured client for event emission

34

"""

35

36

def get_openlineage_config(self) -> dict | None:

37

"""

38

Get complete OpenLineage configuration dictionary.

39

40

Returns:

41

dict: Configuration settings or None if not configured

42

"""

43

44

@staticmethod

45

def build_dag_run_id(dag_id: str, logical_date: datetime, clear_number: int) -> str:

46

"""

47

Build unique DAG run identifier for OpenLineage events.

48

49

Args:

50

dag_id: DAG identifier

51

logical_date: DAG run logical execution date

52

clear_number: Clear/retry number for the run

53

54

Returns:

55

str: Unique DAG run identifier

56

"""

57

58

@staticmethod

59

def build_task_instance_run_id(

60

dag_id: str,

61

task_id: str,

62

execution_date: datetime,

63

try_number: int

64

) -> str:

65

"""

66

Build unique task instance run identifier.

67

68

Args:

69

dag_id: DAG identifier

70

task_id: Task identifier

71

execution_date: Task execution date

72

try_number: Task attempt number

73

74

Returns:

75

str: Unique task run identifier

76

"""

77

78

def emit(self, event: RunEvent) -> RunEvent:

79

"""

80

Emit OpenLineage event to configured transport.

81

82

Args:

83

event: OpenLineage run event to emit

84

85

Returns:

86

RunEvent: The emitted event (for chaining/logging)

87

"""

88

89

def start_task(

90

self,

91

run_id: str,

92

job_name: str,

93

job_description: str,

94

event_time: datetime,

95

parent_run_id: str | None,

96

code_location: str | None,

97

nominal_start_time: datetime | None,

98

inputs: list[Dataset],

99

outputs: list[Dataset],

100

run_facets: dict[str, RunFacet] | None = None,

101

job_facets: dict[str, JobFacet] | None = None

102

) -> RunEvent:

103

"""

104

Create and emit task start event.

105

106

Args:

107

run_id: Unique run identifier

108

job_name: Job name

109

job_description: Job description

110

event_time: Event timestamp

111

parent_run_id: Parent run identifier (for nested jobs)

112

code_location: Source code location

113

nominal_start_time: Scheduled start time

114

inputs: Input datasets

115

outputs: Output datasets

116

run_facets: Runtime metadata facets

117

job_facets: Job-level metadata facets

118

119

Returns:

120

RunEvent: Created and emitted start event

121

"""

122

123

def complete_task(

124

self,

125

run_id: str,

126

job_name: str,

127

job_description: str,

128

event_time: datetime,

129

parent_run_id: str | None,

130

code_location: str | None,

131

nominal_start_time: datetime | None,

132

inputs: list[Dataset],

133

outputs: list[Dataset],

134

run_facets: dict[str, RunFacet] | None = None,

135

job_facets: dict[str, JobFacet] | None = None

136

) -> RunEvent:

137

"""

138

Create and emit task completion event.

139

140

Args:

141

run_id: Unique run identifier

142

job_name: Job name

143

job_description: Job description

144

event_time: Event timestamp

145

parent_run_id: Parent run identifier

146

code_location: Source code location

147

nominal_start_time: Scheduled start time

148

inputs: Input datasets

149

outputs: Output datasets

150

run_facets: Runtime metadata facets

151

job_facets: Job-level metadata facets

152

153

Returns:

154

RunEvent: Created and emitted completion event

155

"""

156

157

def fail_task(

158

self,

159

run_id: str,

160

job_name: str,

161

job_description: str,

162

event_time: datetime,

163

parent_run_id: str | None,

164

code_location: str | None,

165

nominal_start_time: datetime | None,

166

inputs: list[Dataset],

167

outputs: list[Dataset],

168

run_facets: dict[str, RunFacet] | None = None,

169

job_facets: dict[str, JobFacet] | None = None

170

) -> RunEvent:

171

"""

172

Create and emit task failure event.

173

174

Args:

175

run_id: Unique run identifier

176

job_name: Job name

177

job_description: Job description

178

event_time: Event timestamp

179

parent_run_id: Parent run identifier

180

code_location: Source code location

181

nominal_start_time: Scheduled start time

182

inputs: Input datasets

183

outputs: Output datasets

184

run_facets: Runtime metadata facets

185

job_facets: Job-level metadata facets

186

187

Returns:

188

RunEvent: Created and emitted failure event

189

"""

190

191

def dag_started(

192

self,

193

dag_run: DagRun,

194

msg: str,

195

nominal_start_time: datetime,

196

dag: DAG

197

):

198

"""

199

Handle DAG start event and emit corresponding OpenLineage event.

200

201

Args:

202

dag_run: Started DAG run instance

203

msg: Event message

204

nominal_start_time: Scheduled start time

205

dag: DAG instance

206

"""

207

208

def dag_success(

209

self,

210

dag_run: DagRun,

211

msg: str,

212

nominal_start_time: datetime,

213

dag: DAG

214

):

215

"""

216

Handle DAG success event and emit corresponding OpenLineage event.

217

218

Args:

219

dag_run: Successful DAG run instance

220

msg: Event message

221

nominal_start_time: Scheduled start time

222

dag: DAG instance

223

"""

224

225

def dag_failed(

226

self,

227

dag_run: DagRun,

228

msg: str,

229

nominal_start_time: datetime,

230

dag: DAG

231

):

232

"""

233

Handle DAG failure event and emit corresponding OpenLineage event.

234

235

Args:

236

dag_run: Failed DAG run instance

237

msg: Event message

238

nominal_start_time: Scheduled start time

239

dag: DAG instance

240

"""

241

```

242

243

### OpenLineage Event Listener

244

245

Event listener that captures Airflow DAG run and task instance lifecycle events and passes them to the adapter, which builds and emits the corresponding OpenLineage events.

246

247

```python { .api }

248

class OpenLineageListener:

249

"""

250

Event listener for DAG and task lifecycle events.

251

252

Integrates with Airflow's event system to automatically capture

253

DAG runs, task instances, and their state changes for lineage tracking.

254

"""

255

# Implementation details are internal - provides event listening capabilities

256

257

def get_openlineage_listener() -> OpenLineageListener:

258

"""

259

Get singleton instance of OpenLineage event listener.

260

261

Returns:

262

OpenLineageListener: Global listener instance for event capture

263

"""

264

```

265

266

### Main Plugin Class

267

268

Primary Airflow plugin that registers the OpenLineage listener, macros, and related components with Airflow.

269

270

```python { .api }

271

class OpenLineageProviderPlugin:

272

"""

273

Main Airflow plugin class for OpenLineage integration.

274

275

Automatically registers OpenLineage listeners, macros, and other

276

components when the provider package is installed.

277

"""

278

```

279

280

### Run State Enumeration

281

282

Enumeration of the OpenLineage run states used when building events.

283

284

```python { .api }

285

class RunState(Enum):

286

"""

287

Enumeration for OpenLineage run states.

288

289

Values:

290

START: Job/task is starting

291

RUNNING: Job/task is currently running

292

COMPLETE: Job/task completed successfully

293

ABORT: Job/task was aborted

294

FAIL: Job/task failed with error

295

"""

296

START = "START"

297

RUNNING = "RUNNING"

298

COMPLETE = "COMPLETE"

299

ABORT = "ABORT"

300

FAIL = "FAIL"

301

```

302

303

## Usage Examples

304

305

### Basic Adapter Usage

306

307

```python

308

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

309

from openlineage.client.event_v2 import Dataset

310

from datetime import datetime

311

312

# Initialize adapter

313

adapter = OpenLineageAdapter()

314

315

# Create datasets

316

inputs = [Dataset(namespace='db', name='raw.users')]

317

outputs = [Dataset(namespace='db', name='analytics.user_summary')]

318

319

# Emit task start event

320

start_event = adapter.start_task(

321

run_id='my-task-run-123',

322

job_name='process_users',

323

job_description='Process user data for analytics',

324

event_time=datetime.utcnow(),

325

parent_run_id=None,

326

code_location='dags/user_processing.py',

327

nominal_start_time=datetime.utcnow(),

328

inputs=inputs,

329

outputs=outputs,

330

run_facets={},

331

job_facets={}

332

)

333

334

print(f"Emitted start event: {start_event.eventType}")

335

```

336

337

### Manual Event Emission

338

339

```python

340

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

341

from openlineage.client.event_v2 import RunEvent, Run, Job

342

from datetime import datetime

343

344

adapter = OpenLineageAdapter()

345

346

# Create custom run event

347

event = RunEvent(

348

eventTime=datetime.utcnow(),

349

eventType='START',

350

run=Run(runId='custom-run-456'),

351

job=Job(namespace='my-namespace', name='custom-job'),

352

inputs=[],

353

outputs=[],

354

producer='airflow-openlineage-provider'

355

)

356

357

# Emit event

358

emitted_event = adapter.emit(event)

359

print(f"Emitted custom event: {emitted_event}")

360

```

361

362

### Client Configuration

363

364

```python

365

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

366

367

# Initialize adapter (will create client from config)

368

adapter = OpenLineageAdapter()

369

370

# Get client for direct usage

371

client = adapter.get_or_create_openlineage_client()

372

print(f"Client transport: {client.transport}")

373

374

# Get configuration

375

config = adapter.get_openlineage_config()

376

if config:

377

print(f"Transport type: {config.get('transport', {}).get('type')}")

378

print(f"Namespace: {config.get('namespace')}")

379

```

380

381

### Run ID Generation

382

383

```python

384

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

385

from datetime import datetime

386

387

# Generate DAG run ID

388

dag_run_id = OpenLineageAdapter.build_dag_run_id(

389

dag_id='user_processing_dag',

390

logical_date=datetime(2023, 12, 1),

391

clear_number=0

392

)

393

print(f"DAG run ID: {dag_run_id}")

394

395

# Generate task run ID

396

task_run_id = OpenLineageAdapter.build_task_instance_run_id(

397

dag_id='user_processing_dag',

398

task_id='extract_users',

399

execution_date=datetime(2023, 12, 1),

400

try_number=1

401

)

402

print(f"Task run ID: {task_run_id}")

403

```

404

405

### DAG Event Handling

406

407

```python

408

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

409

from airflow.models import DagRun, DAG

410

from datetime import datetime

411

412

adapter = OpenLineageAdapter()

413

414

# Handle DAG lifecycle events

415

dag_run = DagRun(

416

dag_id='my_dag',

417

execution_date=datetime.utcnow(),

418

run_id='manual_2023-12-01'

419

)

420

421

dag = DAG('my_dag', start_date=datetime(2023, 1, 1))

422

423

# Emit DAG start event

424

adapter.dag_started(

425

dag_run=dag_run,

426

msg='DAG started execution',

427

nominal_start_time=datetime.utcnow(),

428

dag=dag

429

)

430

431

# Later, emit DAG completion

432

adapter.dag_success(

433

dag_run=dag_run,

434

msg='DAG completed successfully',

435

nominal_start_time=datetime.utcnow(),

436

dag=dag

437

)

438

```

439

440

### Integration with Task Context

441

442

```python

443

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

444

from airflow.operators.python import PythonOperator

445

446

def my_task_function(**context):

447

# Access adapter within task

448

adapter = OpenLineageAdapter()

449

450

# Get task instance info

451

task_instance = context['task_instance']

452

453

# Build run ID for current task

454

run_id = OpenLineageAdapter.build_task_instance_run_id(

455

dag_id=task_instance.dag_id,

456

task_id=task_instance.task_id,

457

execution_date=task_instance.execution_date,

458

try_number=task_instance.try_number

459

)

460

461

print(f"Current task run ID: {run_id}")

462

463

# Task logic here...

464

465

# Use in DAG

466

task = PythonOperator(

467

task_id='my_task',

468

python_callable=my_task_function,

469

provide_context=True,

470

dag=dag

471

)

472

```

473

474

## Automatic Integration

475

476

No manual setup is required: the plugin is registered automatically when the provider package is installed.

477

478

### Plugin Registration

479

480

```toml

481

# In pyproject.toml

482

[project.entry-points."airflow.plugins"]

483

openlineage = "airflow.providers.openlineage.plugins.openlineage:OpenLineageProviderPlugin"

484

```

485

486

### Listener Registration

487

488

The plugin automatically registers the OpenLineage listener with Airflow's event system:

489

490

```python

491

# Automatic registration in Airflow

492

from airflow.providers.openlineage.plugins.listener import get_openlineage_listener

493

494

# Listener is automatically registered to capture:

495

# - DAG run events (start, success, failure)

496

# - Task instance events (start, success, failure, retry)

497

# - Task state changes

498

```

499

500

### Configuration Integration

501

502

The adapter reads its configuration from the `[openlineage]` section of the Airflow configuration (for example, `airflow.cfg`):

503

504

```ini

505

# airflow.cfg

506

[openlineage]

507

transport = {"type": "http", "url": "http://marquez:5000"}

508

namespace = production_airflow

509

disabled = false

510

```

511

512

## Error Handling and Resilience

513

514

### Transport Failures

515

516

```python

517

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

518

519

adapter = OpenLineageAdapter()

520

521

try:

522

event = adapter.start_task(...)

523

print("Event emitted successfully")

524

except Exception as e:

525

print(f"Failed to emit event: {e}")

526

# Task continues executing - lineage failures don't break DAGs

527

```

528

529

### Client Recovery

530

531

```python

532

# Adapter handles client failures gracefully

533

adapter = OpenLineageAdapter()

534

535

# If client creation fails, adapter continues with no-op behavior

536

client = adapter.get_or_create_openlineage_client()

537

538

# Events may be silently dropped if transport is unavailable

539

# This ensures DAG execution is not impacted by lineage issues

540

```

541

542

## Advanced Integration Patterns

543

544

### Custom Transport Configuration

545

546

```python

547

from openlineage.client import OpenLineageClient

548

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

549

550

# Create custom client

551

custom_client = OpenLineageClient(

552

url='http://my-lineage-backend:8080',

553

session_timeout=30

554

)

555

556

# Use with adapter

557

adapter = OpenLineageAdapter(client=custom_client)

558

```

559

560

### Secrets Masking

561

562

```python

563

from openlineage.client.secrets import SecretsMasker

564

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

565

566

# Create custom secrets masker

567

masker = SecretsMasker(

568

patterns=['password', 'api_key', 'token'],

569

replacement='***MASKED***'

570

)

571

572

# Use with adapter

573

adapter = OpenLineageAdapter(secrets_masker=masker)

574

```