or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actions-conditions.md, batch-processing.md, core-pipeline.md, data-connectors.md, index.md, join-operations.md, lineage-metadata.md, sql-engine.md, validation.md

docs/core-pipeline.md

0

# Core Pipeline Components

1

2

Core interfaces and classes for building transformation stages, emitting data between stages, and managing the stage lifecycle in CDAP ETL pipelines.

3

4

## Transform Operations

5

6

### Transform<IN, OUT>

7

8

Base abstract class for transformation stages in ETL pipelines.

9

10

```java { .api }

11

package io.cdap.cdap.etl.api;

12

13

public abstract class Transform<IN, OUT>

14

implements StageLifecycle<TransformContext>,

15

SubmitterLifecycle<StageSubmitterContext>,

16

Transformation<IN, OUT>, PipelineConfigurable {

17

18

public static final String PLUGIN_TYPE = "transform";

19

20

// Lifecycle methods

21

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}

22

public void initialize(TransformContext context) throws Exception {}

23

public void prepareRun(StageSubmitterContext context) throws Exception {}

24

public void onRunFinish(boolean succeeded, StageSubmitterContext context) {}

25

public void destroy() {}

26

27

// Access to runtime context

28

protected TransformContext getContext();

29

}

30

```

31

32

**Usage Example:**

33

```java

34

@Plugin(type = Transform.PLUGIN_TYPE)

35

@Name("TextCleaner")

36

@Description("Cleans and normalizes text fields")

37

public class TextCleanerTransform extends Transform<StructuredRecord, StructuredRecord> {

38

39

private final Config config;

40

private Schema outputSchema;

41

42

@Override

43

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {

44

StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();

45

Schema inputSchema = stageConfigurer.getInputSchema();

46

47

if (inputSchema != null) {

48

outputSchema = buildOutputSchema(inputSchema);

49

stageConfigurer.setOutputSchema(outputSchema);

50

}

51

52

config.validate(stageConfigurer.getFailureCollector());

53

}

54

55

@Override

56

public void initialize(TransformContext context) throws Exception {

57

outputSchema = context.getOutputSchema();

58

}

59

60

@Override

61

public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)

62

throws Exception {

63

StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);

64

65

for (Schema.Field field : input.getSchema().getFields()) {

66

Object value = input.get(field.getName());

67

if (field.getSchema().getType() == Schema.Type.STRING && value != null) {

68

// Clean text: trim, normalize whitespace, remove special chars

69

String cleanedValue = value.toString().trim().replaceAll("\\s+", " ");

70

builder.set(field.getName(), cleanedValue);

71

} else {

72

builder.set(field.getName(), value);

73

}

74

}

75

76

emitter.emit(builder.build());

77

}

78

}

79

```

80

81

### Transformation<IN, OUT>

82

83

Core interface for data transformation operations.

84

85

```java { .api }

86

package io.cdap.cdap.etl.api;

87

88

public interface Transformation<IN, OUT> {

89

/**

90

* Transform input record and emit zero or more output records.

91

*

92

* @param input input record to transform

93

* @param emitter emitter for output records

94

* @throws Exception if transformation fails

95

*/

96

void transform(IN input, Emitter<OUT> emitter) throws Exception;

97

}

98

```

99

100

### Multi-Output Transforms

101

102

#### SplitterTransform<IN, OUT>

103

104

Transform that splits its input across multiple named output ports.

105

106

```java { .api }

107

package io.cdap.cdap.etl.api;

108

109

public abstract class SplitterTransform<IN, OUT>

110

implements MultiOutputPipelineConfigurable,

111

StageLifecycle<TransformContext>,

112

SubmitterLifecycle<StageSubmitterContext>,

113

MultiOutputTransformation<IN, OUT> {

114

115

public static final String PLUGIN_TYPE = "splittertransform";

116

}

117

```

118

119

#### MultiOutputTransformation<IN, E>

120

121

Interface for transformations with multiple outputs.

122

123

```java { .api }

124

package io.cdap.cdap.etl.api;

125

126

public interface MultiOutputTransformation<IN, E> {

127

/**

128

* Transform input and emit to multiple named outputs.

129

*/

130

void transform(IN input, MultiOutputEmitter<E> emitter) throws Exception;

131

}

132

```

133

134

**Usage Example:**

135

```java

136

public class DataSplitter extends SplitterTransform<StructuredRecord, StructuredRecord> {

137

138

@Override

139

public void transform(StructuredRecord input, MultiOutputEmitter<StructuredRecord> emitter) {

140

String category = input.get("category");

141

142

if ("valid".equals(category)) {

143

emitter.emit("valid-data", input);

144

} else if ("error".equals(category)) {

145

emitter.emit("error-data", input);

146

} else {

147

emitter.emit("unknown-data", input);

148

}

149

}

150

}

151

```

152

153

### Error Transforms

154

155

#### ErrorTransform<ERR_IN, OUT>

156

157

Transform for handling error records emitted by other stages.

158

159

```java { .api }

160

package io.cdap.cdap.etl.api;

161

162

public abstract class ErrorTransform<ERR_IN, OUT>

163

implements StageLifecycle<TransformContext>,

164

SubmitterLifecycle<StageSubmitterContext>,

165

PipelineConfigurable {

166

167

public static final String PLUGIN_TYPE = "errortransform";

168

169

public void initialize(TransformContext context) throws Exception {}

170

171

/**

172

* Transform error record.

173

*/

174

public abstract void transform(ErrorRecord<ERR_IN> input, Emitter<OUT> emitter)

175

throws Exception;

176

}

177

```

178

179

## Data Emission

180

181

### Emitter<T>

182

183

Primary interface for emitting records to the next stage; it also inherits alert and error emission.

184

185

```java { .api }

186

package io.cdap.cdap.etl.api;

187

188

public interface Emitter<T> extends AlertEmitter, ErrorEmitter<T> {

189

/**

190

* Emit a record to the next stage.

191

*/

192

void emit(T value);

193

}

194

```

195

196

### ErrorEmitter<T>

197

198

Interface for emitting error records.

199

200

```java { .api }

201

package io.cdap.cdap.etl.api;

202

203

public interface ErrorEmitter<T> {

204

/**

205

* Emit an error record.

206

*/

207

void emitError(InvalidEntry<T> invalidEntry);

208

}

209

```

210

211

### AlertEmitter

212

213

Interface for emitting alerts.

214

215

```java { .api }

216

package io.cdap.cdap.etl.api;

217

218

public interface AlertEmitter {

219

/**

220

* Emit an alert with payload.

221

*/

222

void emitAlert(Map<String, String> payload);

223

}

224

```

225

226

### MultiOutputEmitter<E>

227

228

Emitter for multi-output transformations.

229

230

```java { .api }

231

package io.cdap.cdap.etl.api;

232

233

public interface MultiOutputEmitter<E> extends AlertEmitter, ErrorEmitter<Object> {

234

/**

235

* Emit to a specific output port.

236

*/

237

void emit(String port, E value);

238

}

239

```

240

241

## Aggregation Operations

242

243

### Aggregator<GROUP_KEY, GROUP_VALUE, OUT>

244

245

Interface for aggregation operations.

246

247

```java { .api }

248

package io.cdap.cdap.etl.api;

249

250

public interface Aggregator<GROUP_KEY, GROUP_VALUE, OUT> {

251

/**

252

* Emit group key for the group value.

253

*/

254

void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter) throws Exception;

255

256

/**

257

* Aggregate all values for a group key into output records.

258

*/

259

void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,

260

Emitter<OUT> emitter) throws Exception;

261

}

262

```

263

264

**Usage Example:**

265

```java

266

public class SumAggregator implements Aggregator<String, StructuredRecord, StructuredRecord> {

267

268

@Override

269

public void groupBy(StructuredRecord groupValue, Emitter<String> emitter) throws Exception {

270

String groupKey = groupValue.get("department");

271

emitter.emit(groupKey);

272

}

273

274

@Override

275

public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,

276

Emitter<StructuredRecord> emitter) throws Exception {

277

double sum = 0.0;

278

int count = 0;

279

280

while (groupValues.hasNext()) {

281

StructuredRecord record = groupValues.next();

282

Double salary = record.get("salary");

283

if (salary != null) {

284

sum += salary;

285

count++;

286

}

287

}

288

289

StructuredRecord result = StructuredRecord.builder(outputSchema)

290

.set("department", groupKey)

291

.set("total_salary", sum)

292

.set("employee_count", count)

293

.set("average_salary", count > 0 ? sum / count : 0.0)

294

.build();

295

296

emitter.emit(result);

297

}

298

}

299

```

300

301

### ReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>

302

303

Aggregator that first computes intermediate aggregate values for each group and then reduces them into the final output.

304

305

```java { .api }

306

package io.cdap.cdap.etl.api;

307

308

public interface ReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT> {

309

/**

310

* Emit group key for the group value.

311

*/

312

void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter) throws Exception;

313

314

/**

315

* Aggregate group values into intermediate aggregate values.

316

*/

317

void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,

318

Emitter<AGGREGATE_VALUE> emitter) throws Exception;

319

320

/**

321

* Reduce intermediate aggregate values into final output.

322

*/

323

void reduce(GROUP_KEY groupKey, Iterator<AGGREGATE_VALUE> aggregateValues,

324

Emitter<OUT> emitter) throws Exception;

325

}

326

```

327

328

## Join Operations

329

330

### Joiner<JOIN_KEY, INPUT_RECORD, OUT>

331

332

Interface for join operations between multiple inputs.

333

334

```java { .api }

335

package io.cdap.cdap.etl.api;

336

337

public interface Joiner<JOIN_KEY, INPUT_RECORD, OUT> {

338

/**

339

* Get join keys for input record (deprecated - use getJoinKeys).

340

*/

341

@Deprecated

342

default JOIN_KEY joinOn(String stageName, INPUT_RECORD inputRecord) throws Exception {

343

throw new UnsupportedOperationException("joinOn method is deprecated");

344

}

345

346

/**

347

* Get collection of join keys for input record.

348

*/

349

default Collection<JOIN_KEY> getJoinKeys(String stageName, INPUT_RECORD inputRecord)

350

throws Exception {

351

JOIN_KEY key = joinOn(stageName, inputRecord);

352

return key == null ? Collections.emptySet() : Collections.singleton(key);

353

}

354

355

/**

356

* Get join configuration.

357

*/

358

JoinConfig getJoinConfig() throws Exception;

359

360

/**

361

* Merge records from join result.

362

*/

363

OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult)

364

throws Exception;

365

}

366

```

367

368

### JoinConfig

369

370

Configuration for join operations.

371

372

```java { .api }

373

package io.cdap.cdap.etl.api;

374

375

public class JoinConfig {

376

/**

377

* Create join config with required input stages.

378

*/

379

public JoinConfig(Iterable<String> requiredInputs) {}

380

381

/**

382

* Get required input stages for the join.

383

*/

384

public Iterable<String> getRequiredInputs() {}

385

}

386

```

387

388

### JoinElement<INPUT_RECORD>

389

390

Element of a join result, pairing an input record with the name of the stage it came from.

391

392

```java { .api }

393

package io.cdap.cdap.etl.api;

394

395

public class JoinElement<INPUT_RECORD> {

396

public JoinElement(String stageName, INPUT_RECORD inputRecord) {}

397

398

public String getStageName() {}

399

public INPUT_RECORD getInputRecord() {}

400

}

401

```

402

403

## Lifecycle Management

404

405

### StageLifecycle<T>

406

407

Interface for stage initialization and cleanup.

408

409

```java { .api }

410

package io.cdap.cdap.etl.api;

411

412

public interface StageLifecycle<T> extends Destroyable {

413

/**

414

* Initialize stage with runtime context.

415

*/

416

void initialize(T context) throws Exception;

417

}

418

```

419

420

### SubmitterLifecycle<T>

421

422

Interface for hooks invoked when a pipeline run is prepared and when it finishes.

423

424

```java { .api }

425

package io.cdap.cdap.etl.api;

/**
 * Hooks around a pipeline run: one before it starts and one after it ends.
 *
 * @param <T> type of the context passed to both hooks
 */
public interface SubmitterLifecycle<T> {

  /**
   * Prepares for a pipeline run.
   *
   * @param context submission context
   * @throws Exception if preparation fails
   */
  void prepareRun(T context) throws Exception;

  /**
   * Handles completion of a run.
   *
   * @param succeeded whether the run succeeded
   * @param context submission context
   */
  void onRunFinish(boolean succeeded, T context);
}

438

```

439

440

### Destroyable

441

442

Interface for resource cleanup.

443

444

```java { .api }

445

package io.cdap.cdap.etl.api;

/** Something that holds resources to release when it is no longer needed. */
public interface Destroyable {

  /** Releases the resources held by this object. */
  void destroy();
}

453

```

454

455

## Context Interfaces

456

457

### StageContext

458

459

Base runtime context for pipeline stages.

460

461

```java { .api }

462

package io.cdap.cdap.etl.api;

463

464

public interface StageContext extends RuntimeContext, PluginContext,

465

ServiceDiscoverer, FeatureFlagsProvider,

466

ConnectionConfigurable {

467

// Provides access to:

468

// - Runtime arguments and metrics

469

// - Plugin instantiation

470

// - Service discovery

471

// - Feature flags

472

// - Connection configuration

473

}

474

```

475

476

### TransformContext

477

478

Context for transform stages with lookup capabilities.

479

480

```java { .api }

481

package io.cdap.cdap.etl.api;

482

483

public interface TransformContext extends StageContext, LookupProvider {

484

// Inherits StageContext capabilities

485

// Adds lookup provider for data lookups

486

}

487

```

488

489

### StageSubmitterContext

490

491

Context passed to a stage when a pipeline run is prepared and when it finishes.

492

493

```java { .api }

494

package io.cdap.cdap.etl.api;

495

496

public interface StageSubmitterContext {

497

/**

498

* Get runtime arguments.

499

*/

500

Arguments getArguments();

501

502

/**

503

* Get stage metrics.

504

*/

505

StageMetrics getMetrics();

506

}

507

```

508

509

## Configuration

510

511

### PipelineConfigurable

512

513

Interface for pipeline configuration.

514

515

```java { .api }

516

package io.cdap.cdap.etl.api;

517

518

public interface PipelineConfigurable {

519

/**

520

* Configure the pipeline stage.

521

*/

522

void configurePipeline(PipelineConfigurer pipelineConfigurer);

523

}

524

```

525

526

### MultiInputPipelineConfigurable

527

528

Configuration interface for stages with multiple inputs.

529

530

```java { .api }

531

package io.cdap.cdap.etl.api;

532

533

public interface MultiInputPipelineConfigurable {

534

/**

535

* Configure multi-input pipeline stage.

536

*/

537

void configurePipeline(MultiInputPipelineConfigurer multiInputPipelineConfigurer);

538

}

539

```

540

541

### MultiOutputPipelineConfigurable

542

543

Configuration interface for stages with multiple outputs.

544

545

```java { .api }

546

package io.cdap.cdap.etl.api;

547

548

public interface MultiOutputPipelineConfigurable {

549

/**

550

* Configure multi-output pipeline stage.

551

*/

552

void configurePipeline(MultiOutputPipelineConfigurer multiOutputPipelineConfigurer);

553

}

554

```

555

556

## Error Handling

557

558

### ErrorRecord<T>

559

560

Represents an error record from pipeline execution.

561

562

```java { .api }

563

package io.cdap.cdap.etl.api;

564

565

public class ErrorRecord<T> {

566

public ErrorRecord(T record, String errorMessage, int stage) {}

567

public ErrorRecord(T record, String errorMessage) {}

568

569

public T getRecord() {}

570

public String getErrorMessage() {}

571

public int getStage() {}

572

}

573

```

574

575

### InvalidEntry<T>

576

577

Represents an invalid entry with error details.

578

579

```java { .api }

580

package io.cdap.cdap.etl.api;

581

582

public class InvalidEntry<T> {

583

public InvalidEntry(int errorCode, String errorMsg, T invalidRecord) {}

584

585

public int getErrorCode() {}

586

public String getErrorMsg() {}

587

public T getInvalidRecord() {}

588

}

589

```

590

591

## Specialized Transform Types

592

593

### SerializableTransform<IN, OUT>

594

595

A `Transformation` that is also `Serializable`.

596

597

```java { .api }

598

package io.cdap.cdap.etl.api;

599

600

public interface SerializableTransform<IN, OUT>

601

extends Transformation<IN, OUT>, Serializable {

602

// Combines transformation with Java serialization

603

}

604

```

605

606

### ToKeyValueTransform<IN, KEY, VAL>

607

608

Transforms input records into key-value pairs.

609

610

```java { .api }

611

package io.cdap.cdap.etl.api;

612

613

public interface ToKeyValueTransform<IN, KEY, VAL> {

614

/**

615

* Transform input into key-value pairs.

616

*/

617

void transform(IN input, Emitter<KeyValue<KEY, VAL>> emitter) throws Exception;

618

}

619

```

620

621

### FromKeyValueTransform<KEY, VAL, OUT>

622

623

Transforms key-value pairs into output records.

624

625

```java { .api }

626

package io.cdap.cdap.etl.api;

627

628

public interface FromKeyValueTransform<KEY, VAL, OUT> {

629

/**

630

* Transform key-value pairs into output records.

631

*/

632

void transform(KeyValue<KEY, VAL> input, Emitter<OUT> emitter) throws Exception;

633

}

634

```