or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actions-conditions.md, batch-processing.md, core-pipeline.md, data-connectors.md, index.md, join-operations.md, lineage-metadata.md, sql-engine.md, validation.md

docs/batch-processing.md

0

# Batch Processing

1

2

This page documents the batch processing APIs of CDAP ETL pipelines (batch sources, sinks, aggregators, joiners, and post-actions, together with the contexts they receive) used for large-scale data processing.

3

4

## Batch Sources

5

6

### BatchSource<KEY_IN, VAL_IN, OUT>

7

8

Base abstract class for batch data sources that read from external systems.

9

10

```java { .api }

11

package io.cdap.cdap.etl.api.batch;

12

13

public abstract class BatchSource<KEY_IN, VAL_IN, OUT>

14

extends BatchConfigurable<BatchSourceContext>

15

implements Transformation<KeyValue<KEY_IN, VAL_IN>, OUT>,

16

StageLifecycle<BatchRuntimeContext> {

17

18

public static final String PLUGIN_TYPE = "batchsource";

19

public static final String FORMAT_PLUGIN_TYPE = "inputformat";

20

21

// Lifecycle methods

22

public void initialize(BatchRuntimeContext context) throws Exception {}

23

public void destroy() {}

24

25

// Data transformation

26

public void transform(KeyValue<KEY_IN, VAL_IN> input, Emitter<OUT> emitter)

27

throws Exception {}

28

}

29

```

30

31

**Usage Example:**

32

```java

33

@Plugin(type = BatchSource.PLUGIN_TYPE)

34

@Name("FileSource")

35

@Description("Reads data from files")

36

public class FileSource extends BatchSource<LongWritable, Text, StructuredRecord> {

37

38

private final Config config;

39

private Schema schema;

40

41

@Override

42

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {

43

StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();

44

45

// Validate configuration

46

config.validate(stageConfigurer.getFailureCollector());

47

48

// Set output schema

49

schema = Schema.parseJson(config.schema);

50

stageConfigurer.setOutputSchema(schema);

51

}

52

53

@Override

54

public void prepareRun(BatchSourceContext context) throws Exception {

55

// Configure input format and input paths

56

Job job = context.getHadoopJob();

57

Configuration conf = job.getConfiguration();

58

59

TextInputFormat.setInputPaths(job, config.path);

60

context.setInput(Input.of(config.referenceName,

61

new SourceInputFormatProvider(TextInputFormat.class, conf)));

62

}

63

64

@Override

65

public void initialize(BatchRuntimeContext context) throws Exception {

66

schema = context.getOutputSchema();

67

}

68

69

@Override

70

public void transform(KeyValue<LongWritable, Text> input,

71

Emitter<StructuredRecord> emitter) throws Exception {

72

String line = input.getValue().toString();

73

String[] fields = line.split(config.delimiter);

74

75

StructuredRecord.Builder builder = StructuredRecord.builder(schema);

76

List<Schema.Field> schemaFields = schema.getFields();

77

78

for (int i = 0; i < Math.min(fields.length, schemaFields.size()); i++) {

79

Schema.Field field = schemaFields.get(i);

80

builder.set(field.getName(), convertValue(fields[i], field.getSchema()));

81

}

82

83

emitter.emit(builder.build());

84

}

85

}

86

```

87

88

## Batch Sinks

89

90

### BatchSink<IN, KEY_OUT, VAL_OUT>

91

92

Base abstract class for batch data sinks that write to external systems.

93

94

```java { .api }

95

package io.cdap.cdap.etl.api.batch;

96

97

public abstract class BatchSink<IN, KEY_OUT, VAL_OUT>

98

extends BatchConfigurable<BatchSinkContext>

99

implements Transformation<IN, KeyValue<KEY_OUT, VAL_OUT>>,

100

StageLifecycle<BatchRuntimeContext> {

101

102

public static final String PLUGIN_TYPE = "batchsink";

103

public static final String FORMAT_PLUGIN_TYPE = "outputformat";

104

105

// Lifecycle methods

106

public void initialize(BatchRuntimeContext context) throws Exception {}

107

public void destroy() {}

108

109

// Data transformation

110

public void transform(IN input, Emitter<KeyValue<KEY_OUT, VAL_OUT>> emitter)

111

throws Exception {}

112

}

113

```

114

115

**Usage Example:**

116

```java

117

@Plugin(type = BatchSink.PLUGIN_TYPE)

118

@Name("FileSink")

119

@Description("Writes data to files")

120

public class FileSink extends BatchSink<StructuredRecord, NullWritable, Text> {

121

122

private final Config config;

123

124

@Override

125

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {

126

StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();

127

config.validate(stageConfigurer.getFailureCollector());

128

}

129

130

@Override

131

public void prepareRun(BatchSinkContext context) throws Exception {

132

Job job = context.getHadoopJob();

133

Configuration conf = job.getConfiguration();

134

135

TextOutputFormat.setOutputPath(job, new Path(config.path));

136

context.addOutput(Output.of(config.referenceName,

137

new SinkOutputFormatProvider(TextOutputFormat.class, conf)));

138

}

139

140

@Override

141

public void transform(StructuredRecord input,

142

Emitter<KeyValue<NullWritable, Text>> emitter) throws Exception {

143

StringBuilder line = new StringBuilder();

144

145

List<Schema.Field> fields = input.getSchema().getFields();

146

for (int i = 0; i < fields.size(); i++) {

147

if (i > 0) line.append(config.delimiter);

148

149

Object value = input.get(fields.get(i).getName());

150

line.append(value != null ? value.toString() : "");

151

}

152

153

emitter.emit(new KeyValue<>(NullWritable.get(), new Text(line.toString())));

154

}

155

}

156

```

157

158

## Batch Configuration Base Classes

159

160

### BatchConfigurable<T>

161

162

Base abstract class for batch stage configuration providing common lifecycle methods.

163

164

```java { .api }

165

package io.cdap.cdap.etl.api.batch;

166

167

public abstract class BatchConfigurable<T>

168

implements PipelineConfigurable, SubmitterLifecycle<T> {

169

170

// Pipeline configuration

171

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}

172

173

// Submission lifecycle

174

public abstract void prepareRun(T context) throws Exception;

175

public void onRunFinish(boolean succeeded, T context) {}

176

}

177

```

178

179

### MultiInputBatchConfigurable<T>

180

181

Base class for batch stages with multiple inputs.

182

183

```java { .api }

184

package io.cdap.cdap.etl.api.batch;

185

186

public abstract class MultiInputBatchConfigurable<T>

187

implements MultiInputPipelineConfigurable, SubmitterLifecycle<T> {

188

189

public void configurePipeline(MultiInputPipelineConfigurer multiInputPipelineConfigurer) {}

190

public abstract void prepareRun(T context) throws Exception;

191

public void onRunFinish(boolean succeeded, T context) {}

192

}

193

```

194

195

## Batch Aggregators

196

197

### BatchAggregator<GROUP_KEY, GROUP_VALUE, OUT>

198

199

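Base class for batch aggregators. `groupBy` emits the group key(s) for each input record, and `aggregate` receives one key together with an iterator over its records and emits the aggregated output.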

200

201

```java { .api }

202

package io.cdap.cdap.etl.api.batch;

203

204

public abstract class BatchAggregator<GROUP_KEY, GROUP_VALUE, OUT>

205

extends BatchConfigurable<BatchAggregatorContext>

206

implements Aggregator<GROUP_KEY, GROUP_VALUE, OUT>,

207

StageLifecycle<BatchRuntimeContext> {

208

209

public static final String PLUGIN_TYPE = "batchaggregator";

210

211

// Lifecycle methods

212

public void initialize(BatchRuntimeContext context) throws Exception {}

213

public void destroy() {}

214

215

// Aggregation methods

216

public abstract void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter)

217

throws Exception;

218

public abstract void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,

219

Emitter<OUT> emitter) throws Exception;

220

}

221

```

222

223

**Usage Example:**

224

```java

225

@Plugin(type = BatchAggregator.PLUGIN_TYPE)

226

@Name("SalesAggregator")

227

public class SalesAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {

228

229

private Schema outputSchema;

230

231

@Override

232

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {

233

outputSchema = Schema.recordOf("sales_summary",

234

Schema.Field.of("region", Schema.of(Schema.Type.STRING)),

235

Schema.Field.of("total_sales", Schema.of(Schema.Type.DOUBLE)),

236

Schema.Field.of("order_count", Schema.of(Schema.Type.INT)),

237

Schema.Field.of("avg_order_value", Schema.of(Schema.Type.DOUBLE))

238

);

239

pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);

240

}

241

242

@Override

243

public void prepareRun(BatchAggregatorContext context) throws Exception {

244

context.setNumPartitions(10); // Optimize parallelism

245

}

246

247

@Override

248

public void groupBy(StructuredRecord groupValue, Emitter<String> emitter) throws Exception {

249

String region = groupValue.get("region");

250

emitter.emit(region);

251

}

252

253

@Override

254

public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,

255

Emitter<StructuredRecord> emitter) throws Exception {

256

double totalSales = 0.0;

257

int orderCount = 0;

258

259

while (groupValues.hasNext()) {

260

StructuredRecord record = groupValues.next();

261

Double sales = record.get("sales_amount");

262

if (sales != null) {

263

totalSales += sales;

264

orderCount++;

265

}

266

}

267

268

double avgOrderValue = orderCount > 0 ? totalSales / orderCount : 0.0;

269

270

StructuredRecord result = StructuredRecord.builder(outputSchema)

271

.set("region", groupKey)

272

.set("total_sales", totalSales)

273

.set("order_count", orderCount)

274

.set("avg_order_value", avgOrderValue)

275

.build();

276

277

emitter.emit(result);

278

}

279

}

280

```

281

282

### BatchReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>

283

284

Batch aggregator with reducible intermediate values for improved performance.

285

286

```java { .api }

287

package io.cdap.cdap.etl.api.batch;

288

289

public abstract class BatchReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>

290

extends BatchConfigurable<BatchAggregatorContext>

291

implements ReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>,

292

StageLifecycle<BatchRuntimeContext> {

293

294

public static final String PLUGIN_TYPE = "batchaggregator";

295

}

296

```

297

298

## Batch Joiners

299

300

### BatchJoiner<JOIN_KEY, INPUT_RECORD, OUT>

301

302

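Base class for batch joiners that implement the join logic themselves. `getJoinKeys` extracts the join key(s) from each record of a named input stage, `getJoinConfig` supplies the join configuration, and `merge` combines the records that share a key into one output record.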

303

304

```java { .api }

305

package io.cdap.cdap.etl.api.batch;

306

307

public abstract class BatchJoiner<JOIN_KEY, INPUT_RECORD, OUT>

308

extends BatchConfigurable<BatchJoinerContext>

309

implements Joiner<JOIN_KEY, INPUT_RECORD, OUT>,

310

StageLifecycle<BatchJoinerRuntimeContext> {

311

312

public static final String PLUGIN_TYPE = "batchjoiner";

313

314

// Lifecycle methods

315

public void initialize(BatchJoinerRuntimeContext context) throws Exception {}

316

public void destroy() {}

317

318

// Join methods

319

public abstract Collection<JOIN_KEY> getJoinKeys(String stageName, INPUT_RECORD inputRecord)

320

throws Exception;

321

public abstract JoinConfig getJoinConfig() throws Exception;

322

public abstract OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult)

323

throws Exception;

324

}

325

```

326

327

### BatchAutoJoiner

328

329

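Batch joiner that declares its join instead of merging records itself. As the example below shows, `define` returns a `JoinDefinition` listing the selected fields, the input stages, and the join condition.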

330

331

```java { .api }

332

package io.cdap.cdap.etl.api.batch;

333

334

public abstract class BatchAutoJoiner

335

extends BatchConfigurable<BatchJoinerContext>

336

implements AutoJoiner, StageLifecycle<BatchJoinerRuntimeContext> {

337

338

public static final String PLUGIN_TYPE = "batchjoiner";

339

}

340

```

341

342

**Usage Example:**

343

```java

344

@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)

345

@Name("CustomerOrderJoiner")

346

public class CustomerOrderJoiner extends BatchAutoJoiner {

347

348

@Override

349

public JoinDefinition define(AutoJoinerContext context) {

350

return JoinDefinition.builder()

351

.select(Arrays.asList(

352

new JoinField("customers", "customer_id", "customer_id"),

353

new JoinField("customers", "name", "customer_name"),

354

new JoinField("orders", "order_id", "order_id"),

355

new JoinField("orders", "amount", "order_amount")

356

))

357

.from(Arrays.asList(

358

new JoinStage("customers", JoinType.REQUIRED,

359

Collections.emptyList(), true, false),

360

new JoinStage("orders", JoinType.OUTER,

361

Collections.emptyList(), false, false)

362

))

363

.on(JoinCondition.onKeys()

364

.addKey(new JoinKey("customers", Arrays.asList("customer_id")))

365

.addKey(new JoinKey("orders", Arrays.asList("customer_id"))))

366

.build();

367

}

368

}

369

```

370

371

## Batch Contexts

372

373

### BatchContext

374

375

Base context interface for batch operations.

376

377

```java { .api }

378

package io.cdap.cdap.etl.api.batch;

379

380

public interface BatchContext extends StageSubmitterContext {

381

// Base context for batch operations

382

// Inherits arguments and metrics from StageSubmitterContext

383

}

384

```

385

386

### BatchRuntimeContext

387

388

Runtime context for batch stages providing access to runtime services.

389

390

```java { .api }

391

package io.cdap.cdap.etl.api.batch;

392

393

public interface BatchRuntimeContext extends StageContext, LookupProvider {

394

// Combines stage context with lookup capabilities

395

// Provides access to:

396

// - Runtime arguments and metrics

397

// - Plugin instantiation

398

// - Service discovery

399

// - Data lookups

400

}

401

```

402

403

### BatchSourceContext

404

405

Context for batch source operations.

406

407

```java { .api }

408

package io.cdap.cdap.etl.api.batch;

409

410

public interface BatchSourceContext extends BatchContext {

411

/**

412

* Set input for the batch source.

413

*/

414

void setInput(Input input);

415

416

/**

417

* Check if preview mode is enabled.

418

*/

419

boolean isPreviewEnabled();

420

421

/**

422

* Set error dataset for invalid records.

423

*/

424

void setErrorDataset(String errorDatasetName);

425

}

426

```

427

428

### BatchSinkContext

429

430

Context for batch sink operations.

431

432

```java { .api }

433

package io.cdap.cdap.etl.api.batch;

434

435

public interface BatchSinkContext extends BatchContext {

436

/**

437

* Add output for the batch sink.

438

*/

439

void addOutput(Output output);

440

441

/**

442

* Add named output for the batch sink.

443

*/

444

void addOutput(String outputName, Output output);

445

446

/**

447

* Check if preview mode is enabled.

448

*/

449

boolean isPreviewEnabled();

450

}

451

```

452

453

### BatchAggregatorContext

454

455

Context for batch aggregator operations.

456

457

```java { .api }

458

package io.cdap.cdap.etl.api.batch;

459

460

public interface BatchAggregatorContext extends BatchContext {

461

/**

462

* Set number of partitions for aggregation.

463

*/

464

void setNumPartitions(int numPartitions);

465

466

/**

467

* Set memory for group-by operations.

468

*/

469

void setGroupByMemoryMB(int memoryMB);

470

}

471

```

472

473

### BatchJoinerContext

474

475

Context for batch joiner operations.

476

477

```java { .api }

478

package io.cdap.cdap.etl.api.batch;

479

480

public interface BatchJoinerContext extends BatchContext {

481

/**

482

* Set number of partitions for join operations.

483

*/

484

void setNumPartitions(int numPartitions);

485

}

486

```

487

488

### BatchJoinerRuntimeContext

489

490

Runtime context for batch joiner operations.

491

492

```java { .api }

493

package io.cdap.cdap.etl.api.batch;

494

495

public interface BatchJoinerRuntimeContext extends BatchRuntimeContext {

496

// Combines batch runtime context for join operations

497

}

498

```

499

500

## Post Actions

501

502

### PostAction

503

504

Abstract class for post-execution actions in batch pipelines.

505

506

```java { .api }

507

package io.cdap.cdap.etl.api.batch;

508

509

public abstract class PostAction

510

extends BatchConfigurable<BatchActionContext>

511

implements StageLifecycle<BatchActionContext> {

512

513

public static final String PLUGIN_TYPE = "postaction";

514

515

// Lifecycle methods

516

public void initialize(BatchActionContext context) throws Exception {}

517

public void destroy() {}

518

519

// Action execution

520

public abstract void run() throws Exception;

521

}

522

```

523

524

**Usage Example:**

525

```java

526

@Plugin(type = PostAction.PLUGIN_TYPE)

527

@Name("EmailNotification")

528

@Description("Sends email notification after pipeline completion")

529

public class EmailNotificationAction extends PostAction {

530

531

private final Config config;

532

533

@Override

534

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {

535

config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());

536

}

537

538

@Override

539

public void prepareRun(BatchActionContext context) throws Exception {

540

// Prepare email configuration

541

}

542

543

@Override

544

public void run() throws Exception {

545

// Send email notification

546

EmailService emailService = new EmailService(config.smtpServer, config.port);

547

548

String subject = "Pipeline Execution Completed";

549

String body = String.format("Pipeline '%s' completed successfully at %s",

550

config.pipelineName, new Date());

551

552

emailService.sendEmail(config.recipients, subject, body);

553

}

554

}

555

```

556

557

## Batch Connectors

558

559

### BatchConnector

560

561

Base class for batch connectors.

562

563

```java { .api }

564

package io.cdap.cdap.etl.api.batch;

565

566

public abstract class BatchConnector extends BatchConfigurable<BatchContext> {

567

public static final String PLUGIN_TYPE = "batchconnector";

568

}

569

```

570

571

## Performance Optimization

572

573

### Partitioning Control

574

575

```java

576

// In aggregator context

577

@Override

578

public void prepareRun(BatchAggregatorContext context) throws Exception {

579

// Set optimal number of partitions based on data size

580

context.setNumPartitions(calculateOptimalPartitions());

581

582

// Set memory for group-by operations

583

context.setGroupByMemoryMB(2048);

584

}

585

```

586

587

### Memory Management

588

589

```java

590

// Configure memory for aggregation operations

591

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {

592

// Set stage properties for memory optimization

593

Map<String, String> properties = new HashMap<>();

594

properties.put("spark.executor.memory", "4g");

595

properties.put("spark.sql.shuffle.partitions", "200");

596

597

pipelineConfigurer.getStageConfigurer().addProperties(properties);

598

}

599

```

600

601

### Input/Output Optimization

602

603

```java

604

// Optimize input format configuration

605

@Override

606

public void prepareRun(BatchSourceContext context) throws Exception {

607

Job job = context.getHadoopJob();

608

Configuration conf = job.getConfiguration();

609

610

// Enable compression

611

conf.setBoolean("mapreduce.map.output.compress", true);

612

conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");

613

614

// Set optimal split size

615

conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128 * 1024 * 1024); // 128MB

616

617

context.setInput(Input.of(config.referenceName,

618

new SourceInputFormatProvider(inputFormatClass, conf)));

619

}

620

```

621

622

## Error Handling in Batch Operations

623

624

### Error Dataset Configuration

625

626

```java

627

@Override

628

public void prepareRun(BatchSourceContext context) throws Exception {

629

// Configure error dataset for invalid records

630

if (config.errorDataset != null) {

631

context.setErrorDataset(config.errorDataset);

632

}

633

634

// Set input with error handling

635

context.setInput(Input.of(config.referenceName, inputFormatProvider));

636

}

637

```

638

639

### Error Record Processing

640

641

```java

642

@Override

643

public void transform(KeyValue<LongWritable, Text> input,

644

Emitter<StructuredRecord> emitter) throws Exception {

645

try {

646

StructuredRecord record = parseRecord(input.getValue().toString());

647

emitter.emit(record);

648

} catch (Exception e) {

649

// Emit error record instead of failing the entire pipeline

650

ErrorRecord<String> errorRecord = new ErrorRecord<>(

651

input.getValue().toString(),

652

"Failed to parse record: " + e.getMessage()

653

);

654

emitter.emitError(new InvalidEntry<>(400, errorRecord.getErrorMessage(), errorRecord));

655

}

656

}

657

```