or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-framework.mddata-management.mddata-processing.mdindex.mdoperational.mdplugin-system.mdsecurity-metadata.md

plugin-system.mddocs/

0

# Plugin System

1

2

The CDAP Plugin System provides a powerful extensibility framework that allows developers to create reusable, configurable components for data processing pipelines. Plugins enable modular application development and promote code reuse across different applications and organizations.

3

4

## Plugin Architecture

5

6

### Core Plugin Interfaces

7

8

```java { .api }

9

import io.cdap.cdap.api.plugin.*;

10

import io.cdap.cdap.api.annotation.*;

11

12

// Plugin configurer interface

13

public interface PluginConfigurer {

14

<T> T usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties);

15

<T> T usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties,

16

PluginSelector selector);

17

<T> Class<T> usePluginClass(String pluginType, String pluginName, String pluginId,

18

PluginProperties properties);

19

<T> Class<T> usePluginClass(String pluginType, String pluginName, String pluginId,

20

PluginProperties properties, PluginSelector selector);

21

}

22

23

// Plugin runtime context

24

public interface PluginContext extends FeatureFlagsProvider {

25

<T> T newPluginInstance(String pluginId) throws InstantiationException;

26

<T> Class<T> loadPluginClass(String pluginId);

27

PluginProperties getPluginProperties(String pluginId);

28

Map<String, PluginProperties> getPlugins();

29

}

30

31

// Plugin metadata

32

public final class Plugin {

33

public static Plugin of(String type, String name, String pluginId, PluginProperties properties) {

34

/* create plugin instance */

35

}

36

37

public String getPluginType() { /* returns plugin type */ }

38

public String getPluginName() { /* returns plugin name */ }

39

public String getPluginId() { /* returns plugin ID */ }

40

public PluginProperties getProperties() { /* returns plugin properties */ }

41

public PluginSelector getSelector() { /* returns plugin selector */ }

42

}

43

```

44

45

### Plugin Properties and Configuration

46

47

```java { .api }

48

// Plugin properties container

49

public class PluginProperties implements Serializable {

50

public static Builder builder() { return new Builder(); }

51

public static PluginProperties of(Map<String, String> properties) { /* create from map */ }

52

53

public Map<String, String> getProperties() { /* returns properties map */ }

54

public String getProperty(String key) { /* returns property value */ }

55

public String getProperty(String key, String defaultValue) { /* returns property with default */ }

56

57

public static class Builder {

58

public Builder add(String key, String value) { /* add property */ return this; }

59

public Builder addAll(Map<String, String> properties) { /* add all properties */ return this; }

60

public PluginProperties build() { /* build properties */ }

61

}

62

}

63

64

// Base plugin configuration class

65

public abstract class PluginConfig extends Config implements Serializable {

66

// Base class for all plugin configurations

67

// Extend this class for typed plugin configurations

68

}

69

70

// Plugin class metadata

71

public class PluginClass {

72

public String getName() { /* returns plugin name */ }

73

public String getType() { /* returns plugin type */ }

74

public String getDescription() { /* returns plugin description */ }

75

public String getClassName() { /* returns plugin class name */ }

76

public String getCategory() { /* returns plugin category */ }

77

public Set<PluginPropertyField> getProperties() { /* returns plugin properties */ }

78

public Map<String, PluginPropertyField> getPropertiesMap() { /* returns properties as map */ }

79

public Requirements getRequirements() { /* returns plugin requirements */ }

80

}

81

82

// Plugin property field metadata

83

public class PluginPropertyField {

84

public String getName() { /* returns field name */ }

85

public String getDescription() { /* returns field description */ }

86

public String getType() { /* returns field type */ }

87

public boolean isRequired() { /* returns if field is required */ }

88

public boolean isMacroSupported() { /* returns if macros are supported */ }

89

public boolean isMacroEscapingEnabled() { /* returns if macro escaping is enabled */ }

90

public Set<String> getChildren() { /* returns child field names */ }

91

}

92

```

93

94

### Plugin Annotations

95

96

```java { .api }

97

// Core plugin annotations

98

@Plugin(type = "source") // Marks a class as a plugin of specific type

99

@Name("MySourcePlugin") // Specifies the plugin name

100

@Description("Reads data from external source") // Provides plugin description

101

@Category("source") // Categorizes the plugin

102

103

// Property annotations

104

@Property // Marks fields as configuration properties

105

@Macro // Enables macro substitution in field values

106

@Description("Input path for data files") // Describes configuration properties

107

108

// Metadata annotations

109

@Metadata(properties = {

110

@MetadataProperty(key = "doc.url", value = "https://example.com/docs"),

111

@MetadataProperty(key = "author", value = "Data Team")

112

})

113

```

114

115

## Plugin Types and Development

116

117

### Source Plugins

118

119

Source plugins read data from external systems:

120

121

```java { .api }

122

// Source plugin configuration

123

public class FileSourceConfig extends PluginConfig {

124

@Name("path")

125

@Description("Path to input files")

126

@Macro

127

@Property

128

private String path;

129

130

@Name("format")

131

@Description("File format (json, csv, avro, parquet)")

132

@Property

133

private String format = "json";

134

135

@Name("schema")

136

@Description("Schema of the input data")

137

@Property

138

private String schema;

139

140

@Name("recursive")

141

@Description("Whether to read files recursively")

142

@Property

143

private Boolean recursive = false;

144

145

// Getters and validation methods

146

public String getPath() { return path; }

147

public String getFormat() { return format; }

148

public String getSchema() { return schema; }

149

public Boolean getRecursive() { return recursive; }

150

151

public void validate() {

152

if (path == null || path.isEmpty()) {

153

throw new IllegalArgumentException("Path cannot be empty");

154

}

155

if (!Arrays.asList("json", "csv", "avro", "parquet").contains(format)) {

156

throw new IllegalArgumentException("Unsupported format: " + format);

157

}

158

}

159

}

160

161

// Source plugin implementation

162

@Plugin(type = "batchsource")

163

@Name("FileSource")

164

@Description("Reads data from files in various formats")

165

@Category("source")

166

@Metadata(properties = {

167

@MetadataProperty(key = "doc.url", value = "https://docs.example.com/plugins/file-source")

168

})

169

public class FileSourcePlugin extends BatchSource<NullWritable, Text, StructuredRecord> {

170

171

private final FileSourceConfig config;

172

173

public FileSourcePlugin(FileSourceConfig config) {

174

this.config = config;

175

}

176

177

@Override

178

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {

179

// Validate configuration

180

config.validate();

181

182

// Set output schema

183

try {

184

Schema outputSchema = Schema.parseJson(config.getSchema());

185

pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);

186

} catch (IOException e) {

187

throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);

188

}

189

}

190

191

@Override

192

public void prepareRun(BatchSourceContext context) throws Exception {

193

// Prepare the source for execution

194

Job job = context.getHadoopJob();

195

196

// Configure input format based on file type

197

switch (config.getFormat().toLowerCase()) {

198

case "json":

199

job.setInputFormatClass(TextInputFormat.class);

200

break;

201

case "csv":

202

job.setInputFormatClass(TextInputFormat.class);

203

break;

204

case "avro":

205

job.setInputFormatClass(AvroKeyInputFormat.class);

206

break;

207

case "parquet":

208

job.setInputFormatClass(ParquetInputFormat.class);

209

break;

210

default:

211

throw new IllegalArgumentException("Unsupported format: " + config.getFormat());

212

}

213

214

// Set input path

215

FileInputFormat.addInputPath(job, new Path(config.getPath()));

216

217

// Configure recursive search if enabled

218

if (config.getRecursive()) {

219

FileInputFormat.setInputDirRecursive(job, true);

220

}

221

}

222

223

@Override

224

public void transform(KeyValue<NullWritable, Text> input, Emitter<StructuredRecord> emitter) throws Exception {

225

String line = input.getValue().toString();

226

227

// Parse based on format

228

StructuredRecord record = parseRecord(line, config.getFormat(), config.getSchema());

229

if (record != null) {

230

emitter.emit(record);

231

}

232

}

233

234

private StructuredRecord parseRecord(String line, String format, String schemaStr) throws IOException {

235

Schema schema = Schema.parseJson(schemaStr);

236

237

switch (format.toLowerCase()) {

238

case "json":

239

return parseJsonRecord(line, schema);

240

case "csv":

241

return parseCsvRecord(line, schema);

242

default:

243

throw new UnsupportedOperationException("Format not supported in transform: " + format);

244

}

245

}

246

247

private StructuredRecord parseJsonRecord(String jsonLine, Schema schema) {

248

try {

249

JsonObject json = new JsonParser().parse(jsonLine).getAsJsonObject();

250

StructuredRecord.Builder builder = StructuredRecord.builder(schema);

251

252

for (Schema.Field field : schema.getFields()) {

253

String fieldName = field.getName();

254

if (json.has(fieldName) && !json.get(fieldName).isJsonNull()) {

255

Object value = parseJsonValue(json.get(fieldName), field.getSchema());

256

builder.set(fieldName, value);

257

}

258

}

259

260

return builder.build();

261

} catch (Exception e) {

262

// Log error and skip malformed records

263

LOG.warn("Failed to parse JSON record: {}", jsonLine, e);

264

return null;

265

}

266

}

267

268

private Object parseJsonValue(JsonElement element, Schema fieldSchema) {

269

Schema.Type type = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();

270

271

switch (type) {

272

case STRING:

273

return element.getAsString();

274

case INT:

275

return element.getAsInt();

276

case LONG:

277

return element.getAsLong();

278

case DOUBLE:

279

return element.getAsDouble();

280

case BOOLEAN:

281

return element.getAsBoolean();

282

default:

283

return element.getAsString();

284

}

285

}

286

}

287

```

288

289

### Transform Plugins

290

291

Transform plugins process and modify data:

292

293

```java { .api }

294

// Transform plugin configuration

295

public class DataCleaningConfig extends PluginConfig {

296

@Name("fieldsToClean")

297

@Description("Comma-separated list of fields to clean")

298

@Property

299

private String fieldsToClean;

300

301

@Name("removeNulls")

302

@Description("Whether to remove records with null values")

303

@Property

304

private Boolean removeNulls = true;

305

306

@Name("trimWhitespace")

307

@Description("Whether to trim whitespace from string fields")

308

@Property

309

private Boolean trimWhitespace = true;

310

311

@Name("lowercaseStrings")

312

@Description("Whether to convert strings to lowercase")

313

@Property

314

private Boolean lowercaseStrings = false;

315

316

public List<String> getFieldsToClean() {

317

if (fieldsToClean == null || fieldsToClean.isEmpty()) {

318

return Collections.emptyList();

319

}

320

return Arrays.asList(fieldsToClean.split(","))

321

.stream()

322

.map(String::trim)

323

.collect(Collectors.toList());

324

}

325

326

// Other getters...

327

}

328

329

// Transform plugin implementation

330

@Plugin(type = "transform")

331

@Name("DataCleaning")

332

@Description("Cleans and standardizes data fields")

333

@Category("cleansing")

334

public class DataCleaningPlugin extends Transform<StructuredRecord, StructuredRecord> {

335

336

private final DataCleaningConfig config;

337

private List<String> fieldsToClean;

338

private Schema outputSchema;

339

340

public DataCleaningPlugin(DataCleaningConfig config) {

341

this.config = config;

342

}

343

344

@Override

345

public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {

346

StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();

347

Schema inputSchema = stageConfigurer.getInputSchema();

348

349

if (inputSchema != null) {

350

// Validate that specified fields exist

351

List<String> fieldsToClean = config.getFieldsToClean();

352

for (String fieldName : fieldsToClean) {

353

if (inputSchema.getField(fieldName) == null) {

354

throw new IllegalArgumentException("Field '" + fieldName + "' does not exist in input schema");

355

}

356

}

357

358

// Output schema is the same as input schema for cleaning operations

359

stageConfigurer.setOutputSchema(inputSchema);

360

}

361

}

362

363

@Override

364

public void initialize(TransformContext context) throws Exception {

365

super.initialize(context);

366

this.fieldsToClean = config.getFieldsToClean();

367

this.outputSchema = context.getOutputSchema();

368

}

369

370

@Override

371

public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {

372

// Check if we should remove records with null values

373

if (config.getRemoveNulls() && hasNullFields(input)) {

374

// Skip this record

375

return;

376

}

377

378

StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);

379

380

// Copy and clean each field

381

for (Schema.Field field : input.getSchema().getFields()) {

382

String fieldName = field.getName();

383

Object value = input.get(fieldName);

384

385

if (fieldsToClean.isEmpty() || fieldsToClean.contains(fieldName)) {

386

value = cleanFieldValue(value, field.getSchema());

387

}

388

389

builder.set(fieldName, value);

390

}

391

392

emitter.emit(builder.build());

393

}

394

395

private boolean hasNullFields(StructuredRecord record) {

396

for (String fieldName : fieldsToClean) {

397

if (record.get(fieldName) == null) {

398

return true;

399

}

400

}

401

return false;

402

}

403

404

private Object cleanFieldValue(Object value, Schema fieldSchema) {

405

if (value == null) {

406

return null;

407

}

408

409

Schema.Type type = fieldSchema.isNullable() ?

410

fieldSchema.getNonNullable().getType() : fieldSchema.getType();

411

412

if (type == Schema.Type.STRING) {

413

String stringValue = value.toString();

414

415

if (config.getTrimWhitespace()) {

416

stringValue = stringValue.trim();

417

}

418

419

if (config.getLowercaseStrings()) {

420

stringValue = stringValue.toLowerCase();

421

}

422

423

return stringValue;

424

}

425

426

return value;

427

}

428

}

429

```

430

431

### Sink Plugins

432

433

Sink plugins write data to external systems:

434

435

```java { .api }

436

// Sink plugin configuration

437

public class DatabaseSinkConfig extends PluginConfig {

438

@Name("connectionString")

439

@Description("JDBC connection string")

440

@Macro

441

@Property

442

private String connectionString;

443

444

@Name("tableName")

445

@Description("Target table name")

446

@Macro

447

@Property

448

private String tableName;

449

450

@Name("username")

451

@Description("Database username")

452

@Macro

453

@Property

454

private String username;

455

456

@Name("password")

457

@Description("Database password")

458

@Macro

459

@Property

460

private String password;

461

462

@Name("batchSize")

463

@Description("Number of records to write in each batch")

464

@Property

465

private Integer batchSize = 1000;

466

467

// Getters and validation...

468

}

469

470

// Sink plugin implementation

471

@Plugin(type = "batchsink")

472

@Name("DatabaseSink")

473

@Description("Writes data to a relational database")

474

@Category("sink")

475

public class DatabaseSinkPlugin extends BatchSink<StructuredRecord, NullWritable, NullWritable> {

476

477

private final DatabaseSinkConfig config;

478

479

public DatabaseSinkPlugin(DatabaseSinkConfig config) {

480

this.config = config;

481

}

482

483

@Override

484

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {

485

// Validate configuration

486

config.validate();

487

488

// Test database connection if not using macros

489

if (!containsMacros()) {

490

testConnection();

491

}

492

}

493

494

@Override

495

public void prepareRun(BatchSinkContext context) throws Exception {

496

Job job = context.getHadoopJob();

497

498

// Configure database output format

499

job.setOutputFormatClass(DatabaseOutputFormat.class);

500

job.setOutputKeyClass(NullWritable.class);

501

job.setOutputValueClass(NullWritable.class);

502

503

// Set database connection properties

504

DatabaseConfiguration.configureDB(job.getConfiguration(),

505

config.getConnectionString(),

506

config.getUsername(),

507

config.getPassword(),

508

config.getTableName());

509

}

510

511

@Override

512

public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, NullWritable>> emitter)

513

throws Exception {

514

515

// Convert StructuredRecord to database format and write

516

// This would typically buffer records and write in batches

517

writeRecordToDatabase(input);

518

519

// Emit to continue pipeline (if needed)

520

emitter.emit(new KeyValue<>(NullWritable.get(), NullWritable.get()));

521

}

522

523

private void writeRecordToDatabase(StructuredRecord record) throws SQLException {

524

// Implementation for writing record to database

525

// Use prepared statements and batch operations for efficiency

526

}

527

528

private boolean containsMacros() {

529

return config.getConnectionString().contains("${") ||

530

config.getTableName().contains("${") ||

531

config.getUsername().contains("${") ||

532

config.getPassword().contains("${");

533

}

534

535

private void testConnection() {

536

try (Connection conn = DriverManager.getConnection(

537

config.getConnectionString(), config.getUsername(), config.getPassword())) {

538

// Test connection and verify table exists

539

DatabaseMetaData metaData = conn.getMetaData();

540

try (ResultSet tables = metaData.getTables(null, null, config.getTableName(), null)) {

541

if (!tables.next()) {

542

throw new IllegalArgumentException("Table '" + config.getTableName() + "' does not exist");

543

}

544

}

545

} catch (SQLException e) {

546

throw new IllegalArgumentException("Failed to connect to database: " + e.getMessage(), e);

547

}

548

}

549

}

550

```

551

552

## Plugin Selection and Requirements

553

554

### Plugin Selector

555

556

```java { .api }

557

// Plugin selector for choosing among multiple plugin candidates

558

public class PluginSelector {

559

public static final PluginSelector EMPTY = new PluginSelector(SortOrder.UNSPECIFIED, null);

560

561

public enum SortOrder {

562

CREATION_TIME_ASC,

563

CREATION_TIME_DESC,

564

VERSION_ASC,

565

VERSION_DESC,

566

UNSPECIFIED

567

}

568

569

public PluginSelector(SortOrder sortOrder) { /* constructor */ }

570

public PluginSelector(SortOrder sortOrder, String subtaskName) { /* constructor with subtask */ }

571

572

public SortOrder getSortOrder() { /* returns sort order */ }

573

public String getSubtaskName() { /* returns subtask name */ }

574

}

575

576

// Plugin requirements specification

577

public class Requirements {

578

public static Builder builder() { return new Builder(); }

579

580

public Set<String> getCapabilities() { /* returns required capabilities */ }

581

public Set<String> getDatasetTypes() { /* returns required dataset types */ }

582

583

public static class Builder {

584

public Builder addCapabilities(String... capabilities) { /* add capabilities */ return this; }

585

public Builder addDatasetTypes(String... datasetTypes) { /* add dataset types */ return this; }

586

public Requirements build() { /* build requirements */ }

587

}

588

}

589

```

590

591

### Plugin Validation and Error Handling

592

593

```java { .api }

594

// Plugin configuration validation

595

public class InvalidPluginConfigException extends RuntimeException {

596

private final Set<InvalidPluginProperty> invalidProperties;

597

598

public InvalidPluginConfigException(String message, Set<InvalidPluginProperty> invalidProperties) {

599

super(message);

600

this.invalidProperties = invalidProperties;

601

}

602

603

public Set<InvalidPluginProperty> getInvalidProperties() {

604

return invalidProperties;

605

}

606

}

607

608

// Invalid plugin property details

609

public class InvalidPluginProperty {

610

public InvalidPluginProperty(String propertyName, String message) { /* constructor */ }

611

612

public String getPropertyName() { /* returns property name */ }

613

public String getMessage() { /* returns error message */ }

614

}

615

616

// Plugin validation utility

617

public abstract class ValidatingPluginConfig extends PluginConfig {

618

619

public final void validate() throws InvalidPluginConfigException {

620

Set<InvalidPluginProperty> errors = new HashSet<>();

621

622

try {

623

validateConfig(errors);

624

} catch (Exception e) {

625

errors.add(new InvalidPluginProperty("general", "Validation failed: " + e.getMessage()));

626

}

627

628

if (!errors.isEmpty()) {

629

throw new InvalidPluginConfigException("Plugin configuration is invalid", errors);

630

}

631

}

632

633

protected abstract void validateConfig(Set<InvalidPluginProperty> errors);

634

635

protected void validateRequired(String propertyName, String value, Set<InvalidPluginProperty> errors) {

636

if (value == null || value.trim().isEmpty()) {

637

errors.add(new InvalidPluginProperty(propertyName, "Property is required"));

638

}

639

}

640

641

protected void validateFormat(String propertyName, String value, String pattern,

642

Set<InvalidPluginProperty> errors) {

643

if (value != null && !value.matches(pattern)) {

644

errors.add(new InvalidPluginProperty(propertyName, "Invalid format"));

645

}

646

}

647

}

648

```

649

650

## Advanced Plugin Patterns

651

652

### Plugin Composition

653

654

```java { .api }

655

// Multi-stage plugin configuration

656

public class CompositeTransformConfig extends PluginConfig {

657

@Name("stages")

658

@Description("JSON array of transformation stages")

659

@Property

660

private String stages;

661

662

public List<TransformStage> getStages() throws IOException {

663

JsonArray stagesArray = new JsonParser().parse(stages).getAsJsonArray();

664

List<TransformStage> stageList = new ArrayList<>();

665

666

for (JsonElement element : stagesArray) {

667

JsonObject stageObj = element.getAsJsonObject();

668

TransformStage stage = new TransformStage(

669

stageObj.get("name").getAsString(),

670

stageObj.get("type").getAsString(),

671

stageObj.get("config").getAsJsonObject()

672

);

673

stageList.add(stage);

674

}

675

676

return stageList;

677

}

678

}

679

680

// Composite plugin that chains multiple transformations

681

@Plugin(type = "transform")

682

@Name("CompositeTransform")

683

@Description("Applies multiple transformations in sequence")

684

public class CompositeTransformPlugin extends Transform<StructuredRecord, StructuredRecord> {

685

686

private final CompositeTransformConfig config;

687

private List<Transform<StructuredRecord, StructuredRecord>> transforms;

688

689

@Override

690

public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {

691

StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();

692

Schema currentSchema = stageConfigurer.getInputSchema();

693

694

// Configure each transform stage

695

for (TransformStage stage : config.getStages()) {

696

// Dynamically load and configure transform plugin

697

Transform<StructuredRecord, StructuredRecord> transform =

698

loadTransformPlugin(stage, pipelineConfigurer);

699

700

// Update schema through the pipeline

701

currentSchema = getTransformOutputSchema(transform, currentSchema);

702

}

703

704

stageConfigurer.setOutputSchema(currentSchema);

705

}

706

707

@Override

708

public void initialize(TransformContext context) throws Exception {

709

// Initialize all child transforms

710

transforms = new ArrayList<>();

711

for (TransformStage stage : config.getStages()) {

712

Transform<StructuredRecord, StructuredRecord> transform =

713

context.newPluginInstance(stage.getName());

714

transforms.add(transform);

715

}

716

}

717

718

@Override

719

public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {

720

StructuredRecord current = input;

721

722

// Apply each transformation in sequence

723

for (Transform<StructuredRecord, StructuredRecord> transform : transforms) {

724

CollectingEmitter<StructuredRecord> collector = new CollectingEmitter<>();

725

transform.transform(current, collector);

726

727

List<StructuredRecord> results = collector.getEmitted();

728

if (results.size() == 1) {

729

current = results.get(0);

730

} else if (results.isEmpty()) {

731

// Record was filtered out

732

return;

733

} else {

734

// Multiple records produced - emit all but last, use last for next stage

735

for (int i = 0; i < results.size() - 1; i++) {

736

emitter.emit(results.get(i));

737

}

738

current = results.get(results.size() - 1);

739

}

740

}

741

742

emitter.emit(current);

743

}

744

}

745

746

// Plugin factory pattern

747

public class PluginFactory {

748

public static <T> T createPlugin(String pluginType, String pluginName,

749

PluginProperties properties, PluginContext context) {

750

return context.newPluginInstance(pluginName);

751

}

752

753

public static PluginProperties mergeProperties(PluginProperties base,

754

PluginProperties override) {

755

PluginProperties.Builder builder = PluginProperties.builder();

756

builder.addAll(base.getProperties());

757

builder.addAll(override.getProperties());

758

return builder.build();

759

}

760

}

761

```

762

763

## Plugin Testing and Development Tools

764

765

### Plugin Testing Framework

766

767

```java { .api }

768

// Plugin test base class

769

public abstract class PluginTestBase {

770

771

protected <T extends PluginConfig> void validatePluginConfig(Class<T> configClass,

772

Map<String, String> properties)

773

throws Exception {

774

T config = deserializeConfig(configClass, properties);

775

if (config instanceof ValidatingPluginConfig) {

776

((ValidatingPluginConfig) config).validate();

777

}

778

}

779

780

protected Schema createTestSchema(String... fieldSpecs) {

781

List<Schema.Field> fields = new ArrayList<>();

782

for (String spec : fieldSpecs) {

783

String[] parts = spec.split(":");

784

String name = parts[0];

785

Schema.Type type = Schema.Type.valueOf(parts[1].toUpperCase());

786

fields.add(Schema.Field.of(name, Schema.of(type)));

787

}

788

return Schema.recordOf("TestRecord", fields);

789

}

790

791

protected StructuredRecord createTestRecord(Schema schema, Object... values) {

792

StructuredRecord.Builder builder = StructuredRecord.builder(schema);

793

List<Schema.Field> fields = schema.getFields();

794

795

for (int i = 0; i < Math.min(fields.size(), values.length); i++) {

796

builder.set(fields.get(i).getName(), values[i]);

797

}

798

799

return builder.build();

800

}

801

802

private <T> T deserializeConfig(Class<T> configClass, Map<String, String> properties)

803

throws Exception {

804

// Implementation for deserializing configuration from properties

805

return configClass.newInstance(); // Simplified - real implementation would use reflection

806

}

807

}

808

809

// Mock emitter for testing

810

public class MockEmitter<T> implements Emitter<T> {

811

private final List<T> emitted = new ArrayList<>();

812

private final List<InvalidEntry<T>> errors = new ArrayList<>();

813

814

@Override

815

public void emit(T value) {

816

emitted.add(value);

817

}

818

819

@Override

820

public void emitError(InvalidEntry<T> invalidEntry) {

821

errors.add(invalidEntry);

822

}

823

824

public List<T> getEmitted() {

825

return new ArrayList<>(emitted);

826

}

827

828

public List<InvalidEntry<T>> getErrors() {

829

return new ArrayList<>(errors);

830

}

831

832

public void clear() {

833

emitted.clear();

834

errors.clear();

835

}

836

}

837

```

838

839

The CDAP Plugin System enables building modular, reusable data processing components with strong type safety, comprehensive configuration management, and enterprise-grade operational features. This extensibility framework is essential for creating scalable, maintainable data processing applications.