or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md facets-metadata.md index.md lineage-extraction.md plugin-integration.md selective-control.md spark-integration.md sql-parsing.md sql-utilities.md template-macros.md utility-functions.md

docs/selective-control.md

0

# Selective Lineage Control

1

2

Utilities for fine-grained control over lineage collection, allowing selective enabling/disabling at DAG and task levels. This provides granular control over which components emit OpenLineage events, enabling performance optimization and privacy controls.

3

4

## Capabilities

5

6

### Lineage Control Functions

7

8

Functions for enabling and disabling lineage collection on DAGs and tasks.

9

10

```python { .api }

11

def enable_lineage(obj: T) -> T:

12

"""

13

Enable lineage collection for a DAG or task.

14

15

Args:

16

obj: DAG or task instance to enable lineage for

17

18

Returns:

19

T: The same object with lineage enabled (for method chaining)

20

"""

21

22

def disable_lineage(obj: T) -> T:

23

"""

24

Disable lineage collection for a DAG or task.

25

26

Args:

27

obj: DAG or task instance to disable lineage for

28

29

Returns:

30

T: The same object with lineage disabled (for method chaining)

31

"""

32

```

33

34

### Lineage Status Checking

35

36

Functions to check whether lineage is enabled for specific DAGs and tasks.

37

38

```python { .api }

39

def is_task_lineage_enabled(task: BaseOperator | MappedOperator) -> bool:

40

"""

41

Check if lineage collection is enabled for a specific task.

42

43

Args:

44

task: Task/operator to check

45

46

Returns:

47

bool: True if lineage is enabled for this task

48

"""

49

50

def is_dag_lineage_enabled(dag: DAG) -> bool:

51

"""

52

Check if lineage collection is enabled for a specific DAG.

53

54

Args:

55

dag: DAG to check

56

57

Returns:

58

bool: True if lineage is enabled for this DAG

59

"""

60

```

61

62

### Parameter Constants

63

64

Constants for the lineage control parameter system.

65

66

```python { .api }

67

ENABLE_OL_PARAM_NAME: str

68

"""Name of the parameter used to control OpenLineage enablement."""

69

70

ENABLE_OL_PARAM: Param

71

"""Parameter object for enabling OpenLineage on DAGs/tasks."""

72

73

DISABLE_OL_PARAM: Param

74

"""Parameter object for disabling OpenLineage on DAGs/tasks."""

75

```

76

77

## Usage Examples

78

79

### Basic DAG Lineage Control

80

81

```python

82

from airflow import DAG

83

from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

84

from datetime import datetime

85

86

# Enable lineage for entire DAG

87

dag = enable_lineage(DAG(

88

'analytics_pipeline',

89

start_date=datetime(2023, 1, 1),

90

schedule_interval='@daily',

91

description='Analytics data processing pipeline'

92

))

93

94

# Disable lineage for a different DAG

95

sensitive_dag = disable_lineage(DAG(

96

'sensitive_data_processing',

97

start_date=datetime(2023, 1, 1),

98

schedule_interval='@hourly',

99

description='Sensitive data processing - no lineage tracking'

100

))

101

```

102

103

### Task-Level Lineage Control

104

105

```python

106

from airflow.operators.python import PythonOperator

107

from airflow.operators.bash import BashOperator

108

from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

109

110

def process_data():

111

return "Processing complete"

112

113

def sensitive_operation():

114

return "Sensitive processing complete"

115

116

# Enable lineage for specific task

117

process_task = enable_lineage(PythonOperator(

118

task_id='process_public_data',

119

python_callable=process_data,

120

dag=dag

121

))

122

123

# Disable lineage for sensitive task

124

sensitive_task = disable_lineage(PythonOperator(

125

task_id='process_sensitive_data',

126

python_callable=sensitive_operation,

127

dag=dag

128

))

129

130

# Mix with regular tasks (inherit DAG setting)

131

regular_task = BashOperator(

132

task_id='cleanup_temp_files',

133

bash_command='rm -rf /tmp/processing/*',

134

dag=dag

135

)

136

```

137

138

### Checking Lineage Status

139

140

```python

141

from airflow.providers.openlineage.utils.selective_enable import (

142

is_dag_lineage_enabled,

143

is_task_lineage_enabled

144

)

145

146

# Check DAG lineage status

147

if is_dag_lineage_enabled(dag):

148

print(f"Lineage is enabled for DAG: {dag.dag_id}")

149

else:

150

print(f"Lineage is disabled for DAG: {dag.dag_id}")

151

152

# Check task lineage status

153

for task_id, task in dag.task_dict.items():

154

if is_task_lineage_enabled(task):

155

print(f"Task {task_id}: lineage enabled")

156

else:

157

print(f"Task {task_id}: lineage disabled")

158

```

159

160

### Conditional Lineage Control

161

162

```python

163

from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

164

165

def create_processing_dag(environment: str):

166

"""Create DAG with environment-specific lineage settings."""

167

168

dag = DAG(

169

f'data_processing_{environment}',

170

start_date=datetime(2023, 1, 1),

171

schedule_interval='@daily'

172

)

173

174

# Enable lineage only for production

175

if environment == 'production':

176

dag = enable_lineage(dag)

177

else:

178

dag = disable_lineage(dag)

179

180

return dag

181

182

# Create environment-specific DAGs

183

prod_dag = create_processing_dag('production') # Lineage enabled

184

dev_dag = create_processing_dag('development') # Lineage disabled

185

test_dag = create_processing_dag('testing') # Lineage disabled

186

```

187

188

### Selective Enable Mode

189

190

```python

191

from airflow.providers.openlineage.conf import selective_enable

192

from airflow.providers.openlineage.utils.selective_enable import enable_lineage

193

194

# Check if selective enable mode is active

195

if selective_enable():

196

print("Selective enable mode: Only explicitly enabled DAGs/tasks will emit lineage")

197

198

# In selective mode, must explicitly enable lineage

199

dag = enable_lineage(DAG(

200

'important_pipeline',

201

start_date=datetime(2023, 1, 1)

202

))

203

else:

204

print("Normal mode: All DAGs/tasks emit lineage unless explicitly disabled")

205

206

# In normal mode, lineage is enabled by default

207

dag = DAG(

208

'standard_pipeline',

209

start_date=datetime(2023, 1, 1)

210

)

211

```

212

213

### Advanced Patterns

214

215

```python

216

from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

217

218

class LineageControlledDAG(DAG):

219

"""Custom DAG class with built-in lineage control."""

220

221

def __init__(self, enable_lineage_tracking=True, **kwargs):

222

super().__init__(**kwargs)

223

224

if enable_lineage_tracking:

225

enable_lineage(self)

226

else:

227

disable_lineage(self)

228

229

# Usage

230

production_dag = LineageControlledDAG(

231

'production_pipeline',

232

enable_lineage_tracking=True,

233

start_date=datetime(2023, 1, 1)

234

)

235

236

development_dag = LineageControlledDAG(

237

'development_pipeline',

238

enable_lineage_tracking=False,

239

start_date=datetime(2023, 1, 1)

240

)

241

```

242

243

### Task Group Lineage Control

244

245

```python

246

from airflow.utils.task_group import TaskGroup

247

from airflow.operators.python import PythonOperator

248

from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

249

250

with DAG('grouped_pipeline', start_date=datetime(2023, 1, 1)) as dag:

251

252

# Enable lineage for entire task group

253

with TaskGroup('data_ingestion') as ingestion_group:

254

extract_task = PythonOperator(

255

task_id='extract_data',

256

python_callable=extract_function

257

)

258

validate_task = PythonOperator(

259

task_id='validate_data',

260

python_callable=validate_function

261

)

262

263

# Enable lineage for the group

264

enable_lineage(ingestion_group)

265

266

# Disable lineage for sensitive processing group

267

with TaskGroup('sensitive_processing') as sensitive_group:

268

process_pii = PythonOperator(

269

task_id='process_pii',

270

python_callable=process_pii_function

271

)

272

anonymize_data = PythonOperator(

273

task_id='anonymize_data',

274

python_callable=anonymize_function

275

)

276

277

# Disable lineage for the entire group

278

disable_lineage(sensitive_group)

279

```

280

281

### Dynamic Lineage Control

282

283

```python

284

from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

285

from airflow.models import Variable

286

287

def create_dynamic_dag():

288

"""Create DAG with dynamic lineage control based on Airflow Variables."""

289

290

dag = DAG(

291

'dynamic_lineage_dag',

292

start_date=datetime(2023, 1, 1),

293

schedule_interval='@daily'

294

)

295

296

# Check Airflow Variable for lineage setting

297

lineage_enabled = Variable.get('enable_lineage_tracking', default_var='true').lower() == 'true'

298

299

if lineage_enabled:

300

dag = enable_lineage(dag)

301

print("Lineage tracking enabled via Variable")

302

else:

303

dag = disable_lineage(dag)

304

print("Lineage tracking disabled via Variable")

305

306

return dag

307

308

# Usage

309

dynamic_dag = create_dynamic_dag()

310

```

311

312

### Integration with Configuration

313

314

```python

315

from airflow.providers.openlineage.conf import selective_enable, disabled_operators

316

from airflow.providers.openlineage.utils.selective_enable import is_task_lineage_enabled

317

318

def should_collect_lineage(task):

319

"""Comprehensive lineage collection decision logic."""

320

321

# Check if operator type is disabled

322

operator_class = f"{task.__class__.__module__}.{task.__class__.__name__}"

323

if operator_class in disabled_operators():

324

return False

325

326

# Check selective enable mode

327

if selective_enable():

328

return is_task_lineage_enabled(task)

329

330

# Check task-specific disable

331

return is_task_lineage_enabled(task)

332

333

# Usage in custom extractor

334

class SmartExtractor(BaseExtractor):

335

def extract(self):

336

if not should_collect_lineage(self._operator):

337

return None

338

339

# Normal extraction logic

340

return OperatorLineage(...)

341

```

342

343

## Configuration Integration

344

345

### Airflow Configuration

346

347

The selective enable functionality integrates with Airflow configuration:

348

349

```ini

350

# airflow.cfg

351

[openlineage]

352

selective_enable = true

353

disabled_for_operators = airflow.operators.bash.BashOperator;airflow.operators.dummy.DummyOperator

354

```

355

356

### Environment Variables

357

358

```bash

359

# Enable selective mode

360

export AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true

361

362

# Disable specific operators

363

export AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS="airflow.operators.python.PythonOperator"

364

```

365

366

## Best Practices

367

368

### Production Environments

369

370

```python

371

# Enable lineage for critical data pipelines

372

critical_dag = enable_lineage(DAG(

373

'financial_reporting',

374

start_date=datetime(2023, 1, 1),

375

schedule_interval='@daily'

376

))

377

378

# Disable for high-frequency operational tasks

379

operational_dag = disable_lineage(DAG(

380

'system_monitoring',

381

start_date=datetime(2023, 1, 1),

382

schedule_interval='*/5 * * * *' # Every 5 minutes

383

))

384

```

385

386

### Development Workflows

387

388

```python

389

import os

390

391

def create_environment_aware_dag(dag_id: str, **kwargs):

392

"""Create DAG with environment-aware lineage settings."""

393

394

dag = DAG(dag_id, **kwargs)

395

396

# Enable lineage only in production and staging

397

environment = os.getenv('AIRFLOW_ENV', 'development')

398

if environment in ['production', 'staging']:

399

dag = enable_lineage(dag)

400

else:

401

dag = disable_lineage(dag)

402

403

return dag

404

```

405

406

### Performance Optimization

407

408

```python

409

# Disable lineage for resource-intensive DAGs in development

410

if os.getenv('AIRFLOW_ENV') == 'development':

411

heavy_processing_dag = disable_lineage(DAG(

412

'ml_model_training',

413

start_date=datetime(2023, 1, 1)

414

))

415

else:

416

heavy_processing_dag = enable_lineage(DAG(

417

'ml_model_training',

418

start_date=datetime(2023, 1, 1)

419

))

420

```