# Grouping and Aggregation

Apache Flink Scala API provides powerful grouping and aggregation operations with type-safe field access and functional programming patterns. Data can be grouped by keys and then aggregated using built-in functions or custom reduce operations.

## Grouping Operations

### GroupBy

Group elements by key for subsequent aggregation operations.

```scala { .api }
class DataSet[T] {
  // Group by key function
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]

  // Group by field positions (for tuples/case classes)
  def groupBy(fields: Int*): GroupedDataSet[T]

  // Group by field names (for case classes)
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}
```

## Aggregation Operations

### Basic Aggregations

Perform common aggregation operations on grouped data.

```scala { .api }
class GroupedDataSet[T] {
  // Sum values by field
  def sum(field: Int): DataSet[T]
  def sum(field: String): DataSet[T]

  // Find maximum values by field
  def max(field: Int): DataSet[T]
  def max(field: String): DataSet[T]

  // Find minimum values by field
  def min(field: Int): DataSet[T]
  def min(field: String): DataSet[T]

  // Find records with maximum field values
  def maxBy(fields: Int*): DataSet[T]
  def maxBy(firstField: String, otherFields: String*): DataSet[T]

  // Find records with minimum field values
  def minBy(fields: Int*): DataSet[T]
  def minBy(firstField: String, otherFields: String*): DataSet[T]
}
```

### Generic Aggregation

```scala { .api }
class GroupedDataSet[T] {
  // Generic aggregation with Aggregations enum
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}

// Chain multiple aggregations
class AggregateDataSet[T] {
  def and(agg: Aggregations, field: Int): AggregateDataSet[T]
  def and(agg: Aggregations, field: String): AggregateDataSet[T]
}

// Direct aggregation on ungrouped DataSet
class DataSet[T] {
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}
```

### Partitioning Configuration

```scala { .api }
class GroupedDataSet[T] {
  // Configure custom partitioner for grouping
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]
}
```

## Reduce Operations

### Basic Reduce

```scala { .api }
class GroupedDataSet[T] {
  // Reduce groups using function
  def reduce(fun: (T, T) => T): DataSet[T]

  // Reduce groups using ReduceFunction
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}
```

### Group Reduce

Transform entire groups at once, providing access to all elements in each group.

```scala { .api }
class GroupedDataSet[T] {
  // Process entire group with function returning single result
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

  // Process entire group with function returning multiple results
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]

  // Process entire group with Collector for output
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  // Process entire group with GroupReduceFunction
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}

// Direct group reduce on ungrouped DataSet
class DataSet[T] {
  def aggregateGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}
```

### Combine Functions

Pre-aggregate data locally before shuffling for better performance.

```scala { .api }
class GroupedDataSet[T] {
  // Combine with function returning single result
  def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

  // Combine with function returning multiple results
  def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]

  // Combine with Collector for output
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  // Combine with GroupCombineFunction
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}
```

## Sorted Grouping

### Sort Groups

Sort elements within each group before processing.

```scala { .api }
class GroupedDataSet[T] {
  // Sort by key function
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): SortedGrouping[T]

  // Sort by field position
  def sortGroup(field: Int, order: Order): SortedGrouping[T]

  // Sort by field name
  def sortGroup(field: String, order: Order): SortedGrouping[T]
}

class SortedGrouping[T] {
  // Chain additional sort keys
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): SortedGrouping[T]
  def sortGroup(field: Int, order: Order): SortedGrouping[T]
  def sortGroup(field: String, order: Order): SortedGrouping[T]

  // Reduce sorted groups
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}
```

### First N Elements

Get the first n elements from each group.

```scala { .api }
class GroupedDataSet[T] {
  def first(n: Int): DataSet[T]
}
```

## Usage Examples

### Basic Grouping and Aggregation

```scala
import org.apache.flink.api.scala._

case class Sale(product: String, region: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val sales = env.fromElements(
  Sale("laptop", "US", 1000.0),
  Sale("phone", "US", 500.0),
  Sale("laptop", "EU", 800.0),
  Sale("phone", "EU", 450.0),
  Sale("tablet", "US", 300.0)
)

// Group by product and sum amounts
val productSales = sales
  .groupBy(_.product)
  .sum("amount")

// Group by region and find max sale
val maxSalesByRegion = sales
  .groupBy(_.region)
  .maxBy("amount")
```

### Multiple Field Grouping

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation.Aggregations

case class Transaction(date: String, product: String, region: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val transactions = env.fromElements(
  Transaction("2023-01-01", "laptop", "US", 1000.0),
  Transaction("2023-01-01", "laptop", "EU", 800.0),
  Transaction("2023-01-02", "laptop", "US", 1200.0)
)

// Group by multiple fields
val dailyProductSales = transactions
  .groupBy(t => (t.date, t.product))
  .sum("amount")

// Group by field names
val regionProductSales = transactions
  .groupBy("region", "product")
  .aggregate(Aggregations.SUM, "amount")
```

### Custom Reduce Operations

```scala
import org.apache.flink.api.scala._

case class Employee(name: String, department: String, salary: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val employees = env.fromElements(
  Employee("Alice", "Engineering", 80000),
  Employee("Bob", "Engineering", 75000),
  Employee("Charlie", "Sales", 60000),
  Employee("Diana", "Sales", 65000)
)

// Find highest paid employee per department
val topEarners = employees
  .groupBy(_.department)
  .reduce((e1, e2) => if (e1.salary > e2.salary) e1 else e2)

// Calculate average salary per department
case class DeptAverage(department: String, avgSalary: Double, count: Int)

val avgSalaries = employees
  .groupBy(_.department)
  .reduceGroup { employees =>
    val list = employees.toList
    val dept = list.head.department
    val total = list.map(_.salary).sum
    val count = list.length
    DeptAverage(dept, total / count, count)
  }
```

### Group Reduce with Multiple Metrics

```scala
import org.apache.flink.api.scala._

case class Order(customer: String, product: String, quantity: Int, price: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val orders = env.fromElements(
  Order("Alice", "laptop", 1, 1000.0),
  Order("Alice", "mouse", 2, 20.0),
  Order("Bob", "phone", 1, 500.0),
  Order("Bob", "case", 1, 15.0)
)

// Create customer summary with multiple metrics
case class CustomerSummary(customer: String, totalSpent: Double, itemCount: Int, avgOrderValue: Double)

val customerSummaries = orders
  .groupBy(_.customer)
  .reduceGroup { orders =>
    val orderList = orders.toList
    val customer = orderList.head.customer
    val totalSpent = orderList.map(o => o.quantity * o.price).sum
    val itemCount = orderList.map(_.quantity).sum
    val avgOrderValue = totalSpent / orderList.length
    CustomerSummary(customer, totalSpent, itemCount, avgOrderValue)
  }
```

### Sorted Grouping

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

case class Score(player: String, game: String, score: Int, timestamp: Long)

val env = ExecutionEnvironment.getExecutionEnvironment
val scores = env.fromElements(
  Score("Alice", "game1", 100, 1000L),
  Score("Alice", "game1", 150, 2000L),
  Score("Alice", "game1", 120, 3000L),
  Score("Bob", "game1", 90, 1500L),
  Score("Bob", "game1", 110, 2500L)
)

// Get best score per player (latest timestamp in case of ties)
val bestScores = scores
  .groupBy(_.player)
  .sortGroup(_.score, Order.DESCENDING)
  .sortGroup(_.timestamp, Order.DESCENDING)
  .first(1)

// Get top 3 scores per player
val top3Scores = scores
  .groupBy(_.player)
  .sortGroup(_.score, Order.DESCENDING)
  .first(3)
```

### Chained Aggregations

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation.Aggregations

case class Metrics(region: String, product: String, sales: Double, profit: Double, units: Int)

val env = ExecutionEnvironment.getExecutionEnvironment
val metrics = env.fromElements(
  Metrics("US", "laptop", 1000.0, 200.0, 1),
  Metrics("US", "phone", 500.0, 100.0, 1),
  Metrics("EU", "laptop", 800.0, 150.0, 1)
)

// Aggregate multiple fields
val regionalSummary = metrics
  .groupBy(_.region)
  .aggregate(Aggregations.SUM, "sales")
  .and(Aggregations.SUM, "profit")
  .and(Aggregations.SUM, "units")
```