CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load (ETL) pipeline applications on the CDAP platform.
—
Advanced join operations with automatic join optimization, comprehensive join definitions, and error handling for combining data from multiple inputs in CDAP ETL pipelines.
Interface for automatic join operations with intelligent optimization.
package io.cdap.cdap.etl.api.join;
/**
 * Plugin interface for joins whose full definition is computed from the
 * pipeline context at configure time rather than hand-wired per stage.
 */
public interface AutoJoiner {
/**
* Define the join operation with context information.
*
* @param context supplies input schemas and a failure collector for validation
* @return the join definition (stages, keys, selected fields, options)
*/
JoinDefinition define(AutoJoinerContext context);
}Context interface providing input schemas and validation capabilities.
package io.cdap.cdap.etl.api.join;
/**
 * Context handed to an {@link AutoJoiner} when it defines its join.
 */
public interface AutoJoinerContext {
/**
* Get input schemas for all stages.
*
* @return map from stage name to that stage's input schema
*/
Map<String, Schema> getInputSchemas();
/**
* Get failure collector for validation.
*
* @return collector used to accumulate configuration/validation failures
*/
FailureCollector getFailureCollector();
}AutoJoiner Implementation Example:
@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("SmartJoiner")
@Description("Automatically optimized join operation")
public class SmartJoiner extends BatchAutoJoiner {
  private final Config config;

  /**
   * FIX: {@code config} is final but was never assigned in the original,
   * which cannot compile. Plugin config is injected via the constructor.
   *
   * @param config plugin configuration
   */
  public SmartJoiner(Config config) {
    this.config = config;
  }

  /**
   * Builds the join definition from the runtime context: validates the config
   * against the input schemas, then assembles stages, keys, output fields,
   * and the optional distribution/condition settings.
   *
   * @param context supplies input schemas and the failure collector
   * @return the assembled join definition
   */
  @Override
  public JoinDefinition define(AutoJoinerContext context) {
    Map<String, Schema> inputSchemas = context.getInputSchemas();
    FailureCollector collector = context.getFailureCollector();
    // Validate configuration first; failures accumulate in the collector.
    config.validate(collector, inputSchemas);
    JoinDefinition.Builder builder = JoinDefinition.builder();

    // Configure join stages, skipping any stage missing from the input.
    List<JoinStage> stages = new ArrayList<>();
    for (StageInfo stageInfo : config.stages) {
      Schema inputSchema = inputSchemas.get(stageInfo.stageName);
      if (inputSchema == null) {
        collector.addFailure("Unknown stage: " + stageInfo.stageName, null);
        continue;
      }
      // Broadcast hint derived from a size heuristic (see shouldBroadcastStage).
      boolean shouldBroadcast = shouldBroadcastStage(stageInfo, inputSchema);
      JoinStage stage = new JoinStage(
          stageInfo.stageName,
          stageInfo.joinType,
          stageInfo.getSelectedFields(inputSchema),
          stageInfo.required,
          shouldBroadcast
      );
      stages.add(stage);
    }
    builder.from(stages);

    // Configure join keys for every stage that declares key fields.
    List<JoinKey> joinKeys = new ArrayList<>();
    for (String stageName : inputSchemas.keySet()) {
      List<String> keyFields = config.getJoinKeysForStage(stageName);
      if (!keyFields.isEmpty()) {
        joinKeys.add(new JoinKey(stageName, keyFields));
      }
    }
    builder.on(joinKeys);

    // Configure output fields.
    List<JoinField> outputFields = buildOutputFields(inputSchemas, collector);
    builder.select(outputFields);

    // Optional settings: distribution and extra join condition.
    if (config.distributionSize != null) {
      builder.setDistribution(new JoinDistribution(config.distributionSize));
    }
    if (config.condition != null && !config.condition.isEmpty()) {
      builder.setCondition(new JoinCondition(config.condition));
    }
    return builder.build();
  }

  /**
   * Heuristic broadcast decision: broadcast when the estimated in-memory size
   * (records * fields * ~50 bytes) is under 100MB.
   * NOTE(review): assumes {@code StageInfo.estimatedRecords} is populated —
   * confirm with the config source.
   */
  private boolean shouldBroadcastStage(StageInfo stageInfo, Schema schema) {
    int estimatedRecords = stageInfo.estimatedRecords;
    int fieldCount = schema.getFields().size();
    // Widen to long before multiplying to avoid int overflow.
    long estimatedSizeBytes = (long) estimatedRecords * fieldCount * 50; // 50 bytes avg per field
    return estimatedSizeBytes < 100 * 1024 * 1024;
  }
}
Comprehensive definition of join operation with all configuration options.
package io.cdap.cdap.etl.api.join;
/**
 * Immutable description of a join: participating stages, join keys, selected
 * output fields, and optional condition/distribution settings.
 * NOTE: method bodies are elided in this API summary.
 */
public class JoinDefinition {
/**
* Create builder for join definition.
*
* @return a new {@code Builder}
*/
public static Builder builder() {}
/**
* Get join stages.
*
* @return stages participating in the join
*/
public List<JoinStage> getStages() {}
/**
* Get join keys.
*
* @return per-stage join key field lists
*/
public List<JoinKey> getKeys() {}
/**
* Get output schema.
*
* @return schema of the joined output records
*/
public Schema getOutputSchema() {}
/**
* Get join condition.
*
* @return additional join condition, or null if none was set
*/
public JoinCondition getCondition() {}
/**
* Get distribution strategy.
*
* @return distribution settings, or null if none was set
*/
public JoinDistribution getDistribution() {}
}JoinDefinition Builder Usage:
// Example: building a JoinDefinition directly.
// select(...) lists output fields as (stage, source field, output alias);
// from(...) declares the stages; on(...) gives each stage's key fields.
JoinDefinition joinDef = JoinDefinition.builder()
.select(Arrays.asList(
new JoinField("customers", "id", "customer_id"),
new JoinField("customers", "name", "customer_name"),
new JoinField("customers", "email", "customer_email"),
new JoinField("orders", "id", "order_id"),
new JoinField("orders", "amount", "order_amount"),
new JoinField("orders", "date", "order_date")
))
// customers is the required driver; orders is optional and broadcast.
.from(Arrays.asList(
new JoinStage("customers", JoinType.INNER,
Collections.emptyList(), true, false),
new JoinStage("orders", JoinType.LEFT_OUTER,
Collections.emptyList(), false, true)
))
// Key fields are matched across stages: customers.id = orders.customer_id.
.on(Arrays.asList(
new JoinKey("customers", Arrays.asList("id")),
new JoinKey("orders", Arrays.asList("customer_id"))
))
.setCondition(new JoinCondition("customers.status = 'active' AND orders.amount > 0"))
.setDistribution(new JoinDistribution(4))
.build();Definition of a stage participating in the join operation.
package io.cdap.cdap.etl.api.join;
/**
 * One input stage participating in a join, with its join type and hints.
 */
public class JoinStage {
/**
* Create join stage with configuration.
*
* @param stageName name of the input stage
* @param joinType how this stage participates (INNER, LEFT_OUTER, ...)
* @param fields fields selected from this stage
* @param required whether the join requires this stage
* @param broadcast hint that this (small) stage may be broadcast
*/
public JoinStage(String stageName, JoinType joinType, List<JoinField> fields,
boolean required, boolean broadcast) {}
/**
* Get stage name.
*/
public String getStageName() {}
/**
* Get join type for this stage.
*/
public JoinType getJoinType() {}
/**
* Get selected fields from this stage.
*/
public List<JoinField> getFields() {}
/**
* Check if stage is required for join.
*/
public boolean isRequired() {}
/**
* Check if stage should be broadcast.
*/
public boolean isBroadcast() {}
}Field definition in join operation with aliasing support.
package io.cdap.cdap.etl.api.join;
/**
 * A single output field of the join: a field taken from one input stage,
 * renamed via an alias in the joined output.
 */
public class JoinField {
/**
* Create join field with stage name, field name, and alias.
*
* @param stageName stage the field comes from
* @param fieldName field name within that stage's schema
* @param alias name of the field in the join output
*/
public JoinField(String stageName, String fieldName, String alias) {}
/**
* Get source stage name.
*/
public String getStageName() {}
/**
* Get source field name.
*/
public String getFieldName() {}
/**
* Get output field alias.
*/
public String getAlias() {}
}Key definition for join operations supporting composite keys.
package io.cdap.cdap.etl.api.join;
/**
 * Join key for one stage; multiple fields form a composite key. Validation
 * compares key field types positionally across stages, so field order matters.
 */
public class JoinKey {
/**
* Create join key for stage with field list.
*
* @param stageName stage these key fields belong to
* @param fields ordered key field names
*/
public JoinKey(String stageName, List<String> fields) {}
/**
* Get stage name.
*/
public String getStageName() {}
/**
* Get join key fields.
*/
public List<String> getFields() {}
}Advanced join condition with expression support.
package io.cdap.cdap.etl.api.join;
/**
 * Extra join condition expressed as a free-form expression string
 * referencing stage.field names (e.g. "customers.status = 'active'").
 */
public class JoinCondition {
/**
* Create join condition with expression.
*
* @param expression condition expression referencing stage.field names
*/
public JoinCondition(String expression) {}
/**
* Get condition expression.
*/
public String getExpression() {}
}Distribution strategy for join optimization.
package io.cdap.cdap.etl.api.join;
/**
 * Distribution (partitioning) settings used to spread join work.
 */
public class JoinDistribution {
/**
* Create distribution with partition count.
*
* @param partitions number of partitions to distribute the join across
*/
public JoinDistribution(int partitions) {}
/**
* Get number of partitions.
*/
public int getPartitions() {}
}Join operations support various join types:
/**
 * Supported join semantics for a stage.
 */
public enum JoinType {
INNER, // Inner join - only matching records
LEFT_OUTER, // Left outer join - all records from left side
RIGHT_OUTER, // Right outer join - all records from right side
FULL_OUTER // Full outer join - all records from both sides
}@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("CustomerAnalyticsJoiner")
public class CustomerAnalyticsJoiner extends BatchAutoJoiner {

  /**
   * Builds a single wide customer view by joining four customer-keyed inputs:
   * profile data, order summaries, preference lookups, and support stats.
   */
  @Override
  public JoinDefinition define(AutoJoinerContext context) {
    // Output columns: profile, order summary, preferences, support metrics.
    List<JoinField> output = Arrays.asList(
        new JoinField("customers", "customer_id", "customer_id"),
        new JoinField("customers", "name", "customer_name"),
        new JoinField("customers", "email", "customer_email"),
        new JoinField("customers", "registration_date", "customer_since"),
        new JoinField("orders", "total_orders", "total_orders"),
        new JoinField("orders", "total_amount", "lifetime_value"),
        new JoinField("orders", "last_order_date", "last_order_date"),
        new JoinField("preferences", "favorite_category", "favorite_category"),
        new JoinField("preferences", "avg_rating", "avg_rating"),
        new JoinField("support", "ticket_count", "support_tickets"),
        new JoinField("support", "satisfaction_score", "satisfaction_score"));

    // Customers drive the join; the other inputs are optional enrichments.
    // Preferences is a small lookup table, so it carries the broadcast hint.
    List<JoinStage> stages = Arrays.asList(
        new JoinStage("customers", JoinType.INNER,
            Collections.emptyList(), true, false),
        new JoinStage("orders", JoinType.LEFT_OUTER,
            Collections.emptyList(), false, false),
        new JoinStage("preferences", JoinType.LEFT_OUTER,
            Collections.emptyList(), false, true),
        new JoinStage("support", JoinType.LEFT_OUTER,
            Collections.emptyList(), false, false));

    // Every input joins on customer_id.
    List<JoinKey> keys = Arrays.asList(
        new JoinKey("customers", Arrays.asList("customer_id")),
        new JoinKey("orders", Arrays.asList("customer_id")),
        new JoinKey("preferences", Arrays.asList("customer_id")),
        new JoinKey("support", Arrays.asList("customer_id")));

    return JoinDefinition.builder()
        .select(output)
        .from(stages)
        .on(keys)
        // Business rules: active customers, non-negative (or absent) totals.
        .setCondition(new JoinCondition(
            "customers.status = 'active' AND " +
            "(orders.total_amount IS NULL OR orders.total_amount >= 0)"))
        .setDistribution(new JoinDistribution(8))
        .build();
  }
}
@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("TimeSeriesJoiner")
public class TimeSeriesJoiner extends BatchAutoJoiner {
private final Config config;
@Override
public JoinDefinition define(AutoJoinerContext context) {
// Build time-based join for sensor data
return JoinDefinition.builder()
.select(Arrays.asList(
// Time dimension
new JoinField("timestamps", "timestamp", "event_time"),
new JoinField("timestamps", "hour", "hour"),
new JoinField("timestamps", "day", "day"),
// Sensor measurements
new JoinField("temperature", "value", "temperature"),
new JoinField("humidity", "value", "humidity"),
new JoinField("pressure", "value", "pressure"),
// Calculated fields
new JoinField("weather", "condition", "weather_condition"),
new JoinField("weather", "alert", "weather_alert")
))
.from(Arrays.asList(
// Time dimension table (main driver)
new JoinStage("timestamps", JoinType.INNER,
Collections.emptyList(), true, false),
// Sensor data (may have gaps)
new JoinStage("temperature", JoinType.LEFT_OUTER,
Collections.emptyList(), false, false),
new JoinStage("humidity", JoinType.LEFT_OUTER,
Collections.emptyList(), false, false),
new JoinStage("pressure", JoinType.LEFT_OUTER,
Collections.emptyList(), false, false),
// Weather data (external enrichment - broadcast)
new JoinStage("weather", JoinType.LEFT_OUTER,
Collections.emptyList(), false, true)
))
// Time-based join keys with tolerance
.on(Arrays.asList(
new JoinKey("timestamps", Arrays.asList("timestamp")),
new JoinKey("temperature", Arrays.asList("timestamp")),
new JoinKey("humidity", Arrays.asList("timestamp")),
new JoinKey("pressure", Arrays.asList("timestamp")),
new JoinKey("weather", Arrays.asList("timestamp"))
))
// Filter for valid time range
.setCondition(new JoinCondition(
"timestamps.timestamp >= '" + config.startTime + "' AND " +
"timestamps.timestamp <= '" + config.endTime + "'"
))
.setDistribution(new JoinDistribution(config.partitions))
.build();
}
}The join API provides comprehensive error handling classes in the io.cdap.cdap.etl.api.join.error package:
Base class for join operation errors.
package io.cdap.cdap.etl.api.join.error;
public class JoinError {
// Base error class for join operations.
// Each subclass below narrows the error to one part of the join definition
// (broadcast hints, distribution, condition, keys, output schema, fields).
}Error related to broadcast join operations.
package io.cdap.cdap.etl.api.join.error;
public class BroadcastError extends JoinError {
// Errors in broadcast join configuration or execution
}Error related to distribution size configuration.
package io.cdap.cdap.etl.api.join.error;
public class DistributionSizeError extends JoinError {
// Errors in distribution size specification
}Error related to distribution stage configuration.
package io.cdap.cdap.etl.api.join.error;
public class DistributionStageError extends JoinError {
// Errors in stage distribution configuration
}Error in join expression conditions.
package io.cdap.cdap.etl.api.join.error;
public class ExpressionConditionError extends JoinError {
// Errors in join condition expressions
}Error related to join keys.
package io.cdap.cdap.etl.api.join.error;
public class JoinKeyError extends JoinError {
// Errors in join key specification
}Error in join key field specification.
package io.cdap.cdap.etl.api.join.error;
public class JoinKeyFieldError extends JoinError {
// Errors in join key field names or types
}Error in output schema definition.
package io.cdap.cdap.etl.api.join.error;
public class OutputSchemaError extends JoinError {
// Errors in output schema specification
}Error in selected field specification.
package io.cdap.cdap.etl.api.join.error;
public class SelectedFieldError extends JoinError {
// Errors in field selection for join output
}Exception for invalid join operations.
package io.cdap.cdap.etl.api.join;
/**
 * Exception thrown for invalid join configurations.
 */
public class InvalidJoinException extends Exception {
  /**
   * @param message description of why the join definition is invalid
   */
  public InvalidJoinException(String message) {
    super(message); // FIX: original stub discarded the message
  }

  /**
   * @param message description of why the join definition is invalid
   * @param cause underlying error, preserved for diagnostics
   */
  public InvalidJoinException(String message, Throwable cause) {
    super(message, cause); // FIX: original stub discarded message and cause
  }
}
public class JoinValidator {
/**
 * Runs every structural check over a join definition. Failures are reported
 * through the collector rather than thrown, so all problems surface at once.
 */
public static void validateJoinDefinition(JoinDefinition joinDef,
                                          Map<String, Schema> inputSchemas,
                                          FailureCollector collector) {
  // Mandatory checks: stages, keys, and selected output fields.
  validateJoinStages(joinDef.getStages(), inputSchemas, collector);
  validateJoinKeys(joinDef.getKeys(), inputSchemas, collector);
  validateSelectedFields(joinDef.getStages(), inputSchemas, collector);

  // Optional settings are only validated when present.
  JoinCondition condition = joinDef.getCondition();
  if (condition != null) {
    validateJoinCondition(condition, inputSchemas, collector);
  }
  JoinDistribution distribution = joinDef.getDistribution();
  if (distribution != null) {
    validateDistribution(distribution, collector);
  }
}
/**
 * Validates the stage list: unique stage names, presence in the input
 * schemas, at least one required stage, and broadcast/join-type consistency.
 */
private static void validateJoinStages(List<JoinStage> stages,
                                       Map<String, Schema> inputSchemas,
                                       FailureCollector collector) {
  Set<String> stageNames = new HashSet<>();
  boolean hasRequiredStage = false;
  for (JoinStage stage : stages) {
    String stageName = stage.getStageName();
    // Set.add returns false on duplicates — one lookup instead of contains+add.
    if (!stageNames.add(stageName)) {
      collector.addFailure("Duplicate stage name: " + stageName,
                           "Use unique stage names in join");
    }
    // Check if stage exists in input schemas
    if (!inputSchemas.containsKey(stageName)) {
      collector.addFailure("Unknown stage: " + stageName,
                           "Verify stage name exists in pipeline");
    }
    // Track whether at least one stage is required
    if (stage.isRequired()) {
      hasRequiredStage = true;
    }
    // Broadcasting a FULL_OUTER side is rejected by this validator.
    if (stage.isBroadcast() && stage.getJoinType() == JoinType.FULL_OUTER) {
      collector.addFailure("Cannot broadcast stage with FULL_OUTER join: " + stageName,
                           "Use different join type or disable broadcast");
    }
  }
  if (!hasRequiredStage) {
    collector.addFailure("At least one stage must be required",
                         "Set required=true for main stage");
  }
}
/**
 * Validates join keys: at least one key must exist, every key's stage and
 * fields must be present in the input schemas, and the ordered key field
 * types must match across stages.
 */
private static void validateJoinKeys(List<JoinKey> joinKeys,
                                     Map<String, Schema> inputSchemas,
                                     FailureCollector collector) {
  if (joinKeys.isEmpty()) {
    collector.addFailure("Join keys are required", "Specify join keys for stages");
    return;
  }
  // FIX: removed a keysByStage map that was built but never read (dead code).
  // Collect each stage's ordered key-field types; all stages must agree.
  Set<List<Schema.Type>> keyTypes = new HashSet<>();
  for (JoinKey key : joinKeys) {
    String stageName = key.getStageName();
    Schema schema = inputSchemas.get(stageName);
    if (schema == null) {
      collector.addFailure("Unknown stage in join key: " + stageName, null);
      continue;
    }
    List<Schema.Type> stageKeyTypes = new ArrayList<>();
    for (String fieldName : key.getFields()) {
      Schema.Field field = schema.getField(fieldName);
      if (field == null) {
        collector.addFailure("Unknown field in join key: " + stageName + "." + fieldName,
                             "Verify field exists in stage schema");
      } else {
        // Unwrap nullable schemas so nullability does not affect compatibility.
        stageKeyTypes.add(field.getSchema().isNullable()
            ? field.getSchema().getNonNullable().getType()
            : field.getSchema().getType());
      }
    }
    keyTypes.add(stageKeyTypes);
  }
  // NOTE(review): a missing key field shortens that stage's type list, which
  // also trips this check — the message may then mask the real cause.
  if (keyTypes.size() > 1) {
    collector.addFailure("Join key types are not compatible across stages",
                         "Ensure all join keys have the same types");
  }
}
}private boolean shouldBroadcastStage(String stageName, Schema schema,
Map<String, Object> stageProperties) {
// Heuristic broadcast decision driven by stage properties.
// NOTE(review): the stageName parameter is unused in this fragment.
// An explicit Boolean.TRUE "broadcast" property always forces broadcast.
Object broadcastHint = stageProperties.get("broadcast");
if (Boolean.TRUE.equals(broadcastHint)) {
return true;
}
// Otherwise estimate stage size from an optional record-count hint.
Object recordCountHint = stageProperties.get("estimatedRecords");
if (recordCountHint instanceof Number) {
long estimatedRecords = ((Number) recordCountHint).longValue();
int fieldCount = schema.getFields().size();
// Rough size estimation (bytes); long arithmetic avoids int overflow.
long estimatedSize = estimatedRecords * fieldCount * 50; // 50 bytes avg per field
// Broadcast if less than 200MB
return estimatedSize < 200 * 1024 * 1024;
}
// No hint available: default to not broadcasting.
return false;
}private int calculateOptimalPartitions(Map<String, Schema> inputSchemas,
JoinDefinition joinDef) {
// Sum estimated sizes of the non-broadcast stages only; broadcast stages
// are replicated to workers rather than partitioned.
long totalEstimatedSize = 0;
for (JoinStage stage : joinDef.getStages()) {
if (!stage.isBroadcast()) {
// estimateStageSize is defined elsewhere — TODO confirm it returns bytes.
totalEstimatedSize += estimateStageSize(stage.getStageName(), inputSchemas);
}
}
// Target ~128MB per partition
long targetPartitionSize = 128 * 1024 * 1024;
int calculatedPartitions = (int) Math.max(1, totalEstimatedSize / targetPartitionSize);
// Clamp the result to the range [1, 1000].
return Math.min(Math.max(calculatedPartitions, 1), 1000);
}Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api