<!-- docs/validation.md -->

# Validation Framework

Comprehensive validation system for early error detection, structured error reporting, and configuration validation in CDAP ETL pipelines.

## Core Validation Interfaces

### ValidationException

Exception class for validation failures with structured error information.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public class ValidationException extends Exception implements FailureDetailsProvider {
  /**
   * Create validation exception with list of failures.
   */
  public ValidationException(List<ValidationFailure> failures) {}

  /**
   * Get validation failures.
   */
  public List<ValidationFailure> getFailures() {}
}
```

### ValidationFailure

Individual validation failure with detailed error information.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public class ValidationFailure {
  /**
   * Create validation failure with message.
   */
  public ValidationFailure(String message) {}

  /**
   * Add cause information to failure.
   */
  public ValidationFailure withCause(Cause cause) {}

  /**
   * Associate failure with configuration property.
   */
  public ValidationFailure withConfigProperty(String stageConfigProperty) {}

  /**
   * Associate failure with input schema field.
   */
  public ValidationFailure withInputSchemaField(String fieldName) {}

  /**
   * Associate failure with output schema field.
   */
  public ValidationFailure withOutputSchemaField(String fieldName) {}

  /**
   * Add corrective action suggestion.
   */
  public ValidationFailure withCorrectiveAction(String correctiveAction) {}
}
```

### FailureCollector

Interface for collecting validation failures during pipeline configuration.

```java { .api }
package io.cdap.cdap.etl.api;

public interface FailureCollector {
  /**
   * Add validation failure with message and corrective action.
   */
  ValidationFailure addFailure(String message, @Nullable String correctiveAction);

  /**
   * Get all collected validation failures.
   */
  List<ValidationFailure> getValidationFailures();
}
```

**Validation Usage Example:**

```java
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("DataValidator")
public class DataValidatorTransform extends Transform<StructuredRecord, StructuredRecord> {

  private final Config config;

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
    FailureCollector collector = stageConfigurer.getFailureCollector();

    // Validate configuration
    validateConfig(collector);

    // Validate input schema
    Schema inputSchema = stageConfigurer.getInputSchema();
    if (inputSchema != null) {
      validateInputSchema(inputSchema, collector);

      // Set output schema if validation passes
      if (collector.getValidationFailures().isEmpty()) {
        Schema outputSchema = buildOutputSchema(inputSchema);
        stageConfigurer.setOutputSchema(outputSchema);
      }
    }
  }

  private void validateConfig(FailureCollector collector) {
    // Validate required fields
    if (config.requiredFields == null || config.requiredFields.isEmpty()) {
      collector.addFailure("Required fields must be specified",
                           "Provide comma-separated list of required field names")
        .withConfigProperty("requiredFields");
    }

    // Validate field patterns
    if (config.fieldPattern != null && !config.fieldPattern.isEmpty()) {
      try {
        Pattern.compile(config.fieldPattern);
      } catch (PatternSyntaxException e) {
        collector.addFailure("Invalid field pattern: " + e.getMessage(),
                             "Provide valid regular expression")
          .withConfigProperty("fieldPattern")
          .withCause(new Cause(CauseAttributes.STAGE_CONFIG));
      }
    }

    // Validate numeric ranges
    if (config.minValue != null && config.maxValue != null &&
        config.minValue > config.maxValue) {
      collector.addFailure("Minimum value cannot be greater than maximum value",
                           "Ensure minValue <= maxValue")
        .withConfigProperty("minValue")
        .withConfigProperty("maxValue");
    }
  }

  private void validateInputSchema(Schema inputSchema, FailureCollector collector) {
    // Check required fields exist
    for (String requiredField : config.requiredFields) {
      Schema.Field field = inputSchema.getField(requiredField);
      if (field == null) {
        collector.addFailure("Required field not found: " + requiredField,
                             "Add field to input schema or update configuration")
          .withInputSchemaField(requiredField);
      } else {
        // Validate field types
        validateFieldType(field, collector);
      }
    }

    // Check for unsupported field types
    for (Schema.Field field : inputSchema.getFields()) {
      if (field.getSchema().getType() == Schema.Type.UNION) {
        Schema nonNullSchema = field.getSchema().isNullable() ?
            field.getSchema().getNonNullable() : field.getSchema();

        if (nonNullSchema.getType() == Schema.Type.MAP ||
            nonNullSchema.getType() == Schema.Type.ARRAY) {
          collector.addFailure("Unsupported field type: " + nonNullSchema.getType(),
                               "Use simple types or flatten complex structures")
            .withInputSchemaField(field.getName());
        }
      }
    }
  }

  private void validateFieldType(Schema.Field field, FailureCollector collector) {
    Schema fieldSchema = field.getSchema().isNullable() ?
        field.getSchema().getNonNullable() : field.getSchema();

    String fieldName = field.getName();

    // Check if field type is supported for validation
    switch (fieldSchema.getType()) {
      case STRING:
        if (config.stringValidation != null) {
          validateStringField(fieldName, config.stringValidation, collector);
        }
        break;
      case INT:
      case LONG:
      case FLOAT:
      case DOUBLE:
        if (config.numericValidation != null) {
          validateNumericField(fieldName, fieldSchema.getType(),
                               config.numericValidation, collector);
        }
        break;
      case BOOLEAN:
        // Boolean validation (if needed)
        break;
      default:
        if (config.strictTypeChecking) {
          collector.addFailure("Unsupported field type for validation: " +
                               fieldSchema.getType(),
                               "Use supported types or disable strict checking")
            .withInputSchemaField(fieldName);
        }
    }
  }
}
```

## Format Validation

### ValidatingInputFormat

Input format with validation capabilities for file-based sources.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public interface ValidatingInputFormat extends InputFormatProvider {
  public static final String PLUGIN_TYPE = "validatingInputFormat";
}
```

### ValidatingOutputFormat

Output format with validation capabilities for file-based sinks.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public interface ValidatingOutputFormat extends OutputFormatProvider {
  public static final String PLUGIN_TYPE = "validatingOutputFormat";
}
```

**Format Validation Example:**

```java
@Plugin(type = ValidatingInputFormat.PLUGIN_TYPE)
@Name("ValidatingCSVFormat")
public class ValidatingCSVInputFormat implements ValidatingInputFormat {

  private final Config config;

  @Override
  public String getInputFormatClassName() {
    return TextInputFormat.class.getName();
  }

  @Override
  public Map<String, String> getInputFormatConfiguration() {
    Map<String, String> conf = new HashMap<>();
    // Configure input format
    return conf;
  }

  public void validateFormat(FormatContext context) throws ValidationException {
    FailureCollector collector = context.getFailureCollector();
    Schema inputSchema = context.getInputSchema();

    if (inputSchema == null) {
      collector.addFailure("Input schema is required for CSV validation",
                           "Define input schema");
      return;
    }

    // Validate CSV configuration
    validateCSVConfig(collector);

    // Validate schema compatibility
    validateSchemaForCSV(inputSchema, collector);

    if (!collector.getValidationFailures().isEmpty()) {
      throw new ValidationException(collector.getValidationFailures());
    }
  }

  private void validateCSVConfig(FailureCollector collector) {
    if (config.delimiter == null || config.delimiter.isEmpty()) {
      collector.addFailure("CSV delimiter is required", "Specify field delimiter")
        .withConfigProperty("delimiter");
    } else if (config.delimiter.length() > 1) {
      collector.addFailure("CSV delimiter must be single character",
                           "Use single character delimiter")
        .withConfigProperty("delimiter");
    }

    if (config.skipHeader && config.headerLine < 0) {
      collector.addFailure("Invalid header line number",
                           "Header line must be non-negative")
        .withConfigProperty("headerLine");
    }
  }

  private void validateSchemaForCSV(Schema schema, FailureCollector collector) {
    for (Schema.Field field : schema.getFields()) {
      Schema fieldSchema = field.getSchema().isNullable() ?
          field.getSchema().getNonNullable() : field.getSchema();

      // Check if field type is supported in CSV
      if (fieldSchema.getType() == Schema.Type.ARRAY ||
          fieldSchema.getType() == Schema.Type.MAP ||
          fieldSchema.getType() == Schema.Type.RECORD) {
        collector.addFailure("Complex type not supported in CSV: " + fieldSchema.getType(),
                             "Use simple types for CSV format")
          .withInputSchemaField(field.getName());
      }
    }
  }
}
```

## File Input Validation

### InputFiles

Collection interface for input files to be validated.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public interface InputFiles extends Iterable<InputFile> {
  // Provides iteration over input files for validation
}
```

### InputFile

Individual input file interface for validation operations.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public interface InputFile {
  /**
   * Get file name.
   */
  String getName();

  /**
   * Get file size in bytes.
   */
  long getSize();

  /**
   * Open file for reading.
   */
  SeekableInputStream open() throws IOException;
}
```

### SeekableInputStream

Abstract seekable input stream for file validation.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public abstract class SeekableInputStream extends InputStream {
  /**
   * Seek to position in stream.
   */
  public abstract void seek(long pos) throws IOException;

  /**
   * Get current position in stream.
   */
  public abstract long getPos() throws IOException;
}
```

### DelegatingSeekableInputStream

Delegating implementation of seekable input stream.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public class DelegatingSeekableInputStream extends SeekableInputStream {
  // Delegates to underlying seekable stream implementation
}
```

**File Validation Example:**

```java
public class FileFormatValidator {

  public static void validateJSONFiles(InputFiles inputFiles, FailureCollector collector) {
    for (InputFile inputFile : inputFiles) {
      try (SeekableInputStream stream = inputFile.open()) {
        validateJSONFile(inputFile.getName(), stream, collector);
      } catch (IOException e) {
        collector.addFailure("Failed to read file: " + inputFile.getName(),
                             "Check file permissions and format")
          .withCause(new Cause(CauseAttributes.IO_ERROR));
      }
    }
  }

  private static void validateJSONFile(String fileName, SeekableInputStream stream,
                                       FailureCollector collector) throws IOException {
    // Sample first 1MB for validation
    byte[] buffer = new byte[1024 * 1024];
    int bytesRead = stream.read(buffer);

    if (bytesRead <= 0) {
      collector.addFailure("Empty file: " + fileName, "Provide non-empty JSON file");
      return;
    }

    String content = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);

    // Validate JSON structure
    try {
      JsonParser parser = new JsonParser();
      JsonElement element = parser.parse(content);

      if (!element.isJsonObject() && !element.isJsonArray()) {
        collector.addFailure("Invalid JSON structure in: " + fileName,
                             "File must contain JSON object or array");
      }
    } catch (JsonSyntaxException e) {
      collector.addFailure("Invalid JSON syntax in: " + fileName + " - " + e.getMessage(),
                           "Fix JSON syntax errors");
    }

    // Reset stream position
    stream.seek(0);
  }
}
```

## Validation Context and Configuration

### FormatContext

Context for format validation operations.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public class FormatContext {
  /**
   * Create format context with collector and input schema.
   */
  public FormatContext(FailureCollector collector, @Nullable Schema inputSchema) {}

  /**
   * Get failure collector.
   */
  public FailureCollector getFailureCollector() {}

  /**
   * Get input schema.
   */
  @Nullable
  public Schema getInputSchema() {}
}
```

## Validation Exceptions

### InvalidStageException

Exception for invalid stage configuration.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public class InvalidStageException extends Exception {
  /**
   * Create exception with message.
   */
  public InvalidStageException(String message) {}

  /**
   * Create exception with message and cause.
   */
  public InvalidStageException(String message, Throwable cause) {}
}
```

### InvalidConfigPropertyException

Exception for invalid configuration properties.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public class InvalidConfigPropertyException extends InvalidStageException {
  /**
   * Create exception for invalid property.
   */
  public InvalidConfigPropertyException(String message, String propertyName) {}

  /**
   * Get property name that caused the exception.
   */
  public String getPropertyName() {}
}
```

## Validation Utilities

### CauseAttributes

Attributes for validation failure causes.

```java { .api }
package io.cdap.cdap.etl.api.validation;

public class CauseAttributes {
  // Constants for different cause types
  public static final String STAGE_CONFIG = "stageConfig";
  public static final String INPUT_SCHEMA = "inputSchema";
  public static final String OUTPUT_SCHEMA = "outputSchema";
  public static final String IO_ERROR = "ioError";
  public static final String NETWORK_ERROR = "networkError";
}
```

## Advanced Validation Patterns

### Schema Compatibility Validation

```java
public class SchemaValidator {

  public static void validateSchemaCompatibility(Schema sourceSchema, Schema targetSchema,
                                                 FailureCollector collector) {
    if (sourceSchema == null || targetSchema == null) {
      collector.addFailure("Schema cannot be null", "Provide valid schema");
      return;
    }

    // Check field compatibility
    for (Schema.Field targetField : targetSchema.getFields()) {
      String fieldName = targetField.getName();
      Schema.Field sourceField = sourceSchema.getField(fieldName);

      if (sourceField == null) {
        if (!targetField.getSchema().isNullable()) {
          collector.addFailure("Required field missing in source: " + fieldName,
                               "Add field to source or make target field nullable")
            .withInputSchemaField(fieldName)
            .withOutputSchemaField(fieldName);
        }
      } else {
        validateFieldCompatibility(sourceField, targetField, collector);
      }
    }
  }

  private static void validateFieldCompatibility(Schema.Field sourceField,
                                                 Schema.Field targetField,
                                                 FailureCollector collector) {
    String fieldName = sourceField.getName();
    Schema sourceType = sourceField.getSchema().isNullable() ?
        sourceField.getSchema().getNonNullable() : sourceField.getSchema();
    Schema targetType = targetField.getSchema().isNullable() ?
        targetField.getSchema().getNonNullable() : targetField.getSchema();

    if (!isCompatibleType(sourceType, targetType)) {
      collector.addFailure("Incompatible field types for: " + fieldName +
                           " (source: " + sourceType.getType() +
                           ", target: " + targetType.getType() + ")",
                           "Convert field type or update schema")
        .withInputSchemaField(fieldName)
        .withOutputSchemaField(fieldName);
    }

    // Check nullability
    if (!sourceField.getSchema().isNullable() && targetField.getSchema().isNullable()) {
      // This is fine - non-null to nullable is safe
    } else if (sourceField.getSchema().isNullable() && !targetField.getSchema().isNullable()) {
      collector.addFailure("Cannot convert nullable field to non-nullable: " + fieldName,
                           "Make target field nullable or add null handling")
        .withInputSchemaField(fieldName)
        .withOutputSchemaField(fieldName);
    }
  }

  private static boolean isCompatibleType(Schema sourceType, Schema targetType) {
    if (sourceType.getType() == targetType.getType()) {
      return true;
    }

    // Check for safe type conversions
    switch (sourceType.getType()) {
      case INT:
        return targetType.getType() == Schema.Type.LONG ||
               targetType.getType() == Schema.Type.FLOAT ||
               targetType.getType() == Schema.Type.DOUBLE ||
               targetType.getType() == Schema.Type.STRING;
      case LONG:
        return targetType.getType() == Schema.Type.FLOAT ||
               targetType.getType() == Schema.Type.DOUBLE ||
               targetType.getType() == Schema.Type.STRING;
      case FLOAT:
        return targetType.getType() == Schema.Type.DOUBLE ||
               targetType.getType() == Schema.Type.STRING;
      case DOUBLE:
        return targetType.getType() == Schema.Type.STRING;
      case BOOLEAN:
        return targetType.getType() == Schema.Type.STRING;
      default:
        return false;
    }
  }
}
```

### Configuration Validation Patterns

```java
public abstract class BaseValidatedPlugin {

  protected void validateRequiredProperty(String propertyValue, String propertyName,
                                          FailureCollector collector) {
    if (propertyValue == null || propertyValue.trim().isEmpty()) {
      collector.addFailure("Property is required: " + propertyName,
                           "Provide value for " + propertyName)
        .withConfigProperty(propertyName);
    }
  }

  protected void validateNumericRange(String value, String propertyName,
                                      long minValue, long maxValue,
                                      FailureCollector collector) {
    if (value == null || value.trim().isEmpty()) {
      return; // Let required validation handle null/empty
    }

    try {
      long numValue = Long.parseLong(value.trim());
      if (numValue < minValue || numValue > maxValue) {
        collector.addFailure("Property value out of range: " + propertyName +
                             " (valid range: " + minValue + "-" + maxValue + ")",
                             "Set value between " + minValue + " and " + maxValue)
          .withConfigProperty(propertyName);
      }
    } catch (NumberFormatException e) {
      collector.addFailure("Invalid numeric value for: " + propertyName,
                           "Provide valid integer value")
        .withConfigProperty(propertyName);
    }
  }

  protected void validateRegexPattern(String pattern, String propertyName,
                                      FailureCollector collector) {
    if (pattern == null || pattern.trim().isEmpty()) {
      return;
    }

    try {
      Pattern.compile(pattern);
    } catch (PatternSyntaxException e) {
      collector.addFailure("Invalid regex pattern for: " + propertyName +
                           " - " + e.getMessage(),
                           "Provide valid regular expression")
        .withConfigProperty(propertyName)
        .withCause(new Cause(CauseAttributes.STAGE_CONFIG));
    }
  }

  protected void validateConnectionString(String connectionString, String propertyName,
                                          FailureCollector collector) {
    if (connectionString == null || connectionString.trim().isEmpty()) {
      collector.addFailure("Connection string is required: " + propertyName,
                           "Provide valid connection string")
        .withConfigProperty(propertyName);
      return;
    }

    try {
      // Basic URL validation
      new URL(connectionString);
    } catch (MalformedURLException e) {
      // Try as JDBC URL
      if (!connectionString.startsWith("jdbc:")) {
        collector.addFailure("Invalid connection string format: " + propertyName,
                             "Provide valid URL or JDBC connection string")
          .withConfigProperty(propertyName);
      }
    }
  }
}
```