Core application programming interface (API) for the Cask Data Application Platform (CDAP), enabling development of scalable data-processing applications on Hadoop ecosystems.

CDAP's annotation system provides declarative configuration for dependency injection, transaction control, data access patterns, plugin metadata, and program behavior.
/**
 * Declares that a Flowlet method uses a specific dataset. Used in Flowlet
 * classes to inject dataset instances by name.
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseDataSet {
  /** The name of the dataset to inject into the annotated field. */
  String value();
}
Usage Example:
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.dataset.lib.ObjectStore;
public class PurchaseStore extends AbstractFlowlet {
@UseDataSet("myTable")
private ObjectStore<Purchase> store;
@ProcessInput
public void process(Purchase purchase) {
store.write(Bytes.toBytes(purchase.getPurchaseTime()), purchase);
}
}Note: This annotation is specifically designed for Flowlet classes and dataset field injection in the Flowlet context.
/**
 * Marks a class as a CDAP plugin of the specified type.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Plugin {
  /** The plugin type, e.g. "transform", "sink", or "batchsource". */
  String type();
}
/**
 * Specifies the name for plugins, configuration properties, or other named
 * elements.
 */
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Name {
  /** The name to assign to the annotated element. */
  String value();
}
/**
 * Provides human-readable descriptions for plugins, properties, and methods.
 */
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Description {
  /** The description text for the annotated element. */
  String value();
}
Plugin Example:
@Plugin(type = "transform")
@Name("FieldCleaner")
@Description("Cleans and validates field values")
public class FieldCleanerConfig extends PluginConfig {
@Name("targetField")
@Description("Field to clean and validate")
private String targetField;
@Name("cleaningRules")
@Description("Comma-separated list of cleaning rules")
private String cleaningRules = "trim,lowercase";
}@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Property {
}Marks fields as configurable properties in plugin configurations.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Macro {
}Indicates that a configuration property supports macro substitution at runtime.
Configuration Example:
public class DatabaseConfig extends PluginConfig {
@Property
@Name("connectionString")
@Description("Database connection string")
@Macro
private String connectionString;
@Property
@Name("tableName")
@Description("Target table name")
private String tableName;
@Property
@Name("batchSize")
@Description("Batch size for operations")
private int batchSize = 1000;
}@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface TransactionPolicy {
TransactionControl value();
}Controls transaction behavior for programs and methods.
/**
 * Transaction management strategies selectable via {@code @TransactionPolicy}.
 */
public enum TransactionControl {
  IMPLICIT, // Automatic transaction management
  EXPLICIT  // Manual transaction management
}
// Transaction Examples:
/**
 * Example of explicit transaction control: the worker starts its own
 * transaction via {@code context.execute} rather than relying on automatic
 * wrapping.
 */
@TransactionPolicy(TransactionControl.EXPLICIT)
public class ExplicitTransactionWorker extends AbstractWorker {
  @Override
  public void run() {
    WorkerContext workerContext = getContext();
    workerContext.execute(new TxRunnable() {
      @Override
      // Renamed parameter so it no longer shadows the outer WorkerContext local.
      public void run(DatasetContext datasetContext) throws Exception {
        // Transactional operations
        KeyValueTable data = datasetContext.getDataset("data");
        data.write("key", "value");
      }
    });
  }
}
public class ImplicitTransactionMapReduce extends AbstractMapReduce {
@TransactionPolicy(TransactionControl.IMPLICIT)
@Override
public void initialize(MapReduceContext context) {
// Automatically wrapped in transaction
KeyValueTable config = context.getDataset("config");
String value = config.read("setting");
}
}@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}Indicates read-only access pattern for datasets.
/**
 * Indicates read-write access pattern for datasets.
 */
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadWrite {
}
/**
 * Indicates write-only access pattern for datasets.
 */
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface WriteOnly {
}
Data Access Examples:
public class DataAccessExample extends AbstractMapReduce {
@UseDataSet("readOnlyData")
@ReadOnly
private KeyValueTable readOnlyData;
@UseDataSet("writeOnlyResults")
@WriteOnly
private ObjectStore<Result> results;
@UseDataSet("readWriteCache")
@ReadWrite
private KeyValueTable cache;
}@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Deprecated
public @interface ProcessInput {
String value() default "";
}Used in deprecated Flow programs for input processing methods.
/**
 * Specifies batch processing size for input processing.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Batch {
  /** Number of inputs to process per invocation; defaults to 1. */
  int value() default 1;
}
/**
 * Specifies hash partitioning for data distribution.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface HashPartition {
  /** The partition key used to hash-distribute inputs. */
  String value();
}
/**
 * Specifies round-robin distribution for data processing.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RoundRobin {
}
/**
 * Specifies time-based periodic processing.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Tick {
  /** Interval between invocations, in the given {@link #unit()}. */
  long delay();

  /** Time unit for {@link #delay()}; defaults to seconds. */
  TimeUnit unit() default TimeUnit.SECONDS;
}
Processing Examples:
public class ProcessingAnnotationsExample {
@Batch(100)
@HashPartition("userId")
public void processBatch(List<UserEvent> events) {
// Process batch of 100 events partitioned by userId
}
@Tick(delay = 30, unit = TimeUnit.SECONDS)
public void periodicCleanup() {
// Execute every 30 seconds
}
@RoundRobin
public void distributeWork(WorkItem item) {
// Distribute work items in round-robin fashion
}
}@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Requirements {
String[] capabilities() default {};
String[] datasetTypes() default {};
}Specifies requirements that must be satisfied for plugin or program execution.
Requirements Example:
@Plugin(type = "sink")
@Requirements(
    capabilities = {"database.connection", "ssl.support"},
    datasetTypes = {"keyValueTable", "objectStore"}
)
public class SecureDatabaseSink extends PluginConfig {
  // Plugin implementation
}

/**
 * Comprehensive plugin example combining name/description metadata,
 * requirements, macro-enabled properties, and configuration validation.
 */
@Plugin(type = "batchsource")
@Name("EnhancedFileSource")
@Description("Enhanced file source with validation and transformation")
@Requirements(capabilities = {"file.access", "validation"})
public class EnhancedFileSourceConfig extends PluginConfig {
  @Property
  @Name("path")
  @Description("Input file path with macro support")
  @Macro
  private String path;

  @Property
  @Name("format")
  @Description("File format (csv, json, avro)")
  private String format = "csv";

  @Property
  @Name("validateSchema")
  @Description("Enable schema validation")
  private boolean validateSchema = true;

  @Property
  @Name("batchSize")
  @Description("Processing batch size")
  private int batchSize = 1000;

  /** Validates required configuration; throws if the path is missing. */
  public void validate() {
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException("Path is required");
    }
  }

  public String getPath() { return path; }
  public String getFormat() { return format; }
  public boolean isValidateSchema() { return validateSchema; }
  public int getBatchSize() { return batchSize; }
}
/**
 * End-to-end example: a worker combining dataset injection, access-pattern
 * annotations, explicit transaction control, and processing annotations.
 */
@TransactionPolicy(TransactionControl.EXPLICIT)
public class AnnotatedProcessor extends AbstractWorker {
  @UseDataSet("inputData")
  @ReadOnly
  private FileSet inputData;

  @UseDataSet("processedData")
  @WriteOnly
  private ObjectStore<ProcessedRecord> processedData;

  @UseDataSet("errorLog")
  @WriteOnly
  private KeyValueTable errorLog;

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("AnnotatedProcessor");
    configurer.useDataset("inputData");
    configurer.useDataset("processedData");
    configurer.useDataset("errorLog");
  }

  @Override
  public void run() {
    WorkerContext context = getContext();
    context.execute(new TxRunnable() {
      @Override
      public void run(DatasetContext txContext) throws Exception {
        // Explicit transaction for batch processing
        processBatch(txContext);
      }
    });
  }

  @Batch(500)
  @HashPartition("recordType")
  private void processBatch(DatasetContext context) {
    // Process batch of 500 records partitioned by type
  }

  @Tick(delay = 60, unit = TimeUnit.SECONDS)
  private void logStatistics() {
    // Log statistics every minute
  }
}

CDAP's annotation system provides a powerful declarative approach to configuration, reducing boilerplate code and improving maintainability while ensuring type safety and runtime validation.
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-api