# Join and CoGroup Operations

Advanced operations for combining multiple DataSets using joins, coGroups, cross products, and unions. These operations enable data processing workflows that combine several data sources.

## Capabilities

### Join Operations

Join two DataSets based on key equality. The basic `join()` method produces an inner equi-join.

```java { .api }
11
/**
12
* Join with another DataSet
13
* @param other the other DataSet to join with
14
* @return JoinOperatorSets for key specification and join configuration
15
*/
16
public <R> JoinOperatorSets<T, R> join(DataSet<R> other);
17
```

**Usage Examples:**

```java
22
// Inner join on key fields
23
DataSet<Tuple2<Long, String>> users = env.fromElements(
24
new Tuple2<>(1L, "Alice"), new Tuple2<>(2L, "Bob"));
25
DataSet<Tuple2<Long, String>> orders = env.fromElements(
26
new Tuple2<>(1L, "Order1"), new Tuple2<>(1L, "Order2"), new Tuple2<>(2L, "Order3"));
27
28
// Join users and orders on user ID
29
DataSet<Tuple2<Tuple2<Long, String>, Tuple2<Long, String>>> joined = users
30
.join(orders)
31
.where(0) // user ID field in users
32
.equalTo(0) // user ID field in orders
33
.types(Tuple2.class, Tuple2.class);
34
35
// Custom join function
36
DataSet<String> customJoin = users
37
.join(orders)
38
.where(0)
39
.equalTo(0)
40
.with(new JoinFunction<Tuple2<Long, String>, Tuple2<Long, String>, String>() {
41
@Override
42
public String join(Tuple2<Long, String> user, Tuple2<Long, String> order) {
43
return user.f1 + " has " + order.f1;
44
}
45
});
46
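```

### Join Types

Join types supported by the DataSet API. The join type is selected by the method that starts the join: `join()` performs an inner join, while outer joins are started with `leftOuterJoin()`, `rightOuterJoin()`, or `fullOuterJoin()`.

```java { .api }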
53
/**
54
* Enum defining join types
55
*/
56
public enum JoinType {
57
INNER, // Inner join (default)
58
LEFT_OUTER, // Left outer join
59
RIGHT_OUTER, // Right outer join
60
FULL_OUTER // Full outer join
61
}
62
63
/**
64
* Specify the join type
65
* @param joinType the type of join to perform
66
* @return configured join operator
67
*/
68
public JoinOperator<T1, T2, R> with(JoinType joinType);
69
```

### Join Function Interface

Custom join logic that combines each pair of matching elements from the two DataSets into one result element.

```java { .api }
76
/**
77
* Interface for join functions
78
* @param <IN1> type of elements from first DataSet
79
* @param <IN2> type of elements from second DataSet
80
* @param <OUT> type of result elements
81
*/
82
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
83
/**
84
* Join function that combines two elements
85
* @param first element from first DataSet
86
* @param second element from second DataSet
87
* @return combined result element
88
*/
89
OUT join(IN1 first, IN2 second) throws Exception;
90
}
91
92
/**
93
* Rich version with access to runtime context
94
*/
95
public abstract class RichJoinFunction<IN1, IN2, OUT>
96
extends AbstractRichFunction implements JoinFunction<IN1, IN2, OUT> {
97
}
98
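```

### CoGroup Operations

Group the elements of two DataSets by key and process each pair of groups that share a key in a single function call. Unlike a join, keys that appear in only one input are still passed to the function, together with an empty group for the other input.

```java { .api }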
105
/**
106
* CoGroup with another DataSet
107
* @param other the other DataSet to coGroup with
108
* @return CoGroupOperatorSets for key specification and coGroup configuration
109
*/
110
public <R> CoGroupOperator.CoGroupOperatorSets<T, R> coGroup(DataSet<R> other);
111
```

**Usage Examples:**

```java
116
// CoGroup users and orders
117
DataSet<String> coGroupResult = users
118
.coGroup(orders)
119
.where(0)
120
.equalTo(0)
121
.with(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, String>, String>() {
122
@Override
123
public void coGroup(
124
Iterable<Tuple2<Long, String>> users,
125
Iterable<Tuple2<Long, String>> orders,
126
Collector<String> out) {
127
128
Iterator<Tuple2<Long, String>> userIter = users.iterator();
129
if (userIter.hasNext()) {
130
Tuple2<Long, String> user = userIter.next();
131
int orderCount = 0;
132
for (Tuple2<Long, String> order : orders) {
133
orderCount++;
134
}
135
out.collect(user.f1 + " has " + orderCount + " orders");
136
}
137
}
138
});
139
```

### CoGroup Function Interface

Custom coGroup logic that processes the two groups of elements sharing a key, one group from each DataSet.

```java { .api }
146
/**
147
* Interface for coGroup functions
148
* @param <IN1> type of elements from first DataSet
149
* @param <IN2> type of elements from second DataSet
150
* @param <OUT> type of result elements
151
*/
152
public interface CoGroupFunction<IN1, IN2, OUT> extends Function, Serializable {
153
/**
154
* CoGroup function that processes groups from both sides
155
* @param first iterable of elements from first DataSet
156
* @param second iterable of elements from second DataSet
157
* @param out collector for result elements
158
*/
159
void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception;
160
}
161
162
/**
163
* Rich version with access to runtime context
164
*/
165
public abstract class RichCoGroupFunction<IN1, IN2, OUT>
166
extends AbstractRichFunction implements CoGroupFunction<IN1, IN2, OUT> {
167
}
168
```

### Cross Operations

Compute the cross product (Cartesian product) of two DataSets. The result contains one element for every pair of input elements, so its size is the product of the two input sizes.

```java { .api }
175
/**
176
* Cross product with another DataSet
177
* @param other the other DataSet to cross with
178
* @return CrossOperator for cross product configuration
179
*/
180
public <R> CrossOperator.DefaultCross<T, R> cross(DataSet<R> other);
181
```

**Usage Examples:**

```java
186
// Simple cross product
187
DataSet<String> colors = env.fromElements("red", "blue");
188
DataSet<String> sizes = env.fromElements("small", "large");
189
190
DataSet<Tuple2<String, String>> crossed = colors
191
.cross(sizes)
192
.types(String.class, String.class);
193
194
// Cross with custom function
195
DataSet<String> customCross = colors
196
.cross(sizes)
197
.with(new CrossFunction<String, String, String>() {
198
@Override
199
public String cross(String color, String size) {
200
return size + " " + color + " item";
201
}
202
});
203
```

### Cross Function Interface

Custom cross logic that combines every element of the first DataSet with every element of the second DataSet.

```java { .api }
210
/**
211
* Interface for cross functions
212
* @param <IN1> type of elements from first DataSet
213
* @param <IN2> type of elements from second DataSet
214
* @param <OUT> type of result elements
215
*/
216
public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable {
217
/**
218
* Cross function that combines elements from both DataSets
219
* @param first element from first DataSet
220
* @param second element from second DataSet
221
* @return combined result element
222
*/
223
OUT cross(IN1 first, IN2 second) throws Exception;
224
}
225
226
/**
227
* Rich version with access to runtime context
228
*/
229
public abstract class RichCrossFunction<IN1, IN2, OUT>
230
extends AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> {
231
}
232
```

### Union Operations

Combine two DataSets of the same type into a single DataSet. Duplicate elements are not removed, and the order of the result is not guaranteed.

```java { .api }
239
/**
240
* Union with another DataSet of the same type
241
* @param other the other DataSet to union with
242
* @return UnionOperator containing elements from both DataSets
243
*/
244
public UnionOperator<T> union(DataSet<T> other);
245
```

**Usage Examples:**

```java
250
// Union two DataSets
251
DataSet<String> dataset1 = env.fromElements("a", "b", "c");
252
DataSet<String> dataset2 = env.fromElements("d", "e", "f");
253
254
DataSet<String> combined = dataset1.union(dataset2);
255
// Result contains: a, b, c, d, e, f
256
257
// Union multiple DataSets
258
DataSet<String> dataset3 = env.fromElements("g", "h");
259
DataSet<String> allCombined = dataset1.union(dataset2).union(dataset3);
260
```

### Key Specification

Methods for specifying the key fields of a join. CoGroup operations offer the same `where(...)` and `equalTo(...)` overloads on `CoGroupOperatorSets`.

```java { .api }
267
/**
268
* Specify key fields by position (for Tuple types)
269
* @param fields the field positions to use as keys
270
* @return key specification for further configuration
271
*/
272
public JoinOperatorSets.JoinOperatorSetsPredicate where(int... fields);
273
274
/**
275
* Specify key fields by name (for POJO types)
276
* @param fields the field names to use as keys
277
* @return key specification for further configuration
278
*/
279
public JoinOperatorSets.JoinOperatorSetsPredicate where(String... fields);
280
281
/**
282
* Specify key using key selector function
283
* @param keyExtractor function to extract the key
284
* @return key specification for further configuration
285
*/
286
public <K> JoinOperatorSets.JoinOperatorSetsPredicateWithKeySelector<K> where(KeySelector<T, K> keyExtractor);
287
288
/**
289
* Specify the matching key fields in the other DataSet
290
* @param fields the field positions in the other DataSet
291
* @return configured join operator
292
*/
293
public JoinOperator.EquiJoin<T1, T2> equalTo(int... fields);
294
```

### Join Hints

Hints that tell the optimizer which execution strategy to use for a join. The hint is passed when the join is created.

```java { .api }
301
/**
302
* Enum for join strategy hints
303
*/
304
public enum JoinHint {
305
OPTIMIZER_CHOOSES, // Let optimizer choose strategy
306
BROADCAST_HASH_FIRST, // Broadcast first DataSet and use hash join
307
BROADCAST_HASH_SECOND,// Broadcast second DataSet and use hash join
308
REPARTITION_HASH_FIRST, // Repartition both, use first as build side
309
REPARTITION_HASH_SECOND, // Repartition both, use second as build side
310
REPARTITION_SORT_MERGE // Repartition and sort-merge join
311
}
312
313
/**
314
* Provide a hint for join execution strategy
315
* @param hint the execution strategy hint
316
* @return configured join operator
317
*/
318
public JoinOperator<T1, T2, R> strategy(JoinHint hint);
319
```

## Types

```java { .api }
324
import org.apache.flink.api.java.operators.JoinOperator;
325
import org.apache.flink.api.java.operators.CoGroupOperator;
326
import org.apache.flink.api.java.operators.CrossOperator;
327
import org.apache.flink.api.java.operators.UnionOperator;
328
import org.apache.flink.api.java.operators.join.JoinType;
329
import org.apache.flink.api.java.operators.join.JoinOperatorSets;
330
import org.apache.flink.api.common.functions.JoinFunction;
331
import org.apache.flink.api.common.functions.CoGroupFunction;
332
import org.apache.flink.api.common.functions.CrossFunction;
333
import org.apache.flink.api.common.functions.RichJoinFunction;
334
import org.apache.flink.api.common.functions.RichCoGroupFunction;
335
import org.apache.flink.api.common.functions.RichCrossFunction;
336
import org.apache.flink.util.Collector;
337
```