Core application programming interface for the Cask Data Application Platform (CDAP), enabling the development of scalable data-processing applications on Hadoop ecosystems.
—
CDAP's Plugin Framework provides an extensible architecture for adding custom processing logic, data sources, sinks, and transformations to applications without modifying core application code.
/**
 * Base configuration class for plugins. All plugin configurations should extend
 * this class and use annotations to define configurable properties.
 */
public class PluginConfig {
  // Extend this class and declare annotated fields for plugin-specific settings.
}
public class Plugin {
public String getType();
public String getName();
public ArtifactId getArtifactId();
public PluginClass getPluginClass();
public PluginProperties getProperties();
}Represents a plugin instance with its metadata and configuration.
public class PluginClass {
public String getType();
public String getName();
public String getDescription();
public String getClassName();
public String getConfigFieldName();
public Map<String, PluginPropertyField> getProperties();
public Set<String> getEndpoints();
public ArtifactId getParent();
}Metadata describing a plugin class including its properties and capabilities.
public interface PluginContext {
<T> T newPluginInstance(String pluginId) throws InstantiationException;
<T> T newPluginInstance(String pluginId, MacroEvaluator macroEvaluator)
throws InstantiationException;
<T> Class<T> loadPluginClass(String pluginId);
boolean isPluginAvailable(String pluginId);
Map<String, String> getPluginProperties(String pluginId);
PluginProperties getPluginProperties(String pluginId, MacroEvaluator macroEvaluator);
}Runtime context for accessing and instantiating plugins within programs and services.
public interface PluginConfigurer {
void usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties);
void usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties,
PluginSelector selector);
<T> T usePluginClass(String pluginType, String pluginName, String pluginId, PluginProperties properties);
<T> T usePluginClass(String pluginType, String pluginName, String pluginId, PluginProperties properties,
PluginSelector selector);
}Interface for configuring plugin usage in applications and programs.
public class PluginProperties {
public static Builder builder();
public Map<String, String> getProperties();
public String get(String key);
public String get(String key, String defaultValue);
public static class Builder {
public Builder add(String key, String value);
public Builder addAll(Map<String, String> properties);
public PluginProperties build();
}
}Properties container for plugin configuration values.
public class PluginPropertyField {
public String getName();
public String getType();
public String getDescription();
public boolean isRequired();
public boolean isMacroSupported();
public Set<String> getChildren();
}Metadata for individual plugin configuration properties.
public interface PluginSelector {
Map.Entry<ArtifactId, PluginClass> select(SortedMap<ArtifactId, PluginClass> plugins);
}Interface for custom plugin selection logic when multiple versions are available.
public class Requirements {
public static Builder builder();
public Set<String> getCapabilities();
public Set<String> getDatasetTypes();
public static class Builder {
public Builder addCapabilities(String... capabilities);
public Builder addDatasetTypes(String... datasetTypes);
public Requirements build();
}
}Specifies requirements that must be satisfied for plugin execution.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Plugin {
String type();
}Marks a class as a CDAP plugin of the specified type.
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Name {
String value();
}Specifies the name for plugins, plugin properties, or other named elements.
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Description {
String value();
}Provides human-readable descriptions for plugins and properties.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Macro {
}Indicates that a plugin property supports macro substitution.
/**
 * Example transform plugin that upper-cases the value of a configured field,
 * optionally preserving the original value under "&lt;field&gt;_original".
 *
 * NOTE(review): this example both extends PluginConfig and implements the
 * transform logic in one class; plugin implementations and their configs are
 * normally separate classes (cf. FileSource/FileSourceConfig below) — confirm
 * this conflation is intentional.
 */
@Plugin(type = "transform")
@Name("FieldUppercase")
@Description("Transforms specified field values to uppercase")
public class FieldUppercaseTransform extends PluginConfig {

  @Name("field")
  @Description("Name of the field to transform")
  @Macro
  private String fieldName;

  @Name("preserveOriginal")
  @Description("Whether to preserve the original field value")
  private boolean preserveOriginal = false;

  /** @return the name of the field this transform upper-cases */
  public String getFieldName() {
    return fieldName;
  }

  /** @return whether the original value is kept alongside the transformed one */
  public boolean shouldPreserveOriginal() {
    return preserveOriginal;
  }

  /**
   * Returns a rebuilt copy of {@code input} with the configured field
   * upper-cased; when the field value is null the record is rebuilt unchanged.
   *
   * @param input the record to transform
   * @return the transformed record
   */
  public Record transform(Record input) {
    Record.Builder builder = Record.builder(input);
    String originalValue = input.get(fieldName);
    if (originalValue != null) {
      // Fixed: use a locale-independent upper-casing so results do not vary
      // with the JVM default locale (e.g. the Turkish dotless-i problem).
      String transformedValue = originalValue.toUpperCase(java.util.Locale.ROOT);
      builder.set(fieldName, transformedValue);
      if (preserveOriginal) {
        builder.set(fieldName + "_original", originalValue);
      }
    }
    return builder.build();
  }
}

/**
 * Configuration for the "FileSource" batch source plugin: input path, file
 * format, and the schema of the records to produce.
 *
 * NOTE(review): the @Plugin/@Name/@Description annotations here normally
 * belong on the plugin implementation class (FileSource) rather than on its
 * config — confirm against the framework's conventions.
 */
@Plugin(type = "batchsource")
@Name("FileSource")
@Description("Reads data from files in specified format")
public class FileSourceConfig extends PluginConfig {

  // Added: FileSource.prepareRun() calls config.getReferenceName(), which was
  // not defined anywhere in this listing.
  @Name("referenceName")
  @Description("Name used to reference this source, e.g. for lineage")
  private String referenceName;

  @Name("path")
  @Description("Path to input files")
  @Macro
  private String path;

  @Name("format")
  @Description("Input file format")
  private String format = "csv";

  @Name("schema")
  @Description("Schema of the input data")
  private String schema;

  public String getReferenceName() { return referenceName; }
  public String getPath() { return path; }
  public String getFormat() { return format; }

  /** Parses the configured JSON schema string into a Schema. */
  public Schema getSchema() { return Schema.parseJson(schema); }

  /**
   * Validates that required properties are present.
   *
   * @throws IllegalArgumentException if path or schema is missing or empty
   */
  public void validate() {
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException("Path must be specified");
    }
    if (schema == null || schema.isEmpty()) {
      throw new IllegalArgumentException("Schema must be specified");
    }
  }
}
@Plugin(type = "batchsource")
public class FileSource extends BatchSource<NullWritable, Text, StructuredRecord> {

  private final FileSourceConfig config;

  public FileSource(FileSourceConfig config) {
    this.config = config;
  }

  /** Validates the configuration and publishes the output schema at deploy time. */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    config.validate();
    pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
  }

  /** Wires the Hadoop input format to the configured path before each run. */
  @Override
  public void prepareRun(BatchSourceContext context) throws Exception {
    Job hadoopJob = JobUtils.createInstance();
    FileInputFormat.addInputPath(hadoopJob, new Path(config.getPath()));
    InputFormatProvider provider =
        new InputFormatProvider(config.getFormat(), hadoopJob.getConfiguration());
    context.setInput(Input.of(config.getReferenceName(), provider));
  }
}

/**
 * Example application that registers plugins at configure time and schedules
 * a MapReduce program that consumes them.
 *
 * NOTE(review): customerSchema is referenced but not defined in this listing —
 * presumably a constant declared elsewhere; verify.
 */
public class PluginApplication extends AbstractApplication<Config> {

  @Override
  public void configure() {
    setName("PluginBasedApp");

    // Register the transform plugin under the runtime id "transformer1".
    usePlugin("transform", "fieldTransform", "transformer1",
        PluginProperties.builder()
            .add("field", "customerName")
            .add("operation", "uppercase")
            .build());

    // Register the batch source plugin under the runtime id "source1".
    usePlugin("batchsource", "fileSource", "source1",
        PluginProperties.builder()
            .add("path", "/data/input")
            .add("format", "json")
            .add("schema", customerSchema)
            .build());

    // Fixed: was "new PluginBasedProcessor()", which matches no class in this
    // listing; the MapReduce program defined below is PluginBasedMapReduce.
    addMapReduce(new PluginBasedMapReduce());
  }
}

/**
 * MapReduce program that declares a "validator" plugin at configure time and
 * instantiates it inside each map task to filter records.
 *
 * NOTE(review): validationRules is referenced but not defined in this
 * listing — presumably a constant declared elsewhere; verify.
 */
public class PluginBasedMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    // Declare the validator plugin so it is available at runtime as "validator1".
    configurer.usePlugin("validator", "dataValidator", "validator1",
        PluginProperties.builder()
            .add("rules", validationRules)
            .build());
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    Job job = context.getHadoopJob();
    job.setMapperClass(PluginAwareMapper.class);
    context.addInput(Input.ofDataset("inputData"));
    context.addOutput(Output.ofDataset("validatedData"));
  }

  /** Mapper that passes each record through the configured validator plugin. */
  public static class PluginAwareMapper extends Mapper<byte[], Record, byte[], Record> {

    private DataValidator validator;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // NOTE(review): casting Hadoop's Mapper.Context to MapReduceTaskContext
      // assumes CDAP supplies its own context implementation here — confirm.
      MapReduceTaskContext<byte[], Record, byte[], Record> cdapContext =
          (MapReduceTaskContext<byte[], Record, byte[], Record>) context;
      try {
        validator = cdapContext.getPluginContext().newPluginInstance("validator1");
      } catch (InstantiationException e) {
        // Fixed: newPluginInstance declares the checked InstantiationException,
        // which setup() cannot propagate directly; rethrow with cause preserved.
        throw new IOException("Unable to instantiate plugin 'validator1'", e);
      }
    }

    @Override
    protected void map(byte[] key, Record record, Context context)
        throws IOException, InterruptedException {
      if (validator.isValid(record)) {
        context.write(key, record);
      } else {
        // Invalid records are counted and dropped rather than written.
        context.getCounter("Validation", "InvalidRecords").increment(1);
      }
    }
  }
}

/**
 * Configuration for the "DatabaseSink" plugin: connection string, target
 * table, insert batch size, and credentials.
 *
 * NOTE(review): the @Plugin/@Name/@Description annotations here normally
 * belong on the plugin implementation class (DatabaseSink) rather than on its
 * config — confirm against the framework's conventions.
 */
@Plugin(type = "sink")
@Name("DatabaseSink")
@Description("Writes data to database tables")
@Requirements(capabilities = {"database.connection"})
public class DatabaseSinkConfig extends PluginConfig {

  // Added: DatabaseSink.prepareRun() calls config.getReferenceName(), which
  // was not defined anywhere in this listing.
  @Name("referenceName")
  @Description("Name used to reference this sink, e.g. for lineage")
  private String referenceName;

  @Name("connectionString")
  @Description("Database connection string")
  @Macro
  private String connectionString;

  @Name("tableName")
  @Description("Target table name")
  @Macro
  private String tableName;

  @Name("batchSize")
  @Description("Batch size for inserts")
  private int batchSize = 100;

  @Name("credentials")
  @Description("Database credentials")
  private DatabaseCredentials credentials;

  /**
   * Validates required properties.
   *
   * @throws IllegalArgumentException if the connection string or table name is
   *     missing, or batchSize is not positive
   */
  public void validate() {
    if (connectionString == null || connectionString.isEmpty()) {
      throw new IllegalArgumentException("Connection string is required");
    }
    if (tableName == null || tableName.isEmpty()) {
      throw new IllegalArgumentException("Table name is required");
    }
    if (batchSize <= 0) {
      throw new IllegalArgumentException("Batch size must be positive");
    }
  }

  public String getReferenceName() { return referenceName; }
  public String getConnectionString() { return connectionString; }
  public String getTableName() { return tableName; }
  public int getBatchSize() { return batchSize; }
  public DatabaseCredentials getCredentials() { return credentials; }
}
@Plugin(type = "sink")
public class DatabaseSink extends BatchSink<StructuredRecord, NullWritable, NullWritable> {

  private final DatabaseSinkConfig config;

  public DatabaseSink(DatabaseSinkConfig config) {
    this.config = config;
  }

  /**
   * Validates the configuration and fails fast at deploy time if the database
   * or target table is unreachable.
   *
   * NOTE(review): createConnection() and validateTableSchema(...) are not
   * shown in this listing — presumably private helpers defined elsewhere;
   * verify.
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    config.validate();
    // try-with-resources guarantees the probe connection is closed.
    try (Connection connection = createConnection()) {
      validateTableSchema(connection, config.getTableName());
    } catch (SQLException e) {
      throw new IllegalArgumentException("Cannot connect to database: " + e.getMessage(), e);
    }
  }

  /** Publishes the connection settings into the Hadoop job configuration. */
  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    Job job = JobUtils.createInstance();
    job.getConfiguration().set("db.connection.string", config.getConnectionString());
    job.getConfiguration().set("db.table.name", config.getTableName());
    job.getConfiguration().setInt("db.batch.size", config.getBatchSize());
    context.addOutput(Output.of(config.getReferenceName(),
        new OutputFormatProvider("DatabaseOutputFormat", job.getConfiguration())));
  }
}

This plugin framework enables modular, configurable, and reusable components that can be shared across different applications and use cases.
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-api