or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · facets-metadata.md · index.md · lineage-extraction.md · plugin-integration.md · selective-control.md · spark-integration.md · sql-parsing.md · sql-utilities.md · template-macros.md · utility-functions.md

docs/facets-metadata.md

# Facets and Metadata Enrichment

Custom facet definitions for enriching OpenLineage events with Airflow-specific metadata, including DAG information, task states, debug data, and unknown operator handling. Facets provide extensible metadata enrichment for comprehensive lineage tracking.

## Capabilities

### Airflow Run Facets

Comprehensive runtime metadata facets that capture Airflow-specific execution context.

```python { .api }
class AirflowRunFacet:
    """
    Composite run facet containing comprehensive Airflow execution metadata.
    """

    dag: dict  # DAG metadata and configuration
    dagRun: dict  # DAG run instance information
    task: dict  # Task/operator metadata
    taskInstance: dict  # Task instance execution details
    taskUuid: str  # Unique task identifier

class AirflowDagRunFacet:
    """
    DAG run specific facet with DAG and run metadata.
    """

    dag: dict  # DAG configuration and properties
    dagRun: dict  # DAG run execution details

class AirflowStateRunFacet:
    """
    DAG and task state information facet for tracking execution states.
    """

    dagRunState: str  # Current DAG run state
    tasksState: dict[str, str]  # Mapping of task IDs to their current states
```

### Airflow Job Facets

Job-level metadata facets that describe DAG structure and task organization.

```python { .api }
class AirflowJobFacet:
    """
    Composite job facet with task tree, groups, and task metadata.
    """

    taskTree: dict  # Hierarchical task dependency structure
    taskGroups: dict  # Task group organization and metadata
    tasks: dict  # Individual task metadata and configuration
```

### Mapped Task Facets

Facets specific to dynamic task mapping functionality in Airflow.

```python { .api }
class AirflowMappedTaskRunFacet:
    """
    Facet for mapped task information and dynamic task execution metadata.
    """

    mapIndex: int  # Index of this mapped task instance
    operatorClass: str  # Fully qualified operator class name

    @classmethod
    def from_task_instance(cls, task_instance):
        """
        Create facet from task instance.

        Args:
            task_instance: Airflow task instance

        Returns:
            AirflowMappedTaskRunFacet: Facet with mapped task metadata
        """
```

### Debug and Development Facets

Facets for debugging, development, and operational visibility.

```python { .api }
class AirflowDebugRunFacet:
    """
    Debug information facet with package details and system information.

    Includes comprehensive debugging metadata when debug mode is enabled,
    potentially creating large events with detailed system information.
    """

    packages: dict  # Installed packages and their versions

class UnknownOperatorInstance:
    """
    Descriptor for unknown or unrecognized operators.
    """

    name: str  # Operator name or identifier
    properties: dict[str, object]  # Operator properties and attributes
    type: str  # Operator type classification

class UnknownOperatorAttributeRunFacet:
    """
    Facet for capturing information about unknown or unhandled operators.

    Provides visibility into operators that don't have specific extractors
    while still capturing basic metadata for lineage tracking.
    """

    unknownItems: list[UnknownOperatorInstance]  # List of unknown operator instances
```

## Usage Examples

### Creating Airflow Run Facets

```python
from airflow.providers.openlineage.plugins.facets import AirflowRunFacet
from airflow.models import TaskInstance, DagRun

def create_run_facet(task_instance: TaskInstance, dag_run: DagRun):
    """Create comprehensive Airflow run facet."""

    facet = AirflowRunFacet(
        dag={
            'dag_id': dag_run.dag_id,
            'schedule_interval': str(dag_run.dag.schedule_interval),
            'start_date': dag_run.dag.start_date.isoformat(),
            'tags': dag_run.dag.tags,
            'owner': dag_run.dag.owner
        },
        dagRun={
            'run_id': dag_run.run_id,
            'execution_date': dag_run.execution_date.isoformat(),
            'start_date': dag_run.start_date.isoformat() if dag_run.start_date else None,
            'end_date': dag_run.end_date.isoformat() if dag_run.end_date else None,
            'state': dag_run.state,
            'run_type': dag_run.run_type
        },
        task={
            'task_id': task_instance.task_id,
            'operator_class': task_instance.operator.__class__.__name__,
            'pool': task_instance.pool,
            'queue': task_instance.queue,
            'priority_weight': task_instance.priority_weight
        },
        taskInstance={
            'try_number': task_instance.try_number,
            'max_tries': task_instance.max_tries,
            'start_date': task_instance.start_date.isoformat() if task_instance.start_date else None,
            'end_date': task_instance.end_date.isoformat() if task_instance.end_date else None,
            'duration': task_instance.duration,
            'state': task_instance.state,
            'hostname': task_instance.hostname,
            'pid': task_instance.pid
        },
        taskUuid=f"{dag_run.dag_id}.{task_instance.task_id}.{task_instance.execution_date}.{task_instance.try_number}"
    )

    return facet
```

### Working with State Facets

```python
from airflow.providers.openlineage.plugins.facets import AirflowStateRunFacet
from airflow.models import DagRun

def create_state_facet(dag_run: DagRun):
    """Create state tracking facet."""

    # Get task states from DAG run
    task_states = {}
    for task_instance in dag_run.get_task_instances():
        task_states[task_instance.task_id] = task_instance.state or 'none'

    facet = AirflowStateRunFacet(
        dagRunState=dag_run.state or 'none',
        tasksState=task_states
    )

    return facet

# Usage in lineage extraction
state_facet = create_state_facet(dag_run)
run_facets = {
    'airflow_state': state_facet
}
```

### Mapped Task Facets

```python
from airflow.providers.openlineage.plugins.facets import AirflowMappedTaskRunFacet
from airflow.models import TaskInstance

def handle_mapped_task(task_instance: TaskInstance):
    """Handle mapped task metadata extraction."""

    if hasattr(task_instance, 'map_index') and task_instance.map_index is not None:
        # Create mapped task facet
        mapped_facet = AirflowMappedTaskRunFacet.from_task_instance(task_instance)

        print(f"Mapped task index: {mapped_facet.mapIndex}")
        print(f"Operator class: {mapped_facet.operatorClass}")

        return {
            'airflow_mapped_task': mapped_facet
        }

    return {}

# Example with mapped operator
from airflow.operators.python import PythonOperator

@dag
def mapped_dag():
    def process_item(item):
        return f"Processed {item}"

    # Mapped task creates multiple instances
    mapped_task = PythonOperator.partial(
        task_id='process_items',
        python_callable=process_item
    ).expand(op_kwargs=[{'item': i} for i in range(5)])

    return mapped_task

# Each mapped instance gets its own facet with map_index
```

### Job Structure Facets

```python
from airflow.providers.openlineage.plugins.facets import AirflowJobFacet
from airflow.models import DAG, DagRun

def create_job_facet(dag: DAG, dag_run: DagRun):
    """Create job facet with DAG structure information."""

    # Build task tree
    task_tree = {}
    for task_id, task in dag.task_dict.items():
        task_tree[task_id] = {
            'operator_class': task.__class__.__name__,
            'upstream_task_ids': list(task.upstream_task_ids),
            'downstream_task_ids': list(task.downstream_task_ids)
        }

    # Build task groups
    task_groups = {}
    if hasattr(dag, 'task_group_dict'):
        for group_id, group in dag.task_group_dict.items():
            task_groups[group_id] = {
                'tooltip': group.tooltip,
                'ui_color': group.ui_color,
                'ui_fgcolor': group.ui_fgcolor,
                'children': list(group.children.keys())
            }

    # Build tasks metadata
    tasks = {}
    for task_id, task in dag.task_dict.items():
        tasks[task_id] = {
            'operator_class': task.__class__.__name__,
            'template_fields': list(task.template_fields) if task.template_fields else [],
            'pool': task.pool,
            'queue': task.queue,
            'retries': task.retries,
            'retry_delay': str(task.retry_delay) if task.retry_delay else None
        }

    facet = AirflowJobFacet(
        taskTree=task_tree,
        taskGroups=task_groups,
        tasks=tasks
    )

    return facet
```

### Unknown Operator Handling

```python
from airflow.providers.openlineage.plugins.facets import (
    UnknownOperatorInstance,
    UnknownOperatorAttributeRunFacet
)
from airflow.models import BaseOperator

def handle_unknown_operator(operator: BaseOperator):
    """Handle operators without specific extractors."""

    # Extract basic operator properties
    properties = {}
    for attr in dir(operator):
        if not attr.startswith('_') and not callable(getattr(operator, attr)):
            try:
                value = getattr(operator, attr)
                # Only include serializable values
                if isinstance(value, (str, int, float, bool, list, dict)):
                    properties[attr] = value
            except Exception:
                continue  # Skip problematic attributes

    # Create unknown operator instance
    unknown_instance = UnknownOperatorInstance(
        name=operator.task_id,
        properties=properties,
        type=operator.__class__.__name__
    )

    # Create facet
    facet = UnknownOperatorAttributeRunFacet(
        unknownItems=[unknown_instance]
    )

    return {
        'airflow_unknown_operator': facet
    }

# Usage in custom extractor
class GenericExtractor(BaseExtractor):
    def extract(self):
        # Try specific extraction first
        if hasattr(self._operator, 'get_openlineage_facets_on_start'):
            return self._operator.get_openlineage_facets_on_start()

        # Fall back to unknown operator handling
        unknown_facets = handle_unknown_operator(self._operator)

        return OperatorLineage(
            inputs=[],
            outputs=[],
            run_facets=unknown_facets,
            job_facets={}
        )
```

### Debug Facets

```python
from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet
import pkg_resources

def create_debug_facet():
    """Create debug facet with system information."""

    # Get installed packages
    packages = {}
    for dist in pkg_resources.working_set:
        packages[dist.project_name] = dist.version

    facet = AirflowDebugRunFacet(
        packages=packages
    )

    return facet

# Usage with configuration check
from airflow.providers.openlineage.conf import debug_mode

def get_debug_facets():
    """Get debug facets if debug mode is enabled."""

    if debug_mode():
        return {
            'airflow_debug': create_debug_facet()
        }
    return {}
```

## Integration with Lineage Extraction

### Using Facets in Extractors

```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.plugins.facets import AirflowRunFacet

class CustomExtractor(BaseExtractor):
    def extract(self):
        # Basic lineage
        lineage = OperatorLineage(
            inputs=[],
            outputs=[],
            run_facets={},
            job_facets={}
        )

        # Add Airflow-specific facets
        if hasattr(self, '_task_instance') and hasattr(self, '_dag_run'):
            airflow_facet = create_run_facet(self._task_instance, self._dag_run)
            lineage.run_facets['airflow'] = airflow_facet

        return lineage
```

### Facet Utilities

```python
from airflow.providers.openlineage.utils.utils import (
    get_airflow_run_facet,
    get_airflow_job_facet,
    get_airflow_state_run_facet,
    get_airflow_debug_facet
)

def get_comprehensive_facets(task_instance, dag_run, dag):
    """Get all available Airflow facets."""

    facets = {}

    # Add run facet
    run_facet = get_airflow_run_facet(
        dag_run=dag_run,
        dag=dag,
        task_instance=task_instance,
        task=task_instance.task,
        task_uuid=f"{dag_run.dag_id}.{task_instance.task_id}"
    )
    facets.update(run_facet)

    # Add job facet
    job_facet = get_airflow_job_facet(dag_run)
    facets.update(job_facet)

    # Add state facet
    state_facet = get_airflow_state_run_facet(dag_run, dag, task_instance)
    facets.update(state_facet)

    # Add debug facet if enabled
    debug_facet = get_airflow_debug_facet()
    facets.update(debug_facet)

    return facets
```

## Custom Facet Development

### Creating Custom Facets

```python
from openlineage.client.facet import RunFacet
from dataclasses import dataclass
from typing import Dict, Any

@dataclass
class CustomProcessingRunFacet(RunFacet):
    """Custom facet for processing statistics."""

    records_processed: int
    processing_time_seconds: float
    memory_usage_mb: float
    error_count: int
    warnings: list[str]

    @staticmethod
    def _get_schema() -> str:
        return "https://my-company.com/schemas/custom_processing_run_facet.json"

# Usage in extractor
def extract_with_custom_facet(self):
    # ... extraction logic ...

    custom_facet = CustomProcessingRunFacet(
        records_processed=1000,
        processing_time_seconds=45.2,
        memory_usage_mb=128,
        error_count=0,
        warnings=[]
    )

    return OperatorLineage(
        inputs=inputs,
        outputs=outputs,
        run_facets={
            'custom_processing': custom_facet
        },
        job_facets={}
    )
```

### Facet Registration

```python
# Register custom facets via configuration
# In airflow.cfg:
[openlineage]
custom_run_facets = my_package.facets.custom_processing_facet;my_package.facets.performance_facet

# The functions should return dict[str, RunFacet]
def custom_processing_facet(task_instance, **kwargs):
    return {
        'custom_processing': CustomProcessingRunFacet(...)
    }
```