Or run:

    npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · facets-metadata.md · index.md · lineage-extraction.md · plugin-integration.md · selective-control.md · spark-integration.md · sql-parsing.md · sql-utilities.md · template-macros.md · utility-functions.md

docs/utility-functions.md

0

# Utility Functions and Helpers

1

2

General utility functions for working with OpenLineage data, including operator analysis, documentation extraction, data conversion, and helper functions for common OpenLineage operations.

3

4

## Capabilities

5

6

### Operator Analysis Functions

7

8

Functions for analyzing and extracting information from Airflow operators.

9

10

```python { .api }

11

def get_operator_class(task: BaseOperator) -> type:

12

"""

13

Get the operator class from a task instance.

14

15

Args:

16

task: Airflow task/operator instance

17

18

Returns:

19

type: Operator class type

20

"""

21

22

def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) -> str:

23

"""

24

Get the fully qualified class name of an operator.

25

26

Args:

27

operator: Airflow operator instance

28

29

Returns:

30

str: Fully qualified class name (e.g., 'airflow.operators.python.PythonOperator')

31

"""

32

33

def get_operator_provider_version(operator: BaseOperator | MappedOperator) -> str | None:

34

"""

35

Get the provider version for an operator.

36

37

Args:

38

operator: Airflow operator instance

39

40

Returns:

41

str | None: Provider version string or None if not available

42

"""

43

44

def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool:

45

"""

46

Check if an operator is disabled for lineage collection.

47

48

Args:

49

operator: Airflow operator instance

50

51

Returns:

52

bool: True if operator is disabled for lineage

53

"""

54

```

55

56

### Task and Job Identification

57

58

Functions for generating job names and identifiers for OpenLineage events.

59

60

```python { .api }

61

def get_job_name(task: TaskInstance | RuntimeTaskInstance) -> str:

62

"""

63

Get the OpenLineage job name for a task instance.

64

65

Args:

66

task: Task instance

67

68

Returns:

69

str: Formatted job name for OpenLineage events

70

"""

71

```

72

73

### Documentation Extraction

74

75

Functions for extracting documentation and metadata from DAGs and tasks.

76

77

```python { .api }

78

def get_task_documentation(operator: BaseOperator | MappedOperator | None) -> tuple[str | None, str | None]:

79

"""

80

Extract documentation from a task/operator.

81

82

Args:

83

operator: Airflow operator instance

84

85

Returns:

86

tuple: (doc_md, description) - documentation markdown and description

87

"""

88

89

def get_dag_documentation(dag: DAG | None) -> tuple[str | None, str | None]:

90

"""

91

Extract documentation from a DAG.

92

93

Args:

94

dag: Airflow DAG instance

95

96

Returns:

97

tuple: (doc_md, description) - documentation markdown and description

98

"""

99

```

100

101

### Facet Generation Functions

102

103

Functions for generating Airflow-specific facets and metadata.

104

105

```python { .api }

106

def get_task_parent_run_facet(

107

task_instance: TaskInstance,

108

dag_run: DagRun,

109

dag: DAG

110

) -> dict[str, Any]:

111

"""

112

Get parent run facet information for a task.

113

114

Args:

115

task_instance: Task instance

116

dag_run: DAG run instance

117

dag: DAG instance

118

119

Returns:

120

dict: Parent run facet data

121

"""

122

123

def get_airflow_mapped_task_facet(task_instance: TaskInstance) -> dict[str, Any]:

124

"""

125

Get mapped task facet for dynamic task mapping.

126

127

Args:

128

task_instance: Task instance (mapped task)

129

130

Returns:

131

dict: Mapped task facet data

132

"""

133

134

def get_user_provided_run_facets(ti: TaskInstance, ti_state: TaskInstanceState) -> dict[str, RunFacet]:

135

"""

136

Get user-provided custom run facets.

137

138

Args:

139

ti: Task instance

140

ti_state: Task instance state

141

142

Returns:

143

dict: User-provided run facets

144

"""

145

146

def get_airflow_dag_run_facet(dag_run: DagRun) -> dict[str, RunFacet]:

147

"""

148

Get Airflow DAG run facet.

149

150

Args:

151

dag_run: DAG run instance

152

153

Returns:

154

dict: DAG run facet data

155

"""

156

157

def get_processing_engine_facet() -> dict[str, processing_engine_run.ProcessingEngineRunFacet]:

158

"""

159

Get processing engine facet with Airflow version information.

160

161

Returns:

162

dict: Processing engine facet

163

"""

164

165

def get_airflow_debug_facet() -> dict[str, AirflowDebugRunFacet]:

166

"""

167

Get Airflow debug facet with system information.

168

169

Returns:

170

dict: Debug facet data (only if debug mode is enabled)

171

"""

172

173

def get_airflow_run_facet(

174

dag_run: DagRun,

175

dag: DAG,

176

task_instance: TaskInstance,

177

task: BaseOperator,

178

task_uuid: str

179

) -> dict[str, AirflowRunFacet]:

180

"""

181

Get comprehensive Airflow run facet.

182

183

Args:

184

dag_run: DAG run instance

185

dag: DAG instance

186

task_instance: Task instance

187

task: Task/operator

188

task_uuid: Unique task identifier

189

190

Returns:

191

dict: Comprehensive Airflow run facet

192

"""

193

194

def get_airflow_job_facet(dag_run: DagRun) -> dict[str, AirflowJobFacet]:

195

"""

196

Get Airflow job facet with DAG structure.

197

198

Args:

199

dag_run: DAG run instance

200

201

Returns:

202

dict: Airflow job facet data

203

"""

204

205

def get_airflow_state_run_facet(

206

dag_run: DagRun,

207

dag: DAG,

208

task_instance: TaskInstance

209

) -> dict[str, AirflowStateRunFacet]:

210

"""

211

Get Airflow state run facet.

212

213

Args:

214

dag_run: DAG run instance

215

dag: DAG instance

216

task_instance: Task instance

217

218

Returns:

219

dict: State run facet data

220

"""

221

222

def get_unknown_source_attribute_run_facet(task: BaseOperator, name: str | None = None):

223

"""

224

Get unknown operator facet for unhandled operators.

225

226

Args:

227

task: Airflow task/operator

228

name: Optional name override

229

230

Returns:

231

dict: Unknown operator facet data

232

"""

233

```

234

235

### Data Conversion Functions

236

237

Functions for converting between different data formats and structures.

238

239

```python { .api }

240

def translate_airflow_asset(asset: Asset, lineage_context) -> OpenLineageDataset | None:

241

"""

242

Convert Airflow Asset to OpenLineage Dataset.

243

244

Args:

245

asset: Airflow Asset instance

246

lineage_context: Lineage extraction context

247

248

Returns:

249

OpenLineageDataset | None: Converted dataset or None if conversion fails

250

"""

251

```

252

253

### Configuration and Environment Functions

254

255

Functions for checking configuration and environment settings.

256

257

```python { .api }

258

def is_selective_lineage_enabled(obj: DAG | BaseOperator | MappedOperator) -> bool:

259

"""

260

Check if selective lineage is enabled for a DAG or task.

261

262

Args:

263

obj: DAG or task instance

264

265

Returns:

266

bool: True if selective lineage is enabled

267

"""

268

269

def should_use_external_connection(hook) -> bool:

270

"""

271

Check if external connection should be used for a hook.

272

273

Args:

274

hook: Database or connection hook

275

276

Returns:

277

bool: True if external connection should be used

278

"""

279

```

280

281

### Utility Classes and Helpers

282

283

Helper classes for data serialization and processing.

284

285

```python { .api }

286

class InfoJsonEncodable:

287

"""Base class for JSON serializable info objects."""

288

289

class DagInfo(InfoJsonEncodable):

290

"""DAG information encoder for JSON serialization."""

291

292

class DagRunInfo(InfoJsonEncodable):

293

"""DAG run information encoder for JSON serialization."""

294

295

class TaskInstanceInfo(InfoJsonEncodable):

296

"""Task instance information encoder for JSON serialization."""

297

298

class AssetInfo(InfoJsonEncodable):

299

"""Asset information encoder for JSON serialization."""

300

301

class TaskInfo(InfoJsonEncodable):

302

"""Task information encoder for JSON serialization."""

303

304

class TaskInfoComplete(InfoJsonEncodable):

305

"""Complete task information encoder for JSON serialization."""

306

307

class TaskGroupInfo(InfoJsonEncodable):

308

"""Task group information encoder for JSON serialization."""

309

310

class OpenLineageRedactor:

311

"""OpenLineage-specific secrets redactor for sensitive data masking."""

312

313

def try_import_from_string(string: str) -> Any:

314

"""

315

Safe import utility with error handling.

316

317

Args:

318

string: Module or class path to import

319

320

Returns:

321

Any: Imported object or None if import fails

322

"""

323

324

def print_warning(log):

325

"""

326

Warning decorator function for logging warnings.

327

328

Args:

329

log: Logger instance

330

"""

331

```

332

333

### Version Compatibility

334

335

Version compatibility constants and utilities.

336

337

```python { .api }

338

AIRFLOW_V_3_0_PLUS: bool

339

"""Boolean flag indicating Airflow 3.0+ compatibility."""

340

```

341

342

## Usage Examples

343

344

### Operator Analysis

345

346

```python

347

from airflow.providers.openlineage.utils.utils import (

348

get_operator_class,

349

get_fully_qualified_class_name,

350

is_operator_disabled

351

)

352

from airflow.operators.python import PythonOperator

353

354

def my_function():

355

return "Hello World"

356

357

task = PythonOperator(

358

task_id='example_task',

359

python_callable=my_function,

360

dag=dag

361

)

362

363

# Analyze operator

364

operator_class = get_operator_class(task)

365

class_name = get_fully_qualified_class_name(task)

366

is_disabled = is_operator_disabled(task)

367

368

print(f"Operator class: {operator_class}")

369

print(f"Class name: {class_name}")

370

print(f"Is disabled: {is_disabled}")

371

```

372

373

### Documentation Extraction

374

375

```python

376

from airflow.providers.openlineage.utils.utils import get_task_documentation, get_dag_documentation

377

from airflow import DAG

378

from airflow.operators.python import PythonOperator

379

380

# DAG with documentation

381

dag = DAG(

382

'documented_dag',

383

start_date=datetime(2023, 1, 1),

384

description='A well-documented data processing pipeline',

385

doc_md="""

386

# Data Processing Pipeline

387

388

This pipeline processes user data and generates analytics reports.

389

"""

390

)

391

392

def documented_function():

393

"""Process user data and return results."""

394

return "Processing complete"

395

396

# Task with documentation

397

task = PythonOperator(

398

task_id='process_data',

399

python_callable=documented_function,

400

doc_md='Processes user data using advanced algorithms',

401

dag=dag

402

)

403

404

# Extract documentation

405

dag_doc_md, dag_description = get_dag_documentation(dag)

406

task_doc_md, task_description = get_task_documentation(task)

407

408

print(f"DAG documentation: {dag_doc_md}")

409

print(f"DAG description: {dag_description}")

410

print(f"Task documentation: {task_doc_md}")

411

print(f"Task description: {task_description}")

412

```

413

414

### Facet Generation

415

416

```python

417

from airflow.providers.openlineage.utils.utils import (

418

get_airflow_run_facet,

419

get_processing_engine_facet,

420

get_airflow_debug_facet

421

)

422

423

def generate_comprehensive_facets(dag_run, dag, task_instance, task):

424

"""Generate comprehensive facets for a task execution."""

425

426

# Generate task UUID

427

task_uuid = f"{dag_run.dag_id}.{task_instance.task_id}.{task_instance.execution_date}.{task_instance.try_number}"

428

429

# Get all facets

430

facets = {}

431

432

# Airflow run facet

433

run_facet = get_airflow_run_facet(dag_run, dag, task_instance, task, task_uuid)

434

facets.update(run_facet)

435

436

# Processing engine facet

437

engine_facet = get_processing_engine_facet()

438

facets.update(engine_facet)

439

440

# Debug facet (if enabled)

441

debug_facet = get_airflow_debug_facet()

442

facets.update(debug_facet)

443

444

return facets

445

446

# Usage

447

comprehensive_facets = generate_comprehensive_facets(dag_run, dag, task_instance, task)

448

print(f"Generated facets: {list(comprehensive_facets.keys())}")

449

```

450

451

### Data Conversion

452

453

```python

454

from airflow.providers.openlineage.utils.utils import translate_airflow_asset

455

from airflow.models import Asset

456

457

# Create Airflow Asset

458

asset = Asset(

459

uri="s3://my-bucket/data/users.parquet",

460

name="user_data"

461

)

462

463

# Convert to OpenLineage Dataset

464

lineage_context = {'namespace': 'production'}

465

ol_dataset = translate_airflow_asset(asset, lineage_context)

466

467

if ol_dataset:

468

print(f"OpenLineage Dataset: {ol_dataset.namespace}/{ol_dataset.name}")

469

else:

470

print("Asset conversion failed")

471

```

472

473

### Safe Import Utility

474

475

```python

476

from airflow.providers.openlineage.utils.utils import try_import_from_string

477

478

# Safely import optional dependencies

479

pandas = try_import_from_string('pandas')

480

if pandas:

481

# Use pandas functionality

482

df = pandas.DataFrame({'a': [1, 2, 3]})

483

print("Pandas available")

484

else:

485

print("Pandas not available")

486

487

# Try importing custom modules

488

custom_processor = try_import_from_string('my_package.processors.CustomProcessor')

489

if custom_processor:

490

processor = custom_processor()

491

processor.process()

492

else:

493

print("Custom processor not available")

494

```

495

496

### JSON Serialization

497

498

```python

499

from airflow.providers.openlineage.utils.utils import DagInfo, TaskInfo, DagRunInfo

500

501

# Create serializable info objects

502

dag_info = DagInfo()

503

dag_info.dag_id = dag.dag_id

504

dag_info.schedule_interval = str(dag.schedule_interval)

505

dag_info.start_date = dag.start_date.isoformat()

506

507

task_info = TaskInfo()

508

task_info.task_id = task.task_id

509

task_info.operator_class = task.__class__.__name__

510

511

dag_run_info = DagRunInfo()

512

dag_run_info.run_id = dag_run.run_id

513

dag_run_info.execution_date = dag_run.execution_date.isoformat()

514

515

# Serialize to JSON

516

import json

517

serialized_data = {

518

'dag': dag_info.__dict__,

519

'task': task_info.__dict__,

520

'dag_run': dag_run_info.__dict__

521

}

522

523

json_string = json.dumps(serialized_data, indent=2)

524

print(f"Serialized data: {json_string}")

525

```

526

527

### Secrets Redaction

528

529

```python

530

from airflow.providers.openlineage.utils.utils import OpenLineageRedactor

531

532

# Create redactor

533

redactor = OpenLineageRedactor()

534

535

# Sample data with sensitive information

536

sensitive_data = {

537

'connection_string': 'postgresql://user:password@localhost/db',

538

'api_key': 'secret-api-key-12345',

539

'config': {

540

'database_password': 'super-secret',

541

'username': 'admin'

542

}

543

}

544

545

# Redact sensitive data

546

redacted_data = redactor.redact(sensitive_data)

547

print(f"Redacted data: {redacted_data}")

548

```

549

550

### Selective Lineage Checking

551

552

```python

553

from airflow.providers.openlineage.utils.utils import is_selective_lineage_enabled

554

from airflow.providers.openlineage.utils.selective_enable import enable_lineage

555

556

# Check DAG lineage status

557

if is_selective_lineage_enabled(dag):

558

print("Selective lineage is enabled for this DAG")

559

else:

560

print("Selective lineage is not enabled for this DAG")

561

562

# Enable selective lineage

563

enabled_dag = enable_lineage(dag)

564

565

# Check again

566

if is_selective_lineage_enabled(enabled_dag):

567

print("Selective lineage is now enabled")

568

```

569

570

### Hook Connection Analysis

571

572

```python

573

from airflow.providers.openlineage.utils.utils import should_use_external_connection

574

from airflow.hooks.postgres_hook import PostgresHook

575

576

# Create hook

577

hook = PostgresHook(postgres_conn_id='analytics_db')

578

579

# Check if external connection should be used

580

use_external = should_use_external_connection(hook)

581

582

if use_external:

583

print("Using external connection for lineage")

584

else:

585

print("Using standard connection handling")

586

```

587

588

## Integration Patterns

589

590

### Custom Extractor with Utilities

591

592

```python

593

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage

594

from airflow.providers.openlineage.utils.utils import (

595

get_fully_qualified_class_name,

596

get_task_documentation,

597

get_airflow_run_facet

598

)

599

600

class UtilityExtractor(BaseExtractor):

601

def extract(self):

602

# Use utilities for comprehensive extraction

603

class_name = get_fully_qualified_class_name(self._operator)

604

doc_md, description = get_task_documentation(self._operator)

605

606

# Create lineage with metadata

607

lineage = OperatorLineage(

608

inputs=[],

609

outputs=[],

610

run_facets={

611

'operator_info': {

612

'class_name': class_name,

613

'documentation': doc_md,

614

'description': description

615

}

616

},

617

job_facets={}

618

)

619

620

return lineage

621

```

622

623

### Comprehensive Metadata Collection

624

625

```python

626

from airflow.providers.openlineage.utils.utils import *

627

628

def collect_comprehensive_metadata(dag, dag_run, task_instance, task):

629

"""Collect all available metadata using utility functions."""

630

631

metadata = {

632

'operator': {

633

'class': get_operator_class(task),

634

'fully_qualified_name': get_fully_qualified_class_name(task),

635

'provider_version': get_operator_provider_version(task),

636

'is_disabled': is_operator_disabled(task)

637

},

638

'job': {

639

'name': get_job_name(task_instance)

640

},

641

'documentation': {

642

'dag': get_dag_documentation(dag),

643

'task': get_task_documentation(task)

644

},

645

'facets': {

646

'run': get_airflow_run_facet(dag_run, dag, task_instance, task, 'uuid'),

647

'job': get_airflow_job_facet(dag_run),

648

'state': get_airflow_state_run_facet(dag_run, dag, task_instance),

649

'engine': get_processing_engine_facet(),

650

'debug': get_airflow_debug_facet()

651

},

652

'config': {

653

'selective_enabled': is_selective_lineage_enabled(task)

654

}

655

}

656

657

return metadata

658

659

# Usage

660

comprehensive_metadata = collect_comprehensive_metadata(dag, dag_run, task_instance, task)

661

```