0
# Stage and Plugin Management
1
2
This section describes the core components for defining individual pipeline stages and their associated plugins. Each stage represents a discrete processing step in an ETL pipeline and holds the plugin configuration, validation logic, and metadata needed for pipeline execution.
3
4
## Capabilities
5
6
### ETL Stage Configuration
7
8
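`ETLStage` is the configuration of an individual pipeline stage: it pairs a stage name with the plugin definition that the stage runs, and exposes validation and upgrade operations for that stage.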
9
10
```java { .api }
11
/**
12
* ETL Stage Configuration representing a single processing step in a pipeline
13
*/
14
public final class ETLStage {
15
/**
16
* Create ETL stage with name and plugin
17
* @param name unique stage name within the pipeline
18
* @param plugin plugin configuration for this stage
19
*/
20
public ETLStage(String name, ETLPlugin plugin);
21
22
/**
23
* Get stage name
24
* @return unique stage name
25
*/
26
public String getName();
27
28
/**
29
* Get plugin configuration
30
* @return ETL plugin configuration
31
*/
32
public ETLPlugin getPlugin();
33
34
/**
35
* Validate stage configuration
36
* @throws IllegalArgumentException if stage configuration is invalid
37
*/
38
public void validate();
39
40
/**
41
* Upgrade stage to current version with artifact resolution
42
* @param upgradeContext context providing artifact information
43
* @return upgraded stage configuration
44
*/
45
public ETLStage upgradeStage(UpgradeContext upgradeContext);
46
}
47
```
48
49
**Usage Examples:**
50
51
```java
52
import co.cask.cdap.etl.proto.v2.*;
53
54
// Create source stage
55
Map<String, String> sourceProperties = new HashMap<>();
56
sourceProperties.put("name", "customer_data");
57
sourceProperties.put("schema.row.field", "customer_id");
58
59
ETLPlugin tableSource = new ETLPlugin(
60
"Table",
61
"batchsource",
62
sourceProperties
63
);
64
ETLStage sourceStage = new ETLStage("customers", tableSource);
65
66
// Create transform stage with JavaScript
67
Map<String, String> transformProperties = new HashMap<>();
68
transformProperties.put("script", "function transform(input, emitter, context) {" +
69
" input.processed_date = new Date().toISOString();" +
70
" emitter.emit(input);" +
71
"}");
72
73
ETLPlugin jsTransform = new ETLPlugin(
74
"JavaScript",
75
"transform",
76
transformProperties
77
);
78
ETLStage transformStage = new ETLStage("add_timestamp", jsTransform);
79
80
// Create sink stage
81
Map<String, String> sinkProperties = new HashMap<>();
82
sinkProperties.put("name", "processed_customers");
83
84
ETLPlugin tableSink = new ETLPlugin(
85
"Table",
86
"batchsink",
87
sinkProperties
88
);
89
ETLStage sinkStage = new ETLStage("output", tableSink);
90
91
// Validate stages
92
sourceStage.validate();
93
transformStage.validate();
94
sinkStage.validate();
95
```
96
97
### ETL Plugin Configuration
98
99
`ETLPlugin` is the plugin configuration within an ETL stage. It contains the plugin's identification (name and type), its properties, and an optional artifact selector used for plugin resolution.
100
101
```java { .api }
102
/**
103
* Plugin Configuration defining the processing logic for an ETL stage
104
*/
105
public class ETLPlugin {
106
/**
107
* Create plugin with basic configuration
108
* @param name plugin name
109
* @param type plugin type (e.g., "batchsource", "transform", "batchsink")
110
* @param properties plugin configuration properties
111
*/
112
public ETLPlugin(String name, String type, Map<String, String> properties);
113
114
/**
115
* Create plugin with artifact specification
116
* @param name plugin name
117
* @param type plugin type
118
* @param properties plugin configuration properties
119
* @param artifact artifact selector for plugin resolution
120
*/
121
public ETLPlugin(String name, String type, Map<String, String> properties, ArtifactSelectorConfig artifact);
122
123
/**
124
* Get plugin name
125
* @return plugin name
126
*/
127
public String getName();
128
129
/**
130
* Get plugin type
131
* @return plugin type identifier
132
*/
133
public String getType();
134
135
/**
136
* Get plugin properties
137
* @return immutable map of plugin properties
138
*/
139
public Map<String, String> getProperties();
140
141
/**
142
* Get plugin properties as PluginProperties object
143
* @return PluginProperties instance for CDAP plugin system
144
*/
145
public PluginProperties getPluginProperties();
146
147
/**
148
* Get artifact configuration
149
* @return artifact selector configuration, may be null
150
*/
151
public ArtifactSelectorConfig getArtifactConfig();
152
153
/**
154
* Validate plugin configuration
155
* @throws IllegalArgumentException if plugin configuration is invalid
156
*/
157
public void validate();
158
}
159
```
160
161
**Usage Examples:**
162
163
```java
164
import co.cask.cdap.etl.proto.v2.ETLPlugin;
165
import co.cask.cdap.etl.proto.ArtifactSelectorConfig;
166
167
// Basic plugin configuration
168
Map<String, String> basicProperties = new HashMap<>();
169
basicProperties.put("path", "/data/input");
170
basicProperties.put("format", "csv");
171
basicProperties.put("delimiter", ",");
172
basicProperties.put("skipHeader", "true");
173
174
ETLPlugin basicPlugin = new ETLPlugin(
175
"File",
176
"batchsource",
177
basicProperties
178
);
179
180
// Plugin with artifact specification
181
ArtifactSelectorConfig artifact = new ArtifactSelectorConfig(
182
"SYSTEM",
183
"core-plugins",
184
"2.8.0"
185
);
186
187
Map<String, String> bigQueryProperties = new HashMap<>();
188
bigQueryProperties.put("project", "my-gcp-project");
189
bigQueryProperties.put("dataset", "analytics");
190
bigQueryProperties.put("table", "events");
191
bigQueryProperties.put("serviceFilePath", "/path/to/service-account.json");
192
193
ETLPlugin pluginWithArtifact = new ETLPlugin(
194
"BigQueryTable",
195
"batchsource",
196
bigQueryProperties,
197
artifact
198
);
199
200
// Complex transform plugin
201
Map<String, String> pythonProperties = new HashMap<>();
202
pythonProperties.put("script", "def transform(input, emitter, context):\n" +
203
" if input['age'] >= 18:\n" +
204
" input['category'] = 'adult'\n" +
205
" else:\n" +
206
" input['category'] = 'minor'\n" +
207
" emitter.emit(input)");
208
209
ETLPlugin pythonTransform = new ETLPlugin(
210
"Python",
211
"transform",
212
pythonProperties
213
);
214
215
// Validate plugins
216
basicPlugin.validate();
217
pluginWithArtifact.validate();
218
pythonTransform.validate();
219
220
// Access plugin properties
221
Map<String, String> properties = basicPlugin.getProperties();
222
String path = properties.get("path");
223
```
224
225
### Plugin Types and Common Configurations
226
227
Standard plugin types and their typical configuration patterns for different ETL operations.
228
229
**Batch Source Plugins:**
230
231
```java
232
// Table source
233
Map<String, String> tableProps = new HashMap<>();
234
tableProps.put("name", "input_table");
235
tableProps.put("schema.row.field", "id");
236
ETLPlugin tableSource = new ETLPlugin("Table", "batchsource", tableProps);
237
238
// File source
239
Map<String, String> fileProps = new HashMap<>();
240
fileProps.put("path", "/data/input.csv");
241
fileProps.put("format", "csv");
242
ETLPlugin fileSource = new ETLPlugin("File", "batchsource", fileProps);
243
244
// Database source
245
Map<String, String> dbProps = new HashMap<>();
246
dbProps.put("connectionString", "jdbc:mysql://localhost:3306/db");
247
dbProps.put("tableName", "users");
248
dbProps.put("user", "admin");
249
ETLPlugin dbSource = new ETLPlugin("Database", "batchsource", dbProps);
250
```
251
252
**Transform Plugins:**
253
254
```java
255
// JavaScript transform
256
Map<String, String> jsProps = new HashMap<>();
257
jsProps.put("script", "function transform(input, emitter, context) { /* logic */ }");
258
ETLPlugin jsTransform = new ETLPlugin("JavaScript", "transform", jsProps);
259
260
// Projection transform
261
Map<String, String> projProps = new HashMap<>();
262
projProps.put("fieldsToKeep", "id,name,email");
263
projProps.put("fieldsToRename", "old_name:new_name");
264
ETLPlugin projection = new ETLPlugin("Projection", "transform", projProps);
265
266
// Validator transform
267
Map<String, String> validProps = new HashMap<>();
268
validProps.put("validators", "email:email,age:range[0,120]");
269
ETLPlugin validator = new ETLPlugin("Validator", "transform", validProps);
270
```
271
272
**Batch Sink Plugins:**
273
274
```java
275
// Table sink
276
Map<String, String> tableSinkProps = new HashMap<>();
277
tableSinkProps.put("name", "output_table");
278
tableSinkProps.put("schema.row.field", "id");
279
ETLPlugin tableSink = new ETLPlugin("Table", "batchsink", tableSinkProps);
280
281
// File sink
282
Map<String, String> fileSinkProps = new HashMap<>();
283
fileSinkProps.put("path", "/data/output");
284
fileSinkProps.put("format", "parquet");
285
ETLPlugin fileSink = new ETLPlugin("File", "batchsink", fileSinkProps);
286
287
// Database sink
288
Map<String, String> dbSinkProps = new HashMap<>();
289
dbSinkProps.put("connectionString", "jdbc:postgresql://localhost:5432/warehouse");
290
dbSinkProps.put("tableName", "processed_data");
291
ETLPlugin dbSink = new ETLPlugin("Database", "batchsink", dbSinkProps);
292
```
293
294
### Stage Validation and Error Handling
295
296
Comprehensive validation logic ensuring stage and plugin configurations are correct and complete.
297
298
```java { .api }
299
/**
300
* Validation methods for stage and plugin configurations
301
*/
302
public class ValidationUtils {
303
/**
304
* Common validation patterns for stage names
305
* - Must not be null or empty
306
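* - Must be unique within pipeline (this cannot be determined from a single name; it has to be checked against the other stage names in the pipeline)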
307
* - Should follow naming conventions
308
*/
309
public static void validateStageName(String name);
310
311
/**
312
* Common validation patterns for plugin configurations
313
* - Plugin name and type must be specified
314
* - Required properties must be present
315
* - Property values must meet format requirements
316
*/
317
public static void validatePlugin(ETLPlugin plugin);
318
}
319
```
320
321
**Error Handling Examples:**
322
323
```java
324
try {
325
// Invalid stage - empty name (assumes `plugin` is an ETLPlugin created earlier)
326
ETLStage invalidStage = new ETLStage("", plugin);
327
invalidStage.validate();
328
} catch (IllegalArgumentException e) {
329
// Handle validation error
330
System.err.println("Stage validation failed: " + e.getMessage());
331
}
332
333
try {
334
// Invalid plugin - missing required properties
335
ETLPlugin invalidPlugin = new ETLPlugin("Database", "batchsource", new HashMap<>());
336
invalidPlugin.validate();
337
} catch (IllegalArgumentException e) {
338
// Handle plugin validation error
339
System.err.println("Plugin validation failed: " + e.getMessage());
340
}
341
342
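// Best practice: validate before using in pipeline (assumes `stage` and `logger` are defined elsewhere).
// Note: the name guard below skips validate() entirely when the name is null or empty, so handle that case separately.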
343
if (stage.getName() != null && !stage.getName().isEmpty()) {
344
try {
345
stage.validate();
346
// Safe to use stage in pipeline
347
} catch (IllegalArgumentException e) {
348
// Log error and handle gracefully
349
logger.error("Stage validation failed for {}: {}", stage.getName(), e.getMessage());
350
}
351
}
352
```