# Binary Operations

Operations that combine two or more DataSets: inner and outer joins, crosses (Cartesian products), coGroups, and unions. They let a program relate records across datasets and analyze several datasets together.

## Capabilities

### Join Operations

Combine two DataSets by pairing elements whose keys are equal, optionally hinting which physical join strategy the optimizer should use.

```scala { .api }
class DataSet[T] {
  /**
   * Starts an inner join with another DataSet. The physical join strategy is
   * left to the optimizer (the same as passing `JoinHint.OPTIMIZER_CHOOSES`).
   *
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedJoinOperation on which the join keys are specified
   */
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  /**
   * Starts an inner join with a hint that tells the optimizer which physical
   * strategy to use (for example `JoinHint.BROADCAST_HASH_FIRST`).
   *
   * @tparam O element type of the other DataSet
   * @param other    DataSet to join with
   * @param strategy join strategy hint; see `JoinHint` for the available values
   * @return UnfinishedJoinOperation on which the join keys are specified
   */
  def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedJoinOperation[T, O]

  /**
   * Starts an inner join for the case that `other` is small: `other` is
   * broadcast to every parallel instance and joined against this DataSet.
   *
   * @tparam O element type of the other DataSet
   * @param other small DataSet that is broadcast
   * @return UnfinishedJoinOperation on which the join keys are specified
   */
  def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  /**
   * Starts an inner join for the case that `other` is large: this DataSet is
   * broadcast instead, so this DataSet should be the small side.
   *
   * @tparam O element type of the other DataSet
   * @param other large DataSet that is not broadcast
   * @return UnfinishedJoinOperation on which the join keys are specified
   */
  def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}
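```

### Outer Join Operations

Perform left, right, and full outer joins. Unlike an inner join, an outer join also emits elements that have no matching partner on the other side; the join function then receives `null` for the missing side.

```scala { .api }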
class DataSet[T] {
  /**
   * Starts a full outer join: every element of both inputs appears in the
   * result. When an element has no partner, the missing side is passed to the
   * join function as `null`.
   *
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation on which the join keys are specified
   */
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a full outer join with a join strategy hint. Unmatched elements of
   * both inputs must be emitted exactly once, so hints that broadcast either
   * input are not valid here.
   *
   * @tparam O element type of the other DataSet
   * @param other    DataSet to join with
   * @param strategy join strategy hint
   * @return UnfinishedOuterJoinOperation on which the join keys are specified
   */
  def fullOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a left outer join: every element of this DataSet appears in the
   * result. An element without a partner is paired with `null` as the right side.
   *
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation on which the join keys are specified
   */
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a left outer join with a join strategy hint. Every element of this
   * DataSet must be emitted exactly once, so this DataSet cannot be the
   * broadcast side. Only `other` may be broadcast (`BROADCAST_HASH_SECOND`).
   *
   * @tparam O element type of the other DataSet
   * @param other    DataSet to join with
   * @param strategy join strategy hint
   * @return UnfinishedOuterJoinOperation on which the join keys are specified
   */
  def leftOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a right outer join: every element of `other` appears in the result.
   * An element without a partner is paired with `null` as the left side.
   *
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation on which the join keys are specified
   */
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a right outer join with a join strategy hint. Every element of
   * `other` must be emitted exactly once, so `other` cannot be the broadcast
   * side. Only this DataSet may be broadcast (`BROADCAST_HASH_FIRST`).
   *
   * @tparam O element type of the other DataSet
   * @param other    DataSet to join with
   * @param strategy join strategy hint
   * @return UnfinishedOuterJoinOperation on which the join keys are specified
   */
  def rightOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]
}
```

### Join Key Specification

Specify the key of each input: `where` selects the key of the first (left) DataSet, and `equalTo` selects the matching key of the second (right) DataSet.

```scala { .api }
class UnfinishedJoinOperation[L, R] {
  /**
   * Selects the join key of the first (left) input by tuple field positions.
   *
   * @param fields zero-based field positions that form the key
   * @return HalfUnfinishedKeyPairOperation whose `equalTo(...)` selects the key
   *         of the second (right) input
   */
  def where(fields: Int*): HalfUnfinishedKeyPairOperation[L, R, JoinFunctionAssigner[L, R]]

  /**
   * Selects the join key of the first (left) input by field names (for example
   * case class field names).
   *
   * @param firstField  first key field name
   * @param otherFields additional key field names
   * @return HalfUnfinishedKeyPairOperation whose `equalTo(...)` selects the key
   *         of the second (right) input
   */
  def where(firstField: String, otherFields: String*)
      : HalfUnfinishedKeyPairOperation[L, R, JoinFunctionAssigner[L, R]]

  /**
   * Selects the join key of the first (left) input with a key selector function.
   *
   * @tparam K key type
   * @param fun function extracting the key from a left element
   * @return HalfUnfinishedKeyPairOperation whose `equalTo(...)` selects the key
   *         of the second (right) input
   */
  def where[K: TypeInformation](fun: L => K)
      : HalfUnfinishedKeyPairOperation[L, R, JoinFunctionAssigner[L, R]]
}

/**
 * Intermediate step of a key-based binary operation once the first input's key
 * has been given with `where`. The `equalTo` methods take the key of the second
 * input and complete the key specification.
 *
 * @tparam L element type of the first (left) input
 * @tparam R element type of the second (right) input
 * @tparam O what `equalTo` returns; for joins this is a `JoinFunctionAssigner[L, R]`
 */
class HalfUnfinishedKeyPairOperation[L, R, O] {
  /**
   * Selects the key of the second (right) input by tuple field positions. The
   * key must be compatible with the key given to `where`, meaning it has the same
   * number and types of key fields.
   *
   * @param fields zero-based field positions that form the key
   * @return the completed operation, e.g. a JoinFunctionAssigner for joins
   */
  def equalTo(fields: Int*): O

  /**
   * Selects the key of the second (right) input by field names.
   *
   * @param firstField  first key field name
   * @param otherFields additional key field names
   * @return the completed operation, e.g. a JoinFunctionAssigner for joins
   */
  def equalTo(firstField: String, otherFields: String*): O

  /**
   * Selects the key of the second (right) input with a key selector function.
   *
   * @tparam K key type; must match the key type chosen in `where`
   * @param fun function extracting the key from a right element
   * @return the completed operation, e.g. a JoinFunctionAssigner for joins
   */
  def equalTo[K: TypeInformation](fun: R => K): O
}
```

### Join Function Application

Apply a function to each pair of joined elements to produce the final result DataSet.

```scala { .api }
trait JoinFunctionAssigner[L, R] {
  /**
   * Applies a function to each joined pair and produces exactly one result per
   * pair. In outer joins the side without a match is `null`.
   *
   * @tparam O result element type
   * @param fun function combining a left and a right element
   * @return DataSet with one result per joined pair
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

  /**
   * Applies a function that may emit zero, one or many results per joined pair
   * through the given Collector.
   *
   * @tparam O result element type
   * @param fun function receiving a left element, a right element and a Collector
   * @return DataSet with all collected results
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]

  /**
   * Applies a JoinFunction, which produces exactly one result per joined pair.
   *
   * @tparam O result element type
   * @param joiner JoinFunction implementation
   * @return DataSet with one result per joined pair
   */
  def apply[O: TypeInformation: ClassTag](joiner: JoinFunction[L, R, O]): DataSet[O]

  /**
   * Applies a FlatJoinFunction, which emits zero or more results per joined pair
   * through its Collector.
   *
   * @tparam O result element type
   * @param joiner FlatJoinFunction implementation
   * @return DataSet with all emitted results
   */
  def apply[O: TypeInformation: ClassTag](joiner: FlatJoinFunction[L, R, O]): DataSet[O]

  /**
   * Distributes both inputs with a custom partitioner on the join key. The
   * partitioner is evaluated on both inputs in the same way, so equal keys end
   * up in the same partition. Custom partitioners work only with single-field
   * keys, not composite keys.
   *
   * @tparam K key type; must match the type of the join key
   * @param partitioner custom partitioner
   * @return JoinFunctionAssigner that uses the custom partitioning
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinFunctionAssigner[L, R]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._

case class Customer(id: Int, name: String, city: String)
case class Order(id: Int, customerId: Int, product: String, amount: Double)
case class CustomerOrder(customerName: String, product: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val customers = env.fromElements(
  Customer(1, "Alice", "NYC"),
  Customer(2, "Bob", "LA"),
  Customer(3, "Charlie", "Chicago")
)

val orders = env.fromElements(
  Order(101, 1, "Laptop", 999.99),
  Order(102, 2, "Phone", 599.99),
  Order(103, 1, "Mouse", 29.99)
)

// Inner join customers with orders: only customers with at least one order
// appear (Alice twice, Bob once; Charlie has no order and is dropped).
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply((customer, order) => CustomerOrder(customer.name, order.product, order.amount))

// Left outer join to also keep customers without orders. Flink passes the
// unmatched right side as null, so wrap it in an Option before using it.
val allCustomerOrders = customers
  .leftOuterJoin(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply { (customer, order) =>
    Option(order).fold(CustomerOrder(customer.name, "No orders", 0.0)) { o =>
      CustomerOrder(customer.name, o.product, o.amount)
    }
  }
```

### Cross Operations

Create the Cartesian product of two DataSets, pairing every element of the first with every element of the second.

```scala { .api }
class DataSet[T] {
  /**
   * Creates the Cartesian product of this DataSet and `other`. Every element of
   * this DataSet is paired with every element of `other`, so the result grows
   * with the product of both input sizes.
   *
   * @tparam O element type of the other DataSet
   * @param other DataSet to cross with
   * @return CrossDataSet on which a cross function can be applied
   */
  def cross[O](other: DataSet[O]): CrossDataSet[T, O]

  /**
   * Cross for the case that `other` is small: `other` is broadcast to every
   * parallel instance of this DataSet.
   *
   * @tparam O element type of the other DataSet
   * @param other small DataSet that is broadcast
   * @return CrossDataSet on which a cross function can be applied
   */
  def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]

  /**
   * Cross for the case that `other` is large: this DataSet is broadcast instead,
   * so this DataSet should be the small side.
   *
   * @tparam O element type of the other DataSet
   * @param other large DataSet that is not broadcast
   * @return CrossDataSet on which a cross function can be applied
   */
  def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
}
```

### Cross Function Application

Apply a function to every pair of elements produced by a cross.

```scala { .api }
class CrossDataSet[L, R] {
  /**
   * Applies a function to each crossed pair and produces exactly one result per
   * pair. Without `apply`, the CrossDataSet itself is a DataSet of `(L, R)` pairs.
   *
   * @tparam O result element type
   * @param fun function combining a left and a right element
   * @return DataSet with one result per crossed pair
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

  /**
   * Applies a CrossFunction to each crossed pair (one result per pair).
   *
   * @tparam O result element type
   * @param crosser CrossFunction implementation
   * @return DataSet with one result per crossed pair
   */
  def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O]
}
```

**Usage Examples:**

```scala
val colors = env.fromElements("Red", "Green", "Blue")
val sizes = env.fromElements("Small", "Medium", "Large")

// Create all color-size combinations
val combinations = colors
  .cross(sizes)
  .apply((color, size) => s"$color $size")

// Results: all 3 x 3 = 9 combinations, e.g. "Red Small", "Red Medium",
// "Red Large", "Green Small", ... (the order of elements is not guaranteed)
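```

### CoGroup Operations

Group the elements of two DataSets by key and process the two groups that share a key together. Unlike a join, the function is also called for keys that occur in only one input; the group from the other input is then empty.

```scala { .api }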
class DataSet[T] {
  /**
   * Starts a coGroup with another DataSet. The elements of both inputs are
   * grouped by key, and each key's pair of groups is handed to one function
   * call. A key that occurs in only one input is still processed, with an empty
   * group for the other input.
   *
   * @tparam O element type of the other DataSet
   * @param other DataSet to coGroup with
   * @return UnfinishedCoGroupOperation on which the coGroup keys are specified
   */
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}
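```

### CoGroup Key Specification and Function Application

Keys are specified with `where`/`equalTo` as for joins. The resulting `CoGroupDataSet` passes the coGroup function an iterator over each key's group from each input.

```scala { .api }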
class UnfinishedCoGroupOperation[L, R] {
  /**
   * Selects the coGroup key of the first (left) input by tuple field positions.
   *
   * @param fields zero-based field positions that form the key
   * @return HalfUnfinishedKeyPairOperation whose `equalTo(...)` selects the
   *         matching key of the second (right) input and returns the
   *         CoGroupDataSet on which the coGroup function is applied
   */
  def where(fields: Int*): HalfUnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]]

  /**
   * Selects the coGroup key of the first (left) input by field names (for
   * example case class field names).
   *
   * @param firstField  first key field name
   * @param otherFields additional key field names
   * @return HalfUnfinishedKeyPairOperation whose `equalTo(...)` selects the key
   *         of the second (right) input and returns the CoGroupDataSet
   */
  def where(firstField: String, otherFields: String*)
      : HalfUnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]]

  /**
   * Selects the coGroup key of the first (left) input with a key selector function.
   *
   * @tparam K key type
   * @param fun function extracting the key from a left element
   * @return HalfUnfinishedKeyPairOperation whose `equalTo(...)` selects the key
   *         of the second (right) input and returns the CoGroupDataSet
   */
  def where[K: TypeInformation](fun: L => K)
      : HalfUnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]]
}

class CoGroupDataSet[L, R] {
  /**
   * Calls `fun` once per key with iterators over that key's elements from each
   * input and emits exactly one result per key. One iterator is empty when the
   * key occurs in only one input. The iterators are single-pass.
   *
   * @tparam O result element type
   * @param fun function processing the left and right group iterators
   * @return DataSet with one result per key
   */
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R]) => O): DataSet[O]

  /**
   * Calls `fun` once per key with both group iterators and a Collector, so any
   * number of results can be emitted per key.
   *
   * @tparam O result element type
   * @param fun function processing both group iterators and emitting via the Collector
   * @return DataSet with all collected results
   */
  def apply[O: TypeInformation: ClassTag](
      fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O]

  /**
   * Applies a CoGroupFunction to each key's pair of groups.
   *
   * @tparam O result element type
   * @param coGrouper CoGroupFunction implementation
   * @return DataSet with all emitted results
   */
  def apply[O: TypeInformation: ClassTag](coGrouper: CoGroupFunction[L, R, O]): DataSet[O]

  /**
   * Distributes both inputs with a custom partitioner on the coGroup key. The
   * partitioner is evaluated on both inputs in the same way. Custom partitioners
   * work only with single-field keys.
   *
   * @tparam K key type; must match the type of the coGroup key
   * @param partitioner custom partitioner
   * @return CoGroupDataSet that uses the custom partitioning
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): CoGroupDataSet[L, R]
}
```

### CoGroup Sorting

Sort the elements within each group before the coGroup function sees them, so the group iterators yield elements in a defined order.

```scala { .api }
class CoGroupDataSet[L, R] {
  /**
   * Sorts each group of the first (left) input by a tuple field before the
   * group is passed to the coGroup function.
   *
   * @param field zero-based field position to sort on
   * @param order sort order, e.g. `Order.ASCENDING`
   * @return CoGroupDataSet with the sort applied; further sort keys or `apply`
   *         can be chained on it
   */
  def sortFirstGroup(field: Int, order: Order): CoGroupDataSet[L, R]

  /**
   * Sorts each group of the first (left) input by a named field before the
   * group is passed to the coGroup function.
   *
   * @param field field name to sort on
   * @param order sort order, e.g. `Order.ASCENDING`
   * @return CoGroupDataSet with the sort applied; further sort keys or `apply`
   *         can be chained on it
   */
  def sortFirstGroup(field: String, order: Order): CoGroupDataSet[L, R]

  /**
   * Sorts each group of the second (right) input by a tuple field before the
   * group is passed to the coGroup function.
   *
   * @param field zero-based field position to sort on
   * @param order sort order, e.g. `Order.ASCENDING`
   * @return CoGroupDataSet with the sort applied; further sort keys or `apply`
   *         can be chained on it
   */
  def sortSecondGroup(field: Int, order: Order): CoGroupDataSet[L, R]

  /**
   * Sorts each group of the second (right) input by a named field before the
   * group is passed to the coGroup function.
   *
   * @param field field name to sort on
   * @param order sort order, e.g. `Order.ASCENDING`
   * @return CoGroupDataSet with the sort applied; further sort keys or `apply`
   *         can be chained on it
   */
  def sortSecondGroup(field: String, order: Order): CoGroupDataSet[L, R]
}
```

**Usage Examples:**

```scala
case class LeftData(key: String, value: Int)
case class RightData(key: String, description: String)

val leftData = env.fromElements(
  LeftData("A", 1),
  LeftData("A", 2),
  LeftData("B", 3)
)

val rightData = env.fromElements(
  RightData("A", "Group A data"),
  RightData("C", "Group C data")
)

// CoGroup by key to process groups together. Keys present in only one input
// ("B" and "C") are still processed, with an empty iterator for the other side.
val coGroupResult = leftData
  .coGroup(rightData)
  .where(_.key)
  .equalTo(_.key)
  .apply { (leftIter, rightIter) =>
    // The iterators are single-pass, so materialize them before reading twice.
    val leftList = leftIter.toList
    val rightList = rightIter.toList
    // At least one group is non-empty, but avoid `.head` so an empty group
    // can never throw.
    val key = leftList.headOption.map(_.key)
      .orElse(rightList.headOption.map(_.key))
      .getOrElse("")
    val leftSum = leftList.map(_.value).sum
    val rightCount = rightList.length
    s"Key: $key, LeftSum: $leftSum, RightCount: $rightCount"
  }

// Produces (in no particular order):
//   Key: A, LeftSum: 3, RightCount: 1
//   Key: B, LeftSum: 3, RightCount: 0
//   Key: C, LeftSum: 0, RightCount: 1
```

### Union Operations

Combine DataSets of the same element type into one DataSet without any key matching. Duplicate elements are kept.

```scala { .api }
class DataSet[T] {
  /**
   * Creates the union of this DataSet and `other`, which must have the same
   * element type. No deduplication is performed.
   *
   * @param other DataSet to union with
   * @return DataSet containing the elements of both DataSets
   */
  def union(other: DataSet[T]): DataSet[T]
}

class ExecutionEnvironment {
  /**
   * Creates the union of several DataSets of the same element type.
   *
   * @tparam T element type shared by all DataSets
   * @param sets DataSets to union
   * @return DataSet containing the elements of all given DataSets
   */
  def union[T](sets: Seq[DataSet[T]]): DataSet[T]
}
```

**Usage Examples:**

```scala
val numbers1 = env.fromElements(1, 2, 3)
val numbers2 = env.fromElements(4, 5, 6)
val numbers3 = env.fromElements(7, 8, 9)

// Union two DataSets: contains 1 through 6 (element order is not guaranteed)
val combined = numbers1.union(numbers2)

// Union multiple DataSets: contains 1 through 9
val allNumbers = env.union(Seq(numbers1, numbers2, numbers3))
```

## Types

```scala { .api }
// `Function` below is Flink's marker interface
// org.apache.flink.api.common.functions.Function, not scala.Function.
// Flink declares these user-function types as interfaces, hence `trait`.

/** Join function that turns each joined pair into exactly one result. */
trait JoinFunction[IN1, IN2, OUT] extends Function {
  def join(first: IN1, second: IN2): OUT
}

/** Join function that emits zero or more results per joined pair via `out`. */
trait FlatJoinFunction[IN1, IN2, OUT] extends Function {
  def join(first: IN1, second: IN2, out: Collector[OUT]): Unit
}

/** Function applied to every pair produced by a cross; one result per pair. */
trait CrossFunction[IN1, IN2, OUT] extends Function {
  def cross(val1: IN1, val2: IN2): OUT
}

/**
 * Function called once per key with that key's group from each input (either
 * group may be empty); results are emitted via `out`.
 */
trait CoGroupFunction[IN1, IN2, OUT] extends Function {
  def coGroup(
      first: java.lang.Iterable[IN1],
      second: java.lang.Iterable[IN2],
      out: Collector[OUT]): Unit
}

/** Hint telling the optimizer which physical strategy to use for a join. */
sealed trait JoinHint
object JoinHint {
  /** Let the optimizer pick the strategy. */
  case object OPTIMIZER_CHOOSES extends JoinHint
  /** Broadcast the first input and build a hash table from it. */
  case object BROADCAST_HASH_FIRST extends JoinHint
  /** Broadcast the second input and build a hash table from it. */
  case object BROADCAST_HASH_SECOND extends JoinHint
  /** Repartition both inputs and build the hash table from the first input. */
  case object REPARTITION_HASH_FIRST extends JoinHint
  /** Repartition both inputs and build the hash table from the second input. */
  case object REPARTITION_HASH_SECOND extends JoinHint
  /** Repartition both inputs, sort them on the key, and merge them. */
  case object REPARTITION_SORT_MERGE extends JoinHint
}

/** Kind of join: which unmatched elements are kept in the result. */
sealed trait JoinType
object JoinType {
  /** Only matched pairs. */
  case object INNER extends JoinType
  /** Matched pairs plus unmatched elements of the first (left) input. */
  case object LEFT_OUTER extends JoinType
  /** Matched pairs plus unmatched elements of the second (right) input. */
  case object RIGHT_OUTER extends JoinType
  /** Matched pairs plus unmatched elements of both inputs. */
  case object FULL_OUTER extends JoinType
}

/** Returned by `join`, `joinWithTiny`, and `joinWithHuge`; keys are given next. */
class UnfinishedJoinOperation[L, R] {
  // Fluent interface for join key specification
}

/** Returned by the outer-join methods; keys are given next. */
class UnfinishedOuterJoinOperation[L, R] {
  // Fluent interface for outer join key specification
}

/** Returned by `coGroup`; keys are given next. */
class UnfinishedCoGroupOperation[L, R] {
  // Fluent interface for coGroup key specification
}

/** Fully keyed join on which the join function is applied. */
trait JoinFunctionAssigner[L, R] {
  // Interface for applying join functions
}

/** Result of a cross: a DataSet of all (left, right) pairs; `apply` maps them. */
class CrossDataSet[L, R] extends DataSet[(L, R)] {
  // Represents result of cross operation
}

/** Fully keyed coGroup on which sorting and the coGroup function are applied. */
class CoGroupDataSet[L, R] {
  // Represents configured coGroup operation ready for function application
}
```