CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load pipeline applications on the CDAP platform
—
Comprehensive validation system for early error detection, structured error reporting, and configuration validation in CDAP ETL pipelines.
Exception class for validation failures with structured error information.
package io.cdap.cdap.etl.api.validation;
// Checked exception raised when pipeline validation fails. Carries the full list of
// structured failures so callers can report every problem at once instead of only the first.
// NOTE: method bodies are elided in this API summary; see the CDAP source for behavior.
public class ValidationException extends Exception implements FailureDetailsProvider {
/**
 * Create validation exception with list of failures.
 *
 * @param failures the validation failures that caused this exception
 */
public ValidationException(List<ValidationFailure> failures) {}
/**
 * Get validation failures.
 *
 * @return the failures supplied at construction
 */
public List<ValidationFailure> getFailures() {}
}

Individual validation failure with detailed error information.
package io.cdap.cdap.etl.api.validation;
// One structured validation failure. The with* methods are chained in the examples in this
// document (e.g. addFailure(...).withConfigProperty(...).withCause(...)), so each returns a
// ValidationFailure suitable for fluent use.
public class ValidationFailure {
/**
 * Create validation failure with message.
 *
 * @param message human-readable description of what failed
 */
public ValidationFailure(String message) {}
/**
 * Add cause information to failure.
 *
 * @param cause structured cause describing where the failure originated
 */
public ValidationFailure withCause(Cause cause) {}
/**
 * Associate failure with configuration property.
 *
 * @param stageConfigProperty name of the offending stage configuration property
 */
public ValidationFailure withConfigProperty(String stageConfigProperty) {}
/**
 * Associate failure with input schema field.
 *
 * @param fieldName name of the input schema field involved in the failure
 */
public ValidationFailure withInputSchemaField(String fieldName) {}
/**
 * Associate failure with output schema field.
 *
 * @param fieldName name of the output schema field involved in the failure
 */
public ValidationFailure withOutputSchemaField(String fieldName) {}
/**
 * Add corrective action suggestion.
 *
 * @param correctiveAction suggested fix shown to the pipeline author
 */
public ValidationFailure withCorrectiveAction(String correctiveAction) {}
}

Interface for collecting validation failures during pipeline configuration.
package io.cdap.cdap.etl.api;
// Accumulates validation failures during pipeline configuration so that all problems
// can be surfaced together rather than failing on the first one encountered.
public interface FailureCollector {
/**
 * Add validation failure with message and corrective action.
 *
 * @param message description of the failure
 * @param correctiveAction suggested fix, or null when none applies
 * @return the newly created failure, on which callers may chain with* methods
 */
ValidationFailure addFailure(String message, @Nullable String correctiveAction);
/**
 * Get all collected validation failures.
 *
 * @return failures added so far; empty when validation has found no problems
 */
List<ValidationFailure> getValidationFailures();
}

Validation Usage Example:
// Example transform that validates its configuration and input schema at deploy time.
// NOTE(review): Config, buildOutputSchema, validateStringField and validateNumericField
// are not shown in this excerpt; their exact semantics must be confirmed in the full source.
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("DataValidator")
public class DataValidatorTransform extends Transform<StructuredRecord, StructuredRecord> {
private final Config config;
// Deploy-time hook: collect config and schema failures instead of throwing on the first.
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
FailureCollector collector = stageConfigurer.getFailureCollector();
// Validate configuration
validateConfig(collector);
// Validate input schema
Schema inputSchema = stageConfigurer.getInputSchema();
if (inputSchema != null) {
validateInputSchema(inputSchema, collector);
// Set output schema if validation passes
if (collector.getValidationFailures().isEmpty()) {
Schema outputSchema = buildOutputSchema(inputSchema);
stageConfigurer.setOutputSchema(outputSchema);
}
}
}
// Checks the plugin configuration: required fields present, regex compiles, min <= max.
// Each failure is tagged with the offending config property via withConfigProperty.
private void validateConfig(FailureCollector collector) {
// Validate required fields
if (config.requiredFields == null || config.requiredFields.isEmpty()) {
collector.addFailure("Required fields must be specified",
"Provide comma-separated list of required field names")
.withConfigProperty("requiredFields");
}
// Validate field patterns
if (config.fieldPattern != null && !config.fieldPattern.isEmpty()) {
try {
Pattern.compile(config.fieldPattern);
} catch (PatternSyntaxException e) {
collector.addFailure("Invalid field pattern: " + e.getMessage(),
"Provide valid regular expression")
.withConfigProperty("fieldPattern")
.withCause(new Cause(CauseAttributes.STAGE_CONFIG));
}
}
// Validate numeric ranges
if (config.minValue != null && config.maxValue != null &&
config.minValue > config.maxValue) {
collector.addFailure("Minimum value cannot be greater than maximum value",
"Ensure minValue <= maxValue")
// A single failure may be tagged with more than one config property.
.withConfigProperty("minValue")
.withConfigProperty("maxValue");
}
}
// Verifies every configured required field exists in the input schema and that no
// field uses a type this transform cannot handle.
private void validateInputSchema(Schema inputSchema, FailureCollector collector) {
// Check required fields exist
for (String requiredField : config.requiredFields) {
Schema.Field field = inputSchema.getField(requiredField);
if (field == null) {
collector.addFailure("Required field not found: " + requiredField,
"Add field to input schema or update configuration")
.withInputSchemaField(requiredField);
} else {
// Validate field types
validateFieldType(field, collector);
}
}
// Check for unsupported field types
for (Schema.Field field : inputSchema.getFields()) {
if (field.getSchema().getType() == Schema.Type.UNION) {
// A nullable field is modeled as a UNION with null; unwrap to the non-null type.
Schema nonNullSchema = field.getSchema().isNullable() ?
field.getSchema().getNonNullable() : field.getSchema();
if (nonNullSchema.getType() == Schema.Type.MAP ||
nonNullSchema.getType() == Schema.Type.ARRAY) {
collector.addFailure("Unsupported field type: " + nonNullSchema.getType(),
"Use simple types or flatten complex structures")
.withInputSchemaField(field.getName());
}
}
}
}
// Dispatches per-type validation for a single field. Unknown types are only an error
// when strictTypeChecking is enabled in the config.
private void validateFieldType(Schema.Field field, FailureCollector collector) {
Schema fieldSchema = field.getSchema().isNullable() ?
field.getSchema().getNonNullable() : field.getSchema();
String fieldName = field.getName();
// Check if field type is supported for validation
switch (fieldSchema.getType()) {
case STRING:
if (config.stringValidation != null) {
validateStringField(fieldName, config.stringValidation, collector);
}
break;
case INT:
case LONG:
case FLOAT:
case DOUBLE:
if (config.numericValidation != null) {
validateNumericField(fieldName, fieldSchema.getType(),
config.numericValidation, collector);
}
break;
case BOOLEAN:
// Boolean validation (if needed)
break;
default:
if (config.strictTypeChecking) {
collector.addFailure("Unsupported field type for validation: " +
fieldSchema.getType(),
"Use supported types or disable strict checking")
.withInputSchemaField(fieldName);
}
}
}
}

Input format with validation capabilities for file-based sources.
package io.cdap.cdap.etl.api.validation;

/**
 * Input format that can validate its configuration and schema before a pipeline runs.
 * Plugins of this type are identified by {@link #PLUGIN_TYPE}.
 */
public interface ValidatingInputFormat extends InputFormatProvider {
  // FIX: interface fields are implicitly public static final (JLS 9.3);
  // the redundant modifiers are dropped.
  String PLUGIN_TYPE = "validatingInputFormat";
}

Output format with validation capabilities for file-based sinks.
package io.cdap.cdap.etl.api.validation;

/**
 * Output format that can validate its configuration and schema before a pipeline runs.
 * Plugins of this type are identified by {@link #PLUGIN_TYPE}.
 */
public interface ValidatingOutputFormat extends OutputFormatProvider {
  // FIX: interface fields are implicitly public static final (JLS 9.3);
  // the redundant modifiers are dropped.
  String PLUGIN_TYPE = "validatingOutputFormat";
}

Format Validation Example:
// Example validating input format for CSV data: checks delimiter/header configuration
// and rejects schemas containing types a flat CSV row cannot represent.
@Plugin(type = ValidatingInputFormat.PLUGIN_TYPE)
@Name("ValidatingCSVFormat")
public class ValidatingCSVInputFormat implements ValidatingInputFormat {

  private final Config config;

  @Override
  public String getInputFormatClassName() {
    return TextInputFormat.class.getName();
  }

  @Override
  public Map<String, String> getInputFormatConfiguration() {
    // Configure input format
    Map<String, String> configuration = new HashMap<>();
    return configuration;
  }

  /**
   * Runs configuration and schema validation, throwing when any failure was recorded.
   *
   * @throws ValidationException if at least one validation failure was collected
   */
  public void validateFormat(FormatContext context) throws ValidationException {
    FailureCollector failureCollector = context.getFailureCollector();
    Schema schema = context.getInputSchema();

    // A schema is a precondition for every other check.
    if (schema == null) {
      failureCollector.addFailure("Input schema is required for CSV validation",
          "Define input schema");
      return;
    }

    validateCSVConfig(failureCollector);
    validateSchemaForCSV(schema, failureCollector);

    if (!failureCollector.getValidationFailures().isEmpty()) {
      throw new ValidationException(failureCollector.getValidationFailures());
    }
  }

  // Checks delimiter presence/length and the header-line setting.
  private void validateCSVConfig(FailureCollector failureCollector) {
    String delimiter = config.delimiter;
    if (delimiter == null || delimiter.isEmpty()) {
      failureCollector.addFailure("CSV delimiter is required", "Specify field delimiter")
          .withConfigProperty("delimiter");
    } else if (delimiter.length() > 1) {
      failureCollector.addFailure("CSV delimiter must be single character",
          "Use single character delimiter")
          .withConfigProperty("delimiter");
    }
    if (config.skipHeader && config.headerLine < 0) {
      failureCollector.addFailure("Invalid header line number",
          "Header line must be non-negative")
          .withConfigProperty("headerLine");
    }
  }

  // Rejects composite field types: CSV rows are flat, so nested structures cannot map.
  private void validateSchemaForCSV(Schema schema, FailureCollector failureCollector) {
    for (Schema.Field field : schema.getFields()) {
      Schema nonNullable = field.getSchema().isNullable()
          ? field.getSchema().getNonNullable() : field.getSchema();
      Schema.Type type = nonNullable.getType();
      boolean complex = type == Schema.Type.ARRAY
          || type == Schema.Type.MAP
          || type == Schema.Type.RECORD;
      if (complex) {
        failureCollector.addFailure("Complex type not supported in CSV: " + type,
            "Use simple types for CSV format")
            .withInputSchemaField(field.getName());
      }
    }
  }
}

Collection interface for input files to be validated.
package io.cdap.cdap.etl.api.validation;
// Iterable collection of input files handed to format validation routines.
public interface InputFiles extends Iterable<InputFile> {
// Provides iteration over input files for validation
}

Individual input file interface for validation operations.
package io.cdap.cdap.etl.api.validation;
// A single input file that validation code can inspect and read.
public interface InputFile {
/**
 * Get file name.
 *
 * @return the file's name
 */
String getName();
/**
 * Get file size in bytes.
 *
 * @return size of the file in bytes
 */
long getSize();
/**
 * Open file for reading.
 *
 * @return a seekable stream over the file contents; callers are responsible for closing it
 * @throws IOException if the file cannot be opened
 */
SeekableInputStream open() throws IOException;
}

Abstract seekable input stream for file validation.
package io.cdap.cdap.etl.api.validation;
// InputStream with random access: validators can sample a file and rewind it
// (see the seek(0) usage in the file validation example in this document).
public abstract class SeekableInputStream extends InputStream {
/**
 * Seek to position in stream.
 *
 * @param pos absolute byte offset to move to
 * @throws IOException if the seek fails
 */
public abstract void seek(long pos) throws IOException;
/**
 * Get current position in stream.
 *
 * @return current absolute byte offset
 * @throws IOException if the position cannot be determined
 */
public abstract long getPos() throws IOException;
}

Delegating implementation of seekable input stream.
package io.cdap.cdap.etl.api.validation;
// Adapter that implements SeekableInputStream by forwarding to an underlying
// seekable stream implementation (implementation details elided in this summary).
public class DelegatingSeekableInputStream extends SeekableInputStream {
// Delegates to underlying seekable stream implementation
}

File Validation Example:
/**
 * Example helper that samples input files and records validation failures for
 * unreadable, empty, or malformed JSON content.
 */
public class FileFormatValidator {

  /**
   * Validates each file in the collection, recording (not throwing) per-file failures.
   *
   * @param inputFiles files to validate
   * @param collector sink for validation failures
   */
  public static void validateJSONFiles(InputFiles inputFiles, FailureCollector collector) {
    for (InputFile inputFile : inputFiles) {
      try (SeekableInputStream stream = inputFile.open()) {
        validateJSONFile(inputFile.getName(), stream, collector);
      } catch (IOException e) {
        collector.addFailure("Failed to read file: " + inputFile.getName(),
            "Check file permissions and format")
            .withCause(new Cause(CauseAttributes.IO_ERROR));
      }
    }
  }

  // Samples up to the first 1MB of the file and checks it parses as a JSON object/array.
  // NOTE: files larger than the sample may still be reported as syntax errors if the
  // sample boundary cuts the document; this limitation is inherent to sampling.
  private static void validateJSONFile(String fileName, SeekableInputStream stream,
                                       FailureCollector collector) throws IOException {
    // Sample first 1MB for validation
    byte[] buffer = new byte[1024 * 1024];
    // FIX: a single read() may legally return fewer bytes than are available,
    // which truncated the sample and caused spurious syntax failures.
    // Loop until the buffer is full or the stream is exhausted.
    int bytesRead = 0;
    int n;
    while (bytesRead < buffer.length
        && (n = stream.read(buffer, bytesRead, buffer.length - bytesRead)) > 0) {
      bytesRead += n;
    }
    if (bytesRead <= 0) {
      collector.addFailure("Empty file: " + fileName, "Provide non-empty JSON file");
      return;
    }
    String content = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
    // Validate JSON structure
    try {
      // FIX: use the static JsonParser.parseString (Gson 2.8.6+) instead of the
      // deprecated instance new JsonParser().parse(...).
      JsonElement element = JsonParser.parseString(content);
      if (!element.isJsonObject() && !element.isJsonArray()) {
        collector.addFailure("Invalid JSON structure in: " + fileName,
            "File must contain JSON object or array");
      }
    } catch (JsonSyntaxException e) {
      collector.addFailure("Invalid JSON syntax in: " + fileName + " - " + e.getMessage(),
          "Fix JSON syntax errors");
    }
    // Rewind so subsequent consumers see the file from the start.
    stream.seek(0);
  }
}

Context for format validation operations.
package io.cdap.cdap.etl.api.validation;
// Context object passed to format validation: bundles the failure collector with
// the (possibly absent) input schema. Bodies are elided in this API summary.
public class FormatContext {
/**
 * Create format context with collector and input schema.
 *
 * @param collector sink for validation failures
 * @param inputSchema schema of incoming records, or null if not known
 */
public FormatContext(FailureCollector collector, @Nullable Schema inputSchema) {}
/**
 * Get failure collector.
 */
public FailureCollector getFailureCollector() {}
/**
 * Get input schema.
 *
 * @return the input schema, or null when none was provided
 */
@Nullable
public Schema getInputSchema() {}
}

Exception for invalid stage configuration.
package io.cdap.cdap.etl.api.validation;
// Checked exception indicating a pipeline stage is misconfigured.
public class InvalidStageException extends Exception {
/**
 * Create exception with message.
 *
 * @param message description of the invalid configuration
 */
public InvalidStageException(String message) {}
/**
 * Create exception with message and cause.
 *
 * @param message description of the invalid configuration
 * @param cause underlying exception that triggered this one
 */
public InvalidStageException(String message, Throwable cause) {}
}

Exception for invalid configuration properties.
package io.cdap.cdap.etl.api.validation;
// Specialization of InvalidStageException that identifies the specific config
// property at fault.
public class InvalidConfigPropertyException extends InvalidStageException {
/**
 * Create exception for invalid property.
 *
 * @param message description of the problem
 * @param propertyName name of the configuration property at fault
 */
public InvalidConfigPropertyException(String message, String propertyName) {}
/**
 * Get property name that caused the exception.
 */
public String getPropertyName() {}
}

Attributes for validation failure causes.
package io.cdap.cdap.etl.api.validation;

/**
 * String constants identifying the origin of a validation failure cause
 * (used when constructing {@code new Cause(...)} in the examples in this document).
 */
public class CauseAttributes {
  /** Failure caused by a stage configuration property. */
  public static final String STAGE_CONFIG = "stageConfig";
  /** Failure relating to the stage's input schema. */
  public static final String INPUT_SCHEMA = "inputSchema";
  /** Failure relating to the stage's output schema. */
  public static final String OUTPUT_SCHEMA = "outputSchema";
  /** Failure caused by an I/O error. */
  public static final String IO_ERROR = "ioError";
  /** Failure caused by a network error. */
  public static final String NETWORK_ERROR = "networkError";

  // FIX: constants holder should not be instantiable.
  private CauseAttributes() {
  }
}

public class SchemaValidator {
/**
 * Checks that every field required by the target schema is satisfiable from the
 * source schema, recording a failure for each mismatch.
 */
public static void validateSchemaCompatibility(Schema sourceSchema, Schema targetSchema,
                                               FailureCollector collector) {
  // Both schemas must be present before any field-level comparison can run.
  if (sourceSchema == null || targetSchema == null) {
    collector.addFailure("Schema cannot be null", "Provide valid schema");
    return;
  }
  for (Schema.Field targetField : targetSchema.getFields()) {
    String name = targetField.getName();
    Schema.Field sourceField = sourceSchema.getField(name);
    if (sourceField != null) {
      validateFieldCompatibility(sourceField, targetField, collector);
      continue;
    }
    // A field absent from the source is only a problem when the target requires a value.
    if (!targetField.getSchema().isNullable()) {
      collector.addFailure("Required field missing in source: " + name,
          "Add field to source or make target field nullable")
          .withInputSchemaField(name)
          .withOutputSchemaField(name);
    }
  }
}
/**
 * Checks that one source field can be mapped onto the corresponding target field:
 * the (unwrapped) types must be identical or safely convertible, and a nullable
 * source must not flow into a non-nullable target.
 */
private static void validateFieldCompatibility(Schema.Field sourceField,
                                               Schema.Field targetField,
                                               FailureCollector collector) {
  String fieldName = sourceField.getName();
  // Unwrap nullable (union-with-null) schemas to compare the underlying types.
  Schema sourceType = sourceField.getSchema().isNullable() ?
      sourceField.getSchema().getNonNullable() : sourceField.getSchema();
  Schema targetType = targetField.getSchema().isNullable() ?
      targetField.getSchema().getNonNullable() : targetField.getSchema();
  if (!isCompatibleType(sourceType, targetType)) {
    collector.addFailure("Incompatible field types for: " + fieldName +
        " (source: " + sourceType.getType() +
        ", target: " + targetType.getType() + ")",
        "Convert field type or update schema")
        .withInputSchemaField(fieldName)
        .withOutputSchemaField(fieldName);
  }
  // Nullability: non-null -> nullable is always safe, so only the reverse is flagged.
  // FIX: replaced an empty "this is fine" if-branch with the single direct condition;
  // behavior is unchanged.
  if (sourceField.getSchema().isNullable() && !targetField.getSchema().isNullable()) {
    collector.addFailure("Cannot convert nullable field to non-nullable: " + fieldName,
        "Make target field nullable or add null handling")
        .withInputSchemaField(fieldName)
        .withOutputSchemaField(fieldName);
  }
}
/**
 * Decides whether a source type can be written to a target type: identical types
 * always match; otherwise only widening numeric conversions and conversion to
 * string are accepted.
 */
private static boolean isCompatibleType(Schema sourceType, Schema targetType) {
  Schema.Type from = sourceType.getType();
  Schema.Type to = targetType.getType();
  if (from == to) {
    return true;
  }
  if (from == Schema.Type.INT) {
    return to == Schema.Type.LONG || to == Schema.Type.FLOAT
        || to == Schema.Type.DOUBLE || to == Schema.Type.STRING;
  }
  if (from == Schema.Type.LONG) {
    return to == Schema.Type.FLOAT || to == Schema.Type.DOUBLE
        || to == Schema.Type.STRING;
  }
  if (from == Schema.Type.FLOAT) {
    return to == Schema.Type.DOUBLE || to == Schema.Type.STRING;
  }
  if (from == Schema.Type.DOUBLE || from == Schema.Type.BOOLEAN) {
    return to == Schema.Type.STRING;
  }
  return false;
}
}

public abstract class BaseValidatedPlugin {
/**
 * Records a failure when a mandatory property is null, empty, or whitespace-only.
 */
protected void validateRequiredProperty(String propertyValue, String propertyName,
                                        FailureCollector collector) {
  boolean missing = propertyValue == null || propertyValue.trim().isEmpty();
  if (missing) {
    collector.addFailure("Property is required: " + propertyName,
        "Provide value for " + propertyName)
        .withConfigProperty(propertyName);
  }
}
/**
 * Records a failure when the property is not a valid long or falls outside
 * [minValue, maxValue]. Blank values are skipped (the required-property check
 * is responsible for those).
 */
protected void validateNumericRange(String value, String propertyName,
                                    long minValue, long maxValue,
                                    FailureCollector collector) {
  if (value == null || value.trim().isEmpty()) {
    return; // Let required validation handle null/empty
  }
  long parsed;
  try {
    parsed = Long.parseLong(value.trim());
  } catch (NumberFormatException e) {
    collector.addFailure("Invalid numeric value for: " + propertyName,
        "Provide valid integer value")
        .withConfigProperty(propertyName);
    return;
  }
  if (parsed < minValue || parsed > maxValue) {
    collector.addFailure("Property value out of range: " + propertyName +
        " (valid range: " + minValue + "-" + maxValue + ")",
        "Set value between " + minValue + " and " + maxValue)
        .withConfigProperty(propertyName);
  }
}
/**
 * Records a failure when the property holds a regex that does not compile.
 * Blank patterns are accepted (nothing to validate).
 */
protected void validateRegexPattern(String pattern, String propertyName,
                                    FailureCollector collector) {
  boolean blank = pattern == null || pattern.trim().isEmpty();
  if (!blank) {
    try {
      Pattern.compile(pattern);
    } catch (PatternSyntaxException e) {
      collector.addFailure("Invalid regex pattern for: " + propertyName +
          " - " + e.getMessage(),
          "Provide valid regular expression")
          .withConfigProperty(propertyName)
          .withCause(new Cause(CauseAttributes.STAGE_CONFIG));
    }
  }
}
/**
 * Records a failure when the connection string is missing, or is neither a
 * parseable URL nor a "jdbc:"-prefixed connection string.
 */
protected void validateConnectionString(String connectionString, String propertyName,
                                        FailureCollector collector) {
  if (connectionString == null || connectionString.trim().isEmpty()) {
    collector.addFailure("Connection string is required: " + propertyName,
        "Provide valid connection string")
        .withConfigProperty(propertyName);
    return;
  }
  try {
    // Basic URL validation: anything the URL parser accepts is considered valid.
    new URL(connectionString);
    return;
  } catch (MalformedURLException e) {
    // Not a standard URL — fall through to the JDBC prefix check below.
  }
  if (!connectionString.startsWith("jdbc:")) {
    collector.addFailure("Invalid connection string format: " + propertyName,
        "Provide valid URL or JDBC connection string")
        .withConfigProperty(propertyName);
  }
}
}

Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api