# Join Operations

Join operations combine two DataSets by pairing elements whose keys are equal. Flink supports inner and outer joins and provides hints that let you influence the execution strategy for better performance.

## Join Types

### Inner Join

```scala { .api }
class DataSet[T] {
  /** Starts an inner join: only pairs of elements whose keys are equal are emitted. */
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  /** Inner join with an explicit execution strategy, e.g. `JoinHint.BROADCAST_HASH_SECOND`. */
  def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedJoinOperation[T, O]

  /** Inner join hinting that `other` is much smaller than this DataSet; `other` is broadcast. */
  def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  /** Inner join hinting that `other` is much larger than this DataSet; this DataSet is broadcast. */
  def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}
```

### Outer Joins

```scala { .api }
class DataSet[T] {
  // The join function of an outer join receives null for the side that has no matching element.

  /** Keeps every element of this DataSet; the right-hand argument is null when there is no match. */
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /** Keeps every element of `other`; the left-hand argument is null when there is no match. */
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /** Keeps every element of both DataSets; either argument may be null, but never both. */
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
}
```

## Join Key Specification

### UnfinishedJoinOperation

```scala { .api }
// Outer joins (`UnfinishedOuterJoinOperation`) are keyed with the same `where`/`equalTo` calls.

/** Selects the join key of the first (left) DataSet. */
class UnfinishedJoinOperation[L, R] {
  /** Key computed by a selector function; a tuple result gives a composite key. */
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedJoinOperation[L, R]

  /** Key given by 0-based field positions. */
  def where(fields: Int*): HalfUnfinishedJoinOperation[L, R]

  /** Key given by one or more field names, e.g. case class fields. */
  def where(firstField: String, otherFields: String*): HalfUnfinishedJoinOperation[L, R]
}

/** Selects the join key of the second (right) DataSet; it is compared for equality with the left key. */
class HalfUnfinishedJoinOperation[L, R] {
  def equalTo[K: TypeInformation](fun: R => K): JoinDataSet[L, R]
  def equalTo(fields: Int*): JoinDataSet[L, R]
  def equalTo(firstField: String, otherFields: String*): JoinDataSet[L, R]
}
```

## Join Function Application

```scala { .api }
class JoinDataSet[L, R] {
  /** Maps each matching pair to exactly one result. */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

  /** Emits zero or more results per matching pair through the collector. */
  def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]

  /** Same as the two-argument function form, using a `JoinFunction`. */
  def apply[O: TypeInformation: ClassTag](joiner: JoinFunction[L, R, O]): DataSet[O]

  /** Same as the collector form, using a `FlatJoinFunction`. */
  def apply[O: TypeInformation: ClassTag](joiner: FlatJoinFunction[L, R, O]): DataSet[O]

  // Optimization hints

  /** Partitions both inputs on the join key with a custom partitioner; `K` must be the key type. */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinDataSet[L, R]
}
```

## Usage Examples

### Basic Inner Join

```scala
import org.apache.flink.api.scala._ // DataSet API and implicit TypeInformation derivation

val env = ExecutionEnvironment.getExecutionEnvironment

case class Employee(id: Int, name: String, deptId: Int)
case class Department(id: Int, name: String)

val employees = env.fromElements(
  Employee(1, "Alice", 10),
  Employee(2, "Bob", 20),
  Employee(3, "Charlie", 10)
)

val departments = env.fromElements(
  Department(10, "Engineering"),
  Department(20, "Sales"),
  Department(30, "Marketing")
)

// Join by department ID
val employeeDepartments = employees
  .join(departments)
  .where(_.deptId)
  .equalTo(_.id)
  .apply((emp, dept) => (emp.name, dept.name))

// Result (order may vary): ("Alice", "Engineering"), ("Bob", "Sales"), ("Charlie", "Engineering")
// "Marketing" has no employees, so the inner join drops it.
// The remaining examples assume the same import and `env`.
```

### Join with Field Positions

```scala
val orders = env.fromElements(
  (1, "Order1", 100), // (orderId, orderName, customerId)
  (2, "Order2", 200),
  (3, "Order3", 100)
)

val customers = env.fromElements(
  (100, "Customer A"), // (customerId, customerName)
  (200, "Customer B"),
  (300, "Customer C")
)

// Join using 0-based field positions
val orderCustomers = orders
  .join(customers)
  .where(2)   // customerId field in orders (index 2)
  .equalTo(0) // customerId field in customers (index 0)
  .apply((order, customer) => (order._2, customer._2))

// Result (order may vary): ("Order1", "Customer A"), ("Order2", "Customer B"), ("Order3", "Customer A")
```

### Join with Field Names

```scala
case class Order(orderId: Int, orderName: String, customerId: Int)
case class Customer(customerId: Int, customerName: String)

val orders = env.fromElements(
  Order(1, "Order1", 100),
  Order(2, "Order2", 200)
)

val customers = env.fromElements(
  Customer(100, "Customer A"),
  Customer(200, "Customer B")
)

// Join using field names
val result = orders
  .join(customers)
  .where("customerId")
  .equalTo("customerId")
  .apply((order, customer) => s"${order.orderName} for ${customer.customerName}")

// Result (order may vary): "Order1 for Customer A", "Order2 for Customer B"
```

### Left Outer Join

```scala
val leftData = env.fromElements(("A", 1), ("B", 2), ("C", 3))
val rightData = env.fromElements(("A", 10), ("C", 30), ("D", 40))

val leftOuterResult = leftData
  .leftOuterJoin(rightData)
  .where(_._1)
  .equalTo(_._1)
  .apply { (left, right) =>
    // `right` is null for left elements without a match (here: "B")
    val rightValue = Option(right).fold(0)(_._2)
    (left._1, left._2, rightValue)
  }

// Result (order may vary): ("A", 1, 10), ("B", 2, 0), ("C", 3, 30)
```

### Right Outer Join

```scala
// Reuses leftData and rightData from the left outer join example
val rightOuterResult = leftData
  .rightOuterJoin(rightData)
  .where(_._1)
  .equalTo(_._1)
  .apply { (left, right) =>
    // `left` is null for right elements without a match (here: "D")
    val leftValue = Option(left).fold(0)(_._2)
    (right._1, leftValue, right._2)
  }

// Result (order may vary): ("A", 1, 10), ("C", 3, 30), ("D", 0, 40)
```

### Full Outer Join

```scala
// Reuses leftData and rightData from the left outer join example
val fullOuterResult = leftData
  .fullOuterJoin(rightData)
  .where(_._1)
  .equalTo(_._1)
  .apply { (left, right) =>
    // Either side may be null, but never both, so the key comes from whichever side exists
    val leftOpt = Option(left)
    val key = leftOpt.fold(right._1)(_._1)
    val leftValue = leftOpt.fold(0)(_._2)
    val rightValue = Option(right).fold(0)(_._2)
    (key, leftValue, rightValue)
  }

// Result (order may vary): ("A", 1, 10), ("B", 2, 0), ("C", 3, 30), ("D", 0, 40)
```

## Multiple Key Fields

```scala
case class Sale(region: String, product: String, amount: Double)
case class Target(region: String, product: String, target: Double)

val sales = env.fromElements(
  Sale("US", "Product A", 1000),
  Sale("EU", "Product B", 1500)
)

val targets = env.fromElements(
  Target("US", "Product A", 1200),
  Target("EU", "Product B", 1400)
)

// Join on multiple fields: the key selectors return a tuple, i.e. a composite key
val comparison = sales
  .join(targets)
  .where(s => (s.region, s.product))
  .equalTo(t => (t.region, t.product))
  .apply { (sale, target) =>
    (sale.region, sale.product, sale.amount, target.target, sale.amount - target.target)
  }

// Equivalent, using field names: .where("region", "product").equalTo("region", "product")

// Result (order may vary):
// ("US", "Product A", 1000.0, 1200.0, -200.0), ("EU", "Product B", 1500.0, 1400.0, 100.0)
```
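
## Cross Product Operations

Cross operations create the Cartesian product of two DataSets: every element of the first DataSet is paired with every element of the second, so the result size is the product of the two input sizes.

### Cross Product
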
```scala { .api }
class DataSet[T] {
  /** Pairs every element of this DataSet with every element of `other`. */
  def cross[O](other: DataSet[O]): CrossDataSet[T, O]

  /** Cross hinting that `other` is much smaller than this DataSet; `other` is broadcast. */
  def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]

  /** Cross hinting that `other` is much larger than this DataSet; this DataSet is broadcast. */
  def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
}

class CrossDataSet[L, R] {
  /** Maps each pair of the product to exactly one result. */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

  /** Same as above, using a `CrossFunction`. */
  def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O]

  // There is no Collector-based variant: a cross function returns exactly one element per pair.
  // To emit zero or more elements per pair, follow the cross with `flatMap`.
}
```

### Cross Examples

```scala
val left = env.fromElements("A", "B")
val right = env.fromElements(1, 2, 3)

// Simple cross product
val crossed = left
  .cross(right)
  .apply((l, r) => s"$l-$r")

// Result (order may vary): "A-1", "A-2", "A-3", "B-1", "B-2", "B-3"

// Cross with size hints for optimization
val leftSmall = env.fromElements(1L, 2L)          // Small dataset
val rightLarge = env.generateSequence(1, 1000000) // Large dataset, generated in parallel
// (fromElements would build all elements on the client and read them with a non-parallel source)

// Hint that the argument (rightLarge) is huge, so the small left input is broadcast
val crossedWithHint = leftSmall
  .crossWithHuge(rightLarge)
  .apply((small, large) => small * large)

// Same plan from the other side: hint that the argument (leftSmall) is tiny and broadcast it
val crossedWithTinyHint = rightLarge
  .crossWithTiny(leftSmall)
  .apply((large, small) => small * large)
```
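
## CoGroup Operations

CoGroup operations group the elements of both DataSets by key and pass the two groups for each key to a single function call. Unlike a join, the function is also called for keys that occur in only one of the inputs; the other group is then empty.

### CoGroup API
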
```scala { .api }
class DataSet[T] {
  /** Starts a coGroup with `other`: the elements of both inputs are grouped by key. */
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}

/** Selects the key of the first (left) DataSet. */
class UnfinishedCoGroupOperation[L, R] {
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedCoGroupOperation[L, R]
  def where(fields: Int*): HalfUnfinishedCoGroupOperation[L, R]
  def where(firstField: String, otherFields: String*): HalfUnfinishedCoGroupOperation[L, R]
}

/** Selects the key of the second (right) DataSet. */
class HalfUnfinishedCoGroupOperation[L, R] {
  def equalTo[K: TypeInformation](fun: R => K): CoGroupDataSet[L, R]
  def equalTo(fields: Int*): CoGroupDataSet[L, R]
  def equalTo(firstField: String, otherFields: String*): CoGroupDataSet[L, R]
}

/**
 * Applies a function once per key to the group of left elements and the group of
 * right elements with that key; one of the two groups may be empty.
 */
class CoGroupDataSet[L, R] {
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R]) => O): DataSet[O]
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O]
  def apply[O: TypeInformation: ClassTag](coGroupFunction: CoGroupFunction[L, R, O]): DataSet[O]
}
```

### CoGroup Examples

```scala
val left = env.fromElements(("A", 1), ("B", 2), ("A", 3))
val right = env.fromElements(("A", 10), ("C", 30), ("A", 20))

val coGrouped = left
  .coGroup(right)
  .where(_._1)
  .equalTo(_._1)
  .apply { (leftIterator, rightIterator) =>
    // Iterators are single-pass, so materialize them before reading them more than once
    val leftList = leftIterator.toList
    val rightList = rightIterator.toList
    // The function is only called for keys present in at least one input,
    // so at least one of the two lists is non-empty
    val key = leftList.headOption.getOrElse(rightList.head)._1
    val leftSum = leftList.map(_._2).sum
    val rightSum = rightList.map(_._2).sum
    (key, leftSum, rightSum)
  }

// Result (order may vary): ("A", 4, 30), ("B", 2, 0), ("C", 0, 30)
```

## Types

```scala { .api }
// Join function interfaces
// (Flink defines these as Java interfaces in org.apache.flink.api.common.functions;
//  they are shown here as Scala traits. `Function` is Flink's marker interface, not scala.Function.)
trait JoinFunction[IN1, IN2, OUT] extends Function {
  /** Returns exactly one result for a pair of matching elements. */
  def join(first: IN1, second: IN2): OUT
}

trait FlatJoinFunction[IN1, IN2, OUT] extends Function {
  /** Emits zero or more results for a pair of matching elements through `out`. */
  def join(first: IN1, second: IN2, out: Collector[OUT]): Unit
}

// Cross function interface
trait CrossFunction[IN1, IN2, OUT] extends Function {
  /** Returns exactly one result for each pair of the Cartesian product. */
  def cross(val1: IN1, val2: IN2): OUT
}

// CoGroup function interface
trait CoGroupFunction[IN1, IN2, OUT] extends Function {
  /** Receives all elements of one key from both inputs; either group may be empty. */
  def coGroup(first: java.lang.Iterable[IN1], second: java.lang.Iterable[IN2], out: Collector[OUT]): Unit
}

// In outer joins, the side without a matching element is passed as null.
// Wrap both arguments in Option before using them:
def handleOuterJoin(left: LeftType, right: RightType): ResultType = {
  val leftValue = Option(left).getOrElse(defaultLeft)
  val rightValue = Option(right).getOrElse(defaultRight)
  combine(leftValue, rightValue) // placeholder: build the ResultType from the two non-null values
}
```

## Performance Hints

### Join Hints

```scala
// For joins where one dataset is much smaller, hint its size so that it is broadcast
val result = largeDataSet
  .joinWithTiny(smallDataSet) // smallDataSet will be broadcast
  .where(_.key)
  .equalTo(_.key)
  .apply((large, small) => processJoin(large, small))

// Equivalent, with an explicit strategy:
// largeDataSet.join(smallDataSet, JoinHint.BROADCAST_HASH_SECOND)

// For joins between two large datasets, a custom partitioner controls how keys are distributed
val partitioned = largeDataSet
  .join(otherLargeDataSet)
  .where(_.key)
  .equalTo(_.key)
  .withPartitioner(new CustomPartitioner()) // must partition values of the key type
  .apply((left, right) => processJoin(left, right))

// Cross product hints
val crossResult = smallDataSet
  .crossWithHuge(largeDataSet) // smallDataSet will be broadcast
  .apply((small, large) => combine(small, large))
```
346
347
### Join Strategy Selection
348
349
Flink automatically chooses join strategies, but you can influence the choice:
350
351
- Use `crossWithTiny()` when one dataset fits in memory
352
- Use `crossWithHuge()` when the other dataset is very large
353
- CoGroup is efficient for joining datasets with many-to-many relationships
354
- Consider using broadcast variables for very small lookup datasets