# Grouped DataSet Operations

Grouped DataSets are created by applying groupBy operations to DataSets. They provide specialized operations for processing data within groups.

## Creating Grouped DataSets

```scala { .api }
class DataSet[T] {
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def groupBy(fields: Int*): GroupedDataSet[T]
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}
```

### Grouping Examples

```scala
case class Sale(region: String, product: String, amount: Double, date: String)

val sales = env.fromElements(
  Sale("US", "ProductA", 100.0, "2023-01-01"),
  Sale("EU", "ProductA", 150.0, "2023-01-01"),
  Sale("US", "ProductB", 200.0, "2023-01-02"),
  Sale("EU", "ProductB", 120.0, "2023-01-02")
)

// Group by key function
val groupedByRegion = sales.groupBy(_.region)

// Group by field position (for tuples)
val tuples = env.fromElements(("US", 100), ("EU", 150), ("US", 200))
val groupedByPosition = tuples.groupBy(0)

// Group by field name (for case classes)
val groupedByName = sales.groupBy("region")

// Group by multiple fields
val groupedByRegionAndProduct = sales.groupBy(s => (s.region, s.product))
val groupedByMultipleFields = sales.groupBy("region", "product")
```

## Sorting Within Groups

```scala { .api }
class GroupedDataSet[T] {
  def sortGroup(field: Int, order: Order): GroupedDataSet[T]
  def sortGroup(field: String, order: Order): GroupedDataSet[T]
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}
```

### Sorting Examples

```scala
import org.apache.flink.api.common.operators.Order

val sales = env.fromElements(
  ("US", 100, "2023-01-01"),
  ("US", 200, "2023-01-02"),
  ("EU", 150, "2023-01-01")
)

// Sort by field position within groups
val sortedByAmount = sales
  .groupBy(0)                     // Group by region
  .sortGroup(1, Order.DESCENDING) // Sort by amount within each group

// Sort by field name
case class Transaction(region: String, amount: Double, date: String)
val transactions = env.fromElements(
  Transaction("US", 100.0, "2023-01-01"),
  Transaction("US", 200.0, "2023-01-02")
)

val sortedTransactions = transactions
  .groupBy("region")
  .sortGroup("amount", Order.ASCENDING)

// Sort by key function
val sortedByDate = transactions
  .groupBy(_.region)
  .sortGroup(_.date, Order.DESCENDING)
```

## Aggregation Operations

### Basic Aggregations

```scala { .api }
class GroupedDataSet[T] {
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
  def sum(field: Int): AggregateDataSet[T]
  def sum(field: String): AggregateDataSet[T]
  def max(field: Int): AggregateDataSet[T]
  def max(field: String): AggregateDataSet[T]
  def min(field: Int): AggregateDataSet[T]
  def min(field: String): AggregateDataSet[T]
}
```

### Aggregation Examples

```scala
val salesData = env.fromElements(
  ("US", "ProductA", 100.0),
  ("US", "ProductB", 200.0),
  ("EU", "ProductA", 150.0),
  ("EU", "ProductB", 120.0)
)

// Sum sales by region
val totalByRegion = salesData
  .groupBy(0) // Group by region
  .sum(2)     // Sum the amount field

// Max sales by region
val maxByRegion = salesData
  .groupBy(0)
  .max(2)

// Min sales by region
val minByRegion = salesData
  .groupBy(0)
  .min(2)

// Multiple aggregations
val stats = salesData
  .groupBy(0)
  .aggregate(Aggregations.SUM, 2)
  .and(Aggregations.MAX, 2)
  .and(Aggregations.MIN, 2)
```

### MaxBy and MinBy

```scala { .api }
class GroupedDataSet[T] {
  def maxBy(fields: Int*): DataSet[T]
  def maxBy(firstField: String, otherFields: String*): DataSet[T]
  def minBy(fields: Int*): DataSet[T]
  def minBy(firstField: String, otherFields: String*): DataSet[T]
}
```

```scala
val employeeData = env.fromElements(
  ("Engineering", "Alice", 95000),
  ("Engineering", "Bob", 85000),
  ("Sales", "Charlie", 75000),
  ("Sales", "David", 80000)
)

// Get highest paid employee per department
val highestPaid = employeeData
  .groupBy(0) // Group by department
  .maxBy(2)   // Max by salary

// Get lowest paid employee per department
val lowestPaid = employeeData
  .groupBy(0)
  .minBy(2)

// MaxBy with multiple fields (for tie-breaking)
val employeesByName = env.fromElements(
  ("Dept1", "Alice", 80000, 5), // (dept, name, salary, years)
  ("Dept1", "Bob", 80000, 3),
  ("Dept2", "Charlie", 75000, 7)
)

// Max by salary, with ties broken by years of experience
val seniorHighestPaid = employeesByName
  .groupBy(0)
  .maxBy(2, 3) // Max by salary, then by years
```

## Reduce Operations

### Simple Reduce

```scala { .api }
class GroupedDataSet[T] {
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}
```

```scala
val numbers = env.fromElements(
  ("A", 1), ("A", 2), ("A", 3),
  ("B", 4), ("B", 5)
)

// Sum numbers within each group
val groupSum = numbers
  .groupBy(0)
  .reduce((a, b) => (a._1, a._2 + b._2))

// Find maximum within each group
val groupMax = numbers
  .groupBy(0)
  .reduce((a, b) => if (a._2 > b._2) a else b)
```

### Group Reduce

```scala { .api }
class GroupedDataSet[T] {
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  def reduceGroup[R: TypeInformation: ClassTag](groupReducer: GroupReduceFunction[T, R]): DataSet[R]
}
```

```scala
val studentGrades = env.fromElements(
  ("Math", "Alice", 85),
  ("Math", "Bob", 90),
  ("Math", "Charlie", 78),
  ("Science", "Alice", 92),
  ("Science", "Bob", 88)
)

// Calculate statistics per subject
val subjectStats = studentGrades
  .groupBy(0) // Group by subject
  .reduceGroup { iterator =>
    // Materialize once: the group iterator can only be traversed a single time
    val records = iterator.toList
    val subject = records.head._1 // groups are never empty
    val grades = records.map(_._3)
    val avg = grades.sum.toDouble / grades.length
    val max = grades.max
    val min = grades.min
    val count = grades.length
    (subject, avg, max, min, count)
  }

// Multiple outputs per group
val gradeRanges = studentGrades
  .groupBy(0)
  .reduceGroup { (iterator, collector) =>
    val gradesList = iterator.toList
    val subject = gradesList.head._1

    gradesList.foreach { case (_, student, grade) =>
      val range = grade match {
        case g if g >= 90 => "A"
        case g if g >= 80 => "B"
        case g if g >= 70 => "C"
        case _ => "D"
      }
      collector.collect((subject, student, grade, range))
    }
  }
```

### Combine Group

```scala { .api }
class GroupedDataSet[T] {
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}
```

```scala
val salesRecords = env.fromElements(
  ("US", 100), ("US", 200), ("US", 150),
  ("EU", 120), ("EU", 180), ("EU", 90)
)

// Combine groups efficiently (pre-aggregation)
val combinedSales = salesRecords
  .groupBy(0)
  .combineGroup { (iterator, collector) =>
    val records = iterator.toList
    val region = records.head._1
    val totalSales = records.map(_._2).sum
    val recordCount = records.length
    collector.collect((region, totalSales, recordCount))
  }
```

## First Operations

```scala { .api }
class GroupedDataSet[T] {
  def first(n: Int): DataSet[T]
}
```

```scala
val timestampedData = env.fromElements(
  ("US", "2023-01-01", 100),
  ("US", "2023-01-02", 150),
  ("US", "2023-01-03", 200),
  ("EU", "2023-01-01", 120),
  ("EU", "2023-01-02", 180)
)

// Get first 2 records per region
val firstTwo = timestampedData
  .groupBy(0) // Group by region
  .first(2)

// Get first record per region after sorting
val latestPerRegion = timestampedData
  .groupBy(0)
  .sortGroup(1, Order.DESCENDING) // Sort by date descending
  .first(1)                       // Get latest (first after desc sort)
```

## Window Operations on Groups

While the DataSet API doesn't have built-in windowing, you can simulate windowing using groupBy and custom logic:

```scala
case class Event(userId: String, eventType: String, timestamp: Long, value: Double)

val events = env.fromElements(
  Event("user1", "click", 1000, 1.0),
  Event("user1", "click", 2000, 1.0),
  Event("user1", "purchase", 3000, 10.0),
  Event("user2", "click", 1500, 1.0),
  Event("user2", "purchase", 4000, 15.0)
)

// Group by user and time window (using truncated timestamp)
val windowSize = 3000L // 3 second windows
val windowedEvents = events
  .map(e => (e.userId, e.timestamp / windowSize, e)) // Add window key
  .groupBy(e => (e._1, e._2))                        // Group by user and window
  .reduceGroup { iterator =>
    val eventsInWindow = iterator.map(_._3).toList
    val userId = eventsInWindow.head.userId
    val windowStart = eventsInWindow.head.timestamp / windowSize * windowSize
    val totalValue = eventsInWindow.map(_.value).sum
    val eventCount = eventsInWindow.length
    (userId, windowStart, totalValue, eventCount)
  }
```

## Types

```scala { .api }
import org.apache.flink.api.common.functions.{
  ReduceFunction,
  GroupReduceFunction,
  GroupCombineFunction
}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.util.Collector

// Function interfaces
trait ReduceFunction[T] extends Function {
  def reduce(value1: T, value2: T): T
}

trait GroupReduceFunction[IN, OUT] extends Function {
  def reduce(values: java.lang.Iterable[IN], out: Collector[OUT]): Unit
}

trait GroupCombineFunction[IN, OUT] extends Function {
  def combine(values: java.lang.Iterable[IN], out: Collector[OUT]): Unit
}

// Aggregation types
object Aggregations extends Enumeration {
  val SUM, MAX, MIN = Value
}

// Chained aggregations
class AggregateDataSet[T] {
  def and(agg: Aggregations, field: Int): AggregateDataSet[T]
  def and(agg: Aggregations, field: String): AggregateDataSet[T]
  def andSum(field: Int): AggregateDataSet[T]
  def andSum(field: String): AggregateDataSet[T]
  def andMax(field: Int): AggregateDataSet[T]
  def andMax(field: String): AggregateDataSet[T]
  def andMin(field: Int): AggregateDataSet[T]
  def andMin(field: String): AggregateDataSet[T]
}

// Order enumeration
object Order extends Enumeration {
  val ASCENDING, DESCENDING = Value
}
```

## Performance Considerations

### Efficient Grouping

```scala
// Prefer groupBy with key functions over field names when possible
case class Record(key: String, value: Int)
val data: DataSet[Record] = // ...

// Efficient - direct key extraction
val grouped1 = data.groupBy(_.key)

// Less efficient - string-based field access resolved at runtime
val grouped2 = data.groupBy("key")
```

### Combining vs Reducing

```scala
// Use combineGroup for operations that can be pre-aggregated
val largeSales = env.fromElements((1 to 1000000).map(i => (s"region${i % 10}", i)): _*)

// Efficient - pre-aggregates locally before sending over network
val combined = largeSales
  .groupBy(0)
  .combineGroup { (iterator, collector) =>
    val (region, values) = iterator.toList.unzip
    collector.collect((region.head, values.sum))
  }

// Less efficient for large datasets - no pre-aggregation
val reduced = largeSales
  .groupBy(0)
  .reduce((a, b) => (a._1, a._2 + b._2))
```

### Sorting Optimization

```scala
// Sort within groups only when necessary
val transactions = env.fromElements(
  ("US", 100, "2023-01-01"),
  ("US", 200, "2023-01-02")
)

// If you only need the first N elements, combine sortGroup with first
val topTransactions = transactions
  .groupBy(0)
  .sortGroup(1, Order.DESCENDING)
  .first(5) // Take the top 5 per group after sorting

// If you need all elements of each group in sorted order
val allSorted = transactions
  .groupBy(0)
  .sortGroup(1, Order.DESCENDING)
  .reduceGroup(_.toList) // Collect each group's elements, preserving sort order
```