# Lineage Extraction Framework

An extensible framework for extracting lineage metadata from Airflow operators, including base classes, built-in extractors, custom extractor registration, and centralized management. This framework enables automatic discovery of data flows and transformations across different operator types.

## Capabilities

### Base Extractor Classes

Core classes that define the extraction interface and data structures for lineage metadata.

```python { .api }
class OperatorLineage:
    """
    Generic container for lineage data including inputs, outputs, and metadata facets.
    """

    inputs: list[Dataset]             # Input datasets consumed by the operation
    outputs: list[Dataset]            # Output datasets produced by the operation
    run_facets: dict[str, BaseFacet]  # Runtime metadata facets
    job_facets: dict[str, BaseFacet]  # Job-level metadata facets

class BaseExtractor:
    """
    Abstract base class for implementing custom lineage extractors.
    """

    def __init__(self, operator):
        """
        Initialize the extractor with an operator instance.

        Args:
            operator: Airflow operator instance to extract lineage from
        """

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """
        Return the list of operator class names this extractor handles.

        Returns:
            list[str]: Fully qualified operator class names
        """

    def extract(self) -> OperatorLineage | None:
        """
        Extract lineage metadata for task start events.

        Returns:
            OperatorLineage: Extracted lineage metadata, or None if no extraction is possible
        """

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """
        Extract lineage metadata for task completion events.

        Args:
            task_instance: Completed task instance

        Returns:
            OperatorLineage: Extracted lineage metadata, or None
        """

    def extract_on_failure(self, task_instance) -> OperatorLineage | None:
        """
        Extract lineage metadata for task failure events.

        Args:
            task_instance: Failed task instance

        Returns:
            OperatorLineage: Extracted lineage metadata, or None
        """

class DefaultExtractor(BaseExtractor):
    """
    Default implementation of BaseExtractor that provides fallback extraction.

    Uses the operator's built-in OpenLineage methods if available; otherwise
    attempts to extract basic lineage from operator properties.
    """
```

### Extractor Manager

Central management system for registering extractors and coordinating lineage extraction.

```python { .api }
class ExtractorManager:
    """
    Central manager for registering and executing lineage extractors.
    """

    def __init__(self):
        """Initialize the extractor manager with built-in extractors."""

    def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]):
        """
        Register a custom extractor for a specific operator class.

        Args:
            operator_class: Fully qualified operator class name
            extractor: Extractor class to handle the operator
        """

    def extract_metadata(
        self,
        dagrun,
        task,
        task_instance_state: TaskInstanceState,
        task_instance=None,
    ) -> OperatorLineage:
        """
        Extract lineage metadata for a task using the appropriate extractor.

        Args:
            dagrun: DAG run instance
            task: Task/operator instance
            task_instance_state: Current task instance state
            task_instance: Task instance (optional)

        Returns:
            OperatorLineage: Extracted lineage metadata
        """

    def get_extractor_class(self, task: Operator) -> type[BaseExtractor] | None:
        """
        Get the appropriate extractor class for a task.

        Args:
            task: Airflow task/operator

        Returns:
            type[BaseExtractor]: Extractor class, or None if no match
        """

    def extract_inlets_and_outlets(self, task_metadata: OperatorLineage, task):
        """
        Extract additional lineage from the task's inlets and outlets properties.

        Args:
            task_metadata: Existing task lineage metadata
            task: Airflow task instance
        """

    def get_hook_lineage(self) -> tuple[list[Dataset], list[Dataset]] | None:
        """
        Extract lineage from database hooks if available.

        Returns:
            tuple: (input_datasets, output_datasets), or None
        """

    @staticmethod
    def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | None:
        """
        Convert an object storage URI to an OpenLineage Dataset.

        Args:
            uri: Object storage URI (s3://, gcs://, etc.)

        Returns:
            Dataset: OpenLineage dataset, or None if conversion fails
        """

    @staticmethod
    def convert_to_ol_dataset_from_table(table: Table) -> Dataset:
        """
        Convert an Airflow Table object to an OpenLineage Dataset.

        Args:
            table: Airflow Table instance

        Returns:
            Dataset: OpenLineage dataset representation
        """

    @staticmethod
    def convert_to_ol_dataset(obj) -> Dataset | None:
        """
        Convert various object types to an OpenLineage Dataset.

        Args:
            obj: Object to convert (URI string, Table, etc.)

        Returns:
            Dataset: OpenLineage dataset, or None if conversion is not supported
        """

    def validate_task_metadata(self, task_metadata) -> OperatorLineage | None:
        """
        Validate and normalize task metadata.

        Args:
            task_metadata: Raw task metadata to validate

        Returns:
            OperatorLineage: Validated metadata, or None if invalid
        """
```
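Taken together, these methods form a resolution pipeline. The following is a minimal sketch of that flow (illustrative only; `extract_metadata` performs this orchestration internally, and `task` is assumed to be an existing operator instance):

```python
from airflow.providers.openlineage.extractors.manager import ExtractorManager

manager = ExtractorManager()

# Resolve the extractor class registered for this operator's class name
extractor_cls = manager.get_extractor_class(task)
if extractor_cls is not None:
    extractor = extractor_cls(task)                    # bind the extractor to the task
    lineage = extractor.extract()                      # start-event extraction
    lineage = manager.validate_task_metadata(lineage)  # normalize, or None if invalid
```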

### Built-in Extractors

Pre-built extractors for common Airflow operator types.

```python { .api }
class BashExtractor(BaseExtractor):
    """
    Extractor for BashOperator tasks that captures command execution metadata.

    Extracts command text, working directory, and environment variables
    when source code inclusion is enabled.
    """

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Returns: ['airflow.operators.bash.BashOperator']"""

class PythonExtractor(BaseExtractor):
    """
    Extractor for PythonOperator tasks that captures function execution metadata.

    Extracts function source code, callable information, and context variables
    when source code inclusion is enabled.
    """

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Returns: ['airflow.operators.python.PythonOperator']"""
```
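Because built-in extractors are matched on fully qualified operator class names, a stock `BashOperator` resolves without any registration. A quick way to check which extractor handles a task:

```python
from airflow.operators.bash import BashOperator
from airflow.providers.openlineage.extractors.manager import ExtractorManager

task = BashOperator(task_id='print_date', bash_command='date')

manager = ExtractorManager()
# Expected to resolve to BashExtractor, matched via get_operator_classnames()
print(manager.get_extractor_class(task))
```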

### Extraction Constants

Method name constants for operator-level OpenLineage integration.

```python { .api }
OL_METHOD_NAME_START: str = "get_openlineage_facets_on_start"
"""Method name for start event lineage extraction in operators."""

OL_METHOD_NAME_COMPLETE: str = "get_openlineage_facets_on_complete"
"""Method name for completion event lineage extraction in operators."""

OL_METHOD_NAME_FAIL: str = "get_openlineage_facets_on_failure"
"""Method name for failure event lineage extraction in operators."""
```
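These constants name the hooks that operators may implement themselves (see Operator Integration below), letting the framework probe for support by name. A sketch of the dispatch pattern they enable, assuming `operator` is an existing operator instance (the `DefaultExtractor` performs the equivalent lookup internally):

```python
# Probe the operator for the start-event hook named by OL_METHOD_NAME_START
method = getattr(operator, OL_METHOD_NAME_START, None)  # "get_openlineage_facets_on_start"
if callable(method):
    lineage = method()  # the operator supplies its own OperatorLineage
else:
    lineage = None      # fall back to extractor- or property-based extraction
```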

## Usage Examples

### Creating Custom Extractor

```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset

class CustomDataExtractor(BaseExtractor):
    """Custom extractor for CustomDataOperator."""

    @classmethod
    def get_operator_classnames(cls):
        return ['my_package.operators.CustomDataOperator']

    def extract(self):
        # Extract lineage from operator properties
        operator = self.operator

        inputs = []
        if hasattr(operator, 'input_path'):
            inputs.append(Dataset(
                namespace='file',
                name=operator.input_path,
            ))

        outputs = []
        if hasattr(operator, 'output_path'):
            outputs.append(Dataset(
                namespace='file',
                name=operator.output_path,
            ))

        return OperatorLineage(
            inputs=inputs,
            outputs=outputs,
            run_facets={},
            job_facets={},
        )

# Register the custom extractor
from airflow.providers.openlineage.extractors.manager import ExtractorManager

manager = ExtractorManager()
manager.add_extractor('my_package.operators.CustomDataOperator', CustomDataExtractor)
```

### Using Extractor Manager

```python
from airflow.providers.openlineage.extractors.manager import ExtractorManager
from airflow.operators.python import PythonOperator
from airflow.utils.state import TaskInstanceState

# Initialize the manager
manager = ExtractorManager()

# Create a sample task (assumes an existing `dag`)
def my_function():
    return "Hello World"

task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    dag=dag,
)

# Extract lineage metadata (assumes `dag_run` and `task_instance` from the
# surrounding execution context, e.g. a listener hook)
lineage = manager.extract_metadata(
    dagrun=dag_run,
    task=task,
    task_instance_state=TaskInstanceState.RUNNING,
    task_instance=task_instance,
)

print(f"Inputs: {lineage.inputs}")
print(f"Outputs: {lineage.outputs}")
print(f"Run facets: {lineage.run_facets}")
```

### Dataset Conversion Utilities

```python
from airflow.lineage.entities import Table
from airflow.providers.openlineage.extractors.manager import ExtractorManager

# Convert an S3 URI to a dataset
s3_uri = "s3://my-bucket/data/users.parquet"
s3_dataset = ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(s3_uri)
print(f"S3 Dataset: {s3_dataset}")

# Convert an Airflow Table entity to a dataset
table = Table(
    database='analytics',
    cluster='warehouse',  # Table entities are keyed by database, cluster, and name
    name='users',
)
table_dataset = ExtractorManager.convert_to_ol_dataset_from_table(table)
print(f"Table Dataset: {table_dataset}")

# Generic conversion; unsupported objects (like the plain dict) convert to None
mixed_objects = [
    "s3://bucket/file.csv",
    table,
    {"name": "custom_dataset"},
]

converted = (ExtractorManager.convert_to_ol_dataset(obj) for obj in mixed_objects)
datasets = [dataset for dataset in converted if dataset is not None]
print(f"Converted datasets: {datasets}")
```

### Operator Integration

Custom operators can implement OpenLineage methods directly:

```python
from airflow.models import BaseOperator
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset

class MyCustomOperator(BaseOperator):
    def __init__(self, input_file: str, output_file: str, **kwargs):
        super().__init__(**kwargs)
        self.input_file = input_file
        self.output_file = output_file

    def execute(self, context):
        # Operator logic here
        pass

    def get_openlineage_facets_on_start(self) -> OperatorLineage:
        """Called when the task starts."""
        return OperatorLineage(
            inputs=[Dataset(namespace='file', name=self.input_file)],
            outputs=[Dataset(namespace='file', name=self.output_file)],
            run_facets={},
            job_facets={},
        )

    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        """Called when the task completes successfully."""
        # Can include actual file sizes, row counts, etc. In real code, prefer
        # facet classes from the OpenLineage client over a plain dict.
        return OperatorLineage(
            inputs=[Dataset(namespace='file', name=self.input_file)],
            outputs=[Dataset(namespace='file', name=self.output_file)],
            run_facets={
                'processing_stats': {
                    'rows_processed': 1000,
                    'execution_time': task_instance.duration,
                }
            },
            job_facets={},
        )
```

### Extracting from Task Properties

```python
from airflow.operators.python import PythonOperator
from airflow.providers.openlineage.extractors.base import OperatorLineage
from airflow.providers.openlineage.extractors.manager import ExtractorManager
from openlineage.client.event_v2 import Dataset

# Extract from the task's inlets/outlets (assumes `my_function` and `dag` as above)
task_with_lineage = PythonOperator(
    task_id='process_data',
    python_callable=my_function,
    inlets=[
        Dataset(namespace='db', name='raw.users'),
        Dataset(namespace='db', name='raw.orders'),
    ],
    outlets=[
        Dataset(namespace='db', name='analytics.user_metrics'),
    ],
    dag=dag,
)

manager = ExtractorManager()
base_lineage = OperatorLineage(inputs=[], outputs=[], run_facets={}, job_facets={})

# This merges the inlets/outlets into the lineage in place
manager.extract_inlets_and_outlets(base_lineage, task_with_lineage)

print(f"Final inputs: {base_lineage.inputs}")
print(f"Final outputs: {base_lineage.outputs}")
```

## Extractor Registration

### Configuration-based Registration

Register extractors via Airflow configuration; the same key can also be set through the `AIRFLOW__OPENLINEAGE__EXTRACTORS` environment variable:

```ini
[openlineage]
extractors = my_package.extractors.CustomSQLExtractor;my_package.extractors.KafkaExtractor
```

### Programmatic Registration

```python
import importlib

from airflow.providers.openlineage.extractors.manager import ExtractorManager

# Get a manager instance
manager = ExtractorManager()

# Register multiple extractors, keyed by fully qualified operator class name
extractors = {
    'my_package.operators.S3Operator': 'my_package.extractors.S3Extractor',
    'my_package.operators.KafkaOperator': 'my_package.extractors.KafkaExtractor',
}

for operator_class, extractor_path in extractors.items():
    # Import the module, then resolve the extractor class from it
    module_path, class_name = extractor_path.rsplit('.', 1)
    extractor = getattr(importlib.import_module(module_path), class_name)
    manager.add_extractor(operator_class, extractor)
```

## Advanced Patterns

### Conditional Extraction

```python
class ConditionalExtractor(BaseExtractor):
    def extract(self):
        # Only extract if the operator opts in
        if not getattr(self.operator, 'enable_lineage', False):
            return None

        # Normal extraction logic
        return OperatorLineage(...)
```

### Multi-format Support

```python
class FileExtractor(BaseExtractor):
    def extract(self):
        file_path = self.operator.file_path

        # Determine the format and choose an appropriate namespace
        if file_path.endswith('.parquet'):
            namespace = 'parquet'
        elif file_path.endswith('.csv'):
            namespace = 'csv'
        else:
            namespace = 'file'

        return OperatorLineage(
            inputs=[Dataset(namespace=namespace, name=file_path)],
            outputs=[],
            run_facets={},
            job_facets={},
        )
```