
# Annotations and Configuration

CDAP's annotation system provides declarative configuration for dependency injection, transaction control, data access patterns, plugin metadata, and program behavior.

## Dataset Injection Annotations

### @UseDataSet

```java { .api }
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseDataSet {
  String value();
}
```

Declares the dataset to inject into the annotated field, identified by name. Used in Flowlet classes to inject dataset instances.

**Usage Example:**

```java
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;

public class PurchaseStore extends AbstractFlowlet {

  @UseDataSet("myTable")
  private ObjectStore<Purchase> store;

  @ProcessInput
  public void process(Purchase purchase) {
    store.write(Bytes.toBytes(purchase.getPurchaseTime()), purchase);
  }
}
```

**Note:** This annotation is specifically designed for Flowlet classes and dataset field injection in the Flowlet context.
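
The injected dataset must be created somewhere in the application before it can be resolved by name. A minimal sketch of that wiring, using a hypothetical application class and a `KeyValueTable` for brevity (an `ObjectStore`, as in the example above, would additionally need its type properties configured):

```java
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.dataset.lib.KeyValueTable;

// Hypothetical application class: creates the dataset that
// @UseDataSet("myTable") later injects into program classes.
public class PurchaseApp extends AbstractApplication {
  @Override
  public void configure() {
    setName("PurchaseApp");
    setDescription("Stores purchases keyed by purchase time");
    createDataset("myTable", KeyValueTable.class);
  }
}
```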

## Plugin Annotations

### @Plugin

```java { .api }
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Plugin {
  String type();
}
```

Marks a class as a CDAP plugin of the specified type.

### @Name

```java { .api }
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Name {
  String value();
}
```

Specifies the name for plugins, configuration properties, or other named elements.

### @Description

```java { .api }
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Description {
  String value();
}
```

Provides human-readable descriptions for plugins, properties, and methods.

**Plugin Example:**

```java
@Plugin(type = "transform")
@Name("FieldCleaner")
@Description("Cleans and validates field values")
public class FieldCleanerConfig extends PluginConfig {

  @Name("targetField")
  @Description("Field to clean and validate")
  private String targetField;

  @Name("cleaningRules")
  @Description("Comma-separated list of cleaning rules")
  private String cleaningRules = "trim,lowercase";
}
```
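
The class-level annotations above sit on the config class; in many plugin codebases they instead sit on the plugin class itself, which receives the config through its constructor. A hypothetical sketch of that pairing, assuming the `Transform` base class from the pipeline plugin API (`cdap-etl-api`):

```java
import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.Transform;

// Hypothetical transform plugin consuming FieldCleanerConfig; the cleaning
// logic is elided, only the wiring between annotations, config, and the
// transform method is illustrated.
@Plugin(type = "transform")
@Name("FieldCleaner")
@Description("Cleans and validates field values")
public class FieldCleaner extends Transform<StructuredRecord, StructuredRecord> {

  private final FieldCleanerConfig config;

  public FieldCleaner(FieldCleanerConfig config) {
    this.config = config;
  }

  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
    // Apply the configured cleaning rules to the target field, then emit.
    emitter.emit(input);
  }
}
```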

## Configuration Annotations

### @Property

```java { .api }
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Property {
}
```

Marks fields as configurable properties in plugin configurations.

### @Macro

```java { .api }
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Macro {
}
```

Indicates that a configuration property supports macro substitution at runtime.

**Configuration Example:**

```java
public class DatabaseConfig extends PluginConfig {

  @Property
  @Name("connectionString")
  @Description("Database connection string")
  @Macro
  private String connectionString;

  @Property
  @Name("tableName")
  @Description("Target table name")
  private String tableName;

  @Property
  @Name("batchSize")
  @Description("Batch size for operations")
  private int batchSize = 1000;
}
```
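
A field marked with `@Macro`, such as `connectionString` above, may still hold an unresolved `${...}` expression at deployment time, so validation code typically skips checks for values that are still macros. A minimal sketch, assuming `PluginConfig` exposes a `containsMacro` check as in recent CDAP releases:

```java
public class DatabaseConfig extends PluginConfig {

  @Property
  @Name("connectionString")
  @Description("Database connection string")
  @Macro
  private String connectionString;

  // Validate only what is already concrete; a macro value such as
  // ${secure(db-connection)} is substituted later, at runtime.
  public void validate() {
    if (!containsMacro("connectionString")
        && (connectionString == null || connectionString.isEmpty())) {
      throw new IllegalArgumentException("connectionString must be provided");
    }
  }
}
```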

## Transaction Control Annotations

### @TransactionPolicy

```java { .api }
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface TransactionPolicy {
  TransactionControl value();
}
```

Controls transaction behavior for programs and methods.

### TransactionControl

```java { .api }
public enum TransactionControl {
  IMPLICIT,  // Automatic transaction management
  EXPLICIT   // Manual transaction management
}
```

**Transaction Examples:**

```java
@TransactionPolicy(TransactionControl.EXPLICIT)
public class ExplicitTransactionWorker extends AbstractWorker {

  @Override
  public void run() {
    WorkerContext context = getContext();

    context.execute(new TxRunnable() {
      @Override
      public void run(DatasetContext datasetContext) throws Exception {
        // Transactional operations
        KeyValueTable data = datasetContext.getDataset("data");
        data.write("key", "value");
      }
    });
  }
}

public class ImplicitTransactionMapReduce extends AbstractMapReduce {

  @TransactionPolicy(TransactionControl.IMPLICIT)
  @Override
  public void initialize(MapReduceContext context) {
    // Automatically wrapped in a transaction
    KeyValueTable config = context.getDataset("config");
    String value = Bytes.toString(config.read("setting"));
  }
}
```

## Data Access Annotations

### @ReadOnly

```java { .api }
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}
```

Indicates a read-only access pattern for datasets.

### @ReadWrite

```java { .api }
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadWrite {
}
```

Indicates a read-write access pattern for datasets.

### @WriteOnly

```java { .api }
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface WriteOnly {
}
```

Indicates a write-only access pattern for datasets.

**Data Access Examples:**

```java
public class DataAccessExample extends AbstractMapReduce {

  @UseDataSet("readOnlyData")
  @ReadOnly
  private KeyValueTable readOnlyData;

  @UseDataSet("writeOnlyResults")
  @WriteOnly
  private ObjectStore<Result> results;

  @UseDataSet("readWriteCache")
  @ReadWrite
  private KeyValueTable cache;
}
```
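
Because these annotations can also target methods, another common placement is on the operations of a custom dataset type, so each method declares its own access pattern. A hypothetical sketch (class name and constructor wiring are simplified for illustration):

```java
import co.cask.cdap.api.annotation.ReadOnly;
import co.cask.cdap.api.annotation.WriteOnly;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.AbstractDataset;
import co.cask.cdap.api.dataset.table.Table;

// Hypothetical custom dataset: each operation declares whether it reads or
// writes the underlying table, making access patterns visible per method.
public class CleanedValueTable extends AbstractDataset {

  private final Table table;

  public CleanedValueTable(String name, Table table) {
    super(name, table);
    this.table = table;
  }

  @WriteOnly
  public void record(String field, String cleanedValue) {
    table.put(Bytes.toBytes(field), Bytes.toBytes("value"), Bytes.toBytes(cleanedValue));
  }

  @ReadOnly
  public String read(String field) {
    byte[] value = table.get(Bytes.toBytes(field), Bytes.toBytes("value"));
    return value == null ? null : Bytes.toString(value);
  }
}
```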

## Processing Annotations

### @ProcessInput (Deprecated)

```java { .api }
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Deprecated
public @interface ProcessInput {
  String value() default "";
}
```

Used in deprecated Flow programs for input processing methods.

### @Batch

```java { .api }
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Batch {
  int value() default 1;
}
```

Specifies the batch size for input processing.

### @HashPartition

```java { .api }
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface HashPartition {
  String value();
}
```

Specifies hash partitioning for data distribution.

### @RoundRobin

```java { .api }
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RoundRobin {
}
```

Specifies round-robin distribution for data processing.

### @Tick

```java { .api }
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Tick {
  long delay();
  TimeUnit unit() default TimeUnit.SECONDS;
}
```

Specifies time-based periodic processing.

**Processing Examples:**

```java
public class ProcessingAnnotationsExample {

  @Batch(100)
  @HashPartition("userId")
  public void processBatch(List<UserEvent> events) {
    // Process batches of 100 events partitioned by userId
  }

  @Tick(delay = 30, unit = TimeUnit.SECONDS)
  public void periodicCleanup() {
    // Executes every 30 seconds
  }

  @RoundRobin
  public void distributeWork(WorkItem item) {
    // Distribute work items in round-robin fashion
  }
}
```

## Requirement Annotations

### @Requirements

```java { .api }
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Requirements {
  String[] capabilities() default {};
  String[] datasetTypes() default {};
}
```

Specifies requirements that must be satisfied for plugin or program execution.

**Requirements Example:**

```java
@Plugin(type = "sink")
@Requirements(
  capabilities = {"database.connection", "ssl.support"},
  datasetTypes = {"keyValueTable", "objectStore"}
)
public class SecureDatabaseSink extends PluginConfig {
  // Plugin implementation
}
```

## Complete Annotation Usage Example

```java
@Plugin(type = "batchsource")
@Name("EnhancedFileSource")
@Description("Enhanced file source with validation and transformation")
@Requirements(capabilities = {"file.access", "validation"})
public class EnhancedFileSourceConfig extends PluginConfig {

  @Property
  @Name("path")
  @Description("Input file path with macro support")
  @Macro
  private String path;

  @Property
  @Name("format")
  @Description("File format (csv, json, avro)")
  private String format = "csv";

  @Property
  @Name("validateSchema")
  @Description("Enable schema validation")
  private boolean validateSchema = true;

  @Property
  @Name("batchSize")
  @Description("Processing batch size")
  private int batchSize = 1000;

  // Configuration validation and getters
  public void validate() {
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException("Path is required");
    }
  }

  public String getPath() { return path; }
  public String getFormat() { return format; }
  public boolean isValidateSchema() { return validateSchema; }
  public int getBatchSize() { return batchSize; }
}

@TransactionPolicy(TransactionControl.EXPLICIT)
public class AnnotatedProcessor extends AbstractWorker {

  @UseDataSet("inputData")
  @ReadOnly
  private FileSet inputData;

  @UseDataSet("processedData")
  @WriteOnly
  private ObjectStore<ProcessedRecord> processedData;

  @UseDataSet("errorLog")
  @WriteOnly
  private KeyValueTable errorLog;

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("AnnotatedProcessor");
    configurer.useDataset("inputData");
    configurer.useDataset("processedData");
    configurer.useDataset("errorLog");
  }

  @Override
  public void run() {
    WorkerContext context = getContext();

    context.execute(new TxRunnable() {
      @Override
      public void run(DatasetContext txContext) throws Exception {
        // Explicit transaction for batch processing
        processBatch(txContext);
      }
    });
  }

  @Batch(500)
  @HashPartition("recordType")
  private void processBatch(DatasetContext context) {
    // Process batches of 500 records partitioned by type
  }

  @Tick(delay = 60, unit = TimeUnit.SECONDS)
  private void logStatistics() {
    // Log statistics every minute
  }
}
```

CDAP's annotation system provides a powerful declarative approach to configuration, reducing boilerplate code and improving maintainability while ensuring type safety and runtime validation.