# Plugin System

The CDAP Plugin System provides a powerful extensibility framework that allows developers to create reusable, configurable components for data processing pipelines. Plugins enable modular application development and promote code reuse across different applications and organizations.

## Plugin Architecture

### Core Plugin Interfaces

```java { .api }
9
import io.cdap.cdap.api.plugin.*;
10
import io.cdap.cdap.api.annotation.*;
11
12
// Plugin configurer interface
13
public interface PluginConfigurer {
14
<T> T usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties);
15
<T> T usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties,
16
PluginSelector selector);
17
<T> Class<T> usePluginClass(String pluginType, String pluginName, String pluginId,
18
PluginProperties properties);
19
<T> Class<T> usePluginClass(String pluginType, String pluginName, String pluginId,
20
PluginProperties properties, PluginSelector selector);
21
}
22
23
// Plugin runtime context
24
public interface PluginContext extends FeatureFlagsProvider {
25
<T> T newPluginInstance(String pluginId) throws InstantiationException;
26
<T> Class<T> loadPluginClass(String pluginId);
27
PluginProperties getPluginProperties(String pluginId);
28
Map<String, PluginProperties> getPlugins();
29
}
30
31
// Plugin metadata
32
public final class Plugin {
33
public static Plugin of(String type, String name, String pluginId, PluginProperties properties) {
34
/* create plugin instance */
35
}
36
37
public String getPluginType() { /* returns plugin type */ }
38
public String getPluginName() { /* returns plugin name */ }
39
public String getPluginId() { /* returns plugin ID */ }
40
public PluginProperties getProperties() { /* returns plugin properties */ }
41
public PluginSelector getSelector() { /* returns plugin selector */ }
42
}
43
```
44

### Plugin Properties and Configuration

```java { .api }
48
// Plugin properties container
49
public class PluginProperties implements Serializable {
50
public static Builder builder() { return new Builder(); }
51
public static PluginProperties of(Map<String, String> properties) { /* create from map */ }
52
53
public Map<String, String> getProperties() { /* returns properties map */ }
54
public String getProperty(String key) { /* returns property value */ }
55
public String getProperty(String key, String defaultValue) { /* returns property with default */ }
56
57
public static class Builder {
58
public Builder add(String key, String value) { /* add property */ return this; }
59
public Builder addAll(Map<String, String> properties) { /* add all properties */ return this; }
60
public PluginProperties build() { /* build properties */ }
61
}
62
}
63
64
// Base plugin configuration class
65
public abstract class PluginConfig extends Config implements Serializable {
66
// Base class for all plugin configurations
67
// Extend this class for typed plugin configurations
68
}
69
70
// Plugin class metadata
71
public class PluginClass {
72
public String getName() { /* returns plugin name */ }
73
public String getType() { /* returns plugin type */ }
74
public String getDescription() { /* returns plugin description */ }
75
public String getClassName() { /* returns plugin class name */ }
76
public String getCategory() { /* returns plugin category */ }
77
public Set<PluginPropertyField> getProperties() { /* returns plugin properties */ }
78
public Map<String, PluginPropertyField> getPropertiesMap() { /* returns properties as map */ }
79
public Requirements getRequirements() { /* returns plugin requirements */ }
80
}
81
82
// Plugin property field metadata
83
public class PluginPropertyField {
84
public String getName() { /* returns field name */ }
85
public String getDescription() { /* returns field description */ }
86
public String getType() { /* returns field type */ }
87
public boolean isRequired() { /* returns if field is required */ }
88
public boolean isMacroSupported() { /* returns if macros are supported */ }
89
public boolean isMacroEscapingEnabled() { /* returns if macro escaping is enabled */ }
90
public Set<String> getChildren() { /* returns child field names */ }
91
}
92
```
93

### Plugin Annotations

```java { .api }
97
// Core plugin annotations
98
@Plugin(type = "source") // Marks a class as a plugin of specific type
99
@Name("MySourcePlugin") // Specifies the plugin name
100
@Description("Reads data from external source") // Provides plugin description
101
@Category("source") // Categorizes the plugin
102
103
// Property annotations
104
@Property // Marks fields as configuration properties
105
@Macro // Enables macro substitution in field values
106
@Description("Input path for data files") // Describes configuration properties
107
108
// Metadata annotations
109
@Metadata(properties = {
110
@MetadataProperty(key = "doc.url", value = "https://example.com/docs"),
111
@MetadataProperty(key = "author", value = "Data Team")
112
})
113
```
114

## Plugin Types and Development

### Source Plugins

Source plugins read data from external systems:

```java { .api }
122
// Source plugin configuration
123
public class FileSourceConfig extends PluginConfig {
124
@Name("path")
125
@Description("Path to input files")
126
@Macro
127
@Property
128
private String path;
129
130
@Name("format")
131
@Description("File format (json, csv, avro, parquet)")
132
@Property
133
private String format = "json";
134
135
@Name("schema")
136
@Description("Schema of the input data")
137
@Property
138
private String schema;
139
140
@Name("recursive")
141
@Description("Whether to read files recursively")
142
@Property
143
private Boolean recursive = false;
144
145
// Getters and validation methods
146
public String getPath() { return path; }
147
public String getFormat() { return format; }
148
public String getSchema() { return schema; }
149
public Boolean getRecursive() { return recursive; }
150
151
public void validate() {
152
if (path == null || path.isEmpty()) {
153
throw new IllegalArgumentException("Path cannot be empty");
154
}
155
if (!Arrays.asList("json", "csv", "avro", "parquet").contains(format)) {
156
throw new IllegalArgumentException("Unsupported format: " + format);
157
}
158
}
159
}
160
161
// Source plugin implementation
162
@Plugin(type = "batchsource")
163
@Name("FileSource")
164
@Description("Reads data from files in various formats")
165
@Category("source")
166
@Metadata(properties = {
167
@MetadataProperty(key = "doc.url", value = "https://docs.example.com/plugins/file-source")
168
})
169
public class FileSourcePlugin extends BatchSource<NullWritable, Text, StructuredRecord> {
170
171
private final FileSourceConfig config;
172
173
public FileSourcePlugin(FileSourceConfig config) {
174
this.config = config;
175
}
176
177
@Override
178
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
179
// Validate configuration
180
config.validate();
181
182
// Set output schema
183
try {
184
Schema outputSchema = Schema.parseJson(config.getSchema());
185
pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
186
} catch (IOException e) {
187
throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
188
}
189
}
190
191
@Override
192
public void prepareRun(BatchSourceContext context) throws Exception {
193
// Prepare the source for execution
194
Job job = context.getHadoopJob();
195
196
// Configure input format based on file type
197
switch (config.getFormat().toLowerCase()) {
198
case "json":
199
job.setInputFormatClass(TextInputFormat.class);
200
break;
201
case "csv":
202
job.setInputFormatClass(TextInputFormat.class);
203
break;
204
case "avro":
205
job.setInputFormatClass(AvroKeyInputFormat.class);
206
break;
207
case "parquet":
208
job.setInputFormatClass(ParquetInputFormat.class);
209
break;
210
default:
211
throw new IllegalArgumentException("Unsupported format: " + config.getFormat());
212
}
213
214
// Set input path
215
FileInputFormat.addInputPath(job, new Path(config.getPath()));
216
217
// Configure recursive search if enabled
218
if (config.getRecursive()) {
219
FileInputFormat.setInputDirRecursive(job, true);
220
}
221
}
222
223
@Override
224
public void transform(KeyValue<NullWritable, Text> input, Emitter<StructuredRecord> emitter) throws Exception {
225
String line = input.getValue().toString();
226
227
// Parse based on format
228
StructuredRecord record = parseRecord(line, config.getFormat(), config.getSchema());
229
if (record != null) {
230
emitter.emit(record);
231
}
232
}
233
234
private StructuredRecord parseRecord(String line, String format, String schemaStr) throws IOException {
235
Schema schema = Schema.parseJson(schemaStr);
236
237
switch (format.toLowerCase()) {
238
case "json":
239
return parseJsonRecord(line, schema);
240
case "csv":
241
return parseCsvRecord(line, schema);
242
default:
243
throw new UnsupportedOperationException("Format not supported in transform: " + format);
244
}
245
}
246
247
private StructuredRecord parseJsonRecord(String jsonLine, Schema schema) {
248
try {
249
JsonObject json = new JsonParser().parse(jsonLine).getAsJsonObject();
250
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
251
252
for (Schema.Field field : schema.getFields()) {
253
String fieldName = field.getName();
254
if (json.has(fieldName) && !json.get(fieldName).isJsonNull()) {
255
Object value = parseJsonValue(json.get(fieldName), field.getSchema());
256
builder.set(fieldName, value);
257
}
258
}
259
260
return builder.build();
261
} catch (Exception e) {
262
// Log error and skip malformed records
263
LOG.warn("Failed to parse JSON record: {}", jsonLine, e);
264
return null;
265
}
266
}
267
268
private Object parseJsonValue(JsonElement element, Schema fieldSchema) {
269
Schema.Type type = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();
270
271
switch (type) {
272
case STRING:
273
return element.getAsString();
274
case INT:
275
return element.getAsInt();
276
case LONG:
277
return element.getAsLong();
278
case DOUBLE:
279
return element.getAsDouble();
280
case BOOLEAN:
281
return element.getAsBoolean();
282
default:
283
return element.getAsString();
284
}
285
}
286
}
287
```

### Transform Plugins

Transform plugins process and modify data:

```java { .api }
294
// Transform plugin configuration
295
public class DataCleaningConfig extends PluginConfig {
296
@Name("fieldsToClean")
297
@Description("Comma-separated list of fields to clean")
298
@Property
299
private String fieldsToClean;
300
301
@Name("removeNulls")
302
@Description("Whether to remove records with null values")
303
@Property
304
private Boolean removeNulls = true;
305
306
@Name("trimWhitespace")
307
@Description("Whether to trim whitespace from string fields")
308
@Property
309
private Boolean trimWhitespace = true;
310
311
@Name("lowercaseStrings")
312
@Description("Whether to convert strings to lowercase")
313
@Property
314
private Boolean lowercaseStrings = false;
315
316
public List<String> getFieldsToClean() {
317
if (fieldsToClean == null || fieldsToClean.isEmpty()) {
318
return Collections.emptyList();
319
}
320
return Arrays.asList(fieldsToClean.split(","))
321
.stream()
322
.map(String::trim)
323
.collect(Collectors.toList());
324
}
325
326
// Other getters...
327
}
328
329
// Transform plugin implementation
330
@Plugin(type = "transform")
331
@Name("DataCleaning")
332
@Description("Cleans and standardizes data fields")
333
@Category("cleansing")
334
public class DataCleaningPlugin extends Transform<StructuredRecord, StructuredRecord> {
335
336
private final DataCleaningConfig config;
337
private List<String> fieldsToClean;
338
private Schema outputSchema;
339
340
public DataCleaningPlugin(DataCleaningConfig config) {
341
this.config = config;
342
}
343
344
@Override
345
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
346
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
347
Schema inputSchema = stageConfigurer.getInputSchema();
348
349
if (inputSchema != null) {
350
// Validate that specified fields exist
351
List<String> fieldsToClean = config.getFieldsToClean();
352
for (String fieldName : fieldsToClean) {
353
if (inputSchema.getField(fieldName) == null) {
354
throw new IllegalArgumentException("Field '" + fieldName + "' does not exist in input schema");
355
}
356
}
357
358
// Output schema is the same as input schema for cleaning operations
359
stageConfigurer.setOutputSchema(inputSchema);
360
}
361
}
362
363
@Override
364
public void initialize(TransformContext context) throws Exception {
365
super.initialize(context);
366
this.fieldsToClean = config.getFieldsToClean();
367
this.outputSchema = context.getOutputSchema();
368
}
369
370
@Override
371
public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
372
// Check if we should remove records with null values
373
if (config.getRemoveNulls() && hasNullFields(input)) {
374
// Skip this record
375
return;
376
}
377
378
StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
379
380
// Copy and clean each field
381
for (Schema.Field field : input.getSchema().getFields()) {
382
String fieldName = field.getName();
383
Object value = input.get(fieldName);
384
385
if (fieldsToClean.isEmpty() || fieldsToClean.contains(fieldName)) {
386
value = cleanFieldValue(value, field.getSchema());
387
}
388
389
builder.set(fieldName, value);
390
}
391
392
emitter.emit(builder.build());
393
}
394
395
private boolean hasNullFields(StructuredRecord record) {
396
for (String fieldName : fieldsToClean) {
397
if (record.get(fieldName) == null) {
398
return true;
399
}
400
}
401
return false;
402
}
403
404
private Object cleanFieldValue(Object value, Schema fieldSchema) {
405
if (value == null) {
406
return null;
407
}
408
409
Schema.Type type = fieldSchema.isNullable() ?
410
fieldSchema.getNonNullable().getType() : fieldSchema.getType();
411
412
if (type == Schema.Type.STRING) {
413
String stringValue = value.toString();
414
415
if (config.getTrimWhitespace()) {
416
stringValue = stringValue.trim();
417
}
418
419
if (config.getLowercaseStrings()) {
420
stringValue = stringValue.toLowerCase();
421
}
422
423
return stringValue;
424
}
425
426
return value;
427
}
428
}
429
```

### Sink Plugins

Sink plugins write data to external systems:

```java { .api }
436
// Sink plugin configuration
437
public class DatabaseSinkConfig extends PluginConfig {
438
@Name("connectionString")
439
@Description("JDBC connection string")
440
@Macro
441
@Property
442
private String connectionString;
443
444
@Name("tableName")
445
@Description("Target table name")
446
@Macro
447
@Property
448
private String tableName;
449
450
@Name("username")
451
@Description("Database username")
452
@Macro
453
@Property
454
private String username;
455
456
@Name("password")
457
@Description("Database password")
458
@Macro
459
@Property
460
private String password;
461
462
@Name("batchSize")
463
@Description("Number of records to write in each batch")
464
@Property
465
private Integer batchSize = 1000;
466
467
// Getters and validation...
468
}
469
470
// Sink plugin implementation
471
@Plugin(type = "batchsink")
472
@Name("DatabaseSink")
473
@Description("Writes data to a relational database")
474
@Category("sink")
475
public class DatabaseSinkPlugin extends BatchSink<StructuredRecord, NullWritable, NullWritable> {
476
477
private final DatabaseSinkConfig config;
478
479
public DatabaseSinkPlugin(DatabaseSinkConfig config) {
480
this.config = config;
481
}
482
483
@Override
484
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
485
// Validate configuration
486
config.validate();
487
488
// Test database connection if not using macros
489
if (!containsMacros()) {
490
testConnection();
491
}
492
}
493
494
@Override
495
public void prepareRun(BatchSinkContext context) throws Exception {
496
Job job = context.getHadoopJob();
497
498
// Configure database output format
499
job.setOutputFormatClass(DatabaseOutputFormat.class);
500
job.setOutputKeyClass(NullWritable.class);
501
job.setOutputValueClass(NullWritable.class);
502
503
// Set database connection properties
504
DatabaseConfiguration.configureDB(job.getConfiguration(),
505
config.getConnectionString(),
506
config.getUsername(),
507
config.getPassword(),
508
config.getTableName());
509
}
510
511
@Override
512
public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, NullWritable>> emitter)
513
throws Exception {
514
515
// Convert StructuredRecord to database format and write
516
// This would typically buffer records and write in batches
517
writeRecordToDatabase(input);
518
519
// Emit to continue pipeline (if needed)
520
emitter.emit(new KeyValue<>(NullWritable.get(), NullWritable.get()));
521
}
522
523
private void writeRecordToDatabase(StructuredRecord record) throws SQLException {
524
// Implementation for writing record to database
525
// Use prepared statements and batch operations for efficiency
526
}
527
528
private boolean containsMacros() {
529
return config.getConnectionString().contains("${") ||
530
config.getTableName().contains("${") ||
531
config.getUsername().contains("${") ||
532
config.getPassword().contains("${");
533
}
534
535
private void testConnection() {
536
try (Connection conn = DriverManager.getConnection(
537
config.getConnectionString(), config.getUsername(), config.getPassword())) {
538
// Test connection and verify table exists
539
DatabaseMetaData metaData = conn.getMetaData();
540
try (ResultSet tables = metaData.getTables(null, null, config.getTableName(), null)) {
541
if (!tables.next()) {
542
throw new IllegalArgumentException("Table '" + config.getTableName() + "' does not exist");
543
}
544
}
545
} catch (SQLException e) {
546
throw new IllegalArgumentException("Failed to connect to database: " + e.getMessage(), e);
547
}
548
}
549
}
550
```

## Plugin Selection and Requirements

### Plugin Selector

```java { .api }
557
// Plugin selector for choosing among multiple plugin candidates
558
public class PluginSelector {
559
public static final PluginSelector EMPTY = new PluginSelector(SortOrder.UNSPECIFIED, null);
560
561
public enum SortOrder {
562
CREATION_TIME_ASC,
563
CREATION_TIME_DESC,
564
VERSION_ASC,
565
VERSION_DESC,
566
UNSPECIFIED
567
}
568
569
public PluginSelector(SortOrder sortOrder) { /* constructor */ }
570
public PluginSelector(SortOrder sortOrder, String subtaskName) { /* constructor with subtask */ }
571
572
public SortOrder getSortOrder() { /* returns sort order */ }
573
public String getSubtaskName() { /* returns subtask name */ }
574
}
575
576
// Plugin requirements specification
577
public class Requirements {
578
public static Builder builder() { return new Builder(); }
579
580
public Set<String> getCapabilities() { /* returns required capabilities */ }
581
public Set<String> getDatasetTypes() { /* returns required dataset types */ }
582
583
public static class Builder {
584
public Builder addCapabilities(String... capabilities) { /* add capabilities */ return this; }
585
public Builder addDatasetTypes(String... datasetTypes) { /* add dataset types */ return this; }
586
public Requirements build() { /* build requirements */ }
587
}
588
}
589
```

### Plugin Validation and Error Handling

```java { .api }
594
// Plugin configuration validation
595
public class InvalidPluginConfigException extends RuntimeException {
596
private final Set<InvalidPluginProperty> invalidProperties;
597
598
public InvalidPluginConfigException(String message, Set<InvalidPluginProperty> invalidProperties) {
599
super(message);
600
this.invalidProperties = invalidProperties;
601
}
602
603
public Set<InvalidPluginProperty> getInvalidProperties() {
604
return invalidProperties;
605
}
606
}
607
608
// Invalid plugin property details
609
public class InvalidPluginProperty {
610
public InvalidPluginProperty(String propertyName, String message) { /* constructor */ }
611
612
public String getPropertyName() { /* returns property name */ }
613
public String getMessage() { /* returns error message */ }
614
}
615
616
// Plugin validation utility
617
public abstract class ValidatingPluginConfig extends PluginConfig {
618
619
public final void validate() throws InvalidPluginConfigException {
620
Set<InvalidPluginProperty> errors = new HashSet<>();
621
622
try {
623
validateConfig(errors);
624
} catch (Exception e) {
625
errors.add(new InvalidPluginProperty("general", "Validation failed: " + e.getMessage()));
626
}
627
628
if (!errors.isEmpty()) {
629
throw new InvalidPluginConfigException("Plugin configuration is invalid", errors);
630
}
631
}
632
633
protected abstract void validateConfig(Set<InvalidPluginProperty> errors);
634
635
protected void validateRequired(String propertyName, String value, Set<InvalidPluginProperty> errors) {
636
if (value == null || value.trim().isEmpty()) {
637
errors.add(new InvalidPluginProperty(propertyName, "Property is required"));
638
}
639
}
640
641
protected void validateFormat(String propertyName, String value, String pattern,
642
Set<InvalidPluginProperty> errors) {
643
if (value != null && !value.matches(pattern)) {
644
errors.add(new InvalidPluginProperty(propertyName, "Invalid format"));
645
}
646
}
647
}
648
```
649
650
## Advanced Plugin Patterns

### Plugin Composition

```java { .api }
655
// Multi-stage plugin configuration
656
public class CompositeTransformConfig extends PluginConfig {
657
@Name("stages")
658
@Description("JSON array of transformation stages")
659
@Property
660
private String stages;
661
662
public List<TransformStage> getStages() throws IOException {
663
JsonArray stagesArray = new JsonParser().parse(stages).getAsJsonArray();
664
List<TransformStage> stageList = new ArrayList<>();
665
666
for (JsonElement element : stagesArray) {
667
JsonObject stageObj = element.getAsJsonObject();
668
TransformStage stage = new TransformStage(
669
stageObj.get("name").getAsString(),
670
stageObj.get("type").getAsString(),
671
stageObj.get("config").getAsJsonObject()
672
);
673
stageList.add(stage);
674
}
675
676
return stageList;
677
}
678
}
679
680
// Composite plugin that chains multiple transformations
681
@Plugin(type = "transform")
682
@Name("CompositeTransform")
683
@Description("Applies multiple transformations in sequence")
684
public class CompositeTransformPlugin extends Transform<StructuredRecord, StructuredRecord> {
685
686
private final CompositeTransformConfig config;
687
private List<Transform<StructuredRecord, StructuredRecord>> transforms;
688
689
@Override
690
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
691
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
692
Schema currentSchema = stageConfigurer.getInputSchema();
693
694
// Configure each transform stage
695
for (TransformStage stage : config.getStages()) {
696
// Dynamically load and configure transform plugin
697
Transform<StructuredRecord, StructuredRecord> transform =
698
loadTransformPlugin(stage, pipelineConfigurer);
699
700
// Update schema through the pipeline
701
currentSchema = getTransformOutputSchema(transform, currentSchema);
702
}
703
704
stageConfigurer.setOutputSchema(currentSchema);
705
}
706
707
@Override
708
public void initialize(TransformContext context) throws Exception {
709
// Initialize all child transforms
710
transforms = new ArrayList<>();
711
for (TransformStage stage : config.getStages()) {
712
Transform<StructuredRecord, StructuredRecord> transform =
713
context.newPluginInstance(stage.getName());
714
transforms.add(transform);
715
}
716
}
717
718
@Override
719
public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
720
StructuredRecord current = input;
721
722
// Apply each transformation in sequence
723
for (Transform<StructuredRecord, StructuredRecord> transform : transforms) {
724
CollectingEmitter<StructuredRecord> collector = new CollectingEmitter<>();
725
transform.transform(current, collector);
726
727
List<StructuredRecord> results = collector.getEmitted();
728
if (results.size() == 1) {
729
current = results.get(0);
730
} else if (results.isEmpty()) {
731
// Record was filtered out
732
return;
733
} else {
734
// Multiple records produced - emit all but last, use last for next stage
735
for (int i = 0; i < results.size() - 1; i++) {
736
emitter.emit(results.get(i));
737
}
738
current = results.get(results.size() - 1);
739
}
740
}
741
742
emitter.emit(current);
743
}
744
}
745
746
// Plugin factory pattern
747
public class PluginFactory {
748
public static <T> T createPlugin(String pluginType, String pluginName,
749
PluginProperties properties, PluginContext context) {
750
return context.newPluginInstance(pluginName);
751
}
752
753
public static PluginProperties mergeProperties(PluginProperties base,
754
PluginProperties override) {
755
PluginProperties.Builder builder = PluginProperties.builder();
756
builder.addAll(base.getProperties());
757
builder.addAll(override.getProperties());
758
return builder.build();
759
}
760
}
761
```

## Plugin Testing and Development Tools

### Plugin Testing Framework

```java { .api }
768
// Plugin test base class
769
public abstract class PluginTestBase {
770
771
protected <T extends PluginConfig> void validatePluginConfig(Class<T> configClass,
772
Map<String, String> properties)
773
throws Exception {
774
T config = deserializeConfig(configClass, properties);
775
if (config instanceof ValidatingPluginConfig) {
776
((ValidatingPluginConfig) config).validate();
777
}
778
}
779
780
protected Schema createTestSchema(String... fieldSpecs) {
781
List<Schema.Field> fields = new ArrayList<>();
782
for (String spec : fieldSpecs) {
783
String[] parts = spec.split(":");
784
String name = parts[0];
785
Schema.Type type = Schema.Type.valueOf(parts[1].toUpperCase());
786
fields.add(Schema.Field.of(name, Schema.of(type)));
787
}
788
return Schema.recordOf("TestRecord", fields);
789
}
790
791
protected StructuredRecord createTestRecord(Schema schema, Object... values) {
792
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
793
List<Schema.Field> fields = schema.getFields();
794
795
for (int i = 0; i < Math.min(fields.size(), values.length); i++) {
796
builder.set(fields.get(i).getName(), values[i]);
797
}
798
799
return builder.build();
800
}
801
802
private <T> T deserializeConfig(Class<T> configClass, Map<String, String> properties)
803
throws Exception {
804
// Implementation for deserializing configuration from properties
805
return configClass.newInstance(); // Simplified - real implementation would use reflection
806
}
807
}
808
809
// Mock emitter for testing
810
public class MockEmitter<T> implements Emitter<T> {
811
private final List<T> emitted = new ArrayList<>();
812
private final List<InvalidEntry<T>> errors = new ArrayList<>();
813
814
@Override
815
public void emit(T value) {
816
emitted.add(value);
817
}
818
819
@Override
820
public void emitError(InvalidEntry<T> invalidEntry) {
821
errors.add(invalidEntry);
822
}
823
824
public List<T> getEmitted() {
825
return new ArrayList<>(emitted);
826
}
827
828
public List<InvalidEntry<T>> getErrors() {
829
return new ArrayList<>(errors);
830
}
831
832
public void clear() {
833
emitted.clear();
834
errors.clear();
835
}
836
}
837
```

The CDAP Plugin System enables building modular, reusable data processing components with strong type safety, comprehensive configuration management, and enterprise-grade operational features. This extensibility framework is essential for creating scalable, maintainable data processing applications.