# Pipeline Configuration

The current (v2) ETL pipeline configuration supports both batch and streaming data processing. It includes resource management (pipeline, driver, and client resources), stage logging, process timing, and extensive pipeline property support.

## Capabilities

### Base ETL Configuration

Core configuration class providing common functionality for all ETL pipeline types.

```java { .api }
/**
12
* Base ETL Configuration class for all pipeline types
13
*/
14
public class ETLConfig extends Config implements UpgradeableConfig {
15
/**
16
* Get pipeline description
17
* @return pipeline description, may be null
18
*/
19
public String getDescription();
20
21
/**
22
* Get all pipeline stages
23
* @return immutable set of ETL stages
24
*/
25
public Set<ETLStage> getStages();
26
27
/**
28
* Get stage connections defining data flow
29
* @return immutable set of connections between stages
30
*/
31
public Set<Connection> getConnections();
32
33
/**
34
* Get resource allocation for pipeline execution
35
* @return resource configuration, defaults to 1024MB/1 core if not specified
36
*/
37
public Resources getResources();
38
39
/**
40
* Get driver resource allocation
41
* @return driver resource configuration
42
*/
43
public Resources getDriverResources();
44
45
/**
46
* Get client resource allocation
47
* @return client resource configuration
48
*/
49
public Resources getClientResources();
50
51
/**
52
* Get number of records for preview
53
* @return preview record count, defaults to 100
54
*/
55
public int getNumOfRecordsPreview();
56
57
/**
58
* Check if stage logging is enabled
59
* @return true if stage logging enabled, defaults to true
60
*/
61
public boolean isStageLoggingEnabled();
62
63
/**
64
* Check if process timing is enabled
65
* @return true if process timing enabled, defaults to true
66
*/
67
public boolean isProcessTimingEnabled();
68
69
/**
70
* Get pipeline properties
71
* @return immutable map of pipeline properties
72
*/
73
public Map<String, String> getProperties();
74
75
/**
76
* Validate configuration correctness
77
* @throws IllegalArgumentException if configuration is invalid
78
*/
79
public void validate();
80
81
/**
82
* Check if configuration can be upgraded
83
* @return false for v2 configurations (latest version)
84
*/
85
public boolean canUpgrade();
86
87
/**
88
* Upgrade configuration to next version
89
* @param upgradeContext context for upgrading
90
* @throws UnsupportedOperationException for v2 configurations
91
*/
92
public UpgradeableConfig upgrade(UpgradeContext upgradeContext);
93
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.proto.v2.*;
100
import co.cask.cdap.api.Resources;
101
102
// Using builder pattern to create base configuration
103
ETLConfig.Builder<?> builder = new ETLConfig.Builder<ETLConfig.Builder<?>>() {
104
// Abstract builder implementation would be provided by concrete subclasses
105
};
106
107
// Configure resources and properties
108
Map<String, String> properties = new HashMap<>();
109
properties.put("custom.property", "value");
110
111
builder.setResources(new Resources(2048, 4))
112
.setDriverResources(new Resources(1024, 2))
113
.setClientResources(new Resources(512, 1))
114
.setNumOfRecordsPreview(500)
115
.setProperties(properties);
116
117
// Disable optional features
118
builder.disableStageLogging()
119
.disableProcessTiming();
```

### Batch ETL Configuration

Configuration specific to batch ETL pipelines, with scheduling, execution engine selection (MapReduce or Spark), and post-action support.

```java { .api }
/**
128
* ETL Batch Configuration for scheduled batch processing pipelines
129
*/
130
public final class ETLBatchConfig extends ETLConfig {
131
/**
132
* Get post-execution actions
133
* @return immutable list of post-actions to execute after pipeline completion
134
*/
135
public List<ETLStage> getPostActions();
136
137
/**
138
* Get execution engine
139
* @return execution engine (MapReduce or Spark), defaults to MapReduce
140
*/
141
public Engine getEngine();
142
143
/**
144
* Get schedule configuration
145
* @return schedule string, may be null
146
*/
147
public String getSchedule();
148
149
/**
150
* Get maximum concurrent runs
151
* @return max concurrent runs, may be null for unlimited
152
*/
153
public Integer getMaxConcurrentRuns();
154
155
/**
156
* Convert old configuration format to current v2 format
157
* @return v2 batch configuration
158
*/
159
public ETLBatchConfig convertOldConfig();
160
161
/**
162
* Create builder for batch configuration
163
* @return new builder instance
164
*/
165
public static Builder builder();
166
167
/**
168
* Create builder with schedule (deprecated)
169
* @param schedule time schedule
170
* @return new builder instance
171
* @deprecated use builder() and setTimeSchedule() instead
172
*/
173
@Deprecated
174
public static Builder builder(String schedule);
175
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.proto.v2.*;
182
import co.cask.cdap.etl.api.Engine;
183
import co.cask.cdap.api.Resources;
184
185
// Build batch ETL configuration
186
ETLBatchConfig batchConfig = ETLBatchConfig.builder()
187
.setTimeSchedule("0 0 2 * * ?") // Daily at 2 AM
188
.setEngine(Engine.SPARK)
189
.setMaxConcurrentRuns(3)
190
.addStage(sourceStage)
191
.addStage(transformStage)
192
.addStage(sinkStage)
193
.addConnection("source", "transform")
194
.addConnection("transform", "sink")
195
.addPostAction(cleanupAction)
196
.setResources(new Resources(4096, 8))
197
.setDriverResources(new Resources(2048, 4))
198
.build();
199
200
// Validate and use
201
batchConfig.validate();
```

### Data Streams Configuration

Configuration for streaming ETL pipelines, with batch interval settings, checkpoint management, and graceful shutdown options.

```java { .api }
/**
210
* Data Streams Configuration for real-time streaming pipelines
211
*/
212
public final class DataStreamsConfig extends ETLConfig {
213
/**
214
* Get batch processing interval
215
* @return batch interval string (e.g., "1m", "30s")
216
*/
217
public String getBatchInterval();
218
219
/**
220
* Check if running in unit test mode
221
* @return true if in unit test mode
222
*/
223
public boolean isUnitTest();
224
225
/**
226
* Check if checkpoints are disabled
227
* @return true if checkpoints disabled, defaults to false
228
*/
229
public boolean checkpointsDisabled();
230
231
/**
232
* Get additional JVM options
233
* @return extra Java options string, defaults to empty
234
*/
235
public String getExtraJavaOpts();
236
237
/**
238
* Get graceful stop setting
239
* @return true for graceful stop, defaults to true
240
*/
241
public Boolean getStopGracefully();
242
243
/**
244
* Get checkpoint directory
245
* @return checkpoint directory path, may be null
246
*/
247
public String getCheckpointDir();
248
249
/**
250
* Create builder for data streams configuration
251
* @return new builder instance
252
*/
253
public static Builder builder();
254
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.proto.v2.DataStreamsConfig;
261
262
// Build streaming ETL configuration
263
DataStreamsConfig streamConfig = DataStreamsConfig.builder()
264
.setBatchInterval("30s")
265
.setCheckpointDir("/tmp/streaming-checkpoints")
266
.setStopGracefully(true)
267
.addStage(kafkaSource)
268
.addStage(realtimeTransform)
269
.addStage(tablesSink)
270
.addConnection("kafka", "transform")
271
.addConnection("transform", "sink")
272
.setResources(new Resources(8192, 16))
273
.build();
274
275
// Configure for production use
276
if (!streamConfig.checkpointsDisabled()) {
277
// Checkpoints enabled - configure persistent storage
278
System.out.println("Checkpoints enabled at: " + streamConfig.getCheckpointDir());
279
}
```

### Configuration Builder Pattern

Abstract builder providing a fluent, type-safe API for constructing ETL configurations. Call `validate()` on the built configuration to check it.

```java { .api }
/**
288
* Abstract builder for ETL configurations
289
* @param <T> The concrete builder type for method chaining
290
*/
291
public abstract static class Builder<T extends Builder> {
292
/**
293
* Add a stage to the pipeline
294
* @param stage ETL stage to add
295
* @return builder instance for chaining
296
*/
297
public T addStage(ETLStage stage);
298
299
/**
300
* Add connection between stages
301
* @param from source stage name
302
* @param to target stage name
303
* @return builder instance for chaining
304
*/
305
public T addConnection(String from, String to);
306
307
/**
308
* Add connection with port specification
309
* @param from source stage name
310
* @param to target stage name
311
* @param port output port name
312
* @return builder instance for chaining
313
*/
314
public T addConnection(String from, String to, String port);
315
316
/**
317
* Add conditional connection
318
* @param from source stage name
319
* @param to target stage name
320
* @param condition connection condition
321
* @return builder instance for chaining
322
*/
323
public T addConnection(String from, String to, Boolean condition);
324
325
/**
326
* Add connection object
327
* @param connection connection to add
328
* @return builder instance for chaining
329
*/
330
public T addConnection(Connection connection);
331
332
/**
333
* Add multiple connections
334
* @param connections collection of connections
335
* @return builder instance for chaining
336
*/
337
public T addConnections(Collection<Connection> connections);
338
339
/**
340
* Set resource allocation
341
* @param resources resource configuration
342
* @return builder instance for chaining
343
*/
344
public T setResources(Resources resources);
345
346
/**
347
* Set driver resource allocation
348
* @param resources driver resource configuration
349
* @return builder instance for chaining
350
*/
351
public T setDriverResources(Resources resources);
352
353
/**
354
* Set client resource allocation
355
* @param resources client resource configuration
356
* @return builder instance for chaining
357
*/
358
public T setClientResources(Resources resources);
359
360
/**
361
* Set number of preview records
362
* @param numOfRecordsPreview preview record count
363
* @return builder instance for chaining
364
*/
365
public T setNumOfRecordsPreview(int numOfRecordsPreview);
366
367
/**
368
* Disable stage logging
369
* @return builder instance for chaining
370
*/
371
public T disableStageLogging();
372
373
/**
374
* Disable process timing
375
* @return builder instance for chaining
376
*/
377
public T disableProcessTiming();
378
379
/**
380
* Set pipeline properties
381
* @param properties pipeline properties map
382
* @return builder instance for chaining
383
*/
384
public T setProperties(Map<String, String> properties);
385
}
```