# Core Pipeline Components

Core ETL pipeline interfaces and classes for building transformation stages, handling data flow, and managing stage lifecycle in CDAP ETL pipelines.

## Transform Operations

### Transform<IN, OUT>

Base abstract class for transformation stages in ETL pipelines.

```java { .api }
package io.cdap.cdap.etl.api;

public abstract class Transform<IN, OUT>
    implements StageLifecycle<TransformContext>,
               SubmitterLifecycle<StageSubmitterContext>,
               Transformation<IN, OUT>, PipelineConfigurable {

  public static final String PLUGIN_TYPE = "transform";

  // Lifecycle methods
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
  public void initialize(TransformContext context) throws Exception {}
  public void prepareRun(StageSubmitterContext context) throws Exception {}
  public void onRunFinish(boolean succeeded, StageSubmitterContext context) {}
  public void destroy() {}

  // Access to runtime context
  protected TransformContext getContext();
}
```
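
**Usage Example:**

```java
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.TransformContext;

@Plugin(type = Transform.PLUGIN_TYPE)
@Name("TextCleaner")
@Description("Cleans and normalizes text fields")
public class TextCleanerTransform extends Transform<StructuredRecord, StructuredRecord> {

  private final Config config;
  private Schema outputSchema;

  // The plugin config is passed in through the constructor.
  public TextCleanerTransform(Config config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
    Schema inputSchema = stageConfigurer.getInputSchema();

    // Cleaning only rewrites string values, so the output schema matches the input schema.
    if (inputSchema != null) {
      stageConfigurer.setOutputSchema(inputSchema);
    }

    config.validate(stageConfigurer.getFailureCollector());
  }

  @Override
  public void initialize(TransformContext context) throws Exception {
    super.initialize(context); // keep getContext() working
    outputSchema = context.getOutputSchema();
  }

  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
      throws Exception {
    // Fall back to the record's own schema if none was known at configure time.
    Schema schema = outputSchema != null ? outputSchema : input.getSchema();
    StructuredRecord.Builder builder = StructuredRecord.builder(schema);

    for (Schema.Field field : input.getSchema().getFields()) {
      Object value = input.get(field.getName());
      Schema fieldSchema = field.getSchema();
      // Unwrap nullable unions such as ["string", "null"] before checking the type.
      if (fieldSchema.isNullable()) {
        fieldSchema = fieldSchema.getNonNullable();
      }
      if (fieldSchema.getType() == Schema.Type.STRING && value != null) {
        // Clean text: trim, then collapse runs of whitespace to a single space.
        builder.set(field.getName(), value.toString().trim().replaceAll("\\s+", " "));
      } else {
        builder.set(field.getName(), value);
      }
    }

    emitter.emit(builder.build());
  }

  /** Plugin properties; this example declares none of its own. */
  public static class Config extends PluginConfig {
    void validate(FailureCollector collector) {
      // Report invalid properties with collector.addFailure(message, correctiveAction).
    }
  }
}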
```

### Transformation<IN, OUT>

Core interface for data transformation operations.

```java { .api }
package io.cdap.cdap.etl.api;

public interface Transformation<IN, OUT> {
  /**
   * Transform input record and emit zero or more output records.
   *
   * @param input input record to transform
   * @param emitter emitter for output records
   * @throws Exception if transformation fails
   */
  void transform(IN input, Emitter<OUT> emitter) throws Exception;
}
```
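A `Transformation` may emit zero, one, or many records per input, and because the emitter is passed in, the logic can be exercised without a running pipeline. The plain-Java sketch below illustrates that contract with simplified local stand-ins: `SimpleEmitter` and `SimpleTransformation` mirror only the `emit` and `transform` shapes (the real `Emitter` also carries error and alert methods), and `WordSplitter` and `CollectingEmitter` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the CDAP interfaces (emit only; no error/alert methods).
interface SimpleEmitter<T> {
  void emit(T value);
}

interface SimpleTransformation<IN, OUT> {
  void transform(IN input, SimpleEmitter<OUT> emitter) throws Exception;
}

// Hypothetical transform: one output per word, none for a blank line.
class WordSplitter implements SimpleTransformation<String, String> {
  @Override
  public void transform(String line, SimpleEmitter<String> emitter) {
    String trimmed = line.trim();
    if (trimmed.isEmpty()) {
      return; // zero outputs for this input
    }
    for (String word : trimmed.split("\\s+")) {
      emitter.emit(word); // one output per word
    }
  }
}

// Collects everything a transform emits, which is handy in unit tests.
class CollectingEmitter<T> implements SimpleEmitter<T> {
  final List<T> emitted = new ArrayList<>();

  @Override
  public void emit(T value) {
    emitted.add(value);
  }
}
```

Feeding `"  hello   cdap world "` yields three emitted words, while a whitespace-only line yields none.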

### Multi-Output Transforms

#### SplitterTransform<IN, OUT>

Transform that routes records to multiple named outputs (ports).

```java { .api }
package io.cdap.cdap.etl.api;

public abstract class SplitterTransform<IN, OUT>
    implements MultiOutputPipelineConfigurable,
               StageLifecycle<TransformContext>,
               SubmitterLifecycle<StageSubmitterContext>,
               MultiOutputTransformation<IN, OUT> {

  public static final String PLUGIN_TYPE = "splittertransform";
}
```

#### MultiOutputTransformation<IN, E>

Interface for transformations with multiple outputs.

```java { .api }
package io.cdap.cdap.etl.api;

public interface MultiOutputTransformation<IN, E> {
  /**
   * Transform input and emit to multiple named outputs.
   */
  void transform(IN input, MultiOutputEmitter<E> emitter) throws Exception;
}
```
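
**Usage Example:**

```java
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.etl.api.MultiOutputEmitter;
import io.cdap.cdap.etl.api.SplitterTransform;

@Plugin(type = SplitterTransform.PLUGIN_TYPE)
@Name("DataSplitter")
public class DataSplitter extends SplitterTransform<StructuredRecord, StructuredRecord> {

  @Override
  public void transform(StructuredRecord input, MultiOutputEmitter<StructuredRecord> emitter) {
    String category = input.get("category");

    // Route each record to exactly one named port; a null or unrecognized
    // category goes to "unknown-data".
    if ("valid".equals(category)) {
      emitter.emit("valid-data", input);
    } else if ("error".equals(category)) {
      emitter.emit("error-data", input);
    } else {
      emitter.emit("unknown-data", input);
    }
  }
}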
```

### Error Transforms

#### ErrorTransform<ERR_IN, OUT>

Transform for handling error records from other stages.

```java { .api }
package io.cdap.cdap.etl.api;

public abstract class ErrorTransform<ERR_IN, OUT>
    implements StageLifecycle<TransformContext>,
               SubmitterLifecycle<StageSubmitterContext>,
               PipelineConfigurable {

  public static final String PLUGIN_TYPE = "errortransform";

  public void initialize(TransformContext context) throws Exception {}

  /**
   * Transform error record.
   */
  public abstract void transform(ErrorRecord<ERR_IN> input, Emitter<OUT> emitter)
      throws Exception;
}
```
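An error transform receives records that an upstream stage rejected; one common use is flattening them into rows that an error sink can store. The sketch below shows that idea in plain Java under stated assumptions: `ErrorInfo` is a hypothetical stand-in exposing only the record and message (two of the accessors listed for `ErrorRecord` in this document), a `Consumer` replaces `Emitter`, and the output row is a `Map` rather than a `StructuredRecord`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for ErrorRecord<T>: keeps only the record and its message.
class ErrorInfo<T> {
  private final T record;
  private final String errorMessage;

  ErrorInfo(T record, String errorMessage) {
    this.record = record;
    this.errorMessage = errorMessage;
  }

  T getRecord() { return record; }
  String getErrorMessage() { return errorMessage; }
}

// Hypothetical error transform: one output row per error, holding the
// original payload and the reason it was rejected.
class ErrorRowFormatter {
  void transform(ErrorInfo<String> input, Consumer<Map<String, String>> emitter) {
    Map<String, String> row = new LinkedHashMap<>();
    row.put("payload", String.valueOf(input.getRecord()));
    row.put("error", input.getErrorMessage());
    emitter.accept(row);
  }
}
```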

## Data Emission

### Emitter<T>

Primary interface for emitting data to the next stage.

```java { .api }
package io.cdap.cdap.etl.api;

public interface Emitter<T> extends AlertEmitter, ErrorEmitter<T> {
  /**
   * Emit a record to the next stage.
   */
  void emit(T value);
}
```
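Since `Emitter<T>` extends both `ErrorEmitter<T>` and `AlertEmitter`, a test double has to implement all three methods. The sketch below redeclares minimal local copies of these interfaces and of `InvalidEntry`, following the shapes documented in this document, so it compiles without CDAP on the classpath; `RecordingEmitter` is a hypothetical helper, not part of the API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Local stand-ins mirroring the documented shapes.
interface AlertEmitter {
  void emitAlert(Map<String, String> payload);
}

class InvalidEntry<T> {
  private final int errorCode;
  private final String errorMsg;
  private final T invalidRecord;

  InvalidEntry(int errorCode, String errorMsg, T invalidRecord) {
    this.errorCode = errorCode;
    this.errorMsg = errorMsg;
    this.invalidRecord = invalidRecord;
  }

  int getErrorCode() { return errorCode; }
  String getErrorMsg() { return errorMsg; }
  T getInvalidRecord() { return invalidRecord; }
}

interface ErrorEmitter<T> {
  void emitError(InvalidEntry<T> invalidEntry);
}

interface Emitter<T> extends AlertEmitter, ErrorEmitter<T> {
  void emit(T value);
}

// Hypothetical test double that records every call for later assertions.
class RecordingEmitter<T> implements Emitter<T> {
  final List<T> records = new ArrayList<>();
  final List<InvalidEntry<T>> errors = new ArrayList<>();
  final List<Map<String, String>> alerts = new ArrayList<>();

  @Override public void emit(T value) { records.add(value); }
  @Override public void emitError(InvalidEntry<T> invalidEntry) { errors.add(invalidEntry); }
  @Override public void emitAlert(Map<String, String> payload) { alerts.add(payload); }
}
```

Keeping the three streams separate lets a test check that a stage emitted the right records, rejected the right inputs, and raised the right alerts independently.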

### ErrorEmitter<T>

Interface for emitting error records.

```java { .api }
package io.cdap.cdap.etl.api;

public interface ErrorEmitter<T> {
  /**
   * Emit an error record.
   */
  void emitError(InvalidEntry<T> invalidEntry);
}
```
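Routing a bad record through `emitError` lets a stage reject one input instead of throwing an exception. Because `Emitter<T>` extends `ErrorEmitter<T>`, the invalid record has the emitter's own type, so this pattern fits stages whose input and output types match (such as `StructuredRecord` to `StructuredRecord`). The sketch uses local stand-ins for the documented shapes; `RecordEmitter`, `AgeValidator`, its error code, and `SplitCollector` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Local stand-ins for the documented InvalidEntry and ErrorEmitter shapes.
class InvalidEntry<T> {
  final int errorCode;
  final String errorMsg;
  final T invalidRecord;

  InvalidEntry(int errorCode, String errorMsg, T invalidRecord) {
    this.errorCode = errorCode;
    this.errorMsg = errorMsg;
    this.invalidRecord = invalidRecord;
  }
}

interface ErrorEmitter<T> {
  void emitError(InvalidEntry<T> invalidEntry);
}

interface RecordEmitter<T> extends ErrorEmitter<T> {
  void emit(T value);
}

// Hypothetical validator: a record whose "age" is not a non-negative integer
// is sent to the error output instead of failing the stage.
class AgeValidator {
  static final int INVALID_AGE = 1; // hypothetical error code

  void transform(Map<String, String> record, RecordEmitter<Map<String, String>> emitter) {
    String age = record.get("age");
    if (age != null && age.matches("\\d+")) {
      emitter.emit(record);
    } else {
      emitter.emitError(new InvalidEntry<>(INVALID_AGE, "invalid age: " + age, record));
    }
  }
}

// Hypothetical collector separating good records from rejected ones.
class SplitCollector<T> implements RecordEmitter<T> {
  final List<T> good = new ArrayList<>();
  final List<InvalidEntry<T>> bad = new ArrayList<>();

  @Override public void emit(T value) { good.add(value); }
  @Override public void emitError(InvalidEntry<T> entry) { bad.add(entry); }
}
```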

### AlertEmitter

Interface for emitting alerts.

```java { .api }
package io.cdap.cdap.etl.api;

public interface AlertEmitter {
  /**
   * Emit an alert with payload.
   */
  void emitAlert(Map<String, String> payload);
}
```
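An alert payload is a free-form `Map<String, String>`, so the stage decides which keys to include. The sketch below raises an alert when a value crosses a threshold; the `AlertEmitter` here is a local stand-in with the documented signature, and `LatencyCheck` and its payload keys are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in with the documented AlertEmitter signature.
interface AlertEmitter {
  void emitAlert(Map<String, String> payload);
}

// Hypothetical check: emit an alert only when latency exceeds the threshold.
class LatencyCheck {
  private final long thresholdMillis;

  LatencyCheck(long thresholdMillis) {
    this.thresholdMillis = thresholdMillis;
  }

  void check(String requestId, long latencyMillis, AlertEmitter alerts) {
    if (latencyMillis > thresholdMillis) {
      // All payload values must be strings, so numbers are converted explicitly.
      Map<String, String> payload = new HashMap<>();
      payload.put("requestId", requestId);
      payload.put("latencyMillis", Long.toString(latencyMillis));
      payload.put("thresholdMillis", Long.toString(thresholdMillis));
      alerts.emitAlert(payload);
    }
  }
}
```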

### MultiOutputEmitter<E>

Emitter for multi-output transformations.

```java { .api }
package io.cdap.cdap.etl.api;

public interface MultiOutputEmitter<E> extends AlertEmitter, ErrorEmitter<Object> {
  /**
   * Emit to a specific output port.
   */
  void emit(String port, E value);
}
```
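Multi-output logic can be tested by grouping emissions by port name. The sketch below uses a simplified local `PortEmitter` that models only `emit(port, value)` (the real `MultiOutputEmitter` also inherits error and alert methods) and applies the same routing rule as the `DataSplitter` example to plain maps; `PortCollector` and `CategoryRouter` are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in: only the emit(port, value) part of MultiOutputEmitter.
interface PortEmitter<E> {
  void emit(String port, E value);
}

// Hypothetical test double: groups emitted values by output port.
class PortCollector<E> implements PortEmitter<E> {
  final Map<String, List<E>> byPort = new LinkedHashMap<>();

  @Override
  public void emit(String port, E value) {
    byPort.computeIfAbsent(port, p -> new ArrayList<>()).add(value);
  }
}

// Same routing rule as the DataSplitter example, applied to plain maps.
class CategoryRouter {
  void transform(Map<String, String> input, PortEmitter<Map<String, String>> emitter) {
    String category = input.get("category");
    if ("valid".equals(category)) {
      emitter.emit("valid-data", input);
    } else if ("error".equals(category)) {
      emitter.emit("error-data", input);
    } else {
      emitter.emit("unknown-data", input);
    }
  }
}
```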

## Aggregation Operations

### Aggregator<GROUP_KEY, GROUP_VALUE, OUT>

Interface for aggregation operations: `groupBy` emits the group key (or keys) for each input value, and `aggregate` receives all values that share a key and emits the output records for that group.

```java { .api }
package io.cdap.cdap.etl.api;

public interface Aggregator<GROUP_KEY, GROUP_VALUE, OUT> {
  /**
   * Emit group key for the group value.
   */
  void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter) throws Exception;

  /**
   * Aggregate all values for a group key into output records.
   */
  void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,
                 Emitter<OUT> emitter) throws Exception;
}
```
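
**Usage Example:**

```java
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.Aggregator;
import io.cdap.cdap.etl.api.Emitter;
import java.util.Iterator;

public class SumAggregator implements Aggregator<String, StructuredRecord, StructuredRecord> {

  // One summary row per department; department is nullable because the key may be null.
  private static final Schema OUTPUT_SCHEMA = Schema.recordOf(
      "departmentSalaries",
      Schema.Field.of("department", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
      Schema.Field.of("total_salary", Schema.of(Schema.Type.DOUBLE)),
      Schema.Field.of("employee_count", Schema.of(Schema.Type.INT)),
      Schema.Field.of("average_salary", Schema.of(Schema.Type.DOUBLE)));

  @Override
  public void groupBy(StructuredRecord groupValue, Emitter<String> emitter) throws Exception {
    // Group records by department.
    String groupKey = groupValue.get("department");
    emitter.emit(groupKey);
  }

  @Override
  public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
                        Emitter<StructuredRecord> emitter) throws Exception {
    double sum = 0.0;
    int count = 0;

    while (groupValues.hasNext()) {
      StructuredRecord record = groupValues.next();
      // Read as Number so int, long, float, and double salary fields all work.
      Number salary = record.get("salary");
      if (salary != null) {
        sum += salary.doubleValue();
        count++;
      }
    }

    StructuredRecord result = StructuredRecord.builder(OUTPUT_SCHEMA)
        .set("department", groupKey)
        .set("total_salary", sum)
        .set("employee_count", count)
        .set("average_salary", count > 0 ? sum / count : 0.0)
        .build();

    emitter.emit(result);
  }
}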
```
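The framework, not the plugin, drives the two `Aggregator` methods: every input passes through `groupBy`, values are grouped under the emitted keys, and `aggregate` then runs once per key. The single-process sketch below imitates that sequence so the contract can be unit-tested. `SimpleAggregator` is a simplified stand-in that uses `Consumer` in place of `Emitter`; `LocalAggregatorRunner` and `DepartmentTotals` are hypothetical, and a real engine distributes the grouping step across workers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified stand-in for Aggregator, with Consumer in place of Emitter.
interface SimpleAggregator<K, V, OUT> {
  void groupBy(V value, Consumer<K> keys);
  void aggregate(K key, Iterator<V> values, Consumer<OUT> out);
}

// Hypothetical single-process driver imitating the framework's call sequence.
class LocalAggregatorRunner {
  static <K, V, OUT> List<OUT> run(SimpleAggregator<K, V, OUT> agg, List<V> inputs) {
    // Phase 1: groupBy may emit zero or more keys per input value.
    Map<K, List<V>> groups = new LinkedHashMap<>();
    for (V value : inputs) {
      agg.groupBy(value, key -> groups.computeIfAbsent(key, k -> new ArrayList<>()).add(value));
    }
    // Phase 2: aggregate runs once per distinct key, over all of that key's values.
    List<OUT> results = new ArrayList<>();
    for (Map.Entry<K, List<V>> group : groups.entrySet()) {
      agg.aggregate(group.getKey(), group.getValue().iterator(), results::add);
    }
    return results;
  }
}

// Hypothetical aggregator: total salary per department over {department, salary} rows.
class DepartmentTotals implements SimpleAggregator<String, String[], String> {
  @Override
  public void groupBy(String[] record, Consumer<String> keys) {
    keys.accept(record[0]);
  }

  @Override
  public void aggregate(String dept, Iterator<String[]> records, Consumer<String> out) {
    double total = 0;
    while (records.hasNext()) {
      total += Double.parseDouble(records.next()[1]);
    }
    out.accept(dept + "=" + total);
  }
}
```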

### ReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>

Aggregator with reducible intermediate values. It typically performs better than a plain `Aggregator` because partial aggregates can be computed before all values for a group are brought together, and then combined.

```java { .api }
package io.cdap.cdap.etl.api;

public interface ReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT> {
  /**
   * Emit group key for the group value.
   */
  void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter) throws Exception;

  /**
   * Aggregate group values into intermediate aggregate values.
   */
  void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,
                 Emitter<AGGREGATE_VALUE> emitter) throws Exception;

  /**
   * Reduce intermediate aggregate values into final output.
   */
  void reduce(GROUP_KEY groupKey, Iterator<AGGREGATE_VALUE> aggregateValues,
              Emitter<OUT> emitter) throws Exception;
}
```
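The gain from reducible intermediate values comes from combining partial results. The sketch below is plain Java, not the CDAP interface: a hypothetical `SumCount` accumulator is built per partition, partials are merged, and the final average is computed at the end. Because `merge` is associative and commutative, partials can be combined in any order, so only one small accumulator per key needs to move between workers instead of every raw value.

```java
import java.util.List;

// Hypothetical intermediate value: enough state to compute an average later.
final class SumCount {
  final double sum;
  final long count;

  SumCount(double sum, long count) {
    this.sum = sum;
    this.count = count;
  }

  // Partial aggregation over one partition's raw values.
  static SumCount of(List<Double> values) {
    double sum = 0;
    for (double v : values) {
      sum += v;
    }
    return new SumCount(sum, values.size());
  }

  // Combine two partial results; the order of merging does not matter.
  SumCount merge(SumCount other) {
    return new SumCount(sum + other.sum, count + other.count);
  }

  // Final step: turn the fully merged value into the output.
  double average() {
    return count == 0 ? 0.0 : sum / count;
  }
}
```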

## Join Operations

### Joiner<JOIN_KEY, INPUT_RECORD, OUT>

Interface for join operations between multiple inputs. Implementations override either `getJoinKeys` or the deprecated `joinOn`: the default `getJoinKeys` delegates to `joinOn`, whose default implementation throws `UnsupportedOperationException`.

```java { .api }
package io.cdap.cdap.etl.api;

public interface Joiner<JOIN_KEY, INPUT_RECORD, OUT> {
  /**
   * Get the join key for an input record (deprecated: use getJoinKeys).
   */
  @Deprecated
  default JOIN_KEY joinOn(String stageName, INPUT_RECORD inputRecord) throws Exception {
    throw new UnsupportedOperationException("joinOn method is deprecated");
  }

  /**
   * Get the collection of join keys for an input record.
   */
  default Collection<JOIN_KEY> getJoinKeys(String stageName, INPUT_RECORD inputRecord)
      throws Exception {
    JOIN_KEY key = joinOn(stageName, inputRecord);
    return key == null ? Collections.emptySet() : Collections.singleton(key);
  }

  /**
   * Get join configuration.
   */
  JoinConfig getJoinConfig() throws Exception;

  /**
   * Merge the records in a join result into an output record.
   */
  OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult)
      throws Exception;
}
```

### JoinConfig

Configuration for join operations.

```java { .api }
package io.cdap.cdap.etl.api;

public class JoinConfig {
  /**
   * Create join config with required input stages.
   */
  public JoinConfig(Iterable<String> requiredInputs) {}

  /**
   * Get required input stages for the join.
   */
  public Iterable<String> getRequiredInputs() {}
}
```

### JoinElement<INPUT_RECORD>

Element in join result containing stage name and input record.

```java { .api }
package io.cdap.cdap.etl.api;

public class JoinElement<INPUT_RECORD> {
  public JoinElement(String stageName, INPUT_RECORD inputRecord) {}

  public String getStageName() {}
  public INPUT_RECORD getInputRecord() {}
}
```
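To see how `getJoinKeys`, the required inputs from `JoinConfig`, and `merge` fit together, the single-process sketch below joins two hypothetical stages, `customers` and `orders`, on a customer id. It uses local stand-ins (`Element` for `JoinElement`, plain functions instead of the `Joiner` interface) and assumes the usual meaning of required inputs: a key produces output only when every required stage contributed a record, so requiring both stages behaves like an inner join, and requiring only `orders` keeps orders without a matching customer. It also assumes at most one record per stage per key, which sidesteps how a real engine pairs up multiple matches; `LocalJoin` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Local stand-in for JoinElement: the originating stage plus its record.
final class Element<R> {
  final String stageName;
  final R record;

  Element(String stageName, R record) {
    this.stageName = stageName;
    this.record = record;
  }
}

// Hypothetical single-process join driver.
class LocalJoin {
  static <K, R, OUT> List<OUT> join(
      Map<String, List<R>> inputsByStage,
      BiFunction<String, R, Collection<K>> getJoinKeys,
      Set<String> requiredInputs,
      BiFunction<K, List<Element<R>>, OUT> merge) {
    // Group every record under each of its join keys, remembering its stage.
    Map<K, List<Element<R>>> byKey = new LinkedHashMap<>();
    for (Map.Entry<String, List<R>> input : inputsByStage.entrySet()) {
      for (R record : input.getValue()) {
        for (K key : getJoinKeys.apply(input.getKey(), record)) {
          byKey.computeIfAbsent(key, k -> new ArrayList<>())
              .add(new Element<>(input.getKey(), record));
        }
      }
    }
    List<OUT> results = new ArrayList<>();
    for (Map.Entry<K, List<Element<R>>> group : byKey.entrySet()) {
      Set<String> present = group.getValue().stream()
          .map(e -> e.stageName)
          .collect(Collectors.toSet());
      // Skip keys that are missing a record from any required stage.
      if (present.containsAll(requiredInputs)) {
        results.add(merge.apply(group.getKey(), group.getValue()));
      }
    }
    return results;
  }
}
```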

## Lifecycle Management

### StageLifecycle<T>

Interface for stage initialization and cleanup.

```java { .api }
package io.cdap.cdap.etl.api;

public interface StageLifecycle<T> extends Destroyable {
  /**
   * Initialize stage with runtime context.
   */
  void initialize(T context) throws Exception;
}
```

### SubmitterLifecycle<T>

Interface for submission lifecycle management.

```java { .api }
package io.cdap.cdap.etl.api;

public interface SubmitterLifecycle<T> {
  /**
   * Prepare for pipeline run.
   */
  void prepareRun(T context) throws Exception;

  /**
   * Handle run completion.
   */
  void onRunFinish(boolean succeeded, T context);
}
```

### Destroyable

Interface for resource cleanup.

```java { .api }
package io.cdap.cdap.etl.api;

public interface Destroyable {
  /**
   * Cleanup resources.
   */
  void destroy();
}
```
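The lifecycle interfaces split work between the submitter side (`prepareRun`, `onRunFinish`) and the stage runtime (`initialize`, `destroy`). The sketch below records the calls for one successful run, executed sequentially in a single process. It assumes the usual ordering (`prepareRun` before the run starts, `initialize` before any record, `destroy` after the last record, `onRunFinish` once the run completes), whereas a real engine runs the runtime calls separately on each worker. `LoggingStage`, `LocalRun`, and the string contexts are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins for the documented lifecycle interfaces.
interface Destroyable {
  void destroy();
}

interface StageLifecycle<T> extends Destroyable {
  void initialize(T context) throws Exception;
}

interface SubmitterLifecycle<T> {
  void prepareRun(T context) throws Exception;
  void onRunFinish(boolean succeeded, T context);
}

// Hypothetical stage that logs each lifecycle call; contexts are plain strings here.
class LoggingStage implements StageLifecycle<String>, SubmitterLifecycle<String> {
  final List<String> calls = new ArrayList<>();

  @Override public void prepareRun(String context) { calls.add("prepareRun"); }
  @Override public void initialize(String context) { calls.add("initialize"); }
  void process(String record) { calls.add("process:" + record); }
  @Override public void destroy() { calls.add("destroy"); }
  @Override public void onRunFinish(boolean succeeded, String context) {
    calls.add("onRunFinish:" + succeeded);
  }
}

// Hypothetical sequential driver for one run.
class LocalRun {
  static void run(LoggingStage stage, List<String> records) {
    boolean succeeded = false;
    stage.prepareRun("submitter-context");
    try {
      stage.initialize("runtime-context");
      try {
        for (String r : records) {
          stage.process(r);
        }
        succeeded = true;
      } finally {
        stage.destroy(); // cleanup runs even if processing fails
      }
    } finally {
      stage.onRunFinish(succeeded, "submitter-context"); // reports the outcome either way
    }
  }
}
```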

## Context Interfaces

### StageContext

Base runtime context for pipeline stages.

```java { .api }
package io.cdap.cdap.etl.api;

public interface StageContext extends RuntimeContext, PluginContext,
    ServiceDiscoverer, FeatureFlagsProvider,
    ConnectionConfigurable {
  // Provides access to:
  // - Runtime arguments and metrics
  // - Plugin instantiation
  // - Service discovery
  // - Feature flags
  // - Connection configuration
}
```

### TransformContext

Context for transform stages with lookup capabilities.

```java { .api }
package io.cdap.cdap.etl.api;

public interface TransformContext extends StageContext, LookupProvider {
  // Inherits StageContext capabilities
  // Adds lookup provider for data lookups
}
```

### StageSubmitterContext

Context for stage submission operations.

```java { .api }
package io.cdap.cdap.etl.api;

public interface StageSubmitterContext {
  /**
   * Get runtime arguments.
   */
  Arguments getArguments();

  /**
   * Get stage metrics.
   */
  StageMetrics getMetrics();
}
```

## Configuration

### PipelineConfigurable

Interface for pipeline configuration.

```java { .api }
package io.cdap.cdap.etl.api;

public interface PipelineConfigurable {
  /**
   * Configure the pipeline stage.
   */
  void configurePipeline(PipelineConfigurer pipelineConfigurer);
}
```
525
526
### MultiInputPipelineConfigurable
527
528
Configuration interface for stages with multiple inputs.
529
530
```java { .api }
531
package io.cdap.cdap.etl.api;
532
533
public interface MultiInputPipelineConfigurable {
534
/**
535
* Configure multi-input pipeline stage.
536
*/
537
void configurePipeline(MultiInputPipelineConfigurer multiInputPipelineConfigurer);
538
}
539
```

### MultiOutputPipelineConfigurable

Configuration interface for stages with multiple outputs.

```java { .api }
package io.cdap.cdap.etl.api;

public interface MultiOutputPipelineConfigurable {
  /**
   * Configure multi-output pipeline stage.
   */
  void configurePipeline(MultiOutputPipelineConfigurer multiOutputPipelineConfigurer);
}
```

## Error Handling

### ErrorRecord<T>

Represents an error record from pipeline execution.

```java { .api }
package io.cdap.cdap.etl.api;

public class ErrorRecord<T> {
  public ErrorRecord(T record, String errorMessage, int stage) {}
  public ErrorRecord(T record, String errorMessage) {}

  public T getRecord() {}
  public String getErrorMessage() {}
  public int getStage() {}
}
```

### InvalidEntry<T>

Represents an invalid entry with error details.

```java { .api }
package io.cdap.cdap.etl.api;

public class InvalidEntry<T> {
  public InvalidEntry(int errorCode, String errorMsg, T invalidRecord) {}

  public int getErrorCode() {}
  public String getErrorMsg() {}
  public T getInvalidRecord() {}
}
```

## Specialized Transform Types

### SerializableTransform<IN, OUT>

Serializable transformation interface.

```java { .api }
package io.cdap.cdap.etl.api;

public interface SerializableTransform<IN, OUT>
    extends Transformation<IN, OUT>, Serializable {
  // Combines transformation with Java serialization
}
```
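Serializability matters when an execution engine ships the transform object itself to remote workers, as Spark does with functions. The JDK-only sketch below shows the property such a transform needs: after a Java serialization round trip, the copy behaves like the original. `SerializableStep`, `PrefixTransform`, and `SerializationRoundTrip` are hypothetical; every field, including configuration, must itself be serializable (or `transient` and rebuilt after deserialization).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Consumer;

// Simplified stand-in: a transformation that is also Serializable.
interface SerializableStep<IN, OUT> extends Serializable {
  void transform(IN input, Consumer<OUT> emitter);
}

// Hypothetical transform whose only state is a serializable String.
class PrefixTransform implements SerializableStep<String, String> {
  private static final long serialVersionUID = 1L;
  private final String prefix;

  PrefixTransform(String prefix) {
    this.prefix = prefix;
  }

  @Override
  public void transform(String input, Consumer<String> emitter) {
    emitter.accept(prefix + input);
  }
}

final class SerializationRoundTrip {
  // Serialize to bytes and back, as happens when a task is shipped to a worker.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T copy(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }
}
```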

### ToKeyValueTransform<IN, KEY, VAL>

Transform input to key-value pairs.

```java { .api }
package io.cdap.cdap.etl.api;

public interface ToKeyValueTransform<IN, KEY, VAL> {
  /**
   * Transform input into key-value pairs.
   */
  void transform(IN input, Emitter<KeyValue<KEY, VAL>> emitter) throws Exception;
}
```

### FromKeyValueTransform<KEY, VAL, OUT>

Transform from key-value pairs to output records.

```java { .api }
package io.cdap.cdap.etl.api;

public interface FromKeyValueTransform<KEY, VAL, OUT> {
  /**
   * Transform key-value pairs into output records.
   */
  void transform(KeyValue<KEY, VAL> input, Emitter<OUT> emitter) throws Exception;
}
```
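The two key-value interfaces are mirror images, so a natural check for an implementation pair is a round trip. The sketch below uses a local `KeyValue` stand-in with `getKey` and `getValue` (an assumption about the shape of the real class), a `Consumer` in place of `Emitter`, and hypothetical transforms that turn `"key=value"` strings into pairs and back.

```java
import java.util.function.Consumer;

// Local stand-in for KeyValue<KEY, VAL>.
final class KeyValue<K, V> {
  private final K key;
  private final V value;

  KeyValue(K key, V value) {
    this.key = key;
    this.value = value;
  }

  K getKey() { return key; }
  V getValue() { return value; }
}

// Hypothetical ToKeyValue-style transform: "k=v" becomes a pair, split at the first '=';
// inputs with no key before '=' are dropped (zero outputs).
class ParsePair {
  void transform(String input, Consumer<KeyValue<String, String>> emitter) {
    int eq = input.indexOf('=');
    if (eq > 0) {
      emitter.accept(new KeyValue<>(input.substring(0, eq), input.substring(eq + 1)));
    }
  }
}

// Hypothetical FromKeyValue-style transform: a pair becomes "k=v" again.
class FormatPair {
  void transform(KeyValue<String, String> input, Consumer<String> emitter) {
    emitter.accept(input.getKey() + "=" + input.getValue());
  }
}
```

Chaining the two reproduces every well-formed input, including values that themselves contain `=`, because the parser splits only at the first separator.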