CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap

The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development.

Pending
Overview
Eval results
Files

docs/plugin-system.md

Plugin System

The CDAP Plugin System provides a powerful extensibility framework that allows developers to create reusable, configurable components for data processing pipelines. Plugins enable modular application development and promote code reuse across different applications and organizations.

Plugin Architecture

Core Plugin Interfaces

import io.cdap.cdap.api.plugin.*;
import io.cdap.cdap.api.annotation.*;

// Plugin configurer interface — used at application configure/deploy time to
// register the plugins the program will look up at runtime via PluginContext.
public interface PluginConfigurer {
    /** Registers a plugin under {@code pluginId} and returns an instance of it. */
    <T> T usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties);
    /** Same as above, with a selector to choose among multiple matching artifacts. */
    <T> T usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties, 
                   PluginSelector selector);
    /** Registers a plugin and returns only its {@link Class}, without instantiating it. */
    <T> Class<T> usePluginClass(String pluginType, String pluginName, String pluginId, 
                               PluginProperties properties);
    /** Class-only registration with an explicit candidate selector. */
    <T> Class<T> usePluginClass(String pluginType, String pluginName, String pluginId, 
                               PluginProperties properties, PluginSelector selector);
}

// Plugin runtime context — gives running program code access to the plugins
// that were registered at configure time via PluginConfigurer.
public interface PluginContext extends FeatureFlagsProvider {
    /** Creates a new instance of the plugin registered under {@code pluginId}. */
    <T> T newPluginInstance(String pluginId) throws InstantiationException;
    /** Loads the plugin's class without instantiating it. */
    <T> Class<T> loadPluginClass(String pluginId);
    /** Returns the properties the plugin was registered with. */
    PluginProperties getPluginProperties(String pluginId);
    /** Returns all registered plugins, keyed by plugin id. */
    Map<String, PluginProperties> getPlugins();
}

// Plugin metadata — immutable descriptor of one registered plugin
// (type + name + id + properties). Bodies are elided in this documentation.
public final class Plugin {
    /** Static factory building a descriptor from its identifying parts. */
    public static Plugin of(String type, String name, String pluginId, PluginProperties properties) { 
        /* create plugin instance */ 
    }
    
    public String getPluginType() { /* returns plugin type */ }
    public String getPluginName() { /* returns plugin name */ }
    public String getPluginId() { /* returns plugin ID */ }
    public PluginProperties getProperties() { /* returns plugin properties */ }
    public PluginSelector getSelector() { /* returns plugin selector */ }
}

Plugin Properties and Configuration

// Plugin properties container — an immutable String-to-String property map
// with a builder; Serializable so it can travel with job configuration.
public class PluginProperties implements Serializable {
    /** Starts a fluent builder for assembling properties. */
    public static Builder builder() { return new Builder(); }
    /** Static factory wrapping an existing map. */
    public static PluginProperties of(Map<String, String> properties) { /* create from map */ }
    
    public Map<String, String> getProperties() { /* returns properties map */ }
    public String getProperty(String key) { /* returns property value */ }
    public String getProperty(String key, String defaultValue) { /* returns property with default */ }
    
    // Fluent builder; each mutator returns this for chaining.
    public static class Builder {
        public Builder add(String key, String value) { /* add property */ return this; }
        public Builder addAll(Map<String, String> properties) { /* add all properties */ return this; }
        public PluginProperties build() { /* build properties */ }
    }
}

// Base plugin configuration class. The framework populates a subclass's
// annotated fields (@Name/@Property/@Macro) from the registered
// PluginProperties before handing the config to the plugin.
public abstract class PluginConfig extends Config implements Serializable {
    // Base class for all plugin configurations
    // Extend this class for typed plugin configurations
}

// Plugin class metadata — describes a plugin implementation as discovered
// from its artifact (name, type, config fields, requirements).
public class PluginClass {
    public String getName() { /* returns plugin name */ }
    public String getType() { /* returns plugin type */ }
    public String getDescription() { /* returns plugin description */ }
    public String getClassName() { /* returns plugin class name */ }
    public String getCategory() { /* returns plugin category */ }
    public Set<PluginPropertyField> getProperties() { /* returns plugin properties */ }
    public Map<String, PluginPropertyField> getPropertiesMap() { /* returns properties as map */ }
    public Requirements getRequirements() { /* returns plugin requirements */ }
}

// Plugin property field metadata — describes one configurable field of a
// plugin's PluginConfig (its type, whether it is required, macro behavior).
public class PluginPropertyField {
    public String getName() { /* returns field name */ }
    public String getDescription() { /* returns field description */ }
    public String getType() { /* returns field type */ }
    public boolean isRequired() { /* returns if field is required */ }
    public boolean isMacroSupported() { /* returns if macros are supported */ }
    public boolean isMacroEscapingEnabled() { /* returns if macro escaping is enabled */ }
    public Set<String> getChildren() { /* returns child field names */ }
}

Plugin Annotations

// Core plugin annotations
@Plugin(type = "source")  // Marks a class as a plugin of specific type
@Name("MySourcePlugin")   // Specifies the plugin name
@Description("Reads data from external source")  // Provides plugin description
@Category("source")       // Categorizes the plugin

// Property annotations
@Property                 // Marks fields as configuration properties
@Macro                   // Enables macro substitution in field values
@Description("Input path for data files")  // Describes configuration properties

// Metadata annotations  
@Metadata(properties = {
    @MetadataProperty(key = "doc.url", value = "https://example.com/docs"),
    @MetadataProperty(key = "author", value = "Data Team")
})

Plugin Types and Development

Source Plugins

Source plugins read data from external systems:

// Source plugin configuration
public class FileSourceConfig extends PluginConfig {
    @Name("path")
    @Description("Path to input files")
    @Macro
    @Property
    private String path;
    
    @Name("format")
    @Description("File format (json, csv, avro, parquet)")
    @Property
    private String format = "json";
    
    @Name("schema")
    @Description("Schema of the input data")
    @Property
    private String schema;
    
    @Name("recursive")
    @Description("Whether to read files recursively")
    @Property
    private Boolean recursive = false;
    
    // Getters and validation methods
    public String getPath() { return path; }
    public String getFormat() { return format; }
    public String getSchema() { return schema; }
    public Boolean getRecursive() { return recursive; }
    
    /**
     * Fails fast with {@link IllegalArgumentException} when the configuration
     * is unusable, so errors surface at deployment rather than mid-run.
     */
    public void validate() {
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("Path cannot be empty");
        }
        // Compare case-insensitively so validation agrees with the
        // toLowerCase() handling in FileSourcePlugin.prepareRun()/parseRecord();
        // the original case-sensitive check rejected e.g. "JSON" that the
        // runtime path would have accepted.
        if (format == null
            || !Arrays.asList("json", "csv", "avro", "parquet").contains(format.toLowerCase())) {
            throw new IllegalArgumentException("Unsupported format: " + format);
        }
        // configurePipeline() feeds this straight into Schema.parseJson();
        // reject a missing schema here with a clear message instead of a
        // NullPointerException there.
        if (schema == null || schema.isEmpty()) {
            throw new IllegalArgumentException("Schema must be specified");
        }
    }
}

// Source plugin implementation
//
// NOTE(review): parseJsonRecord() references a LOG field that is never
// declared in this snippet — presumably a static SLF4J logger in the full
// source; confirm before compiling.
@Plugin(type = "batchsource")
@Name("FileSource")
@Description("Reads data from files in various formats")
@Category("source")
@Metadata(properties = {
    @MetadataProperty(key = "doc.url", value = "https://docs.example.com/plugins/file-source")
})
public class FileSourcePlugin extends BatchSource<NullWritable, Text, StructuredRecord> {
    
    // Typed configuration, populated by the framework and injected here.
    private final FileSourceConfig config;
    
    public FileSourcePlugin(FileSourceConfig config) {
        this.config = config;
    }
    
    /**
     * Deployment-time hook: validates the configuration and publishes the
     * declared output schema to downstream stages.
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        // Validate configuration
        config.validate();
        
        // Set output schema
        try {
            Schema outputSchema = Schema.parseJson(config.getSchema());
            pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
        }
    }
    
    /**
     * Run-preparation hook: wires the Hadoop job's input format and paths
     * from the configuration.
     */
    @Override
    public void prepareRun(BatchSourceContext context) throws Exception {
        // Prepare the source for execution
        Job job = context.getHadoopJob();
        
        // Configure input format based on file type.
        // json and csv intentionally share TextInputFormat: both are read
        // line-by-line and parsed later in transform().
        switch (config.getFormat().toLowerCase()) {
            case "json":
                job.setInputFormatClass(TextInputFormat.class);
                break;
            case "csv":
                job.setInputFormatClass(TextInputFormat.class);
                break;
            case "avro":
                job.setInputFormatClass(AvroKeyInputFormat.class);
                break;
            case "parquet":
                job.setInputFormatClass(ParquetInputFormat.class);
                break;
            default:
                throw new IllegalArgumentException("Unsupported format: " + config.getFormat());
        }
        
        // Set input path
        FileInputFormat.addInputPath(job, new Path(config.getPath()));
        
        // Configure recursive search if enabled
        if (config.getRecursive()) {
            FileInputFormat.setInputDirRecursive(job, true);
        }
    }
    
    /**
     * Converts one raw input line into a StructuredRecord and emits it.
     * Records that fail JSON parsing are skipped (parseRecord returns null).
     */
    @Override
    public void transform(KeyValue<NullWritable, Text> input, Emitter<StructuredRecord> emitter) throws Exception {
        String line = input.getValue().toString();
        
        // Parse based on format
        StructuredRecord record = parseRecord(line, config.getFormat(), config.getSchema());
        if (record != null) {
            emitter.emit(record);
        }
    }
    
    // NOTE(review): the schema JSON is re-parsed for every record; hoisting
    // the parsed Schema into initialize() would avoid per-record work.
    private StructuredRecord parseRecord(String line, String format, String schemaStr) throws IOException {
        Schema schema = Schema.parseJson(schemaStr);
        
        // Only line-oriented formats are parsed here; avro/parquet records
        // arrive through their own input formats, not as text lines.
        switch (format.toLowerCase()) {
            case "json":
                return parseJsonRecord(line, schema);
            case "csv":
                return parseCsvRecord(line, schema);
            default:
                throw new UnsupportedOperationException("Format not supported in transform: " + format);
        }
    }
    
    /**
     * Parses one JSON line into a record; returns null (record skipped) on
     * any parse failure. Absent or JSON-null fields are simply left unset.
     */
    private StructuredRecord parseJsonRecord(String jsonLine, Schema schema) {
        try {
            JsonObject json = new JsonParser().parse(jsonLine).getAsJsonObject();
            StructuredRecord.Builder builder = StructuredRecord.builder(schema);
            
            for (Schema.Field field : schema.getFields()) {
                String fieldName = field.getName();
                if (json.has(fieldName) && !json.get(fieldName).isJsonNull()) {
                    Object value = parseJsonValue(json.get(fieldName), field.getSchema());
                    builder.set(fieldName, value);
                }
            }
            
            return builder.build();
        } catch (Exception e) {
            // Log error and skip malformed records
            LOG.warn("Failed to parse JSON record: {}", jsonLine, e);
            return null;
        }
    }
    
    /**
     * Coerces a JSON element to the Java type matching the field schema;
     * unwraps nullable (union-with-null) schemas first. Unrecognized types
     * fall back to the element's string form.
     */
    private Object parseJsonValue(JsonElement element, Schema fieldSchema) {
        Schema.Type type = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();
        
        switch (type) {
            case STRING:
                return element.getAsString();
            case INT:
                return element.getAsInt();
            case LONG:
                return element.getAsLong();
            case DOUBLE:
                return element.getAsDouble();
            case BOOLEAN:
                return element.getAsBoolean();
            default:
                return element.getAsString();
        }
    }
}

Transform Plugins

Transform plugins process and modify data:

// Transform plugin configuration
public class DataCleaningConfig extends PluginConfig {
    @Name("fieldsToClean")
    @Description("Comma-separated list of fields to clean")
    @Property
    private String fieldsToClean;
    
    @Name("removeNulls")
    @Description("Whether to remove records with null values")
    @Property  
    private Boolean removeNulls = true;
    
    @Name("trimWhitespace")
    @Description("Whether to trim whitespace from string fields")
    @Property
    private Boolean trimWhitespace = true;
    
    @Name("lowercaseStrings")
    @Description("Whether to convert strings to lowercase")
    @Property
    private Boolean lowercaseStrings = false;
    
    /**
     * Returns the configured field names, trimmed, in declaration order.
     * An unset/empty property means "clean every field" (empty list).
     * Empty entries (e.g. from a trailing comma) are dropped — they can never
     * match a real field and would fail schema validation downstream.
     */
    public List<String> getFieldsToClean() {
        if (fieldsToClean == null || fieldsToClean.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.stream(fieldsToClean.split(","))
            .map(String::trim)
            .filter(name -> !name.isEmpty())
            .collect(Collectors.toList());
    }
    
    // Getters referenced by DataCleaningPlugin (the original snippet elided
    // them behind an "Other getters..." placeholder).
    public Boolean getRemoveNulls() { return removeNulls; }
    public Boolean getTrimWhitespace() { return trimWhitespace; }
    public Boolean getLowercaseStrings() { return lowercaseStrings; }
}

// Transform plugin implementation
@Plugin(type = "transform")
@Name("DataCleaning")
@Description("Cleans and standardizes data fields")
@Category("cleansing")
public class DataCleaningPlugin extends Transform<StructuredRecord, StructuredRecord> {
    
    private final DataCleaningConfig config;
    // Cached from config at initialize(); empty list means "clean every field".
    private List<String> fieldsToClean;
    private Schema outputSchema;
    
    public DataCleaningPlugin(DataCleaningConfig config) {
        this.config = config;
    }
    
    /**
     * Deployment-time hook: verifies every configured field exists in the
     * input schema and declares that cleaning preserves the schema.
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        Schema inputSchema = stageConfigurer.getInputSchema();
        
        if (inputSchema != null) {
            // Validate that specified fields exist
            for (String fieldName : config.getFieldsToClean()) {
                if (inputSchema.getField(fieldName) == null) {
                    throw new IllegalArgumentException("Field '" + fieldName + "' does not exist in input schema");
                }
            }
            
            // Output schema is the same as input schema for cleaning operations
            stageConfigurer.setOutputSchema(inputSchema);
        }
    }
    
    @Override
    public void initialize(TransformContext context) throws Exception {
        super.initialize(context);
        this.fieldsToClean = config.getFieldsToClean();
        this.outputSchema = context.getOutputSchema();
    }
    
    /**
     * Cleans one record: optionally drops records containing nulls, then
     * normalizes the selected string fields and emits the result.
     */
    @Override
    public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
        // Drop the record entirely when null filtering is on and a monitored
        // field is null.
        if (config.getRemoveNulls() && hasNullFields(input)) {
            return;
        }
        
        StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
        
        // Copy every field, cleaning the selected ones (empty selection = all).
        for (Schema.Field field : input.getSchema().getFields()) {
            String fieldName = field.getName();
            Object value = input.get(fieldName);
            
            if (fieldsToClean.isEmpty() || fieldsToClean.contains(fieldName)) {
                value = cleanFieldValue(value, field.getSchema());
            }
            
            builder.set(fieldName, value);
        }
        
        emitter.emit(builder.build());
    }
    
    // Fix: the original only scanned fieldsToClean, so with the default empty
    // selection — which means "clean all fields" in transform() — removeNulls
    // was silently a no-op. Mirror the empty-means-all convention here.
    private boolean hasNullFields(StructuredRecord record) {
        if (fieldsToClean.isEmpty()) {
            for (Schema.Field field : record.getSchema().getFields()) {
                if (record.get(field.getName()) == null) {
                    return true;
                }
            }
            return false;
        }
        for (String fieldName : fieldsToClean) {
            if (record.get(fieldName) == null) {
                return true;
            }
        }
        return false;
    }
    
    /** Applies trim/lowercase options to string-typed values; passes others through. */
    private Object cleanFieldValue(Object value, Schema fieldSchema) {
        if (value == null) {
            return null;
        }
        
        // Unwrap nullable (union-with-null) schemas to the underlying type.
        Schema.Type type = fieldSchema.isNullable() ? 
            fieldSchema.getNonNullable().getType() : fieldSchema.getType();
            
        if (type == Schema.Type.STRING) {
            String stringValue = value.toString();
            
            if (config.getTrimWhitespace()) {
                stringValue = stringValue.trim();
            }
            
            if (config.getLowercaseStrings()) {
                stringValue = stringValue.toLowerCase();
            }
            
            return stringValue;
        }
        
        return value;
    }
}

Sink Plugins

Sink plugins write data to external systems:

// Sink plugin configuration
public class DatabaseSinkConfig extends PluginConfig {
    @Name("connectionString")
    @Description("JDBC connection string")
    @Macro
    @Property
    private String connectionString;
    
    @Name("tableName") 
    @Description("Target table name")
    @Macro
    @Property
    private String tableName;
    
    @Name("username")
    @Description("Database username")
    @Macro
    @Property
    private String username;
    
    @Name("password")
    @Description("Database password")
    @Macro
    @Property
    private String password;
    
    @Name("batchSize")
    @Description("Number of records to write in each batch")
    @Property
    private Integer batchSize = 1000;
    
    // Getters referenced by DatabaseSinkPlugin (the original snippet elided
    // them behind a "Getters and validation..." placeholder).
    public String getConnectionString() { return connectionString; }
    public String getTableName() { return tableName; }
    public String getUsername() { return username; }
    public String getPassword() { return password; }
    public Integer getBatchSize() { return batchSize; }
    
    /**
     * Fails fast with {@link IllegalArgumentException} when the configuration
     * is unusable. Username/password stay optional: some databases accept
     * unauthenticated connections.
     */
    public void validate() {
        if (connectionString == null || connectionString.isEmpty()) {
            throw new IllegalArgumentException("connectionString must be specified");
        }
        if (tableName == null || tableName.isEmpty()) {
            throw new IllegalArgumentException("tableName must be specified");
        }
        if (batchSize == null || batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive, got: " + batchSize);
        }
    }
}

// Sink plugin implementation
@Plugin(type = "batchsink")
@Name("DatabaseSink") 
@Description("Writes data to a relational database")
@Category("sink")
public class DatabaseSinkPlugin extends BatchSink<StructuredRecord, NullWritable, NullWritable> {
    
    private final DatabaseSinkConfig config;
    
    public DatabaseSinkPlugin(DatabaseSinkConfig config) {
        this.config = config;
    }
    
    /**
     * Deployment-time hook: validates configuration and, when all connection
     * properties are concrete, eagerly verifies the connection and table.
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        // Validate configuration
        config.validate();
        
        // Macro values are only resolved at runtime; a connection check
        // against raw macro text would be meaningless.
        if (!containsMacros()) {
            testConnection();
        }
    }
    
    /** Configures the Hadoop job to write through DatabaseOutputFormat. */
    @Override
    public void prepareRun(BatchSinkContext context) throws Exception {
        Job job = context.getHadoopJob();
        
        // Configure database output format
        job.setOutputFormatClass(DatabaseOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        
        // Set database connection properties
        DatabaseConfiguration.configureDB(job.getConfiguration(),
            config.getConnectionString(),
            config.getUsername(), 
            config.getPassword(),
            config.getTableName());
    }
    
    @Override
    public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, NullWritable>> emitter) 
        throws Exception {
        
        // Convert StructuredRecord to database format and write.
        // This would typically buffer records and flush in batches of
        // config.getBatchSize().
        writeRecordToDatabase(input);
        
        // Emit to continue pipeline (if needed)
        emitter.emit(new KeyValue<>(NullWritable.get(), NullWritable.get()));
    }
    
    private void writeRecordToDatabase(StructuredRecord record) throws SQLException {
        // Implementation for writing record to database.
        // Use prepared statements and batch operations for efficiency.
    }
    
    /** True when any connection property still contains macro syntax. */
    private boolean containsMacros() {
        return containsMacro(config.getConnectionString())
            || containsMacro(config.getTableName())
            || containsMacro(config.getUsername())
            || containsMacro(config.getPassword());
    }
    
    // Null-safe check: optional properties (e.g. username/password for an
    // unauthenticated database) may be unset; the original chained
    // .contains("${") calls threw NullPointerException in that case.
    private static boolean containsMacro(String value) {
        return value != null && value.contains("${");
    }
    
    /** Opens a connection and verifies that the configured table exists. */
    private void testConnection() {
        try (Connection conn = DriverManager.getConnection(
             config.getConnectionString(), config.getUsername(), config.getPassword())) {
            // Test connection and verify table exists
            DatabaseMetaData metaData = conn.getMetaData();
            try (ResultSet tables = metaData.getTables(null, null, config.getTableName(), null)) {
                if (!tables.next()) {
                    throw new IllegalArgumentException("Table '" + config.getTableName() + "' does not exist");
                }
            }
        } catch (SQLException e) {
            throw new IllegalArgumentException("Failed to connect to database: " + e.getMessage(), e);
        }
    }
}

Plugin Selection and Requirements

Plugin Selector

// Plugin selector for choosing among multiple plugin candidates (e.g. several
// artifact versions of the same plugin name/type).
public class PluginSelector {
    /** Selector with no ordering preference and no subtask. */
    public static final PluginSelector EMPTY = new PluginSelector(SortOrder.UNSPECIFIED, null);
    
    // Ordering applied to candidate plugins before selection.
    public enum SortOrder {
        CREATION_TIME_ASC,
        CREATION_TIME_DESC,
        VERSION_ASC,
        VERSION_DESC,
        UNSPECIFIED
    }
    
    public PluginSelector(SortOrder sortOrder) { /* constructor */ }
    public PluginSelector(SortOrder sortOrder, String subtaskName) { /* constructor with subtask */ }
    
    public SortOrder getSortOrder() { /* returns sort order */ }
    public String getSubtaskName() { /* returns subtask name */ }
}

// Plugin requirements specification — declares the platform capabilities and
// dataset types a plugin needs before it can be deployed.
public class Requirements {
    /** Starts a fluent builder for assembling requirements. */
    public static Builder builder() { return new Builder(); }
    
    public Set<String> getCapabilities() { /* returns required capabilities */ }
    public Set<String> getDatasetTypes() { /* returns required dataset types */ }
    
    // Fluent builder; each mutator returns this for chaining.
    public static class Builder {
        public Builder addCapabilities(String... capabilities) { /* add capabilities */ return this; }
        public Builder addDatasetTypes(String... datasetTypes) { /* add dataset types */ return this; }
        public Requirements build() { /* build requirements */ }
    }
}

Plugin Validation and Error Handling

// Plugin configuration validation
public class InvalidPluginConfigException extends RuntimeException {
    
    // Immutable snapshot of the per-property errors.
    private final Set<InvalidPluginProperty> invalidProperties;
    
    /**
     * @param message           human-readable summary of the failure
     * @param invalidProperties per-property error details; defensively copied
     */
    public InvalidPluginConfigException(String message, Set<InvalidPluginProperty> invalidProperties) {
        super(message);
        // Defensive copy: the original stored the caller's set directly, so
        // later mutations of that set would silently alter this exception.
        this.invalidProperties = Collections.unmodifiableSet(new HashSet<>(invalidProperties));
    }
    
    /** Returns an unmodifiable view of the invalid properties. */
    public Set<InvalidPluginProperty> getInvalidProperties() {
        return invalidProperties;
    }
}

// Invalid plugin property details — one (property, message) pair describing
// why a single configuration property failed validation.
public class InvalidPluginProperty {
    
    private final String propertyName;
    private final String message;
    
    /**
     * @param propertyName name of the offending configuration property
     * @param message      human-readable description of the problem
     */
    public InvalidPluginProperty(String propertyName, String message) {
        this.propertyName = propertyName;
        this.message = message;
    }
    
    public String getPropertyName() { return propertyName; }
    public String getMessage() { return message; }
}

// Plugin validation utility — aggregates per-property errors from a subclass
// and reports them all at once instead of failing on the first problem.
public abstract class ValidatingPluginConfig extends PluginConfig {
    
    /**
     * Runs subclass validation and throws a single aggregated exception when
     * any property check fails.
     */
    public final void validate() throws InvalidPluginConfigException {
        Set<InvalidPluginProperty> problems = new HashSet<>();
        
        try {
            validateConfig(problems);
        } catch (Exception e) {
            // A validator that throws is itself a configuration failure;
            // record it rather than letting it escape unaggregated.
            problems.add(new InvalidPluginProperty("general", "Validation failed: " + e.getMessage()));
        }
        
        if (problems.isEmpty()) {
            return;
        }
        throw new InvalidPluginConfigException("Plugin configuration is invalid", problems);
    }
    
    /** Subclasses report each invalid property into {@code errors}. */
    protected abstract void validateConfig(Set<InvalidPluginProperty> errors);
    
    /** Flags {@code propertyName} when its value is null or blank. */
    protected void validateRequired(String propertyName, String value, Set<InvalidPluginProperty> errors) {
        boolean missing = value == null || value.trim().isEmpty();
        if (missing) {
            errors.add(new InvalidPluginProperty(propertyName, "Property is required"));
        }
    }
    
    /** Flags {@code propertyName} when a non-null value does not match {@code pattern}. */
    protected void validateFormat(String propertyName, String value, String pattern, 
                                Set<InvalidPluginProperty> errors) {
        if (value == null) {
            return;
        }
        if (!value.matches(pattern)) {
            errors.add(new InvalidPluginProperty(propertyName, "Invalid format"));
        }
    }
}

Advanced Plugin Patterns

Plugin Composition

// Multi-stage plugin configuration
public class CompositeTransformConfig extends PluginConfig {
    @Name("stages")
    @Description("JSON array of transformation stages")
    @Property
    private String stages;
    
    /**
     * Parses the configured JSON into an ordered list of transform stages.
     *
     * @throws IOException declared for interface stability; Gson parse errors
     *         surface as unchecked exceptions
     * @throws IllegalArgumentException when a stage entry is missing one of
     *         the required keys ("name", "type", "config")
     */
    public List<TransformStage> getStages() throws IOException {
        // Treat an unset/empty property as "no stages" instead of an NPE
        // inside the JSON parser.
        if (stages == null || stages.isEmpty()) {
            return Collections.emptyList();
        }
        
        JsonArray stagesArray = new JsonParser().parse(stages).getAsJsonArray();
        List<TransformStage> stageList = new ArrayList<>();
        
        for (JsonElement element : stagesArray) {
            JsonObject stageObj = element.getAsJsonObject();
            TransformStage stage = new TransformStage(
                requiredMember(stageObj, "name").getAsString(),
                requiredMember(stageObj, "type").getAsString(),
                requiredMember(stageObj, "config").getAsJsonObject()
            );
            stageList.add(stage);
        }
        
        return stageList;
    }
    
    // Fetches a stage attribute, turning the original silent NullPointerException
    // on a missing key into a clear configuration error.
    private static JsonElement requiredMember(JsonObject stageObj, String key) {
        JsonElement member = stageObj.get(key);
        if (member == null || member.isJsonNull()) {
            throw new IllegalArgumentException("Stage definition is missing required key: " + key);
        }
        return member;
    }
}

// Composite plugin that chains multiple transformations
//
// NOTE(review): loadTransformPlugin(), getTransformOutputSchema() and the
// CollectingEmitter type are referenced but not shown in this snippet —
// presumably defined in the full source; confirm before compiling.
@Plugin(type = "transform")
@Name("CompositeTransform")
@Description("Applies multiple transformations in sequence")
public class CompositeTransformPlugin extends Transform<StructuredRecord, StructuredRecord> {
    
    private final CompositeTransformConfig config;
    private List<Transform<StructuredRecord, StructuredRecord>> transforms;
    
    // Missing in the original snippet: config is final and was never
    // assigned, which does not compile. The framework injects it here.
    public CompositeTransformPlugin(CompositeTransformConfig config) {
        this.config = config;
    }
    
    /**
     * Deployment-time hook: configures each child stage in order, threading
     * the schema through the chain, and publishes the final output schema.
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        Schema currentSchema = stageConfigurer.getInputSchema();
        
        try {
            // Configure each transform stage
            for (TransformStage stage : config.getStages()) {
                // Dynamically load and configure transform plugin
                Transform<StructuredRecord, StructuredRecord> transform = 
                    loadTransformPlugin(stage, pipelineConfigurer);
                
                // Update schema through the pipeline
                currentSchema = getTransformOutputSchema(transform, currentSchema);
            }
        } catch (IOException e) {
            // getStages() declares IOException, which configurePipeline cannot
            // propagate; the original snippet did not compile without this.
            throw new IllegalArgumentException("Invalid stages configuration: " + e.getMessage(), e);
        }
        
        stageConfigurer.setOutputSchema(currentSchema);
    }
    
    /** Instantiates every child transform, keyed by its stage name. */
    @Override
    public void initialize(TransformContext context) throws Exception {
        transforms = new ArrayList<>();
        for (TransformStage stage : config.getStages()) {
            Transform<StructuredRecord, StructuredRecord> transform = 
                context.newPluginInstance(stage.getName());
            transforms.add(transform);
        }
    }
    
    /**
     * Pipes one record through every child transform in sequence. A stage
     * that emits nothing filters the record; a stage that emits several
     * records forwards all but the last and continues with the last one.
     */
    @Override
    public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
        StructuredRecord current = input;
        
        // Apply each transformation in sequence
        for (Transform<StructuredRecord, StructuredRecord> transform : transforms) {
            CollectingEmitter<StructuredRecord> collector = new CollectingEmitter<>();
            transform.transform(current, collector);
            
            List<StructuredRecord> results = collector.getEmitted();
            if (results.size() == 1) {
                current = results.get(0);
            } else if (results.isEmpty()) {
                // Record was filtered out
                return;
            } else {
                // Multiple records produced - emit all but last, use last for next stage
                for (int i = 0; i < results.size() - 1; i++) {
                    emitter.emit(results.get(i));
                }
                current = results.get(results.size() - 1);
            }
        }
        
        emitter.emit(current);
    }
}

// Plugin factory pattern — static helpers around PluginContext.
public final class PluginFactory {
    
    // Static utility class; not instantiable.
    private PluginFactory() {
    }
    
    /**
     * Creates a new instance of a plugin previously registered under
     * {@code pluginName}.
     *
     * NOTE(review): pluginType and properties are accepted for API symmetry
     * but not consulted — PluginContext resolves purely by the registered id.
     * Confirm whether they should participate in lookup.
     */
    public static <T> T createPlugin(String pluginType, String pluginName, 
                                   PluginProperties properties, PluginContext context) {
        try {
            return context.newPluginInstance(pluginName);
        } catch (InstantiationException e) {
            // newPluginInstance declares this checked exception; the original
            // snippet neither caught nor declared it and did not compile.
            throw new IllegalStateException("Failed to instantiate plugin: " + pluginName, e);
        }
    }
    
    /**
     * Merges two property sets; entries in {@code override} win over
     * identically-keyed entries in {@code base}.
     */
    public static PluginProperties mergeProperties(PluginProperties base, 
                                                 PluginProperties override) {
        PluginProperties.Builder builder = PluginProperties.builder();
        builder.addAll(base.getProperties());
        builder.addAll(override.getProperties());
        return builder.build();
    }
}

Plugin Testing and Development Tools

Plugin Testing Framework

// Plugin test base class — helpers for building schemas/records and
// exercising plugin configuration validation in unit tests.
public abstract class PluginTestBase {
    
    /**
     * Deserializes a config from raw properties and, when it supports
     * self-validation, runs validate() so bad configs fail the test.
     */
    protected <T extends PluginConfig> void validatePluginConfig(Class<T> configClass, 
                                                               Map<String, String> properties) 
                                                               throws Exception {
        T config = deserializeConfig(configClass, properties);
        if (config instanceof ValidatingPluginConfig) {
            ((ValidatingPluginConfig) config).validate();
        }
    }
    
    /**
     * Builds a record schema from "name:type" specs, e.g.
     * createTestSchema("id:long", "name:string").
     *
     * @throws IllegalArgumentException on a malformed spec (the original
     *         threw a bare ArrayIndexOutOfBoundsException instead)
     */
    protected Schema createTestSchema(String... fieldSpecs) {
        List<Schema.Field> fields = new ArrayList<>();
        for (String spec : fieldSpecs) {
            String[] parts = spec.split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Field spec must be name:type, got: " + spec);
            }
            String name = parts[0];
            Schema.Type type = Schema.Type.valueOf(parts[1].toUpperCase());
            fields.add(Schema.Field.of(name, Schema.of(type)));
        }
        return Schema.recordOf("TestRecord", fields);
    }
    
    /**
     * Builds a record assigning values positionally; extra fields stay unset
     * and extra values are ignored.
     */
    protected StructuredRecord createTestRecord(Schema schema, Object... values) {
        StructuredRecord.Builder builder = StructuredRecord.builder(schema);
        List<Schema.Field> fields = schema.getFields();
        
        for (int i = 0; i < Math.min(fields.size(), values.length); i++) {
            builder.set(fields.get(i).getName(), values[i]);
        }
        
        return builder.build();
    }
    
    private <T> T deserializeConfig(Class<T> configClass, Map<String, String> properties) 
        throws Exception {
        // Implementation for deserializing configuration from properties.
        // Class.newInstance() is deprecated (it swallows constructor
        // exception wrapping); invoke the no-arg constructor reflectively.
        return configClass.getDeclaredConstructor().newInstance(); // Simplified - real implementation would map properties onto fields
    }
}

// Mock emitter for testing — captures everything a plugin emits so tests can
// assert on outputs and errors after the fact.
public class MockEmitter<T> implements Emitter<T> {
    
    // Values emitted via emit(), in arrival order.
    private final List<T> captured = new ArrayList<>();
    // Error entries reported via emitError(), in arrival order.
    private final List<InvalidEntry<T>> errorEntries = new ArrayList<>();
    
    @Override
    public void emit(T value) {
        captured.add(value);
    }
    
    @Override
    public void emitError(InvalidEntry<T> invalidEntry) {
        errorEntries.add(invalidEntry);
    }
    
    /** Returns a defensive copy of everything emitted so far. */
    public List<T> getEmitted() {
        return new ArrayList<>(captured);
    }
    
    /** Returns a defensive copy of every error reported so far. */
    public List<InvalidEntry<T>> getErrors() {
        return new ArrayList<>(errorEntries);
    }
    
    /** Resets captured state so the emitter can be reused between test cases. */
    public void clear() {
        captured.clear();
        errorEntries.clear();
    }
}

The CDAP Plugin System enables building modular, reusable data processing components with strong type safety, comprehensive configuration management, and enterprise-grade operational features. This extensibility framework is essential for creating scalable, maintainable data processing applications.

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap

docs

application-framework.md

data-management.md

data-processing.md

index.md

operational.md

plugin-system.md

security-metadata.md

tile.json