or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-framework.md data-management.md data-processing.md index.md operational.md plugin-system.md security-metadata.md

data-processing.mddocs/

0

# Data Processing

1

2

CDAP provides comprehensive support for batch and streaming data processing through native integration with Apache MapReduce and Apache Spark. These processing frameworks enable scalable data transformation, analytics, and machine learning workflows.

3

4

## MapReduce Integration

5

6

CDAP's MapReduce integration provides a familiar programming model with enhanced features for dataset access, metrics collection, and operational control.

7

8

### MapReduce Program Definition

9

10

```java { .api }

11

import io.cdap.cdap.api.mapreduce.*;

12

import org.apache.hadoop.mapreduce.*;

13

14

// MapReduce program interface

15

public interface MapReduce extends ProgramLifecycle<MapReduceContext> {

16

void configure(MapReduceConfigurer configurer);

17

}

18

19

// Abstract MapReduce implementation

20

public abstract class AbstractMapReduce

21

extends AbstractPluginConfigurable<MapReduceConfigurer>

22

implements MapReduce {

23

24

@Override

25

public void initialize(MapReduceContext context) throws Exception {

26

// Initialize MapReduce resources

27

// Configure Hadoop Job here

28

}

29

30

@Override

31

public void destroy() {

32

// Cleanup MapReduce resources

33

}

34

}

35

36

// MapReduce configurer interface

37

public interface MapReduceConfigurer

38

extends DatasetConfigurer, ProgramConfigurer, PluginConfigurer {

39

40

void setDriverResources(Resources resources);

41

void setMapperResources(Resources resources);

42

void setReducerResources(Resources resources);

43

}

44

```

45

46

### MapReduce Context

47

48

```java { .api }

49

// MapReduce runtime context

50

public interface MapReduceContext

51

extends SchedulableProgramContext, RuntimeContext, DatasetContext,

52

ServiceDiscoverer, PluginContext, LineageRecorder {

53

54

// Hadoop Job configuration

55

Job getHadoopJob() throws IOException;

56

void setInput(Input input);

57

void addInput(Input input);

58

void setOutput(Output output);

59

void addOutput(Output output);

60

61

// Workflow integration

62

void write(String key, Object value);

63

64

// Resource access

65

Map<String, LocalizeResource> getResourcesToLocalize();

66

void localize(String name, URI uri);

67

void localize(String name, URI uri, boolean archive);

68

}

69

70

// Task-level context for mapper and reducer

71

public interface MapReduceTaskContext<KEYOUT, VALUEOUT>

72

extends SchedulableProgramContext, TaskLocalizationContext, RuntimeContext,

73

DatasetContext, ServiceDiscoverer, PluginContext {

74

75

// Hadoop context access

76

TaskAttemptContext getHadoopContext();

77

78

// Metrics and progress

79

void progress();

80

void setStatus(String msg);

81

Counter getCounter(String groupName, String counterName);

82

Counter getCounter(Enum<?> counterName);

83

}

84

```

85

86

### MapReduce Implementation Examples

87

88

```java { .api }

89

// Basic MapReduce program

90

public class WordCountMapReduce extends AbstractMapReduce {

91

92

@Override

93

public void configure(MapReduceConfigurer configurer) {

94

configurer.setName("WordCountMapReduce");

95

configurer.setDescription("Counts word occurrences in input data");

96

97

// Set resource requirements

98

configurer.setDriverResources(new Resources(512, 1));

99

configurer.setMapperResources(new Resources(1024, 1));

100

configurer.setReducerResources(new Resources(1024, 1));

101

}

102

103

@Override

104

public void initialize(MapReduceContext context) throws Exception {

105

Job job = context.getHadoopJob();

106

107

// Configure input and output

108

context.setInput(Input.ofDataset("input_text"));

109

context.setOutput(Output.ofDataset("word_counts"));

110

111

// Configure Hadoop job

112

job.setMapperClass(WordMapper.class);

113

job.setCombinerClass(WordReducer.class);

114

job.setReducerClass(WordReducer.class);

115

116

job.setMapOutputKeyClass(Text.class);

117

job.setMapOutputValueClass(IntWritable.class);

118

job.setOutputKeyClass(Text.class);

119

job.setOutputValueClass(IntWritable.class);

120

}

121

122

// Mapper implementation

123

public static class WordMapper extends Mapper<byte[], String, Text, IntWritable> {

124

125

private final static IntWritable one = new IntWritable(1);

126

private Text word = new Text();

127

128

@Override

129

protected void map(byte[] key, String value, Context context)

130

throws IOException, InterruptedException {

131

132

// Split text into words

133

String[] words = value.toLowerCase().split("\\s+");

134

for (String w : words) {

135

if (!w.isEmpty()) {

136

word.set(w);

137

context.write(word, one);

138

}

139

}

140

}

141

}

142

143

// Reducer implementation

144

public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

145

146

private IntWritable result = new IntWritable();

147

148

@Override

149

protected void reduce(Text key, Iterable<IntWritable> values, Context context)

150

throws IOException, InterruptedException {

151

152

int sum = 0;

153

for (IntWritable value : values) {

154

sum += value.get();

155

}

156

157

result.set(sum);

158

context.write(key, result);

159

}

160

}

161

}

162

163

// Advanced MapReduce with dataset operations

164

public class UserAnalyticsMapReduce extends AbstractMapReduce {

165

166

@Override

167

public void configure(MapReduceConfigurer configurer) {

168

configurer.setName("UserAnalyticsMapReduce");

169

configurer.setDescription("Analyzes user behavior patterns");

170

}

171

172

@Override

173

public void initialize(MapReduceContext context) throws Exception {

174

Job job = context.getHadoopJob();

175

176

// Multiple input datasets

177

context.addInput(Input.ofDataset("user_events"));

178

context.addInput(Input.ofDataset("user_profiles"));

179

180

// Multiple output datasets

181

context.addOutput(Output.ofDataset("user_stats"));

182

context.addOutput(Output.ofDataset("behavior_patterns"));

183

184

// Configure job

185

job.setMapperClass(UserAnalyticsMapper.class);

186

job.setReducerClass(UserAnalyticsReducer.class);

187

188

// Custom partitioning for better load distribution

189

job.setPartitionerClass(UserPartitioner.class);

190

job.setNumReduceTasks(10);

191

}

192

193

public static class UserAnalyticsMapper

194

extends Mapper<byte[], Row, Text, UserActivity> {

195

196

@Override

197

protected void map(byte[] key, Row row, Context context)

198

throws IOException, InterruptedException {

199

200

String userId = row.getString("user_id");

201

String eventType = row.getString("event_type");

202

long timestamp = row.getLong("timestamp");

203

204

UserActivity activity = new UserActivity(eventType, timestamp);

205

context.write(new Text(userId), activity);

206

}

207

}

208

209

public static class UserAnalyticsReducer

210

extends Reducer<Text, UserActivity, byte[], Put> {

211

212

@Override

213

protected void reduce(Text userId, Iterable<UserActivity> activities,

214

Context context) throws IOException, InterruptedException {

215

216

Map<String, Integer> eventCounts = new HashMap<>();

217

long firstActivity = Long.MAX_VALUE;

218

long lastActivity = Long.MIN_VALUE;

219

220

for (UserActivity activity : activities) {

221

String eventType = activity.getEventType();

222

eventCounts.put(eventType, eventCounts.getOrDefault(eventType, 0) + 1);

223

224

long timestamp = activity.getTimestamp();

225

firstActivity = Math.min(firstActivity, timestamp);

226

lastActivity = Math.max(lastActivity, timestamp);

227

}

228

229

// Create output record

230

byte[] rowKey = Bytes.toBytes(userId.toString());

231

Put put = new Put(rowKey);

232

put.add("stats", "first_activity", firstActivity);

233

put.add("stats", "last_activity", lastActivity);

234

put.add("stats", "session_duration", lastActivity - firstActivity);

235

236

for (Map.Entry<String, Integer> entry : eventCounts.entrySet()) {

237

put.add("events", entry.getKey(), entry.getValue());

238

}

239

240

context.write(rowKey, put);

241

}

242

}

243

}

244

```

245

246

### Input and Output Configuration

247

248

```java { .api }

249

// Input configuration for MapReduce

250

public class Input {

251

public static Input ofDataset(String datasetName) { /* dataset input */ }

252

public static Input ofDataset(String datasetName, Map<String, String> arguments) { /* dataset with args */ }

253

public static Input ofDataset(String datasetName, Split split) { /* dataset with split */ }

254

255

// Batch input configuration

256

public static BatchSource.Builder<?, ?, ?> batch() { /* batch input builder */ }

257

}

258

259

// Output configuration for MapReduce

260

public class Output {

261

public static Output ofDataset(String datasetName) { /* dataset output */ }

262

public static Output ofDataset(String datasetName, Map<String, String> arguments) { /* dataset with args */ }

263

264

// Batch output configuration

265

public static BatchSink.Builder<?, ?, ?> batch() { /* batch output builder */ }

266

}

267

268

// Split specification for parallel processing

269

/** Describes one split of input used for parallel processing. */
public interface Split {

    /** @return the length of this split */
    long getLength();

    /** @return the locations associated with this split */
    List<String> getLocations();
}

273

```

274

275

## Spark Integration

276

277

CDAP provides native Apache Spark integration supporting both batch and streaming processing with enhanced dataset access and operational features.

278

279

### Spark Program Definition

280

281

```java { .api }

282

import io.cdap.cdap.api.spark.*;

283

import org.apache.spark.api.java.JavaSparkContext;

284

import org.apache.spark.sql.SparkSession;

285

286

// Spark program interface

287

public interface Spark extends ProgramLifecycle<SparkClientContext> {

288

void configure(SparkConfigurer configurer);

289

}

290

291

// Abstract Spark implementation

292

public abstract class AbstractSpark

293

extends AbstractPluginConfigurable<SparkConfigurer>

294

implements Spark {

295

296

@Override

297

public void initialize(SparkClientContext context) throws Exception {

298

// Initialize Spark resources

299

}

300

301

@Override

302

public void destroy() {

303

// Cleanup Spark resources

304

}

305

}

306

307

// Spark configurer interface

308

public interface SparkConfigurer

309

extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {

310

311

void setMainClassName(String className);

312

void setDriverResources(Resources resources);

313

void setExecutorResources(Resources resources);

314

void setNumExecutors(int numExecutors);

315

void setExecutorCores(int cores);

316

void setClientResources(Resources resources);

317

318

// Spark configuration

319

void setSparkConf(Map<String, String> sparkConf);

320

void addSparkConf(String key, String value);

321

}

322

```

323

324

### Spark Context

325

326

```java { .api }

327

// Spark client context

328

public interface SparkClientContext

329

extends SchedulableProgramContext, RuntimeContext, DatasetContext,

330

ServiceDiscoverer, PluginContext, LineageRecorder {

331

332

// Spark session access

333

SparkSession getSparkSession();

334

JavaSparkContext getOriginalSparkContext();

335

336

// Dataset integration

337

<K, V> JavaPairRDD<K, V> fromDataset(String datasetName);

338

<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Map<String, String> arguments);

339

<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Split... splits);

340

341

void saveAsDataset(JavaPairRDD<?, ?> rdd, String datasetName);

342

void saveAsDataset(JavaPairRDD<?, ?> rdd, String datasetName, Map<String, String> arguments);

343

344

// Localization

345

Map<String, LocalizeResource> getResourcesToLocalize();

346

void localize(String name, URI uri);

347

void localize(String name, URI uri, boolean archive);

348

}

349

```

350

351

### Spark Implementation Examples

352

353

```java { .api }

354

// Basic Spark program

355

public class DataTransformationSpark extends AbstractSpark {

356

357

@Override

358

public void configure(SparkConfigurer configurer) {

359

configurer.setName("DataTransformationSpark");

360

configurer.setDescription("Transforms raw data using Spark");

361

configurer.setMainClassName(DataTransformationSpark.class.getName());

362

363

// Resource configuration

364

configurer.setDriverResources(new Resources(1024, 2));

365

configurer.setExecutorResources(new Resources(2048, 2));

366

configurer.setNumExecutors(4);

367

368

// Spark configuration

369

configurer.addSparkConf("spark.sql.adaptive.enabled", "true");

370

configurer.addSparkConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

371

}

372

373

@Override

374

public void run(SparkClientContext context) throws Exception {

375

SparkSession spark = context.getSparkSession();

376

377

// Read data from CDAP dataset

378

JavaPairRDD<byte[], Row> inputRDD = context.fromDataset("raw_data");

379

380

// Transform to DataFrame for SQL operations

381

Dataset<Row> inputDF = spark.createDataFrame(

382

inputRDD.map(tuple -> {

383

Row row = tuple._2();

384

return RowFactory.create(

385

row.getString("id"),

386

row.getString("name"),

387

row.getLong("timestamp"),

388

row.getDouble("value")

389

);

390

}).rdd(),

391

DataTypes.createStructType(Arrays.asList(

392

DataTypes.createStructField("id", DataTypes.StringType, false),

393

DataTypes.createStructField("name", DataTypes.StringType, false),

394

DataTypes.createStructField("timestamp", DataTypes.LongType, false),

395

DataTypes.createStructField("value", DataTypes.DoubleType, false)

396

))

397

);

398

399

// Register temporary view for SQL

400

inputDF.createOrReplaceTempView("raw_data");

401

402

// Perform transformations using Spark SQL

403

Dataset<Row> transformedDF = spark.sql(

404

"SELECT " +

405

" id, " +

406

" name, " +

407

" DATE(FROM_UNIXTIME(timestamp/1000)) as date, " +

408

" ROUND(value * 1.1, 2) as adjusted_value, " +

409

" CASE " +

410

" WHEN value > 100 THEN 'high' " +

411

" WHEN value > 50 THEN 'medium' " +

412

" ELSE 'low' " +

413

" END as category " +

414

"FROM raw_data " +

415

"WHERE value IS NOT NULL AND value > 0"

416

);

417

418

// Convert back to RDD for dataset output

419

JavaPairRDD<byte[], Put> outputRDD = transformedDF.javaRDD().mapToPair(row -> {

420

String id = row.getAs("id");

421

byte[] key = Bytes.toBytes(id);

422

423

Put put = new Put(key);

424

put.add("data", "name", row.getAs("name"));

425

put.add("data", "date", row.getAs("date").toString());

426

put.add("data", "adjusted_value", row.getAs("adjusted_value"));

427

put.add("data", "category", row.getAs("category"));

428

429

return new Tuple2<>(key, put);

430

});

431

432

// Save to CDAP dataset

433

context.saveAsDataset(outputRDD, "transformed_data");

434

}

435

}

436

437

// Advanced Spark program with streaming

438

public class RealTimeAnalyticsSpark extends AbstractSpark {

439

440

@Override

441

public void configure(SparkConfigurer configurer) {

442

configurer.setName("RealTimeAnalyticsSpark");

443

configurer.setDescription("Real-time analytics using Spark Streaming");

444

configurer.setMainClassName(RealTimeAnalyticsSpark.class.getName());

445

446

// Streaming configuration

447

configurer.addSparkConf("spark.streaming.stopGracefullyOnShutdown", "true");

448

configurer.addSparkConf("spark.sql.streaming.checkpointLocation", "/tmp/spark-checkpoint");

449

}

450

451

@Override

452

public void run(SparkClientContext context) throws Exception {

453

SparkSession spark = context.getSparkSession();

454

455

// Read streaming data (example with Kafka integration)

456

Dataset<Row> streamingDF = spark

457

.readStream()

458

.format("kafka")

459

.option("kafka.bootstrap.servers", "localhost:9092")

460

.option("subscribe", "events")

461

.load();

462

463

// Parse JSON data

464

Dataset<Row> eventsDF = streamingDF

465

.select(from_json(col("value").cast("string"), getEventSchema()).as("event"))

466

.select("event.*")

467

.withWatermark("timestamp", "10 minutes");

468

469

// Perform windowed aggregations

470

Dataset<Row> aggregatedDF = eventsDF

471

.groupBy(

472

window(col("timestamp"), "5 minutes"),

473

col("event_type"),

474

col("user_id")

475

)

476

.agg(

477

count("*").as("event_count"),

478

avg("duration").as("avg_duration"),

479

max("duration").as("max_duration")

480

);

481

482

// Write results to CDAP dataset using foreachBatch

483

StreamingQuery query = aggregatedDF

484

.writeStream()

485

.outputMode("update")

486

.foreachBatch((Dataset<Row> batchDF, Long batchId) -> {

487

// Convert DataFrame to RDD for dataset output

488

JavaPairRDD<byte[], Put> outputRDD = batchDF.javaRDD().mapToPair(row -> {

489

String key = String.format("%s_%s_%d",

490

row.getAs("user_id"),

491

row.getAs("event_type"),

492

row.getStruct(0).getLong(0) // window start

493

);

494

495

Put put = new Put(Bytes.toBytes(key));

496

put.add("stats", "event_count", row.getAs("event_count"));

497

put.add("stats", "avg_duration", row.getAs("avg_duration"));

498

put.add("stats", "max_duration", row.getAs("max_duration"));

499

put.add("stats", "batch_id", batchId);

500

501

return new Tuple2<>(Bytes.toBytes(key), put);

502

});

503

504

context.saveAsDataset(outputRDD, "real_time_analytics");

505

})

506

.start();

507

508

// Wait for termination

509

query.awaitTermination();

510

}

511

512

private StructType getEventSchema() {

513

return DataTypes.createStructType(Arrays.asList(

514

DataTypes.createStructField("user_id", DataTypes.StringType, false),

515

DataTypes.createStructField("event_type", DataTypes.StringType, false),

516

DataTypes.createStructField("timestamp", DataTypes.TimestampType, false),

517

DataTypes.createStructField("duration", DataTypes.DoubleType, true)

518

));

519

}

520

}

521

```

522

523

### Spark SQL Integration

524

525

```java { .api }

526

// Using Spark SQL with CDAP datasets

527

public class SparkSQLAnalytics extends AbstractSpark {

528

529

@Override

530

public void run(SparkClientContext context) throws Exception {

531

SparkSession spark = context.getSparkSession();

532

533

// Load multiple datasets as DataFrames

534

Dataset<Row> usersDF = loadDatasetAsDF(context, spark, "users", getUserSchema());

535

Dataset<Row> ordersDF = loadDatasetAsDF(context, spark, "orders", getOrderSchema());

536

Dataset<Row> productsDF = loadDatasetAsDF(context, spark, "products", getProductSchema());

537

538

// Register as temporary views

539

usersDF.createOrReplaceTempView("users");

540

ordersDF.createOrReplaceTempView("orders");

541

productsDF.createOrReplaceTempView("products");

542

543

// Complex analytical queries

544

Dataset<Row> customerAnalytics = spark.sql(

545

"SELECT " +

546

" u.user_id, " +

547

" u.name, " +

548

" COUNT(o.order_id) as total_orders, " +

549

" SUM(o.total_amount) as total_spent, " +

550

" AVG(o.total_amount) as avg_order_value, " +

551

" COLLECT_LIST(DISTINCT p.category) as purchased_categories, " +

552

" DATEDIFF(CURRENT_DATE(), MAX(o.order_date)) as days_since_last_order " +

553

"FROM users u " +

554

"LEFT JOIN orders o ON u.user_id = o.user_id " +

555

"LEFT JOIN products p ON o.product_id = p.product_id " +

556

"GROUP BY u.user_id, u.name " +

557

"HAVING total_orders > 0 " +

558

"ORDER BY total_spent DESC"

559

);

560

561

// Save results

562

saveDataFrameAsDataset(context, customerAnalytics, "customer_analytics");

563

564

// Create customer segments

565

Dataset<Row> customerSegments = spark.sql(

566

"SELECT " +

567

" user_id, " +

568

" name, " +

569

" total_spent, " +

570

" CASE " +

571

" WHEN total_spent > 1000 THEN 'VIP' " +

572

" WHEN total_spent > 500 THEN 'Premium' " +

573

" WHEN total_spent > 100 THEN 'Regular' " +

574

" ELSE 'Basic' " +

575

" END as segment, " +

576

" CASE " +

577

" WHEN days_since_last_order <= 30 THEN 'Active' " +

578

" WHEN days_since_last_order <= 90 THEN 'At Risk' " +

579

" ELSE 'Churned' " +

580

" END as status " +

581

"FROM customer_analytics"

582

);

583

584

customerSegments.createOrReplaceTempView("customer_segments");

585

saveDataFrameAsDataset(context, customerSegments, "customer_segments");

586

}

587

588

private Dataset<Row> loadDatasetAsDF(SparkClientContext context, SparkSession spark,

589

String datasetName, StructType schema) {

590

JavaPairRDD<byte[], Row> rdd = context.fromDataset(datasetName);

591

return spark.createDataFrame(rdd.map(Tuple2::_2).rdd(), schema);

592

}

593

594

private void saveDataFrameAsDataset(SparkClientContext context, Dataset<Row> df,

595

String datasetName) {

596

JavaPairRDD<byte[], Put> outputRDD = df.javaRDD().mapToPair(row -> {

597

String key = row.getAs("user_id").toString();

598

Put put = new Put(Bytes.toBytes(key));

599

600

// Add all columns to the Put

601

for (String fieldName : row.schema().fieldNames()) {

602

Object value = row.getAs(fieldName);

603

if (value != null) {

604

put.add("data", fieldName, value.toString());

605

}

606

}

607

608

return new Tuple2<>(Bytes.toBytes(key), put);

609

});

610

611

context.saveAsDataset(outputRDD, datasetName);

612

}

613

}

614

```

615

616

## Custom Actions for Data Processing

617

618

Custom actions can be used in workflows for specialized data processing tasks:

619

620

```java { .api }

621

import io.cdap.cdap.api.customaction.*;

622

623

// Custom data processing action

624

public class DataQualityCheckAction extends AbstractCustomAction {

625

626

@Override

627

public void configure(CustomActionConfigurer configurer) {

628

configurer.setName("DataQualityCheck");

629

configurer.setDescription("Validates data quality and sets processing flags");

630

}

631

632

@Override

633

public void run(CustomActionContext context) throws Exception {

634

Table inputData = context.getDataset("raw_data");

635

WorkflowToken token = context.getWorkflowToken();

636

Metrics metrics = context.getMetrics();

637

638

// Perform comprehensive data quality checks

639

DataQualityResults results = performQualityChecks(inputData);

640

641

// Store results in workflow token for downstream processing

642

token.put("dq.total_records", String.valueOf(results.getTotalRecords()));

643

token.put("dq.valid_records", String.valueOf(results.getValidRecords()));

644

token.put("dq.error_rate", String.valueOf(results.getErrorRate()));

645

token.put("dq.null_rate", String.valueOf(results.getNullRate()));

646

token.put("dq.duplicate_rate", String.valueOf(results.getDuplicateRate()));

647

648

// Emit metrics for monitoring

649

metrics.gauge("data_quality.error_rate", results.getErrorRate());

650

metrics.gauge("data_quality.null_rate", results.getNullRate());

651

metrics.gauge("data_quality.duplicate_rate", results.getDuplicateRate());

652

metrics.count("data_quality.total_records", results.getTotalRecords());

653

654

// Determine if processing should continue

655

boolean canProceed = results.getErrorRate() < 0.05 &&

656

results.getNullRate() < 0.1 &&

657

results.getDuplicateRate() < 0.02;

658

659

token.put("dq.can_proceed", String.valueOf(canProceed));

660

661

if (!canProceed) {

662

// Optionally fail the action to stop workflow

663

throw new RuntimeException(

664

String.format("Data quality check failed: error_rate=%.2f%%, null_rate=%.2f%%, duplicate_rate=%.2f%%",

665

results.getErrorRate() * 100,

666

results.getNullRate() * 100,

667

results.getDuplicateRate() * 100)

668

);

669

}

670

}

671

672

private DataQualityResults performQualityChecks(Table inputData) {

673

// Implementation of data quality validation logic

674

return new DataQualityResults();

675

}

676

}

677

678

// Data processing coordination action

679

public class ProcessingCoordinatorAction extends AbstractCustomAction {

680

681

@Override

682

public void run(CustomActionContext context) throws Exception {

683

WorkflowToken token = context.getWorkflowToken();

684

685

// Read processing parameters from token

686

long totalRecords = Long.parseLong(token.get("dq.total_records").toString());

687

double errorRate = Double.parseDouble(token.get("dq.error_rate").toString());

688

689

// Determine optimal processing strategy

690

ProcessingStrategy strategy = determineStrategy(totalRecords, errorRate);

691

692

// Configure downstream processing

693

token.put("processing.strategy", strategy.name());

694

token.put("processing.batch_size", String.valueOf(strategy.getBatchSize()));

695

token.put("processing.parallel_level", String.valueOf(strategy.getParallelism()));

696

697

context.getMetrics().gauge("processing.batch_size", strategy.getBatchSize());

698

}

699

700

private ProcessingStrategy determineStrategy(long totalRecords, double errorRate) {

701

if (totalRecords > 10_000_000) {

702

return ProcessingStrategy.LARGE_BATCH;

703

} else if (totalRecords > 1_000_000) {

704

return ProcessingStrategy.MEDIUM_BATCH;

705

} else {

706

return ProcessingStrategy.SMALL_BATCH;

707

}

708

}

709

}

710

```

711

712

The CDAP data processing framework provides enterprise-grade capabilities for both batch and streaming data processing, with seamless integration between MapReduce, Spark, and CDAP's dataset and operational features.