
tessl/maven-co-cask-cdap--cdap-api

Core application programming interface for the Cask Data Application Platform enabling development of scalable data processing applications on Hadoop ecosystems.



Annotations and Configuration

CDAP's annotation system provides declarative configuration for dependency injection, transaction control, data access patterns, plugin metadata, and program behavior.

Dataset Injection Annotations

@UseDataSet

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseDataSet {
    String value();
}

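Marks a field to be injected with the dataset instance of the given name. The annotation targets fields, not methods: the framework populates the field so that processing methods can use the dataset directly.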

Usage Example:

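import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;

// Purchase is an application-defined class with a getPurchaseTime() accessor
public class PurchaseStore extends AbstractFlowlet {
    // Injected by CDAP with the dataset instance named "myTable"
    @UseDataSet("myTable")
    private ObjectStore<Purchase> store;

    @ProcessInput
    public void process(Purchase purchase) {
        // Key each stored purchase by its purchase time
        store.write(Bytes.toBytes(purchase.getPurchaseTime()), purchase);
    }
}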

Note: The example above shows a Flowlet, but dataset field injection is not limited to Flowlets; Service handlers, for example, can also declare @UseDataSet fields.
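To make the injection mechanism concrete, here is a minimal sketch of how a framework could fill @UseDataSet fields by reflection. It uses a local stand-in annotation (UseDataSetStandIn) and hypothetical classes (DatasetInjectorSketch, InjectionTarget) rather than the real CDAP types, and a plain map in place of CDAP's dataset registry; it illustrates the idea, not CDAP's actual injection code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;

// Stand-in mirroring the @UseDataSet declaration above; NOT the real CDAP annotation.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface UseDataSetStandIn {
    String value();
}

class DatasetInjectorSketch {
    // Sets every @UseDataSetStandIn field on target to the registry entry with that name.
    static void inject(Object target, Map<String, ?> registry) {
        for (Field field : target.getClass().getDeclaredFields()) {
            UseDataSetStandIn ann = field.getAnnotation(UseDataSetStandIn.class);
            if (ann == null) {
                continue; // not a dataset field
            }
            Object dataset = registry.get(ann.value());
            if (dataset == null) {
                throw new IllegalStateException("No dataset named '" + ann.value() + "'");
            }
            try {
                field.setAccessible(true); // dataset fields are usually private, as in PurchaseStore
                field.set(target, dataset);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
    }
}

// Hypothetical component with one injectable field.
class InjectionTarget {
    @UseDataSetStandIn("myTable")
    private Object store;

    Object store() {
        return store;
    }
}
```

Failing fast on an unknown name mirrors the intuition that a program should not start with a dataset reference that cannot be resolved.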

Plugin Annotations

@Plugin

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Plugin {
    String type();
}

Marks a class as a CDAP plugin of the specified type.

@Name

@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Name {
    String value();
}

Specifies the name for plugins, configuration properties, or other named elements.

@Description

@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Description {
    String value();
}

Provides human-readable descriptions for plugins, properties, and methods.

Plugin Example:

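@Plugin(type = "transform")
@Name("FieldCleaner")
@Description("Cleans and validates field values")
public class FieldCleaner {
    // @Plugin, @Name, and @Description belong on the plugin class;
    // its configurable properties live in a separate PluginConfig subclass
    private final FieldCleanerConfig config;

    public FieldCleaner(FieldCleanerConfig config) {
        this.config = config;
    }
}

public class FieldCleanerConfig extends PluginConfig {

    @Name("targetField")
    @Description("Field to clean and validate")
    private String targetField;

    @Name("cleaningRules")
    @Description("Comma-separated list of cleaning rules")
    private String cleaningRules = "trim,lowercase";
}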
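Because these annotations are retained at runtime, tooling can read them to build a plugin catalog entry. The sketch below shows one way to do that with reflection; the stand-in annotations (PluginMarker, NameMarker, DescriptionMarker), the PluginDescriptor class, and the fallback to the simple class name or field name when @Name is absent are all assumptions for illustration, not CDAP's artifact inspector. It takes the plugin class and the configuration class separately; pass the same class twice if both sets of annotations live on one class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-ins mirroring @Plugin, @Name, and @Description; NOT the real CDAP annotations.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface PluginMarker { String type(); }

@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface NameMarker { String value(); }

@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface DescriptionMarker { String value(); }

class PluginDescriptor {
    final String type;
    final String name;
    final String description;
    final Map<String, String> properties = new LinkedHashMap<>(); // property name -> description

    private PluginDescriptor(String type, String name, String description) {
        this.type = type;
        this.name = name;
        this.description = description;
    }

    static PluginDescriptor describe(Class<?> pluginClass, Class<?> configClass) {
        PluginMarker plugin = pluginClass.getAnnotation(PluginMarker.class);
        if (plugin == null) {
            throw new IllegalArgumentException(pluginClass + " is not annotated as a plugin");
        }
        NameMarker name = pluginClass.getAnnotation(NameMarker.class);
        DescriptionMarker desc = pluginClass.getAnnotation(DescriptionMarker.class);
        PluginDescriptor d = new PluginDescriptor(plugin.type(),
                name != null ? name.value() : pluginClass.getSimpleName(),
                desc != null ? desc.value() : "");
        // Each config field becomes a property, keyed by @NameMarker or the field name.
        for (Field f : configClass.getDeclaredFields()) {
            NameMarker fieldName = f.getAnnotation(NameMarker.class);
            DescriptionMarker fieldDesc = f.getAnnotation(DescriptionMarker.class);
            d.properties.put(fieldName != null ? fieldName.value() : f.getName(),
                    fieldDesc != null ? fieldDesc.value() : "");
        }
        return d;
    }
}

// Hypothetical plugin and config mirroring the FieldCleaner example.
@PluginMarker(type = "transform")
@NameMarker("FieldCleaner")
@DescriptionMarker("Cleans and validates field values")
class SketchFieldCleaner { }

class SketchFieldCleanerConfig {
    @NameMarker("targetField") @DescriptionMarker("Field to clean and validate")
    String targetField;

    @NameMarker("cleaningRules") @DescriptionMarker("Comma-separated list of cleaning rules")
    String cleaningRules = "trim,lowercase";
}
```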

Configuration Annotations

@Property

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Property {
}

Marks fields as configurable properties in plugin configurations.

@Macro

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Macro {
}

Indicates that a configuration property supports macro substitution at runtime.

Configuration Example:

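import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Macro;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Property;
import co.cask.cdap.api.plugin.PluginConfig;

public class DatabaseConfig extends PluginConfig {
    @Property
    @Name("connectionString")
    @Description("Database connection string")
    @Macro // the value may contain a macro such as ${db.url}, resolved at runtime
    private String connectionString;

    @Property
    @Name("tableName")
    @Description("Target table name")
    private String tableName;

    @Property
    @Name("batchSize")
    @Description("Batch size for operations")
    private int batchSize = 1000;
}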
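Macro substitution replaces ${key} placeholders in a macro-enabled property with runtime argument values, while properties without @Macro stay literal. The sketch below illustrates that split under simplifying assumptions: it uses a stand-in annotation (MacroStandIn), hypothetical classes (MacroSketch, MacroDemoConfig), a plain map for runtime arguments, and it ignores nested macros and macro functions. It is not CDAP's macro evaluator.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Stand-in mirroring the @Macro declaration above; NOT the real CDAP annotation.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface MacroStandIn { }

class MacroSketch {
    // Matches ${key}; nested macros and macro functions are out of scope here.
    private static final Pattern MACRO = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${key} in raw with args.get(key); fails if an argument is missing.
    static String substitute(String raw, Map<String, String> args) {
        Matcher m = MACRO.matcher(raw);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = args.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No runtime argument for macro '" + m.group(1) + "'");
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Substitutes only String fields marked @MacroStandIn; other fields are left untouched.
    static void apply(Object config, Map<String, String> args) {
        for (Field f : config.getClass().getDeclaredFields()) {
            if (!f.isAnnotationPresent(MacroStandIn.class) || f.getType() != String.class) {
                continue;
            }
            try {
                f.setAccessible(true);
                String raw = (String) f.get(config);
                if (raw != null) {
                    f.set(config, substitute(raw, args));
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
    }
}

// Hypothetical config mirroring DatabaseConfig: only connectionString is macro-enabled.
class MacroDemoConfig {
    @MacroStandIn
    String connectionString = "jdbc:mysql://${db.host}/sales";

    String tableName = "${not.a.macro}";
}
```

Quoting the replacement with Matcher.quoteReplacement keeps values containing $ or backslashes from being misread as group references.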

Transaction Control Annotations

@TransactionPolicy

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface TransactionPolicy {
    TransactionControl value();
}

Controls transaction behavior for programs and methods.

TransactionControl

public enum TransactionControl {
    IMPLICIT,  // CDAP runs the method inside a transaction that it starts and commits
    EXPLICIT   // no transaction is started; the code opens its own, e.g. via execute(TxRunnable)
}

Transaction Examples:

@TransactionPolicy(TransactionControl.EXPLICIT)
public class ExplicitTransactionWorker extends AbstractWorker {

    @Override
    public void run() {
        WorkerContext context = getContext();
        try {
            // Start, run, and commit a transaction explicitly
            context.execute(new TxRunnable() {
                @Override
                public void run(DatasetContext datasetContext) throws Exception {
                    KeyValueTable data = datasetContext.getDataset("data");
                    data.write("key", "value");
                }
            });
        } catch (TransactionFailureException e) {
            throw new RuntimeException("Transaction failed", e);
        }
    }
}

public class ImplicitTransactionMapReduce extends AbstractMapReduce {

    @TransactionPolicy(TransactionControl.IMPLICIT)
    @Override
    public void initialize() throws Exception {
        MapReduceContext context = getContext();
        // Runs inside a transaction that CDAP starts and commits
        KeyValueTable config = context.getDataset("config");
        // KeyValueTable.read returns raw bytes
        String value = Bytes.toString(config.read("setting"));
    }
}
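The effect of the policy can be pictured as a dispatcher that looks up the annotation before calling a lifecycle method. In this minimal sketch the stand-in types (TxControl, TxPolicy), the helper TxPolicySketch, and the demo classes are hypothetical; the precedence rule (method annotation, then class annotation, then IMPLICIT) is an assumption for illustration, and "begin"/"commit" are log entries standing in for a real transaction manager.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Stand-ins mirroring TransactionControl and @TransactionPolicy; NOT the real CDAP types.
enum TxControl { IMPLICIT, EXPLICIT }

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface TxPolicy {
    TxControl value();
}

class TxPolicySketch {
    // Assumed precedence: method annotation, then class annotation, then IMPLICIT.
    static TxControl resolve(Method method) {
        TxPolicy onMethod = method.getAnnotation(TxPolicy.class);
        if (onMethod != null) {
            return onMethod.value();
        }
        TxPolicy onClass = method.getDeclaringClass().getAnnotation(TxPolicy.class);
        return onClass != null ? onClass.value() : TxControl.IMPLICIT;
    }

    // Invokes a public no-arg method, simulating a transaction only when the policy is IMPLICIT.
    static List<String> invoke(Object target, String methodName) {
        List<String> log = new ArrayList<>();
        try {
            Method method = target.getClass().getMethod(methodName);
            boolean implicit = resolve(method) == TxControl.IMPLICIT;
            if (implicit) {
                log.add("begin");
            }
            method.setAccessible(true);
            method.invoke(target);
            log.add("ran " + methodName);
            if (implicit) {
                log.add("commit"); // a real runtime would roll back if the method threw
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
        return log;
    }
}

// Hypothetical programs mirroring the two examples above.
@TxPolicy(TxControl.EXPLICIT)
class ExplicitDemo {
    public void run() { }
}

class ImplicitDemo {
    @TxPolicy(TxControl.IMPLICIT)
    public void initialize() { }
}
```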

Data Access Annotations

@ReadOnly

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)  
public @interface ReadOnly {
}

Declares that the annotated element only reads dataset data.

@ReadWrite

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadWrite {
}

Declares that the annotated element both reads and writes dataset data.

@WriteOnly

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface WriteOnly {
}

Declares that the annotated element only writes dataset data.

Data Access Examples:

public class DataAccessExample extends AbstractMapReduce {
    
    @UseDataSet("readOnlyData")
    @ReadOnly
    private KeyValueTable readOnlyData;
    
    @UseDataSet("writeOnlyResults")
    @WriteOnly
    private ObjectStore<Result> results;
    
    @UseDataSet("readWriteCache")
    @ReadWrite
    private KeyValueTable cache;
}
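Since the access annotations are runtime-retained markers, a tool could scan a class and report the declared access for each dataset field. This is a minimal sketch using stand-in annotations (ReadOnlyStandIn, WriteOnlyStandIn, ReadWriteStandIn) and hypothetical classes; treating more than one marker on the same element as an error is an assumption of the sketch, not documented CDAP behavior.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.TreeMap;

// Stand-ins mirroring @ReadOnly, @WriteOnly, and @ReadWrite; NOT the real CDAP annotations.
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@interface ReadOnlyStandIn { }

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@interface WriteOnlyStandIn { }

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@interface ReadWriteStandIn { }

enum Access { READ, WRITE, READ_WRITE, UNDECLARED }

class AccessScanSketch {
    // Returns the declared access of one element; more than one marker is rejected.
    static Access accessOf(AnnotatedElement element) {
        Access access = Access.UNDECLARED;
        int markers = 0;
        if (element.isAnnotationPresent(ReadOnlyStandIn.class)) { access = Access.READ; markers++; }
        if (element.isAnnotationPresent(WriteOnlyStandIn.class)) { access = Access.WRITE; markers++; }
        if (element.isAnnotationPresent(ReadWriteStandIn.class)) { access = Access.READ_WRITE; markers++; }
        if (markers > 1) {
            throw new IllegalStateException("Conflicting access annotations on " + element);
        }
        return access;
    }

    // Maps each declared field name of the class to its access pattern.
    static Map<String, Access> scanFields(Class<?> type) {
        Map<String, Access> result = new TreeMap<>();
        for (Field field : type.getDeclaredFields()) {
            result.put(field.getName(), accessOf(field));
        }
        return result;
    }
}

// Hypothetical classes mirroring DataAccessExample's fields.
class AccessDemo {
    @ReadOnlyStandIn Object readOnlyData;
    @WriteOnlyStandIn Object results;
    @ReadWriteStandIn Object cache;
    Object scratch;
}

class ConflictingAccessDemo {
    @ReadOnlyStandIn @WriteOnlyStandIn Object confused;
}
```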

Processing Annotations

@ProcessInput (Deprecated)

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Deprecated
public @interface ProcessInput {
    String value() default "";
}

Marks the input-processing method of a Flowlet. Flow programs, and with them this annotation, are deprecated.

@Batch

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Batch {
    int value() default 1;
}

Specifies how many input objects are passed to the processing method per invocation; the default is 1.
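A batch size can be pictured as a runtime grouping pending inputs into chunks before each call. The sketch below reads a stand-in annotation (BatchStandIn) and splits a list accordingly; the class names are hypothetical, and delivering a final, smaller chunk when inputs run out is an assumption of the sketch rather than documented CDAP behavior.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Stand-in mirroring the @Batch declaration above; NOT the real CDAP annotation.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface BatchStandIn {
    int value() default 1;
}

class BatchSketch {
    // Batch size declared on a method; unannotated methods receive one item per call.
    static int batchSizeOf(Method method) {
        BatchStandIn batch = method.getAnnotation(BatchStandIn.class);
        return batch == null ? 1 : batch.value();
    }

    // Splits pending inputs into consecutive chunks of at most batchSize items.
    static <T> List<List<T>> chunk(List<T> pending, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batch size must be at least 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < pending.size(); start += batchSize) {
            int end = Math.min(start + batchSize, pending.size());
            batches.add(new ArrayList<>(pending.subList(start, end)));
        }
        return batches;
    }
}

// Hypothetical consumer: one batched method, one single-item method.
class BatchDemo {
    @BatchStandIn(100)
    public void processBatch(List<String> events) { }

    public void processOne(String event) { }
}
```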

@HashPartition

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface HashPartition {
    String value();
}

Routes data to consumer instances by hashing the named partition key, so that all data with the same key reaches the same instance.

@RoundRobin

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RoundRobin {
}

Distributes data across consumer instances in turn, without regard to its content.
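The difference between the two strategies fits in a few lines: hash routing is a pure function of the key, while round-robin keeps a counter. This is a generic sketch with hypothetical class names (PartitionSketch, RoundRobinRouter), not CDAP's queue implementation; the router is not thread-safe as written.

```java
class PartitionSketch {
    // Hash routing: equal keys always map to the same consumer instance.
    // floorMod keeps the result in [0, instances) even when hashCode() is negative.
    static int hashRoute(Object partitionKey, int instances) {
        return Math.floorMod(partitionKey.hashCode(), instances);
    }
}

// Round-robin routing: instances take turns regardless of the data's content.
class RoundRobinRouter {
    private final int instances;
    private int next = 0;

    RoundRobinRouter(int instances) {
        if (instances < 1) {
            throw new IllegalArgumentException("need at least one instance");
        }
        this.instances = instances;
    }

    int route() {
        int chosen = next;
        next = (next + 1) % instances;
        return chosen;
    }
}
```

Hashing keeps per-key state (for example, a running count per userId) on one instance, at the cost of possible skew when a few keys dominate; round-robin balances load evenly but scatters related records.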

@Tick

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Tick {
    long delay();
    TimeUnit unit() default TimeUnit.SECONDS;
}

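Marks a method that the runtime invokes periodically rather than in response to input. The interval is given by delay, measured in unit (seconds by default).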
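A periodic method can be driven by a standard scheduled executor. In this minimal sketch the stand-in annotation (TickStandIn) and classes are hypothetical, and reading delay as the pause between the end of one call and the start of the next (scheduleWithFixedDelay) is an assumption chosen to match the element's name; CDAP's own scheduling may differ.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stand-in mirroring the @Tick declaration above; NOT the real CDAP annotation.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface TickStandIn {
    long delay();
    TimeUnit unit() default TimeUnit.SECONDS;
}

class TickSchedulerSketch {
    // Schedules every @TickStandIn method of target with a fixed delay between calls.
    // The caller owns the returned executor and must shut it down.
    static ScheduledExecutorService start(Object target) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        for (Method method : target.getClass().getDeclaredMethods()) {
            TickStandIn tick = method.getAnnotation(TickStandIn.class);
            if (tick == null) {
                continue;
            }
            method.setAccessible(true);
            executor.scheduleWithFixedDelay(() -> {
                try {
                    method.invoke(target);
                } catch (ReflectiveOperationException e) {
                    // An exception escaping the task cancels its future runs.
                    throw new IllegalStateException(e);
                }
            }, 0, tick.delay(), tick.unit());
        }
        return executor;
    }
}

// Hypothetical component that counts its ticks.
class TickDemo {
    final CountDownLatch ticks = new CountDownLatch(3);

    @TickStandIn(delay = 10, unit = TimeUnit.MILLISECONDS)
    void cleanup() {
        ticks.countDown();
    }
}
```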

Processing Examples:

public class ProcessingAnnotationsExample extends AbstractFlowlet {

    @ProcessInput
    @Batch(100)
    @HashPartition("userId")
    public void processBatch(List<UserEvent> events) {
        // Up to 100 events per call; events with the same userId reach the same instance
    }

    @Tick(delay = 30, unit = TimeUnit.SECONDS)
    public void periodicCleanup() {
        // Invoked repeatedly, 30 seconds apart
    }

    @ProcessInput
    @RoundRobin
    public void distributeWork(WorkItem item) {
        // Work items are spread across instances in turn
    }
}

Requirement Annotations

@Requirements

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Requirements {
    String[] capabilities() default {};
    String[] datasetTypes() default {};
}

Specifies requirements that must be satisfied for plugin or program execution.
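A deployment tool could compare declared requirements against what an environment offers before accepting a plugin. The sketch below does a simple set-membership check; the stand-in annotation (RequirementsStandIn), the RequirementsSketch helper, the "capability:"/"datasetType:" report format, and the demo class are all hypothetical and only illustrate the idea.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Stand-in mirroring the @Requirements declaration above; NOT the real CDAP annotation.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface RequirementsStandIn {
    String[] capabilities() default {};
    String[] datasetTypes() default {};
}

class RequirementsSketch {
    // Lists declared requirements the environment does not provide; empty means runnable.
    static List<String> unmet(Class<?> pluginClass, Set<String> capabilities, Set<String> datasetTypes) {
        List<String> unmet = new ArrayList<>();
        RequirementsStandIn req = pluginClass.getAnnotation(RequirementsStandIn.class);
        if (req == null) {
            return unmet; // nothing declared, nothing to check
        }
        for (String c : req.capabilities()) {
            if (!capabilities.contains(c)) {
                unmet.add("capability:" + c);
            }
        }
        for (String t : req.datasetTypes()) {
            if (!datasetTypes.contains(t)) {
                unmet.add("datasetType:" + t);
            }
        }
        return unmet;
    }
}

// Hypothetical sink mirroring the SecureDatabaseSink example.
@RequirementsStandIn(capabilities = {"database.connection", "ssl.support"},
        datasetTypes = {"keyValueTable", "objectStore"})
class RequirementsDemoSink { }
```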

Requirements Example:

@Plugin(type = "sink")
@Requirements(
    capabilities = {"database.connection", "ssl.support"},
    datasetTypes = {"keyValueTable", "objectStore"}
)
public class SecureDatabaseSink {
    // Plugin implementation; its configuration belongs in a separate PluginConfig subclass
}

Complete Annotation Usage Example

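@Plugin(type = "batchsource")
@Name("EnhancedFileSource")
@Description("Enhanced file source with validation and transformation")
@Requirements(capabilities = {"file.access", "validation"})
public class EnhancedFileSource {
    // Plugin-level annotations sit on the plugin class; properties live in the config below
    private final EnhancedFileSourceConfig config;

    public EnhancedFileSource(EnhancedFileSourceConfig config) {
        this.config = config;
    }
}

public class EnhancedFileSourceConfig extends PluginConfig {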
    
    @Property
    @Name("path")
    @Description("Input file path with macro support")
    @Macro
    private String path;
    
    @Property
    @Name("format")
    @Description("File format (csv, json, avro)")
    private String format = "csv";
    
    @Property
    @Name("validateSchema")
    @Description("Enable schema validation")
    private boolean validateSchema = true;
    
    @Property
    @Name("batchSize")
    @Description("Processing batch size")
    private int batchSize = 1000;
    
    // Configuration validation and getters
    public void validate() {
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("Path is required");
        }
    }
    
    public String getPath() { return path; }
    public String getFormat() { return format; }
    public boolean isValidateSchema() { return validateSchema; }
    public int getBatchSize() { return batchSize; }
}

@TransactionPolicy(TransactionControl.EXPLICIT)
public class AnnotatedProcessor extends AbstractWorker {
    
    @UseDataSet("inputData")
    @ReadOnly
    private FileSet inputData;
    
    @UseDataSet("processedData")
    @WriteOnly
    private ObjectStore<ProcessedRecord> processedData;
    
    @UseDataSet("errorLog")
    @WriteOnly
    private KeyValueTable errorLog;
    
    @Override
    protected void configure() {
        setName("AnnotatedProcessor");
        useDatasets("inputData", "processedData", "errorLog");
    }
    
    @Override
    public void run() {
        WorkerContext context = getContext();
        try {
            context.execute(new TxRunnable() {
                @Override
                public void run(DatasetContext txContext) throws Exception {
                    // Explicit transaction for batch processing
                    processBatch(txContext);
                }
            });
        } catch (TransactionFailureException e) {
            throw new RuntimeException("Batch processing transaction failed", e);
        }
    }
    
    @Batch(500)
    @HashPartition("recordType")
    private void processBatch(DatasetContext context) {
        // Process batch of 500 records partitioned by type
    }
    
    @Tick(delay = 60, unit = TimeUnit.SECONDS)
    private void logStatistics() {
        // Log statistics every minute
    }
}

Taken together, these annotations move dataset wiring, plugin metadata, configuration, transaction behavior, and processing strategy out of imperative setup code and into declarations that the framework reads when it deploys and runs the program.

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-api
