0
# Data Processing
1
2
CDAP provides comprehensive support for batch and streaming data processing through native integration with Apache MapReduce and Apache Spark. These processing frameworks enable scalable data transformation, analytics, and machine learning workflows.
3
4
## MapReduce Integration
5
6
CDAP's MapReduce integration provides a familiar programming model with enhanced features for dataset access, metrics collection, and operational control.
7
8
### MapReduce Program Definition
9
10
```java { .api }
11
import io.cdap.cdap.api.mapreduce.*;
12
import org.apache.hadoop.mapreduce.*;
13
14
// MapReduce program interface
15
public interface MapReduce extends ProgramLifecycle<MapReduceContext> {
16
void configure(MapReduceConfigurer configurer);
17
}
18
19
// Abstract MapReduce implementation
20
public abstract class AbstractMapReduce
21
extends AbstractPluginConfigurable<MapReduceConfigurer>
22
implements MapReduce {
23
24
@Override
25
public void initialize(MapReduceContext context) throws Exception {
26
// Initialize MapReduce resources
27
// Configure Hadoop Job here
28
}
29
30
@Override
31
public void destroy() {
32
// Cleanup MapReduce resources
33
}
34
}
35
36
// MapReduce configurer interface
37
public interface MapReduceConfigurer
38
extends DatasetConfigurer, ProgramConfigurer, PluginConfigurer {
39
40
void setDriverResources(Resources resources);
41
void setMapperResources(Resources resources);
42
void setReducerResources(Resources resources);
43
}
44
```
45
46
### MapReduce Context
47
48
```java { .api }
49
// MapReduce runtime context
50
public interface MapReduceContext
51
extends SchedulableProgramContext, RuntimeContext, DatasetContext,
52
ServiceDiscoverer, PluginContext, LineageRecorder {
53
54
// Hadoop Job configuration
55
Job getHadoopJob() throws IOException;
56
void setInput(Input input);
57
void addInput(Input input);
58
void setOutput(Output output);
59
void addOutput(Output output);
60
61
// Workflow integration
62
void write(String key, Object value);
63
64
// Resource access
65
Map<String, LocalizeResource> getResourcesToLocalize();
66
void localize(String name, URI uri);
67
void localize(String name, URI uri, boolean archive);
68
}
69
70
// Task-level context for mapper and reducer
71
public interface MapReduceTaskContext<KEYOUT, VALUEOUT>
72
extends SchedulableProgramContext, TaskLocalizationContext, RuntimeContext,
73
DatasetContext, ServiceDiscoverer, PluginContext {
74
75
// Hadoop context access
76
TaskAttemptContext getHadoopContext();
77
78
// Metrics and progress
79
void progress();
80
void setStatus(String msg);
81
Counter getCounter(String groupName, String counterName);
82
Counter getCounter(Enum<?> counterName);
83
}
84
```
85
86
### MapReduce Implementation Examples
87
88
```java { .api }
89
// Basic MapReduce program
90
public class WordCountMapReduce extends AbstractMapReduce {
91
92
@Override
93
public void configure(MapReduceConfigurer configurer) {
94
configurer.setName("WordCountMapReduce");
95
configurer.setDescription("Counts word occurrences in input data");
96
97
// Set resource requirements
98
configurer.setDriverResources(new Resources(512, 1));
99
configurer.setMapperResources(new Resources(1024, 1));
100
configurer.setReducerResources(new Resources(1024, 1));
101
}
102
103
@Override
104
public void initialize(MapReduceContext context) throws Exception {
105
Job job = context.getHadoopJob();
106
107
// Configure input and output
108
context.setInput(Input.ofDataset("input_text"));
109
context.setOutput(Output.ofDataset("word_counts"));
110
111
// Configure Hadoop job
112
job.setMapperClass(WordMapper.class);
113
job.setCombinerClass(WordReducer.class);
114
job.setReducerClass(WordReducer.class);
115
116
job.setMapOutputKeyClass(Text.class);
117
job.setMapOutputValueClass(IntWritable.class);
118
job.setOutputKeyClass(Text.class);
119
job.setOutputValueClass(IntWritable.class);
120
}
121
122
// Mapper implementation
123
public static class WordMapper extends Mapper<byte[], String, Text, IntWritable> {
124
125
private final static IntWritable one = new IntWritable(1);
126
private Text word = new Text();
127
128
@Override
129
protected void map(byte[] key, String value, Context context)
130
throws IOException, InterruptedException {
131
132
// Split text into words
133
String[] words = value.toLowerCase().split("\\s+");
134
for (String w : words) {
135
if (!w.isEmpty()) {
136
word.set(w);
137
context.write(word, one);
138
}
139
}
140
}
141
}
142
143
// Reducer implementation
144
public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
145
146
private IntWritable result = new IntWritable();
147
148
@Override
149
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
150
throws IOException, InterruptedException {
151
152
int sum = 0;
153
for (IntWritable value : values) {
154
sum += value.get();
155
}
156
157
result.set(sum);
158
context.write(key, result);
159
}
160
}
161
}
162
163
// Advanced MapReduce with dataset operations
164
public class UserAnalyticsMapReduce extends AbstractMapReduce {
165
166
@Override
167
public void configure(MapReduceConfigurer configurer) {
168
configurer.setName("UserAnalyticsMapReduce");
169
configurer.setDescription("Analyzes user behavior patterns");
170
}
171
172
@Override
173
public void initialize(MapReduceContext context) throws Exception {
174
Job job = context.getHadoopJob();
175
176
// Multiple input datasets
177
context.addInput(Input.ofDataset("user_events"));
178
context.addInput(Input.ofDataset("user_profiles"));
179
180
// Multiple output datasets
181
context.addOutput(Output.ofDataset("user_stats"));
182
context.addOutput(Output.ofDataset("behavior_patterns"));
183
184
// Configure job
185
job.setMapperClass(UserAnalyticsMapper.class);
186
job.setReducerClass(UserAnalyticsReducer.class);
187
188
// Custom partitioning for better load distribution
189
job.setPartitionerClass(UserPartitioner.class);
190
job.setNumReduceTasks(10);
191
}
192
193
public static class UserAnalyticsMapper
194
extends Mapper<byte[], Row, Text, UserActivity> {
195
196
@Override
197
protected void map(byte[] key, Row row, Context context)
198
throws IOException, InterruptedException {
199
200
String userId = row.getString("user_id");
201
String eventType = row.getString("event_type");
202
long timestamp = row.getLong("timestamp");
203
204
UserActivity activity = new UserActivity(eventType, timestamp);
205
context.write(new Text(userId), activity);
206
}
207
}
208
209
public static class UserAnalyticsReducer
210
extends Reducer<Text, UserActivity, byte[], Put> {
211
212
@Override
213
protected void reduce(Text userId, Iterable<UserActivity> activities,
214
Context context) throws IOException, InterruptedException {
215
216
Map<String, Integer> eventCounts = new HashMap<>();
217
long firstActivity = Long.MAX_VALUE;
218
long lastActivity = Long.MIN_VALUE;
219
220
for (UserActivity activity : activities) {
221
String eventType = activity.getEventType();
222
eventCounts.put(eventType, eventCounts.getOrDefault(eventType, 0) + 1);
223
224
long timestamp = activity.getTimestamp();
225
firstActivity = Math.min(firstActivity, timestamp);
226
lastActivity = Math.max(lastActivity, timestamp);
227
}
228
229
// Create output record
230
byte[] rowKey = Bytes.toBytes(userId.toString());
231
Put put = new Put(rowKey);
232
put.add("stats", "first_activity", firstActivity);
233
put.add("stats", "last_activity", lastActivity);
234
put.add("stats", "session_duration", lastActivity - firstActivity);
235
236
for (Map.Entry<String, Integer> entry : eventCounts.entrySet()) {
237
put.add("events", entry.getKey(), entry.getValue());
238
}
239
240
context.write(rowKey, put);
241
}
242
}
243
}
244
```
245
246
### Input and Output Configuration
247
248
```java { .api }
249
// Input configuration for MapReduce
250
public class Input {
251
public static Input ofDataset(String datasetName) { /* dataset input */ }
252
public static Input ofDataset(String datasetName, Map<String, String> arguments) { /* dataset with args */ }
253
public static Input ofDataset(String datasetName, Split split) { /* dataset with split */ }
254
255
// Batch input configuration
256
public static BatchSource.Builder<?, ?, ?> batch() { /* batch input builder */ }
257
}
258
259
// Output configuration for MapReduce
260
public class Output {
261
public static Output ofDataset(String datasetName) { /* dataset output */ }
262
public static Output ofDataset(String datasetName, Map<String, String> arguments) { /* dataset with args */ }
263
264
// Batch output configuration
265
public static BatchSink.Builder<?, ?, ?> batch() { /* batch output builder */ }
266
}
267
268
/** Split specification for parallel processing. */
public interface Split {

  /** Returns the length of this split. */
  long getLength();

  /** Returns the locations of this split. */
  List<String> getLocations();
}
273
```
274
275
## Spark Integration
276
277
CDAP provides native Apache Spark integration supporting both batch and streaming processing with enhanced dataset access and operational features.
278
279
### Spark Program Definition
280
281
```java { .api }
282
import io.cdap.cdap.api.spark.*;
283
import org.apache.spark.api.java.JavaSparkContext;
284
import org.apache.spark.sql.SparkSession;
285
286
// Spark program interface
287
public interface Spark extends ProgramLifecycle<SparkClientContext> {
288
void configure(SparkConfigurer configurer);
289
}
290
291
// Abstract Spark implementation
292
public abstract class AbstractSpark
293
extends AbstractPluginConfigurable<SparkConfigurer>
294
implements Spark {
295
296
@Override
297
public void initialize(SparkClientContext context) throws Exception {
298
// Initialize Spark resources
299
}
300
301
@Override
302
public void destroy() {
303
// Cleanup Spark resources
304
}
305
}
306
307
// Spark configurer interface
308
public interface SparkConfigurer
309
extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
310
311
void setMainClassName(String className);
312
void setDriverResources(Resources resources);
313
void setExecutorResources(Resources resources);
314
void setNumExecutors(int numExecutors);
315
void setExecutorCores(int cores);
316
void setClientResources(Resources resources);
317
318
// Spark configuration
319
void setSparkConf(Map<String, String> sparkConf);
320
void addSparkConf(String key, String value);
321
}
322
```
323
324
### Spark Context
325
326
```java { .api }
327
// Spark client context
328
public interface SparkClientContext
329
extends SchedulableProgramContext, RuntimeContext, DatasetContext,
330
ServiceDiscoverer, PluginContext, LineageRecorder {
331
332
// Spark session access
333
SparkSession getSparkSession();
334
JavaSparkContext getOriginalSparkContext();
335
336
// Dataset integration
337
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName);
338
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Map<String, String> arguments);
339
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Split... splits);
340
341
void saveAsDataset(JavaPairRDD<?, ?> rdd, String datasetName);
342
void saveAsDataset(JavaPairRDD<?, ?> rdd, String datasetName, Map<String, String> arguments);
343
344
// Localization
345
Map<String, LocalizeResource> getResourcesToLocalize();
346
void localize(String name, URI uri);
347
void localize(String name, URI uri, boolean archive);
348
}
349
```
350
351
### Spark Implementation Examples
352
353
```java { .api }
354
// Basic Spark program
355
public class DataTransformationSpark extends AbstractSpark {
356
357
@Override
358
public void configure(SparkConfigurer configurer) {
359
configurer.setName("DataTransformationSpark");
360
configurer.setDescription("Transforms raw data using Spark");
361
configurer.setMainClassName(DataTransformationSpark.class.getName());
362
363
// Resource configuration
364
configurer.setDriverResources(new Resources(1024, 2));
365
configurer.setExecutorResources(new Resources(2048, 2));
366
configurer.setNumExecutors(4);
367
368
// Spark configuration
369
configurer.addSparkConf("spark.sql.adaptive.enabled", "true");
370
configurer.addSparkConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
371
}
372
373
@Override
374
public void run(SparkClientContext context) throws Exception {
375
SparkSession spark = context.getSparkSession();
376
377
// Read data from CDAP dataset
378
JavaPairRDD<byte[], Row> inputRDD = context.fromDataset("raw_data");
379
380
// Transform to DataFrame for SQL operations
381
Dataset<Row> inputDF = spark.createDataFrame(
382
inputRDD.map(tuple -> {
383
Row row = tuple._2();
384
return RowFactory.create(
385
row.getString("id"),
386
row.getString("name"),
387
row.getLong("timestamp"),
388
row.getDouble("value")
389
);
390
}).rdd(),
391
DataTypes.createStructType(Arrays.asList(
392
DataTypes.createStructField("id", DataTypes.StringType, false),
393
DataTypes.createStructField("name", DataTypes.StringType, false),
394
DataTypes.createStructField("timestamp", DataTypes.LongType, false),
395
DataTypes.createStructField("value", DataTypes.DoubleType, false)
396
))
397
);
398
399
// Register temporary view for SQL
400
inputDF.createOrReplaceTempView("raw_data");
401
402
// Perform transformations using Spark SQL
403
Dataset<Row> transformedDF = spark.sql(
404
"SELECT " +
405
" id, " +
406
" name, " +
407
" DATE(FROM_UNIXTIME(timestamp/1000)) as date, " +
408
" ROUND(value * 1.1, 2) as adjusted_value, " +
409
" CASE " +
410
" WHEN value > 100 THEN 'high' " +
411
" WHEN value > 50 THEN 'medium' " +
412
" ELSE 'low' " +
413
" END as category " +
414
"FROM raw_data " +
415
"WHERE value IS NOT NULL AND value > 0"
416
);
417
418
// Convert back to RDD for dataset output
419
JavaPairRDD<byte[], Put> outputRDD = transformedDF.javaRDD().mapToPair(row -> {
420
String id = row.getAs("id");
421
byte[] key = Bytes.toBytes(id);
422
423
Put put = new Put(key);
424
put.add("data", "name", row.getAs("name"));
425
put.add("data", "date", row.getAs("date").toString());
426
put.add("data", "adjusted_value", row.getAs("adjusted_value"));
427
put.add("data", "category", row.getAs("category"));
428
429
return new Tuple2<>(key, put);
430
});
431
432
// Save to CDAP dataset
433
context.saveAsDataset(outputRDD, "transformed_data");
434
}
435
}
436
437
// Advanced Spark program with streaming
438
public class RealTimeAnalyticsSpark extends AbstractSpark {
439
440
@Override
441
public void configure(SparkConfigurer configurer) {
442
configurer.setName("RealTimeAnalyticsSpark");
443
configurer.setDescription("Real-time analytics using Spark Streaming");
444
configurer.setMainClassName(RealTimeAnalyticsSpark.class.getName());
445
446
// Streaming configuration
447
configurer.addSparkConf("spark.streaming.stopGracefullyOnShutdown", "true");
448
configurer.addSparkConf("spark.sql.streaming.checkpointLocation", "/tmp/spark-checkpoint");
449
}
450
451
@Override
452
public void run(SparkClientContext context) throws Exception {
453
SparkSession spark = context.getSparkSession();
454
455
// Read streaming data (example with Kafka integration)
456
Dataset<Row> streamingDF = spark
457
.readStream()
458
.format("kafka")
459
.option("kafka.bootstrap.servers", "localhost:9092")
460
.option("subscribe", "events")
461
.load();
462
463
// Parse JSON data
464
Dataset<Row> eventsDF = streamingDF
465
.select(from_json(col("value").cast("string"), getEventSchema()).as("event"))
466
.select("event.*")
467
.withWatermark("timestamp", "10 minutes");
468
469
// Perform windowed aggregations
470
Dataset<Row> aggregatedDF = eventsDF
471
.groupBy(
472
window(col("timestamp"), "5 minutes"),
473
col("event_type"),
474
col("user_id")
475
)
476
.agg(
477
count("*").as("event_count"),
478
avg("duration").as("avg_duration"),
479
max("duration").as("max_duration")
480
);
481
482
// Write results to CDAP dataset using foreachBatch
483
StreamingQuery query = aggregatedDF
484
.writeStream()
485
.outputMode("update")
486
.foreachBatch((Dataset<Row> batchDF, Long batchId) -> {
487
// Convert DataFrame to RDD for dataset output
488
JavaPairRDD<byte[], Put> outputRDD = batchDF.javaRDD().mapToPair(row -> {
489
String key = String.format("%s_%s_%d",
490
row.getAs("user_id"),
491
row.getAs("event_type"),
492
row.getStruct(0).getLong(0) // window start
493
);
494
495
Put put = new Put(Bytes.toBytes(key));
496
put.add("stats", "event_count", row.getAs("event_count"));
497
put.add("stats", "avg_duration", row.getAs("avg_duration"));
498
put.add("stats", "max_duration", row.getAs("max_duration"));
499
put.add("stats", "batch_id", batchId);
500
501
return new Tuple2<>(Bytes.toBytes(key), put);
502
});
503
504
context.saveAsDataset(outputRDD, "real_time_analytics");
505
})
506
.start();
507
508
// Wait for termination
509
query.awaitTermination();
510
}
511
512
private StructType getEventSchema() {
513
return DataTypes.createStructType(Arrays.asList(
514
DataTypes.createStructField("user_id", DataTypes.StringType, false),
515
DataTypes.createStructField("event_type", DataTypes.StringType, false),
516
DataTypes.createStructField("timestamp", DataTypes.TimestampType, false),
517
DataTypes.createStructField("duration", DataTypes.DoubleType, true)
518
));
519
}
520
}
521
```
522
523
### Spark SQL Integration
524
525
```java { .api }
526
// Using Spark SQL with CDAP datasets
527
public class SparkSQLAnalytics extends AbstractSpark {
528
529
@Override
530
public void run(SparkClientContext context) throws Exception {
531
SparkSession spark = context.getSparkSession();
532
533
// Load multiple datasets as DataFrames
534
Dataset<Row> usersDF = loadDatasetAsDF(context, spark, "users", getUserSchema());
535
Dataset<Row> ordersDF = loadDatasetAsDF(context, spark, "orders", getOrderSchema());
536
Dataset<Row> productsDF = loadDatasetAsDF(context, spark, "products", getProductSchema());
537
538
// Register as temporary views
539
usersDF.createOrReplaceTempView("users");
540
ordersDF.createOrReplaceTempView("orders");
541
productsDF.createOrReplaceTempView("products");
542
543
// Complex analytical queries
544
Dataset<Row> customerAnalytics = spark.sql(
545
"SELECT " +
546
" u.user_id, " +
547
" u.name, " +
548
" COUNT(o.order_id) as total_orders, " +
549
" SUM(o.total_amount) as total_spent, " +
550
" AVG(o.total_amount) as avg_order_value, " +
551
" COLLECT_LIST(DISTINCT p.category) as purchased_categories, " +
552
" DATEDIFF(CURRENT_DATE(), MAX(o.order_date)) as days_since_last_order " +
553
"FROM users u " +
554
"LEFT JOIN orders o ON u.user_id = o.user_id " +
555
"LEFT JOIN products p ON o.product_id = p.product_id " +
556
"GROUP BY u.user_id, u.name " +
557
"HAVING total_orders > 0 " +
558
"ORDER BY total_spent DESC"
559
);
560
561
// Save results
562
saveDataFrameAsDataset(context, customerAnalytics, "customer_analytics");
563
564
// Create customer segments
565
Dataset<Row> customerSegments = spark.sql(
566
"SELECT " +
567
" user_id, " +
568
" name, " +
569
" total_spent, " +
570
" CASE " +
571
" WHEN total_spent > 1000 THEN 'VIP' " +
572
" WHEN total_spent > 500 THEN 'Premium' " +
573
" WHEN total_spent > 100 THEN 'Regular' " +
574
" ELSE 'Basic' " +
575
" END as segment, " +
576
" CASE " +
577
" WHEN days_since_last_order <= 30 THEN 'Active' " +
578
" WHEN days_since_last_order <= 90 THEN 'At Risk' " +
579
" ELSE 'Churned' " +
580
" END as status " +
581
"FROM customer_analytics"
582
);
583
584
customerSegments.createOrReplaceTempView("customer_segments");
585
saveDataFrameAsDataset(context, customerSegments, "customer_segments");
586
}
587
588
private Dataset<Row> loadDatasetAsDF(SparkClientContext context, SparkSession spark,
589
String datasetName, StructType schema) {
590
JavaPairRDD<byte[], Row> rdd = context.fromDataset(datasetName);
591
return spark.createDataFrame(rdd.map(Tuple2::_2).rdd(), schema);
592
}
593
594
private void saveDataFrameAsDataset(SparkClientContext context, Dataset<Row> df,
595
String datasetName) {
596
JavaPairRDD<byte[], Put> outputRDD = df.javaRDD().mapToPair(row -> {
597
String key = row.getAs("user_id").toString();
598
Put put = new Put(Bytes.toBytes(key));
599
600
// Add all columns to the Put
601
for (String fieldName : row.schema().fieldNames()) {
602
Object value = row.getAs(fieldName);
603
if (value != null) {
604
put.add("data", fieldName, value.toString());
605
}
606
}
607
608
return new Tuple2<>(Bytes.toBytes(key), put);
609
});
610
611
context.saveAsDataset(outputRDD, datasetName);
612
}
613
}
614
```
615
616
## Custom Actions for Data Processing
617
618
Custom actions can be used in workflows for specialized data processing tasks:
619
620
```java { .api }
621
import io.cdap.cdap.api.customaction.*;
622
623
// Custom data processing action
624
public class DataQualityCheckAction extends AbstractCustomAction {
625
626
@Override
627
public void configure(CustomActionConfigurer configurer) {
628
configurer.setName("DataQualityCheck");
629
configurer.setDescription("Validates data quality and sets processing flags");
630
}
631
632
@Override
633
public void run(CustomActionContext context) throws Exception {
634
Table inputData = context.getDataset("raw_data");
635
WorkflowToken token = context.getWorkflowToken();
636
Metrics metrics = context.getMetrics();
637
638
// Perform comprehensive data quality checks
639
DataQualityResults results = performQualityChecks(inputData);
640
641
// Store results in workflow token for downstream processing
642
token.put("dq.total_records", String.valueOf(results.getTotalRecords()));
643
token.put("dq.valid_records", String.valueOf(results.getValidRecords()));
644
token.put("dq.error_rate", String.valueOf(results.getErrorRate()));
645
token.put("dq.null_rate", String.valueOf(results.getNullRate()));
646
token.put("dq.duplicate_rate", String.valueOf(results.getDuplicateRate()));
647
648
// Emit metrics for monitoring
649
metrics.gauge("data_quality.error_rate", results.getErrorRate());
650
metrics.gauge("data_quality.null_rate", results.getNullRate());
651
metrics.gauge("data_quality.duplicate_rate", results.getDuplicateRate());
652
metrics.count("data_quality.total_records", results.getTotalRecords());
653
654
// Determine if processing should continue
655
boolean canProceed = results.getErrorRate() < 0.05 &&
656
results.getNullRate() < 0.1 &&
657
results.getDuplicateRate() < 0.02;
658
659
token.put("dq.can_proceed", String.valueOf(canProceed));
660
661
if (!canProceed) {
662
// Optionally fail the action to stop workflow
663
throw new RuntimeException(
664
String.format("Data quality check failed: error_rate=%.2f%%, null_rate=%.2f%%, duplicate_rate=%.2f%%",
665
results.getErrorRate() * 100,
666
results.getNullRate() * 100,
667
results.getDuplicateRate() * 100)
668
);
669
}
670
}
671
672
private DataQualityResults performQualityChecks(Table inputData) {
673
// Implementation of data quality validation logic
674
return new DataQualityResults();
675
}
676
}
677
678
// Data processing coordination action
679
public class ProcessingCoordinatorAction extends AbstractCustomAction {
680
681
@Override
682
public void run(CustomActionContext context) throws Exception {
683
WorkflowToken token = context.getWorkflowToken();
684
685
// Read processing parameters from token
686
long totalRecords = Long.parseLong(token.get("dq.total_records").toString());
687
double errorRate = Double.parseDouble(token.get("dq.error_rate").toString());
688
689
// Determine optimal processing strategy
690
ProcessingStrategy strategy = determineStrategy(totalRecords, errorRate);
691
692
// Configure downstream processing
693
token.put("processing.strategy", strategy.name());
694
token.put("processing.batch_size", String.valueOf(strategy.getBatchSize()));
695
token.put("processing.parallel_level", String.valueOf(strategy.getParallelism()));
696
697
context.getMetrics().gauge("processing.batch_size", strategy.getBatchSize());
698
}
699
700
private ProcessingStrategy determineStrategy(long totalRecords, double errorRate) {
701
if (totalRecords > 10_000_000) {
702
return ProcessingStrategy.LARGE_BATCH;
703
} else if (totalRecords > 1_000_000) {
704
return ProcessingStrategy.MEDIUM_BATCH;
705
} else {
706
return ProcessingStrategy.SMALL_BATCH;
707
}
708
}
709
}
710
```
711
712
The CDAP data processing framework provides enterprise-grade capabilities for both batch and streaming data processing, with seamless integration between MapReduce, Spark, and CDAP's dataset and operational features.