The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development.
—
The CDAP Plugin System provides a powerful extensibility framework that allows developers to create reusable, configurable components for data processing pipelines. Plugins enable modular application development and promote code reuse across different applications and organizations.
import io.cdap.cdap.api.plugin.*;
import io.cdap.cdap.api.annotation.*;
/**
 * Configure-time API for registering plugins for later use at runtime.
 * A plugin registered here under {@code pluginId} can be instantiated via
 * {@code PluginContext#newPluginInstance(pluginId)}.
 */
public interface PluginConfigurer {
/** Registers a plugin and returns an instance of it, keyed by {@code pluginId}. */
<T> T usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties);
/** Same as above, using {@code selector} to choose among multiple matching plugin artifacts. */
<T> T usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties,
PluginSelector selector);
/** Registers a plugin and returns its {@link Class} without instantiating it. */
<T> Class<T> usePluginClass(String pluginType, String pluginName, String pluginId,
PluginProperties properties);
/** Class-only registration with an explicit {@code selector} for candidate resolution. */
<T> Class<T> usePluginClass(String pluginType, String pluginName, String pluginId,
PluginProperties properties, PluginSelector selector);
}
/**
 * Runtime access to plugins previously registered through {@code PluginConfigurer}.
 * All lookups are keyed by the {@code pluginId} chosen at registration time.
 */
public interface PluginContext extends FeatureFlagsProvider {
/** Creates a new instance of the plugin registered under {@code pluginId}. */
<T> T newPluginInstance(String pluginId) throws InstantiationException;
/** Loads the class of the plugin registered under {@code pluginId}. */
<T> Class<T> loadPluginClass(String pluginId);
/** Returns the properties the plugin identified by {@code pluginId} was registered with. */
PluginProperties getPluginProperties(String pluginId);
/** Returns all registered plugins' properties, keyed by plugin id. */
Map<String, PluginProperties> getPlugins();
}
/**
 * Immutable metadata record describing a single registered plugin:
 * its type, name, id, configuration properties, and selector.
 */
public final class Plugin {
/** Static factory creating a {@code Plugin} for the given coordinates. */
public static Plugin of(String type, String name, String pluginId, PluginProperties properties) {
/* create plugin instance */
}
/** Returns the plugin type, e.g. "batchsource" or "transform". */
public String getPluginType() { /* returns plugin type */ }
/** Returns the plugin name as declared by its {@code @Name} annotation. */
public String getPluginName() { /* returns plugin name */ }
/** Returns the id this plugin was registered under. */
public String getPluginId() { /* returns plugin ID */ }
/** Returns the configuration properties supplied at registration. */
public PluginProperties getProperties() { /* returns plugin properties */ }
/** Returns the selector used to pick this plugin among candidates. */
public PluginSelector getSelector() { /* returns plugin selector */ }
}// Plugin properties container
/**
 * Immutable string-to-string property map used to configure a plugin.
 * Build with {@link #builder()} or wrap an existing map via {@link #of(Map)}.
 */
public class PluginProperties implements Serializable {
/** Returns a new builder for incrementally assembling properties. */
public static Builder builder() { return new Builder(); }
/** Creates a PluginProperties instance backed by the given map. */
public static PluginProperties of(Map<String, String> properties) { /* create from map */ }
/** Returns all properties as a map. */
public Map<String, String> getProperties() { /* returns properties map */ }
/** Returns the value for {@code key}, or null if absent. */
public String getProperty(String key) { /* returns property value */ }
/** Returns the value for {@code key}, or {@code defaultValue} if absent. */
public String getProperty(String key, String defaultValue) { /* returns property with default */ }
/** Fluent builder; all methods return {@code this} for chaining. */
public static class Builder {
/** Adds a single key/value pair. */
public Builder add(String key, String value) { /* add property */ return this; }
/** Adds every entry of the given map. */
public Builder addAll(Map<String, String> properties) { /* add all properties */ return this; }
/** Builds the immutable PluginProperties. */
public PluginProperties build() { /* build properties */ }
}
}
/**
 * Base class for all typed plugin configurations. Subclasses declare
 * {@code @Property}-annotated fields that the framework populates from the
 * registered {@code PluginProperties} at deployment/runtime.
 */
public abstract class PluginConfig extends Config implements Serializable {
// Base class for all plugin configurations
// Extend this class for typed plugin configurations
}
/**
 * Describes a plugin implementation class: its identity, documentation,
 * declared configuration fields, and runtime requirements.
 */
public class PluginClass {
/** Returns the plugin name (from {@code @Name}). */
public String getName() { /* returns plugin name */ }
/** Returns the plugin type (from {@code @Plugin(type = ...)}). */
public String getType() { /* returns plugin type */ }
/** Returns the human-readable description (from {@code @Description}). */
public String getDescription() { /* returns plugin description */ }
/** Returns the fully-qualified implementation class name. */
public String getClassName() { /* returns plugin class name */ }
/** Returns the plugin category (from {@code @Category}). */
public String getCategory() { /* returns plugin category */ }
/** Returns the set of declared configuration fields. */
public Set<PluginPropertyField> getProperties() { /* returns plugin properties */ }
/** Returns the configuration fields keyed by field name. */
public Map<String, PluginPropertyField> getPropertiesMap() { /* returns properties as map */ }
/** Returns the capabilities/dataset types this plugin requires. */
public Requirements getRequirements() { /* returns plugin requirements */ }
}
/**
 * Metadata for one configuration field of a plugin: name, docs, type,
 * requiredness, and macro behavior.
 */
public class PluginPropertyField {
/** Returns the configuration field's name. */
public String getName() { /* returns field name */ }
/** Returns the field's description (from {@code @Description}). */
public String getDescription() { /* returns field description */ }
/** Returns the field's declared type name. */
public String getType() { /* returns field type */ }
/** Returns whether a value must be supplied for this field. */
public boolean isRequired() { /* returns if field is required */ }
/** Returns whether macro substitution ({@code @Macro}) is enabled for this field. */
public boolean isMacroSupported() { /* returns if macros are supported */ }
/** Returns whether macro escaping is enabled for this field. */
public boolean isMacroEscapingEnabled() { /* returns if macro escaping is enabled */ }
/** Returns names of nested child fields, if any. */
public Set<String> getChildren() { /* returns child field names */ }
}// Core plugin annotations
@Plugin(type = "source") // Marks a class as a plugin of specific type
@Name("MySourcePlugin") // Specifies the plugin name
@Description("Reads data from external source") // Provides plugin description
@Category("source") // Categorizes the plugin
// Property annotations
@Property // Marks fields as configuration properties
@Macro // Enables macro substitution in field values
@Description("Input path for data files") // Describes configuration properties
// Metadata annotations
@Metadata(properties = {
@MetadataProperty(key = "doc.url", value = "https://example.com/docs"),
@MetadataProperty(key = "author", value = "Data Team")
})
Source plugins read data from external systems:
// Source plugin configuration
public class FileSourceConfig extends PluginConfig {
  /** Formats this source can read; keep in sync with FileSourcePlugin.prepareRun(). */
  private static final Set<String> SUPPORTED_FORMATS =
      new HashSet<>(Arrays.asList("json", "csv", "avro", "parquet"));

  @Name("path")
  @Description("Path to input files")
  @Macro
  @Property
  private String path;

  @Name("format")
  @Description("File format (json, csv, avro, parquet)")
  @Property
  private String format = "json";

  @Name("schema")
  @Description("Schema of the input data")
  @Property
  private String schema;

  @Name("recursive")
  @Description("Whether to read files recursively")
  @Property
  private Boolean recursive = false;

  // Getters and validation methods
  public String getPath() { return path; }
  public String getFormat() { return format; }
  public String getSchema() { return schema; }
  public Boolean getRecursive() { return recursive; }

  /**
   * Validates the configuration, failing fast before pipeline deployment.
   *
   * @throws IllegalArgumentException if the path is missing, the format is
   *         unsupported, or the schema is absent
   */
  public void validate() {
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException("Path cannot be empty");
    }
    // Null-safe and case-insensitive: prepareRun() lowercases the format, so
    // validation must accept the same spellings it does (e.g. "JSON").
    if (format == null || !SUPPORTED_FORMATS.contains(format.toLowerCase())) {
      throw new IllegalArgumentException("Unsupported format: " + format);
    }
    // configurePipeline() parses this JSON unconditionally; reject null/empty
    // here with a clear message instead of an NPE later.
    if (schema == null || schema.isEmpty()) {
      throw new IllegalArgumentException("Schema must be specified");
    }
  }
}
// Source plugin implementation
@Plugin(type = "batchsource")
@Name("FileSource")
@Description("Reads data from files in various formats")
@Category("source")
@Metadata(properties = {
  @MetadataProperty(key = "doc.url", value = "https://docs.example.com/plugins/file-source")
})
public class FileSourcePlugin extends BatchSource<NullWritable, Text, StructuredRecord> {
  private final FileSourceConfig config;
  // Cached parse of config.getSchema(); transform() runs per record, and
  // re-parsing the JSON schema for every record is needless work.
  private Schema parsedSchema;

  public FileSourcePlugin(FileSourceConfig config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Validate configuration before deployment.
    config.validate();
    // Set output schema so downstream stages can validate against it.
    try {
      Schema outputSchema = Schema.parseJson(config.getSchema());
      pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
    } catch (IOException e) {
      throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
    }
  }

  @Override
  public void prepareRun(BatchSourceContext context) throws Exception {
    Job job = context.getHadoopJob();
    // Choose the Hadoop input format for the configured file type.
    switch (config.getFormat().toLowerCase()) {
      case "json":
      case "csv":
        // Both are line-oriented text; per-line parsing happens in transform().
        job.setInputFormatClass(TextInputFormat.class);
        break;
      case "avro":
        job.setInputFormatClass(AvroKeyInputFormat.class);
        break;
      case "parquet":
        job.setInputFormatClass(ParquetInputFormat.class);
        break;
      default:
        throw new IllegalArgumentException("Unsupported format: " + config.getFormat());
    }
    // Set input path
    FileInputFormat.addInputPath(job, new Path(config.getPath()));
    // Boolean.TRUE.equals guards against a null Boolean from deserialization.
    if (Boolean.TRUE.equals(config.getRecursive())) {
      FileInputFormat.setInputDirRecursive(job, true);
    }
  }

  @Override
  public void transform(KeyValue<NullWritable, Text> input, Emitter<StructuredRecord> emitter) throws Exception {
    String line = input.getValue().toString();
    // Parse based on format, using the cached schema.
    StructuredRecord record = parseRecord(line, config.getFormat(), schema());
    if (record != null) {
      emitter.emit(record);
    }
  }

  // Lazily parses and caches the configured schema.
  private Schema schema() throws IOException {
    if (parsedSchema == null) {
      parsedSchema = Schema.parseJson(config.getSchema());
    }
    return parsedSchema;
  }

  // Dispatches line parsing by format; only text formats reach transform().
  private StructuredRecord parseRecord(String line, String format, Schema schema) throws IOException {
    switch (format.toLowerCase()) {
      case "json":
        return parseJsonRecord(line, schema);
      case "csv":
        return parseCsvRecord(line, schema);
      default:
        throw new UnsupportedOperationException("Format not supported in transform: " + format);
    }
  }

  /**
   * Parses one JSON line into a StructuredRecord matching {@code schema}.
   * Returns null (record skipped) when the line is malformed.
   */
  private StructuredRecord parseJsonRecord(String jsonLine, Schema schema) {
    try {
      JsonObject json = new JsonParser().parse(jsonLine).getAsJsonObject();
      StructuredRecord.Builder builder = StructuredRecord.builder(schema);
      for (Schema.Field field : schema.getFields()) {
        String fieldName = field.getName();
        if (json.has(fieldName) && !json.get(fieldName).isJsonNull()) {
          Object value = parseJsonValue(json.get(fieldName), field.getSchema());
          builder.set(fieldName, value);
        }
      }
      return builder.build();
    } catch (Exception e) {
      // Malformed records are logged and skipped rather than failing the run.
      LOG.warn("Failed to parse JSON record: {}", jsonLine, e);
      return null;
    }
  }

  // Converts a JSON element to a Java value for the (non-nullable) schema type.
  private Object parseJsonValue(JsonElement element, Schema fieldSchema) {
    Schema.Type type = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();
    switch (type) {
      case STRING:
        return element.getAsString();
      case INT:
        return element.getAsInt();
      case LONG:
        return element.getAsLong();
      case FLOAT:
        // Previously fell through to string; map to the proper numeric type.
        return element.getAsFloat();
      case DOUBLE:
        return element.getAsDouble();
      case BOOLEAN:
        return element.getAsBoolean();
      default:
        // Fall back to string form for types without an explicit mapping.
        return element.getAsString();
    }
  }
}
Transform plugins process and modify data:
// Transform plugin configuration
public class DataCleaningConfig extends PluginConfig {
  @Name("fieldsToClean")
  @Description("Comma-separated list of fields to clean")
  @Property
  private String fieldsToClean;

  @Name("removeNulls")
  @Description("Whether to remove records with null values")
  @Property
  private Boolean removeNulls = true;

  @Name("trimWhitespace")
  @Description("Whether to trim whitespace from string fields")
  @Property
  private Boolean trimWhitespace = true;

  @Name("lowercaseStrings")
  @Description("Whether to convert strings to lowercase")
  @Property
  private Boolean lowercaseStrings = false;

  /**
   * Returns the configured field names, trimmed, with blank entries dropped
   * (so trailing or doubled commas do not yield empty field names).
   * An empty list means "clean every field".
   */
  public List<String> getFieldsToClean() {
    if (fieldsToClean == null || fieldsToClean.isEmpty()) {
      return Collections.emptyList();
    }
    return Arrays.stream(fieldsToClean.split(","))
        .map(String::trim)
        .filter(name -> !name.isEmpty())
        .collect(Collectors.toList());
  }

  // Null-safe getters: framework deserialization can bypass field initializers,
  // so fall back to the documented defaults when the field is null.
  public Boolean getRemoveNulls() {
    return removeNulls == null ? Boolean.TRUE : removeNulls;
  }

  public Boolean getTrimWhitespace() {
    return trimWhitespace == null ? Boolean.TRUE : trimWhitespace;
  }

  public Boolean getLowercaseStrings() {
    return lowercaseStrings == null ? Boolean.FALSE : lowercaseStrings;
  }
}
// Transform plugin implementation
@Plugin(type = "transform")
@Name("DataCleaning")
@Description("Cleans and standardizes data fields")
@Category("cleansing")
public class DataCleaningPlugin extends Transform<StructuredRecord, StructuredRecord> {
  private final DataCleaningConfig config;
  // Empty list means "apply cleaning to every field" (see transform()).
  private List<String> fieldsToClean;
  private Schema outputSchema;

  public DataCleaningPlugin(DataCleaningConfig config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
    Schema inputSchema = stageConfigurer.getInputSchema();
    if (inputSchema != null) {
      // Validate that every requested field actually exists upstream.
      List<String> fieldsToClean = config.getFieldsToClean();
      for (String fieldName : fieldsToClean) {
        if (inputSchema.getField(fieldName) == null) {
          throw new IllegalArgumentException("Field '" + fieldName + "' does not exist in input schema");
        }
      }
      // Cleaning never changes the shape of the record, only its values.
      stageConfigurer.setOutputSchema(inputSchema);
    }
  }

  @Override
  public void initialize(TransformContext context) throws Exception {
    super.initialize(context);
    this.fieldsToClean = config.getFieldsToClean();
    this.outputSchema = context.getOutputSchema();
  }

  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
    // Drop the record entirely when null-removal is enabled and any relevant
    // field is null.
    if (config.getRemoveNulls() && hasNullFields(input)) {
      return;
    }
    // Output schema equals input schema (set in configurePipeline); fall back
    // to the record's own schema if the context did not provide one.
    Schema schema = outputSchema != null ? outputSchema : input.getSchema();
    StructuredRecord.Builder builder = StructuredRecord.builder(schema);
    // Copy every field, cleaning the selected ones (or all, if none selected).
    for (Schema.Field field : input.getSchema().getFields()) {
      String fieldName = field.getName();
      Object value = input.get(fieldName);
      if (fieldsToClean.isEmpty() || fieldsToClean.contains(fieldName)) {
        value = cleanFieldValue(value, field.getSchema());
      }
      builder.set(fieldName, value);
    }
    emitter.emit(builder.build());
  }

  /**
   * Returns true if any field subject to cleaning is null. When no explicit
   * field list is configured, cleaning applies to every field (see
   * transform()), so the null check must cover every field too — otherwise
   * removeNulls would silently do nothing in the default configuration.
   */
  private boolean hasNullFields(StructuredRecord record) {
    if (fieldsToClean.isEmpty()) {
      for (Schema.Field field : record.getSchema().getFields()) {
        if (record.get(field.getName()) == null) {
          return true;
        }
      }
      return false;
    }
    for (String fieldName : fieldsToClean) {
      if (record.get(fieldName) == null) {
        return true;
      }
    }
    return false;
  }

  // Applies trim/lowercase to string values; non-strings pass through unchanged.
  private Object cleanFieldValue(Object value, Schema fieldSchema) {
    if (value == null) {
      return null;
    }
    Schema.Type type = fieldSchema.isNullable() ?
        fieldSchema.getNonNullable().getType() : fieldSchema.getType();
    if (type == Schema.Type.STRING) {
      String stringValue = value.toString();
      if (config.getTrimWhitespace()) {
        stringValue = stringValue.trim();
      }
      if (config.getLowercaseStrings()) {
        stringValue = stringValue.toLowerCase();
      }
      return stringValue;
    }
    return value;
  }
}
Sink plugins write data to external systems:
/**
 * Configuration for the DatabaseSink plugin. Connection coordinates and
 * credentials are macro-enabled so they can be resolved at runtime instead of
 * being fixed at deployment time.
 */
public class DatabaseSinkConfig extends PluginConfig {
@Name("connectionString")
@Description("JDBC connection string")
@Macro
@Property
private String connectionString;
@Name("tableName")
@Description("Target table name")
@Macro
@Property
private String tableName;
@Name("username")
@Description("Database username")
@Macro
@Property
private String username;
@Name("password")
@Description("Database password")
@Macro
@Property
private String password;
// Not macro-enabled: batching granularity is a deployment-time decision.
@Name("batchSize")
@Description("Number of records to write in each batch")
@Property
private Integer batchSize = 1000;
// Getters and validation...
}
// Sink plugin implementation
@Plugin(type = "batchsink")
@Name("DatabaseSink")
@Description("Writes data to a relational database")
@Category("sink")
public class DatabaseSinkPlugin extends BatchSink<StructuredRecord, NullWritable, NullWritable> {
  private final DatabaseSinkConfig config;

  public DatabaseSinkPlugin(DatabaseSinkConfig config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Validate configuration
    config.validate();
    // Only probe the database when all connection values are concrete; with
    // macros present the real values are not known until runtime.
    if (!containsMacros()) {
      testConnection();
    }
  }

  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    Job job = context.getHadoopJob();
    // Configure database output format
    job.setOutputFormatClass(DatabaseOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    // Set database connection properties
    DatabaseConfiguration.configureDB(job.getConfiguration(),
        config.getConnectionString(),
        config.getUsername(),
        config.getPassword(),
        config.getTableName());
  }

  @Override
  public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, NullWritable>> emitter)
      throws Exception {
    // Convert StructuredRecord to database format and write.
    // This would typically buffer records and write in batches.
    writeRecordToDatabase(input);
    // Emit to continue pipeline (if needed)
    emitter.emit(new KeyValue<>(NullWritable.get(), NullWritable.get()));
  }

  private void writeRecordToDatabase(StructuredRecord record) throws SQLException {
    // Implementation for writing record to database.
    // Use prepared statements and batch operations for efficiency.
  }

  /**
   * Returns true if any connection property still contains an unevaluated
   * macro. Null-safe: macro-enabled properties may be null at configure time,
   * which previously caused an NPE in exactly the case this check exists for.
   */
  private boolean containsMacros() {
    return hasMacro(config.getConnectionString())
        || hasMacro(config.getTableName())
        || hasMacro(config.getUsername())
        || hasMacro(config.getPassword());
  }

  private static boolean hasMacro(String value) {
    return value != null && value.contains("${");
  }

  // Opens a short-lived connection to confirm the target table exists.
  private void testConnection() {
    try (Connection conn = DriverManager.getConnection(
        config.getConnectionString(), config.getUsername(), config.getPassword())) {
      DatabaseMetaData metaData = conn.getMetaData();
      try (ResultSet tables = metaData.getTables(null, null, config.getTableName(), null)) {
        if (!tables.next()) {
          throw new IllegalArgumentException("Table '" + config.getTableName() + "' does not exist");
        }
      }
    } catch (SQLException e) {
      throw new IllegalArgumentException("Failed to connect to database: " + e.getMessage(), e);
    }
  }
}
// Plugin selector for choosing among multiple plugin candidates
/**
 * Chooses among multiple candidate plugin artifacts when more than one
 * matches a (type, name) registration, ordered by the configured SortOrder.
 */
public class PluginSelector {
/** Selector with no ordering preference and no subtask (null subtask name). */
public static final PluginSelector EMPTY = new PluginSelector(SortOrder.UNSPECIFIED, null);
/** Orderings by artifact creation time or version, ascending or descending. */
public enum SortOrder {
CREATION_TIME_ASC,
CREATION_TIME_DESC,
VERSION_ASC,
VERSION_DESC,
UNSPECIFIED
}
/** Creates a selector with the given sort order and no subtask. */
public PluginSelector(SortOrder sortOrder) { /* constructor */ }
/** Creates a selector with a sort order scoped to a named subtask. */
public PluginSelector(SortOrder sortOrder, String subtaskName) { /* constructor with subtask */ }
/** Returns the sort order used to rank candidates. */
public SortOrder getSortOrder() { /* returns sort order */ }
/** Returns the subtask name, or null if none was given. */
public String getSubtaskName() { /* returns subtask name */ }
}
/**
 * Declares what a plugin needs from its runtime environment: platform
 * capabilities and dataset types. Built via the fluent {@link Builder}.
 */
public class Requirements {
/** Returns a new builder for assembling requirements. */
public static Builder builder() { return new Builder(); }
/** Returns the set of required platform capabilities. */
public Set<String> getCapabilities() { /* returns required capabilities */ }
/** Returns the set of required dataset types. */
public Set<String> getDatasetTypes() { /* returns required dataset types */ }
/** Fluent builder; all add methods return {@code this} for chaining. */
public static class Builder {
/** Adds one or more required capabilities. */
public Builder addCapabilities(String... capabilities) { /* add capabilities */ return this; }
/** Adds one or more required dataset types. */
public Builder addDatasetTypes(String... datasetTypes) { /* add dataset types */ return this; }
/** Builds the immutable Requirements. */
public Requirements build() { /* build requirements */ }
}
}// Plugin configuration validation
/**
 * Thrown when a plugin configuration fails validation. Carries the full set
 * of invalid properties so callers can report every problem at once.
 */
public class InvalidPluginConfigException extends RuntimeException {
  // Unmodifiable snapshot: the caller's set is defensively copied so later
  // mutation of it (or of the returned set) cannot alter this exception.
  private final Set<InvalidPluginProperty> invalidProperties;

  /**
   * @param message human-readable summary of the failure
   * @param invalidProperties the offending properties; copied defensively
   */
  public InvalidPluginConfigException(String message, Set<InvalidPluginProperty> invalidProperties) {
    super(message);
    this.invalidProperties = Collections.unmodifiableSet(new HashSet<>(invalidProperties));
  }

  /** Returns an unmodifiable view of the invalid properties. */
  public Set<InvalidPluginProperty> getInvalidProperties() {
    return invalidProperties;
  }
}
/**
 * Describes one invalid configuration property: which property failed and why.
 */
public class InvalidPluginProperty {
/** Creates an entry for {@code propertyName} with a human-readable error. */
public InvalidPluginProperty(String propertyName, String message) { /* constructor */ }
/** Returns the name of the offending property. */
public String getPropertyName() { /* returns property name */ }
/** Returns the error message explaining why the property is invalid. */
public String getMessage() { /* returns error message */ }
}
/**
 * Base class that aggregates all validation errors before failing, so a user
 * sees every problem in one InvalidPluginConfigException instead of fixing
 * them one at a time. Subclasses implement {@link #validateConfig(Set)}.
 */
public abstract class ValidatingPluginConfig extends PluginConfig {
/**
 * Runs subclass validation and throws if any errors were collected.
 *
 * @throws InvalidPluginConfigException if one or more properties are invalid
 */
public final void validate() throws InvalidPluginConfigException {
Set<InvalidPluginProperty> errors = new HashSet<>();
try {
validateConfig(errors);
// Broad catch is deliberate: an exception thrown by subclass validation is
// folded into the error set rather than aborting aggregation.
} catch (Exception e) {
errors.add(new InvalidPluginProperty("general", "Validation failed: " + e.getMessage()));
}
if (!errors.isEmpty()) {
throw new InvalidPluginConfigException("Plugin configuration is invalid", errors);
}
}
/** Subclasses add an InvalidPluginProperty to {@code errors} per problem found. */
protected abstract void validateConfig(Set<InvalidPluginProperty> errors);
/** Helper: records an error when a required value is null or blank. */
protected void validateRequired(String propertyName, String value, Set<InvalidPluginProperty> errors) {
if (value == null || value.trim().isEmpty()) {
errors.add(new InvalidPluginProperty(propertyName, "Property is required"));
}
}
/** Helper: records an error when a non-null value does not match the regex pattern. */
protected void validateFormat(String propertyName, String value, String pattern,
Set<InvalidPluginProperty> errors) {
if (value != null && !value.matches(pattern)) {
errors.add(new InvalidPluginProperty(propertyName, "Invalid format"));
}
}
}// Multi-stage plugin configuration
/**
 * Configuration for CompositeTransform: a JSON array of stage definitions,
 * each an object with "name", "type", and optional "config" members.
 */
public class CompositeTransformConfig extends PluginConfig {
  @Name("stages")
  @Description("JSON array of transformation stages")
  @Property
  private String stages;

  /**
   * Parses the stages JSON into TransformStage objects.
   *
   * @return the configured stages, or an empty list when none are configured
   * @throws IOException if the JSON is malformed or a stage is missing a
   *         required "name" or "type" member
   */
  public List<TransformStage> getStages() throws IOException {
    // A missing/blank property previously caused an NPE inside JsonParser.
    if (stages == null || stages.trim().isEmpty()) {
      return Collections.emptyList();
    }
    JsonArray stagesArray;
    try {
      stagesArray = new JsonParser().parse(stages).getAsJsonArray();
    } catch (RuntimeException e) {
      // Gson failures are unchecked; fold them into the declared IOException.
      throw new IOException("Invalid 'stages' JSON: " + e.getMessage(), e);
    }
    List<TransformStage> stageList = new ArrayList<>();
    for (JsonElement element : stagesArray) {
      JsonObject stageObj = element.getAsJsonObject();
      JsonElement stageConfig = stageObj.get("config");
      TransformStage stage = new TransformStage(
          requiredString(stageObj, "name"),
          requiredString(stageObj, "type"),
          // Tolerate an absent "config" object (empty config).
          stageConfig == null || stageConfig.isJsonNull()
              ? new JsonObject() : stageConfig.getAsJsonObject()
      );
      stageList.add(stage);
    }
    return stageList;
  }

  // Extracts a mandatory string member, failing with a descriptive error
  // instead of an NPE when it is absent.
  private static String requiredString(JsonObject stageObj, String key) throws IOException {
    JsonElement value = stageObj.get(key);
    if (value == null || value.isJsonNull()) {
      throw new IOException("Stage definition is missing required key '" + key + "'");
    }
    return value.getAsString();
  }
}
// Composite plugin that chains multiple transformations
@Plugin(type = "transform")
@Name("CompositeTransform")
@Description("Applies multiple transformations in sequence")
public class CompositeTransformPlugin extends Transform<StructuredRecord, StructuredRecord> {
  private final CompositeTransformConfig config;
  private List<Transform<StructuredRecord, StructuredRecord>> transforms;

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
    Schema currentSchema = stageConfigurer.getInputSchema();
    // getStages() declares a checked IOException that this unchecked signature
    // cannot propagate; surface parse failures as configuration errors.
    List<TransformStage> stages;
    try {
      stages = config.getStages();
    } catch (IOException e) {
      throw new IllegalArgumentException("Invalid 'stages' configuration: " + e.getMessage(), e);
    }
    // Configure each transform stage; each stage's output schema feeds the next.
    for (TransformStage stage : stages) {
      Transform<StructuredRecord, StructuredRecord> transform =
          loadTransformPlugin(stage, pipelineConfigurer);
      currentSchema = getTransformOutputSchema(transform, currentSchema);
    }
    stageConfigurer.setOutputSchema(currentSchema);
  }

  @Override
  public void initialize(TransformContext context) throws Exception {
    // Initialize all child transforms. getStages()' IOException is covered by
    // this method's `throws Exception`.
    transforms = new ArrayList<>();
    for (TransformStage stage : config.getStages()) {
      Transform<StructuredRecord, StructuredRecord> transform =
          context.newPluginInstance(stage.getName());
      transforms.add(transform);
    }
  }

  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
    StructuredRecord current = input;
    // Apply each transformation in sequence.
    for (Transform<StructuredRecord, StructuredRecord> transform : transforms) {
      CollectingEmitter<StructuredRecord> collector = new CollectingEmitter<>();
      transform.transform(current, collector);
      List<StructuredRecord> results = collector.getEmitted();
      if (results.size() == 1) {
        current = results.get(0);
      } else if (results.isEmpty()) {
        // Record was filtered out by this stage; nothing to pass on.
        return;
      } else {
        // Fan-out: emit all but the last record now, thread the last one
        // through the remaining stages.
        for (int i = 0; i < results.size() - 1; i++) {
          emitter.emit(results.get(i));
        }
        current = results.get(results.size() - 1);
      }
    }
    emitter.emit(current);
  }
}
// Plugin factory pattern
public class PluginFactory {
  private PluginFactory() {
    // Utility class; no instances.
  }

  /**
   * Instantiates the plugin registered under {@code pluginName}.
   * NOTE(review): {@code pluginType} and {@code properties} are kept for
   * interface compatibility but are not used by PluginContext's lookup, which
   * is keyed by id only.
   *
   * @throws IllegalArgumentException if the plugin cannot be instantiated
   */
  public static <T> T createPlugin(String pluginType, String pluginName,
                                   PluginProperties properties, PluginContext context) {
    try {
      // newPluginInstance declares checked InstantiationException; wrap it so
      // this factory keeps its unchecked signature and preserves the cause.
      return context.newPluginInstance(pluginName);
    } catch (InstantiationException e) {
      throw new IllegalArgumentException(
          "Unable to instantiate plugin '" + pluginName + "' of type '" + pluginType + "'", e);
    }
  }

  /**
   * Merges two property sets; entries in {@code override} win over {@code base}.
   */
  public static PluginProperties mergeProperties(PluginProperties base,
                                                 PluginProperties override) {
    PluginProperties.Builder builder = PluginProperties.builder();
    builder.addAll(base.getProperties());
    builder.addAll(override.getProperties());
    return builder.build();
  }
}
// Plugin test base class
/**
 * Base class for plugin unit tests: config validation, schema and record
 * construction helpers.
 */
public abstract class PluginTestBase {
  /**
   * Deserializes a config from a property map and runs its validation if it
   * is a ValidatingPluginConfig.
   */
  protected <T extends PluginConfig> void validatePluginConfig(Class<T> configClass,
                                                               Map<String, String> properties)
      throws Exception {
    T config = deserializeConfig(configClass, properties);
    if (config instanceof ValidatingPluginConfig) {
      ((ValidatingPluginConfig) config).validate();
    }
  }

  /**
   * Builds a record schema from "name:type" specs, e.g. "id:long", "name:string".
   *
   * @throws IllegalArgumentException if a spec is not of the form "name:type"
   */
  protected Schema createTestSchema(String... fieldSpecs) {
    List<Schema.Field> fields = new ArrayList<>();
    for (String spec : fieldSpecs) {
      // Limit 2 so a stray ':' in the type portion fails loudly below rather
      // than being silently dropped by an unbounded split.
      String[] parts = spec.split(":", 2);
      if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
        throw new IllegalArgumentException("Field spec must be 'name:type' but was: " + spec);
      }
      String name = parts[0];
      Schema.Type type = Schema.Type.valueOf(parts[1].toUpperCase());
      fields.add(Schema.Field.of(name, Schema.of(type)));
    }
    return Schema.recordOf("TestRecord", fields);
  }

  /** Builds a record, pairing values positionally with the schema's fields. */
  protected StructuredRecord createTestRecord(Schema schema, Object... values) {
    StructuredRecord.Builder builder = StructuredRecord.builder(schema);
    List<Schema.Field> fields = schema.getFields();
    for (int i = 0; i < Math.min(fields.size(), values.length); i++) {
      builder.set(fields.get(i).getName(), values[i]);
    }
    return builder.build();
  }

  // Simplified no-arg construction; a real implementation would populate
  // @Property fields reflectively from the map. getDeclaredConstructor()
  // replaces the deprecated Class.newInstance(), which bypasses compile-time
  // checking of the constructor's checked exceptions.
  private <T> T deserializeConfig(Class<T> configClass, Map<String, String> properties)
      throws Exception {
    return configClass.getDeclaredConstructor().newInstance();
  }
}
/**
 * Test double for Emitter that records everything it receives. Accessors hand
 * back defensive copies so callers cannot disturb the captured state.
 */
public class MockEmitter<T> implements Emitter<T> {
  private final List<T> captured = new ArrayList<>();
  private final List<InvalidEntry<T>> capturedErrors = new ArrayList<>();

  @Override
  public void emit(T value) {
    captured.add(value);
  }

  @Override
  public void emitError(InvalidEntry<T> invalidEntry) {
    capturedErrors.add(invalidEntry);
  }

  /** Returns a snapshot of all successfully emitted values, in order. */
  public List<T> getEmitted() {
    return new ArrayList<>(captured);
  }

  /** Returns a snapshot of all emitted error entries, in order. */
  public List<InvalidEntry<T>> getErrors() {
    return new ArrayList<>(capturedErrors);
  }

  /** Resets the emitter so it can be reused across test cases. */
  public void clear() {
    captured.clear();
    capturedErrors.clear();
  }
}
The CDAP Plugin System enables building modular, reusable data processing components with strong type safety, comprehensive configuration management, and enterprise-grade operational features. This extensibility framework is essential for creating scalable, maintainable data processing applications.
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap