# Plugin Framework

CDAP's Plugin Framework provides an extensible architecture for adding custom processing logic, data sources, sinks, and transformations to applications without modifying core application code.

## Core Plugin Classes

### PluginConfig

```java { .api }
public class PluginConfig {
  // Base plugin configuration class
  // Extend this class to add plugin-specific configuration properties
}
```

Base configuration class for plugins. All plugin configurations should extend this class and use annotations to define configurable properties.
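
As a minimal sketch of that pattern, a configuration class might look like the following. The class name, property names, and defaults are illustrative only, and imports are omitted to match the other examples in this document.

```java
// Hypothetical configuration for an HTTP-based source plugin; all names are examples.
public class HttpSourceConfig extends PluginConfig {

  @Name("url")
  @Description("Endpoint to read from")
  @Macro                              // value may be supplied at runtime via macro substitution
  private String url;

  @Name("timeoutSeconds")
  @Description("Request timeout in seconds")
  private int timeoutSeconds = 30;    // a plain field initializer acts as the default value

  public String getUrl() {
    return url;
  }

  public int getTimeoutSeconds() {
    return timeoutSeconds;
  }
}
```

The annotations used here (`@Name`, `@Description`, `@Macro`) are described in the Plugin Annotations section below.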

### Plugin

```java { .api }
public class Plugin {
  public String getType();
  public String getName();
  public ArtifactId getArtifactId();
  public PluginClass getPluginClass();
  public PluginProperties getProperties();
}
```

Represents a plugin instance with its metadata and configuration.

### PluginClass

```java { .api }
public class PluginClass {
  public String getType();
  public String getName();
  public String getDescription();
  public String getClassName();
  public String getConfigFieldName();
  public Map<String, PluginPropertyField> getProperties();
  public Set<String> getEndpoints();
  public ArtifactId getParent();
}
```

Metadata describing a plugin class including its properties and capabilities.

## Plugin Context and Management

### PluginContext

```java { .api }
public interface PluginContext {
  <T> T newPluginInstance(String pluginId) throws InstantiationException;
  <T> T newPluginInstance(String pluginId, MacroEvaluator macroEvaluator)
      throws InstantiationException;

  <T> Class<T> loadPluginClass(String pluginId);

  boolean isPluginAvailable(String pluginId);

  Map<String, String> getPluginProperties(String pluginId);
  PluginProperties getPluginProperties(String pluginId, MacroEvaluator macroEvaluator);
}
```

Runtime context for accessing and instantiating plugins within programs and services.
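
For example, a component that is handed a `PluginContext` can check whether a plugin was registered and instantiate it on demand. This is a sketch only: the wrapper class, how the context is obtained, and the plugin id `transformer1` are assumptions, and the instantiated type refers to the FieldUppercase transform shown in the Usage Examples below.

```java
// Sketch: resolving and instantiating a plugin that was registered at configure time.
public class TransformRunner {

  private final PluginContext pluginContext;

  public TransformRunner(PluginContext pluginContext) {
    this.pluginContext = pluginContext;
  }

  public FieldUppercaseTransform createTransform() throws InstantiationException {
    if (!pluginContext.isPluginAvailable("transformer1")) {
      throw new IllegalStateException("Plugin 'transformer1' was not registered");
    }

    // The registered properties can be inspected before instantiation.
    Map<String, String> properties = pluginContext.getPluginProperties("transformer1");
    System.out.println("Instantiating transform with properties: " + properties);

    return pluginContext.newPluginInstance("transformer1");
  }
}
```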

### PluginConfigurer

```java { .api }
public interface PluginConfigurer {
  void usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties);
  void usePlugin(String pluginType, String pluginName, String pluginId, PluginProperties properties,
                 PluginSelector selector);

  <T> T usePluginClass(String pluginType, String pluginName, String pluginId, PluginProperties properties);
  <T> T usePluginClass(String pluginType, String pluginName, String pluginId, PluginProperties properties,
                       PluginSelector selector);
}
```

Interface for configuring plugin usage in applications and programs.
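
A short sketch of registering plugins at configuration time, including the overload that takes a `PluginSelector`. The plugin names, ids, and property keys are illustrative, and `LatestVersionSelector` refers to the example selector shown in the Plugin Selection section below.

```java
// Sketch: registering plugins during configuration. Names and ids are examples only.
public class PluginRegistration {

  static void registerPlugins(PluginConfigurer configurer) {
    PluginProperties props = PluginProperties.builder()
        .add("field", "customerName")
        .build();

    // Default selection: the framework picks the plugin artifact.
    configurer.usePlugin("transform", "FieldUppercase", "upper1", props);

    // Explicit selection: a PluginSelector decides which artifact/version to use.
    configurer.usePlugin("transform", "FieldUppercase", "upper2", props,
        new LatestVersionSelector());
  }
}
```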

## Plugin Properties and Configuration

### PluginProperties

```java { .api }
public class PluginProperties {
  public static Builder builder();

  public Map<String, String> getProperties();
  public String get(String key);
  public String get(String key, String defaultValue);

  public static class Builder {
    public Builder add(String key, String value);
    public Builder addAll(Map<String, String> properties);
    public PluginProperties build();
  }
}
```

Properties container for plugin configuration values.
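
A short sketch of building a properties container and reading values back; the keys and values are arbitrary examples.

```java
// Build an immutable set of plugin properties, then read them back with and without defaults.
PluginProperties properties = PluginProperties.builder()
    .add("path", "/data/input")
    .add("format", "csv")
    .addAll(Map.of("delimiter", ","))        // bulk-add from an existing map
    .build();

String format = properties.get("format");              // "csv"
String encoding = properties.get("encoding", "UTF-8"); // falls back to the default value
Map<String, String> all = properties.getProperties();  // full key/value view
```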

### PluginPropertyField

```java { .api }
public class PluginPropertyField {
  public String getName();
  public String getType();
  public String getDescription();
  public boolean isRequired();
  public boolean isMacroSupported();
  public Set<String> getChildren();
}
```

Metadata for individual plugin configuration properties.
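
Together with `Plugin` and `PluginClass`, this metadata can be inspected programmatically, for example to print a summary of a plugin's configurable properties. A sketch; the output format is arbitrary.

```java
// Print a human-readable summary of a plugin instance's configurable properties.
void describePlugin(Plugin plugin) {
  PluginClass pluginClass = plugin.getPluginClass();
  System.out.printf("%s plugin '%s' (%s)%n",
      pluginClass.getType(), pluginClass.getName(), pluginClass.getClassName());

  for (Map.Entry<String, PluginPropertyField> entry : pluginClass.getProperties().entrySet()) {
    PluginPropertyField field = entry.getValue();
    System.out.printf("  %s (%s)%s%s - %s%n",
        field.getName(),
        field.getType(),
        field.isRequired() ? " [required]" : "",
        field.isMacroSupported() ? " [macro]" : "",
        field.getDescription());
  }
}
```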

## Plugin Selection

### PluginSelector

```java { .api }
public interface PluginSelector {
  Map.Entry<ArtifactId, PluginClass> select(SortedMap<ArtifactId, PluginClass> plugins);
}
```

Interface for custom plugin selection logic when multiple versions are available.
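
For example, a selector that always picks the plugin from the last key in the sorted map could look like this. It assumes the map's ordering places the newest artifact version last, which is an assumption about `ArtifactId` ordering rather than something defined in this document.

```java
// Selector that chooses the entry with the highest-sorting ArtifactId key.
public class LatestVersionSelector implements PluginSelector {

  @Override
  public Map.Entry<ArtifactId, PluginClass> select(SortedMap<ArtifactId, PluginClass> plugins) {
    ArtifactId latest = plugins.lastKey();
    return Map.entry(latest, plugins.get(latest));
  }
}
```

Such a selector is passed to the `usePlugin` and `usePluginClass` overloads that accept a `PluginSelector`.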

### Requirements

```java { .api }
public class Requirements {
  public static Builder builder();

  public Set<String> getCapabilities();
  public Set<String> getDatasetTypes();

  public static class Builder {
    public Builder addCapabilities(String... capabilities);
    public Builder addDatasetTypes(String... datasetTypes);
    public Requirements build();
  }
}
```

Specifies requirements that must be satisfied for plugin execution.
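
A brief sketch of declaring requirements with the builder; the capability and dataset-type names are illustrative placeholders.

```java
// Declare that a plugin needs a particular capability and dataset type to run.
Requirements requirements = Requirements.builder()
    .addCapabilities("database.connection")
    .addDatasetTypes("table")
    .build();

// The declared sets can then be checked against what the runtime environment provides.
Set<String> capabilities = requirements.getCapabilities();
Set<String> datasetTypes = requirements.getDatasetTypes();
```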

## Plugin Annotations

### @Plugin

```java { .api }
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Plugin {
  String type();
}
```

Marks a class as a CDAP plugin of the specified type.

### @Name

```java { .api }
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Name {
  String value();
}
```

Specifies the name for plugins, plugin properties, or other named elements.

### @Description

```java { .api }
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Description {
  String value();
}
```

Provides human-readable descriptions for plugins and properties.

### @Macro

```java { .api }
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Macro {
}
```

Indicates that a plugin property supports macro substitution.
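
In practice, a macro-enabled property can be configured with a placeholder and resolved when the plugin is instantiated with a `MacroEvaluator`. The sketch below assumes a `${...}` placeholder syntax and a runtime-supplied evaluator; neither is defined in this document, and the plugin names and ids are examples.

```java
public class MacroExample {

  // Configure time: store a placeholder instead of a concrete value.
  // The "path" property must be declared with @Macro on the plugin's config field.
  static void registerSource(PluginConfigurer configurer) {
    configurer.usePlugin("batchsource", "FileSource", "source1",
        PluginProperties.builder()
            .add("path", "${input.directory}")   // assumed placeholder syntax
            .build());
  }

  // Runtime: macros are resolved while the plugin is instantiated.
  static <T> T createSource(PluginContext context, MacroEvaluator evaluator)
      throws InstantiationException {
    return context.newPluginInstance("source1", evaluator);
  }
}
```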

## Usage Examples

### Basic Plugin Implementation

```java
@Plugin(type = "transform")
@Name("FieldUppercase")
@Description("Transforms specified field values to uppercase")
public class FieldUppercaseTransform extends PluginConfig {

  @Name("field")
  @Description("Name of the field to transform")
  @Macro
  private String fieldName;

  @Name("preserveOriginal")
  @Description("Whether to preserve the original field value")
  private boolean preserveOriginal = false;

  public String getFieldName() {
    return fieldName;
  }

  public boolean shouldPreserveOriginal() {
    return preserveOriginal;
  }

  public Record transform(Record input) {
    Record.Builder builder = Record.builder(input);

    String originalValue = input.get(fieldName);
    if (originalValue != null) {
      String transformedValue = originalValue.toUpperCase();
      builder.set(fieldName, transformedValue);

      if (preserveOriginal) {
        builder.set(fieldName + "_original", originalValue);
      }
    }

    return builder.build();
  }
}
```

### Data Source Plugin

```java
public class FileSourceConfig extends PluginConfig {

  @Name("referenceName")
  @Description("Reference name used to identify this source")
  private String referenceName;

  @Name("path")
  @Description("Path to input files")
  @Macro
  private String path;

  @Name("format")
  @Description("Input file format")
  private String format = "csv";

  @Name("schema")
  @Description("Schema of the input data")
  private String schema;

  // Getters and validation methods
  public String getReferenceName() { return referenceName; }
  public String getPath() { return path; }
  public String getFormat() { return format; }
  public Schema getSchema() { return Schema.parseJson(schema); }

  public void validate() {
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException("Path must be specified");
    }

    if (schema == null || schema.isEmpty()) {
      throw new IllegalArgumentException("Schema must be specified");
    }
  }
}

@Plugin(type = "batchsource")
@Name("FileSource")
@Description("Reads data from files in specified format")
public class FileSource extends BatchSource<NullWritable, Text, StructuredRecord> {

  private final FileSourceConfig config;

  public FileSource(FileSourceConfig config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    config.validate();
    pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
  }

  @Override
  public void prepareRun(BatchSourceContext context) throws Exception {
    Job job = JobUtils.createInstance();
    FileInputFormat.addInputPath(job, new Path(config.getPath()));
    context.setInput(Input.of(config.getReferenceName(),
        new InputFormatProvider(config.getFormat(), job.getConfiguration())));
  }
}
```

### Plugin Usage in Applications

```java
public class PluginApplication extends AbstractApplication<Config> {

  @Override
  public void configure() {
    setName("PluginBasedApp");

    // Use transform plugin
    usePlugin("transform", "fieldTransform", "transformer1",
        PluginProperties.builder()
            .add("field", "customerName")
            .add("operation", "uppercase")
            .build());

    // Use data source plugin; customerSchema is a schema JSON string defined elsewhere
    usePlugin("batchsource", "fileSource", "source1",
        PluginProperties.builder()
            .add("path", "/data/input")
            .add("format", "json")
            .add("schema", customerSchema)
            .build());

    // MapReduce program that instantiates the plugins registered above
    addMapReduce(new PluginBasedProcessor());
  }
}
```

### Plugin Usage in Programs

```java
public class PluginBasedMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    // validationRules is a rules definition (e.g. a JSON string) provided elsewhere
    configurer.usePlugin("validator", "dataValidator", "validator1",
        PluginProperties.builder()
            .add("rules", validationRules)
            .build());
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    Job job = context.getHadoopJob();
    job.setMapperClass(PluginAwareMapper.class);

    context.addInput(Input.ofDataset("inputData"));
    context.addOutput(Output.ofDataset("validatedData"));
  }

  public static class PluginAwareMapper extends Mapper<byte[], Record, byte[], Record> {
    private DataValidator validator;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // Obtain the CDAP task context to access the PluginContext
      MapReduceTaskContext<byte[], Record, byte[], Record> cdapContext =
          (MapReduceTaskContext<byte[], Record, byte[], Record>) context;

      try {
        validator = cdapContext.getPluginContext().newPluginInstance("validator1");
      } catch (InstantiationException e) {
        throw new IOException("Failed to instantiate plugin 'validator1'", e);
      }
    }

    @Override
    protected void map(byte[] key, Record record, Context context)
        throws IOException, InterruptedException {

      if (validator.isValid(record)) {
        context.write(key, record);
      } else {
        // Log invalid record or write to error dataset
        context.getCounter("Validation", "InvalidRecords").increment(1);
      }
    }
  }
}
```

### Advanced Plugin with Dependencies

```java
public class DatabaseSinkConfig extends PluginConfig {

  @Name("referenceName")
  @Description("Reference name used to identify this sink")
  private String referenceName;

  @Name("connectionString")
  @Description("Database connection string")
  @Macro
  private String connectionString;

  @Name("tableName")
  @Description("Target table name")
  @Macro
  private String tableName;

  @Name("batchSize")
  @Description("Batch size for inserts")
  private int batchSize = 100;

  @Name("credentials")
  @Description("Database credentials")
  private DatabaseCredentials credentials;  // credential holder type defined elsewhere

  // Configuration validation
  public void validate() {
    if (connectionString == null || connectionString.isEmpty()) {
      throw new IllegalArgumentException("Connection string is required");
    }

    if (tableName == null || tableName.isEmpty()) {
      throw new IllegalArgumentException("Table name is required");
    }

    if (batchSize <= 0) {
      throw new IllegalArgumentException("Batch size must be positive");
    }
  }

  // Getters
  public String getReferenceName() { return referenceName; }
  public String getConnectionString() { return connectionString; }
  public String getTableName() { return tableName; }
  public int getBatchSize() { return batchSize; }
  public DatabaseCredentials getCredentials() { return credentials; }
}

@Plugin(type = "sink")
@Name("DatabaseSink")
@Description("Writes data to database tables")
@Requirements(capabilities = {"database.connection"})
public class DatabaseSink extends BatchSink<StructuredRecord, NullWritable, NullWritable> {

  private final DatabaseSinkConfig config;

  public DatabaseSink(DatabaseSinkConfig config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    config.validate();

    // Verify database connection and table schema (helper methods not shown here)
    try (Connection connection = createConnection()) {
      validateTableSchema(connection, config.getTableName());
    } catch (SQLException e) {
      throw new IllegalArgumentException("Cannot connect to database: " + e.getMessage(), e);
    }
  }

  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    Job job = JobUtils.createInstance();
    job.getConfiguration().set("db.connection.string", config.getConnectionString());
    job.getConfiguration().set("db.table.name", config.getTableName());
    job.getConfiguration().setInt("db.batch.size", config.getBatchSize());

    context.addOutput(Output.of(config.getReferenceName(),
        new OutputFormatProvider("DatabaseOutputFormat", job.getConfiguration())));
  }
}
```

This plugin framework enables modular, configurable, and reusable components that can be shared across different applications and use cases.