0
# Annotations and Configuration
1
2
CDAP's annotation system provides declarative configuration for dependency injection, transaction control, data access patterns, plugin metadata, and program behavior.
3
4
## Dataset Injection Annotations
5
6
### @UseDataSet
7
8
```java { .api }
9
@Target(ElementType.FIELD)
10
@Retention(RetentionPolicy.RUNTIME)
11
public @interface UseDataSet {
12
String value();
13
}
14
```
15
16
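Declares that a field uses a specific dataset; `value` is the name of the dataset. The annotation can only be applied to fields (`ElementType.FIELD`) and is used in Flowlet classes to inject dataset instances.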
17
18
**Usage Example:**
19
```java
20
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
21
import co.cask.cdap.api.dataset.lib.ObjectStore;
22
23
public class PurchaseStore extends AbstractFlowlet {
24
@UseDataSet("myTable")
25
private ObjectStore<Purchase> store;
26
27
@ProcessInput
28
public void process(Purchase purchase) {
29
store.write(Bytes.toBytes(purchase.getPurchaseTime()), purchase);
30
}
31
}
32
```
33
34
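**Note:** This annotation can only be applied to fields. It was designed for dataset field injection in Flowlet classes, although the examples later in this document also apply it to dataset fields of MapReduce and Worker classes.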
35
36
## Plugin Annotations
37
38
### @Plugin
39
40
```java { .api }
41
@Target(ElementType.TYPE)
42
@Retention(RetentionPolicy.RUNTIME)
43
public @interface Plugin {
44
String type();
45
}
46
```
47
48
Marks a class as a CDAP plugin of the specified type.
49
50
### @Name
51
52
```java { .api }
53
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
54
@Retention(RetentionPolicy.RUNTIME)
55
public @interface Name {
56
String value();
57
}
58
```
59
60
Specifies the name for plugins, configuration properties, or other named elements.
61
62
### @Description
63
64
```java { .api }
65
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
66
@Retention(RetentionPolicy.RUNTIME)
67
public @interface Description {
68
String value();
69
}
70
```
71
72
Provides human-readable descriptions for plugins, properties, and methods.
73
74
**Plugin Example:**
75
```java
76
@Plugin(type = "transform")
77
@Name("FieldCleaner")
78
@Description("Cleans and validates field values")
79
public class FieldCleanerConfig extends PluginConfig {
80
81
@Name("targetField")
82
@Description("Field to clean and validate")
83
private String targetField;
84
85
@Name("cleaningRules")
86
@Description("Comma-separated list of cleaning rules")
87
private String cleaningRules = "trim,lowercase";
88
}
89
```
90
91
## Configuration Annotations
92
93
### @Property
94
95
```java { .api }
96
@Target(ElementType.FIELD)
97
@Retention(RetentionPolicy.RUNTIME)
98
public @interface Property {
99
}
100
```
101
102
Marks fields as configurable properties in plugin configurations.
103
104
### @Macro
105
106
```java { .api }
107
@Target(ElementType.FIELD)
108
@Retention(RetentionPolicy.RUNTIME)
109
public @interface Macro {
110
}
111
```
112
113
Indicates that a configuration property supports macro substitution at runtime.
114
115
**Configuration Example:**
116
```java
117
public class DatabaseConfig extends PluginConfig {
118
@Property
119
@Name("connectionString")
120
@Description("Database connection string")
121
@Macro
122
private String connectionString;
123
124
@Property
125
@Name("tableName")
126
@Description("Target table name")
127
private String tableName;
128
129
@Property
130
@Name("batchSize")
131
@Description("Batch size for operations")
132
private int batchSize = 1000;
133
}
134
```
135
136
## Transaction Control Annotations
137
138
### @TransactionPolicy
139
140
```java { .api }
141
@Target({ElementType.TYPE, ElementType.METHOD})
142
@Retention(RetentionPolicy.RUNTIME)
143
public @interface TransactionPolicy {
144
TransactionControl value();
145
}
146
```
147
148
Controls transaction behavior for programs and methods.
149
150
### TransactionControl
151
152
```java { .api }
153
public enum TransactionControl {
154
IMPLICIT, // Automatic transaction management
155
EXPLICIT // Manual transaction management
156
}
157
```
158
159
**Transaction Examples:**
160
```java
161
@TransactionPolicy(TransactionControl.EXPLICIT)
162
public class ExplicitTransactionWorker extends AbstractWorker {
163
164
@Override
165
public void run() {
166
WorkerContext context = getContext();
167
168
context.execute(new TxRunnable() {
169
@Override
170
public void run(DatasetContext context) throws Exception {
171
// Transactional operations
172
KeyValueTable data = context.getDataset("data");
173
data.write("key", "value");
174
}
175
});
176
}
177
}
178
179
public class ImplicitTransactionMapReduce extends AbstractMapReduce {
180
181
@TransactionPolicy(TransactionControl.IMPLICIT)
182
@Override
183
public void initialize(MapReduceContext context) {
184
// Automatically wrapped in transaction
185
KeyValueTable config = context.getDataset("config");
186
String value = config.read("setting");
187
}
188
}
189
```
190
191
## Data Access Annotations
192
193
### @ReadOnly
194
195
```java { .api }
196
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
197
@Retention(RetentionPolicy.RUNTIME)
198
public @interface ReadOnly {
199
}
200
```
201
202
Indicates a read-only access pattern for a dataset.
203
204
### @ReadWrite
205
206
```java { .api }
207
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
208
@Retention(RetentionPolicy.RUNTIME)
209
public @interface ReadWrite {
210
}
211
```
212
213
Indicates a read-write access pattern for a dataset.
214
215
### @WriteOnly
216
217
```java { .api }
218
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
219
@Retention(RetentionPolicy.RUNTIME)
220
public @interface WriteOnly {
221
}
222
```
223
224
Indicates a write-only access pattern for a dataset.
225
226
**Data Access Examples:**
227
```java
228
public class DataAccessExample extends AbstractMapReduce {
229
230
@UseDataSet("readOnlyData")
231
@ReadOnly
232
private KeyValueTable readOnlyData;
233
234
@UseDataSet("writeOnlyResults")
235
@WriteOnly
236
private ObjectStore<Result> results;
237
238
@UseDataSet("readWriteCache")
239
@ReadWrite
240
private KeyValueTable cache;
241
}
242
```
243
244
## Processing Annotations
245
246
### @ProcessInput (Deprecated)
247
248
```java { .api }
249
@Target(ElementType.METHOD)
250
@Retention(RetentionPolicy.RUNTIME)
251
@Deprecated
252
public @interface ProcessInput {
253
String value() default "";
254
}
255
```
256
257
Marks input-processing methods in Flow programs. Flow programs are deprecated, and this annotation is deprecated along with them.
258
259
### @Batch
260
261
```java { .api }
262
@Target(ElementType.METHOD)
263
@Retention(RetentionPolicy.RUNTIME)
264
public @interface Batch {
265
int value() default 1;
266
}
267
```
268
269
Specifies the batch size for input processing; `value` defaults to `1`.
270
271
### @HashPartition
272
273
```java { .api }
274
@Target(ElementType.METHOD)
275
@Retention(RetentionPolicy.RUNTIME)
276
public @interface HashPartition {
277
String value();
278
}
279
```
280
281
Specifies hash partitioning for data distribution; `value` names the key to partition on (for example, `"userId"`).
282
283
### @RoundRobin
284
285
```java { .api }
286
@Target(ElementType.METHOD)
287
@Retention(RetentionPolicy.RUNTIME)
288
public @interface RoundRobin {
289
}
290
```
291
292
Specifies round-robin distribution for data processing.
293
294
### @Tick
295
296
```java { .api }
297
@Target(ElementType.METHOD)
298
@Retention(RetentionPolicy.RUNTIME)
299
public @interface Tick {
300
long delay();
301
TimeUnit unit() default TimeUnit.SECONDS;
302
}
303
```
304
305
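Specifies time-based periodic processing: the annotated method runs every `delay` units of `unit`, where `unit` defaults to `TimeUnit.SECONDS`.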
306
307
**Processing Examples:**
308
```java
309
public class ProcessingAnnotationsExample {
310
311
@Batch(100)
312
@HashPartition("userId")
313
public void processBatch(List<UserEvent> events) {
314
// Process batch of 100 events partitioned by userId
315
}
316
317
@Tick(delay = 30, unit = TimeUnit.SECONDS)
318
public void periodicCleanup() {
319
// Execute every 30 seconds
320
}
321
322
@RoundRobin
323
public void distributeWork(WorkItem item) {
324
// Distribute work items in round-robin fashion
325
}
326
}
327
```
328
329
## Requirement Annotations
330
331
### @Requirements
332
333
```java { .api }
334
@Target(ElementType.TYPE)
335
@Retention(RetentionPolicy.RUNTIME)
336
public @interface Requirements {
337
String[] capabilities() default {};
338
String[] datasetTypes() default {};
339
}
340
```
341
342
Specifies requirements that must be satisfied for plugin or program execution.
343
344
**Requirements Example:**
345
```java
346
@Plugin(type = "sink")
347
@Requirements(
348
capabilities = {"database.connection", "ssl.support"},
349
datasetTypes = {"keyValueTable", "objectStore"}
350
)
351
public class SecureDatabaseSink extends PluginConfig {
352
// Plugin implementation
353
}
354
```
355
356
## Complete Annotation Usage Example
357
358
```java
359
@Plugin(type = "batchsource")
360
@Name("EnhancedFileSource")
361
@Description("Enhanced file source with validation and transformation")
362
@Requirements(capabilities = {"file.access", "validation"})
363
public class EnhancedFileSourceConfig extends PluginConfig {
364
365
@Property
366
@Name("path")
367
@Description("Input file path with macro support")
368
@Macro
369
private String path;
370
371
@Property
372
@Name("format")
373
@Description("File format (csv, json, avro)")
374
private String format = "csv";
375
376
@Property
377
@Name("validateSchema")
378
@Description("Enable schema validation")
379
private boolean validateSchema = true;
380
381
@Property
382
@Name("batchSize")
383
@Description("Processing batch size")
384
private int batchSize = 1000;
385
386
// Configuration validation and getters
387
public void validate() {
388
if (path == null || path.isEmpty()) {
389
throw new IllegalArgumentException("Path is required");
390
}
391
}
392
393
public String getPath() { return path; }
394
public String getFormat() { return format; }
395
public boolean isValidateSchema() { return validateSchema; }
396
public int getBatchSize() { return batchSize; }
397
}
398
399
@TransactionPolicy(TransactionControl.EXPLICIT)
400
public class AnnotatedProcessor extends AbstractWorker {
401
402
@UseDataSet("inputData")
403
@ReadOnly
404
private FileSet inputData;
405
406
@UseDataSet("processedData")
407
@WriteOnly
408
private ObjectStore<ProcessedRecord> processedData;
409
410
@UseDataSet("errorLog")
411
@WriteOnly
412
private KeyValueTable errorLog;
413
414
@Override
415
public void configure(WorkerConfigurer configurer) {
416
configurer.setName("AnnotatedProcessor");
417
configurer.useDataset("inputData");
418
configurer.useDataset("processedData");
419
configurer.useDataset("errorLog");
420
}
421
422
@Override
423
public void run() {
424
WorkerContext context = getContext();
425
426
context.execute(new TxRunnable() {
427
@Override
428
public void run(DatasetContext txContext) throws Exception {
429
// Explicit transaction for batch processing
430
processBatch(txContext);
431
}
432
});
433
}
434
435
@Batch(500)
436
@HashPartition("recordType")
437
private void processBatch(DatasetContext context) {
438
// Process batch of 500 records partitioned by type
439
}
440
441
@Tick(delay = 60, unit = TimeUnit.SECONDS)
442
private void logStatistics() {
443
// Log statistics every minute
444
}
445
}
446
```
447
448
CDAP's annotation system provides a powerful declarative approach to configuration, reducing boilerplate code and improving maintainability while ensuring type safety and runtime validation.