0
# Validation Framework
1
2
Comprehensive validation system for early error detection, structured error reporting, and configuration validation in CDAP ETL pipelines.
3
4
## Core Validation Interfaces
5
6
### ValidationException
7
8
Exception class for validation failures with structured error information.
9
10
```java { .api }
11
package io.cdap.cdap.etl.api.validation;
12
13
public class ValidationException extends Exception implements FailureDetailsProvider {
14
/**
15
* Create validation exception with list of failures.
16
*/
17
public ValidationException(List<ValidationFailure> failures) {}
18
19
/**
20
* Get validation failures.
21
*/
22
public List<ValidationFailure> getFailures() {}
23
}
24
```
25
26
### ValidationFailure
27
28
Individual validation failure with detailed error information.
29
30
```java { .api }
31
package io.cdap.cdap.etl.api.validation;
32
33
public class ValidationFailure {
34
/**
35
* Create validation failure with message.
36
*/
37
public ValidationFailure(String message) {}
38
39
/**
40
* Add cause information to failure.
41
*/
42
public ValidationFailure withCause(Cause cause) {}
43
44
/**
45
* Associate failure with configuration property.
46
*/
47
public ValidationFailure withConfigProperty(String stageConfigProperty) {}
48
49
/**
50
* Associate failure with input schema field.
51
*/
52
public ValidationFailure withInputSchemaField(String fieldName) {}
53
54
/**
55
* Associate failure with output schema field.
56
*/
57
public ValidationFailure withOutputSchemaField(String fieldName) {}
58
59
/**
60
* Add corrective action suggestion.
61
*/
62
public ValidationFailure withCorrectiveAction(String correctiveAction) {}
63
}
64
```
65
66
### FailureCollector
67
68
Interface for collecting validation failures during pipeline configuration.
69
70
```java { .api }
71
package io.cdap.cdap.etl.api;
72
73
public interface FailureCollector {
74
/**
75
* Add validation failure with message and corrective action.
76
*/
77
ValidationFailure addFailure(String message, @Nullable String correctiveAction);
78
79
/**
80
* Get all collected validation failures.
81
*/
82
List<ValidationFailure> getValidationFailures();
83
}
84
```
85
86
**Validation Usage Example:**
87
```java
88
@Plugin(type = Transform.PLUGIN_TYPE)
89
@Name("DataValidator")
90
public class DataValidatorTransform extends Transform<StructuredRecord, StructuredRecord> {
91
92
private final Config config;
93
94
@Override
95
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
96
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
97
FailureCollector collector = stageConfigurer.getFailureCollector();
98
99
// Validate configuration
100
validateConfig(collector);
101
102
// Validate input schema
103
Schema inputSchema = stageConfigurer.getInputSchema();
104
if (inputSchema != null) {
105
validateInputSchema(inputSchema, collector);
106
107
// Set output schema if validation passes
108
if (collector.getValidationFailures().isEmpty()) {
109
Schema outputSchema = buildOutputSchema(inputSchema);
110
stageConfigurer.setOutputSchema(outputSchema);
111
}
112
}
113
}
114
115
private void validateConfig(FailureCollector collector) {
116
// Validate required fields
117
if (config.requiredFields == null || config.requiredFields.isEmpty()) {
118
collector.addFailure("Required fields must be specified",
119
"Provide comma-separated list of required field names")
120
.withConfigProperty("requiredFields");
121
}
122
123
// Validate field patterns
124
if (config.fieldPattern != null && !config.fieldPattern.isEmpty()) {
125
try {
126
Pattern.compile(config.fieldPattern);
127
} catch (PatternSyntaxException e) {
128
collector.addFailure("Invalid field pattern: " + e.getMessage(),
129
"Provide valid regular expression")
130
.withConfigProperty("fieldPattern")
131
.withCause(new Cause(CauseAttributes.STAGE_CONFIG));
132
}
133
}
134
135
// Validate numeric ranges
136
if (config.minValue != null && config.maxValue != null &&
137
config.minValue > config.maxValue) {
138
collector.addFailure("Minimum value cannot be greater than maximum value",
139
"Ensure minValue <= maxValue")
140
.withConfigProperty("minValue")
141
.withConfigProperty("maxValue");
142
}
143
}
144
145
private void validateInputSchema(Schema inputSchema, FailureCollector collector) {
146
// Check required fields exist
147
for (String requiredField : config.requiredFields) {
148
Schema.Field field = inputSchema.getField(requiredField);
149
if (field == null) {
150
collector.addFailure("Required field not found: " + requiredField,
151
"Add field to input schema or update configuration")
152
.withInputSchemaField(requiredField);
153
} else {
154
// Validate field types
155
validateFieldType(field, collector);
156
}
157
}
158
159
// Check for unsupported field types
160
for (Schema.Field field : inputSchema.getFields()) {
161
if (field.getSchema().getType() == Schema.Type.UNION) {
162
Schema nonNullSchema = field.getSchema().isNullable() ?
163
field.getSchema().getNonNullable() : field.getSchema();
164
165
if (nonNullSchema.getType() == Schema.Type.MAP ||
166
nonNullSchema.getType() == Schema.Type.ARRAY) {
167
collector.addFailure("Unsupported field type: " + nonNullSchema.getType(),
168
"Use simple types or flatten complex structures")
169
.withInputSchemaField(field.getName());
170
}
171
}
172
}
173
}
174
175
private void validateFieldType(Schema.Field field, FailureCollector collector) {
176
Schema fieldSchema = field.getSchema().isNullable() ?
177
field.getSchema().getNonNullable() : field.getSchema();
178
179
String fieldName = field.getName();
180
181
// Check if field type is supported for validation
182
switch (fieldSchema.getType()) {
183
case STRING:
184
if (config.stringValidation != null) {
185
validateStringField(fieldName, config.stringValidation, collector);
186
}
187
break;
188
case INT:
189
case LONG:
190
case FLOAT:
191
case DOUBLE:
192
if (config.numericValidation != null) {
193
validateNumericField(fieldName, fieldSchema.getType(),
194
config.numericValidation, collector);
195
}
196
break;
197
case BOOLEAN:
198
// Boolean validation (if needed)
199
break;
200
default:
201
if (config.strictTypeChecking) {
202
collector.addFailure("Unsupported field type for validation: " +
203
fieldSchema.getType(),
204
"Use supported types or disable strict checking")
205
.withInputSchemaField(fieldName);
206
}
207
}
208
}
209
}
210
```
211
212
## Format Validation
213
214
### ValidatingInputFormat
215
216
Input format with validation capabilities for file-based sources.
217
218
```java { .api }
219
package io.cdap.cdap.etl.api.validation;
220
221
public interface ValidatingInputFormat extends InputFormatProvider {
222
public static final String PLUGIN_TYPE = "validatingInputFormat";
223
}
224
```
225
226
### ValidatingOutputFormat
227
228
Output format with validation capabilities for file-based sinks.
229
230
```java { .api }
231
package io.cdap.cdap.etl.api.validation;
232
233
public interface ValidatingOutputFormat extends OutputFormatProvider {
234
public static final String PLUGIN_TYPE = "validatingOutputFormat";
235
}
236
```
237
238
**Format Validation Example:**
239
```java
240
@Plugin(type = ValidatingInputFormat.PLUGIN_TYPE)
241
@Name("ValidatingCSVFormat")
242
public class ValidatingCSVInputFormat implements ValidatingInputFormat {
243
244
private final Config config;
245
246
@Override
247
public String getInputFormatClassName() {
248
return TextInputFormat.class.getName();
249
}
250
251
@Override
252
public Map<String, String> getInputFormatConfiguration() {
253
Map<String, String> conf = new HashMap<>();
254
// Configure input format
255
return conf;
256
}
257
258
public void validateFormat(FormatContext context) throws ValidationException {
259
FailureCollector collector = context.getFailureCollector();
260
Schema inputSchema = context.getInputSchema();
261
262
if (inputSchema == null) {
263
collector.addFailure("Input schema is required for CSV validation",
264
"Define input schema");
265
return;
266
}
267
268
// Validate CSV configuration
269
validateCSVConfig(collector);
270
271
// Validate schema compatibility
272
validateSchemaForCSV(inputSchema, collector);
273
274
if (!collector.getValidationFailures().isEmpty()) {
275
throw new ValidationException(collector.getValidationFailures());
276
}
277
}
278
279
private void validateCSVConfig(FailureCollector collector) {
280
if (config.delimiter == null || config.delimiter.isEmpty()) {
281
collector.addFailure("CSV delimiter is required", "Specify field delimiter")
282
.withConfigProperty("delimiter");
283
} else if (config.delimiter.length() > 1) {
284
collector.addFailure("CSV delimiter must be single character",
285
"Use single character delimiter")
286
.withConfigProperty("delimiter");
287
}
288
289
if (config.skipHeader && config.headerLine < 0) {
290
collector.addFailure("Invalid header line number",
291
"Header line must be non-negative")
292
.withConfigProperty("headerLine");
293
}
294
}
295
296
private void validateSchemaForCSV(Schema schema, FailureCollector collector) {
297
for (Schema.Field field : schema.getFields()) {
298
Schema fieldSchema = field.getSchema().isNullable() ?
299
field.getSchema().getNonNullable() : field.getSchema();
300
301
// Check if field type is supported in CSV
302
if (fieldSchema.getType() == Schema.Type.ARRAY ||
303
fieldSchema.getType() == Schema.Type.MAP ||
304
fieldSchema.getType() == Schema.Type.RECORD) {
305
collector.addFailure("Complex type not supported in CSV: " + fieldSchema.getType(),
306
"Use simple types for CSV format")
307
.withInputSchemaField(field.getName());
308
}
309
}
310
}
311
}
312
```
313
314
## File Input Validation
315
316
### InputFiles
317
318
Collection interface for input files to be validated.
319
320
```java { .api }
321
package io.cdap.cdap.etl.api.validation;
322
323
public interface InputFiles extends Iterable<InputFile> {
324
// Provides iteration over input files for validation
325
}
326
```
327
328
### InputFile
329
330
Individual input file interface for validation operations.
331
332
```java { .api }
333
package io.cdap.cdap.etl.api.validation;
334
335
public interface InputFile {
336
/**
337
* Get file name.
338
*/
339
String getName();
340
341
/**
342
* Get file size in bytes.
343
*/
344
long getSize();
345
346
/**
347
* Open file for reading.
348
*/
349
SeekableInputStream open() throws IOException;
350
}
351
```
352
353
### SeekableInputStream
354
355
Abstract seekable input stream for file validation.
356
357
```java { .api }
358
package io.cdap.cdap.etl.api.validation;
359
360
/**
 * An {@link InputStream} that also supports repositioning through {@link #seek(long)}.
 */
public abstract class SeekableInputStream extends InputStream {

  /**
   * Moves the read position.
   *
   * @param pos the position to move to; {@code 0} rewinds to the beginning
   * @throws IOException if the position cannot be changed
   */
  public abstract void seek(long pos) throws IOException;

  /**
   * Reports the current read position.
   *
   * @return the current position
   * @throws IOException if the position cannot be determined
   */
  public abstract long getPos() throws IOException;
}
371
```
372
373
### DelegatingSeekableInputStream
374
375
Delegating implementation of seekable input stream.
376
377
```java { .api }
378
package io.cdap.cdap.etl.api.validation;
379
380
public class DelegatingSeekableInputStream extends SeekableInputStream {
381
// Delegates to underlying seekable stream implementation
382
}
383
```
384
385
**File Validation Example:**
386
```java
387
public class FileFormatValidator {
388
389
public static void validateJSONFiles(InputFiles inputFiles, FailureCollector collector) {
390
for (InputFile inputFile : inputFiles) {
391
try (SeekableInputStream stream = inputFile.open()) {
392
validateJSONFile(inputFile.getName(), stream, collector);
393
} catch (IOException e) {
394
collector.addFailure("Failed to read file: " + inputFile.getName(),
395
"Check file permissions and format")
396
.withCause(new Cause(CauseAttributes.IO_ERROR));
397
}
398
}
399
}
400
401
private static void validateJSONFile(String fileName, SeekableInputStream stream,
402
FailureCollector collector) throws IOException {
403
// Sample first 1MB for validation
404
byte[] buffer = new byte[1024 * 1024];
405
int bytesRead = stream.read(buffer);
406
407
if (bytesRead <= 0) {
408
collector.addFailure("Empty file: " + fileName, "Provide non-empty JSON file");
409
return;
410
}
411
412
String content = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
413
414
// Validate JSON structure
415
try {
416
JsonParser parser = new JsonParser();
417
JsonElement element = parser.parse(content);
418
419
if (!element.isJsonObject() && !element.isJsonArray()) {
420
collector.addFailure("Invalid JSON structure in: " + fileName,
421
"File must contain JSON object or array");
422
}
423
} catch (JsonSyntaxException e) {
424
collector.addFailure("Invalid JSON syntax in: " + fileName + " - " + e.getMessage(),
425
"Fix JSON syntax errors");
426
}
427
428
// Reset stream position
429
stream.seek(0);
430
}
431
}
432
```
433
434
## Validation Context and Configuration
435
436
### FormatContext
437
438
Context for format validation operations.
439
440
```java { .api }
441
package io.cdap.cdap.etl.api.validation;
442
443
public class FormatContext {
444
/**
445
* Create format context with collector and input schema.
446
*/
447
public FormatContext(FailureCollector collector, @Nullable Schema inputSchema) {}
448
449
/**
450
* Get failure collector.
451
*/
452
public FailureCollector getFailureCollector() {}
453
454
/**
455
* Get input schema.
456
*/
457
@Nullable
458
public Schema getInputSchema() {}
459
}
460
```
461
462
## Validation Exceptions
463
464
### InvalidStageException
465
466
Exception for invalid stage configuration.
467
468
```java { .api }
469
package io.cdap.cdap.etl.api.validation;
470
471
/**
 * Checked exception indicating that a pipeline stage is configured incorrectly.
 */
public class InvalidStageException extends Exception {

  /**
   * @param message description of the problem
   */
  public InvalidStageException(String message) {}

  /**
   * @param message description of the problem
   * @param cause underlying exception that led to this one
   */
  public InvalidStageException(String message, Throwable cause) {}
}
482
```
483
484
### InvalidConfigPropertyException
485
486
Exception for invalid configuration properties.
487
488
```java { .api }
489
package io.cdap.cdap.etl.api.validation;
490
491
public class InvalidConfigPropertyException extends InvalidStageException {
492
/**
493
* Create exception for invalid property.
494
*/
495
public InvalidConfigPropertyException(String message, String propertyName) {}
496
497
/**
498
* Get property name that caused the exception.
499
*/
500
public String getPropertyName() {}
501
}
502
```
503
504
## Validation Utilities
505
506
### CauseAttributes
507
508
Attributes for validation failure causes.
509
510
```java { .api }
511
package io.cdap.cdap.etl.api.validation;
512
513
/**
 * String constants naming the cause types that can be attached to a validation failure.
 */
public class CauseAttributes {

  /** Cause type {@value}. */
  public static final String STAGE_CONFIG = "stageConfig";

  /** Cause type {@value}. */
  public static final String INPUT_SCHEMA = "inputSchema";

  /** Cause type {@value}. */
  public static final String OUTPUT_SCHEMA = "outputSchema";

  /** Cause type {@value}. */
  public static final String IO_ERROR = "ioError";

  /** Cause type {@value}. */
  public static final String NETWORK_ERROR = "networkError";
}
521
```
522
523
## Advanced Validation Patterns
524
525
### Schema Compatibility Validation
526
527
```java
528
public class SchemaValidator {
529
530
public static void validateSchemaCompatibility(Schema sourceSchema, Schema targetSchema,
531
FailureCollector collector) {
532
if (sourceSchema == null || targetSchema == null) {
533
collector.addFailure("Schema cannot be null", "Provide valid schema");
534
return;
535
}
536
537
// Check field compatibility
538
for (Schema.Field targetField : targetSchema.getFields()) {
539
String fieldName = targetField.getName();
540
Schema.Field sourceField = sourceSchema.getField(fieldName);
541
542
if (sourceField == null) {
543
if (!targetField.getSchema().isNullable()) {
544
collector.addFailure("Required field missing in source: " + fieldName,
545
"Add field to source or make target field nullable")
546
.withInputSchemaField(fieldName)
547
.withOutputSchemaField(fieldName);
548
}
549
} else {
550
validateFieldCompatibility(sourceField, targetField, collector);
551
}
552
}
553
}
554
555
private static void validateFieldCompatibility(Schema.Field sourceField,
556
Schema.Field targetField,
557
FailureCollector collector) {
558
String fieldName = sourceField.getName();
559
Schema sourceType = sourceField.getSchema().isNullable() ?
560
sourceField.getSchema().getNonNullable() : sourceField.getSchema();
561
Schema targetType = targetField.getSchema().isNullable() ?
562
targetField.getSchema().getNonNullable() : targetField.getSchema();
563
564
if (!isCompatibleType(sourceType, targetType)) {
565
collector.addFailure("Incompatible field types for: " + fieldName +
566
" (source: " + sourceType.getType() +
567
", target: " + targetType.getType() + ")",
568
"Convert field type or update schema")
569
.withInputSchemaField(fieldName)
570
.withOutputSchemaField(fieldName);
571
}
572
573
// Check nullability
574
if (!sourceField.getSchema().isNullable() && targetField.getSchema().isNullable()) {
575
// This is fine - non-null to nullable is safe
576
} else if (sourceField.getSchema().isNullable() && !targetField.getSchema().isNullable()) {
577
collector.addFailure("Cannot convert nullable field to non-nullable: " + fieldName,
578
"Make target field nullable or add null handling")
579
.withInputSchemaField(fieldName)
580
.withOutputSchemaField(fieldName);
581
}
582
}
583
584
private static boolean isCompatibleType(Schema sourceType, Schema targetType) {
585
if (sourceType.getType() == targetType.getType()) {
586
return true;
587
}
588
589
// Check for safe type conversions
590
switch (sourceType.getType()) {
591
case INT:
592
return targetType.getType() == Schema.Type.LONG ||
593
targetType.getType() == Schema.Type.FLOAT ||
594
targetType.getType() == Schema.Type.DOUBLE ||
595
targetType.getType() == Schema.Type.STRING;
596
case LONG:
597
return targetType.getType() == Schema.Type.FLOAT ||
598
targetType.getType() == Schema.Type.DOUBLE ||
599
targetType.getType() == Schema.Type.STRING;
600
case FLOAT:
601
return targetType.getType() == Schema.Type.DOUBLE ||
602
targetType.getType() == Schema.Type.STRING;
603
case DOUBLE:
604
return targetType.getType() == Schema.Type.STRING;
605
case BOOLEAN:
606
return targetType.getType() == Schema.Type.STRING;
607
default:
608
return false;
609
}
610
}
611
}
612
```
613
614
### Configuration Validation Patterns
615
616
```java
617
public abstract class BaseValidatedPlugin {
618
619
protected void validateRequiredProperty(String propertyValue, String propertyName,
620
FailureCollector collector) {
621
if (propertyValue == null || propertyValue.trim().isEmpty()) {
622
collector.addFailure("Property is required: " + propertyName,
623
"Provide value for " + propertyName)
624
.withConfigProperty(propertyName);
625
}
626
}
627
628
protected void validateNumericRange(String value, String propertyName,
629
long minValue, long maxValue,
630
FailureCollector collector) {
631
if (value == null || value.trim().isEmpty()) {
632
return; // Let required validation handle null/empty
633
}
634
635
try {
636
long numValue = Long.parseLong(value.trim());
637
if (numValue < minValue || numValue > maxValue) {
638
collector.addFailure("Property value out of range: " + propertyName +
639
" (valid range: " + minValue + "-" + maxValue + ")",
640
"Set value between " + minValue + " and " + maxValue)
641
.withConfigProperty(propertyName);
642
}
643
} catch (NumberFormatException e) {
644
collector.addFailure("Invalid numeric value for: " + propertyName,
645
"Provide valid integer value")
646
.withConfigProperty(propertyName);
647
}
648
}
649
650
protected void validateRegexPattern(String pattern, String propertyName,
651
FailureCollector collector) {
652
if (pattern == null || pattern.trim().isEmpty()) {
653
return;
654
}
655
656
try {
657
Pattern.compile(pattern);
658
} catch (PatternSyntaxException e) {
659
collector.addFailure("Invalid regex pattern for: " + propertyName +
660
" - " + e.getMessage(),
661
"Provide valid regular expression")
662
.withConfigProperty(propertyName)
663
.withCause(new Cause(CauseAttributes.STAGE_CONFIG));
664
}
665
}
666
667
protected void validateConnectionString(String connectionString, String propertyName,
668
FailureCollector collector) {
669
if (connectionString == null || connectionString.trim().isEmpty()) {
670
collector.addFailure("Connection string is required: " + propertyName,
671
"Provide valid connection string")
672
.withConfigProperty(propertyName);
673
return;
674
}
675
676
try {
677
// Basic URL validation
678
new URL(connectionString);
679
} catch (MalformedURLException e) {
680
// Try as JDBC URL
681
if (!connectionString.startsWith("jdbc:")) {
682
collector.addFailure("Invalid connection string format: " + propertyName,
683
"Provide valid URL or JDBC connection string")
684
.withConfigProperty(propertyName);
685
}
686
}
687
}
688
}
689
```