# Join Operations

Advanced join operations with automatic join optimization, comprehensive join definitions, and error handling for combining data from multiple inputs in CDAP ETL pipelines.

## Core Join Interfaces

### AutoJoiner

Interface for automatic join operations with intelligent optimization.

```java { .api }
package io.cdap.cdap.etl.api.join;

public interface AutoJoiner {
  /**
   * Define the join operation with context information.
   */
  JoinDefinition define(AutoJoinerContext context);
}
```

### AutoJoinerContext

Context interface providing input schemas and validation capabilities.

```java { .api }
package io.cdap.cdap.etl.api.join;

public interface AutoJoinerContext {
  /**
   * Get input schemas for all stages.
   */
  Map<String, Schema> getInputSchemas();

  /**
   * Get failure collector for validation.
   */
  FailureCollector getFailureCollector();
}
```
**AutoJoiner Implementation Example:**
```java
@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("SmartJoiner")
@Description("Automatically optimized join operation")
public class SmartJoiner extends BatchAutoJoiner {

  private final Config config;

  @Override
  public JoinDefinition define(AutoJoinerContext context) {
    Map<String, Schema> inputSchemas = context.getInputSchemas();
    FailureCollector collector = context.getFailureCollector();

    // Validate configuration
    config.validate(collector, inputSchemas);

    // Build join definition
    JoinDefinition.Builder builder = JoinDefinition.builder();

    // Configure join stages
    List<JoinStage> stages = new ArrayList<>();
    for (StageInfo stageInfo : config.stages) {
      Schema inputSchema = inputSchemas.get(stageInfo.stageName);
      if (inputSchema == null) {
        collector.addFailure("Unknown stage: " + stageInfo.stageName, null);
        continue;
      }

      // Determine if stage should be broadcast based on estimated size
      boolean shouldBroadcast = shouldBroadcastStage(stageInfo, inputSchema);

      JoinStage stage = new JoinStage(
        stageInfo.stageName,
        stageInfo.joinType,
        stageInfo.getSelectedFields(inputSchema),
        stageInfo.required,
        shouldBroadcast
      );
      stages.add(stage);
    }

    builder.from(stages);

    // Configure join keys
    List<JoinKey> joinKeys = new ArrayList<>();
    for (String stageName : inputSchemas.keySet()) {
      List<String> keyFields = config.getJoinKeysForStage(stageName);
      if (!keyFields.isEmpty()) {
        joinKeys.add(new JoinKey(stageName, keyFields));
      }
    }
    builder.on(joinKeys);

    // Configure output fields
    List<JoinField> outputFields = buildOutputFields(inputSchemas, collector);
    builder.select(outputFields);

    // Set distribution strategy if specified
    if (config.distributionSize != null) {
      builder.setDistribution(new JoinDistribution(config.distributionSize));
    }

    // Add join condition if specified
    if (config.condition != null && !config.condition.isEmpty()) {
      builder.setCondition(new JoinCondition(config.condition));
    }

    return builder.build();
  }

  private boolean shouldBroadcastStage(StageInfo stageInfo, Schema schema) {
    // Simple heuristic: broadcast if estimated size is small
    int estimatedRecords = stageInfo.estimatedRecords;
    int fieldCount = schema.getFields().size();

    // Estimate size (rough calculation)
    long estimatedSizeBytes = (long) estimatedRecords * fieldCount * 50; // 50 bytes avg per field

    // Broadcast if less than 100MB
    return estimatedSizeBytes < 100 * 1024 * 1024;
  }
}
```
## Join Configuration

### JoinDefinition

Comprehensive definition of join operation with all configuration options.

```java { .api }
package io.cdap.cdap.etl.api.join;

public class JoinDefinition {
  /**
   * Create builder for join definition.
   */
  public static Builder builder() {}

  /**
   * Get join stages.
   */
  public List<JoinStage> getStages() {}

  /**
   * Get join keys.
   */
  public List<JoinKey> getKeys() {}

  /**
   * Get output schema.
   */
  public Schema getOutputSchema() {}

  /**
   * Get join condition.
   */
  public JoinCondition getCondition() {}

  /**
   * Get distribution strategy.
   */
  public JoinDistribution getDistribution() {}
}
```

**JoinDefinition Builder Usage:**
```java
JoinDefinition joinDef = JoinDefinition.builder()
  .select(Arrays.asList(
    new JoinField("customers", "id", "customer_id"),
    new JoinField("customers", "name", "customer_name"),
    new JoinField("customers", "email", "customer_email"),
    new JoinField("orders", "id", "order_id"),
    new JoinField("orders", "amount", "order_amount"),
    new JoinField("orders", "date", "order_date")
  ))
  .from(Arrays.asList(
    new JoinStage("customers", JoinType.INNER,
      Collections.emptyList(), true, false),
    new JoinStage("orders", JoinType.LEFT_OUTER,
      Collections.emptyList(), false, true)
  ))
  .on(Arrays.asList(
    new JoinKey("customers", Arrays.asList("id")),
    new JoinKey("orders", Arrays.asList("customer_id"))
  ))
  .setCondition(new JoinCondition("customers.status = 'active' AND orders.amount > 0"))
  .setDistribution(new JoinDistribution(4))
  .build();
```
### JoinStage

Definition of a stage participating in the join operation.

```java { .api }
package io.cdap.cdap.etl.api.join;

public class JoinStage {
  /**
   * Create join stage with configuration.
   */
  public JoinStage(String stageName, JoinType joinType, List<JoinField> fields,
                   boolean required, boolean broadcast) {}

  /**
   * Get stage name.
   */
  public String getStageName() {}

  /**
   * Get join type for this stage.
   */
  public JoinType getJoinType() {}

  /**
   * Get selected fields from this stage.
   */
  public List<JoinField> getFields() {}

  /**
   * Check if stage is required for join.
   */
  public boolean isRequired() {}

  /**
   * Check if stage should be broadcast.
   */
  public boolean isBroadcast() {}
}
```

### JoinField

Field definition in join operation with aliasing support.

```java { .api }
package io.cdap.cdap.etl.api.join;

public class JoinField {
  /**
   * Create join field with stage name, field name, and alias.
   */
  public JoinField(String stageName, String fieldName, String alias) {}

  /**
   * Get source stage name.
   */
  public String getStageName() {}

  /**
   * Get source field name.
   */
  public String getFieldName() {}

  /**
   * Get output field alias.
   */
  public String getAlias() {}
}
```

### JoinKey

Key definition for join operations supporting composite keys.

```java { .api }
package io.cdap.cdap.etl.api.join;

public class JoinKey {
  /**
   * Create join key for stage with field list.
   */
  public JoinKey(String stageName, List<String> fields) {}

  /**
   * Get stage name.
   */
  public String getStageName() {}

  /**
   * Get join key fields.
   */
  public List<String> getFields() {}
}
```

### JoinCondition

Advanced join condition with expression support.

```java { .api }
package io.cdap.cdap.etl.api.join;

public class JoinCondition {
  /**
   * Create join condition with expression.
   */
  public JoinCondition(String expression) {}

  /**
   * Get condition expression.
   */
  public String getExpression() {}
}
```

### JoinDistribution

Distribution strategy for join optimization.

```java { .api }
package io.cdap.cdap.etl.api.join;

public class JoinDistribution {
  /**
   * Create distribution with partition count.
   */
  public JoinDistribution(int partitions) {}

  /**
   * Get number of partitions.
   */
  public int getPartitions() {}
}
```
## Join Types

Join operations support various join types:

```java
public enum JoinType {
  INNER,       // Inner join - only matching records
  LEFT_OUTER,  // Left outer join - all records from left side
  RIGHT_OUTER, // Right outer join - all records from right side
  FULL_OUTER   // Full outer join - all records from both sides
}
```
## Complex Join Examples

### Multi-Table Customer Analytics Join

```java
@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("CustomerAnalyticsJoiner")
public class CustomerAnalyticsJoiner extends BatchAutoJoiner {

  @Override
  public JoinDefinition define(AutoJoinerContext context) {
    return JoinDefinition.builder()
      // Select comprehensive customer view
      .select(Arrays.asList(
        // Customer information
        new JoinField("customers", "customer_id", "customer_id"),
        new JoinField("customers", "name", "customer_name"),
        new JoinField("customers", "email", "customer_email"),
        new JoinField("customers", "registration_date", "customer_since"),

        // Order summary
        new JoinField("orders", "total_orders", "total_orders"),
        new JoinField("orders", "total_amount", "lifetime_value"),
        new JoinField("orders", "last_order_date", "last_order_date"),

        // Product preferences
        new JoinField("preferences", "favorite_category", "favorite_category"),
        new JoinField("preferences", "avg_rating", "avg_rating"),

        // Support interactions
        new JoinField("support", "ticket_count", "support_tickets"),
        new JoinField("support", "satisfaction_score", "satisfaction_score")
      ))

      // Define join stages with optimization hints
      .from(Arrays.asList(
        // Customers as the main table (required)
        new JoinStage("customers", JoinType.INNER,
          Collections.emptyList(), true, false),

        // Orders aggregated (left join for customers without orders)
        new JoinStage("orders", JoinType.LEFT_OUTER,
          Collections.emptyList(), false, false),

        // Product preferences (small lookup table - broadcast)
        new JoinStage("preferences", JoinType.LEFT_OUTER,
          Collections.emptyList(), false, true),

        // Support data (left join - not all customers have tickets)
        new JoinStage("support", JoinType.LEFT_OUTER,
          Collections.emptyList(), false, false)
      ))

      // Define join keys
      .on(Arrays.asList(
        new JoinKey("customers", Arrays.asList("customer_id")),
        new JoinKey("orders", Arrays.asList("customer_id")),
        new JoinKey("preferences", Arrays.asList("customer_id")),
        new JoinKey("support", Arrays.asList("customer_id"))
      ))

      // Add business logic conditions
      .setCondition(new JoinCondition(
        "customers.status = 'active' AND " +
        "(orders.total_amount IS NULL OR orders.total_amount >= 0)"
      ))

      // Optimize distribution
      .setDistribution(new JoinDistribution(8))
      .build();
  }
}
```
### Time-Series Data Join with Window Functions

```java
@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("TimeSeriesJoiner")
public class TimeSeriesJoiner extends BatchAutoJoiner {

  private final Config config;

  @Override
  public JoinDefinition define(AutoJoinerContext context) {
    // Build time-based join for sensor data
    return JoinDefinition.builder()
      .select(Arrays.asList(
        // Time dimension
        new JoinField("timestamps", "timestamp", "event_time"),
        new JoinField("timestamps", "hour", "hour"),
        new JoinField("timestamps", "day", "day"),

        // Sensor measurements
        new JoinField("temperature", "value", "temperature"),
        new JoinField("humidity", "value", "humidity"),
        new JoinField("pressure", "value", "pressure"),

        // Calculated fields
        new JoinField("weather", "condition", "weather_condition"),
        new JoinField("weather", "alert", "weather_alert")
      ))

      .from(Arrays.asList(
        // Time dimension table (main driver)
        new JoinStage("timestamps", JoinType.INNER,
          Collections.emptyList(), true, false),

        // Sensor data (may have gaps)
        new JoinStage("temperature", JoinType.LEFT_OUTER,
          Collections.emptyList(), false, false),
        new JoinStage("humidity", JoinType.LEFT_OUTER,
          Collections.emptyList(), false, false),
        new JoinStage("pressure", JoinType.LEFT_OUTER,
          Collections.emptyList(), false, false),

        // Weather data (external enrichment - broadcast)
        new JoinStage("weather", JoinType.LEFT_OUTER,
          Collections.emptyList(), false, true)
      ))

      // Time-based join keys with tolerance
      .on(Arrays.asList(
        new JoinKey("timestamps", Arrays.asList("timestamp")),
        new JoinKey("temperature", Arrays.asList("timestamp")),
        new JoinKey("humidity", Arrays.asList("timestamp")),
        new JoinKey("pressure", Arrays.asList("timestamp")),
        new JoinKey("weather", Arrays.asList("timestamp"))
      ))

      // Filter for valid time range
      .setCondition(new JoinCondition(
        "timestamps.timestamp >= '" + config.startTime + "' AND " +
        "timestamps.timestamp <= '" + config.endTime + "'"
      ))

      .setDistribution(new JoinDistribution(config.partitions))
      .build();
  }
}
```
## Join Error Handling

The join API provides comprehensive error handling classes in the `io.cdap.cdap.etl.api.join.error` package:

### JoinError

Base class for join operation errors.

```java { .api }
package io.cdap.cdap.etl.api.join.error;

public class JoinError {
  // Base error class for join operations
}
```

### Specific Join Errors

#### BroadcastError

Error related to broadcast join operations.

```java { .api }
package io.cdap.cdap.etl.api.join.error;

public class BroadcastError extends JoinError {
  // Errors in broadcast join configuration or execution
}
```

#### DistributionSizeError

Error related to distribution size configuration.

```java { .api }
package io.cdap.cdap.etl.api.join.error;

public class DistributionSizeError extends JoinError {
  // Errors in distribution size specification
}
```

#### DistributionStageError

Error related to distribution stage configuration.

```java { .api }
package io.cdap.cdap.etl.api.join.error;

public class DistributionStageError extends JoinError {
  // Errors in stage distribution configuration
}
```

#### ExpressionConditionError

Error in join expression conditions.

```java { .api }
package io.cdap.cdap.etl.api.join.error;

public class ExpressionConditionError extends JoinError {
  // Errors in join condition expressions
}
```

#### JoinKeyError

Error related to join keys.

```java { .api }
package io.cdap.cdap.etl.api.join.error;

public class JoinKeyError extends JoinError {
  // Errors in join key specification
}
```

#### JoinKeyFieldError

Error in join key field specification.

```java { .api }
package io.cdap.cdap.etl.api.join.error;

public class JoinKeyFieldError extends JoinError {
  // Errors in join key field names or types
}
```

#### OutputSchemaError

Error in output schema definition.

```java { .api }
package io.cdap.cdap.etl.api.join.error;

public class OutputSchemaError extends JoinError {
  // Errors in output schema specification
}
```

#### SelectedFieldError

Error in selected field specification.

```java { .api }
package io.cdap.cdap.etl.api.join.error;

public class SelectedFieldError extends JoinError {
  // Errors in field selection for join output
}
```

### InvalidJoinException

Exception for invalid join operations.

```java { .api }
package io.cdap.cdap.etl.api.join;

public class InvalidJoinException extends Exception {
  /**
   * Exception thrown for invalid join configurations.
   */
  public InvalidJoinException(String message) {}
  public InvalidJoinException(String message, Throwable cause) {}
}
```
## Join Validation and Error Handling

### Comprehensive Join Validation

```java
public class JoinValidator {

  public static void validateJoinDefinition(JoinDefinition joinDef,
                                            Map<String, Schema> inputSchemas,
                                            FailureCollector collector) {
    // Validate stages
    validateJoinStages(joinDef.getStages(), inputSchemas, collector);

    // Validate join keys
    validateJoinKeys(joinDef.getKeys(), inputSchemas, collector);

    // Validate selected fields
    validateSelectedFields(joinDef.getStages(), inputSchemas, collector);

    // Validate join condition
    if (joinDef.getCondition() != null) {
      validateJoinCondition(joinDef.getCondition(), inputSchemas, collector);
    }

    // Validate distribution strategy
    if (joinDef.getDistribution() != null) {
      validateDistribution(joinDef.getDistribution(), collector);
    }
  }

  private static void validateJoinStages(List<JoinStage> stages,
                                         Map<String, Schema> inputSchemas,
                                         FailureCollector collector) {
    Set<String> stageNames = new HashSet<>();
    boolean hasRequiredStage = false;

    for (JoinStage stage : stages) {
      String stageName = stage.getStageName();

      // Check for duplicate stage names
      if (stageNames.contains(stageName)) {
        collector.addFailure("Duplicate stage name: " + stageName,
                             "Use unique stage names in join");
      }
      stageNames.add(stageName);

      // Check if stage exists in input schemas
      if (!inputSchemas.containsKey(stageName)) {
        collector.addFailure("Unknown stage: " + stageName,
                             "Verify stage name exists in pipeline");
      }

      // Check if at least one stage is required
      if (stage.isRequired()) {
        hasRequiredStage = true;
      }

      // Validate broadcast hint
      if (stage.isBroadcast() && stage.getJoinType() == JoinType.FULL_OUTER) {
        collector.addFailure("Cannot broadcast stage with FULL_OUTER join: " + stageName,
                             "Use different join type or disable broadcast");
      }
    }

    if (!hasRequiredStage) {
      collector.addFailure("At least one stage must be required",
                           "Set required=true for main stage");
    }
  }

  private static void validateJoinKeys(List<JoinKey> joinKeys,
                                       Map<String, Schema> inputSchemas,
                                       FailureCollector collector) {
    if (joinKeys.isEmpty()) {
      collector.addFailure("Join keys are required", "Specify join keys for stages");
      return;
    }

    // Group keys by stage
    Map<String, JoinKey> keysByStage = new HashMap<>();
    for (JoinKey key : joinKeys) {
      keysByStage.put(key.getStageName(), key);
    }

    // Validate each key
    Set<List<Schema.Type>> keyTypes = new HashSet<>();
    for (JoinKey key : joinKeys) {
      String stageName = key.getStageName();
      Schema schema = inputSchemas.get(stageName);

      if (schema == null) {
        collector.addFailure("Unknown stage in join key: " + stageName, null);
        continue;
      }

      List<Schema.Type> stageKeyTypes = new ArrayList<>();
      for (String fieldName : key.getFields()) {
        Schema.Field field = schema.getField(fieldName);
        if (field == null) {
          collector.addFailure("Unknown field in join key: " + stageName + "." + fieldName,
                               "Verify field exists in stage schema");
        } else {
          stageKeyTypes.add(field.getSchema().isNullable() ?
            field.getSchema().getNonNullable().getType() :
            field.getSchema().getType());
        }
      }
      keyTypes.add(stageKeyTypes);
    }

    // Validate key type compatibility
    if (keyTypes.size() > 1) {
      collector.addFailure("Join key types are not compatible across stages",
                           "Ensure all join keys have the same types");
    }
  }
}
```
## Performance Optimization

### Broadcast Join Optimization

```java
private boolean shouldBroadcastStage(String stageName, Schema schema,
                                     Map<String, Object> stageProperties) {
  // Check explicit broadcast hint
  Object broadcastHint = stageProperties.get("broadcast");
  if (Boolean.TRUE.equals(broadcastHint)) {
    return true;
  }

  // Estimate stage size
  Object recordCountHint = stageProperties.get("estimatedRecords");
  if (recordCountHint instanceof Number) {
    long estimatedRecords = ((Number) recordCountHint).longValue();
    int fieldCount = schema.getFields().size();

    // Rough size estimation (bytes)
    long estimatedSize = estimatedRecords * fieldCount * 50; // 50 bytes avg per field

    // Broadcast if less than 200MB
    return estimatedSize < 200 * 1024 * 1024;
  }

  // Default: don't broadcast
  return false;
}
```

### Partitioning Strategy

```java
private int calculateOptimalPartitions(Map<String, Schema> inputSchemas,
                                       JoinDefinition joinDef) {
  // Calculate total estimated input size
  long totalEstimatedSize = 0;
  for (JoinStage stage : joinDef.getStages()) {
    if (!stage.isBroadcast()) {
      // Estimate non-broadcast stage sizes
      totalEstimatedSize += estimateStageSize(stage.getStageName(), inputSchemas);
    }
  }

  // Target ~128MB per partition
  long targetPartitionSize = 128 * 1024 * 1024;
  int calculatedPartitions = (int) Math.max(1, totalEstimatedSize / targetPartitionSize);

  // Cap at reasonable limits
  return Math.min(Math.max(calculatedPartitions, 1), 1000);
}
```