0
# Plugin Framework
1
2
CDAP's Plugin Framework provides an extensible architecture for adding custom processing logic, data sources, sinks, and transformations to applications without modifying core application code.
3
4
## Core Plugin Classes
5
6
### PluginConfig
7
8
```java { .api }
9
/**
 * Base plugin configuration class.
 *
 * <p>Extend this class to add plugin-specific configuration properties.
 */
public class PluginConfig {
}
13
```
14
15
Base configuration class for plugins. All plugin configurations should extend this class and use annotations to define configurable properties.
16
17
### Plugin
18
19
```java { .api }
20
public class Plugin {
21
public String getType();
22
public String getName();
23
public ArtifactId getArtifactId();
24
public PluginClass getPluginClass();
25
public PluginProperties getProperties();
26
}
27
```
28
29
Represents a plugin instance with its metadata and configuration.
30
31
### PluginClass
32
33
```java { .api }
34
public class PluginClass {
35
public String getType();
36
public String getName();
37
public String getDescription();
38
public String getClassName();
39
public String getConfigFieldName();
40
public Map<String, PluginPropertyField> getProperties();
41
public Set<String> getEndpoints();
42
public ArtifactId getParent();
43
}
44
```
45
46
Metadata describing a plugin class including its properties and capabilities.
47
48
## Plugin Context and Management
49
50
### PluginContext
51
52
```java { .api }
53
public interface PluginContext {
54
<T> T newPluginInstance(String pluginId) throws InstantiationException;
55
<T> T newPluginInstance(String pluginId, MacroEvaluator macroEvaluator)
56
throws InstantiationException;
57
58
<T> Class<T> loadPluginClass(String pluginId);
59
60
boolean isPluginAvailable(String pluginId);
61
62
Map<String, String> getPluginProperties(String pluginId);
63
PluginProperties getPluginProperties(String pluginId, MacroEvaluator macroEvaluator);
64
}
65
```
66
67
Runtime context for accessing and instantiating plugins within programs and services.
68
69
### PluginConfigurer
70
71
```java { .api }
72
public interface PluginConfigurer {
73
void usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties);
74
void usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties,
75
PluginSelector selector);
76
77
<T> T usePluginClass(String pluginType, String pluginName, String pluginId, PluginProperties properties);
78
<T> T usePluginClass(String pluginType, String pluginName, String pluginId, PluginProperties properties,
79
PluginSelector selector);
80
}
81
```
82
83
Interface for configuring plugin usage in applications and programs.
84
85
## Plugin Properties and Configuration
86
87
### PluginProperties
88
89
```java { .api }
90
public class PluginProperties {
91
public static Builder builder();
92
93
public Map<String, String> getProperties();
94
public String get(String key);
95
public String get(String key, String defaultValue);
96
97
public static class Builder {
98
public Builder add(String key, String value);
99
public Builder addAll(Map<String, String> properties);
100
public PluginProperties build();
101
}
102
}
103
```
104
105
Properties container for plugin configuration values.
106
107
### PluginPropertyField
108
109
```java { .api }
110
public class PluginPropertyField {
111
public String getName();
112
public String getType();
113
public String getDescription();
114
public boolean isRequired();
115
public boolean isMacroSupported();
116
public Set<String> getChildren();
117
}
118
```
119
120
Metadata for individual plugin configuration properties.
121
122
## Plugin Selection
123
124
### PluginSelector
125
126
```java { .api }
127
public interface PluginSelector {
128
Map.Entry<ArtifactId, PluginClass> select(SortedMap<ArtifactId, PluginClass> plugins);
129
}
130
```
131
132
Interface for custom plugin selection logic when more than one artifact (for example, several versions) provides a matching plugin.
133
134
### Requirements
135
136
```java { .api }
137
public class Requirements {
138
public static Builder builder();
139
140
public Set<String> getCapabilities();
141
public Set<String> getDatasetTypes();
142
143
public static class Builder {
144
public Builder addCapabilities(String... capabilities);
145
public Builder addDatasetTypes(String... datasetTypes);
146
public Requirements build();
147
}
148
}
149
```
150
151
Specifies requirements that must be satisfied for plugin execution.
152
153
## Plugin Annotations
154
155
### @Plugin
156
157
```java { .api }
158
/**
 * Marks a class as a CDAP plugin of the specified type.
 *
 * <p>Applicable to types only and retained at runtime.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Plugin {

  /** The plugin type. */
  String type();
}
163
```
164
165
Marks a class as a CDAP plugin of the specified type.
166
167
### @Name
168
169
```java { .api }
170
/**
 * Specifies the name for plugins, plugin properties, or other named elements.
 *
 * <p>Applicable to types, fields and methods; retained at runtime.
 */
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Name {

  /** The name to assign to the annotated element. */
  String value();
}
175
```
176
177
Specifies the name for plugins, plugin properties, or other named elements.
178
179
### @Description
180
181
```java { .api }
182
/**
 * Provides a human-readable description for plugins and properties.
 *
 * <p>Applicable to types, fields and methods; retained at runtime.
 */
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Description {

  /** The description text. */
  String value();
}
187
```
188
189
Provides human-readable descriptions for plugins and properties.
190
191
### @Macro
192
193
```java { .api }
194
/**
 * Indicates that a plugin property supports macro substitution.
 *
 * <p>Marker annotation (no elements); applicable to fields and retained at runtime.
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Macro {
}
198
```
199
200
Indicates that a plugin property supports macro substitution.
201
202
## Usage Examples
203
204
### Basic Plugin Implementation
205
206
```java
207
@Plugin(type = "transform")
208
@Name("FieldUppercase")
209
@Description("Transforms specified field values to uppercase")
210
public class FieldUppercaseTransform extends PluginConfig {
211
212
@Name("field")
213
@Description("Name of the field to transform")
214
@Macro
215
private String fieldName;
216
217
@Name("preserveOriginal")
218
@Description("Whether to preserve the original field value")
219
private boolean preserveOriginal = false;
220
221
public String getFieldName() {
222
return fieldName;
223
}
224
225
public boolean shouldPreserveOriginal() {
226
return preserveOriginal;
227
}
228
229
public Record transform(Record input) {
230
Record.Builder builder = Record.builder(input);
231
232
String originalValue = input.get(fieldName);
233
if (originalValue != null) {
234
String transformedValue = originalValue.toUpperCase();
235
builder.set(fieldName, transformedValue);
236
237
if (preserveOriginal) {
238
builder.set(fieldName + "_original", originalValue);
239
}
240
}
241
242
return builder.build();
243
}
244
}
245
```
246
247
### Data Source Plugin
248
249
```java
250
@Plugin(type = "batchsource")
251
@Name("FileSource")
252
@Description("Reads data from files in specified format")
253
public class FileSourceConfig extends PluginConfig {
254
255
@Name("path")
256
@Description("Path to input files")
257
@Macro
258
private String path;
259
260
@Name("format")
261
@Description("Input file format")
262
private String format = "csv";
263
264
@Name("schema")
265
@Description("Schema of the input data")
266
private String schema;
267
268
// Getters and validation methods
269
public String getPath() { return path; }
270
public String getFormat() { return format; }
271
public Schema getSchema() { return Schema.parseJson(schema); }
272
273
public void validate() {
274
if (path == null || path.isEmpty()) {
275
throw new IllegalArgumentException("Path must be specified");
276
}
277
278
if (schema == null || schema.isEmpty()) {
279
throw new IllegalArgumentException("Schema must be specified");
280
}
281
}
282
}
283
284
@Plugin(type = "batchsource")
285
public class FileSource extends BatchSource<NullWritable, Text, StructuredRecord> {
286
287
private final FileSourceConfig config;
288
289
public FileSource(FileSourceConfig config) {
290
this.config = config;
291
}
292
293
@Override
294
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
295
config.validate();
296
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
297
}
298
299
@Override
300
public void prepareRun(BatchSourceContext context) throws Exception {
301
Job job = JobUtils.createInstance();
302
FileInputFormat.addInputPath(job, new Path(config.getPath()));
303
context.setInput(Input.of(config.getReferenceName(), new InputFormatProvider(config.getFormat(), job.getConfiguration())));
304
}
305
}
306
```
307
308
### Plugin Usage in Applications
309
310
```java
311
public class PluginApplication extends AbstractApplication<Config> {
312
313
@Override
314
public void configure() {
315
setName("PluginBasedApp");
316
317
// Use transform plugin
318
usePlugin("transform", "fieldTransform", "transformer1",
319
PluginProperties.builder()
320
.add("field", "customerName")
321
.add("operation", "uppercase")
322
.build());
323
324
// Use data source plugin
325
usePlugin("batchsource", "fileSource", "source1",
326
PluginProperties.builder()
327
.add("path", "/data/input")
328
.add("format", "json")
329
.add("schema", customerSchema)
330
.build());
331
332
addMapReduce(new PluginBasedProcessor());
333
}
334
}
335
```
336
337
### Plugin Usage in Programs
338
339
```java
340
public class PluginBasedMapReduce extends AbstractMapReduce {
341
342
@Override
343
public void configure(MapReduceConfigurer configurer) {
344
configurer.usePlugin("validator", "dataValidator", "validator1",
345
PluginProperties.builder()
346
.add("rules", validationRules)
347
.build());
348
}
349
350
@Override
351
public void initialize(MapReduceContext context) throws Exception {
352
Job job = context.getHadoopJob();
353
job.setMapperClass(PluginAwareMapper.class);
354
355
context.addInput(Input.ofDataset("inputData"));
356
context.addOutput(Output.ofDataset("validatedData"));
357
}
358
359
public static class PluginAwareMapper extends Mapper<byte[], Record, byte[], Record> {
360
private DataValidator validator;
361
362
@Override
363
protected void setup(Context context) throws IOException, InterruptedException {
364
MapReduceTaskContext<byte[], Record, byte[], Record> cdapContext =
365
(MapReduceTaskContext<byte[], Record, byte[], Record>) context;
366
367
validator = cdapContext.getPluginContext().newPluginInstance("validator1");
368
}
369
370
@Override
371
protected void map(byte[] key, Record record, Context context)
372
throws IOException, InterruptedException {
373
374
if (validator.isValid(record)) {
375
context.write(key, record);
376
} else {
377
// Log invalid record or write to error dataset
378
context.getCounter("Validation", "InvalidRecords").increment(1);
379
}
380
}
381
}
382
}
383
```
384
385
### Advanced Plugin with Dependencies
386
387
```java
388
@Plugin(type = "sink")
389
@Name("DatabaseSink")
390
@Description("Writes data to database tables")
391
@Requirements(capabilities = {"database.connection"})
392
public class DatabaseSinkConfig extends PluginConfig {
393
394
@Name("connectionString")
395
@Description("Database connection string")
396
@Macro
397
private String connectionString;
398
399
@Name("tableName")
400
@Description("Target table name")
401
@Macro
402
private String tableName;
403
404
@Name("batchSize")
405
@Description("Batch size for inserts")
406
private int batchSize = 100;
407
408
@Name("credentials")
409
@Description("Database credentials")
410
private DatabaseCredentials credentials;
411
412
// Configuration validation
413
public void validate() {
414
if (connectionString == null || connectionString.isEmpty()) {
415
throw new IllegalArgumentException("Connection string is required");
416
}
417
418
if (tableName == null || tableName.isEmpty()) {
419
throw new IllegalArgumentException("Table name is required");
420
}
421
422
if (batchSize <= 0) {
423
throw new IllegalArgumentException("Batch size must be positive");
424
}
425
}
426
427
// Getters
428
public String getConnectionString() { return connectionString; }
429
public String getTableName() { return tableName; }
430
public int getBatchSize() { return batchSize; }
431
public DatabaseCredentials getCredentials() { return credentials; }
432
}
433
434
@Plugin(type = "sink")
435
public class DatabaseSink extends BatchSink<StructuredRecord, NullWritable, NullWritable> {
436
437
private final DatabaseSinkConfig config;
438
439
public DatabaseSink(DatabaseSinkConfig config) {
440
this.config = config;
441
}
442
443
@Override
444
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
445
config.validate();
446
447
// Verify database connection and table schema
448
try (Connection connection = createConnection()) {
449
validateTableSchema(connection, config.getTableName());
450
} catch (SQLException e) {
451
throw new IllegalArgumentException("Cannot connect to database: " + e.getMessage(), e);
452
}
453
}
454
455
@Override
456
public void prepareRun(BatchSinkContext context) throws Exception {
457
Job job = JobUtils.createInstance();
458
job.getConfiguration().set("db.connection.string", config.getConnectionString());
459
job.getConfiguration().set("db.table.name", config.getTableName());
460
job.getConfiguration().setInt("db.batch.size", config.getBatchSize());
461
462
context.addOutput(Output.of(config.getReferenceName(), new OutputFormatProvider("DatabaseOutputFormat", job.getConfiguration())));
463
}
464
}
465
```
466
467
This plugin framework enables modular, configurable, and reusable components that can be shared across different applications and use cases.