# Template Macros

Template macros for accessing OpenLineage information within DAG definitions and task templates. These macros provide runtime access to lineage identifiers and metadata for use in dynamic task configurations and downstream processing.

## Capabilities

### Job and Run Identification Macros

Macros for accessing OpenLineage job names and run identifiers within templates.

```python { .api }
def lineage_job_namespace() -> str:
    """
    Get the OpenLineage namespace for the current context.

    Returns:
        str: Configured OpenLineage namespace
    """

def lineage_job_name(task_instance: TaskInstance) -> str:
    """
    Get the OpenLineage job name for a task instance.

    Args:
        task_instance: Current task instance

    Returns:
        str: Formatted job name for OpenLineage events
    """

def lineage_run_id(task_instance: TaskInstance) -> str:
    """
    Get the OpenLineage run ID for a task instance.

    Args:
        task_instance: Current task instance

    Returns:
        str: Unique run identifier for the task execution
    """
```
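
These functions can also be called directly from Python task code rather than through Jinja. A minimal sketch, assuming the provider exposes them from `airflow.providers.openlineage.plugins.macros` (the import path is an assumption; verify it against your installed provider version):

```python
# Minimal sketch: calling the lineage macros from task code instead of a
# Jinja template. The import path below is an assumption about the
# apache-airflow-providers-openlineage package layout.
from airflow.providers.openlineage.plugins.macros import (
    lineage_job_name,
    lineage_job_namespace,
    lineage_run_id,
)

def log_lineage_ids(task_instance, **_):
    # task_instance is injected from the Airflow task context
    print(f"namespace: {lineage_job_namespace()}")
    print(f"job name: {lineage_job_name(task_instance)}")
    print(f"run id: {lineage_run_id(task_instance)}")
```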

### Parent and Root Tracking Macros

Macros for accessing parent job and root execution information for hierarchical lineage tracking.

```python { .api }
def lineage_parent_id(task_instance: TaskInstance) -> str:
    """
    Get the parent run identifier for a task instance.

    Used for tracking nested job relationships and DAG-level lineage.

    Args:
        task_instance: Current task instance

    Returns:
        str: Parent run identifier (typically the DAG run)
    """

def lineage_root_parent_id(task_instance: TaskInstance) -> str:
    """
    Get the root parent run identifier for a task instance.

    Tracks the top-level execution context across nested workflows.

    Args:
        task_instance: Current task instance

    Returns:
        str: Root parent run identifier
    """

def lineage_root_job_name(task_instance: TaskInstance) -> str:
    """
    Get the root job name for a task instance.

    Provides the top-level job context for nested execution hierarchies.

    Args:
        task_instance: Current task instance

    Returns:
        str: Root job name identifier
    """

def lineage_root_run_id(task_instance: TaskInstance) -> str:
    """
    Get the root run ID for a task instance.

    Tracks the original execution that triggered nested workflows.

    Args:
        task_instance: Current task instance

    Returns:
        str: Root run identifier
    """
```
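
The parent and root identifiers returned by these macros are plain strings; in the OpenLineage integration a parent identifier is conventionally a single `namespace/job_name/run_id` value. A small sketch of splitting one apart, assuming that layout (the delimiter and field order are assumptions, not guaranteed by this API):

```python
# Hypothetical helper, assuming the parent identifier has the form
# "namespace/job_name/run_id" (this layout is an assumption).
def split_parent_id(parent_id: str) -> dict:
    namespace, job_name, run_id = parent_id.split("/", 2)
    return {"namespace": namespace, "job_name": job_name, "run_id": run_id}

# Example: split_parent_id("default/my_dag.my_task/0195c9b4-...")
```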

## Usage Examples

### Basic Template Usage

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    'lineage_macro_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
)

# Use lineage macros in bash command
lineage_task = BashOperator(
    task_id='log_lineage_info',
    bash_command='''
        echo "Job Namespace: {{ lineage_job_namespace() }}"
        echo "Job Name: {{ lineage_job_name(task_instance) }}"
        echo "Run ID: {{ lineage_run_id(task_instance) }}"
        echo "Parent ID: {{ lineage_parent_id(task_instance) }}"
    ''',
    dag=dag
)
```

### Python Operator Template Usage

```python
from airflow.operators.python import PythonOperator

def process_with_lineage_info(**context):
    """Process data with lineage information."""

    # Access lineage info through context
    namespace = context['lineage_job_namespace']()
    job_name = context['lineage_job_name'](context['task_instance'])
    run_id = context['lineage_run_id'](context['task_instance'])
    parent_id = context['lineage_parent_id'](context['task_instance'])

    print(f"Processing in namespace: {namespace}")
    print(f"Job: {job_name}")
    print(f"Run ID: {run_id}")
    print(f"Parent Run: {parent_id}")

    # Use lineage info in processing logic
    output_path = f"/data/processed/{namespace}/{job_name}/{run_id}/result.parquet"

    # ... processing logic ...

    return output_path

# Airflow 2 passes the context automatically; provide_context is no longer needed
python_task = PythonOperator(
    task_id='process_with_lineage',
    python_callable=process_with_lineage_info,
    dag=dag
)
```

### SQL Operator Template Usage

```python
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Use lineage macros in SQL templates
sql_task = PostgresOperator(
    task_id='insert_lineage_metadata',
    postgres_conn_id='analytics_db',
    sql='''
        INSERT INTO job_execution_log (
            namespace,
            job_name,
            run_id,
            parent_run_id,
            execution_date,
            created_at
        ) VALUES (
            '{{ lineage_job_namespace() }}',
            '{{ lineage_job_name(task_instance) }}',
            '{{ lineage_run_id(task_instance) }}',
            '{{ lineage_parent_id(task_instance) }}',
            '{{ ds }}',
            NOW()
        );
    ''',
    dag=dag
)
```

### Dynamic File Path Generation

```python
from airflow.operators.python import PythonOperator

def create_output_paths(**context):
    """Create standardized output paths using lineage information."""

    namespace = context['lineage_job_namespace']()
    job_name = context['lineage_job_name'](context['task_instance'])
    run_id = context['lineage_run_id'](context['task_instance'])

    # Create hierarchical paths
    base_path = f"/data/warehouse/{namespace}"
    job_path = f"{base_path}/{job_name}"
    run_path = f"{job_path}/{run_id}"

    paths = {
        'base_path': base_path,
        'job_path': job_path,
        'run_path': run_path,
        'output_file': f"{run_path}/processed_data.parquet",
        'metadata_file': f"{run_path}/metadata.json"
    }

    return paths

path_task = PythonOperator(
    task_id='create_paths',
    python_callable=create_output_paths,
    dag=dag
)

# Use XCom to pass paths to downstream tasks
def process_data(**context):
    """Process data using generated paths."""

    paths = context['task_instance'].xcom_pull(task_ids='create_paths')

    print(f"Writing output to: {paths['output_file']}")
    print(f"Writing metadata to: {paths['metadata_file']}")

    # ... processing logic ...

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag
)

path_task >> process_task
```

### Conditional Logic with Lineage Macros

```python
from airflow.operators.python import BranchPythonOperator, PythonOperator

def choose_processing_branch(**context):
    """Choose processing branch based on lineage context."""

    namespace = context['lineage_job_namespace']()

    # Different processing for different namespaces
    if namespace == 'production':
        return 'production_processing'
    elif namespace == 'staging':
        return 'staging_processing'
    else:
        return 'development_processing'

branch_task = BranchPythonOperator(
    task_id='choose_branch',
    python_callable=choose_processing_branch,
    dag=dag
)

# Different tasks for different environments
production_task = PythonOperator(
    task_id='production_processing',
    python_callable=lambda: print("Production processing"),
    dag=dag
)

staging_task = PythonOperator(
    task_id='staging_processing',
    python_callable=lambda: print("Staging processing"),
    dag=dag
)

development_task = PythonOperator(
    task_id='development_processing',
    python_callable=lambda: print("Development processing"),
    dag=dag
)

branch_task >> [production_task, staging_task, development_task]
```

### External System Integration

```python
from airflow.operators.bash import BashOperator

# Call external API with lineage information
api_call_task = BashOperator(
    task_id='notify_external_system',
    bash_command='''
        curl -X POST https://external-system.com/api/job-started \
            -H "Content-Type: application/json" \
            -d '{
                "namespace": "{{ lineage_job_namespace() }}",
                "job_name": "{{ lineage_job_name(task_instance) }}",
                "run_id": "{{ lineage_run_id(task_instance) }}",
                "parent_run_id": "{{ lineage_parent_id(task_instance) }}",
                "execution_date": "{{ ds }}",
                "dag_id": "{{ dag.dag_id }}",
                "task_id": "{{ task.task_id }}"
            }'
    ''',
    dag=dag
)
```

### Root Context Tracking

```python
from airflow.operators.python import PythonOperator

def track_execution_hierarchy(**context):
    """Track complete execution hierarchy using root context."""

    current_run = context['lineage_run_id'](context['task_instance'])
    parent_run = context['lineage_parent_id'](context['task_instance'])
    root_run = context['lineage_root_run_id'](context['task_instance'])
    root_job = context['lineage_root_job_name'](context['task_instance'])

    hierarchy = {
        'current_run': current_run,
        'parent_run': parent_run,
        'root_run': root_run,
        'root_job': root_job,
        'hierarchy_depth': 0 if current_run == root_run else 1
    }

    print(f"Execution hierarchy: {hierarchy}")

    # Store hierarchy for downstream processing
    return hierarchy

hierarchy_task = PythonOperator(
    task_id='track_hierarchy',
    python_callable=track_execution_hierarchy,
    dag=dag
)
```

### Comprehensive Macro Usage

```python
from airflow.operators.python import PythonOperator

def custom_lineage_processing(**context):
    """Custom processing using all available lineage macros."""

    ti = context['task_instance']

    lineage_info = {
        'namespace': context['lineage_job_namespace'](),
        'job_name': context['lineage_job_name'](ti),
        'run_id': context['lineage_run_id'](ti),
        'parent_id': context['lineage_parent_id'](ti),
        'root_parent_id': context['lineage_root_parent_id'](ti),
        'root_job_name': context['lineage_root_job_name'](ti),
        'root_run_id': context['lineage_root_run_id'](ti)
    }

    print("Complete lineage context:")
    for key, value in lineage_info.items():
        print(f"  {key}: {value}")

    # Use in business logic
    unique_id = f"{lineage_info['namespace']}.{lineage_info['job_name']}.{lineage_info['run_id']}"

    return {
        'lineage_info': lineage_info,
        'unique_id': unique_id
    }

comprehensive_task = PythonOperator(
    task_id='comprehensive_lineage',
    python_callable=custom_lineage_processing,
    dag=dag
)
```

## Integration Patterns

### Data Pipeline Traceability

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def create_traceable_dag():
    """Create DAG with comprehensive lineage tracing."""

    dag = DAG(
        'traceable_pipeline',
        start_date=datetime(2023, 1, 1),
        schedule_interval='@daily'
    )

    def log_execution_start(**context):
        lineage_info = {
            'namespace': context['lineage_job_namespace'](),
            'job_name': context['lineage_job_name'](context['task_instance']),
            'run_id': context['lineage_run_id'](context['task_instance']),
            'parent_id': context['lineage_parent_id'](context['task_instance'])
        }

        # Log to external tracing system
        print(f"TRACE: Starting execution {lineage_info}")
        return lineage_info

    def process_data(**context):
        lineage_info = context['task_instance'].xcom_pull(task_ids='log_start')

        # Use lineage info in processing
        print(f"TRACE: Processing data for {lineage_info['job_name']}")

        # ... data processing ...

        return "Processing complete"

    def log_execution_end(**context):
        lineage_info = context['task_instance'].xcom_pull(task_ids='log_start')

        print(f"TRACE: Completed execution {lineage_info}")

    start_task = PythonOperator(
        task_id='log_start',
        python_callable=log_execution_start,
        dag=dag
    )

    process_task = PythonOperator(
        task_id='process',
        python_callable=process_data,
        dag=dag
    )

    end_task = PythonOperator(
        task_id='log_end',
        python_callable=log_execution_end,
        dag=dag
    )

    start_task >> process_task >> end_task

    return dag

traceable_dag = create_traceable_dag()
```

### Multi-Environment Configuration

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def create_environment_dag():
    """Create DAG with environment-specific lineage handling."""

    environment = os.getenv('AIRFLOW_ENV', 'development')

    dag = DAG(
        f'multi_env_pipeline_{environment}',
        start_date=datetime(2023, 1, 1)
    )

    def environment_specific_processing(**context):
        namespace = context['lineage_job_namespace']()
        job_name = context['lineage_job_name'](context['task_instance'])

        # Environment-specific logic
        if 'production' in namespace:
            # Production-specific processing
            output_path = f"/prod/data/{job_name}"
        elif 'staging' in namespace:
            # Staging-specific processing
            output_path = f"/staging/data/{job_name}"
        else:
            # Development processing
            output_path = f"/dev/data/{job_name}"

        print(f"Processing for {environment} environment: {output_path}")
        return output_path

    process_task = PythonOperator(
        task_id='environment_process',
        python_callable=environment_specific_processing,
        dag=dag
    )

    return dag

env_dag = create_environment_dag()
```

## Macro Availability

The lineage macros are automatically available in Airflow templates when the OpenLineage provider is installed:

```python
from airflow.operators.email import EmailOperator

# Available in all template contexts:
# - Bash commands
# - SQL queries
# - Python operator arguments
# - Email templates
# - Any Airflow template field

# Example template usage across operators:
email_task = EmailOperator(
    task_id='send_notification',
    to=['admin@company.com'],
    subject='Job {{ lineage_job_name(task_instance) }} completed',
    html_content='''
        <h2>Job Execution Complete</h2>
        <p><strong>Namespace:</strong> {{ lineage_job_namespace() }}</p>
        <p><strong>Job:</strong> {{ lineage_job_name(task_instance) }}</p>
        <p><strong>Run ID:</strong> {{ lineage_run_id(task_instance) }}</p>
        <p><strong>Parent Run:</strong> {{ lineage_parent_id(task_instance) }}</p>
    ''',
    dag=dag
)
```