# Template Macros

Template macros for accessing OpenLineage information within DAG definitions and task templates. These macros provide runtime access to lineage identifiers and metadata for use in dynamic task configurations and downstream processing.

## Capabilities

### Job and Run Identification Macros

Macros for accessing OpenLineage job names and run identifiers within templates.
```python { .api }
def lineage_job_namespace() -> str:
    """
    Get the OpenLineage namespace for the current context.

    Returns:
        str: Configured OpenLineage namespace
    """

def lineage_job_name(task_instance: TaskInstance) -> str:
    """
    Get the OpenLineage job name for a task instance.

    Args:
        task_instance: Current task instance

    Returns:
        str: Formatted job name for OpenLineage events
    """

def lineage_run_id(task_instance: TaskInstance) -> str:
    """
    Get the OpenLineage run ID for a task instance.

    Args:
        task_instance: Current task instance

    Returns:
        str: Unique run identifier for the task execution
    """
```
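For orientation, the task-level job name is conventionally derived from the DAG and task IDs. The sketch below illustrates that convention only; the helper is hypothetical and the exact format can vary by provider version, so verify against the values your installation emits:

```python
# Hypothetical helper mirroring the usual "<dag_id>.<task_id>" naming
# convention for task-level OpenLineage job names. This is an
# illustration, not the provider's actual implementation.
def expected_job_name(dag_id: str, task_id: str) -> str:
    return f"{dag_id}.{task_id}"

print(expected_job_name("daily_etl", "extract_orders"))  # daily_etl.extract_orders
```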
### Parent and Root Tracking Macros

Macros for accessing parent job and root execution information for hierarchical lineage tracking.
```python { .api }
def lineage_parent_id(task_instance: TaskInstance) -> str:
    """
    Get the parent run identifier for a task instance.

    Used for tracking nested job relationships and DAG-level lineage.

    Args:
        task_instance: Current task instance

    Returns:
        str: Parent run identifier (typically the DAG run)
    """

def lineage_root_parent_id(task_instance: TaskInstance) -> str:
    """
    Get the root parent run identifier for a task instance.

    Tracks the top-level execution context across nested workflows.

    Args:
        task_instance: Current task instance

    Returns:
        str: Root parent run identifier
    """

def lineage_root_job_name(task_instance: TaskInstance) -> str:
    """
    Get the root job name for a task instance.

    Provides the top-level job context for nested execution hierarchies.

    Args:
        task_instance: Current task instance

    Returns:
        str: Root job name identifier
    """

def lineage_root_run_id(task_instance: TaskInstance) -> str:
    """
    Get the root run ID for a task instance.

    Tracks the original execution that triggered nested workflows.

    Args:
        task_instance: Current task instance

    Returns:
        str: Root run identifier
    """
```
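Downstream consumers sometimes need to split a parent identifier back into its parts. The sketch below assumes the common `namespace/job_name/run_id` convention for parent identifiers; both the helper name and the format assumption are ours, so check the value your provider version actually produces before relying on this:

```python
# Hypothetical parser for a parent identifier of the assumed form
# "namespace/job_name/run_id" -- verify the format emitted by your
# provider version before using this in production.
def parse_parent_id(parent_id: str) -> dict:
    namespace, job_name, run_id = parent_id.split("/", 2)
    return {"namespace": namespace, "job_name": job_name, "run_id": run_id}

parts = parse_parent_id("production/daily_etl.extract_orders/0194a1c2-run-id")
print(parts["job_name"])  # daily_etl.extract_orders
```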
## Usage Examples

### Basic Template Usage
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    'lineage_macro_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
)

# Use lineage macros in a bash command
lineage_task = BashOperator(
    task_id='log_lineage_info',
    bash_command='''
    echo "Job Namespace: {{ lineage_job_namespace() }}"
    echo "Job Name: {{ lineage_job_name(task_instance) }}"
    echo "Run ID: {{ lineage_run_id(task_instance) }}"
    echo "Parent ID: {{ lineage_parent_id(task_instance) }}"
    ''',
    dag=dag
)
```
### Python Operator Template Usage

```python
from airflow.operators.python import PythonOperator

def process_with_lineage_info(**context):
    """Process data with lineage information."""

    # Access lineage info through the template context
    namespace = context['lineage_job_namespace']()
    job_name = context['lineage_job_name'](context['task_instance'])
    run_id = context['lineage_run_id'](context['task_instance'])
    parent_id = context['lineage_parent_id'](context['task_instance'])

    print(f"Processing in namespace: {namespace}")
    print(f"Job: {job_name}")
    print(f"Run ID: {run_id}")
    print(f"Parent Run: {parent_id}")

    # Use lineage info in processing logic
    output_path = f"/data/processed/{namespace}/{job_name}/{run_id}/result.parquet"

    # ... processing logic ...

    return output_path

# Note: provide_context is no longer needed (and was removed in Airflow 2);
# the context is passed automatically to callables that accept **kwargs.
python_task = PythonOperator(
    task_id='process_with_lineage',
    python_callable=process_with_lineage_info,
    dag=dag
)
```
### SQL Operator Template Usage

```python
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Use lineage macros in SQL templates
sql_task = PostgresOperator(
    task_id='insert_lineage_metadata',
    postgres_conn_id='analytics_db',
    sql='''
        INSERT INTO job_execution_log (
            namespace,
            job_name,
            run_id,
            parent_run_id,
            execution_date,
            created_at
        ) VALUES (
            '{{ lineage_job_namespace() }}',
            '{{ lineage_job_name(task_instance) }}',
            '{{ lineage_run_id(task_instance) }}',
            '{{ lineage_parent_id(task_instance) }}',
            '{{ ds }}',
            NOW()
        );
    ''',
    dag=dag
)
```
### Dynamic File Path Generation

```python
from airflow.operators.python import PythonOperator

def create_output_paths(**context):
    """Create standardized output paths using lineage information."""

    namespace = context['lineage_job_namespace']()
    job_name = context['lineage_job_name'](context['task_instance'])
    run_id = context['lineage_run_id'](context['task_instance'])

    # Create hierarchical paths
    base_path = f"/data/warehouse/{namespace}"
    job_path = f"{base_path}/{job_name}"
    run_path = f"{job_path}/{run_id}"

    paths = {
        'base_path': base_path,
        'job_path': job_path,
        'run_path': run_path,
        'output_file': f"{run_path}/processed_data.parquet",
        'metadata_file': f"{run_path}/metadata.json"
    }

    return paths

path_task = PythonOperator(
    task_id='create_paths',
    python_callable=create_output_paths,
    dag=dag
)

# Use XCom to pass paths to downstream tasks
def process_data(**context):
    """Process data using generated paths."""

    paths = context['task_instance'].xcom_pull(task_ids='create_paths')

    print(f"Writing output to: {paths['output_file']}")
    print(f"Writing metadata to: {paths['metadata_file']}")

    # ... processing logic ...

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag
)

path_task >> process_task
```
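Airflow run IDs frequently embed timestamps containing characters such as `:` and `+` (e.g. `scheduled__2023-01-01T00:00:00+00:00`), which some filesystems and object stores handle poorly. A minimal sketch of a sanitizer you might apply before embedding a run ID in paths like the ones above; the helper and its replacement rules are our assumptions, not part of the provider:

```python
import re

# Hypothetical sanitizer: replace characters that are awkward in file
# and object-store paths with underscores before using a run ID as a
# directory name. Adjust the allowed character class to your storage.
def sanitize_run_id(run_id: str) -> str:
    return re.sub(r"[^A-Za-z0-9._-]", "_", run_id)

print(sanitize_run_id("scheduled__2023-01-01T00:00:00+00:00"))
# scheduled__2023-01-01T00_00_00_00_00
```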
### Conditional Logic with Lineage Macros

```python
from airflow.operators.python import BranchPythonOperator, PythonOperator

def choose_processing_branch(**context):
    """Choose processing branch based on lineage context."""

    namespace = context['lineage_job_namespace']()

    # Different processing for different namespaces
    if namespace == 'production':
        return 'production_processing'
    elif namespace == 'staging':
        return 'staging_processing'
    else:
        return 'development_processing'

branch_task = BranchPythonOperator(
    task_id='choose_branch',
    python_callable=choose_processing_branch,
    dag=dag
)

# Different tasks for different environments
production_task = PythonOperator(
    task_id='production_processing',
    python_callable=lambda: print("Production processing"),
    dag=dag
)

staging_task = PythonOperator(
    task_id='staging_processing',
    python_callable=lambda: print("Staging processing"),
    dag=dag
)

development_task = PythonOperator(
    task_id='development_processing',
    python_callable=lambda: print("Development processing"),
    dag=dag
)

branch_task >> [production_task, staging_task, development_task]
```
### External System Integration

```python
from airflow.operators.bash import BashOperator

# Call an external API with lineage information
api_call_task = BashOperator(
    task_id='notify_external_system',
    bash_command='''
    curl -X POST https://external-system.com/api/job-started \
        -H "Content-Type: application/json" \
        -d '{
            "namespace": "{{ lineage_job_namespace() }}",
            "job_name": "{{ lineage_job_name(task_instance) }}",
            "run_id": "{{ lineage_run_id(task_instance) }}",
            "parent_run_id": "{{ lineage_parent_id(task_instance) }}",
            "execution_date": "{{ ds }}",
            "dag_id": "{{ dag.dag_id }}",
            "task_id": "{{ task.task_id }}"
        }'
    ''',
    dag=dag
)
```
### Root Context Tracking

```python
from airflow.operators.python import PythonOperator

def track_execution_hierarchy(**context):
    """Track the complete execution hierarchy using root context."""

    current_run = context['lineage_run_id'](context['task_instance'])
    parent_run = context['lineage_parent_id'](context['task_instance'])
    root_run = context['lineage_root_run_id'](context['task_instance'])
    root_job = context['lineage_root_job_name'](context['task_instance'])

    hierarchy = {
        'current_run': current_run,
        'parent_run': parent_run,
        'root_run': root_run,
        'root_job': root_job,
        'hierarchy_depth': 0 if current_run == root_run else 1
    }

    print(f"Execution hierarchy: {hierarchy}")

    # Store hierarchy for downstream processing
    return hierarchy

hierarchy_task = PythonOperator(
    task_id='track_hierarchy',
    python_callable=track_execution_hierarchy,
    dag=dag
)
```
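The `hierarchy_depth` heuristic above only distinguishes a top-level run from a nested one. A slightly more explicit pure-Python sketch of the same comparisons, with hypothetical helper and label names of our choosing:

```python
# Hypothetical classifier built on the same ID comparisons as the
# track_execution_hierarchy example: a run whose own ID equals the root
# run ID is top-level; one whose parent is the root sits directly under
# it; anything else is nested at least two levels down.
def classify_run(current_run: str, parent_run: str, root_run: str) -> str:
    if current_run == root_run:
        return "top-level"
    if parent_run == root_run:
        return "directly under root"
    return "deeply nested"

print(classify_run("r1", "r1", "r1"))  # top-level
print(classify_run("r2", "r1", "r1"))  # directly under root
print(classify_run("r3", "r2", "r1"))  # deeply nested
```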
### Custom Macro Usage

```python
from airflow.operators.python import PythonOperator

def custom_lineage_processing(**context):
    """Custom processing using all available lineage macros."""

    ti = context['task_instance']

    lineage_info = {
        'namespace': context['lineage_job_namespace'](),
        'job_name': context['lineage_job_name'](ti),
        'run_id': context['lineage_run_id'](ti),
        'parent_id': context['lineage_parent_id'](ti),
        'root_parent_id': context['lineage_root_parent_id'](ti),
        'root_job_name': context['lineage_root_job_name'](ti),
        'root_run_id': context['lineage_root_run_id'](ti)
    }

    print("Complete lineage context:")
    for key, value in lineage_info.items():
        print(f"  {key}: {value}")

    # Use in business logic
    unique_id = f"{lineage_info['namespace']}.{lineage_info['job_name']}.{lineage_info['run_id']}"

    return {
        'lineage_info': lineage_info,
        'unique_id': unique_id
    }

comprehensive_task = PythonOperator(
    task_id='comprehensive_lineage',
    python_callable=custom_lineage_processing,
    dag=dag
)
```
## Integration Patterns

### Data Pipeline Traceability
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def create_traceable_dag():
    """Create a DAG with comprehensive lineage tracing."""

    dag = DAG(
        'traceable_pipeline',
        start_date=datetime(2023, 1, 1),
        schedule_interval='@daily'
    )

    def log_execution_start(**context):
        lineage_info = {
            'namespace': context['lineage_job_namespace'](),
            'job_name': context['lineage_job_name'](context['task_instance']),
            'run_id': context['lineage_run_id'](context['task_instance']),
            'parent_id': context['lineage_parent_id'](context['task_instance'])
        }

        # Log to external tracing system
        print(f"TRACE: Starting execution {lineage_info}")
        return lineage_info

    def process_data(**context):
        lineage_info = context['task_instance'].xcom_pull(task_ids='log_start')

        # Use lineage info in processing
        print(f"TRACE: Processing data for {lineage_info['job_name']}")

        # ... data processing ...

        return "Processing complete"

    def log_execution_end(**context):
        lineage_info = context['task_instance'].xcom_pull(task_ids='log_start')

        print(f"TRACE: Completed execution {lineage_info}")

    start_task = PythonOperator(
        task_id='log_start',
        python_callable=log_execution_start,
        dag=dag
    )

    process_task = PythonOperator(
        task_id='process',
        python_callable=process_data,
        dag=dag
    )

    end_task = PythonOperator(
        task_id='log_end',
        python_callable=log_execution_end,
        dag=dag
    )

    start_task >> process_task >> end_task

    return dag

traceable_dag = create_traceable_dag()
```
### Multi-Environment Configuration

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def create_environment_dag():
    """Create a DAG with environment-specific lineage handling."""

    environment = os.getenv('AIRFLOW_ENV', 'development')

    dag = DAG(
        f'multi_env_pipeline_{environment}',
        start_date=datetime(2023, 1, 1)
    )

    def environment_specific_processing(**context):
        namespace = context['lineage_job_namespace']()
        job_name = context['lineage_job_name'](context['task_instance'])

        # Environment-specific logic
        if 'production' in namespace:
            output_path = f"/prod/data/{job_name}"
        elif 'staging' in namespace:
            output_path = f"/staging/data/{job_name}"
        else:
            output_path = f"/dev/data/{job_name}"

        print(f"Processing for {environment} environment: {output_path}")
        return output_path

    process_task = PythonOperator(
        task_id='environment_process',
        python_callable=environment_specific_processing,
        dag=dag
    )

    return dag

env_dag = create_environment_dag()
```
## Macro Availability

The lineage macros are automatically available in Airflow templates when the OpenLineage provider is installed:
```python
# Available in all template contexts:
# - Bash commands
# - SQL queries
# - Python operator arguments
# - Email templates
# - Any Airflow template field

# Example template usage across operators:
from airflow.operators.email import EmailOperator

email_task = EmailOperator(
    task_id='send_notification',
    to=['admin@company.com'],
    subject='Job {{ lineage_job_name(task_instance) }} completed',
    html_content='''
    <h2>Job Execution Complete</h2>
    <p><strong>Namespace:</strong> {{ lineage_job_namespace() }}</p>
    <p><strong>Job:</strong> {{ lineage_job_name(task_instance) }}</p>
    <p><strong>Run ID:</strong> {{ lineage_run_id(task_instance) }}</p>
    <p><strong>Parent Run:</strong> {{ lineage_parent_id(task_instance) }}</p>
    ''',
    dag=dag
)
```