or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actions-conditions.md · batch-processing.md · core-pipeline.md · data-connectors.md · index.md · join-operations.md · lineage-metadata.md · sql-engine.md · validation.md

docs/join-operations.md

0

# Join Operations

1

2

Advanced join operations with automatic join optimization, comprehensive join definitions, and error handling for combining data from multiple inputs in CDAP ETL pipelines.

3

4

## Core Join Interfaces

5

6

### AutoJoiner

7

8

Interface for automatic join operations with intelligent optimization.

9

10

```java { .api }
package io.cdap.cdap.etl.api.join;

public interface AutoJoiner {
  /**
   * Define the join operation with context information.
   */
  JoinDefinition define(AutoJoinerContext context);
}
```

20

21

### AutoJoinerContext

22

23

Context interface providing input schemas and validation capabilities.

24

25

```java { .api }
package io.cdap.cdap.etl.api.join;

public interface AutoJoinerContext {
  /**
   * Get input schemas for all stages.
   */
  Map<String, Schema> getInputSchemas();

  /**
   * Get failure collector for validation.
   */
  FailureCollector getFailureCollector();
}
```

40

41

**AutoJoiner Implementation Example:**

```java
@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("SmartJoiner")
@Description("Automatically optimized join operation")
public class SmartJoiner extends BatchAutoJoiner {

  private final Config config;

  @Override
  public JoinDefinition define(AutoJoinerContext context) {
    Map<String, Schema> inputSchemas = context.getInputSchemas();
    FailureCollector collector = context.getFailureCollector();

    // Validate configuration
    config.validate(collector, inputSchemas);

    // Build join definition
    JoinDefinition.Builder builder = JoinDefinition.builder();

    // Configure join stages
    List<JoinStage> stages = new ArrayList<>();
    for (StageInfo stageInfo : config.stages) {
      Schema inputSchema = inputSchemas.get(stageInfo.stageName);
      if (inputSchema == null) {
        collector.addFailure("Unknown stage: " + stageInfo.stageName, null);
        continue;
      }

      // Determine if stage should be broadcast based on estimated size
      boolean shouldBroadcast = shouldBroadcastStage(stageInfo, inputSchema);

      JoinStage stage = new JoinStage(
        stageInfo.stageName,
        stageInfo.joinType,
        stageInfo.getSelectedFields(inputSchema),
        stageInfo.required,
        shouldBroadcast
      );
      stages.add(stage);
    }

    builder.from(stages);

    // Configure join keys
    List<JoinKey> joinKeys = new ArrayList<>();
    for (String stageName : inputSchemas.keySet()) {
      List<String> keyFields = config.getJoinKeysForStage(stageName);
      if (!keyFields.isEmpty()) {
        joinKeys.add(new JoinKey(stageName, keyFields));
      }
    }
    builder.on(joinKeys);

    // Configure output fields
    List<JoinField> outputFields = buildOutputFields(inputSchemas, collector);
    builder.select(outputFields);

    // Set distribution strategy if specified
    if (config.distributionSize != null) {
      builder.setDistribution(new JoinDistribution(config.distributionSize));
    }

    // Add join condition if specified
    if (config.condition != null && !config.condition.isEmpty()) {
      builder.setCondition(new JoinCondition(config.condition));
    }

    return builder.build();
  }

  private boolean shouldBroadcastStage(StageInfo stageInfo, Schema schema) {
    // Simple heuristic: broadcast if estimated size is small
    int estimatedRecords = stageInfo.estimatedRecords;
    int fieldCount = schema.getFields().size();

    // Estimate size (rough calculation)
    long estimatedSizeBytes = (long) estimatedRecords * fieldCount * 50; // 50 bytes avg per field

    // Broadcast if less than 100MB
    return estimatedSizeBytes < 100 * 1024 * 1024;
  }
}
```

125

126

## Join Configuration

127

128

### JoinDefinition

129

130

Comprehensive definition of join operation with all configuration options.

131

132

```java { .api }

133

package io.cdap.cdap.etl.api.join;

134

135

public class JoinDefinition {

136

/**

137

* Create builder for join definition.

138

*/

139

public static Builder builder() {}

140

141

/**

142

* Get join stages.

143

*/

144

public List<JoinStage> getStages() {}

145

146

/**

147

* Get join keys.

148

*/

149

public List<JoinKey> getKeys() {}

150

151

/**

152

* Get output schema.

153

*/

154

public Schema getOutputSchema() {}

155

156

/**

157

* Get join condition.

158

*/

159

public JoinCondition getCondition() {}

160

161

/**

162

* Get distribution strategy.

163

*/

164

public JoinDistribution getDistribution() {}

165

}

166

```

167

168

**JoinDefinition Builder Usage:**

```java
JoinDefinition joinDef = JoinDefinition.builder()
  .select(Arrays.asList(
    new JoinField("customers", "id", "customer_id"),
    new JoinField("customers", "name", "customer_name"),
    new JoinField("customers", "email", "customer_email"),
    new JoinField("orders", "id", "order_id"),
    new JoinField("orders", "amount", "order_amount"),
    new JoinField("orders", "date", "order_date")
  ))
  .from(Arrays.asList(
    new JoinStage("customers", JoinType.INNER, Collections.emptyList(), true, false),
    new JoinStage("orders", JoinType.LEFT_OUTER, Collections.emptyList(), false, true)
  ))
  .on(Arrays.asList(
    new JoinKey("customers", Arrays.asList("id")),
    new JoinKey("orders", Arrays.asList("customer_id"))
  ))
  .setCondition(new JoinCondition("customers.status = 'active' AND orders.amount > 0"))
  .setDistribution(new JoinDistribution(4))
  .build();
```

193

194

### JoinStage

195

196

Definition of a stage participating in the join operation.

197

198

```java { .api }

199

package io.cdap.cdap.etl.api.join;

200

201

public class JoinStage {

202

/**

203

* Create join stage with configuration.

204

*/

205

public JoinStage(String stageName, JoinType joinType, List<JoinField> fields,

206

boolean required, boolean broadcast) {}

207

208

/**

209

* Get stage name.

210

*/

211

public String getStageName() {}

212

213

/**

214

* Get join type for this stage.

215

*/

216

public JoinType getJoinType() {}

217

218

/**

219

* Get selected fields from this stage.

220

*/

221

public List<JoinField> getFields() {}

222

223

/**

224

* Check if stage is required for join.

225

*/

226

public boolean isRequired() {}

227

228

/**

229

* Check if stage should be broadcast.

230

*/

231

public boolean isBroadcast() {}

232

}

233

```

234

235

### JoinField

236

237

Field definition in join operation with aliasing support.

238

239

```java { .api }

240

package io.cdap.cdap.etl.api.join;

241

242

public class JoinField {

243

/**

244

* Create join field with stage name, field name, and alias.

245

*/

246

public JoinField(String stageName, String fieldName, String alias) {}

247

248

/**

249

* Get source stage name.

250

*/

251

public String getStageName() {}

252

253

/**

254

* Get source field name.

255

*/

256

public String getFieldName() {}

257

258

/**

259

* Get output field alias.

260

*/

261

public String getAlias() {}

262

}

263

```

264

265

### JoinKey

266

267

Key definition for join operations supporting composite keys.

268

269

```java { .api }

270

package io.cdap.cdap.etl.api.join;

271

272

public class JoinKey {

273

/**

274

* Create join key for stage with field list.

275

*/

276

public JoinKey(String stageName, List<String> fields) {}

277

278

/**

279

* Get stage name.

280

*/

281

public String getStageName() {}

282

283

/**

284

* Get join key fields.

285

*/

286

public List<String> getFields() {}

287

}

288

```

289

290

### JoinCondition

291

292

Advanced join condition with expression support.

293

294

```java { .api }

295

package io.cdap.cdap.etl.api.join;

296

297

public class JoinCondition {

298

/**

299

* Create join condition with expression.

300

*/

301

public JoinCondition(String expression) {}

302

303

/**

304

* Get condition expression.

305

*/

306

public String getExpression() {}

307

}

308

```

309

310

### JoinDistribution

311

312

Distribution strategy for join optimization.

313

314

```java { .api }

315

package io.cdap.cdap.etl.api.join;

316

317

public class JoinDistribution {

318

/**

319

* Create distribution with partition count.

320

*/

321

public JoinDistribution(int partitions) {}

322

323

/**

324

* Get number of partitions.

325

*/

326

public int getPartitions() {}

327

}

328

```

329

330

## Join Types

331

332

Join operations support various join types:

333

334

```java
public enum JoinType {
  INNER,       // Inner join - only matching records
  LEFT_OUTER,  // Left outer join - all records from left side
  RIGHT_OUTER, // Right outer join - all records from right side
  FULL_OUTER   // Full outer join - all records from both sides
}
```

342

343

## Complex Join Examples

344

345

### Multi-Table Customer Analytics Join

346

347

```java

348

@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)

349

@Name("CustomerAnalyticsJoiner")

350

public class CustomerAnalyticsJoiner extends BatchAutoJoiner {

351

352

@Override

353

public JoinDefinition define(AutoJoinerContext context) {

354

return JoinDefinition.builder()

355

// Select comprehensive customer view

356

.select(Arrays.asList(

357

// Customer information

358

new JoinField("customers", "customer_id", "customer_id"),

359

new JoinField("customers", "name", "customer_name"),

360

new JoinField("customers", "email", "customer_email"),

361

new JoinField("customers", "registration_date", "customer_since"),

362

363

// Order summary

364

new JoinField("orders", "total_orders", "total_orders"),

365

new JoinField("orders", "total_amount", "lifetime_value"),

366

new JoinField("orders", "last_order_date", "last_order_date"),

367

368

// Product preferences

369

new JoinField("preferences", "favorite_category", "favorite_category"),

370

new JoinField("preferences", "avg_rating", "avg_rating"),

371

372

// Support interactions

373

new JoinField("support", "ticket_count", "support_tickets"),

374

new JoinField("support", "satisfaction_score", "satisfaction_score")

375

))

376

377

// Define join stages with optimization hints

378

.from(Arrays.asList(

379

// Customers as the main table (required)

380

new JoinStage("customers", JoinType.INNER,

381

Collections.emptyList(), true, false),

382

383

// Orders aggregated (left join for customers without orders)

384

new JoinStage("orders", JoinType.LEFT_OUTER,

385

Collections.emptyList(), false, false),

386

387

// Product preferences (small lookup table - broadcast)

388

new JoinStage("preferences", JoinType.LEFT_OUTER,

389

Collections.emptyList(), false, true),

390

391

// Support data (left join - not all customers have tickets)

392

new JoinStage("support", JoinType.LEFT_OUTER,

393

Collections.emptyList(), false, false)

394

))

395

396

// Define join keys

397

.on(Arrays.asList(

398

new JoinKey("customers", Arrays.asList("customer_id")),

399

new JoinKey("orders", Arrays.asList("customer_id")),

400

new JoinKey("preferences", Arrays.asList("customer_id")),

401

new JoinKey("support", Arrays.asList("customer_id"))

402

))

403

404

// Add business logic conditions

405

.setCondition(new JoinCondition(

406

"customers.status = 'active' AND " +

407

"(orders.total_amount IS NULL OR orders.total_amount >= 0)"

408

))

409

410

// Optimize distribution

411

.setDistribution(new JoinDistribution(8))

412

.build();

413

}

414

}

415

```

416

417

### Time-Series Sensor Data Join

418

419

```java

420

@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)

421

@Name("TimeSeriesJoiner")

422

public class TimeSeriesJoiner extends BatchAutoJoiner {

423

424

private final Config config;

425

426

@Override

427

public JoinDefinition define(AutoJoinerContext context) {

428

// Build time-based join for sensor data

429

return JoinDefinition.builder()

430

.select(Arrays.asList(

431

// Time dimension

432

new JoinField("timestamps", "timestamp", "event_time"),

433

new JoinField("timestamps", "hour", "hour"),

434

new JoinField("timestamps", "day", "day"),

435

436

// Sensor measurements

437

new JoinField("temperature", "value", "temperature"),

438

new JoinField("humidity", "value", "humidity"),

439

new JoinField("pressure", "value", "pressure"),

440

441

// Calculated fields

442

new JoinField("weather", "condition", "weather_condition"),

443

new JoinField("weather", "alert", "weather_alert")

444

))

445

446

.from(Arrays.asList(

447

// Time dimension table (main driver)

448

new JoinStage("timestamps", JoinType.INNER,

449

Collections.emptyList(), true, false),

450

451

// Sensor data (may have gaps)

452

new JoinStage("temperature", JoinType.LEFT_OUTER,

453

Collections.emptyList(), false, false),

454

new JoinStage("humidity", JoinType.LEFT_OUTER,

455

Collections.emptyList(), false, false),

456

new JoinStage("pressure", JoinType.LEFT_OUTER,

457

Collections.emptyList(), false, false),

458

459

// Weather data (external enrichment - broadcast)

460

new JoinStage("weather", JoinType.LEFT_OUTER,

461

Collections.emptyList(), false, true)

462

))

463

464

// Time-based join keys with tolerance

465

.on(Arrays.asList(

466

new JoinKey("timestamps", Arrays.asList("timestamp")),

467

new JoinKey("temperature", Arrays.asList("timestamp")),

468

new JoinKey("humidity", Arrays.asList("timestamp")),

469

new JoinKey("pressure", Arrays.asList("timestamp")),

470

new JoinKey("weather", Arrays.asList("timestamp"))

471

))

472

473

// Filter for valid time range

474

.setCondition(new JoinCondition(

475

"timestamps.timestamp >= '" + config.startTime + "' AND " +

476

"timestamps.timestamp <= '" + config.endTime + "'"

477

))

478

479

.setDistribution(new JoinDistribution(config.partitions))

480

.build();

481

}

482

}

483

```

484

485

## Join Error Handling

486

487

The join API provides comprehensive error handling classes in the `io.cdap.cdap.etl.api.join.error` package:

488

489

### JoinError

490

491

Base class for join operation errors.

492

493

```java { .api }

494

package io.cdap.cdap.etl.api.join.error;

495

496

public class JoinError {

497

// Base error class for join operations

498

}

499

```

500

501

### Specific Join Errors

502

503

#### BroadcastError

504

505

Error related to broadcast join operations.

506

507

```java { .api }

508

package io.cdap.cdap.etl.api.join.error;

509

510

public class BroadcastError extends JoinError {

511

// Errors in broadcast join configuration or execution

512

}

513

```

514

515

#### DistributionSizeError

516

517

Error related to distribution size configuration.

518

519

```java { .api }

520

package io.cdap.cdap.etl.api.join.error;

521

522

public class DistributionSizeError extends JoinError {

523

// Errors in distribution size specification

524

}

525

```

526

527

#### DistributionStageError

528

529

Error related to distribution stage configuration.

530

531

```java { .api }

532

package io.cdap.cdap.etl.api.join.error;

533

534

public class DistributionStageError extends JoinError {

535

// Errors in stage distribution configuration

536

}

537

```

538

539

#### ExpressionConditionError

540

541

Error in join expression conditions.

542

543

```java { .api }

544

package io.cdap.cdap.etl.api.join.error;

545

546

public class ExpressionConditionError extends JoinError {

547

// Errors in join condition expressions

548

}

549

```

550

551

#### JoinKeyError

552

553

Error related to join keys.

554

555

```java { .api }

556

package io.cdap.cdap.etl.api.join.error;

557

558

public class JoinKeyError extends JoinError {

559

// Errors in join key specification

560

}

561

```

562

563

#### JoinKeyFieldError

564

565

Error in join key field specification.

566

567

```java { .api }

568

package io.cdap.cdap.etl.api.join.error;

569

570

public class JoinKeyFieldError extends JoinError {

571

// Errors in join key field names or types

572

}

573

```

574

575

#### OutputSchemaError

576

577

Error in output schema definition.

578

579

```java { .api }

580

package io.cdap.cdap.etl.api.join.error;

581

582

public class OutputSchemaError extends JoinError {

583

// Errors in output schema specification

584

}

585

```

586

587

#### SelectedFieldError

588

589

Error in selected field specification.

590

591

```java { .api }

592

package io.cdap.cdap.etl.api.join.error;

593

594

public class SelectedFieldError extends JoinError {

595

// Errors in field selection for join output

596

}

597

```

598

599

### InvalidJoinException

600

601

Exception for invalid join operations.

602

603

```java { .api }

604

package io.cdap.cdap.etl.api.join;

605

606

public class InvalidJoinException extends Exception {

607

/**

608

* Exception thrown for invalid join configurations.

609

*/

610

public InvalidJoinException(String message) {}

611

public InvalidJoinException(String message, Throwable cause) {}

612

}

613

```

614

615

## Join Validation and Error Handling

616

617

### Comprehensive Join Validation

618

619

```java

620

public class JoinValidator {

621

622

public static void validateJoinDefinition(JoinDefinition joinDef,

623

Map<String, Schema> inputSchemas,

624

FailureCollector collector) {

625

// Validate stages

626

validateJoinStages(joinDef.getStages(), inputSchemas, collector);

627

628

// Validate join keys

629

validateJoinKeys(joinDef.getKeys(), inputSchemas, collector);

630

631

// Validate selected fields

632

validateSelectedFields(joinDef.getStages(), inputSchemas, collector);

633

634

// Validate join condition

635

if (joinDef.getCondition() != null) {

636

validateJoinCondition(joinDef.getCondition(), inputSchemas, collector);

637

}

638

639

// Validate distribution strategy

640

if (joinDef.getDistribution() != null) {

641

validateDistribution(joinDef.getDistribution(), collector);

642

}

643

}

644

645

private static void validateJoinStages(List<JoinStage> stages,

646

Map<String, Schema> inputSchemas,

647

FailureCollector collector) {

648

Set<String> stageNames = new HashSet<>();

649

boolean hasRequiredStage = false;

650

651

for (JoinStage stage : stages) {

652

String stageName = stage.getStageName();

653

654

// Check for duplicate stage names

655

if (stageNames.contains(stageName)) {

656

collector.addFailure("Duplicate stage name: " + stageName,

657

"Use unique stage names in join");

658

}

659

stageNames.add(stageName);

660

661

// Check if stage exists in input schemas

662

if (!inputSchemas.containsKey(stageName)) {

663

collector.addFailure("Unknown stage: " + stageName,

664

"Verify stage name exists in pipeline");

665

}

666

667

// Check if at least one stage is required

668

if (stage.isRequired()) {

669

hasRequiredStage = true;

670

}

671

672

// Validate broadcast hint

673

if (stage.isBroadcast() && stage.getJoinType() == JoinType.FULL_OUTER) {

674

collector.addFailure("Cannot broadcast stage with FULL_OUTER join: " + stageName,

675

"Use different join type or disable broadcast");

676

}

677

}

678

679

if (!hasRequiredStage) {

680

collector.addFailure("At least one stage must be required",

681

"Set required=true for main stage");

682

}

683

}

684

685

private static void validateJoinKeys(List<JoinKey> joinKeys,

686

Map<String, Schema> inputSchemas,

687

FailureCollector collector) {

688

if (joinKeys.isEmpty()) {

689

collector.addFailure("Join keys are required", "Specify join keys for stages");

690

return;

691

}

692

693

// Group keys by stage

694

Map<String, JoinKey> keysByStage = new HashMap<>();

695

for (JoinKey key : joinKeys) {

696

keysByStage.put(key.getStageName(), key);

697

}

698

699

// Validate each key

700

Set<List<Schema.Type>> keyTypes = new HashSet<>();

701

for (JoinKey key : joinKeys) {

702

String stageName = key.getStageName();

703

Schema schema = inputSchemas.get(stageName);

704

705

if (schema == null) {

706

collector.addFailure("Unknown stage in join key: " + stageName, null);

707

continue;

708

}

709

710

List<Schema.Type> stageKeyTypes = new ArrayList<>();

711

for (String fieldName : key.getFields()) {

712

Schema.Field field = schema.getField(fieldName);

713

if (field == null) {

714

collector.addFailure("Unknown field in join key: " + stageName + "." + fieldName,

715

"Verify field exists in stage schema");

716

} else {

717

stageKeyTypes.add(field.getSchema().isNullable() ?

718

field.getSchema().getNonNullable().getType() :

719

field.getSchema().getType());

720

}

721

}

722

keyTypes.add(stageKeyTypes);

723

}

724

725

// Validate key type compatibility

726

if (keyTypes.size() > 1) {

727

collector.addFailure("Join key types are not compatible across stages",

728

"Ensure all join keys have the same types");

729

}

730

}

731

}

732

```

733

734

## Performance Optimization

735

736

### Broadcast Join Optimization

737

738

```java

739

private boolean shouldBroadcastStage(String stageName, Schema schema,

740

Map<String, Object> stageProperties) {

741

// Check explicit broadcast hint

742

Object broadcastHint = stageProperties.get("broadcast");

743

if (Boolean.TRUE.equals(broadcastHint)) {

744

return true;

745

}

746

747

// Estimate stage size

748

Object recordCountHint = stageProperties.get("estimatedRecords");

749

if (recordCountHint instanceof Number) {

750

long estimatedRecords = ((Number) recordCountHint).longValue();

751

int fieldCount = schema.getFields().size();

752

753

// Rough size estimation (bytes)

754

long estimatedSize = estimatedRecords * fieldCount * 50; // 50 bytes avg per field

755

756

// Broadcast if less than 200MB

757

return estimatedSize < 200 * 1024 * 1024;

758

}

759

760

// Default: don't broadcast

761

return false;

762

}

763

```

764

765

### Partitioning Strategy

766

767

```java

768

private int calculateOptimalPartitions(Map<String, Schema> inputSchemas,

769

JoinDefinition joinDef) {

770

// Calculate total estimated input size

771

long totalEstimatedSize = 0;

772

for (JoinStage stage : joinDef.getStages()) {

773

if (!stage.isBroadcast()) {

774

// Estimate non-broadcast stage sizes

775

totalEstimatedSize += estimateStageSize(stage.getStageName(), inputSchemas);

776

}

777

}

778

779

// Target ~128MB per partition

780

long targetPartitionSize = 128 * 1024 * 1024;

781

int calculatedPartitions = (int) Math.max(1, totalEstimatedSize / targetPartitionSize);

782

783

// Cap at reasonable limits

784

return Math.min(Math.max(calculatedPartitions, 1), 1000);

785

}

786

```