Plugin Framework

CDAP's Plugin Framework provides an extensible architecture for adding custom processing logic, data sources, sinks, and transformations to applications without modifying core application code.

Core Plugin Classes

PluginConfig

public abstract class PluginConfig extends Config {
    // Base plugin configuration class; extend it to add plugin-specific
    // configuration properties, which CDAP populates at instantiation time
    public PluginProperties getProperties();
    public boolean containsMacro(String fieldName);
}

Base configuration class for plugins. All plugin configurations should extend this class and use annotations to define configurable properties. containsMacro reports whether a field's value still contains an unresolved macro at configure time.
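
For example, a minimal configuration for a hypothetical HTTP source plugin (the annotations are described under Plugin Annotations below):

public class HttpSourceConfig extends PluginConfig {
    
    @Name("url")
    @Description("Endpoint to fetch data from")
    @Macro
    private String url;
    
    @Name("timeoutSeconds")
    @Description("Request timeout in seconds")
    private int timeoutSeconds = 30;
}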

Plugin

public class Plugin {
    public String getType();
    public String getName();
    public ArtifactId getArtifactId();
    public PluginClass getPluginClass();
    public PluginProperties getProperties();
}

Represents a plugin instance with its metadata and configuration.

PluginClass

public class PluginClass {
    public String getType();
    public String getName();
    public String getDescription();
    public String getClassName();
    public String getConfigFieldName();
    public Map<String, PluginPropertyField> getProperties();
    public Set<String> getEndpoints();
    public ArtifactId getParent();
}

Metadata describing a plugin class including its properties and capabilities.

Plugin Context and Management

PluginContext

public interface PluginContext {
    <T> T newPluginInstance(String pluginId) throws InstantiationException;
    <T> T newPluginInstance(String pluginId, MacroEvaluator macroEvaluator) 
        throws InstantiationException;
    
    <T> Class<T> loadPluginClass(String pluginId);
    
    boolean isPluginAvailable(String pluginId);
    
    PluginProperties getPluginProperties(String pluginId);
    PluginProperties getPluginProperties(String pluginId, MacroEvaluator macroEvaluator);
}

Runtime context for accessing and instantiating plugins within programs and services.
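
For example, a program that registered a validator plugin at configure time can instantiate it at runtime. A minimal sketch; RecordValidator and the plugin ID "validator1" are hypothetical:

public RecordValidator createValidator(PluginContext pluginContext) throws InstantiationException {
    // The plugin must have been registered under this ID at configure time
    if (!pluginContext.isPluginAvailable("validator1")) {
        throw new IllegalStateException("Plugin 'validator1' was not registered");
    }
    return pluginContext.newPluginInstance("validator1");
}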

PluginConfigurer

public interface PluginConfigurer {
    <T> T usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties);
    <T> T usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties,
                    PluginSelector selector);
    
    <T> Class<T> usePluginClass(String pluginType, String pluginName, String pluginId, PluginProperties properties);
    <T> Class<T> usePluginClass(String pluginType, String pluginName, String pluginId, PluginProperties properties,
                                PluginSelector selector);
}

Interface for configuring plugin usage in applications and programs.
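
The two method families differ in their result: usePlugin registers the plugin and returns an instance created at configure time, while usePluginClass registers it and returns only its Class. A sketch, assuming a hypothetical "jsonParser" transform plugin:

// Register and instantiate the plugin during configuration
Object parser = configurer.usePlugin("transform", "jsonParser", "parserInstance",
                                     PluginProperties.builder().build());

// Register the plugin but load only its class
Class<Object> parserClass = configurer.usePluginClass("transform", "jsonParser", "parserClass",
                                                      PluginProperties.builder().build());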

Plugin Properties and Configuration

PluginProperties

public class PluginProperties {
    public static Builder builder();
    
    public Map<String, String> getProperties();
    public String get(String key);
    public String get(String key, String defaultValue);
    
    public static class Builder {
        public Builder add(String key, String value);
        public Builder addAll(Map<String, String> properties);
        public PluginProperties build();
    }
}

Properties container for plugin configuration values.
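
For example:

Map<String, String> connectionSettings = new HashMap<>();
connectionSettings.put("host", "localhost");
connectionSettings.put("port", "5432");

PluginProperties properties = PluginProperties.builder()
    .add("database", "sales")
    .addAll(connectionSettings)
    .build();

String port = properties.get("port", "5432"); // falls back to the default value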

PluginPropertyField

public class PluginPropertyField {
    public String getName();
    public String getType();
    public String getDescription();
    public boolean isRequired();
    public boolean isMacroSupported();
    public Set<String> getChildren();
}

Metadata for individual plugin configuration properties.

Plugin Selection and Requirements

PluginSelector

public class PluginSelector {
    public Map.Entry<ArtifactId, PluginClass> select(SortedMap<ArtifactId, PluginClass> plugins);
}

Hook invoked when multiple versions of a plugin are available; the default implementation selects the highest version. Extend this class and override select to customize selection.
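
A sketch of a custom selector that pins a specific artifact version and otherwise falls back to the default highest-version behavior (hypothetical; assumes ArtifactId exposes its version via getVersion()):

public class VersionPinningSelector extends PluginSelector {
    
    private final String pinnedVersion;
    
    public VersionPinningSelector(String pinnedVersion) {
        this.pinnedVersion = pinnedVersion;
    }
    
    @Override
    public Map.Entry<ArtifactId, PluginClass> select(SortedMap<ArtifactId, PluginClass> plugins) {
        for (Map.Entry<ArtifactId, PluginClass> entry : plugins.entrySet()) {
            if (pinnedVersion.equals(entry.getKey().getVersion().getVersion())) {
                return entry;
            }
        }
        // Fall back to the default behavior (highest available version)
        return super.select(plugins);
    }
}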

Requirements

public class Requirements {
    public static Builder builder();
    
    public Set<String> getCapabilities();
    public Set<String> getDatasetTypes();
    
    public static class Builder {
        public Builder addCapabilities(String... capabilities);
        public Builder addDatasetTypes(String... datasetTypes);
        public Requirements build();
    }
}

Specifies requirements that must be satisfied before a plugin can run. Requirements can also be declared directly on a plugin class with the @Requirements annotation, as shown in the advanced sink example below.
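
For example, using the builder above (the capability and dataset-type names are illustrative):

Requirements requirements = Requirements.builder()
    .addDatasetTypes("table")
    .addCapabilities("cdc")
    .build();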

Plugin Annotations

@Plugin

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Plugin {
    String type();
}

Marks a class as a CDAP plugin of the specified type.

@Name

@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Name {
    String value();
}

Specifies the name for plugins, plugin properties, or other named elements.

@Description

@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Description {
    String value();
}

Provides human-readable descriptions for plugins and properties.

@Macro

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Macro {
}

Indicates that a plugin property supports macro substitution.
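
Macro-enabled properties are resolved at runtime through a MacroEvaluator (see PluginContext above). A property value can embed a macro expression; for example, using CDAP's logicalStartTime macro function:

PluginProperties properties = PluginProperties.builder()
    .add("path", "/data/${logicalStartTime(yyyy-MM-dd)}")  // resolved at runtime, e.g. /data/2017-06-01
    .build();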

Usage Examples

Basic Plugin Implementation

@Plugin(type = "transform")
@Name("FieldUppercase")
@Description("Transforms specified field values to uppercase")
public class FieldUppercaseTransform {
    
    private final Config config;
    
    public FieldUppercaseTransform(Config config) {
        this.config = config;
    }
    
    // Configuration lives in a separate class extending PluginConfig;
    // the framework injects the configured values into its fields
    public static class Config extends PluginConfig {
        
        @Name("field")
        @Description("Name of the field to transform")
        @Macro
        private String fieldName;
        
        @Name("preserveOriginal")
        @Description("Whether to preserve the original field value")
        private boolean preserveOriginal = false;
    }
    
    public Record transform(Record input) {
        Record.Builder builder = Record.builder(input);
        
        String originalValue = input.get(config.fieldName);
        if (originalValue != null) {
            builder.set(config.fieldName, originalValue.toUpperCase());
            
            if (config.preserveOriginal) {
                builder.set(config.fieldName + "_original", originalValue);
            }
        }
        
        return builder.build();
    }
}

Data Source Plugin

public class FileSourceConfig extends PluginConfig {
    
    @Name("referenceName")
    @Description("Name used to identify this source for lineage")
    private String referenceName;
    
    @Name("path")
    @Description("Path to input files")
    @Macro
    private String path;
    
    @Name("format")
    @Description("Input file format")
    private String format = "csv";
    
    @Name("schema")
    @Description("Schema of the input data")
    private String schema;
    
    // Getters and validation methods
    public String getReferenceName() { return referenceName; }
    public String getPath() { return path; }
    public String getFormat() { return format; }
    
    public Schema getSchema() {
        try {
            return Schema.parseJson(schema);  // parseJson throws IOException on malformed JSON
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
        }
    }
    
    public void validate() {
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("Path must be specified");
        }
        
        if (schema == null || schema.isEmpty()) {
            throw new IllegalArgumentException("Schema must be specified");
        }
    }
}

@Plugin(type = "batchsource")
@Name("FileSource")
@Description("Reads data from files in the specified format")
public class FileSource extends BatchSource<NullWritable, Text, StructuredRecord> {
    
    private final FileSourceConfig config;
    
    public FileSource(FileSourceConfig config) {
        this.config = config;
    }
    
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        config.validate();
        pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
    }
    
    @Override
    public void prepareRun(BatchSourceContext context) throws Exception {
        // InputFormatProvider is an interface; supply the input format class and
        // its Hadoop configuration through an anonymous implementation. For brevity
        // this always uses TextInputFormat; a real plugin would choose a format
        // class based on config.getFormat().
        context.setInput(Input.of(config.getReferenceName(), new InputFormatProvider() {
            @Override
            public String getInputFormatClassName() {
                return TextInputFormat.class.getName();
            }
            
            @Override
            public Map<String, String> getInputFormatConfiguration() {
                return Collections.singletonMap(FileInputFormat.INPUT_DIR, config.getPath());
            }
        }));
    }
}

Plugin Usage in Applications

public class PluginApplication extends AbstractApplication<Config> {
    
    @Override
    public void configure() {
        setName("PluginBasedApp");
        
        // Use the transform plugin defined above
        usePlugin("transform", "FieldUppercase", "transformer1",
                  PluginProperties.builder()
                      .add("field", "customerName")
                      .add("preserveOriginal", "true")
                      .build());
        
        // Use the data source plugin defined above;
        // customerSchema is a JSON schema string defined elsewhere
        usePlugin("batchsource", "FileSource", "source1",
                  PluginProperties.builder()
                      .add("referenceName", "customerFiles")
                      .add("path", "/data/input")
                      .add("format", "json")
                      .add("schema", customerSchema)
                      .build());
        
        addMapReduce(new PluginBasedMapReduce());
    }
}

Plugin Usage in Programs

public class PluginBasedMapReduce extends AbstractMapReduce {
    
    @Override
    public void configure() {
        // Register the plugin at configure time so it can be instantiated at
        // runtime; validationRules is a rules string defined elsewhere
        usePlugin("validator", "dataValidator", "validator1",
                  PluginProperties.builder()
                      .add("rules", validationRules)
                      .build());
    }
    
    @Override
    public void initialize() throws Exception {
        MapReduceContext context = getContext();
        Job job = context.getHadoopJob();
        job.setMapperClass(PluginAwareMapper.class);
        
        context.addInput(Input.ofDataset("inputData"));
        context.addOutput(Output.ofDataset("validatedData"));
    }
    
    // Mappers obtain the CDAP task context by implementing ProgramLifecycle;
    // casting the Hadoop mapper context does not work
    public static class PluginAwareMapper extends Mapper<byte[], Record, byte[], Record>
        implements ProgramLifecycle<MapReduceTaskContext<byte[], Record>> {
        
        private DataValidator validator;
        
        @Override
        public void initialize(MapReduceTaskContext<byte[], Record> context) throws Exception {
            // Instantiate the plugin registered under "validator1" at configure time
            validator = context.newPluginInstance("validator1");
        }
        
        @Override
        public void destroy() {
            // Nothing to clean up
        }
        
        @Override
        protected void map(byte[] key, Record record, Context context) 
            throws IOException, InterruptedException {
            
            if (validator.isValid(record)) {
                context.write(key, record);
            } else {
                // Log invalid record or write to error dataset
                context.getCounter("Validation", "InvalidRecords").increment(1);
            }
        }
    }
}

Advanced Plugin with Requirements

public class DatabaseSinkConfig extends PluginConfig {
    
    @Name("referenceName")
    @Description("Name used to identify this sink for lineage")
    private String referenceName;
    
    @Name("connectionString")
    @Description("Database connection string")
    @Macro
    private String connectionString;
    
    @Name("tableName")
    @Description("Target table name")
    @Macro
    private String tableName;
    
    @Name("batchSize")
    @Description("Batch size for inserts")
    private int batchSize = 100;
    
    @Name("credentials")
    @Description("Database credentials")
    private DatabaseCredentials credentials;
    
    // Configuration validation
    public void validate() {
        if (connectionString == null || connectionString.isEmpty()) {
            throw new IllegalArgumentException("Connection string is required");
        }
        
        if (tableName == null || tableName.isEmpty()) {
            throw new IllegalArgumentException("Table name is required");
        }
        
        if (batchSize <= 0) {
            throw new IllegalArgumentException("Batch size must be positive");
        }
    }
    
    // Getters
    public String getReferenceName() { return referenceName; }
    public String getConnectionString() { return connectionString; }
    public String getTableName() { return tableName; }
    public int getBatchSize() { return batchSize; }
    public DatabaseCredentials getCredentials() { return credentials; }
}

@Plugin(type = "batchsink")
@Name("DatabaseSink")
@Description("Writes data to database tables")
@Requirements(capabilities = {"database.connection"})
public class DatabaseSink extends BatchSink<StructuredRecord, NullWritable, NullWritable> {
    
    private final DatabaseSinkConfig config;
    
    public DatabaseSink(DatabaseSinkConfig config) {
        this.config = config;
    }
    
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        config.validate();
        
        // Verify the connection and table schema, but only if the macro-enabled
        // properties are already resolved at configure time
        if (!config.containsMacro("connectionString") && !config.containsMacro("tableName")) {
            // createConnection() and validateTableSchema() are private helpers (omitted)
            try (Connection connection = createConnection()) {
                validateTableSchema(connection, config.getTableName());
            } catch (SQLException e) {
                throw new IllegalArgumentException("Cannot connect to database: " + e.getMessage(), e);
            }
        }
    }
    
    @Override
    public void prepareRun(BatchSinkContext context) throws Exception {
        final Map<String, String> outputConfig = new HashMap<>();
        outputConfig.put("db.connection.string", config.getConnectionString());
        outputConfig.put("db.table.name", config.getTableName());
        outputConfig.put("db.batch.size", Integer.toString(config.getBatchSize()));
        
        // OutputFormatProvider is an interface; DatabaseOutputFormat is a
        // hypothetical Hadoop OutputFormat shipped with this plugin
        context.addOutput(Output.of(config.getReferenceName(), new OutputFormatProvider() {
            @Override
            public String getOutputFormatClassName() {
                return DatabaseOutputFormat.class.getName();
            }
            
            @Override
            public Map<String, String> getOutputFormatConfiguration() {
                return outputConfig;
            }
        }));
    }
}

This plugin framework enables modular, configurable, and reusable components that can be shared across different applications and use cases.
