# Aggregation and Grouping

Built-in aggregation functions and grouping operations for statistical computations and data summarization. These operations enable efficient computation of aggregates like sum, min, max on grouped data.

## Capabilities

### Grouping Operations

Group DataSet elements by key fields or key selector functions to enable aggregation operations.

```java { .api }
/**
 * Group elements by field positions (for Tuple types)
 * @param fields the field positions to group by
 * @return UnsortedGrouping for aggregation operations
 */
public UnsortedGrouping<T> groupBy(int... fields);

/**
 * Group elements by field names (for POJO types)
 * @param fields the field names to group by
 * @return UnsortedGrouping for aggregation operations
 */
public UnsortedGrouping<T> groupBy(String... fields);

/**
 * Group elements by key selector function
 * @param keyExtractor function to extract the grouping key
 * @return UnsortedGrouping for aggregation operations
 */
public <K> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor);
```

**Usage Examples:**

```java
// Group by field position (for Tuples)
DataSet<Tuple3<String, String, Integer>> sales = env.fromElements(
    new Tuple3<>("Product A", "Region 1", 100),
    new Tuple3<>("Product A", "Region 2", 150),
    new Tuple3<>("Product B", "Region 1", 200)
);

// Group by product (field 0)
UnsortedGrouping<Tuple3<String, String, Integer>> byProduct = sales.groupBy(0);

// Group by product and region (fields 0 and 1)
UnsortedGrouping<Tuple3<String, String, Integer>> byProductRegion = sales.groupBy(0, 1);

// Group by key selector
DataSet<Person> people = getPersonDataSet();
UnsortedGrouping<Person> byAge = people.groupBy(person -> person.age);
```

### UnsortedGrouping Operations

Operations available on unsorted groupings for aggregation and reduction.

```java { .api }
/**
 * Apply built-in aggregation function
 * @param agg the aggregation function (SUM, MIN, MAX)
 * @param field the field position to aggregate (for Tuple types only)
 * @return AggregateOperator with aggregation result
 */
public AggregateOperator<T> aggregate(Aggregations agg, int field);

/**
 * Sum aggregation on specified field position
 * @param field the field position to sum (for Tuple types only)
 * @return AggregateOperator with sum result
 */
public AggregateOperator<T> sum(int field);

/**
 * Minimum aggregation on specified field position
 * @param field the field position to find minimum (for Tuple types only)
 * @return AggregateOperator with minimum result
 */
public AggregateOperator<T> min(int field);

/**
 * Maximum aggregation on specified field position
 * @param field the field position to find maximum (for Tuple types only)
 * @return AggregateOperator with maximum result
 */
public AggregateOperator<T> max(int field);
```
88
89
**Usage Examples:**
90
91
```java
92
// Sum sales by product
93
DataSet<Tuple3<String, String, Integer>> totalSales = sales
94
.groupBy(0) // group by product
95
.sum(2); // sum the sales amount (field 2)
96
97
// Multiple aggregations
98
DataSet<Tuple3<String, String, Integer>> minSales = sales
99
.groupBy(0)
100
.min(2);
101
102
DataSet<Tuple3<String, String, Integer>> maxSales = sales
103
.groupBy(0)
104
.max(2);
105
106
// Note: Aggregation methods only work with field positions for Tuple types
107
// For POJO types, use custom reduce functions instead
108
```
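
The note above says POJO types need a custom reduce function. The following is a minimal, dependency-free sketch of what such a function could look like. `Sale`, `SumAmounts`, and `reducePerProduct` are hypothetical names, and the nested `ReduceFunction` interface is only a stand-in that mirrors the method of Flink's `org.apache.flink.api.common.functions.ReduceFunction` so the snippet compiles without Flink on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PojoReduceSketch {

    /** Hypothetical POJO: public class, public fields, public no-arg constructor. */
    public static class Sale {
        public String product;
        public int amount;

        public Sale() {}

        public Sale(String product, int amount) {
            this.product = product;
            this.amount = amount;
        }
    }

    /** Stand-in for Flink's ReduceFunction (the real method also declares "throws Exception"). */
    public interface ReduceFunction<T> {
        T reduce(T value1, T value2);
    }

    /** Sums amounts; both inputs come from the same group, so the key is taken from the first. */
    public static class SumAmounts implements ReduceFunction<Sale> {
        @Override
        public Sale reduce(Sale a, Sale b) {
            return new Sale(a.product, a.amount + b.amount);
        }
    }

    /** Local model of grouping by product and folding each group pairwise with fn. */
    public static Map<String, Sale> reducePerProduct(List<Sale> sales, ReduceFunction<Sale> fn) {
        Map<String, Sale> result = new LinkedHashMap<>();
        for (Sale s : sales) {
            Sale current = result.get(s.product);
            result.put(s.product, current == null ? s : fn.reduce(current, s));
        }
        return result;
    }
}
```

In an actual job, the same `reduce` body would implement Flink's interface and be passed to `groupBy("product").reduce(...)` on a `DataSet<Sale>`; the `reducePerProduct` helper exists only to exercise the function locally.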

### Custom Reduce Operations

Apply custom reduction functions to grouped data.

```java { .api }
/**
 * Apply reduce function to each group
 * @param reducer the reduce function to apply
 * @return ReduceOperator with reduction result
 */
public ReduceOperator<T> reduce(ReduceFunction<T> reducer);

/**
 * Apply group reduce function to each group
 * @param reducer the group reduce function to apply
 * @return GroupReduceOperator with group reduction result
 */
public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer);

/**
 * Apply combine function to each group (for pre-aggregation)
 * @param combiner the group combine function to apply
 * @return GroupCombineOperator with combine result
 */
public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner);

/**
 * Select elements with minimum value for specified fields
 * @param fields the field positions to compare
 * @return ReduceOperator with minimum elements
 */
public ReduceOperator<T> minBy(int... fields);

/**
 * Select elements with maximum value for specified fields
 * @param fields the field positions to compare
 * @return ReduceOperator with maximum elements
 */
public ReduceOperator<T> maxBy(int... fields);

/**
 * Get first n elements from each group
 * @param n number of elements to select from each group
 * @return GroupReduceOperator with first n elements
 */
public GroupReduceOperator<T, T> first(int n);
```

**Usage Examples:**

```java
// Custom reduce function
DataSet<Tuple2<String, Integer>> wordCounts = words
    .groupBy(0)
    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) {
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
        }
    });

// Group reduce for more complex aggregations
DataSet<Tuple2<String, String>> concatenated = sales
    .groupBy(0)
    .reduceGroup(new GroupReduceFunction<Tuple3<String, String, Integer>, Tuple2<String, String>>() {
        @Override
        public void reduce(Iterable<Tuple3<String, String, Integer>> values,
                           Collector<Tuple2<String, String>> out) {
            String product = null;
            StringBuilder regions = new StringBuilder();

            for (Tuple3<String, String, Integer> value : values) {
                if (product == null) product = value.f0;
                // Add the separator only between regions to avoid a trailing comma
                if (regions.length() > 0) regions.append(",");
                regions.append(value.f1);
            }

            out.collect(new Tuple2<>(product, regions.toString()));
        }
    });
```
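
The API listing above includes `minBy` and `maxBy`, which the usage examples do not exercise. The following dependency-free sketch models, for a single group, the difference between aggregating one field (as `min(2)` does) and selecting a whole element (as `minBy(2)` does). It does not use Flink: `Sale`, `minAmount`, and `minByAmount` are hypothetical names, and the remark that a field aggregation leaves the other non-key fields unspecified is an assumption about the DataSet API's behaviour, not something stated in this section.

```java
import java.util.Comparator;
import java.util.List;

public class MinVersusMinBySketch {

    /** Hypothetical stand-in for Tuple3<String, String, Integer>. */
    public static final class Sale {
        public final String product;
        public final String region;
        public final int amount;

        public Sale(String product, String region, int amount) {
            this.product = product;
            this.region = region;
            this.amount = amount;
        }
    }

    /** Field aggregation: only the smallest amount itself is a meaningful result. */
    public static int minAmount(List<Sale> group) {
        return group.stream().mapToInt(s -> s.amount).min().getAsInt();
    }

    /** Element selection: a complete element that carries the smallest amount. */
    public static Sale minByAmount(List<Sale> group) {
        return group.stream().min(Comparator.comparingInt((Sale s) -> s.amount)).get();
    }
}
```

If the region belonging to the smallest sale matters, the element-selecting form is the one to reach for; with the `sales` data set from earlier that would be `sales.groupBy(0).minBy(2)`.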

### Sorted Grouping

Sort groups by specific fields for ordered processing.

```java { .api }
/**
 * Sort group by specified field and order
 * @param field the field to sort by
 * @param order the sort order (ASCENDING or DESCENDING)
 * @return SortedGrouping for ordered group operations
 */
public SortedGrouping<T> sortGroup(int field, Order order);

/**
 * Sort group by field name and order
 * @param field the field name to sort by
 * @param order the sort order (ASCENDING or DESCENDING)
 * @return SortedGrouping for ordered group operations
 */
public SortedGrouping<T> sortGroup(String field, Order order);
```

**Usage Examples:**

```java
// Sort sales within each product group by amount (descending)
// Requires: import java.util.Iterator;
DataSet<Tuple3<String, String, Integer>> sortedSales = sales
    .groupBy(0)                      // group by product
    .sortGroup(2, Order.DESCENDING)  // sort by sales amount descending
    .reduceGroup(new GroupReduceFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>>() {
        @Override
        public void reduce(Iterable<Tuple3<String, String, Integer>> values,
                           Collector<Tuple3<String, String, Integer>> out) {
            // Process sorted values - first value is the highest sale
            Iterator<Tuple3<String, String, Integer>> iter = values.iterator();
            if (iter.hasNext()) {
                out.collect(iter.next());  // emit only the highest sale per product
            }
        }
    });
```
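
The example above keeps only the single highest sale per product. A common variation is "top n per group", which in Flink could be expressed as `groupBy(0).sortGroup(2, Order.DESCENDING).first(n)`. The sketch below is a plain-Java model of the result such a pipeline is expected to produce, so it can be run without Flink; `Sale` and `topNPerProduct` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopNPerGroupSketch {

    /** Hypothetical stand-in for Tuple3<String, String, Integer>. */
    public static final class Sale {
        public final String product;
        public final String region;
        public final int amount;

        public Sale(String product, String region, int amount) {
            this.product = product;
            this.region = region;
            this.amount = amount;
        }
    }

    /** Groups by product, sorts each group by amount descending, keeps at most n elements. */
    public static Map<String, List<Sale>> topNPerProduct(List<Sale> sales, int n) {
        Map<String, List<Sale>> grouped = sales.stream()
            .collect(Collectors.groupingBy(s -> s.product, LinkedHashMap::new, Collectors.toList()));

        Map<String, List<Sale>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Sale>> entry : grouped.entrySet()) {
            List<Sale> sorted = new ArrayList<>(entry.getValue());
            sorted.sort(Comparator.comparingInt((Sale s) -> s.amount).reversed());
            // Copy the prefix so the result does not share state with the sorted list
            result.put(entry.getKey(), new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size()))));
        }
        return result;
    }
}
```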

### Aggregation Types

Built-in aggregation functions available for numeric fields.

```java { .api }
/**
 * Enumeration of built-in aggregation functions
 */
public enum Aggregations {
    /** Sum aggregation */
    SUM,
    /** Minimum aggregation */
    MIN,
    /** Maximum aggregation */
    MAX
}
```

### Sort Orders

Sort order options for sorted grouping operations.

```java { .api }
/**
 * Sort order enumeration.
 * Flink's Order enum also defines NONE and ANY; only the two
 * directional values used with sortGroup are listed here.
 */
public enum Order {
    /** Ascending order (1, 2, 3, ...) */
    ASCENDING,
    /** Descending order (..., 3, 2, 1) */
    DESCENDING
}
```

### Function Interfaces

Interfaces for custom aggregation and reduction functions.

```java { .api }
/**
 * Interface for reduce functions
 * @param <T> the type of elements to reduce
 */
public interface ReduceFunction<T> extends Function, Serializable {
    /**
     * Reduce two values to one
     * @param value1 first value
     * @param value2 second value
     * @return reduced value
     */
    T reduce(T value1, T value2) throws Exception;
}

/**
 * Interface for group reduce functions
 * @param <IN> input element type
 * @param <OUT> output element type
 */
public interface GroupReduceFunction<IN, OUT> extends Function, Serializable {
    /**
     * Reduce a group of values
     * @param values iterable of input values
     * @param out collector for output values
     */
    void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception;
}

/**
 * Interface for group combine functions (for pre-aggregation)
 * @param <IN> input element type
 * @param <OUT> output element type
 */
public interface GroupCombineFunction<IN, OUT> extends Function, Serializable {
    /**
     * Combine a group of values (partial aggregation)
     * @param values iterable of input values
     * @param out collector for output values
     */
    void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
}

/**
 * Rich versions with access to runtime context
 */
public abstract class RichReduceFunction<T> extends AbstractRichFunction implements ReduceFunction<T> {}
public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT> {}
public abstract class RichGroupCombineFunction<IN, OUT> extends AbstractRichFunction implements GroupCombineFunction<IN, OUT> {}
```
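
`GroupCombineFunction` is described above as a pre-aggregation step, which implies that a combiner may see only part of a group and that its outputs still need a final merge. The sketch below illustrates that idea without Flink: the nested `Collector` and `GroupCombineFunction` interfaces are stand-ins that mirror the method shapes listed above (the real ones also declare `throws Exception`, and the real `Collector` has a `close()` method), and `PartialSum` and `combineThenMerge` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class CombineSketch {

    /** Stand-in for org.apache.flink.util.Collector; only collect() is modelled. */
    public interface Collector<T> {
        void collect(T record);
    }

    /** Stand-in with the same method shape as Flink's GroupCombineFunction. */
    public interface GroupCombineFunction<IN, OUT> {
        void combine(Iterable<IN> values, Collector<OUT> out);
    }

    /** Emits one partial sum for whatever subset of a group it is handed. */
    public static class PartialSum implements GroupCombineFunction<Integer, Integer> {
        @Override
        public void combine(Iterable<Integer> values, Collector<Integer> out) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            out.collect(sum);
        }
    }

    /** Combines each partition on its own, then merges the partial sums into one total. */
    public static int combineThenMerge(List<List<Integer>> partitions) {
        PartialSum combiner = new PartialSum();

        // Pre-aggregation: one partial result per partition
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            combiner.combine(partition, partials::add);
        }

        // Final merge: summing is associative, so the same logic can finish the job
        List<Integer> total = new ArrayList<>();
        combiner.combine(partials, total::add);
        return total.get(0);
    }
}
```

The design point is that the combine logic must tolerate being applied to partial input. Sums, minima, and maxima have this property; something like an average would need to carry a (sum, count) pair through the combine step instead.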

### Exception Handling

Exception types related to aggregation operations.

```java { .api }
/**
 * Exception thrown when aggregation is applied to unsupported type
 */
public class UnsupportedAggregationTypeException extends RuntimeException {
    /**
     * Create exception with message
     * @param message error message
     */
    public UnsupportedAggregationTypeException(String message);
}
```

**Usage Examples for Exception Handling:**

```java
try {
    // This might throw UnsupportedAggregationTypeException
    // if trying to sum non-numeric fields
    DataSet<Tuple2<String, String>> result = stringData
        .groupBy(0)
        .sum(1);  // Error: cannot sum String field

} catch (UnsupportedAggregationTypeException e) {
    System.err.println("Cannot perform aggregation: " + e.getMessage());
}
```

## Types

```java { .api }
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.operators.SortedGrouping;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.GroupCombineOperator;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupCombineFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.util.Collector;
```