Apache Flink Java API - Core Java APIs for Apache Flink's batch and stream processing framework
—
Built-in aggregation functions and grouping operations for computing statistics and summarizing data. These operations efficiently compute aggregates such as sum, min, and max over grouped data.
Group DataSet elements by key fields (tuple field positions or POJO field names) or by a key selector function to enable aggregation operations.
/**
 * Group elements by field positions (for Tuple types).
 *
 * <p>Positions are zero-based: position 0 is the tuple's first field ({@code f0}).
 *
 * @param fields the field positions to group by
 * @return UnsortedGrouping for aggregation operations
 */
public UnsortedGrouping<T> groupBy(int... fields);
/**
 * Group elements by field names (for POJO types).
 *
 * @param fields the field names to group by
 * @return UnsortedGrouping for aggregation operations
 */
public UnsortedGrouping<T> groupBy(String... fields);
/**
 * Group elements by the key that a key selector function extracts from each element.
 *
 * <p>The selector may be given as a lambda, e.g. {@code person -> person.age}.
 *
 * @param keyExtractor function to extract the grouping key
 * @param <K> the type of the extracted key
 * @return UnsortedGrouping for aggregation operations
 */
public <K> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor);
Usage Examples:
// Field-position grouping (Tuple types): field 0 = product, field 1 = region, field 2 = amount
DataSet<Tuple3<String, String, Integer>> sales = env.fromElements(
        new Tuple3<>("Product A", "Region 1", 100),
        new Tuple3<>("Product A", "Region 2", 150),
        new Tuple3<>("Product B", "Region 1", 200)
);

// Single key: the product (field 0)
UnsortedGrouping<Tuple3<String, String, Integer>> productGroups = sales.groupBy(0);

// Composite key: product and region (fields 0 and 1)
UnsortedGrouping<Tuple3<String, String, Integer>> productRegionGroups = sales.groupBy(0, 1);

// Key-selector grouping: a lambda extracts the key from each element
DataSet<Person> people = getPersonDataSet();
UnsortedGrouping<Person> peopleByAge = people.groupBy(person -> person.age);
Operations available on unsorted groupings for aggregation and reduction.
/**
 * Apply a built-in aggregation function to one field of each group.
 *
 * @param agg the aggregation function (SUM, MIN, MAX)
 * @param field the field position to aggregate (for Tuple types only)
 * @return AggregateOperator with aggregation result
 * @throws UnsupportedAggregationTypeException if the field's type is not supported by
 *     {@code agg} (for example, SUM on a String field)
 */
public AggregateOperator<T> aggregate(Aggregations agg, int field);
/**
 * Sum aggregation on the specified field position.
 *
 * @param field the field position to sum (for Tuple types only)
 * @return AggregateOperator with sum result
 * @throws UnsupportedAggregationTypeException if the field's type cannot be summed
 *     (for example, String)
 */
public AggregateOperator<T> sum(int field);
/**
 * Minimum aggregation on the specified field position.
 *
 * <p>Only the aggregated field is guaranteed to hold the group minimum. The other non-key
 * fields of the result are not guaranteed to come from the element that holds that minimum.
 * Use {@code minBy(field)} to select the complete element instead.
 *
 * @param field the field position to find minimum (for Tuple types only)
 * @return AggregateOperator with minimum result
 * @throws UnsupportedAggregationTypeException if the field's type is not supported for MIN
 */
public AggregateOperator<T> min(int field);
/**
 * Maximum aggregation on the specified field position.
 *
 * <p>Only the aggregated field is guaranteed to hold the group maximum. The other non-key
 * fields of the result are not guaranteed to come from the element that holds that maximum.
 * Use {@code maxBy(field)} to select the complete element instead.
 *
 * @param field the field position to find maximum (for Tuple types only)
 * @return AggregateOperator with maximum result
 * @throws UnsupportedAggregationTypeException if the field's type is not supported for MAX
 */
public AggregateOperator<T> max(int field);
Usage Examples:
// Sum sales by product
DataSet<Tuple3<String, String, Integer>> totalSales = sales
.groupBy(0) // group by product
.sum(2); // sum the sales amount (field 2)
// Multiple aggregations
DataSet<Tuple3<String, String, Integer>> minSales = sales
.groupBy(0)
.min(2);
DataSet<Tuple3<String, String, Integer>> maxSales = sales
.groupBy(0)
.max(2);
// Note: Aggregation methods only work with field positions for Tuple types
// For POJO types, use custom reduce functions insteadApply custom reduction functions to grouped data.
/**
 * Apply a reduce function to each group, combining its elements pairwise until a single
 * element per group remains.
 *
 * @param reducer the reduce function to apply
 * @return ReduceOperator with one reduced element per group
 */
public ReduceOperator<T> reduce(ReduceFunction<T> reducer);
/**
 * Apply a group reduce function to each group. The function sees all elements of a group
 * through one {@code Iterable} and may emit zero or more results to its collector.
 *
 * @param reducer the group reduce function to apply
 * @param <R> the type of the emitted elements
 * @return GroupReduceOperator with group reduction result
 */
public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer);
/**
 * Apply a combine function to each group for pre-aggregation.
 *
 * <p>The combiner performs partial aggregation. It may be invoked on only a subset of a
 * group's elements, so its output generally has to be reduced again to obtain the final
 * per-group result.
 *
 * @param combiner the group combine function to apply
 * @param <R> the type of the combined (partial) elements
 * @return GroupCombineOperator with combine result
 */
public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner);
/**
 * Select, for each group, the complete element with the minimum value of the given fields.
 *
 * <p>Unlike {@code min(int)}, the result is an actual element of the group. If several
 * elements share the minimum, any one of them may be returned.
 *
 * @param fields the field positions to compare
 * @return ReduceOperator with one minimum element per group
 */
public ReduceOperator<T> minBy(int... fields);
/**
 * Select, for each group, the complete element with the maximum value of the given fields.
 *
 * <p>Unlike {@code max(int)}, the result is an actual element of the group. If several
 * elements share the maximum, any one of them may be returned.
 *
 * @param fields the field positions to compare
 * @return ReduceOperator with one maximum element per group
 */
public ReduceOperator<T> maxBy(int... fields);
/**
 * Get n elements from each group.
 *
 * <p>An unsorted grouping imposes no order, so which n elements are returned is arbitrary.
 * Call {@code sortGroup} first to pick, for example, the top n.
 *
 * @param n number of elements to select from each group
 * @return GroupReduceOperator with (at most) n elements per group
 */
public GroupReduceOperator<T, T> first(int n);
Usage Examples:
// Custom reduce function: adds up the counts of identical words
// (words is assumed to be a DataSet<Tuple2<String, Integer>> of (word, count) pairs)
DataSet<Tuple2<String, Integer>> wordCounts = words
        .groupBy(0)
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) {
                return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
            }
        });

// Group reduce for more complex aggregations: list every region of each product,
// e.g. ("Product A", "Region 1,Region 2")
DataSet<Tuple2<String, String>> concatenated = sales
        .groupBy(0)
        .reduceGroup(new GroupReduceFunction<Tuple3<String, String, Integer>, Tuple2<String, String>>() {
            @Override
            public void reduce(Iterable<Tuple3<String, String, Integer>> values,
                               Collector<Tuple2<String, String>> out) {
                String product = null;
                StringBuilder regions = new StringBuilder();
                // The separator is empty before the first region, so no trailing comma is produced
                String separator = "";
                for (Tuple3<String, String, Integer> value : values) {
                    if (product == null) {
                        product = value.f0;
                    }
                    regions.append(separator).append(value.f1);
                    separator = ",";
                }
                out.collect(new Tuple2<>(product, regions.toString()));
            }
        });
Sort groups by specific fields for ordered processing.
/**
 * Orders the elements inside every group by a field position.
 *
 * @param field position of the field that defines the in-group order
 * @param order sort direction, ASCENDING or DESCENDING
 * @return a SortedGrouping on which ordered group operations can be applied
 */
public SortedGrouping<T> sortGroup(int field, Order order);
/**
 * Orders the elements inside every group by a named field.
 *
 * @param field name of the field that defines the in-group order
 * @param order sort direction, ASCENDING or DESCENDING
 * @return a SortedGrouping on which ordered group operations can be applied
 */
public SortedGrouping<T> sortGroup(String field, Order order);
Usage Examples:
// Keep only the highest sale of each product: sort every product group by amount
// (descending), so the first element the reducer sees is the largest sale.
DataSet<Tuple3<String, String, Integer>> topSalePerProduct = sales
        .groupBy(0)                      // group by product
        .sortGroup(2, Order.DESCENDING)  // sort by sales amount descending
        .reduceGroup(new GroupReduceFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple3<String, String, Integer>> values,
                               Collector<Tuple3<String, String, Integer>> out) {
                // Emit only the first (highest) sale; for-each + break needs no java.util.Iterator import
                for (Tuple3<String, String, Integer> top : values) {
                    out.collect(top);
                    break;
                }
            }
        });
// Equivalent shorthand: sales.groupBy(0).sortGroup(2, Order.DESCENDING).first(1)
Built-in aggregation functions available for numeric fields.
/**
 * The built-in aggregation functions that can be passed to {@code aggregate(Aggregations, int)}.
 */
public enum Aggregations {
    /** Adds up the field's values within a group. */
    SUM,
    /** Keeps the smallest field value of a group. */
    MIN,
    /** Keeps the largest field value of a group. */
    MAX
}
Sort order options for sorted grouping operations.
/**
 * Direction in which {@code sortGroup} orders the elements of a group.
 *
 * <p>NOTE(review): the upstream {@code org.apache.flink.api.common.operators.Order} may define
 * further constants (e.g. NONE, ANY) that are not listed here. Confirm before relying on
 * {@code values()}.
 */
public enum Order {
    /** Smallest value first: (1, 2, 3, ...). */
    ASCENDING,
    /** Largest value first: (..., 3, 2, 1). */
    DESCENDING
}
Interfaces for custom aggregation and reduction functions.
/**
 * Interface for reduce functions, which combine two elements into one.
 *
 * <p>It has a single abstract method, so it can be implemented with a lambda.
 *
 * @param <T> the type of elements to reduce
 */
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {
    /**
     * Reduce two values to one.
     *
     * @param value1 first value
     * @param value2 second value
     * @return reduced value
     * @throws Exception any exception raised by the user implementation
     */
    T reduce(T value1, T value2) throws Exception;
}

/**
 * Interface for group reduce functions, which receive all elements of a group at once.
 *
 * <p>It has a single abstract method, so it can be implemented with a lambda.
 *
 * @param <IN> input element type
 * @param <OUT> output element type
 */
@FunctionalInterface
public interface GroupReduceFunction<IN, OUT> extends Function, Serializable {
    /**
     * Reduce a group of values.
     *
     * @param values iterable of input values
     * @param out collector for output values; may receive zero or more elements
     * @throws Exception any exception raised by the user implementation
     */
    void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception;
}

/**
 * Interface for group combine functions (for pre-aggregation).
 *
 * <p>It has a single abstract method, so it can be implemented with a lambda.
 *
 * @param <IN> input element type
 * @param <OUT> output element type
 */
@FunctionalInterface
public interface GroupCombineFunction<IN, OUT> extends Function, Serializable {
    /**
     * Combine a group of values (partial aggregation).
     *
     * @param values iterable of input values
     * @param out collector for output values
     * @throws Exception any exception raised by the user implementation
     */
    void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
}

/**
 * Rich version of {@link ReduceFunction} with access to the runtime context.
 */
public abstract class RichReduceFunction<T> extends AbstractRichFunction implements ReduceFunction<T> {}

/**
 * Rich version of {@link GroupReduceFunction} with access to the runtime context.
 */
public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT> {}

/**
 * Rich version of {@link GroupCombineFunction} with access to the runtime context.
 */
public abstract class RichGroupCombineFunction<IN, OUT> extends AbstractRichFunction implements GroupCombineFunction<IN, OUT> {}
Exception types related to aggregation operations.
/**
 * Exception thrown when a built-in aggregation is applied to a field whose type it does not
 * support (for example, {@code sum} on a {@code String} field).
 */
public class UnsupportedAggregationTypeException extends RuntimeException {

    // RuntimeException is Serializable; pin the serial form explicitly
    private static final long serialVersionUID = 1L;

    /**
     * Create exception with message.
     *
     * @param message error message
     */
    public UnsupportedAggregationTypeException(String message) {
        super(message);
    }
}
Usage Examples for Exception Handling:
try {
    // Summing a non-numeric field (field 1 here is a String) can raise
    // UnsupportedAggregationTypeException from the sum(1) call below.
    DataSet<Tuple2<String, String>> summed = stringData
            .groupBy(0)
            .sum(1); // Error: cannot sum String field
} catch (UnsupportedAggregationTypeException e) {
    System.err.println("Cannot perform aggregation: " + e.getMessage());
}
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.operators.SortedGrouping;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.GroupCombineOperator;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupCombineFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.util.Collector;Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-java