# Batch Processing

This section covers the batch processing APIs used for large-scale data processing in CDAP ETL pipelines: batch sources, sinks, aggregators, joiners, post-actions, and the contexts they run in.

## Batch Sources

### BatchSource<KEY_IN, VAL_IN, OUT>

Base abstract class for batch data sources that read from external systems.

```java { .api }
package io.cdap.cdap.etl.api.batch;
12
13
public abstract class BatchSource<KEY_IN, VAL_IN, OUT>
14
extends BatchConfigurable<BatchSourceContext>
15
implements Transformation<KeyValue<KEY_IN, VAL_IN>, OUT>,
16
StageLifecycle<BatchRuntimeContext> {
17
18
public static final String PLUGIN_TYPE = "batchsource";
19
public static final String FORMAT_PLUGIN_TYPE = "inputformat";
20
21
// Lifecycle methods
22
public void initialize(BatchRuntimeContext context) throws Exception {}
23
public void destroy() {}
24
25
// Data transformation
26
public void transform(KeyValue<KEY_IN, VAL_IN> input, Emitter<OUT> emitter)
27
throws Exception {}
28
}
```

**Usage Example:**
```java
@Plugin(type = BatchSource.PLUGIN_TYPE)
34
@Name("FileSource")
35
@Description("Reads data from files")
36
public class FileSource extends BatchSource<LongWritable, Text, StructuredRecord> {
37
38
private final Config config;
39
private Schema schema;
40
41
@Override
42
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
43
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
44
45
// Validate configuration
46
config.validate(stageConfigurer.getFailureCollector());
47
48
// Set output schema
49
schema = Schema.parseJson(config.schema);
50
stageConfigurer.setOutputSchema(schema);
51
}
52
53
@Override
54
public void prepareRun(BatchSourceContext context) throws Exception {
55
// Configure input format and input paths
56
Job job = context.getHadoopJob();
57
Configuration conf = job.getConfiguration();
58
59
TextInputFormat.setInputPaths(job, config.path);
60
context.setInput(Input.of(config.referenceName,
61
new SourceInputFormatProvider(TextInputFormat.class, conf)));
62
}
63
64
@Override
65
public void initialize(BatchRuntimeContext context) throws Exception {
66
schema = context.getOutputSchema();
67
}
68
69
@Override
70
public void transform(KeyValue<LongWritable, Text> input,
71
Emitter<StructuredRecord> emitter) throws Exception {
72
String line = input.getValue().toString();
73
String[] fields = line.split(config.delimiter);
74
75
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
76
List<Schema.Field> schemaFields = schema.getFields();
77
78
for (int i = 0; i < Math.min(fields.length, schemaFields.size()); i++) {
79
Schema.Field field = schemaFields.get(i);
80
builder.set(field.getName(), convertValue(fields[i], field.getSchema()));
81
}
82
83
emitter.emit(builder.build());
84
}
85
}
```

## Batch Sinks

### BatchSink<IN, KEY_OUT, VAL_OUT>

Base abstract class for batch data sinks that write to external systems.

```java { .api }
package io.cdap.cdap.etl.api.batch;
96
97
public abstract class BatchSink<IN, KEY_OUT, VAL_OUT>
98
extends BatchConfigurable<BatchSinkContext>
99
implements Transformation<IN, KeyValue<KEY_OUT, VAL_OUT>>,
100
StageLifecycle<BatchRuntimeContext> {
101
102
public static final String PLUGIN_TYPE = "batchsink";
103
public static final String FORMAT_PLUGIN_TYPE = "outputformat";
104
105
// Lifecycle methods
106
public void initialize(BatchRuntimeContext context) throws Exception {}
107
public void destroy() {}
108
109
// Data transformation
110
public void transform(IN input, Emitter<KeyValue<KEY_OUT, VAL_OUT>> emitter)
111
throws Exception {}
112
}
```

**Usage Example:**
```java
@Plugin(type = BatchSink.PLUGIN_TYPE)
118
@Name("FileSink")
119
@Description("Writes data to files")
120
public class FileSink extends BatchSink<StructuredRecord, NullWritable, Text> {
121
122
private final Config config;
123
124
@Override
125
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
126
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
127
config.validate(stageConfigurer.getFailureCollector());
128
}
129
130
@Override
131
public void prepareRun(BatchSinkContext context) throws Exception {
132
Job job = context.getHadoopJob();
133
Configuration conf = job.getConfiguration();
134
135
TextOutputFormat.setOutputPath(job, new Path(config.path));
136
context.addOutput(Output.of(config.referenceName,
137
new SinkOutputFormatProvider(TextOutputFormat.class, conf)));
138
}
139
140
@Override
141
public void transform(StructuredRecord input,
142
Emitter<KeyValue<NullWritable, Text>> emitter) throws Exception {
143
StringBuilder line = new StringBuilder();
144
145
List<Schema.Field> fields = input.getSchema().getFields();
146
for (int i = 0; i < fields.size(); i++) {
147
if (i > 0) line.append(config.delimiter);
148
149
Object value = input.get(fields.get(i).getName());
150
line.append(value != null ? value.toString() : "");
151
}
152
153
emitter.emit(new KeyValue<>(NullWritable.get(), new Text(line.toString())));
154
}
155
}
```

## Batch Configuration Base Classes

### BatchConfigurable<T>

Base abstract class for batch stages. It provides the common pipeline-configuration and run-submission lifecycle methods.

```java { .api }
package io.cdap.cdap.etl.api.batch;
166
167
public abstract class BatchConfigurable<T>
168
implements PipelineConfigurable, SubmitterLifecycle<T> {
169
170
// Pipeline configuration
171
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
172
173
// Submission lifecycle
174
public abstract void prepareRun(T context) throws Exception;
175
public void onRunFinish(boolean succeeded, T context) {}
176
}
```

### MultiInputBatchConfigurable<T>

Base abstract class for batch stages that receive multiple inputs.

```java { .api }
package io.cdap.cdap.etl.api.batch;
185
186
public abstract class MultiInputBatchConfigurable<T>
187
implements MultiInputPipelineConfigurable, SubmitterLifecycle<T> {
188
189
public void configurePipeline(MultiInputPipelineConfigurer multiInputPipelineConfigurer) {}
190
public abstract void prepareRun(T context) throws Exception;
191
public void onRunFinish(boolean succeeded, T context) {}
192
}
```

## Batch Aggregators

### BatchAggregator<GROUP_KEY, GROUP_VALUE, OUT>

Base abstract class for batch aggregations. Each input record is assigned to a group (`groupBy`), and the records of each group are then aggregated into output records (`aggregate`).

```java { .api }
package io.cdap.cdap.etl.api.batch;
203
204
public abstract class BatchAggregator<GROUP_KEY, GROUP_VALUE, OUT>
205
extends BatchConfigurable<BatchAggregatorContext>
206
implements Aggregator<GROUP_KEY, GROUP_VALUE, OUT>,
207
StageLifecycle<BatchRuntimeContext> {
208
209
public static final String PLUGIN_TYPE = "batchaggregator";
210
211
// Lifecycle methods
212
public void initialize(BatchRuntimeContext context) throws Exception {}
213
public void destroy() {}
214
215
// Aggregation methods
216
public abstract void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter)
217
throws Exception;
218
public abstract void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,
219
Emitter<OUT> emitter) throws Exception;
220
}
```

**Usage Example:**
```java
@Plugin(type = BatchAggregator.PLUGIN_TYPE)
226
@Name("SalesAggregator")
227
public class SalesAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {
228
229
private Schema outputSchema;
230
231
@Override
232
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
233
outputSchema = Schema.recordOf("sales_summary",
234
Schema.Field.of("region", Schema.of(Schema.Type.STRING)),
235
Schema.Field.of("total_sales", Schema.of(Schema.Type.DOUBLE)),
236
Schema.Field.of("order_count", Schema.of(Schema.Type.INT)),
237
Schema.Field.of("avg_order_value", Schema.of(Schema.Type.DOUBLE))
238
);
239
pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
240
}
241
242
@Override
243
public void prepareRun(BatchAggregatorContext context) throws Exception {
244
context.setNumPartitions(10); // Optimize parallelism
245
}
246
247
@Override
248
public void groupBy(StructuredRecord groupValue, Emitter<String> emitter) throws Exception {
249
String region = groupValue.get("region");
250
emitter.emit(region);
251
}
252
253
@Override
254
public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
255
Emitter<StructuredRecord> emitter) throws Exception {
256
double totalSales = 0.0;
257
int orderCount = 0;
258
259
while (groupValues.hasNext()) {
260
StructuredRecord record = groupValues.next();
261
Double sales = record.get("sales_amount");
262
if (sales != null) {
263
totalSales += sales;
264
orderCount++;
265
}
266
}
267
268
double avgOrderValue = orderCount > 0 ? totalSales / orderCount : 0.0;
269
270
StructuredRecord result = StructuredRecord.builder(outputSchema)
271
.set("region", groupKey)
272
.set("total_sales", totalSales)
273
.set("order_count", orderCount)
274
.set("avg_order_value", avgOrderValue)
275
.build();
276
277
emitter.emit(result);
278
}
279
}
```

### BatchReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>

Batch aggregator that reduces grouped values into intermediate `AGGREGATE_VALUE` results, for improved performance.

```java { .api }
package io.cdap.cdap.etl.api.batch;
288
289
public abstract class BatchReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>
290
extends BatchConfigurable<BatchAggregatorContext>
291
implements ReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>,
292
StageLifecycle<BatchRuntimeContext> {
293
294
public static final String PLUGIN_TYPE = "batchaggregator";
295
}
```

## Batch Joiners

### BatchJoiner<JOIN_KEY, INPUT_RECORD, OUT>

Base abstract class for batch joins in which the plugin computes the join keys of each input record and merges the records that share a key.

```java { .api }
package io.cdap.cdap.etl.api.batch;
306
307
public abstract class BatchJoiner<JOIN_KEY, INPUT_RECORD, OUT>
308
extends BatchConfigurable<BatchJoinerContext>
309
implements Joiner<JOIN_KEY, INPUT_RECORD, OUT>,
310
StageLifecycle<BatchJoinerRuntimeContext> {
311
312
public static final String PLUGIN_TYPE = "batchjoiner";
313
314
// Lifecycle methods
315
public void initialize(BatchJoinerRuntimeContext context) throws Exception {}
316
public void destroy() {}
317
318
// Join methods
319
public abstract Collection<JOIN_KEY> getJoinKeys(String stageName, INPUT_RECORD inputRecord)
320
throws Exception;
321
public abstract JoinConfig getJoinConfig() throws Exception;
322
public abstract OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult)
323
throws Exception;
324
}
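```

### BatchAutoJoiner

Base abstract class for batch auto-joiners. An auto-joiner declares the join (participating stages, selected fields and join condition) instead of implementing it, which leaves the pipeline free to optimize how the join is executed.

```java { .api }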
package io.cdap.cdap.etl.api.batch;
333
334
public abstract class BatchAutoJoiner
335
extends BatchConfigurable<BatchJoinerContext>
336
implements AutoJoiner, StageLifecycle<BatchJoinerRuntimeContext> {
337
338
public static final String PLUGIN_TYPE = "batchjoiner";
339
}
```

**Usage Example:**
```java
@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
345
@Name("CustomerOrderJoiner")
346
public class CustomerOrderJoiner extends BatchAutoJoiner {
347
348
@Override
349
public JoinDefinition define(AutoJoinerContext context) {
350
return JoinDefinition.builder()
351
.select(Arrays.asList(
352
new JoinField("customers", "customer_id", "customer_id"),
353
new JoinField("customers", "name", "customer_name"),
354
new JoinField("orders", "order_id", "order_id"),
355
new JoinField("orders", "amount", "order_amount")
356
))
357
.from(Arrays.asList(
358
new JoinStage("customers", JoinType.REQUIRED,
359
Collections.emptyList(), true, false),
360
new JoinStage("orders", JoinType.OUTER,
361
Collections.emptyList(), false, false)
362
))
363
.on(JoinCondition.onKeys()
364
.addKey(new JoinKey("customers", Arrays.asList("customer_id")))
365
.addKey(new JoinKey("orders", Arrays.asList("customer_id"))))
366
.build();
367
}
368
}
```

## Batch Contexts

### BatchContext

Base context interface shared by the batch stage contexts described below.

```java { .api }
package io.cdap.cdap.etl.api.batch;
379
380
public interface BatchContext extends StageSubmitterContext {
381
// Base context for batch operations
382
// Inherits arguments and metrics from StageSubmitterContext
383
}
```

### BatchRuntimeContext

Context passed to batch stages at runtime (for example to `initialize`), giving access to runtime services.

```java { .api }
package io.cdap.cdap.etl.api.batch;
392
393
public interface BatchRuntimeContext extends StageContext, LookupProvider {
394
// Combines stage context with lookup capabilities
395
// Provides access to:
396
// - Runtime arguments and metrics
397
// - Plugin instantiation
398
// - Service discovery
399
// - Data lookups
400
}
```

### BatchSourceContext

Context passed to `BatchSource.prepareRun`, used to configure the source's input.

```java { .api }
package io.cdap.cdap.etl.api.batch;
409
410
public interface BatchSourceContext extends BatchContext {
411
/**
412
* Set input for the batch source.
413
*/
414
void setInput(Input input);
415
416
/**
417
* Check if preview mode is enabled.
418
*/
419
boolean isPreviewEnabled();
420
421
/**
422
* Set error dataset for invalid records.
423
*/
424
void setErrorDataset(String errorDatasetName);
425
}
```

### BatchSinkContext

Context passed to `BatchSink.prepareRun`, used to register the sink's outputs.

```java { .api }
package io.cdap.cdap.etl.api.batch;
434
435
public interface BatchSinkContext extends BatchContext {
436
/**
437
* Add output for the batch sink.
438
*/
439
void addOutput(Output output);
440
441
/**
442
* Add named output for the batch sink.
443
*/
444
void addOutput(String outputName, Output output);
445
446
/**
447
* Check if preview mode is enabled.
448
*/
449
boolean isPreviewEnabled();
450
}
```

### BatchAggregatorContext

Context passed to the `prepareRun` method of batch aggregators, used to tune the aggregation.

```java { .api }
package io.cdap.cdap.etl.api.batch;
459
460
public interface BatchAggregatorContext extends BatchContext {
461
/**
462
* Set number of partitions for aggregation.
463
*/
464
void setNumPartitions(int numPartitions);
465
466
/**
467
* Set memory for group-by operations.
468
*/
469
void setGroupByMemoryMB(int memoryMB);
470
}
```

### BatchJoinerContext

Context passed to the `prepareRun` method of batch joiners (`BatchJoiner` and `BatchAutoJoiner`).

```java { .api }
package io.cdap.cdap.etl.api.batch;
479
480
public interface BatchJoinerContext extends BatchContext {
481
/**
482
* Set number of partitions for join operations.
483
*/
484
void setNumPartitions(int numPartitions);
485
}
```

### BatchJoinerRuntimeContext

Runtime context passed to the `initialize` method of batch joiners.

```java { .api }
package io.cdap.cdap.etl.api.batch;
494
495
public interface BatchJoinerRuntimeContext extends BatchRuntimeContext {
496
// Combines batch runtime context for join operations
497
}
```

## Post Actions

### PostAction

Base abstract class for actions that run after a batch pipeline has executed, such as sending a notification.

```java { .api }
package io.cdap.cdap.etl.api.batch;
508
509
public abstract class PostAction
510
extends BatchConfigurable<BatchActionContext>
511
implements StageLifecycle<BatchActionContext> {
512
513
public static final String PLUGIN_TYPE = "postaction";
514
515
// Lifecycle methods
516
public void initialize(BatchActionContext context) throws Exception {}
517
public void destroy() {}
518
519
// Action execution
520
public abstract void run() throws Exception;
521
}
```

**Usage Example:**
```java
@Plugin(type = PostAction.PLUGIN_TYPE)
527
@Name("EmailNotification")
528
@Description("Sends email notification after pipeline completion")
529
public class EmailNotificationAction extends PostAction {
530
531
private final Config config;
532
533
@Override
534
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
535
config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
536
}
537
538
@Override
539
public void prepareRun(BatchActionContext context) throws Exception {
540
// Prepare email configuration
541
}
542
543
@Override
544
public void run() throws Exception {
545
// Send email notification
546
EmailService emailService = new EmailService(config.smtpServer, config.port);
547
548
String subject = "Pipeline Execution Completed";
549
String body = String.format("Pipeline '%s' completed successfully at %s",
550
config.pipelineName, new Date());
551
552
emailService.sendEmail(config.recipients, subject, body);
553
}
554
}
```

## Batch Connectors

### BatchConnector

Base abstract class for batch connector plugins.

```java { .api }
package io.cdap.cdap.etl.api.batch;
565
566
public abstract class BatchConnector extends BatchConfigurable<BatchContext> {
567
public static final String PLUGIN_TYPE = "batchconnector";
568
}
```

## Performance Optimization

### Partitioning Control

```java
// In aggregator context
577
@Override
578
public void prepareRun(BatchAggregatorContext context) throws Exception {
579
// Set optimal number of partitions based on data size
580
context.setNumPartitions(calculateOptimalPartitions());
581
582
// Set memory for group-by operations
583
context.setGroupByMemoryMB(2048);
584
}
```

### Memory Management

```java
// Configure memory for aggregation operations
591
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
592
// Set stage properties for memory optimization
593
Map<String, String> properties = new HashMap<>();
594
properties.put("spark.executor.memory", "4g");
595
properties.put("spark.sql.shuffle.partitions", "200");
596
597
pipelineConfigurer.getStageConfigurer().addProperties(properties);
598
}
```

### Input/Output Optimization

```java
// Optimize input format configuration
605
@Override
606
public void prepareRun(BatchSourceContext context) throws Exception {
607
Job job = context.getHadoopJob();
608
Configuration conf = job.getConfiguration();
609
610
// Enable compression
611
conf.setBoolean("mapreduce.map.output.compress", true);
612
conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
613
614
// Set optimal split size
615
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128 * 1024 * 1024); // 128MB
616
617
context.setInput(Input.of(config.referenceName,
618
new SourceInputFormatProvider(inputFormatClass, conf)));
619
}
```

## Error Handling in Batch Operations

### Error Dataset Configuration

```java
@Override
628
public void prepareRun(BatchSourceContext context) throws Exception {
629
// Configure error dataset for invalid records
630
if (config.errorDataset != null) {
631
context.setErrorDataset(config.errorDataset);
632
}
633
634
// Set input with error handling
635
context.setInput(Input.of(config.referenceName, inputFormatProvider));
636
}
```

### Error Record Processing

```java
@Override
643
public void transform(KeyValue<LongWritable, Text> input,
644
Emitter<StructuredRecord> emitter) throws Exception {
645
try {
646
StructuredRecord record = parseRecord(input.getValue().toString());
647
emitter.emit(record);
648
} catch (Exception e) {
649
// Emit error record instead of failing the entire pipeline
650
ErrorRecord<String> errorRecord = new ErrorRecord<>(
651
input.getValue().toString(),
652
"Failed to parse record: " + e.getMessage()
653
);
654
emitter.emitError(new InvalidEntry<>(400, errorRecord.getErrorMessage(), errorRecord));
655
}
656
}
```