or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

artifact-resource-management.md · connection-data-flow.md · index.md · legacy-version-support.md · pipeline-configuration.md · pipeline-triggering.md · stage-plugin-management.md

docs/stage-plugin-management.md

# Stage and Plugin Management

Core components for defining individual pipeline stages and their associated plugins. Each stage represents a discrete processing step in an ETL pipeline, containing plugin configuration, validation logic, and metadata necessary for pipeline execution.

## Capabilities

### ETL Stage Configuration

Individual pipeline stage configuration containing plugin definition and stage metadata.

```java { .api }
/**
 * ETL Stage Configuration representing a single processing step in a pipeline
 */
public final class ETLStage {
  /**
   * Create ETL stage with name and plugin
   * @param name unique stage name within the pipeline
   * @param plugin plugin configuration for this stage
   */
  public ETLStage(String name, ETLPlugin plugin);

  /**
   * Get stage name
   * @return unique stage name
   */
  public String getName();

  /**
   * Get plugin configuration
   * @return ETL plugin configuration
   */
  public ETLPlugin getPlugin();

  /**
   * Validate stage configuration
   * @throws IllegalArgumentException if stage configuration is invalid
   */
  public void validate();

  /**
   * Upgrade stage to current version with artifact resolution
   * @param upgradeContext context providing artifact information
   * @return upgraded stage configuration
   */
  public ETLStage upgradeStage(UpgradeContext upgradeContext);
}
```

**Usage Examples:**

```java
import co.cask.cdap.etl.proto.v2.*;

// Create source stage
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("name", "customer_data");
sourceProperties.put("schema.row.field", "customer_id");

ETLPlugin tableSource = new ETLPlugin(
  "Table",
  "batchsource",
  sourceProperties
);
ETLStage sourceStage = new ETLStage("customers", tableSource);

// Create transform stage with JavaScript
Map<String, String> transformProperties = new HashMap<>();
transformProperties.put("script", "function transform(input, emitter, context) {" +
    " input.processed_date = new Date().toISOString();" +
    " emitter.emit(input);" +
    "}");

ETLPlugin jsTransform = new ETLPlugin(
  "JavaScript",
  "transform",
  transformProperties
);
ETLStage transformStage = new ETLStage("add_timestamp", jsTransform);

// Create sink stage
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("name", "processed_customers");

ETLPlugin tableSink = new ETLPlugin(
  "Table",
  "batchsink",
  sinkProperties
);
ETLStage sinkStage = new ETLStage("output", tableSink);

// Validate stages
sourceStage.validate();
transformStage.validate();
sinkStage.validate();
```

### ETL Plugin Configuration

Plugin configuration within an ETL stage, containing plugin identification, properties, and artifact selection.

```java { .api }
/**
 * Plugin Configuration defining the processing logic for an ETL stage
 */
public class ETLPlugin {
  /**
   * Create plugin with basic configuration
   * @param name plugin name
   * @param type plugin type (e.g., "batchsource", "transform", "batchsink")
   * @param properties plugin configuration properties
   */
  public ETLPlugin(String name, String type, Map<String, String> properties);

  /**
   * Create plugin with artifact specification
   * @param name plugin name
   * @param type plugin type
   * @param properties plugin configuration properties
   * @param artifact artifact selector for plugin resolution
   */
  public ETLPlugin(String name, String type, Map<String, String> properties, ArtifactSelectorConfig artifact);

  /**
   * Get plugin name
   * @return plugin name
   */
  public String getName();

  /**
   * Get plugin type
   * @return plugin type identifier
   */
  public String getType();

  /**
   * Get plugin properties
   * @return immutable map of plugin properties
   */
  public Map<String, String> getProperties();

  /**
   * Get plugin properties as PluginProperties object
   * @return PluginProperties instance for CDAP plugin system
   */
  public PluginProperties getPluginProperties();

  /**
   * Get artifact configuration
   * @return artifact selector configuration, may be null
   */
  public ArtifactSelectorConfig getArtifactConfig();

  /**
   * Validate plugin configuration
   * @throws IllegalArgumentException if plugin configuration is invalid
   */
  public void validate();
}
```

**Usage Examples:**

```java
import co.cask.cdap.etl.proto.v2.ETLPlugin;
import co.cask.cdap.etl.proto.ArtifactSelectorConfig;

// Basic plugin configuration
Map<String, String> basicProperties = new HashMap<>();
basicProperties.put("path", "/data/input");
basicProperties.put("format", "csv");
basicProperties.put("delimiter", ",");
basicProperties.put("skipHeader", "true");

ETLPlugin basicPlugin = new ETLPlugin(
  "File",
  "batchsource",
  basicProperties
);

// Plugin with artifact specification
ArtifactSelectorConfig artifact = new ArtifactSelectorConfig(
  "SYSTEM",
  "core-plugins",
  "2.8.0"
);

Map<String, String> bigQueryProperties = new HashMap<>();
bigQueryProperties.put("project", "my-gcp-project");
bigQueryProperties.put("dataset", "analytics");
bigQueryProperties.put("table", "events");
bigQueryProperties.put("serviceFilePath", "/path/to/service-account.json");

ETLPlugin pluginWithArtifact = new ETLPlugin(
  "BigQueryTable",
  "batchsource",
  bigQueryProperties,
  artifact
);

// Complex transform plugin
Map<String, String> pythonProperties = new HashMap<>();
pythonProperties.put("script", "def transform(input, emitter, context):\n" +
    " if input['age'] >= 18:\n" +
    " input['category'] = 'adult'\n" +
    " else:\n" +
    " input['category'] = 'minor'\n" +
    " emitter.emit(input)");

ETLPlugin pythonTransform = new ETLPlugin(
  "Python",
  "transform",
  pythonProperties
);

// Validate plugins
basicPlugin.validate();
pluginWithArtifact.validate();
pythonTransform.validate();

// Access plugin properties
Map<String, String> properties = basicPlugin.getProperties();
String path = properties.get("path");
```

### Plugin Types and Common Configurations

Standard plugin types and their typical configuration patterns for different ETL operations.

**Batch Source Plugins:**

```java
// Table source
Map<String, String> tableProps = new HashMap<>();
tableProps.put("name", "input_table");
tableProps.put("schema.row.field", "id");
ETLPlugin tableSource = new ETLPlugin("Table", "batchsource", tableProps);

// File source
Map<String, String> fileProps = new HashMap<>();
fileProps.put("path", "/data/input.csv");
fileProps.put("format", "csv");
ETLPlugin fileSource = new ETLPlugin("File", "batchsource", fileProps);

// Database source
Map<String, String> dbProps = new HashMap<>();
dbProps.put("connectionString", "jdbc:mysql://localhost:3306/db");
dbProps.put("tableName", "users");
dbProps.put("user", "admin");
ETLPlugin dbSource = new ETLPlugin("Database", "batchsource", dbProps);
```

**Transform Plugins:**

```java
// JavaScript transform
Map<String, String> jsProps = new HashMap<>();
jsProps.put("script", "function transform(input, emitter, context) { /* logic */ }");
ETLPlugin jsTransform = new ETLPlugin("JavaScript", "transform", jsProps);

// Projection transform
Map<String, String> projProps = new HashMap<>();
projProps.put("fieldsToKeep", "id,name,email");
projProps.put("fieldsToRename", "old_name:new_name");
ETLPlugin projection = new ETLPlugin("Projection", "transform", projProps);

// Validator transform
Map<String, String> validProps = new HashMap<>();
validProps.put("validators", "email:email,age:range[0,120]");
ETLPlugin validator = new ETLPlugin("Validator", "transform", validProps);
```

**Batch Sink Plugins:**

```java
// Table sink
Map<String, String> tableSinkProps = new HashMap<>();
tableSinkProps.put("name", "output_table");
tableSinkProps.put("schema.row.field", "id");
ETLPlugin tableSink = new ETLPlugin("Table", "batchsink", tableSinkProps);

// File sink
Map<String, String> fileSinkProps = new HashMap<>();
fileSinkProps.put("path", "/data/output");
fileSinkProps.put("format", "parquet");
ETLPlugin fileSink = new ETLPlugin("File", "batchsink", fileSinkProps);

// Database sink
Map<String, String> dbSinkProps = new HashMap<>();
dbSinkProps.put("connectionString", "jdbc:postgresql://localhost:5432/warehouse");
dbSinkProps.put("tableName", "processed_data");
ETLPlugin dbSink = new ETLPlugin("Database", "batchsink", dbSinkProps);
```

### Stage Validation and Error Handling

Comprehensive validation logic ensuring stage and plugin configurations are correct and complete.

```java { .api }
/**
 * Validation methods for stage and plugin configurations
 */
public class ValidationUtils {
  /**
   * Common validation patterns for stage names
   * - Must not be null or empty
   * - Must be unique within pipeline
   * - Should follow naming conventions
   */
  public static void validateStageName(String name);

  /**
   * Common validation patterns for plugin configurations
   * - Plugin name and type must be specified
   * - Required properties must be present
   * - Property values must meet format requirements
   */
  public static void validatePlugin(ETLPlugin plugin);
}
```

**Error Handling Examples:**

```java
try {
  // Invalid stage - missing name
  ETLStage invalidStage = new ETLStage("", plugin);
  invalidStage.validate();
} catch (IllegalArgumentException e) {
  // Handle validation error
  System.err.println("Stage validation failed: " + e.getMessage());
}

try {
  // Invalid plugin - missing required properties
  ETLPlugin invalidPlugin = new ETLPlugin("Database", "batchsource", new HashMap<>());
  invalidPlugin.validate();
} catch (IllegalArgumentException e) {
  // Handle plugin validation error
  System.err.println("Plugin validation failed: " + e.getMessage());
}

// Best practice: validate before using in pipeline
if (stage.getName() != null && !stage.getName().isEmpty()) {
  try {
    stage.validate();
    // Safe to use stage in pipeline
  } catch (IllegalArgumentException e) {
    // Log error and handle gracefully
    logger.error("Stage validation failed for {}: {}", stage.getName(), e.getMessage());
  }
}
```