# Grouping and Aggregation

Group-wise operations and aggregation functions for analyzing and summarizing grouped data. These operations are the building blocks of most analytics and reporting workflows.

## Capabilities

### DataSet Grouping

Create grouped DataSets for group-wise operations by selecting the grouping key with a key selector function, field positions, or field names.

```scala { .api }
class DataSet[T] {
  /**
   * Groups elements by the key returned from a key selector function.
   *
   * @param fun Key selector function; elements with equal keys end up in the same group
   * @tparam K Key type; an implicit TypeInformation[K] must be in scope
   * @return GroupedDataSet for group-wise operations
   */
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]

  /**
   * Groups elements by field positions (zero-based; for tuples and case classes).
   *
   * @param fields Field positions to group by
   * @return GroupedDataSet for group-wise operations
   */
  def groupBy(fields: Int*): GroupedDataSet[T]

  /**
   * Groups elements by field names.
   *
   * The separate `firstField` parameter requires at least one field name and keeps this
   * overload distinct from the field-position variant.
   *
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return GroupedDataSet for group-wise operations
   */
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._

case class Sale(product: String, category: String, amount: Double, date: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val sales = env.fromElements(
  Sale("Laptop", "Electronics", 999.99, "2023-01-15"),
  Sale("Phone", "Electronics", 599.99, "2023-01-16"),
  Sale("Chair", "Furniture", 149.99, "2023-01-15"),
  Sale("Desk", "Furniture", 299.99, "2023-01-16")
)

// Group by category using a key selector function
val byCategory = sales.groupBy(_.category)

// Group by multiple fields (composite key built as a tuple)
val byCategoryAndDate = sales.groupBy(s => (s.category, s.date))

// Group by field name (case classes)
val byCategoryField = sales.groupBy("category")

// Group by field positions (for tuples/case classes); positions are zero-based
val salesTuples = sales.map(s => (s.category, s.amount))
val groupedTuples = salesTuples.groupBy(0) // Group by first field
```

### Group Reductions

Apply a reduction within each group, combining its elements pairwise into a single element per group.

```scala { .api }
class GroupedDataSet[T] {
  /**
   * Reduces the elements of each group pairwise with a combining function.
   *
   * Because partial pre-aggregation may be applied (see the `strategy` overload), `fun`
   * should be associative and commutative.
   *
   * @param fun Binary combining function
   * @return DataSet with one reduced element per group
   */
  def reduce(fun: (T, T) => T): DataSet[T]

  /**
   * Reduces the elements of each group, passing a hint for the combine strategy.
   *
   * @param fun Binary combining function
   * @param strategy Combine strategy hint (a CombineHint value)
   * @return DataSet with one reduced element per group
   */
  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T]

  /**
   * Reduces the elements of each group with a ReduceFunction.
   *
   * @param reducer ReduceFunction implementation
   * @return DataSet with one reduced element per group
   */
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}
```

### Group Processing

Process each group as a whole through an iterator for complex group-wise computations.

```scala { .api }
class GroupedDataSet[T] {
  /**
   * Processes each group through an iterator and returns exactly one result per group.
   *
   * The iterator is single-pass; materialize it (e.g. `toList`) to traverse it more than once.
   *
   * @param fun Function that consumes a group's iterator and returns a single result
   * @return DataSet with one result per group
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

  /**
   * Processes each group through an iterator, emitting any number of results via a collector.
   *
   * @param fun Function receiving the group iterator and a collector for results
   * @return DataSet with all collected results of all groups
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Applies a GroupReduceFunction to each group.
   *
   * @param reducer GroupReduceFunction implementation
   * @return DataSet with the group reduction results
   */
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}
```

**Usage Examples:**

```scala
// Calculate total sales per category, keeping the category next to its total
// (a bare sum would lose which category each total belongs to)
val categoryTotals = sales
  .groupBy(_.category)
  .reduceGroup { salesInCategory =>
    // The iterator is single-pass, so materialize it before reading it twice
    val salesList = salesInCategory.toList
    // A group exists only because it has at least one element, so head is safe
    (salesList.head.category, salesList.map(_.amount).sum)
  }

// Complex group processing
case class CategoryStats(category: String, totalAmount: Double, count: Int, avgAmount: Double)

val categoryStats = sales
  .groupBy(_.category)
  .reduceGroup { salesInCategory =>
    // Materialize the single-pass iterator so it can be traversed several times
    val salesList = salesInCategory.toList
    val total = salesList.map(_.amount).sum
    val count = salesList.length
    // Groups are never empty, so head and the division by count are safe
    CategoryStats(salesList.head.category, total, count, total / count)
  }
```

### Group Combining

Pre-aggregate elements within each partition before the final group-wise reduction to improve performance. The output is partial: the same key may appear once per partition.

```scala { .api }
class GroupedDataSet[T] {
  /**
   * Combines the elements of each group within each partition, before the final grouping.
   *
   * Results are produced per partition rather than per group, so the output may contain
   * several partial results for the same key. Follow with a final group-wise reduction
   * to obtain one result per key.
   *
   * @param fun Function receiving a partition-local group iterator and a result collector
   * @return DataSet with the partition-combined (partial) results
   */
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Applies a GroupCombineFunction to each group within each partition.
   *
   * @param combiner GroupCombineFunction implementation
   * @return DataSet with the partition-combined (partial) results
   */
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}
```

### Built-in Aggregations

Built-in field aggregations for common operations such as sum, min, and max.

```scala { .api }
class GroupedDataSet[T] {
  // NOTE(review): in Flink these field aggregations are only supported when the data set was
  // grouped by field positions or field names; grouping with a key selector function is
  // rejected at runtime. Fields that are neither grouped nor aggregated hold an arbitrary
  // value from the group. Confirm against the Flink version in use.

  /**
   * Sums the values of the field at the given position within each group.
   *
   * @param field Field position to sum (zero-based)
   * @return AggregateDataSet for chaining additional aggregations
   */
  def sum(field: Int): AggregateDataSet[T]

  /**
   * Sums the values of the named field within each group.
   *
   * @param field Field name to sum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def sum(field: String): AggregateDataSet[T]

  /**
   * Computes the maximum of the field at the given position within each group.
   *
   * @param field Field position for the maximum (zero-based)
   * @return AggregateDataSet for chaining additional aggregations
   */
  def max(field: Int): AggregateDataSet[T]

  /**
   * Computes the maximum of the named field within each group.
   *
   * @param field Field name for the maximum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def max(field: String): AggregateDataSet[T]

  /**
   * Computes the minimum of the field at the given position within each group.
   *
   * @param field Field position for the minimum (zero-based)
   * @return AggregateDataSet for chaining additional aggregations
   */
  def min(field: Int): AggregateDataSet[T]

  /**
   * Computes the minimum of the named field within each group.
   *
   * @param field Field name for the minimum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def min(field: String): AggregateDataSet[T]

  /**
   * Applies the given aggregation to the field at the given position within each group.
   *
   * @param agg Aggregation type (SUM, MAX, MIN)
   * @param field Field position for the aggregation (zero-based)
   * @return AggregateDataSet for chaining additional aggregations
   */
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]

  /**
   * Applies the given aggregation to the named field within each group.
   *
   * @param agg Aggregation type (SUM, MAX, MIN)
   * @param field Field name for the aggregation
   * @return AggregateDataSet for chaining additional aggregations
   */
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}
```

### Element Selection

Select specific elements from each group, either by comparing field values (`minBy`, `maxBy`) or by taking the first `n` elements (`first`).

```scala { .api }
class GroupedDataSet[T] {
  /**
   * Selects, from each group, the element with the minimum value in the given fields.
   *
   * Fields are identified by position only; there is no field-name overload.
   *
   * @param fields Field positions (zero-based) used for the comparison
   * @return DataSet with one minimum element per group
   */
  def minBy(fields: Int*): DataSet[T]

  /**
   * Selects, from each group, the element with the maximum value in the given fields.
   *
   * Fields are identified by position only; there is no field-name overload.
   *
   * @param fields Field positions (zero-based) used for the comparison
   * @return DataSet with one maximum element per group
   */
  def maxBy(fields: Int*): DataSet[T]

  /**
   * Selects the first n elements of each group.
   *
   * Unless the groups were ordered with sortGroup, "first" follows no defined ordering,
   * so the selected elements are effectively arbitrary.
   *
   * @param n Number of elements to select per group
   * @return DataSet with at most n elements per group
   */
  def first(n: Int): DataSet[T]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.common.operators.Order

// Find highest sale in each category.
// maxBy takes field positions, not names: Sale(product = 0, category = 1, amount = 2, date = 3)
val highestSalePerCategory = sales
  .groupBy(_.category)
  .maxBy(2)

// Get top 2 sales per category: sort each group by amount (descending) before taking 2;
// without sortGroup, first(n) returns an arbitrary n elements of each group
val top2PerCategory = sales
  .groupBy(_.category)
  .sortGroup(_.amount, Order.DESCENDING)
  .first(2)
```

### Group Sorting

Sort the elements within each group before group-wise processing such as `reduceGroup` or `first`.

```scala { .api }
class GroupedDataSet[T] {
  // NOTE(review): in Flink, key-selector sorting requires key-selector grouping, and
  // position/name sorting cannot be combined with key-selector grouping. Confirm against
  // the Flink version in use.

  /**
   * Sorts the elements within each group by the field at the given position.
   *
   * @param field Field position (zero-based) to sort by
   * @param order Sort order (ASCENDING or DESCENDING)
   * @return GroupedDataSet whose groups are sorted
   */
  def sortGroup(field: Int, order: Order): GroupedDataSet[T]

  /**
   * Sorts the elements within each group by the named field.
   *
   * @param field Field name to sort by
   * @param order Sort order (ASCENDING or DESCENDING)
   * @return GroupedDataSet whose groups are sorted
   */
  def sortGroup(field: String, order: Order): GroupedDataSet[T]

  /**
   * Sorts the elements within each group by the key returned from a key selector function.
   *
   * @param fun Key selector function providing the sort key
   * @param order Sort order (ASCENDING or DESCENDING)
   * @tparam K Sort key type; an implicit TypeInformation[K] must be in scope
   * @return GroupedDataSet whose groups are sorted
   */
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}
```

### Custom Partitioning

Control how groups are distributed across partitions, and therefore across cluster nodes.

```scala { .api }
class GroupedDataSet[T] {
  /**
   * Uses a custom partitioner to decide which partition each group is sent to.
   *
   * @param partitioner Custom partitioner, called with a grouping key and the partition count
   * @tparam K Key type accepted by the partitioner; presumably must match the grouping key
   *           type — TODO confirm
   * @return GroupedDataSet with custom partitioning
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]
}
```

## Chained Aggregations

### AggregateDataSet Operations

Chain multiple aggregations so that several fields are aggregated in one operation.

```scala { .api }
class AggregateDataSet[T] {
  /**
   * Adds another aggregation, on the field at the given position, to the chain.
   *
   * @param agg Aggregation type (SUM, MAX, MIN)
   * @param field Field position for the aggregation (zero-based)
   * @return AggregateDataSet for further chaining
   */
  def and(agg: Aggregations, field: Int): AggregateDataSet[T]

  /**
   * Adds another aggregation, on the named field, to the chain.
   *
   * @param agg Aggregation type (SUM, MAX, MIN)
   * @param field Field name for the aggregation
   * @return AggregateDataSet for further chaining
   */
  def and(agg: Aggregations, field: String): AggregateDataSet[T]

  /**
   * Adds a sum of the field at the given position to the chain.
   *
   * @param field Field position to sum (zero-based)
   * @return AggregateDataSet for further chaining
   */
  def andSum(field: Int): AggregateDataSet[T]

  /**
   * Adds a maximum of the field at the given position to the chain.
   *
   * @param field Field position for the maximum (zero-based)
   * @return AggregateDataSet for further chaining
   */
  def andMax(field: Int): AggregateDataSet[T]

  /**
   * Adds a minimum of the field at the given position to the chain.
   *
   * @param field Field position for the minimum (zero-based)
   * @return AggregateDataSet for further chaining
   */
  def andMin(field: Int): AggregateDataSet[T]

  /**
   * Adds a sum of the named field to the chain.
   *
   * @param field Field name to sum
   * @return AggregateDataSet for further chaining
   */
  def andSum(field: String): AggregateDataSet[T]

  /**
   * Adds a maximum of the named field to the chain.
   *
   * @param field Field name for the maximum
   * @return AggregateDataSet for further chaining
   */
  def andMax(field: String): AggregateDataSet[T]

  /**
   * Adds a minimum of the named field to the chain.
   *
   * @param field Field name for the minimum
   * @return AggregateDataSet for further chaining
   */
  def andMin(field: String): AggregateDataSet[T]
}
```

**Usage Examples:**

```scala
// Multiple aggregations on grouped data
case class SalesRecord(product: String, region: String, quantity: Int, revenue: Double)

val salesData = env.fromElements(
  SalesRecord("Laptop", "North", 10, 9999.90),
  SalesRecord("Phone", "North", 25, 14999.75),
  SalesRecord("Laptop", "South", 15, 14999.85)
)

// Sum both quantity and revenue by region.
// Built-in aggregations need field-name (or field-position) grouping: a key selector such as
// groupBy(_.region) is rejected at runtime. Fields that are neither grouped nor aggregated
// (here: product) hold an arbitrary value from the group.
val regionStats = salesData
  .groupBy("region")
  .sum("quantity")
  .andSum("revenue")
```

## Types

```scala { .api }
/**
 * A DataSet grouped by a key, produced by `DataSet.groupBy`.
 *
 * It does not extend DataSet: its group-wise operations (reduce, reduceGroup, sum/min/max,
 * minBy/maxBy, first, ...) each return a new DataSet instead.
 */
class GroupedDataSet[T] {
  // Group-wise operations are declared in the API listings of this document
}

/**
 * Result of a built-in aggregation on a GroupedDataSet.
 *
 * It is a regular DataSet that additionally supports chaining further aggregations
 * (and, andSum, andMin, andMax).
 */
class AggregateDataSet[T] extends DataSet[T] {
  // Chaining operations are declared in the AggregateDataSet listing
}

/**
 * Combine strategy hint passed to `GroupedDataSet.reduce`.
 *
 * NOTE(review): Flink's own enum may define further values; confirm against the version in use.
 */
sealed trait CombineHint
object CombineHint {
  case object HASH extends CombineHint
  case object SORT extends CombineHint
}

/**
 * Built-in field aggregation used by `aggregate` and `and`.
 *
 * Modelled as a sealed ADT, like CombineHint and Order, so that `Aggregations` is a type
 * usable in signatures such as `aggregate(agg: Aggregations, field: Int)`.
 */
sealed trait Aggregations
object Aggregations {
  case object SUM extends Aggregations
  case object MAX extends Aggregations
  case object MIN extends Aggregations
}

/** Sort order used by `sortGroup`. */
sealed trait Order
object Order {
  case object ASCENDING extends Order
  case object DESCENDING extends Order
}

/** Maps a grouping key to a target partition; used by `GroupedDataSet.withPartitioner`. */
trait Partitioner[T] {
  /**
   * Chooses the partition for a key.
   *
   * @param key Grouping key of the group being distributed
   * @param numPartitions Number of available partitions
   * @return Target partition index; presumably expected in [0, numPartitions) — TODO confirm
   */
  def partition(key: T, numPartitions: Int): Int
}
```