CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-etl-api

CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load pipeline applications on the CDAP platform

Pending
Overview
Eval results
Files

docs/validation.md

Validation Framework

Comprehensive validation system for early error detection, structured error reporting, and configuration validation in CDAP ETL pipelines.

Core Validation Interfaces

ValidationException

Exception class for validation failures with structured error information.

package io.cdap.cdap.etl.api.validation;

/**
 * Checked exception that reports one or more {@link ValidationFailure}s.
 *
 * <p>Signature-only listing: constructor and method bodies are elided here.
 */
public class ValidationException extends Exception implements FailureDetailsProvider {
    /**
     * Creates a validation exception for the given list of failures.
     *
     * @param failures the validation failures this exception reports
     */
    public ValidationException(List<ValidationFailure> failures) {}
    
    /**
     * Returns the validation failures.
     *
     * @return the list of validation failures
     */
    public List<ValidationFailure> getFailures() {}
}

ValidationFailure

Individual validation failure with detailed error information.

package io.cdap.cdap.etl.api.validation;

/**
 * A single validation failure with a message and optional detail about where
 * it came from and how to fix it.
 *
 * <p>Every {@code with...} method returns a {@code ValidationFailure}, so calls
 * can be chained. Signature-only listing: bodies are elided here.
 */
public class ValidationFailure {
    /**
     * Creates a validation failure with the given message.
     *
     * @param message description of what failed validation
     */
    public ValidationFailure(String message) {}
    
    /**
     * Adds cause information to the failure.
     *
     * @param cause the cause to attach
     * @return a validation failure for further chaining
     */
    public ValidationFailure withCause(Cause cause) {}
    
    /**
     * Associates the failure with a stage configuration property.
     *
     * @param stageConfigProperty name of the configuration property
     * @return a validation failure for further chaining
     */
    public ValidationFailure withConfigProperty(String stageConfigProperty) {}
    
    /**
     * Associates the failure with an input schema field.
     *
     * @param fieldName name of the input schema field
     * @return a validation failure for further chaining
     */
    public ValidationFailure withInputSchemaField(String fieldName) {}
    
    /**
     * Associates the failure with an output schema field.
     *
     * @param fieldName name of the output schema field
     * @return a validation failure for further chaining
     */
    public ValidationFailure withOutputSchemaField(String fieldName) {}
    
    /**
     * Adds a suggested corrective action to the failure.
     *
     * @param correctiveAction suggestion for resolving the failure
     * @return a validation failure for further chaining
     */
    public ValidationFailure withCorrectiveAction(String correctiveAction) {}
}

FailureCollector

Interface for collecting validation failures during pipeline configuration.

package io.cdap.cdap.etl.api;

/**
 * Collects validation failures raised while a pipeline is being configured.
 */
public interface FailureCollector {
    /**
     * Adds a validation failure with a message and an optional corrective action.
     *
     * @param message description of what failed validation
     * @param correctiveAction suggestion for resolving the failure; may be {@code null}
     * @return a validation failure that the caller can annotate further
     *         (for example with {@code withConfigProperty})
     */
    ValidationFailure addFailure(String message, @Nullable String correctiveAction);
    
    /**
     * Returns all validation failures collected so far.
     *
     * @return the collected validation failures
     */
    List<ValidationFailure> getValidationFailures();
}

Validation Usage Example:

/**
 * Example transform that validates its own configuration and the incoming
 * schema while the pipeline is being configured, reporting every problem
 * through the stage's {@link FailureCollector}.
 *
 * <p>The record-level transform logic and the {@code validateStringField},
 * {@code validateNumericField} and {@code buildOutputSchema} helpers are not
 * part of this listing.
 */
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("DataValidator")
public class DataValidatorTransform extends Transform<StructuredRecord, StructuredRecord> {

    private final Config config;

    /**
     * Creates the transform with its plugin configuration.
     *
     * @param config the plugin configuration to validate against
     */
    public DataValidatorTransform(Config config) {
        // The field is final, so it must be assigned here for the class to compile.
        this.config = config;
    }

    /**
     * Validates the configuration and, when one is available, the input schema.
     * The output schema is only set when no failures have been collected.
     *
     * @param pipelineConfigurer configurer giving access to the stage configurer
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector collector = stageConfigurer.getFailureCollector();

        // Validate configuration
        validateConfig(collector);

        // Validate input schema
        Schema inputSchema = stageConfigurer.getInputSchema();
        if (inputSchema != null) {
            validateInputSchema(inputSchema, collector);

            // Set output schema if validation passes
            if (collector.getValidationFailures().isEmpty()) {
                Schema outputSchema = buildOutputSchema(inputSchema);
                stageConfigurer.setOutputSchema(outputSchema);
            }
        }
    }

    /**
     * Checks the required-field list, the optional field pattern and the
     * optional numeric range, adding one failure per problem found.
     *
     * @param collector collector that receives the failures
     */
    private void validateConfig(FailureCollector collector) {
        // Validate required fields
        if (config.requiredFields == null || config.requiredFields.isEmpty()) {
            collector.addFailure("Required fields must be specified",
                               "Provide comma-separated list of required field names")
                    .withConfigProperty("requiredFields");
        }

        // Validate field patterns
        if (config.fieldPattern != null && !config.fieldPattern.isEmpty()) {
            try {
                Pattern.compile(config.fieldPattern);
            } catch (PatternSyntaxException e) {
                collector.addFailure("Invalid field pattern: " + e.getMessage(),
                                   "Provide valid regular expression")
                        .withConfigProperty("fieldPattern")
                        .withCause(new Cause(CauseAttributes.STAGE_CONFIG));
            }
        }

        // Validate numeric ranges
        if (config.minValue != null && config.maxValue != null &&
            config.minValue > config.maxValue) {
            collector.addFailure("Minimum value cannot be greater than maximum value",
                               "Ensure minValue <= maxValue")
                    .withConfigProperty("minValue")
                    .withConfigProperty("maxValue");
        }
    }

    /**
     * Checks that every required field exists in the input schema and that no
     * field uses a MAP or ARRAY type.
     *
     * @param inputSchema the stage's input schema
     * @param collector collector that receives the failures
     */
    private void validateInputSchema(Schema inputSchema, FailureCollector collector) {
        // Check required fields exist. validateConfig has already reported a
        // missing list, so a null list is skipped here instead of throwing a
        // NullPointerException in the middle of validation.
        if (config.requiredFields != null) {
            for (String requiredField : config.requiredFields) {
                Schema.Field field = inputSchema.getField(requiredField);
                if (field == null) {
                    collector.addFailure("Required field not found: " + requiredField,
                                       "Add field to input schema or update configuration")
                            .withInputSchemaField(requiredField);
                } else {
                    // Validate field types
                    validateFieldType(field, collector);
                }
            }
        }

        // Check for unsupported field types. Nullability is unwrapped first so
        // that both plain and nullable MAP/ARRAY fields are reported.
        for (Schema.Field field : inputSchema.getFields()) {
            Schema fieldSchema = field.getSchema();
            Schema nonNullSchema = fieldSchema.isNullable() ?
                fieldSchema.getNonNullable() : fieldSchema;
            Schema.Type type = nonNullSchema.getType();

            if (type == Schema.Type.MAP || type == Schema.Type.ARRAY) {
                collector.addFailure("Unsupported field type: " + type,
                                   "Use simple types or flatten complex structures")
                        .withInputSchemaField(field.getName());
            }
        }
    }

    /**
     * Dispatches a required field to the string or numeric validator that
     * matches its (non-nullable) type. Other types are only reported when
     * strict type checking is enabled.
     *
     * @param field the schema field to check
     * @param collector collector that receives the failures
     */
    private void validateFieldType(Schema.Field field, FailureCollector collector) {
        Schema fieldSchema = field.getSchema().isNullable() ?
            field.getSchema().getNonNullable() : field.getSchema();

        String fieldName = field.getName();

        // Check if field type is supported for validation
        switch (fieldSchema.getType()) {
            case STRING:
                if (config.stringValidation != null) {
                    validateStringField(fieldName, config.stringValidation, collector);
                }
                break;
            case INT:
            case LONG:
            case FLOAT:
            case DOUBLE:
                if (config.numericValidation != null) {
                    validateNumericField(fieldName, fieldSchema.getType(),
                                       config.numericValidation, collector);
                }
                break;
            case BOOLEAN:
                // Boolean validation (if needed)
                break;
            default:
                if (config.strictTypeChecking) {
                    collector.addFailure("Unsupported field type for validation: " +
                                       fieldSchema.getType(),
                                       "Use supported types or disable strict checking")
                            .withInputSchemaField(fieldName);
                }
        }
    }
}

Format Validation

ValidatingInputFormat

Input format with validation capabilities for file-based sources.

package io.cdap.cdap.etl.api.validation;

/**
 * Input format provider that can also be validated; used by file-based sources.
 */
public interface ValidatingInputFormat extends InputFormatProvider {
    /** Plugin type name under which validating input formats are registered. */
    String PLUGIN_TYPE = "validatingInputFormat";
}

ValidatingOutputFormat

Output format with validation capabilities for file-based sinks.

package io.cdap.cdap.etl.api.validation;

/**
 * Output format provider that can also be validated; used by file-based sinks.
 */
public interface ValidatingOutputFormat extends OutputFormatProvider {
    /** Plugin type name under which validating output formats are registered. */
    String PLUGIN_TYPE = "validatingOutputFormat";
}

Format Validation Example:

/**
 * Example validating input format for CSV data. It checks the delimiter and
 * header settings and rejects schemas that contain ARRAY, MAP or RECORD fields.
 */
@Plugin(type = ValidatingInputFormat.PLUGIN_TYPE)
@Name("ValidatingCSVFormat")
public class ValidatingCSVInputFormat implements ValidatingInputFormat {

    private final Config config;

    /**
     * Creates the format with its plugin configuration.
     *
     * @param config the CSV format configuration to validate
     */
    public ValidatingCSVInputFormat(Config config) {
        // The field is final, so it must be assigned here for the class to compile.
        this.config = config;
    }

    /**
     * @return the class name of the underlying text input format
     */
    @Override
    public String getInputFormatClassName() {
        return TextInputFormat.class.getName();
    }

    /**
     * @return the input format configuration; empty in this example
     */
    @Override
    public Map<String, String> getInputFormatConfiguration() {
        Map<String, String> conf = new HashMap<>();
        // Configure input format
        return conf;
    }

    /**
     * Validates the CSV configuration and the input schema.
     *
     * <p>A missing schema is recorded as a failure like any other problem: the
     * configuration is still validated, and the method throws instead of
     * returning normally with an unreported failure.
     *
     * @param context provides the failure collector and the (nullable) input schema
     * @throws ValidationException if the collector holds any failure after validation
     */
    public void validateFormat(FormatContext context) throws ValidationException {
        FailureCollector collector = context.getFailureCollector();
        Schema inputSchema = context.getInputSchema();

        // Validate CSV configuration (does not depend on the schema)
        validateCSVConfig(collector);

        if (inputSchema == null) {
            collector.addFailure("Input schema is required for CSV validation",
                               "Define input schema");
        } else {
            // Validate schema compatibility
            validateSchemaForCSV(inputSchema, collector);
        }

        if (!collector.getValidationFailures().isEmpty()) {
            throw new ValidationException(collector.getValidationFailures());
        }
    }

    /**
     * Checks that the delimiter is exactly one character and that the header
     * line number is non-negative when header skipping is enabled.
     *
     * @param collector collector that receives the failures
     */
    private void validateCSVConfig(FailureCollector collector) {
        if (config.delimiter == null || config.delimiter.isEmpty()) {
            collector.addFailure("CSV delimiter is required", "Specify field delimiter")
                    .withConfigProperty("delimiter");
        } else if (config.delimiter.length() > 1) {
            collector.addFailure("CSV delimiter must be single character",
                               "Use single character delimiter")
                    .withConfigProperty("delimiter");
        }

        if (config.skipHeader && config.headerLine < 0) {
            collector.addFailure("Invalid header line number",
                               "Header line must be non-negative")
                    .withConfigProperty("headerLine");
        }
    }

    /**
     * Reports every field whose (non-nullable) type is ARRAY, MAP or RECORD.
     *
     * @param schema the input schema to check
     * @param collector collector that receives the failures
     */
    private void validateSchemaForCSV(Schema schema, FailureCollector collector) {
        for (Schema.Field field : schema.getFields()) {
            Schema fieldSchema = field.getSchema().isNullable() ?
                field.getSchema().getNonNullable() : field.getSchema();

            // Check if field type is supported in CSV
            if (fieldSchema.getType() == Schema.Type.ARRAY ||
                fieldSchema.getType() == Schema.Type.MAP ||
                fieldSchema.getType() == Schema.Type.RECORD) {
                collector.addFailure("Complex type not supported in CSV: " + fieldSchema.getType(),
                                   "Use simple types for CSV format")
                        .withInputSchemaField(field.getName());
            }
        }
    }
}

File Input Validation

InputFiles

Collection interface for input files to be validated.

package io.cdap.cdap.etl.api.validation;

/**
 * An iterable collection of {@link InputFile}s to be validated.
 * Declares no methods of its own beyond those inherited from {@link Iterable}.
 */
public interface InputFiles extends Iterable<InputFile> {
    // Provides iteration over input files for validation
}

InputFile

Individual input file interface for validation operations.

package io.cdap.cdap.etl.api.validation;

/**
 * A single input file that can be inspected and opened during validation.
 */
public interface InputFile {
    /**
     * Returns the file name.
     *
     * @return the name of the file
     */
    String getName();
    
    /**
     * Returns the file size.
     *
     * @return the size of the file in bytes
     */
    long getSize();
    
    /**
     * Opens the file for reading.
     *
     * @return a seekable stream over the file contents
     * @throws IOException if the file cannot be opened
     */
    SeekableInputStream open() throws IOException;
}

SeekableInputStream

Abstract seekable input stream for file validation.

package io.cdap.cdap.etl.api.validation;

/**
 * An {@link InputStream} that additionally supports seeking to a position and
 * reporting the current position.
 */
public abstract class SeekableInputStream extends InputStream {
    /**
     * Seeks to the given position in the stream.
     *
     * @param pos the position to seek to
     * @throws IOException if the seek fails
     */
    public abstract void seek(long pos) throws IOException;
    
    /**
     * Returns the current position in the stream.
     *
     * @return the current position
     * @throws IOException if the position cannot be determined
     */
    public abstract long getPos() throws IOException;
}

DelegatingSeekableInputStream

Delegating implementation of seekable input stream.

package io.cdap.cdap.etl.api.validation;

/**
 * A {@link SeekableInputStream} that delegates to an underlying stream.
 *
 * <p>NOTE(review): the members (constructor, {@code seek}, {@code getPos},
 * {@code read}) are elided in this listing; consult the API Javadoc for the
 * actual signatures.
 */
public class DelegatingSeekableInputStream extends SeekableInputStream {
    // Delegates to underlying seekable stream implementation
}

File Validation Example:

/**
 * Example validator that checks whether input files look like JSON documents
 * by inspecting up to the first 1 MiB of each file.
 */
public class FileFormatValidator {

    /** Maximum number of bytes sampled from the start of each file. */
    private static final int SAMPLE_SIZE_BYTES = 1024 * 1024;

    /**
     * Validates every file in {@code inputFiles}, adding one failure per
     * unreadable or invalid file.
     *
     * @param inputFiles the files to validate
     * @param collector collector that receives the failures
     */
    public static void validateJSONFiles(InputFiles inputFiles, FailureCollector collector) {
        for (InputFile inputFile : inputFiles) {
            try (SeekableInputStream stream = inputFile.open()) {
                validateJSONFile(inputFile.getName(), stream, collector);
            } catch (IOException e) {
                collector.addFailure("Failed to read file: " + inputFile.getName(),
                                   "Check file permissions and format")
                        .withCause(new Cause(CauseAttributes.IO_ERROR));
            }
        }
    }

    /**
     * Validates one file from a sample of its leading bytes.
     *
     * <p>If the whole file fits in the sample it is parsed as a complete JSON
     * document. If the file is larger than the sample, the sample is by
     * construction an incomplete document, so only the opening token is checked;
     * parsing it would report every large valid file as a syntax error.
     *
     * @param fileName name used in failure messages
     * @param stream stream positioned at the start of the file; reset to 0 afterwards
     * @param collector collector that receives the failures
     * @throws IOException if reading or seeking fails
     */
    private static void validateJSONFile(String fileName, SeekableInputStream stream,
                                       FailureCollector collector) throws IOException {
        // Sample first 1MB for validation
        byte[] buffer = new byte[SAMPLE_SIZE_BYTES];
        int bytesRead = fillBuffer(stream, buffer);

        if (bytesRead <= 0) {
            collector.addFailure("Empty file: " + fileName, "Provide non-empty JSON file");
            return;
        }

        // The sample is truncated only if the buffer is full and more data follows.
        boolean truncated = bytesRead == buffer.length && stream.read() != -1;

        if (truncated) {
            if (!startsWithObjectOrArray(buffer, bytesRead)) {
                collector.addFailure("Invalid JSON structure in: " + fileName,
                                   "File must contain JSON object or array");
            }
        } else {
            String content = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);

            // Validate JSON structure
            try {
                JsonParser parser = new JsonParser();
                JsonElement element = parser.parse(content);

                if (!element.isJsonObject() && !element.isJsonArray()) {
                    collector.addFailure("Invalid JSON structure in: " + fileName,
                                       "File must contain JSON object or array");
                }
            } catch (JsonSyntaxException e) {
                collector.addFailure("Invalid JSON syntax in: " + fileName + " - " + e.getMessage(),
                                   "Fix JSON syntax errors");
            }
        }

        // Reset stream position
        stream.seek(0);
    }

    /**
     * Reads until the buffer is full or the stream ends. A single
     * {@code read(byte[])} call may return fewer bytes than are available.
     *
     * @return the number of bytes read; 0 if the stream was already at its end
     */
    private static int fillBuffer(SeekableInputStream stream, byte[] buffer) throws IOException {
        int total = 0;
        while (total < buffer.length) {
            int count = stream.read(buffer, total, buffer.length - total);
            if (count < 0) {
                break;
            }
            total += count;
        }
        return total;
    }

    /**
     * Returns whether the first significant byte of the sample opens a JSON
     * object or array. A UTF-8 byte order mark and JSON whitespace are skipped.
     * A sample with no significant byte is inconclusive and treated as acceptable.
     */
    private static boolean startsWithObjectOrArray(byte[] sample, int length) {
        int index = 0;
        if (length >= 3 && (sample[0] & 0xFF) == 0xEF
            && (sample[1] & 0xFF) == 0xBB && (sample[2] & 0xFF) == 0xBF) {
            index = 3;
        }
        for (; index < length; index++) {
            byte b = sample[index];
            if (b == ' ' || b == '\t' || b == '\n' || b == '\r') {
                continue;
            }
            return b == '{' || b == '[';
        }
        return true;
    }
}

Validation Context and Configuration

FormatContext

Context for format validation operations.

package io.cdap.cdap.etl.api.validation;

/**
 * Context passed to format validation, bundling a failure collector with an
 * optional input schema.
 *
 * <p>Signature-only listing: constructor and method bodies are elided here.
 */
public class FormatContext {
    /**
     * Creates a format context.
     *
     * @param collector the failure collector to report validation failures to
     * @param inputSchema the input schema; may be {@code null}
     */
    public FormatContext(FailureCollector collector, @Nullable Schema inputSchema) {}
    
    /**
     * Returns the failure collector.
     *
     * @return the failure collector
     */
    public FailureCollector getFailureCollector() {}
    
    /**
     * Returns the input schema.
     *
     * @return the input schema, or {@code null} if there is none
     */
    @Nullable
    public Schema getInputSchema() {}
}

Validation Exceptions

InvalidStageException

Exception for invalid stage configuration.

package io.cdap.cdap.etl.api.validation;

/**
 * Checked exception signalling an invalid stage configuration.
 *
 * <p>Signature-only listing: constructor bodies are elided here.
 */
public class InvalidStageException extends Exception {
    /**
     * Creates the exception with a message.
     *
     * @param message description of what is invalid
     */
    public InvalidStageException(String message) {}
    
    /**
     * Creates the exception with a message and an underlying cause.
     *
     * @param message description of what is invalid
     * @param cause the underlying cause
     */
    public InvalidStageException(String message, Throwable cause) {}
}

InvalidConfigPropertyException

Exception for invalid configuration properties.

package io.cdap.cdap.etl.api.validation;

/**
 * {@link InvalidStageException} raised for a specific invalid configuration
 * property.
 *
 * <p>Signature-only listing: constructor and method bodies are elided here.
 */
public class InvalidConfigPropertyException extends InvalidStageException {
    /**
     * Creates the exception for an invalid property.
     *
     * @param message description of what is invalid
     * @param propertyName name of the offending configuration property
     */
    public InvalidConfigPropertyException(String message, String propertyName) {}
    
    /**
     * Returns the name of the property that caused the exception.
     *
     * @return the property name
     */
    public String getPropertyName() {}
}

Validation Utilities

CauseAttributes

Attributes for validation failure causes.

package io.cdap.cdap.etl.api.validation;

/**
 * String constants naming kinds of validation-failure cause. The examples in
 * this document pass them to the {@code Cause} constructor.
 */
public class CauseAttributes {
    // Constants for different cause types
    public static final String STAGE_CONFIG = "stageConfig";
    public static final String INPUT_SCHEMA = "inputSchema";
    public static final String OUTPUT_SCHEMA = "outputSchema";
    public static final String IO_ERROR = "ioError";
    public static final String NETWORK_ERROR = "networkError";
}

Advanced Validation Patterns

Schema Compatibility Validation

/**
 * Example helper that checks whether records shaped like a source schema can
 * be written to a target schema, reporting each mismatch to a failure collector.
 */
public class SchemaValidator {

    /**
     * Compares every target field against the source field of the same name.
     * A field missing from the source is only a failure when the target field
     * is not nullable.
     *
     * @param sourceSchema schema of the incoming data
     * @param targetSchema schema the data must fit
     * @param collector collector that receives the failures
     */
    public static void validateSchemaCompatibility(Schema sourceSchema, Schema targetSchema,
                                                 FailureCollector collector) {
        if (sourceSchema == null || targetSchema == null) {
            collector.addFailure("Schema cannot be null", "Provide valid schema");
            return;
        }

        for (Schema.Field wanted : targetSchema.getFields()) {
            String name = wanted.getName();
            Schema.Field provided = sourceSchema.getField(name);

            if (provided != null) {
                validateFieldCompatibility(provided, wanted, collector);
                continue;
            }

            // Absent from the source: acceptable only for nullable targets.
            if (!wanted.getSchema().isNullable()) {
                collector.addFailure("Required field missing in source: " + name,
                                   "Add field to source or make target field nullable")
                        .withInputSchemaField(name)
                        .withOutputSchemaField(name);
            }
        }
    }

    /**
     * Reports a type mismatch and/or a nullable-to-non-nullable conversion for
     * one pair of same-named fields.
     */
    private static void validateFieldCompatibility(Schema.Field sourceField,
                                                 Schema.Field targetField,
                                                 FailureCollector collector) {
        String name = sourceField.getName();
        Schema from = stripNullable(sourceField.getSchema());
        Schema to = stripNullable(targetField.getSchema());

        if (!isCompatibleType(from, to)) {
            String message = "Incompatible field types for: " + name +
                " (source: " + from.getType() +
                ", target: " + to.getType() + ")";
            collector.addFailure(message, "Convert field type or update schema")
                    .withInputSchemaField(name)
                    .withOutputSchemaField(name);
        }

        // Non-null into nullable is always safe; only the reverse is reported.
        boolean sourceNullable = sourceField.getSchema().isNullable();
        boolean targetNullable = targetField.getSchema().isNullable();
        if (sourceNullable && !targetNullable) {
            collector.addFailure("Cannot convert nullable field to non-nullable: " + name,
                               "Make target field nullable or add null handling")
                    .withInputSchemaField(name)
                    .withOutputSchemaField(name);
        }
    }

    /** Returns the non-nullable form of a nullable schema, or the schema itself. */
    private static Schema stripNullable(Schema schema) {
        if (schema.isNullable()) {
            return schema.getNonNullable();
        }
        return schema;
    }

    /**
     * Returns whether a value of the source type may be stored as the target
     * type: identical types, numeric widening, or conversion to STRING.
     */
    private static boolean isCompatibleType(Schema sourceType, Schema targetType) {
        Schema.Type from = sourceType.getType();
        Schema.Type to = targetType.getType();
        if (from == to) {
            return true;
        }

        boolean toText = to == Schema.Type.STRING;
        switch (from) {
            case INT:
                return to == Schema.Type.LONG
                    || to == Schema.Type.FLOAT
                    || to == Schema.Type.DOUBLE
                    || toText;
            case LONG:
                return to == Schema.Type.FLOAT
                    || to == Schema.Type.DOUBLE
                    || toText;
            case FLOAT:
                return to == Schema.Type.DOUBLE || toText;
            case DOUBLE:
            case BOOLEAN:
                return toText;
            default:
                return false;
        }
    }
}

Configuration Validation Patterns

/**
 * Base class offering reusable configuration checks. Each check reports its
 * problems to the supplied failure collector and tags them with the property name.
 */
public abstract class BaseValidatedPlugin {

    /** Returns whether the text is null or contains only whitespace. */
    private static boolean isNullOrBlank(String text) {
        return text == null || text.trim().isEmpty();
    }

    /**
     * Reports a failure when the property has no value.
     *
     * @param propertyValue the configured value
     * @param propertyName name of the property, used in messages
     * @param collector collector that receives the failure
     */
    protected void validateRequiredProperty(String propertyValue, String propertyName,
                                          FailureCollector collector) {
        if (!isNullOrBlank(propertyValue)) {
            return;
        }
        collector.addFailure("Property is required: " + propertyName,
                           "Provide value for " + propertyName)
                .withConfigProperty(propertyName);
    }

    /**
     * Reports a failure when the value is not a whole number or lies outside
     * {@code [minValue, maxValue]}. Null or blank values are ignored here and
     * left to the required-property check.
     *
     * @param value the configured value
     * @param propertyName name of the property, used in messages
     * @param minValue smallest accepted value, inclusive
     * @param maxValue largest accepted value, inclusive
     * @param collector collector that receives the failure
     */
    protected void validateNumericRange(String value, String propertyName,
                                      long minValue, long maxValue,
                                      FailureCollector collector) {
        if (isNullOrBlank(value)) {
            return;
        }

        final long parsed;
        try {
            parsed = Long.parseLong(value.trim());
        } catch (NumberFormatException e) {
            collector.addFailure("Invalid numeric value for: " + propertyName,
                               "Provide valid integer value")
                    .withConfigProperty(propertyName);
            return;
        }

        boolean withinRange = parsed >= minValue && parsed <= maxValue;
        if (!withinRange) {
            String message = "Property value out of range: " + propertyName +
                " (valid range: " + minValue + "-" + maxValue + ")";
            collector.addFailure(message, "Set value between " + minValue + " and " + maxValue)
                    .withConfigProperty(propertyName);
        }
    }

    /**
     * Reports a failure when the pattern is not a valid regular expression.
     * Null or blank patterns are ignored.
     *
     * @param pattern the configured regular expression
     * @param propertyName name of the property, used in messages
     * @param collector collector that receives the failure
     */
    protected void validateRegexPattern(String pattern, String propertyName,
                                      FailureCollector collector) {
        if (isNullOrBlank(pattern)) {
            return;
        }

        try {
            Pattern.compile(pattern);
        } catch (PatternSyntaxException e) {
            String message = "Invalid regex pattern for: " + propertyName +
                " - " + e.getMessage();
            collector.addFailure(message, "Provide valid regular expression")
                    .withConfigProperty(propertyName)
                    .withCause(new Cause(CauseAttributes.STAGE_CONFIG));
        }
    }

    /**
     * Reports a failure when the connection string is missing, or is neither a
     * parseable URL nor a string starting with {@code jdbc:}.
     *
     * @param connectionString the configured connection string
     * @param propertyName name of the property, used in messages
     * @param collector collector that receives the failure
     */
    protected void validateConnectionString(String connectionString, String propertyName,
                                          FailureCollector collector) {
        if (isNullOrBlank(connectionString)) {
            collector.addFailure("Connection string is required: " + propertyName,
                               "Provide valid connection string")
                    .withConfigProperty(propertyName);
            return;
        }

        boolean parsesAsUrl;
        try {
            // Basic URL validation
            new URL(connectionString);
            parsesAsUrl = true;
        } catch (MalformedURLException e) {
            parsesAsUrl = false;
        }

        // Anything java.net.URL rejects is still accepted if it is a JDBC URL.
        if (!parsesAsUrl && !connectionString.startsWith("jdbc:")) {
            collector.addFailure("Invalid connection string format: " + propertyName,
                               "Provide valid URL or JDBC connection string")
                    .withConfigProperty(propertyName);
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api

docs

actions-conditions.md

batch-processing.md

core-pipeline.md

data-connectors.md

index.md

join-operations.md

lineage-metadata.md

sql-engine.md

validation.md

tile.json