CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-etl-api

CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load pipeline applications on the CDAP platform

Pending
Overview
Eval results
Files

join-operations.mddocs/

Join Operations

Advanced join operations with automatic join optimization, comprehensive join definitions, and error handling for combining data from multiple inputs in CDAP ETL pipelines.

Core Join Interfaces

AutoJoiner

Interface for automatic join operations with intelligent optimization.

package io.cdap.cdap.etl.api.join;

// API stub: method bodies are elided in this documentation excerpt.
public interface AutoJoiner {
    /**
     * Define the join operation with context information.
     *
     * @param context provides the input schemas of all upstream stages and a
     *                failure collector for reporting configuration problems
     * @return the join definition (stages, keys, selected fields, optional
     *         condition and distribution) the platform should execute
     */
    JoinDefinition define(AutoJoinerContext context);
}

AutoJoinerContext

Context interface providing input schemas and validation capabilities.

package io.cdap.cdap.etl.api.join;

// API stub: method bodies are elided in this documentation excerpt.
public interface AutoJoinerContext {
    /**
     * Get input schemas for all stages.
     *
     * @return map from stage name to that stage's schema
     */
    Map<String, Schema> getInputSchemas();
    
    /**
     * Get failure collector for validation.
     *
     * @return collector used to accumulate validation failures instead of throwing
     */
    FailureCollector getFailureCollector();
}

AutoJoiner Implementation Example:

@Plugin(type = BatchAutoJoiner.PLUGin_TYPE)
@Name("SmartJoiner")
@Description("Automatically optimized join operation")
public class SmartJoiner extends BatchAutoJoiner {
    
    private final Config config;
    
    /**
     * The original example never assigned the final {@code config} field, which
     * does not compile; the plugin framework supplies the configuration through
     * the constructor.
     */
    public SmartJoiner(Config config) {
        this.config = config;
    }
    
    /**
     * Build the join definition from the configured stages, keys, and output fields.
     *
     * @param context supplies input schemas and a failure collector for validation
     * @return the fully-configured join definition
     */
    @Override
    public JoinDefinition define(AutoJoinerContext context) {
        Map<String, Schema> inputSchemas = context.getInputSchemas();
        FailureCollector collector = context.getFailureCollector();
        
        // Surface configuration problems before building the definition.
        config.validate(collector, inputSchemas);
        
        JoinDefinition.Builder builder = JoinDefinition.builder();
        
        // Configure join stages; unknown stages are reported and skipped.
        List<JoinStage> stages = new ArrayList<>();
        for (StageInfo stageInfo : config.stages) {
            Schema inputSchema = inputSchemas.get(stageInfo.stageName);
            if (inputSchema == null) {
                collector.addFailure("Unknown stage: " + stageInfo.stageName, null);
                continue;
            }
            
            // Broadcast small stages to avoid shuffling them (see heuristic below).
            boolean shouldBroadcast = shouldBroadcastStage(stageInfo, inputSchema);
            
            JoinStage stage = new JoinStage(
                stageInfo.stageName,
                stageInfo.joinType,
                stageInfo.getSelectedFields(inputSchema),
                stageInfo.required,
                shouldBroadcast
            );
            stages.add(stage);
        }
        
        builder.from(stages);
        
        // Configure join keys; stages with no configured key fields are skipped.
        List<JoinKey> joinKeys = new ArrayList<>();
        for (String stageName : inputSchemas.keySet()) {
            List<String> keyFields = config.getJoinKeysForStage(stageName);
            if (!keyFields.isEmpty()) {
                joinKeys.add(new JoinKey(stageName, keyFields));
            }
        }
        builder.on(joinKeys);
        
        // Configure output fields.
        // NOTE(review): buildOutputFields is assumed to be defined elsewhere in
        // this class — it is not shown in this excerpt.
        List<JoinField> outputFields = buildOutputFields(inputSchemas, collector);
        builder.select(outputFields);
        
        // Distribution strategy is optional.
        if (config.distributionSize != null) {
            builder.setDistribution(new JoinDistribution(config.distributionSize));
        }
        
        // Extra join condition is optional.
        if (config.condition != null && !config.condition.isEmpty()) {
            builder.setCondition(new JoinCondition(config.condition));
        }
        
        return builder.build();
    }
    
    /**
     * Heuristic: broadcast a stage when its estimated serialized size is under 100MB.
     */
    private boolean shouldBroadcastStage(StageInfo stageInfo, Schema schema) {
        int estimatedRecords = stageInfo.estimatedRecords;
        int fieldCount = schema.getFields().size();
        
        // Rough size estimate: ~50 bytes per field per record; cast to long
        // before multiplying to avoid int overflow.
        long estimatedSizeBytes = (long) estimatedRecords * fieldCount * 50;
        
        return estimatedSizeBytes < 100 * 1024 * 1024;
    }
}

Join Configuration

JoinDefinition

Comprehensive definition of join operation with all configuration options.

package io.cdap.cdap.etl.api.join;

// API stub: method bodies are elided in this documentation excerpt.
public class JoinDefinition {
    /**
     * Create builder for join definition.
     *
     * @return a fresh builder; callers chain select/from/on and build()
     */
    public static Builder builder() {}
    
    /**
     * Get join stages.
     *
     * @return the stages participating in the join
     */
    public List<JoinStage> getStages() {}
    
    /**
     * Get join keys.
     *
     * @return per-stage join key definitions
     */
    public List<JoinKey> getKeys() {}
    
    /**
     * Get output schema.
     *
     * @return schema of the joined output records
     */
    public Schema getOutputSchema() {}
    
    /**
     * Get join condition.
     *
     * @return optional extra condition, or null when none was set
     */
    public JoinCondition getCondition() {}
    
    /**
     * Get distribution strategy.
     *
     * @return optional distribution strategy, or null when none was set
     */
    public JoinDistribution getDistribution() {}
}

JoinDefinition Builder Usage:

// Output projection: stage-qualified fields with output aliases.
List<JoinField> selectedFields = Arrays.asList(
    new JoinField("customers", "id", "customer_id"),
    new JoinField("customers", "name", "customer_name"),
    new JoinField("customers", "email", "customer_email"),
    new JoinField("orders", "id", "order_id"),
    new JoinField("orders", "amount", "order_amount"),
    new JoinField("orders", "date", "order_date")
);

// Participating stages: customers drives the join; orders is small enough to broadcast.
List<JoinStage> joinStages = Arrays.asList(
    new JoinStage("customers", JoinType.INNER,
                 Collections.emptyList(), true, false),
    new JoinStage("orders", JoinType.LEFT_OUTER,
                 Collections.emptyList(), false, true)
);

// Equality keys: customers.id matches orders.customer_id.
List<JoinKey> joinKeys = Arrays.asList(
    new JoinKey("customers", Arrays.asList("id")),
    new JoinKey("orders", Arrays.asList("customer_id"))
);

JoinDefinition joinDef = JoinDefinition.builder()
    .select(selectedFields)
    .from(joinStages)
    .on(joinKeys)
    .setCondition(new JoinCondition("customers.status = 'active' AND orders.amount > 0"))
    .setDistribution(new JoinDistribution(4))
    .build();

JoinStage

Definition of a stage participating in the join operation.

package io.cdap.cdap.etl.api.join;

// API stub: method bodies are elided in this documentation excerpt.
public class JoinStage {
    /**
     * Create join stage with configuration.
     *
     * @param stageName name of the input stage participating in the join
     * @param joinType  join semantics for this stage (INNER, LEFT_OUTER, ...)
     * @param fields    fields selected from this stage
     * @param required  whether records from this stage must be present in the output
     * @param broadcast whether this stage should be replicated to all workers
     */
    public JoinStage(String stageName, JoinType joinType, List<JoinField> fields, 
                    boolean required, boolean broadcast) {}
    
    /**
     * Get stage name.
     */
    public String getStageName() {}
    
    /**
     * Get join type for this stage.
     */
    public JoinType getJoinType() {}
    
    /**
     * Get selected fields from this stage.
     */
    public List<JoinField> getFields() {}
    
    /**
     * Check if stage is required for join.
     */
    public boolean isRequired() {}
    
    /**
     * Check if stage should be broadcast.
     */
    public boolean isBroadcast() {}
}

JoinField

Field definition in join operation with aliasing support.

package io.cdap.cdap.etl.api.join;

// API stub: method bodies are elided in this documentation excerpt.
public class JoinField {
    /**
     * Create join field with stage name, field name, and alias.
     *
     * @param stageName stage the field is read from
     * @param fieldName field name within that stage's schema
     * @param alias     name the field is given in the join output
     */
    public JoinField(String stageName, String fieldName, String alias) {}
    
    /**
     * Get source stage name.
     */
    public String getStageName() {}
    
    /**
     * Get source field name.
     */
    public String getFieldName() {}
    
    /**
     * Get output field alias.
     */
    public String getAlias() {}
}

JoinKey

Key definition for join operations supporting composite keys.

package io.cdap.cdap.etl.api.join;

// API stub: method bodies are elided in this documentation excerpt.
public class JoinKey {
    /**
     * Create join key for stage with field list.
     *
     * @param stageName stage the key fields belong to
     * @param fields    ordered key fields; more than one field forms a composite key
     */
    public JoinKey(String stageName, List<String> fields) {}
    
    /**
     * Get stage name.
     */
    public String getStageName() {}
    
    /**
     * Get join key fields.
     */
    public List<String> getFields() {}
}

JoinCondition

Advanced join condition with expression support.

package io.cdap.cdap.etl.api.join;

// API stub: method bodies are elided in this documentation excerpt.
public class JoinCondition {
    /**
     * Create join condition with expression.
     *
     * @param expression condition text referencing stage-qualified fields,
     *                   e.g. {@code "customers.status = 'active'"}
     */
    public JoinCondition(String expression) {}
    
    /**
     * Get condition expression.
     */
    public String getExpression() {}
}

JoinDistribution

Distribution strategy for join optimization.

package io.cdap.cdap.etl.api.join;

// API stub: method bodies are elided in this documentation excerpt.
public class JoinDistribution {
    /**
     * Create distribution with partition count.
     *
     * @param partitions number of partitions to spread the join across
     */
    public JoinDistribution(int partitions) {}
    
    /**
     * Get number of partitions.
     */
    public int getPartitions() {}
}

Join Types

Join operations support various join types:

public enum JoinType {
    /** Emit only records whose keys match on both sides. */
    INNER,
    /** Emit every record from the left side; unmatched right-side fields are null. */
    LEFT_OUTER,
    /** Emit every record from the right side; unmatched left-side fields are null. */
    RIGHT_OUTER,
    /** Emit all records from both sides, matching where keys align. */
    FULL_OUTER
}

Complex Join Examples

Multi-Table Customer Analytics Join

@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("CustomerAnalyticsJoiner")
public class CustomerAnalyticsJoiner extends BatchAutoJoiner {
    
    /**
     * Builds a four-way customer-360 join: customers drive the join, while
     * orders, preferences, and support data are attached via left-outer joins.
     */
    @Override
    public JoinDefinition define(AutoJoinerContext context) {
        // Output projection: one wide customer record combining all four sources.
        List<JoinField> outputFields = Arrays.asList(
            // Customer information
            new JoinField("customers", "customer_id", "customer_id"),
            new JoinField("customers", "name", "customer_name"),
            new JoinField("customers", "email", "customer_email"),
            new JoinField("customers", "registration_date", "customer_since"),
            // Order summary
            new JoinField("orders", "total_orders", "total_orders"),
            new JoinField("orders", "total_amount", "lifetime_value"),
            new JoinField("orders", "last_order_date", "last_order_date"),
            // Product preferences
            new JoinField("preferences", "favorite_category", "favorite_category"),
            new JoinField("preferences", "avg_rating", "avg_rating"),
            // Support interactions
            new JoinField("support", "ticket_count", "support_tickets"),
            new JoinField("support", "satisfaction_score", "satisfaction_score")
        );
        
        // Stage configuration with optimization hints: customers is the required
        // driver; preferences is a small lookup table, so it is broadcast.
        List<JoinStage> stages = Arrays.asList(
            new JoinStage("customers", JoinType.INNER,
                        Collections.emptyList(), true, false),
            new JoinStage("orders", JoinType.LEFT_OUTER,
                        Collections.emptyList(), false, false),
            new JoinStage("preferences", JoinType.LEFT_OUTER,
                        Collections.emptyList(), false, true),
            new JoinStage("support", JoinType.LEFT_OUTER,
                        Collections.emptyList(), false, false)
        );
        
        // Every stage joins on the same customer_id key.
        List<JoinKey> keys = Arrays.asList(
            new JoinKey("customers", Arrays.asList("customer_id")),
            new JoinKey("orders", Arrays.asList("customer_id")),
            new JoinKey("preferences", Arrays.asList("customer_id")),
            new JoinKey("support", Arrays.asList("customer_id"))
        );
        
        // Business-logic filter applied on top of the key equality.
        JoinCondition condition = new JoinCondition(
            "customers.status = 'active' AND " +
            "(orders.total_amount IS NULL OR orders.total_amount >= 0)"
        );
        
        return JoinDefinition.builder()
            .select(outputFields)
            .from(stages)
            .on(keys)
            .setCondition(condition)
            .setDistribution(new JoinDistribution(8))
            .build();
    }
}

Time-Series Sensor Data Join

@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("TimeSeriesJoiner")
public class TimeSeriesJoiner extends BatchAutoJoiner {
    
    private final Config config;
    
    /**
     * The original example never assigned the final {@code config} field, which
     * does not compile; the plugin framework supplies the configuration through
     * the constructor.
     */
    public TimeSeriesJoiner(Config config) {
        this.config = config;
    }
    
    /**
     * Joins sensor readings (temperature, humidity, pressure) and a broadcast
     * weather enrichment table onto a driving time-dimension table, filtered to
     * the configured time window.
     *
     * @param context supplies input schemas and validation facilities
     * @return join definition for the configured time window
     */
    @Override
    public JoinDefinition define(AutoJoinerContext context) {
        return JoinDefinition.builder()
            .select(Arrays.asList(
                // Time dimension
                new JoinField("timestamps", "timestamp", "event_time"),
                new JoinField("timestamps", "hour", "hour"),
                new JoinField("timestamps", "day", "day"),
                
                // Sensor measurements
                new JoinField("temperature", "value", "temperature"),
                new JoinField("humidity", "value", "humidity"),
                new JoinField("pressure", "value", "pressure"),
                
                // Enrichment fields
                new JoinField("weather", "condition", "weather_condition"),
                new JoinField("weather", "alert", "weather_alert")
            ))
            
            .from(Arrays.asList(
                // Time dimension table (main driver, required)
                new JoinStage("timestamps", JoinType.INNER, 
                            Collections.emptyList(), true, false),
                            
                // Sensor data (left joins: readings may have gaps)
                new JoinStage("temperature", JoinType.LEFT_OUTER, 
                            Collections.emptyList(), false, false),
                new JoinStage("humidity", JoinType.LEFT_OUTER, 
                            Collections.emptyList(), false, false),
                new JoinStage("pressure", JoinType.LEFT_OUTER, 
                            Collections.emptyList(), false, false),
                            
                // Weather data (small external enrichment - broadcast)
                new JoinStage("weather", JoinType.LEFT_OUTER, 
                            Collections.emptyList(), false, true)
            ))
            
            // Equality join keys on the shared timestamp column.
            .on(Arrays.asList(
                new JoinKey("timestamps", Arrays.asList("timestamp")),
                new JoinKey("temperature", Arrays.asList("timestamp")),
                new JoinKey("humidity", Arrays.asList("timestamp")),
                new JoinKey("pressure", Arrays.asList("timestamp")),
                new JoinKey("weather", Arrays.asList("timestamp"))
            ))
            
            // Filter for valid time range.
            // NOTE(review): config.startTime/endTime are concatenated into the
            // expression verbatim; validate or escape them upstream if they can
            // originate from untrusted input.
            .setCondition(new JoinCondition(
                "timestamps.timestamp >= '" + config.startTime + "' AND " +
                "timestamps.timestamp <= '" + config.endTime + "'"
            ))
            
            .setDistribution(new JoinDistribution(config.partitions))
            .build();
    }
}

Join Error Handling

The join API provides comprehensive error handling classes in the io.cdap.cdap.etl.api.join.error package:

JoinError

Base class for join operation errors.

package io.cdap.cdap.etl.api.join.error;

/** Base class for all join-operation errors in this package. */
public class JoinError {
    // Base error class for join operations
}

Specific Join Errors

BroadcastError

Error related to broadcast join operations.

package io.cdap.cdap.etl.api.join.error;

/** Error raised when broadcast join configuration or execution is invalid. */
public class BroadcastError extends JoinError {
    // Errors in broadcast join configuration or execution
}

DistributionSizeError

Error related to distribution size configuration.

package io.cdap.cdap.etl.api.join.error;

/** Error raised when the distribution size specification is invalid. */
public class DistributionSizeError extends JoinError {
    // Errors in distribution size specification
}

DistributionStageError

Error related to distribution stage configuration.

package io.cdap.cdap.etl.api.join.error;

/** Error raised when a stage's distribution configuration is invalid. */
public class DistributionStageError extends JoinError {
    // Errors in stage distribution configuration
}

ExpressionConditionError

Error in join expression conditions.

package io.cdap.cdap.etl.api.join.error;

/** Error raised when a join condition expression is invalid. */
public class ExpressionConditionError extends JoinError {
    // Errors in join condition expressions
}

JoinKeyError

Error related to join keys.

package io.cdap.cdap.etl.api.join.error;

/** Error raised when a join key specification is invalid. */
public class JoinKeyError extends JoinError {
    // Errors in join key specification
}

JoinKeyFieldError

Error in join key field specification.

package io.cdap.cdap.etl.api.join.error;

/** Error raised when a join key field name or type is invalid. */
public class JoinKeyFieldError extends JoinError {
    // Errors in join key field names or types
}

OutputSchemaError

Error in output schema definition.

package io.cdap.cdap.etl.api.join.error;

/** Error raised when the output schema specification is invalid. */
public class OutputSchemaError extends JoinError {
    // Errors in output schema specification
}

SelectedFieldError

Error in selected field specification.

package io.cdap.cdap.etl.api.join.error;

/** Error raised when a selected output field specification is invalid. */
public class SelectedFieldError extends JoinError {
    // Errors in field selection for join output
}

InvalidJoinException

Exception for invalid join operations.

package io.cdap.cdap.etl.api.join;

// API stub: constructor bodies are elided in this documentation excerpt;
// presumably the real constructors delegate to super(message[, cause]).
public class InvalidJoinException extends Exception {
    /**
     * Exception thrown for invalid join configurations.
     *
     * @param message description of what is invalid about the join
     */
    public InvalidJoinException(String message) {}
    /**
     * @param message description of what is invalid about the join
     * @param cause   underlying cause, preserved for diagnostics
     */
    public InvalidJoinException(String message, Throwable cause) {}
}

Join Validation and Error Handling

Comprehensive Join Validation

/**
 * Static validation helpers for join definitions. All problems are reported
 * through the supplied FailureCollector rather than thrown, so a single pass
 * can surface every issue at once.
 */
public class JoinValidator {
    
    /**
     * Validate every part of a join definition against the available input schemas.
     *
     * @param joinDef      the definition to validate
     * @param inputSchemas map of stage name to that stage's schema
     * @param collector    sink for validation failures
     */
    public static void validateJoinDefinition(JoinDefinition joinDef, 
                                            Map<String, Schema> inputSchemas,
                                            FailureCollector collector) {
        validateJoinStages(joinDef.getStages(), inputSchemas, collector);
        validateJoinKeys(joinDef.getKeys(), inputSchemas, collector);
        validateSelectedFields(joinDef.getStages(), inputSchemas, collector);
        
        // Condition and distribution are optional; validate only when present.
        if (joinDef.getCondition() != null) {
            validateJoinCondition(joinDef.getCondition(), inputSchemas, collector);
        }
        if (joinDef.getDistribution() != null) {
            validateDistribution(joinDef.getDistribution(), collector);
        }
    }
    
    /**
     * Check stage names are unique and known, at least one stage is required,
     * and broadcast hints are compatible with the stage's join type.
     */
    private static void validateJoinStages(List<JoinStage> stages, 
                                         Map<String, Schema> inputSchemas,
                                         FailureCollector collector) {
        Set<String> stageNames = new HashSet<>();
        boolean hasRequiredStage = false;
        
        for (JoinStage stage : stages) {
            String stageName = stage.getStageName();
            
            // Set.add returns false for duplicates — replaces the original's
            // separate contains() lookup before the add().
            if (!stageNames.add(stageName)) {
                collector.addFailure("Duplicate stage name: " + stageName, 
                                   "Use unique stage names in join");
            }
            
            if (!inputSchemas.containsKey(stageName)) {
                collector.addFailure("Unknown stage: " + stageName, 
                                   "Verify stage name exists in pipeline");
            }
            
            if (stage.isRequired()) {
                hasRequiredStage = true;
            }
            
            // Broadcasting a FULL_OUTER side is rejected outright.
            if (stage.isBroadcast() && stage.getJoinType() == JoinType.FULL_OUTER) {
                collector.addFailure("Cannot broadcast stage with FULL_OUTER join: " + stageName,
                                   "Use different join type or disable broadcast");
            }
        }
        
        if (!hasRequiredStage) {
            collector.addFailure("At least one stage must be required", 
                               "Set required=true for main stage");
        }
    }
    
    /**
     * Check join keys exist, reference known stages/fields, and have identical
     * type signatures across all stages so key values are comparable.
     */
    private static void validateJoinKeys(List<JoinKey> joinKeys, 
                                       Map<String, Schema> inputSchemas,
                                       FailureCollector collector) {
        if (joinKeys.isEmpty()) {
            collector.addFailure("Join keys are required", "Specify join keys for stages");
            return;
        }
        
        // (The original also built a keysByStage map here that was never read;
        // that dead code has been removed.)
        
        // Collect each stage's key type signature; more than one distinct
        // signature means the keys cannot be compared across stages.
        Set<List<Schema.Type>> keyTypes = new HashSet<>();
        for (JoinKey key : joinKeys) {
            String stageName = key.getStageName();
            Schema schema = inputSchemas.get(stageName);
            
            if (schema == null) {
                collector.addFailure("Unknown stage in join key: " + stageName, null);
                continue;
            }
            
            List<Schema.Type> stageKeyTypes = new ArrayList<>();
            for (String fieldName : key.getFields()) {
                Schema.Field field = schema.getField(fieldName);
                if (field == null) {
                    collector.addFailure("Unknown field in join key: " + stageName + "." + fieldName,
                                       "Verify field exists in stage schema");
                } else {
                    // Unwrap nullable schemas so T and nullable-T compare equal.
                    stageKeyTypes.add(field.getSchema().isNullable() ? 
                                    field.getSchema().getNonNullable().getType() : 
                                    field.getSchema().getType());
                }
            }
            keyTypes.add(stageKeyTypes);
        }
        
        if (keyTypes.size() > 1) {
            collector.addFailure("Join key types are not compatible across stages", 
                               "Ensure all join keys have the same types");
        }
    }
}

Performance Optimization

Broadcast Join Optimization

/**
 * Decide whether a stage is small enough to replicate to every worker instead
 * of shuffling it. An explicit "broadcast" property always wins; otherwise the
 * decision is based on a rough size estimate from the "estimatedRecords" hint.
 */
private boolean shouldBroadcastStage(String stageName, Schema schema, 
                                   Map<String, Object> stageProperties) {
    // An explicit broadcast hint overrides any size estimation.
    if (Boolean.TRUE.equals(stageProperties.get("broadcast"))) {
        return true;
    }
    
    Object recordsHint = stageProperties.get("estimatedRecords");
    if (!(recordsHint instanceof Number)) {
        // No sizing information available: play it safe and don't broadcast.
        return false;
    }
    
    long records = ((Number) recordsHint).longValue();
    int fields = schema.getFields().size();
    
    // Rough size estimate: ~50 bytes per field per record.
    long estimatedBytes = records * fields * 50;
    
    // Broadcast anything estimated below 200MB.
    return estimatedBytes < 200 * 1024 * 1024;
}

Partitioning Strategy

/**
 * Choose a partition count that targets roughly 128MB of shuffled (non-broadcast)
 * input per partition, clamped to the range [1, 1000].
 */
private int calculateOptimalPartitions(Map<String, Schema> inputSchemas, 
                                     JoinDefinition joinDef) {
    long shuffledBytes = 0;
    for (JoinStage stage : joinDef.getStages()) {
        // Broadcast stages are replicated rather than shuffled, so they
        // don't factor into the partition count.
        if (stage.isBroadcast()) {
            continue;
        }
        shuffledBytes += estimateStageSize(stage.getStageName(), inputSchemas);
    }
    
    // ~128MB of data per partition.
    long targetBytesPerPartition = 128 * 1024 * 1024;
    int partitions = (int) Math.max(1, shuffledBytes / targetBytesPerPartition);
    
    // Clamp to sane bounds.
    return Math.min(Math.max(partitions, 1), 1000);
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api

docs

actions-conditions.md

batch-processing.md

core-pipeline.md

data-connectors.md

index.md

join-operations.md

lineage-metadata.md

sql-engine.md

validation.md

tile.json