# Joins and CoGroups

Apache Flink Scala API provides comprehensive support for combining multiple DataSets using joins, co-groups, and cross products with flexible key selection and customizable join strategies.

## Join Operations

### Basic Join Setup

```scala { .api }
class DataSet[T] {
  // Start join operation
  def join[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  // Join with size hints for optimization
  def joinWithTiny[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def joinWithHuge[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}
```

### Outer Joins

```scala { .api }
class DataSet[T] {
  // Left outer join
  def leftOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  // Right outer join
  def rightOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  // Full outer join
  def fullOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}
```

### Join Key Specification

```scala { .api }
class UnfinishedJoinOperation[T, O] {
  // Specify left side key using function
  def where[K: TypeInformation](keySelector: T => K): UnfinishedJoinOperationWhere[T, O]

  // Specify left side key using field positions
  def where(fields: Int*): UnfinishedJoinOperationWhere[T, O]

  // Specify left side key using field names
  def where(firstField: String, otherFields: String*): UnfinishedJoinOperationWhere[T, O]
}

class UnfinishedJoinOperationWhere[T, O] {
  // Specify right side key using function
  def equalTo[K: TypeInformation](keySelector: O => K): UnfinishedJoinOperationWhereEqual[T, O]

  // Specify right side key using field positions
  def equalTo(fields: Int*): UnfinishedJoinOperationWhereEqual[T, O]

  // Specify right side key using field names
  def equalTo(firstField: String, otherFields: String*): UnfinishedJoinOperationWhereEqual[T, O]
}
```

### Join Function Application

```scala { .api }
class UnfinishedJoinOperationWhereEqual[T, O] {
  // Apply join function - creates tuple by default
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]

  // Use JoinFunction
  def apply[R: TypeInformation: ClassTag](joiner: JoinFunction[T, O, R]): DataSet[R]

  // Get JoinDataSet for further configuration
  def getJoinDataSet: JoinDataSet[T, O]
}

class JoinDataSet[T, O] {
  // Apply join function
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]
  def apply[R: TypeInformation: ClassTag](joiner: JoinFunction[T, O, R]): DataSet[R]

  // Configuration methods
  def withJoinHint(joinHint: JoinHint): JoinDataSet[T, O]
}
```

## CoGroup Operations

CoGroup operations group elements from two DataSets by key and provide access to all elements from both groups.

```scala { .api }
class DataSet[T] {
  // Start coGroup operation
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}

class UnfinishedCoGroupOperation[T, O] {
  // Specify left side key
  def where[K: TypeInformation](keySelector: T => K): HalfUnfinishedKeyPairOperation[T, O, K]
  def where(fields: Int*): HalfUnfinishedKeyPairOperation[T, O, _]
  def where(firstField: String, otherFields: String*): HalfUnfinishedKeyPairOperation[T, O, _]
}

class HalfUnfinishedKeyPairOperation[T, O, K] {
  // Specify right side key
  def equalTo[K2: TypeInformation](keySelector: O => K2): CoGroupDataSet[T, O]
  def equalTo(fields: Int*): CoGroupDataSet[T, O]
  def equalTo(firstField: String, otherFields: String*): CoGroupDataSet[T, O]
}
```

### CoGroup Function Application

```scala { .api }
class CoGroupDataSet[T, O] {
  // Apply function with iterators
  def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O]) => R): DataSet[R]

  // Apply function with iterators returning multiple results
  def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O]) => TraversableOnce[R]): DataSet[R]

  // Apply function with iterators and collector
  def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O], Collector[R]) => Unit): DataSet[R]

  // Use CoGroupFunction
  def apply[R: TypeInformation: ClassTag](coGrouper: CoGroupFunction[T, O, R]): DataSet[R]
}
```

## Cross Operations

Cross operations create Cartesian products of two DataSets.

```scala { .api }
class DataSet[T] {
  // Cross with other DataSet
  def cross[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]

  // Cross with size hints
  def crossWithTiny[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
  def crossWithHuge[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
}

class CrossDataSet[T, O] {
  // Apply cross function
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]

  // Use CrossFunction
  def apply[R: TypeInformation: ClassTag](crosser: CrossFunction[T, O, R]): DataSet[R]

  // Configuration
  def withCrossHint(crossHint: CrossHint): CrossDataSet[T, O]
}
```

## Usage Examples

### Inner Join

```scala
import org.apache.flink.api.scala._

case class Customer(id: Int, name: String, city: String)
case class Order(customerId: Int, product: String, amount: Double)
case class CustomerOrder(customerName: String, city: String, product: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val customers = env.fromElements(
  Customer(1, "Alice", "New York"),
  Customer(2, "Bob", "London"),
  Customer(3, "Charlie", "Paris")
)

val orders = env.fromElements(
  Order(1, "laptop", 1000.0),
  Order(2, "phone", 500.0),
  Order(1, "mouse", 25.0)
)

// Join customers with orders
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply { (customer, order) =>
    CustomerOrder(customer.name, customer.city, order.product, order.amount)
  }
```
187
188
### Left Outer Join
189
190
```scala
191
import org.apache.flink.api.scala._
192
193
case class Employee(id: Int, name: String, deptId: Int)
194
case class Department(id: Int, name: String)
195
case class EmployeeDept(empName: String, deptName: Option[String])
196
197
val env = ExecutionEnvironment.getExecutionEnvironment
198
199
val employees = env.fromElements(
200
Employee(1, "Alice", 10),
201
Employee(2, "Bob", 20),
202
Employee(3, "Charlie", 99) // Department doesn't exist
203
)
204
205
val departments = env.fromElements(
206
Department(10, "Engineering"),
207
Department(20, "Sales")
208
)
209
210
// Left outer join - all employees, with department if it exists
211
val employeeDepts = employees
212
.leftOuterJoin(departments)
213
.where(_.deptId)
214
.equalTo(_.id)
215
.apply { (emp, deptOpt) =>
216
EmployeeDept(emp.name, Option(deptOpt).map(_.name))
217
}
218
```

### CoGroup Operation

```scala
import org.apache.flink.api.scala._

case class Student(id: Int, name: String)
case class Grade(studentId: Int, subject: String, score: Int)
case class StudentReport(name: String, grades: List[Grade], avgScore: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val students = env.fromElements(
  Student(1, "Alice"),
  Student(2, "Bob"),
  Student(3, "Charlie")
)

val grades = env.fromElements(
  Grade(1, "Math", 90),
  Grade(1, "Science", 85),
  Grade(2, "Math", 78),
  Grade(2, "Science", 82),
  Grade(1, "History", 88)
)

// CoGroup to create comprehensive student reports.
// A student with no grades (Charlie) still produces a report with an
// empty grade list.
val studentReports = students
  .coGroup(grades)
  .where(_.id)
  .equalTo(_.studentId)
  .apply { (studentIter, gradeIter) =>
    val student = studentIter.next() // Should be exactly one
    val gradeList = gradeIter.toList
    val avgScore = if (gradeList.nonEmpty) gradeList.map(_.score).sum.toDouble / gradeList.length else 0.0
    StudentReport(student.name, gradeList, avgScore)
  }
```

### Multiple Join Conditions

```scala
import org.apache.flink.api.scala._

case class Sale(year: Int, quarter: Int, region: String, amount: Double)
case class Target(year: Int, quarter: Int, region: String, target: Double)
case class Performance(year: Int, quarter: Int, region: String, actual: Double, target: Double, achievement: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val sales = env.fromElements(
  Sale(2023, 1, "US", 1000000),
  Sale(2023, 1, "EU", 800000),
  Sale(2023, 2, "US", 1200000)
)

val targets = env.fromElements(
  Target(2023, 1, "US", 900000),
  Target(2023, 1, "EU", 750000),
  Target(2023, 2, "US", 1100000)
)

// Join on multiple fields: year, quarter, region
val performance = sales
  .join(targets)
  .where(s => (s.year, s.quarter, s.region))
  .equalTo(t => (t.year, t.quarter, t.region))
  .apply { (sale, target) =>
    Performance(
      sale.year,
      sale.quarter,
      sale.region,
      sale.amount,
      target.target,
      sale.amount / target.target
    )
  }
```

### Cross Product

```scala
import org.apache.flink.api.scala._

case class Color(name: String, hex: String)
case class Size(name: String, dimension: String)
case class Product(color: String, size: String, colorHex: String, sizeDimension: String)

val env = ExecutionEnvironment.getExecutionEnvironment

val colors = env.fromElements(
  Color("Red", "#FF0000"),
  Color("Blue", "#0000FF"),
  Color("Green", "#00FF00")
)

val sizes = env.fromElements(
  Size("Small", "S"),
  Size("Medium", "M"),
  Size("Large", "L")
)

// Create all color-size combinations
val products = colors
  .cross(sizes)
  .apply { (color, size) =>
    Product(color.name, size.name, color.hex, size.dimension)
  }
```

### Join with Custom Functions

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.JoinFunction

case class User(id: Int, name: String, email: String)
case class Activity(userId: Int, activity: String, timestamp: Long)
case class UserActivity(userName: String, email: String, activities: List[String])

val env = ExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User(1, "Alice", "alice@example.com"),
  User(2, "Bob", "bob@example.com")
)

val activities = env.fromElements(
  Activity(1, "login", 1000L),
  Activity(1, "view_page", 1001L),
  Activity(2, "login", 1002L)
)

// Using JoinFunction for complex join logic
class UserActivityJoinFunction extends JoinFunction[User, Activity, (Int, String, String, String)] {
  def join(user: User, activity: Activity): (Int, String, String, String) = {
    (user.id, user.name, user.email, activity.activity)
  }
}

val userActivities = users
  .join(activities)
  .where(_.id)
  .equalTo(_.userId)
  .apply(new UserActivityJoinFunction())
```

### Broadcast Join

```scala
import org.apache.flink.api.scala._

case class Transaction(id: String, productCode: String, amount: Double)
case class ProductInfo(code: String, name: String, category: String)
case class EnrichedTransaction(id: String, productName: String, category: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Large transaction dataset
val transactions = env.fromElements(
  Transaction("t1", "P001", 100.0),
  Transaction("t2", "P002", 200.0),
  Transaction("t3", "P001", 150.0)
)

// Small product info dataset (suitable for broadcasting)
val productInfo = env.fromElements(
  ProductInfo("P001", "Laptop", "Electronics"),
  ProductInfo("P002", "Phone", "Electronics")
)

// Join with broadcast hint for small dataset
val enrichedTransactions = transactions
  .joinWithTiny(productInfo) // Hint that productInfo is small
  .where(_.productCode)
  .equalTo(_.code)
  .apply { (transaction, product) =>
    EnrichedTransaction(
      transaction.id,
      product.name,
      product.category,
      transaction.amount
    )
  }
```