# Lineage and Metadata

Data lineage tracking and field-level transformations for governance, debugging, and compliance in CDAP ETL pipelines.

## Core Lineage Concepts

### AccessType

Enumeration of data access types for lineage tracking.

```java { .api }
package io.cdap.cdap.etl.api.lineage;

/**
 * Kinds of data access that can be attributed to an operation for lineage tracking.
 */
public enum AccessType {
    /** Data read operation. */
    READ,

    /** Data write operation. */
    WRITE,

    /** Unknown access type. */
    UNKNOWN
}
```

## Field-Level Lineage

### LineageRecorder

Interface for recording field-level lineage information during pipeline execution.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

public interface LineageRecorder {
30
/**
31
* Record field operations for lineage tracking.
32
*/
33
void record(List<FieldOperation> fieldOperations);
34
}
```

**Lineage Recording Example:**

```java
@Plugin(type = Transform.PLUGIN_TYPE)
40
@Name("CustomerDataEnricher")
41
public class CustomerDataEnricher extends Transform<StructuredRecord, StructuredRecord> {
42
43
private final Config config;
44
45
@Override
46
public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
47
TransformContext context = getContext();
48
LineageRecorder lineageRecorder = context.getLineageRecorder();
49
50
// Build enriched customer record
51
StructuredRecord.Builder builder = StructuredRecord.builder(context.getOutputSchema());
52
53
// Direct field mappings
54
builder.set("customer_id", input.get("id"));
55
builder.set("first_name", input.get("fname"));
56
builder.set("last_name", input.get("lname"));
57
58
// Derived fields
59
String fullName = input.get("fname") + " " + input.get("lname");
60
builder.set("full_name", fullName);
61
62
String email = generateEmail(input.get("fname"), input.get("lname"));
63
builder.set("email", email);
64
65
// Lookup enrichment
66
String customerId = input.get("id");
67
CustomerProfile profile = lookupCustomerProfile(customerId);
68
if (profile != null) {
69
builder.set("segment", profile.getSegment());
70
builder.set("lifetime_value", profile.getLifetimeValue());
71
}
72
73
// Record field lineage
74
List<FieldOperation> operations = Arrays.asList(
75
// Direct field reads
76
new FieldReadOperation("read_customer_id",
77
"Read customer ID from source",
78
Arrays.asList("id")),
79
new FieldReadOperation("read_names",
80
"Read customer names from source",
81
Arrays.asList("fname", "lname")),
82
83
// Field transformations
84
new FieldTransformOperation("map_customer_id",
85
"Map source ID to customer_id",
86
Arrays.asList("id"),
87
Arrays.asList("customer_id")),
88
new FieldTransformOperation("map_first_name",
89
"Map fname to first_name",
90
Arrays.asList("fname"),
91
Arrays.asList("first_name")),
92
new FieldTransformOperation("map_last_name",
93
"Map lname to last_name",
94
Arrays.asList("lname"),
95
Arrays.asList("last_name")),
96
97
// Derived field operations
98
new FieldTransformOperation("derive_full_name",
99
"Concatenate first and last name",
100
Arrays.asList("fname", "lname"),
101
Arrays.asList("full_name")),
102
new FieldTransformOperation("generate_email",
103
"Generate email from names",
104
Arrays.asList("fname", "lname"),
105
Arrays.asList("email")),
106
107
// Lookup operations
108
new FieldTransformOperation("lookup_segment",
109
"Lookup customer segment",
110
Arrays.asList("id"),
111
Arrays.asList("segment")),
112
new FieldTransformOperation("lookup_lifetime_value",
113
"Lookup customer lifetime value",
114
Arrays.asList("id"),
115
Arrays.asList("lifetime_value")),
116
117
// Final write operation
118
new FieldWriteOperation("write_enriched_customer",
119
"Write enriched customer record",
120
Arrays.asList("customer_id", "first_name", "last_name",
121
"full_name", "email", "segment", "lifetime_value"))
122
);
123
124
lineageRecorder.record(operations);
125
126
emitter.emit(builder.build());
127
}
128
}
```

### FieldOperation

Base class for field operations in lineage tracking.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

public class FieldOperation {
139
/**
140
* Create field operation.
141
*/
142
public FieldOperation(String name, String description, OperationType type,
143
List<String> inputs, List<String> outputs) {}
144
145
/**
146
* Get operation name.
147
*/
148
public String getName() {}
149
150
/**
151
* Get operation description.
152
*/
153
public String getDescription() {}
154
155
/**
156
* Get operation type.
157
*/
158
public OperationType getType() {}
159
160
/**
161
* Get input field names.
162
*/
163
public List<String> getInputs() {}
164
165
/**
166
* Get output field names.
167
*/
168
public List<String> getOutputs() {}
169
}
```

### FieldReadOperation

Field operation for reading data from source fields.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

public class FieldReadOperation extends FieldOperation {
180
/**
181
* Create read operation for fields.
182
*/
183
public FieldReadOperation(String name, String description, List<String> fields) {
184
super(name, description, OperationType.READ, Collections.emptyList(), fields);
185
}
186
}
```

### FieldWriteOperation

Field operation for writing data to destination fields.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

public class FieldWriteOperation extends FieldOperation {
197
/**
198
* Create write operation for fields.
199
*/
200
public FieldWriteOperation(String name, String description, List<String> fields) {
201
super(name, description, OperationType.WRITE, fields, Collections.emptyList());
202
}
203
}
```

### FieldTransformOperation

Field operation for data transformations between fields.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

public class FieldTransformOperation extends FieldOperation {
214
/**
215
* Create transform operation from inputs to outputs.
216
*/
217
public FieldTransformOperation(String name, String description,
218
List<String> inputs, List<String> outputs) {
219
super(name, description, OperationType.TRANSFORM, inputs, outputs);
220
}
221
}
```

### OperationType

Enumeration of field operation types.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

/**
 * Kinds of field operations that can appear in field-level lineage.
 */
public enum OperationType {
    /** Read operation from source. */
    READ,

    /** Write operation to destination. */
    WRITE,

    /** Transformation between fields. */
    TRANSFORM
}
```

## Complex Lineage Tracking Examples

### Data Aggregation Lineage

```java
@Plugin(type = BatchAggregator.PLUGIN_TYPE)
244
@Name("SalesAggregator")
245
public class SalesAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {
246
247
@Override
248
public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
249
Emitter<StructuredRecord> emitter) throws Exception {
250
251
// Collect aggregation data
252
double totalSales = 0.0;
253
int orderCount = 0;
254
double maxOrderValue = 0.0;
255
double minOrderValue = Double.MAX_VALUE;
256
Set<String> uniqueCustomers = new HashSet<>();
257
258
while (groupValues.hasNext()) {
259
StructuredRecord record = groupValues.next();
260
Double salesAmount = record.get("sales_amount");
261
String customerId = record.get("customer_id");
262
263
if (salesAmount != null) {
264
totalSales += salesAmount;
265
orderCount++;
266
maxOrderValue = Math.max(maxOrderValue, salesAmount);
267
minOrderValue = Math.min(minOrderValue, salesAmount);
268
}
269
270
if (customerId != null) {
271
uniqueCustomers.add(customerId);
272
}
273
}
274
275
double avgOrderValue = orderCount > 0 ? totalSales / orderCount : 0.0;
276
277
// Build result record
278
StructuredRecord result = StructuredRecord.builder(getContext().getOutputSchema())
279
.set("region", groupKey)
280
.set("total_sales", totalSales)
281
.set("order_count", orderCount)
282
.set("avg_order_value", avgOrderValue)
283
.set("max_order_value", maxOrderValue == 0.0 ? null : maxOrderValue)
284
.set("min_order_value", minOrderValue == Double.MAX_VALUE ? null : minOrderValue)
285
.set("unique_customers", uniqueCustomers.size())
286
.build();
287
288
// Record detailed lineage for aggregation
289
BatchRuntimeContext context = getContext();
290
LineageRecorder lineageRecorder = context.getLineageRecorder();
291
292
List<FieldOperation> operations = Arrays.asList(
293
// Input field reads
294
new FieldReadOperation("read_sales_data",
295
"Read sales transaction data",
296
Arrays.asList("region", "sales_amount", "customer_id")),
297
298
// Aggregation operations
299
new FieldTransformOperation("group_by_region",
300
"Group sales data by region",
301
Arrays.asList("region"),
302
Arrays.asList("region")),
303
new FieldTransformOperation("sum_sales_amount",
304
"Sum sales amounts for region",
305
Arrays.asList("sales_amount"),
306
Arrays.asList("total_sales")),
307
new FieldTransformOperation("count_orders",
308
"Count number of orders",
309
Arrays.asList("sales_amount"),
310
Arrays.asList("order_count")),
311
new FieldTransformOperation("calculate_avg_order",
312
"Calculate average order value",
313
Arrays.asList("sales_amount"),
314
Arrays.asList("avg_order_value")),
315
new FieldTransformOperation("find_max_order",
316
"Find maximum order value",
317
Arrays.asList("sales_amount"),
318
Arrays.asList("max_order_value")),
319
new FieldTransformOperation("find_min_order",
320
"Find minimum order value",
321
Arrays.asList("sales_amount"),
322
Arrays.asList("min_order_value")),
323
new FieldTransformOperation("count_unique_customers",
324
"Count unique customers in region",
325
Arrays.asList("customer_id"),
326
Arrays.asList("unique_customers")),
327
328
// Output write
329
new FieldWriteOperation("write_aggregated_sales",
330
"Write aggregated sales summary",
331
Arrays.asList("region", "total_sales", "order_count",
332
"avg_order_value", "max_order_value",
333
"min_order_value", "unique_customers"))
334
);
335
336
lineageRecorder.record(operations);
337
emitter.emit(result);
338
}
339
}
```

### Join Operation Lineage

```java
@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
346
@Name("CustomerOrderJoiner")
347
public class CustomerOrderJoiner extends BatchAutoJoiner {
348
349
@Override
350
public JoinDefinition define(AutoJoinerContext context) {
351
// Join definition code...
352
353
// Record lineage for join operation
354
recordJoinLineage(context);
355
356
return joinDefinition;
357
}
358
359
private void recordJoinLineage(AutoJoinerContext context) {
360
// This would typically be called during runtime, but shown here for illustration
361
List<FieldOperation> operations = Arrays.asList(
362
// Read operations from each input stage
363
new FieldReadOperation("read_customer_data",
364
"Read customer information",
365
Arrays.asList("customers.customer_id", "customers.name",
366
"customers.email", "customers.registration_date")),
367
new FieldReadOperation("read_order_data",
368
"Read order information",
369
Arrays.asList("orders.order_id", "orders.customer_id",
370
"orders.amount", "orders.order_date")),
371
372
// Join key matching
373
new FieldTransformOperation("join_on_customer_id",
374
"Join customers and orders on customer_id",
375
Arrays.asList("customers.customer_id", "orders.customer_id"),
376
Arrays.asList("customer_id")),
377
378
// Field selections and mappings
379
new FieldTransformOperation("select_customer_info",
380
"Select customer information fields",
381
Arrays.asList("customers.customer_id", "customers.name", "customers.email"),
382
Arrays.asList("customer_id", "customer_name", "customer_email")),
383
new FieldTransformOperation("select_order_info",
384
"Select order information fields",
385
Arrays.asList("orders.order_id", "orders.amount", "orders.order_date"),
386
Arrays.asList("order_id", "order_amount", "order_date")),
387
388
// Derived fields
389
new FieldTransformOperation("derive_customer_since",
390
"Map registration date to customer_since",
391
Arrays.asList("customers.registration_date"),
392
Arrays.asList("customer_since")),
393
394
// Output write
395
new FieldWriteOperation("write_joined_data",
396
"Write joined customer and order data",
397
Arrays.asList("customer_id", "customer_name", "customer_email",
398
"customer_since", "order_id", "order_amount", "order_date"))
399
);
400
401
// In actual implementation, this would be recorded during runtime
402
// LineageRecorder would be available in the runtime context
403
}
404
}
```

### Multi-Stage Pipeline Lineage

```java
public class PipelineLineageTracker {
411
412
public static void recordPipelineLineage(PipelineContext pipelineContext,
413
Map<String, List<FieldOperation>> stageOperations) {
414
LineageRecorder recorder = pipelineContext.getLineageRecorder();
415
416
// Combine operations from all stages
417
List<FieldOperation> allOperations = new ArrayList<>();
418
419
// Add stage-specific operations
420
for (Map.Entry<String, List<FieldOperation>> entry : stageOperations.entrySet()) {
421
String stageName = entry.getKey();
422
List<FieldOperation> operations = entry.getValue();
423
424
// Prefix operation names with stage name for clarity
425
List<FieldOperation> prefixedOperations = operations.stream()
426
.map(op -> new FieldOperation(
427
stageName + "." + op.getName(),
428
"[" + stageName + "] " + op.getDescription(),
429
op.getType(),
430
op.getInputs(),
431
op.getOutputs()
432
))
433
.collect(Collectors.toList());
434
435
allOperations.addAll(prefixedOperations);
436
}
437
438
// Add pipeline-level operations
439
allOperations.add(new FieldReadOperation("pipeline.source_read",
440
"Pipeline source data read",
441
Arrays.asList("raw_data.*")));
442
allOperations.add(new FieldWriteOperation("pipeline.sink_write",
443
"Pipeline final data write",
444
Arrays.asList("processed_data.*")));
445
446
// Record complete lineage
447
recorder.record(allOperations);
448
}
449
}
```

## Lineage Utilities and Best Practices

### Lineage Operation Builder

```java
public class LineageOperationBuilder {
458
459
public static class ReadOperationBuilder {
460
private String name;
461
private String description;
462
private List<String> fields = new ArrayList<>();
463
464
public ReadOperationBuilder name(String name) {
465
this.name = name;
466
return this;
467
}
468
469
public ReadOperationBuilder description(String description) {
470
this.description = description;
471
return this;
472
}
473
474
public ReadOperationBuilder fields(String... fields) {
475
this.fields.addAll(Arrays.asList(fields));
476
return this;
477
}
478
479
public FieldReadOperation build() {
480
return new FieldReadOperation(name, description, fields);
481
}
482
}
483
484
public static class TransformOperationBuilder {
485
private String name;
486
private String description;
487
private List<String> inputs = new ArrayList<>();
488
private List<String> outputs = new ArrayList<>();
489
490
public TransformOperationBuilder name(String name) {
491
this.name = name;
492
return this;
493
}
494
495
public TransformOperationBuilder description(String description) {
496
this.description = description;
497
return this;
498
}
499
500
public TransformOperationBuilder inputs(String... inputs) {
501
this.inputs.addAll(Arrays.asList(inputs));
502
return this;
503
}
504
505
public TransformOperationBuilder outputs(String... outputs) {
506
this.outputs.addAll(Arrays.asList(outputs));
507
return this;
508
}
509
510
public FieldTransformOperation build() {
511
return new FieldTransformOperation(name, description, inputs, outputs);
512
}
513
}
514
515
public static ReadOperationBuilder read() {
516
return new ReadOperationBuilder();
517
}
518
519
public static TransformOperationBuilder transform() {
520
return new TransformOperationBuilder();
521
}
522
523
public static FieldWriteOperation write(String name, String description, String... fields) {
524
return new FieldWriteOperation(name, description, Arrays.asList(fields));
525
}
526
}
527
528
// Usage example:
529
List<FieldOperation> operations = Arrays.asList(
530
LineageOperationBuilder.read()
531
.name("read_source")
532
.description("Read source customer data")
533
.fields("customer_id", "first_name", "last_name", "email")
534
.build(),
535
536
LineageOperationBuilder.transform()
537
.name("standardize_names")
538
.description("Standardize name fields")
539
.inputs("first_name", "last_name")
540
.outputs("std_first_name", "std_last_name")
541
.build(),
542
543
LineageOperationBuilder.write("write_output",
544
"Write standardized customer data",
545
"customer_id", "std_first_name", "std_last_name", "email")
546
);
```

### Lineage Analysis Tools

```java
public class LineageAnalyzer {
553
554
public static Map<String, Set<String>> buildFieldDependencyGraph(List<FieldOperation> operations) {
555
Map<String, Set<String>> dependencies = new HashMap<>();
556
557
for (FieldOperation operation : operations) {
558
if (operation.getType() == OperationType.TRANSFORM) {
559
for (String output : operation.getOutputs()) {
560
dependencies.computeIfAbsent(output, k -> new HashSet<>())
561
.addAll(operation.getInputs());
562
}
563
}
564
}
565
566
return dependencies;
567
}
568
569
public static List<String> getFieldLineage(String fieldName,
570
Map<String, Set<String>> dependencyGraph) {
571
List<String> lineage = new ArrayList<>();
572
Set<String> visited = new HashSet<>();
573
574
buildLineage(fieldName, dependencyGraph, lineage, visited);
575
Collections.reverse(lineage); // Reverse to show source-to-target order
576
577
return lineage;
578
}
579
580
private static void buildLineage(String fieldName,
581
Map<String, Set<String>> dependencyGraph,
582
List<String> lineage,
583
Set<String> visited) {
584
if (visited.contains(fieldName)) {
585
return; // Avoid cycles
586
}
587
588
visited.add(fieldName);
589
lineage.add(fieldName);
590
591
Set<String> dependencies = dependencyGraph.get(fieldName);
592
if (dependencies != null) {
593
for (String dependency : dependencies) {
594
buildLineage(dependency, dependencyGraph, lineage, visited);
595
}
596
}
597
}
598
599
public static Set<String> findImpactedFields(String sourceField,
600
Map<String, Set<String>> dependencyGraph) {
601
Set<String> impacted = new HashSet<>();
602
603
for (Map.Entry<String, Set<String>> entry : dependencyGraph.entrySet()) {
604
if (entry.getValue().contains(sourceField)) {
605
impacted.add(entry.getKey());
606
// Recursively find fields impacted by this field
607
impacted.addAll(findImpactedFields(entry.getKey(), dependencyGraph));
608
}
609
}
610
611
return impacted;
612
}
613
}
```

## Metadata and Governance Integration

### Metadata Enrichment

```java
public class MetadataEnrichedLineage {
622
623
public static class EnrichedFieldOperation extends FieldOperation {
624
private final Map<String, Object> metadata;
625
626
public EnrichedFieldOperation(FieldOperation baseOperation,
627
Map<String, Object> metadata) {
628
super(baseOperation.getName(), baseOperation.getDescription(),
629
baseOperation.getType(), baseOperation.getInputs(),
630
baseOperation.getOutputs());
631
this.metadata = metadata;
632
}
633
634
public Map<String, Object> getMetadata() {
635
return metadata;
636
}
637
638
public Object getMetadata(String key) {
639
return metadata.get(key);
640
}
641
}
642
643
public static EnrichedFieldOperation enrichWithMetadata(FieldOperation operation,
644
Schema inputSchema,
645
Schema outputSchema) {
646
Map<String, Object> metadata = new HashMap<>();
647
648
// Add schema information
649
metadata.put("inputSchema", inputSchema != null ? inputSchema.toString() : null);
650
metadata.put("outputSchema", outputSchema != null ? outputSchema.toString() : null);
651
652
// Add field type information
653
if (inputSchema != null) {
654
Map<String, String> inputTypes = new HashMap<>();
655
for (String fieldName : operation.getInputs()) {
656
Schema.Field field = inputSchema.getField(fieldName);
657
if (field != null) {
658
inputTypes.put(fieldName, field.getSchema().getType().toString());
659
}
660
}
661
metadata.put("inputFieldTypes", inputTypes);
662
}
663
664
if (outputSchema != null) {
665
Map<String, String> outputTypes = new HashMap<>();
666
for (String fieldName : operation.getOutputs()) {
667
Schema.Field field = outputSchema.getField(fieldName);
668
if (field != null) {
669
outputTypes.put(fieldName, field.getSchema().getType().toString());
670
}
671
}
672
metadata.put("outputFieldTypes", outputTypes);
673
}
674
675
// Add timestamp
676
metadata.put("recordedAt", Instant.now().toString());
677
678
// Add operation complexity
679
metadata.put("complexity", calculateComplexity(operation));
680
681
return new EnrichedFieldOperation(operation, metadata);
682
}
683
684
private static String calculateComplexity(FieldOperation operation) {
685
int inputCount = operation.getInputs().size();
686
int outputCount = operation.getOutputs().size();
687
688
if (inputCount == 1 && outputCount == 1) {
689
return "SIMPLE";
690
} else if (inputCount > 1 && outputCount == 1) {
691
return "AGGREGATION";
692
} else if (inputCount == 1 && outputCount > 1) {
693
return "EXPANSION";
694
} else {
695
return "COMPLEX";
696
}
697
}
698
}
```