CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-etl-api

CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load pipeline applications on the CDAP platform

Pending
Overview
Eval results
Files

docs/lineage-metadata.md

Lineage and Metadata

Data lineage tracking and field-level transformations for governance, debugging, and compliance in CDAP ETL pipelines.

Core Lineage Concepts

AccessType

Enumeration of data access types for lineage tracking.

package io.cdap.cdap.etl.api.lineage;

/**
 * Kinds of data access that can be attributed to a dataset for lineage tracking.
 */
public enum AccessType {
    /** Data read operation. */
    READ,

    /** Data write operation. */
    WRITE,

    /** Unknown access type. */
    UNKNOWN
}

Field-Level Lineage

LineageRecorder

Interface for recording field-level lineage information during pipeline execution.

package io.cdap.cdap.etl.api.lineage.field;

/**
 * Sink for field-level lineage produced while a pipeline runs.
 */
public interface LineageRecorder {

    /**
     * Records a batch of field operations for lineage tracking.
     *
     * @param fieldOperations the operations describing how fields were read, transformed
     *                        and written
     */
    void record(List<FieldOperation> fieldOperations);
}

Lineage Recording Example:

@Plugin(type = Transform.PLUGIN_TYPE)
@Name("CustomerDataEnricher")
public class CustomerDataEnricher extends Transform<StructuredRecord, StructuredRecord> {

    /**
     * Field-level lineage of this stage. It describes how output fields derive from input
     * fields, which is the same for every record, so it is built once and shared instead of
     * being re-allocated for each record.
     */
    private static final List<FieldOperation> LINEAGE_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
        // Direct field reads
        new FieldReadOperation("read_customer_id",
                               "Read customer ID from source",
                               Arrays.asList("id")),
        new FieldReadOperation("read_names",
                               "Read customer names from source",
                               Arrays.asList("fname", "lname")),

        // Field transformations
        new FieldTransformOperation("map_customer_id",
                                    "Map source ID to customer_id",
                                    Arrays.asList("id"),
                                    Arrays.asList("customer_id")),
        new FieldTransformOperation("map_first_name",
                                    "Map fname to first_name",
                                    Arrays.asList("fname"),
                                    Arrays.asList("first_name")),
        new FieldTransformOperation("map_last_name",
                                    "Map lname to last_name",
                                    Arrays.asList("lname"),
                                    Arrays.asList("last_name")),

        // Derived field operations
        new FieldTransformOperation("derive_full_name",
                                    "Concatenate first and last name",
                                    Arrays.asList("fname", "lname"),
                                    Arrays.asList("full_name")),
        new FieldTransformOperation("generate_email",
                                    "Generate email from names",
                                    Arrays.asList("fname", "lname"),
                                    Arrays.asList("email")),

        // Lookup operations
        new FieldTransformOperation("lookup_segment",
                                    "Lookup customer segment",
                                    Arrays.asList("id"),
                                    Arrays.asList("segment")),
        new FieldTransformOperation("lookup_lifetime_value",
                                    "Lookup customer lifetime value",
                                    Arrays.asList("id"),
                                    Arrays.asList("lifetime_value")),

        // Final write operation
        new FieldWriteOperation("write_enriched_customer",
                                "Write enriched customer record",
                                Arrays.asList("customer_id", "first_name", "last_name",
                                              "full_name", "email", "segment", "lifetime_value"))
    ));

    private final Config config;

    /** Whether this instance has already recorded {@link #LINEAGE_OPERATIONS}. */
    private boolean lineageRecorded;

    /**
     * Creates the transform.
     *
     * @param config plugin configuration (a final field must be assigned in the constructor)
     */
    public CustomerDataEnricher(Config config) {
        this.config = config;
    }

    /**
     * Maps id/fname/lname to the output schema, derives full_name and email, and adds
     * segment/lifetime_value when a customer profile is found.
     */
    @Override
    public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
        TransformContext context = getContext();
        recordLineageOnce(context);

        String customerId = input.get("id");
        String firstName = input.get("fname");
        String lastName = input.get("lname");

        StructuredRecord.Builder builder = StructuredRecord.builder(context.getOutputSchema());

        // Direct field mappings
        builder.set("customer_id", customerId);
        builder.set("first_name", firstName);
        builder.set("last_name", lastName);

        // Derived fields (generateEmail is a helper not shown in this example)
        builder.set("full_name", joinNames(firstName, lastName));
        builder.set("email", generateEmail(firstName, lastName));

        // Lookup enrichment (lookupCustomerProfile is a helper not shown in this example)
        CustomerProfile profile = lookupCustomerProfile(customerId);
        if (profile != null) {
            builder.set("segment", profile.getSegment());
            builder.set("lifetime_value", profile.getLifetimeValue());
        }

        emitter.emit(builder.build());
    }

    /**
     * Records {@link #LINEAGE_OPERATIONS} the first time it is called on this instance, so the
     * constant lineage is not re-recorded for every record.
     * NOTE(review): CDAP plugins conventionally record field lineage once per run from
     * prepareRun(); confirm whether that hook is the better place for this API.
     */
    private void recordLineageOnce(TransformContext context) {
        if (!lineageRecorded) {
            context.getLineageRecorder().record(LINEAGE_OPERATIONS);
            lineageRecorded = true;
        }
    }

    /**
     * Joins the first and last name with a single space. A null part is skipped, so the result
     * never contains the text "null". Returns null when both parts are null.
     * NOTE(review): assumes full_name is nullable in the output schema; confirm.
     */
    private static String joinNames(String firstName, String lastName) {
        if (firstName == null) {
            return lastName;
        }
        if (lastName == null) {
            return firstName;
        }
        return firstName + " " + lastName;
    }
}

FieldOperation

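Base class for field operations in lineage tracking. Each operation has a name, a description, an operation type, and lists of input and output field names.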

package io.cdap.cdap.etl.api.lineage.field;

public class FieldOperation {
    private final String name;
    private final String description;
    private final OperationType type;
    private final List<String> inputs;
    private final List<String> outputs;

    /**
     * Create field operation.
     *
     * @param name        operation name
     * @param description human-readable description of the operation
     * @param type        kind of operation (read, write or transform)
     * @param inputs      names of the fields the operation consumes
     * @param outputs     names of the fields the operation produces
     * @throws NullPointerException if {@code inputs} or {@code outputs} is null
     */
    public FieldOperation(String name, String description, OperationType type,
                         List<String> inputs, List<String> outputs) {
        this.name = name;
        this.description = description;
        this.type = type;
        // Snapshot both lists so later changes by the caller cannot alter this operation,
        // and so the getters can expose them without risk of modification.
        this.inputs = Collections.unmodifiableList(
            new ArrayList<>(Objects.requireNonNull(inputs, "inputs")));
        this.outputs = Collections.unmodifiableList(
            new ArrayList<>(Objects.requireNonNull(outputs, "outputs")));
    }

    /**
     * Get operation name.
     */
    public String getName() {
        return name;
    }

    /**
     * Get operation description.
     */
    public String getDescription() {
        return description;
    }

    /**
     * Get operation type.
     */
    public OperationType getType() {
        return type;
    }

    /**
     * Get input field names as an unmodifiable list.
     */
    public List<String> getInputs() {
        return inputs;
    }

    /**
     * Get output field names as an unmodifiable list.
     */
    public List<String> getOutputs() {
        return outputs;
    }
}

FieldReadOperation

Field operation describing fields read from a source. The listed fields are recorded as the operation's outputs, and the operation has no inputs.

package io.cdap.cdap.etl.api.lineage.field;

public class FieldReadOperation extends FieldOperation {

    /**
     * Builds a {@code READ} operation. A read consumes no fields. The supplied
     * {@code fields} become its outputs.
     *
     * @param name        operation name
     * @param description human-readable description
     * @param fields      fields read from the source
     */
    public FieldReadOperation(String name, String description, List<String> fields) {
        super(name,
              description,
              OperationType.READ,
              Collections.<String>emptyList(),
              fields);
    }
}

FieldWriteOperation

Field operation describing fields written to a destination. The listed fields are recorded as the operation's inputs, and the operation has no outputs.

package io.cdap.cdap.etl.api.lineage.field;

public class FieldWriteOperation extends FieldOperation {

    /**
     * Builds a {@code WRITE} operation. The supplied {@code fields} are the inputs being
     * written, and a write produces no output fields.
     *
     * @param name        operation name
     * @param description human-readable description
     * @param fields      fields written to the destination
     */
    public FieldWriteOperation(String name, String description, List<String> fields) {
        super(name,
              description,
              OperationType.WRITE,
              fields,
              Collections.<String>emptyList());
    }
}

FieldTransformOperation

Field operation describing a transformation that derives output fields from input fields.

package io.cdap.cdap.etl.api.lineage.field;

public class FieldTransformOperation extends FieldOperation {

    /**
     * Builds a {@code TRANSFORM} operation that derives {@code outputs} from {@code inputs}.
     *
     * @param name        operation name
     * @param description human-readable description
     * @param inputs      fields consumed by the transformation
     * @param outputs     fields produced by the transformation
     */
    public FieldTransformOperation(String name,
                                   String description,
                                   List<String> inputs,
                                   List<String> outputs) {
        super(name, description, OperationType.TRANSFORM, inputs, outputs);
    }
}

OperationType

Enumeration of field operation types.

package io.cdap.cdap.etl.api.lineage.field;

/**
 * Kinds of field-level operations recorded for lineage.
 */
public enum OperationType {
    /** Read operation from source. */
    READ,

    /** Write operation to destination. */
    WRITE,

    /** Transformation between fields. */
    TRANSFORM
}

Complex Lineage Tracking Examples

Data Aggregation Lineage

@Plugin(type = BatchAggregator.PLUGIN_TYPE)
@Name("SalesAggregator")
public class SalesAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {

    /**
     * Field-level lineage of the aggregation. It is identical for every group, so it is built
     * once instead of once per aggregate() call.
     */
    private static final List<FieldOperation> LINEAGE_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
        // Input field reads
        new FieldReadOperation("read_sales_data",
                               "Read sales transaction data",
                               Arrays.asList("region", "sales_amount", "customer_id")),

        // Aggregation operations
        new FieldTransformOperation("group_by_region",
                                    "Group sales data by region",
                                    Arrays.asList("region"),
                                    Arrays.asList("region")),
        new FieldTransformOperation("sum_sales_amount",
                                    "Sum sales amounts for region",
                                    Arrays.asList("sales_amount"),
                                    Arrays.asList("total_sales")),
        new FieldTransformOperation("count_orders",
                                    "Count number of orders",
                                    Arrays.asList("sales_amount"),
                                    Arrays.asList("order_count")),
        new FieldTransformOperation("calculate_avg_order",
                                    "Calculate average order value",
                                    Arrays.asList("sales_amount"),
                                    Arrays.asList("avg_order_value")),
        new FieldTransformOperation("find_max_order",
                                    "Find maximum order value",
                                    Arrays.asList("sales_amount"),
                                    Arrays.asList("max_order_value")),
        new FieldTransformOperation("find_min_order",
                                    "Find minimum order value",
                                    Arrays.asList("sales_amount"),
                                    Arrays.asList("min_order_value")),
        new FieldTransformOperation("count_unique_customers",
                                    "Count unique customers in region",
                                    Arrays.asList("customer_id"),
                                    Arrays.asList("unique_customers")),

        // Output write
        new FieldWriteOperation("write_aggregated_sales",
                                "Write aggregated sales summary",
                                Arrays.asList("region", "total_sales", "order_count",
                                              "avg_order_value", "max_order_value",
                                              "min_order_value", "unique_customers"))
    ));

    /** Whether this instance has already recorded {@link #LINEAGE_OPERATIONS}. */
    private boolean lineageRecorded;

    /**
     * Summarizes one region's sales: total, order count, average, max, min and the number of
     * distinct customers. Records with a null sales_amount are not counted as orders. When a
     * group has no orders, max/min are emitted as null and the average as 0.0.
     */
    @Override
    public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
                         Emitter<StructuredRecord> emitter) throws Exception {

        double totalSales = 0.0;
        int orderCount = 0;
        // Start from the infinities so any real amount (including 0.0 and negative refunds)
        // replaces them. Whether a max/min exists is decided by orderCount, not by a sentinel.
        double maxOrderValue = Double.NEGATIVE_INFINITY;
        double minOrderValue = Double.POSITIVE_INFINITY;
        Set<String> uniqueCustomers = new HashSet<>();

        while (groupValues.hasNext()) {
            StructuredRecord record = groupValues.next();
            Double salesAmount = record.get("sales_amount");
            String customerId = record.get("customer_id");

            if (salesAmount != null) {
                totalSales += salesAmount;
                orderCount++;
                maxOrderValue = Math.max(maxOrderValue, salesAmount);
                minOrderValue = Math.min(minOrderValue, salesAmount);
            }

            if (customerId != null) {
                uniqueCustomers.add(customerId);
            }
        }

        boolean hasOrders = orderCount > 0;
        double avgOrderValue = hasOrders ? totalSales / orderCount : 0.0;

        BatchRuntimeContext context = getContext();
        StructuredRecord result = StructuredRecord.builder(context.getOutputSchema())
            .set("region", groupKey)
            .set("total_sales", totalSales)
            .set("order_count", orderCount)
            .set("avg_order_value", avgOrderValue)
            .set("max_order_value", hasOrders ? Double.valueOf(maxOrderValue) : null)
            .set("min_order_value", hasOrders ? Double.valueOf(minOrderValue) : null)
            .set("unique_customers", uniqueCustomers.size())
            .build();

        recordLineageOnce(context);
        emitter.emit(result);
    }

    /**
     * Records {@link #LINEAGE_OPERATIONS} the first time it is called on this instance.
     * NOTE(review): field lineage is usually recorded once per run (e.g. from prepareRun);
     * confirm the intended hook for this API.
     */
    private void recordLineageOnce(BatchRuntimeContext context) {
        if (!lineageRecorded) {
            context.getLineageRecorder().record(LINEAGE_OPERATIONS);
            lineageRecorded = true;
        }
    }
}

Join Operation Lineage

@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("CustomerOrderJoiner")
public class CustomerOrderJoiner extends BatchAutoJoiner {

    @Override
    public JoinDefinition define(AutoJoinerContext context) {
        // Join definition code... (construction of joinDefinition is not shown in this example)
        //
        // Lineage is deliberately not built here: define() only describes the join. The list
        // from buildJoinLineage() should be passed to a LineageRecorder at runtime. Previously
        // it was built here and then discarded.
        return joinDefinition;
    }

    /**
     * Builds the field-level lineage for the customer/order join.
     *
     * Each output field has exactly one producing operation. {@code customer_id} comes from the
     * join-key operation, so the customer field selection does not list it again.
     *
     * @return an unmodifiable list of operations, intended for {@code LineageRecorder.record}
     */
    static List<FieldOperation> buildJoinLineage() {
        return Collections.unmodifiableList(Arrays.asList(
            // Read operations from each input stage
            new FieldReadOperation("read_customer_data",
                                   "Read customer information",
                                   Arrays.asList("customers.customer_id", "customers.name",
                                                 "customers.email", "customers.registration_date")),
            new FieldReadOperation("read_order_data",
                                   "Read order information",
                                   Arrays.asList("orders.order_id", "orders.customer_id",
                                                 "orders.amount", "orders.order_date")),

            // Join key matching
            new FieldTransformOperation("join_on_customer_id",
                                        "Join customers and orders on customer_id",
                                        Arrays.asList("customers.customer_id", "orders.customer_id"),
                                        Arrays.asList("customer_id")),

            // Field selections and mappings
            new FieldTransformOperation("select_customer_info",
                                        "Select customer information fields",
                                        Arrays.asList("customers.name", "customers.email"),
                                        Arrays.asList("customer_name", "customer_email")),
            new FieldTransformOperation("select_order_info",
                                        "Select order information fields",
                                        Arrays.asList("orders.order_id", "orders.amount", "orders.order_date"),
                                        Arrays.asList("order_id", "order_amount", "order_date")),

            // Derived fields
            new FieldTransformOperation("derive_customer_since",
                                        "Map registration date to customer_since",
                                        Arrays.asList("customers.registration_date"),
                                        Arrays.asList("customer_since")),

            // Output write
            new FieldWriteOperation("write_joined_data",
                                    "Write joined customer and order data",
                                    Arrays.asList("customer_id", "customer_name", "customer_email",
                                                  "customer_since", "order_id", "order_amount", "order_date"))
        ));
    }
}

Multi-Stage Pipeline Lineage

public class PipelineLineageTracker {

    /**
     * Records the combined field lineage of a multi-stage pipeline in a single call.
     *
     * Each stage's operations are copied with the stage name prepended to the operation name
     * ({@code stage.name}) and to the description ({@code [stage] description}). Two
     * pipeline-level operations are then appended: a read of {@code raw_data.*} and a write of
     * {@code processed_data.*}.
     *
     * @param pipelineContext context supplying the lineage recorder
     * @param stageOperations operations keyed by stage name
     */
    public static void recordPipelineLineage(PipelineContext pipelineContext,
                                             Map<String, List<FieldOperation>> stageOperations) {
        LineageRecorder recorder = pipelineContext.getLineageRecorder();

        // Collect into an explicit ArrayList: the pipeline-level operations are appended below.
        List<FieldOperation> combined = stageOperations.entrySet().stream()
            .flatMap(stage -> stage.getValue().stream()
                .map(op -> withStagePrefix(stage.getKey(), op)))
            .collect(Collectors.toCollection(ArrayList::new));

        combined.add(new FieldReadOperation("pipeline.source_read",
                                            "Pipeline source data read",
                                            Arrays.asList("raw_data.*")));
        combined.add(new FieldWriteOperation("pipeline.sink_write",
                                             "Pipeline final data write",
                                             Arrays.asList("processed_data.*")));

        recorder.record(combined);
    }

    /** Copies {@code op}, qualifying its name and description with {@code stageName}. */
    private static FieldOperation withStagePrefix(String stageName, FieldOperation op) {
        String qualifiedName = stageName + "." + op.getName();
        String taggedDescription = "[" + stageName + "] " + op.getDescription();
        return new FieldOperation(qualifiedName, taggedDescription,
                                  op.getType(), op.getInputs(), op.getOutputs());
    }
}

Lineage Utilities and Best Practices

Lineage Operation Builder

/**
 * Fluent helpers for constructing lineage field operations.
 *
 * Every built operation receives its own copy of the field lists. Continuing to use a builder
 * after {@code build()}, or mutating an array passed to {@code write}, therefore cannot change
 * operations that were already created.
 */
public class LineageOperationBuilder {

    /** Builder for {@link FieldReadOperation}. */
    public static class ReadOperationBuilder {
        private String name;
        private String description;
        private List<String> fields = new ArrayList<>();

        public ReadOperationBuilder name(String name) {
            this.name = name;
            return this;
        }

        public ReadOperationBuilder description(String description) {
            this.description = description;
            return this;
        }

        /** Appends fields; repeated calls accumulate. */
        public ReadOperationBuilder fields(String... fields) {
            this.fields.addAll(Arrays.asList(fields));
            return this;
        }

        /**
         * @return a new read operation over a snapshot of the accumulated fields
         * @throws NullPointerException if no name was set
         */
        public FieldReadOperation build() {
            return new FieldReadOperation(requireName(name), description, new ArrayList<>(fields));
        }
    }

    /** Builder for {@link FieldTransformOperation}. */
    public static class TransformOperationBuilder {
        private String name;
        private String description;
        private List<String> inputs = new ArrayList<>();
        private List<String> outputs = new ArrayList<>();

        public TransformOperationBuilder name(String name) {
            this.name = name;
            return this;
        }

        public TransformOperationBuilder description(String description) {
            this.description = description;
            return this;
        }

        /** Appends input fields; repeated calls accumulate. */
        public TransformOperationBuilder inputs(String... inputs) {
            this.inputs.addAll(Arrays.asList(inputs));
            return this;
        }

        /** Appends output fields; repeated calls accumulate. */
        public TransformOperationBuilder outputs(String... outputs) {
            this.outputs.addAll(Arrays.asList(outputs));
            return this;
        }

        /**
         * @return a new transform operation over snapshots of the accumulated inputs/outputs
         * @throws NullPointerException if no name was set
         */
        public FieldTransformOperation build() {
            return new FieldTransformOperation(requireName(name), description,
                                               new ArrayList<>(inputs), new ArrayList<>(outputs));
        }
    }

    public static ReadOperationBuilder read() {
        return new ReadOperationBuilder();
    }

    public static TransformOperationBuilder transform() {
        return new TransformOperationBuilder();
    }

    /**
     * Creates a write operation directly.
     *
     * @throws NullPointerException if {@code name} is null
     */
    public static FieldWriteOperation write(String name, String description, String... fields) {
        // Copy: Arrays.asList is a view that would track later changes to the caller's array.
        return new FieldWriteOperation(requireName(name), description,
                                       new ArrayList<>(Arrays.asList(fields)));
    }

    /** Rejects a missing operation name, which is a programming error in the caller. */
    private static String requireName(String name) {
        return Objects.requireNonNull(name, "operation name must be set");
    }
}

// Usage example:
// Source fields as they arrive
FieldReadOperation readSource = LineageOperationBuilder.read()
    .name("read_source")
    .description("Read source customer data")
    .fields("customer_id", "first_name", "last_name", "email")
    .build();

// Name standardization step
FieldTransformOperation standardizeNames = LineageOperationBuilder.transform()
    .name("standardize_names")
    .description("Standardize name fields")
    .inputs("first_name", "last_name")
    .outputs("std_first_name", "std_last_name")
    .build();

// Final output
FieldWriteOperation writeOutput = LineageOperationBuilder.write(
    "write_output",
    "Write standardized customer data",
    "customer_id", "std_first_name", "std_last_name", "email");

List<FieldOperation> operations = Arrays.asList(readSource, standardizeNames, writeOutput);

Lineage Analysis Tools

public class LineageAnalyzer {

    /**
     * Maps each field produced by a {@code TRANSFORM} operation to the set of fields it was
     * derived from. READ and WRITE operations are ignored. A field produced by several
     * operations gets the union of their inputs.
     */
    public static Map<String, Set<String>> buildFieldDependencyGraph(List<FieldOperation> operations) {
        Map<String, Set<String>> dependencies = new HashMap<>();

        for (FieldOperation operation : operations) {
            if (operation.getType() == OperationType.TRANSFORM) {
                for (String output : operation.getOutputs()) {
                    dependencies.computeIfAbsent(output, k -> new HashSet<>())
                              .addAll(operation.getInputs());
                }
            }
        }

        return dependencies;
    }

    /**
     * Returns {@code fieldName} plus every field it transitively depends on, in source-to-target
     * order. Each field appears after all of the fields it depends on, and {@code fieldName} is
     * last. Cycles are cut at the first revisit.
     */
    public static List<String> getFieldLineage(String fieldName,
                                             Map<String, Set<String>> dependencyGraph) {
        List<String> lineage = new ArrayList<>();
        buildLineage(fieldName, dependencyGraph, lineage, new HashSet<>());
        return lineage;
    }

    /**
     * Post-order depth-first walk: a field is appended only after its dependencies. (Reversing a
     * pre-order, as done previously, misorders fields whose dependencies are shared.)
     */
    private static void buildLineage(String fieldName,
                                   Map<String, Set<String>> dependencyGraph,
                                   List<String> lineage,
                                   Set<String> visited) {
        if (!visited.add(fieldName)) {
            return; // already emitted, or on the current path (cycle)
        }

        Set<String> dependencies = dependencyGraph.get(fieldName);
        if (dependencies != null) {
            for (String dependency : dependencies) {
                buildLineage(dependency, dependencyGraph, lineage, visited);
            }
        }

        lineage.add(fieldName);
    }

    /**
     * Returns every field that directly or transitively derives from {@code sourceField}.
     * {@code sourceField} itself is included only if it depends on itself through a cycle
     * (e.g. a region -> region group-by mapping). Cycles are handled without recursion, so the
     * method terminates on any graph and visits each field at most once.
     */
    public static Set<String> findImpactedFields(String sourceField,
                                               Map<String, Set<String>> dependencyGraph) {
        // Invert the graph once: field -> fields derived directly from it.
        Map<String, Set<String>> dependents = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : dependencyGraph.entrySet()) {
            for (String input : entry.getValue()) {
                dependents.computeIfAbsent(input, k -> new HashSet<>()).add(entry.getKey());
            }
        }

        Set<String> impacted = new HashSet<>();
        Deque<String> pending = new ArrayDeque<>();
        pending.push(sourceField);
        while (!pending.isEmpty()) {
            Set<String> direct = dependents.getOrDefault(pending.pop(), Collections.emptySet());
            for (String dependent : direct) {
                if (impacted.add(dependent)) {
                    pending.push(dependent);
                }
            }
        }

        return impacted;
    }
}

Metadata and Governance Integration

Metadata Enrichment

public class MetadataEnrichedLineage {

    /**
     * A {@link FieldOperation} that also carries a read-only metadata map.
     */
    public static class EnrichedFieldOperation extends FieldOperation {
        private final Map<String, Object> metadata;

        /**
         * @param baseOperation operation whose name, description, type, inputs and outputs are
         *                      copied
         * @param metadata      metadata to attach. It is copied (null values are allowed), and
         *                      null is treated as empty.
         */
        public EnrichedFieldOperation(FieldOperation baseOperation,
                                    Map<String, Object> metadata) {
            super(baseOperation.getName(), baseOperation.getDescription(),
                  baseOperation.getType(), baseOperation.getInputs(),
                  baseOperation.getOutputs());
            // Copy via HashMap (not Map.copyOf): enrichWithMetadata stores null values.
            this.metadata = metadata == null
                ? Collections.<String, Object>emptyMap()
                : Collections.unmodifiableMap(new HashMap<>(metadata));
        }

        /** Returns the attached metadata as an unmodifiable map. */
        public Map<String, Object> getMetadata() {
            return metadata;
        }

        /** Returns the metadata value for {@code key}, or null if absent. */
        public Object getMetadata(String key) {
            return metadata.get(key);
        }
    }

    /**
     * Wraps {@code operation} with metadata. The metadata includes the schema strings (null when
     * a schema is null), the field types of the operation's inputs and outputs found in those
     * schemas, a recording timestamp, and a complexity label.
     */
    public static EnrichedFieldOperation enrichWithMetadata(FieldOperation operation,
                                                          Schema inputSchema,
                                                          Schema outputSchema) {
        Map<String, Object> metadata = new HashMap<>();

        // Add schema information
        metadata.put("inputSchema", inputSchema != null ? inputSchema.toString() : null);
        metadata.put("outputSchema", outputSchema != null ? outputSchema.toString() : null);

        // Add field type information
        if (inputSchema != null) {
            metadata.put("inputFieldTypes", fieldTypes(inputSchema, operation.getInputs()));
        }
        if (outputSchema != null) {
            metadata.put("outputFieldTypes", fieldTypes(outputSchema, operation.getOutputs()));
        }

        // Add timestamp
        metadata.put("recordedAt", Instant.now().toString());

        // Add operation complexity
        metadata.put("complexity", calculateComplexity(operation));

        return new EnrichedFieldOperation(operation, metadata);
    }

    /**
     * Maps each name in {@code fieldNames} that exists in {@code schema} to its schema type
     * name. Names missing from the schema are skipped. The returned map is unmodifiable.
     */
    private static Map<String, String> fieldTypes(Schema schema, List<String> fieldNames) {
        Map<String, String> types = new HashMap<>();
        for (String fieldName : fieldNames) {
            Schema.Field field = schema.getField(fieldName);
            if (field != null) {
                types.put(fieldName, field.getSchema().getType().toString());
            }
        }
        return Collections.unmodifiableMap(types);
    }

    /**
     * Classifies an operation by its input/output field counts.
     *
     * Operations with no inputs (e.g. reads) are "SOURCE", and operations with no outputs (e.g.
     * writes) are "SINK". Previously both fell through to "COMPLEX".
     */
    private static String calculateComplexity(FieldOperation operation) {
        int inputCount = operation.getInputs().size();
        int outputCount = operation.getOutputs().size();

        if (inputCount == 0 && outputCount > 0) {
            return "SOURCE";
        } else if (inputCount > 0 && outputCount == 0) {
            return "SINK";
        } else if (inputCount == 1 && outputCount == 1) {
            return "SIMPLE";
        } else if (inputCount > 1 && outputCount == 1) {
            return "AGGREGATION";
        } else if (inputCount == 1 && outputCount > 1) {
            return "EXPANSION";
        } else {
            return "COMPLEX";
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api

docs

actions-conditions.md

batch-processing.md

core-pipeline.md

data-connectors.md

index.md

join-operations.md

lineage-metadata.md

sql-engine.md

validation.md

tile.json