The CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load (ETL) pipeline applications on the CDAP platform.
—
Data lineage tracking and field-level transformations for governance, debugging, and compliance in CDAP ETL pipelines.
Enumeration of data access types for lineage tracking.
package io.cdap.cdap.etl.api.lineage;
/** Coarse-grained access types recorded against datasets for lineage; UNKNOWN is the fallback when the access cannot be classified. */
public enum AccessType {
READ, // Data was read from the dataset
WRITE, // Data was written to the dataset
UNKNOWN // Access could not be classified as a read or a write
}Interface for recording field-level lineage information during pipeline execution.
package io.cdap.cdap.etl.api.lineage.field;
/**
 * Records field-level operations so the platform can assemble lineage for a pipeline run.
 * Implementations are supplied by the runtime context; plugin code only calls {@link #record}.
 */
public interface LineageRecorder {
/**
* Record field operations for lineage tracking.
*
* @param fieldOperations the reads, transforms, and writes performed on fields by this stage
*/
void record(List<FieldOperation> fieldOperations);
}Lineage Recording Example:
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("CustomerDataEnricher")
public class CustomerDataEnricher extends Transform<StructuredRecord, StructuredRecord> {
  private final Config config;

  /**
   * Enriches a single customer record and reports field-level lineage covering
   * every mapping, derivation, and lookup applied on the way.
   */
  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
    TransformContext ctx = getContext();
    LineageRecorder recorder = ctx.getLineageRecorder();
    StructuredRecord.Builder enriched = StructuredRecord.builder(ctx.getOutputSchema());

    // One-to-one renames from the source record.
    enriched.set("customer_id", input.get("id"));
    enriched.set("first_name", input.get("fname"));
    enriched.set("last_name", input.get("lname"));

    // Values derived from the source name fields.
    String combinedName = input.get("fname") + " " + input.get("lname");
    enriched.set("full_name", combinedName);
    String derivedEmail = generateEmail(input.get("fname"), input.get("lname"));
    enriched.set("email", derivedEmail);

    // Profile-lookup enrichment; both fields are left unset when no profile exists.
    String sourceId = input.get("id");
    CustomerProfile profile = lookupCustomerProfile(sourceId);
    if (profile != null) {
      enriched.set("segment", profile.getSegment());
      enriched.set("lifetime_value", profile.getLifetimeValue());
    }

    // One lineage entry per read, mapping, derivation, lookup, and the final write.
    List<FieldOperation> lineageOps = Arrays.asList(
        new FieldReadOperation("read_customer_id",
            "Read customer ID from source",
            Arrays.asList("id")),
        new FieldReadOperation("read_names",
            "Read customer names from source",
            Arrays.asList("fname", "lname")),
        new FieldTransformOperation("map_customer_id",
            "Map source ID to customer_id",
            Arrays.asList("id"),
            Arrays.asList("customer_id")),
        new FieldTransformOperation("map_first_name",
            "Map fname to first_name",
            Arrays.asList("fname"),
            Arrays.asList("first_name")),
        new FieldTransformOperation("map_last_name",
            "Map lname to last_name",
            Arrays.asList("lname"),
            Arrays.asList("last_name")),
        new FieldTransformOperation("derive_full_name",
            "Concatenate first and last name",
            Arrays.asList("fname", "lname"),
            Arrays.asList("full_name")),
        new FieldTransformOperation("generate_email",
            "Generate email from names",
            Arrays.asList("fname", "lname"),
            Arrays.asList("email")),
        new FieldTransformOperation("lookup_segment",
            "Lookup customer segment",
            Arrays.asList("id"),
            Arrays.asList("segment")),
        new FieldTransformOperation("lookup_lifetime_value",
            "Lookup customer lifetime value",
            Arrays.asList("id"),
            Arrays.asList("lifetime_value")),
        new FieldWriteOperation("write_enriched_customer",
            "Write enriched customer record",
            Arrays.asList("customer_id", "first_name", "last_name",
                "full_name", "email", "segment", "lifetime_value"))
    );
    recorder.record(lineageOps);
    emitter.emit(enriched.build());
  }
}
Base class for field operations in lineage tracking.
package io.cdap.cdap.etl.api.lineage.field;
public class FieldOperation {
  private final String name;
  private final String description;
  private final OperationType type;
  private final List<String> inputs;
  private final List<String> outputs;

  /**
   * Create field operation.
   *
   * Fix: the skeleton's empty constructor and getters were non-functional
   * (a non-void method with an empty body does not compile); state is now
   * stored and returned.
   *
   * @param name operation name, unique within the recording stage
   * @param description human-readable description of the operation
   * @param type whether the operation reads, writes, or transforms fields
   * @param inputs field names consumed by the operation (empty for reads)
   * @param outputs field names produced by the operation (empty for writes)
   */
  public FieldOperation(String name, String description, OperationType type,
      List<String> inputs, List<String> outputs) {
    this.name = name;
    this.description = description;
    this.type = type;
    // Unmodifiable defensive copies: an operation's field lists are fixed once created.
    this.inputs = Collections.unmodifiableList(new ArrayList<>(inputs));
    this.outputs = Collections.unmodifiableList(new ArrayList<>(outputs));
  }

  /** Get operation name. */
  public String getName() {
    return name;
  }

  /** Get operation description. */
  public String getDescription() {
    return description;
  }

  /** Get operation type. */
  public OperationType getType() {
    return type;
  }

  /** Get input field names (unmodifiable). */
  public List<String> getInputs() {
    return inputs;
  }

  /** Get output field names (unmodifiable). */
  public List<String> getOutputs() {
    return outputs;
  }
}Field operation for reading data from source fields.
package io.cdap.cdap.etl.api.lineage.field;
public class FieldReadOperation extends FieldOperation {
/**
* Create read operation for fields.
*
* @param name unique operation name within the stage
* @param description human-readable description of the read
* @param fields field names produced by the read; passed to the parent as the
*               outputs, with an empty input list, since a read consumes nothing
*/
public FieldReadOperation(String name, String description, List<String> fields) {
super(name, description, OperationType.READ, Collections.emptyList(), fields);
}
}Field operation for writing data to destination fields.
package io.cdap.cdap.etl.api.lineage.field;
public class FieldWriteOperation extends FieldOperation {
/**
* Create write operation for fields.
*
* @param name unique operation name within the stage
* @param description human-readable description of the write
* @param fields field names consumed by the write; passed to the parent as the
*               inputs, with an empty output list, since a write produces nothing
*/
public FieldWriteOperation(String name, String description, List<String> fields) {
super(name, description, OperationType.WRITE, fields, Collections.emptyList());
}
}Field operation for data transformations between fields.
package io.cdap.cdap.etl.api.lineage.field;
public class FieldTransformOperation extends FieldOperation {
/**
* Create transform operation from inputs to outputs.
*
* @param name unique operation name within the stage
* @param description human-readable description of the transformation
* @param inputs field names the transformation reads
* @param outputs field names the transformation produces from those inputs
*/
public FieldTransformOperation(String name, String description,
List<String> inputs, List<String> outputs) {
super(name, description, OperationType.TRANSFORM, inputs, outputs);
}
}Enumeration of field operation types.
package io.cdap.cdap.etl.api.lineage.field;
/** Kind of field operation; determines how the inputs/outputs lists are interpreted. */
public enum OperationType {
READ, // Read from a source: no input fields, produces output fields
WRITE, // Write to a destination: consumes input fields, no output fields
TRANSFORM // Transformation: maps input fields to output fields
}@Plugin(type = BatchAggregator.PLUGIN_TYPE)
@Name("SalesAggregator")
public class SalesAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {
  /**
   * Aggregates the sales records of one region (the group key) into a summary
   * record — total, order count, avg/min/max order value, distinct customers —
   * and records field lineage for each derived output field.
   *
   * Fix: min/max are emitted as null based on {@code orderCount == 0} rather
   * than on sentinel comparisons, so a group whose largest amount is exactly
   * 0.0 (or all-negative, e.g. refunds) no longer reports a null or wrong
   * max_order_value.
   */
  @Override
  public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
      Emitter<StructuredRecord> emitter) throws Exception {
    double totalSales = 0.0;
    int orderCount = 0;
    // Infinity sentinels guarantee any real amount replaces them; orderCount
    // (not the sentinel values) decides whether min/max are emitted at all.
    double maxOrderValue = Double.NEGATIVE_INFINITY;
    double minOrderValue = Double.POSITIVE_INFINITY;
    Set<String> uniqueCustomers = new HashSet<>();
    while (groupValues.hasNext()) {
      StructuredRecord record = groupValues.next();
      Double salesAmount = record.get("sales_amount");
      String customerId = record.get("customer_id");
      if (salesAmount != null) {
        totalSales += salesAmount;
        orderCount++;
        maxOrderValue = Math.max(maxOrderValue, salesAmount);
        minOrderValue = Math.min(minOrderValue, salesAmount);
      }
      if (customerId != null) {
        uniqueCustomers.add(customerId);
      }
    }
    double avgOrderValue = orderCount > 0 ? totalSales / orderCount : 0.0;
    // Build result record; min/max are null when the group had no priced orders.
    StructuredRecord result = StructuredRecord.builder(getContext().getOutputSchema())
        .set("region", groupKey)
        .set("total_sales", totalSales)
        .set("order_count", orderCount)
        .set("avg_order_value", avgOrderValue)
        .set("max_order_value", orderCount == 0 ? null : maxOrderValue)
        .set("min_order_value", orderCount == 0 ? null : minOrderValue)
        .set("unique_customers", uniqueCustomers.size())
        .build();
    // Record detailed lineage for the aggregation.
    BatchRuntimeContext context = getContext();
    LineageRecorder lineageRecorder = context.getLineageRecorder();
    List<FieldOperation> operations = Arrays.asList(
        // Input field reads
        new FieldReadOperation("read_sales_data",
            "Read sales transaction data",
            Arrays.asList("region", "sales_amount", "customer_id")),
        // Aggregation operations
        new FieldTransformOperation("group_by_region",
            "Group sales data by region",
            Arrays.asList("region"),
            Arrays.asList("region")),
        new FieldTransformOperation("sum_sales_amount",
            "Sum sales amounts for region",
            Arrays.asList("sales_amount"),
            Arrays.asList("total_sales")),
        new FieldTransformOperation("count_orders",
            "Count number of orders",
            Arrays.asList("sales_amount"),
            Arrays.asList("order_count")),
        new FieldTransformOperation("calculate_avg_order",
            "Calculate average order value",
            Arrays.asList("sales_amount"),
            Arrays.asList("avg_order_value")),
        new FieldTransformOperation("find_max_order",
            "Find maximum order value",
            Arrays.asList("sales_amount"),
            Arrays.asList("max_order_value")),
        new FieldTransformOperation("find_min_order",
            "Find minimum order value",
            Arrays.asList("sales_amount"),
            Arrays.asList("min_order_value")),
        new FieldTransformOperation("count_unique_customers",
            "Count unique customers in region",
            Arrays.asList("customer_id"),
            Arrays.asList("unique_customers")),
        // Output write
        new FieldWriteOperation("write_aggregated_sales",
            "Write aggregated sales summary",
            Arrays.asList("region", "total_sales", "order_count",
                "avg_order_value", "max_order_value",
                "min_order_value", "unique_customers"))
    );
    lineageRecorder.record(operations);
    emitter.emit(result);
  }
}
@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("CustomerOrderJoiner")
public class CustomerOrderJoiner extends BatchAutoJoiner {
/**
* Defines the customer/order join and illustrates how the corresponding
* field-level lineage operations would be constructed.
*/
@Override
public JoinDefinition define(AutoJoinerContext context) {
// Join definition code...
// NOTE(review): joinDefinition is assumed to be built in the elided section above.
// Record lineage for join operation
recordJoinLineage(context);
return joinDefinition;
}
/**
* Builds the lineage operations describing the join: reads from both inputs,
* the key match, field selections/renames, one derived field, and the final
* write. Field names are prefixed with their input stage ("customers.",
* "orders.") until the join merges them.
*/
private void recordJoinLineage(AutoJoinerContext context) {
// This would typically be called during runtime, but shown here for illustration
List<FieldOperation> operations = Arrays.asList(
// Read operations from each input stage
new FieldReadOperation("read_customer_data",
"Read customer information",
Arrays.asList("customers.customer_id", "customers.name",
"customers.email", "customers.registration_date")),
new FieldReadOperation("read_order_data",
"Read order information",
Arrays.asList("orders.order_id", "orders.customer_id",
"orders.amount", "orders.order_date")),
// Join key matching: the two stage-qualified keys collapse into one field
new FieldTransformOperation("join_on_customer_id",
"Join customers and orders on customer_id",
Arrays.asList("customers.customer_id", "orders.customer_id"),
Arrays.asList("customer_id")),
// Field selections and mappings
new FieldTransformOperation("select_customer_info",
"Select customer information fields",
Arrays.asList("customers.customer_id", "customers.name", "customers.email"),
Arrays.asList("customer_id", "customer_name", "customer_email")),
new FieldTransformOperation("select_order_info",
"Select order information fields",
Arrays.asList("orders.order_id", "orders.amount", "orders.order_date"),
Arrays.asList("order_id", "order_amount", "order_date")),
// Derived fields
new FieldTransformOperation("derive_customer_since",
"Map registration date to customer_since",
Arrays.asList("customers.registration_date"),
Arrays.asList("customer_since")),
// Output write
new FieldWriteOperation("write_joined_data",
"Write joined customer and order data",
Arrays.asList("customer_id", "customer_name", "customer_email",
"customer_since", "order_id", "order_amount", "order_date"))
);
// In actual implementation, this would be recorded during runtime
// LineageRecorder would be available in the runtime context
}
}public class PipelineLineageTracker {
/**
 * Merges per-stage field operations into one pipeline-wide lineage record,
 * qualifying each operation with its stage name so origins stay visible,
 * then appends the pipeline-level source read and sink write.
 */
public static void recordPipelineLineage(PipelineContext pipelineContext,
    Map<String, List<FieldOperation>> stageOperations) {
  LineageRecorder recorder = pipelineContext.getLineageRecorder();
  List<FieldOperation> combined = new ArrayList<>();
  // Prefix operation names and descriptions with the owning stage for clarity.
  stageOperations.forEach((stage, ops) -> ops.stream()
      .map(op -> new FieldOperation(
          stage + "." + op.getName(),
          "[" + stage + "] " + op.getDescription(),
          op.getType(),
          op.getInputs(),
          op.getOutputs()))
      .forEach(combined::add));
  // Pipeline-level bracketing operations.
  combined.add(new FieldReadOperation("pipeline.source_read",
      "Pipeline source data read",
      Arrays.asList("raw_data.*")));
  combined.add(new FieldWriteOperation("pipeline.sink_write",
      "Pipeline final data write",
      Arrays.asList("processed_data.*")));
  // Record the complete, combined lineage in one call.
  recorder.record(combined);
}
}
public class LineageOperationBuilder {
/** Fluent builder for read operations. */
public static class ReadOperationBuilder {
  private String opName;
  private String opDescription;
  private final List<String> readFields = new ArrayList<>();

  /** Sets the operation name. */
  public ReadOperationBuilder name(String name) {
    this.opName = name;
    return this;
  }

  /** Sets the human-readable description. */
  public ReadOperationBuilder description(String description) {
    this.opDescription = description;
    return this;
  }

  /** Appends the given field names to the read. */
  public ReadOperationBuilder fields(String... fields) {
    this.readFields.addAll(Arrays.asList(fields));
    return this;
  }

  /** Creates the read operation from the accumulated state. */
  public FieldReadOperation build() {
    return new FieldReadOperation(opName, opDescription, readFields);
  }
}

/** Fluent builder for transform operations. */
public static class TransformOperationBuilder {
  private String opName;
  private String opDescription;
  private final List<String> inputFields = new ArrayList<>();
  private final List<String> outputFields = new ArrayList<>();

  /** Sets the operation name. */
  public TransformOperationBuilder name(String name) {
    this.opName = name;
    return this;
  }

  /** Sets the human-readable description. */
  public TransformOperationBuilder description(String description) {
    this.opDescription = description;
    return this;
  }

  /** Appends the given field names to the transform's inputs. */
  public TransformOperationBuilder inputs(String... inputs) {
    this.inputFields.addAll(Arrays.asList(inputs));
    return this;
  }

  /** Appends the given field names to the transform's outputs. */
  public TransformOperationBuilder outputs(String... outputs) {
    this.outputFields.addAll(Arrays.asList(outputs));
    return this;
  }

  /** Creates the transform operation from the accumulated state. */
  public FieldTransformOperation build() {
    return new FieldTransformOperation(opName, opDescription, inputFields, outputFields);
  }
}

/** Starts a builder for a read operation. */
public static ReadOperationBuilder read() {
  return new ReadOperationBuilder();
}

/** Starts a builder for a transform operation. */
public static TransformOperationBuilder transform() {
  return new TransformOperationBuilder();
}

/** Creates a write operation directly; writes need no incremental building. */
public static FieldWriteOperation write(String name, String description, String... fields) {
  return new FieldWriteOperation(name, description, Arrays.asList(fields));
}
}
// Usage example: fluent builders keep long lineage operation lists readable.
List<FieldOperation> operations = Arrays.asList(
LineageOperationBuilder.read()
.name("read_source")
.description("Read source customer data")
.fields("customer_id", "first_name", "last_name", "email")
.build(),
LineageOperationBuilder.transform()
.name("standardize_names")
.description("Standardize name fields")
.inputs("first_name", "last_name")
.outputs("std_first_name", "std_last_name")
.build(),
// Write operations are created directly; they need no incremental building.
LineageOperationBuilder.write("write_output",
"Write standardized customer data",
"customer_id", "std_first_name", "std_last_name", "email")
);public class LineageAnalyzer {
/**
 * Builds a dependency graph (output field -> the input fields it was derived
 * from) from TRANSFORM operations only; reads and writes carry no derivation.
 */
public static Map<String, Set<String>> buildFieldDependencyGraph(List<FieldOperation> operations) {
  Map<String, Set<String>> dependencies = new HashMap<>();
  for (FieldOperation operation : operations) {
    if (operation.getType() == OperationType.TRANSFORM) {
      for (String output : operation.getOutputs()) {
        dependencies.computeIfAbsent(output, k -> new HashSet<>())
            .addAll(operation.getInputs());
      }
    }
  }
  return dependencies;
}

/**
 * Returns the upstream lineage of {@code fieldName} in source-to-target order.
 */
public static List<String> getFieldLineage(String fieldName,
    Map<String, Set<String>> dependencyGraph) {
  List<String> lineage = new ArrayList<>();
  Set<String> visited = new HashSet<>();
  buildLineage(fieldName, dependencyGraph, lineage, visited);
  Collections.reverse(lineage); // Reverse to show source-to-target order
  return lineage;
}

// Depth-first walk toward source fields; 'visited' guards against cycles.
private static void buildLineage(String fieldName,
    Map<String, Set<String>> dependencyGraph,
    List<String> lineage,
    Set<String> visited) {
  if (visited.contains(fieldName)) {
    return; // Avoid cycles
  }
  visited.add(fieldName);
  lineage.add(fieldName);
  Set<String> dependencies = dependencyGraph.get(fieldName);
  if (dependencies != null) {
    for (String dependency : dependencies) {
      buildLineage(dependency, dependencyGraph, lineage, visited);
    }
  }
}

/**
 * Returns every field directly or transitively derived from {@code sourceField}.
 *
 * Fix: the previous implementation recursed with no visited set and overflowed
 * the stack on cyclic dependency graphs (which {@link #buildLineage} already
 * guards against); the accumulator now doubles as the visited set.
 */
public static Set<String> findImpactedFields(String sourceField,
    Map<String, Set<String>> dependencyGraph) {
  Set<String> impacted = new HashSet<>();
  collectImpacted(sourceField, dependencyGraph, impacted);
  return impacted;
}

// Adds all consumers of 'source' to 'impacted'; Set.add returning false
// stops re-visits, so cyclic graphs terminate and each field is expanded once.
private static void collectImpacted(String source,
    Map<String, Set<String>> dependencyGraph,
    Set<String> impacted) {
  for (Map.Entry<String, Set<String>> entry : dependencyGraph.entrySet()) {
    if (entry.getValue().contains(source) && impacted.add(entry.getKey())) {
      collectImpacted(entry.getKey(), dependencyGraph, impacted);
    }
  }
}
}
public class MetadataEnrichedLineage {
/** A field operation carrying an extra free-form metadata map. */
public static class EnrichedFieldOperation extends FieldOperation {
  private final Map<String, Object> metadata;

  /**
   * Wraps {@code baseOperation}, attaching {@code metadata}.
   *
   * Fix: the metadata map is now copied defensively; previously it was stored
   * by reference, so later mutation by the caller silently altered the
   * recorded metadata.
   */
  public EnrichedFieldOperation(FieldOperation baseOperation,
      Map<String, Object> metadata) {
    super(baseOperation.getName(), baseOperation.getDescription(),
        baseOperation.getType(), baseOperation.getInputs(),
        baseOperation.getOutputs());
    this.metadata = new HashMap<>(metadata);
  }

  /** Returns the full metadata map. */
  public Map<String, Object> getMetadata() {
    return metadata;
  }

  /** Returns a single metadata value, or null when the key is absent. */
  public Object getMetadata(String key) {
    return metadata.get(key);
  }
}

/**
 * Wraps {@code operation} with schema strings, per-field type names, a
 * recording timestamp, and a coarse complexity label. Either schema may be
 * null, in which case the corresponding entries are null/omitted.
 */
public static EnrichedFieldOperation enrichWithMetadata(FieldOperation operation,
    Schema inputSchema,
    Schema outputSchema) {
  Map<String, Object> metadata = new HashMap<>();
  // Full schema strings (null-safe).
  metadata.put("inputSchema", inputSchema != null ? inputSchema.toString() : null);
  metadata.put("outputSchema", outputSchema != null ? outputSchema.toString() : null);
  // Per-field type names; fields missing from the schema are skipped silently.
  if (inputSchema != null) {
    Map<String, String> inputTypes = new HashMap<>();
    for (String fieldName : operation.getInputs()) {
      Schema.Field field = inputSchema.getField(fieldName);
      if (field != null) {
        inputTypes.put(fieldName, field.getSchema().getType().toString());
      }
    }
    metadata.put("inputFieldTypes", inputTypes);
  }
  if (outputSchema != null) {
    Map<String, String> outputTypes = new HashMap<>();
    for (String fieldName : operation.getOutputs()) {
      Schema.Field field = outputSchema.getField(fieldName);
      if (field != null) {
        outputTypes.put(fieldName, field.getSchema().getType().toString());
      }
    }
    metadata.put("outputFieldTypes", outputTypes);
  }
  // Recording timestamp (UTC instant).
  metadata.put("recordedAt", Instant.now().toString());
  // Coarse operation complexity label.
  metadata.put("complexity", calculateComplexity(operation));
  return new EnrichedFieldOperation(operation, metadata);
}

// Classifies by input/output cardinality. Only meaningful for TRANSFORM
// operations: reads (no inputs) and writes (no outputs) fall into COMPLEX.
private static String calculateComplexity(FieldOperation operation) {
  int inputCount = operation.getInputs().size();
  int outputCount = operation.getOutputs().size();
  if (inputCount == 1 && outputCount == 1) {
    return "SIMPLE";
  } else if (inputCount > 1 && outputCount == 1) {
    return "AGGREGATION";
  } else if (inputCount == 1 && outputCount > 1) {
    return "EXPANSION";
  } else {
    return "COMPLEX";
  }
}
}
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api