Group-wise operations and aggregation functions for analyzing and summarizing grouped data. These operations are essential for data analytics and reporting workflows.
Create grouped DataSets for group-wise operations using various key selection strategies.
class DataSet[T] {

  /**
   * Groups the elements of this DataSet by one or more named fields
   * (for example the field names of a case class).
   *
   * @param firstField  name of the first grouping field
   * @param otherFields names of any additional grouping fields
   * @return a [[GroupedDataSet]] on which group-wise operations can be applied
   */
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]

  /**
   * Groups the elements of this DataSet by one or more zero-based field positions
   * (tuples and case classes).
   *
   * @param fields positions of the grouping fields, 0 being the first field
   * @return a [[GroupedDataSet]] on which group-wise operations can be applied
   */
  def groupBy(fields: Int*): GroupedDataSet[T]

  /**
   * Groups the elements of this DataSet by the key that `fun` extracts from each element.
   * Returning a tuple from `fun` groups by a composite key.
   *
   * Field-based operations (`sum`, `max`, `min`, `aggregate`, field-position `sortGroup`)
   * are not available on a key-selector grouping; group by position or name for those.
   *
   * @tparam K  type of the extracted key; an implicit `TypeInformation[K]` must be in scope
   * @param fun key selector applied to every element
   * @return a [[GroupedDataSet]] on which group-wise operations can be applied
   */
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
}
Usage Examples:
import org.apache.flink.api.scala._

case class Sale(product: String, category: String, amount: Double, date: String)

val env = ExecutionEnvironment.getExecutionEnvironment

val sales = env.fromElements(
  Sale("Laptop", "Electronics", 999.99, "2023-01-15"),
  Sale("Phone", "Electronics", 599.99, "2023-01-16"),
  Sale("Chair", "Furniture", 149.99, "2023-01-15"),
  Sale("Desk", "Furniture", 299.99, "2023-01-16")
)

// Key selector: group on a single extracted value
val byCategory = sales.groupBy(sale => sale.category)

// Composite key: the selector returns a tuple of fields
val byCategoryAndDate = sales.groupBy(sale => (sale.category, sale.date))

// Field position (tuples/case classes): position 0 is the category here
val salesTuples = sales.map(sale => (sale.category, sale.amount))
val groupedTuples = salesTuples.groupBy(0) // Group by first field
Apply reduction operations within each group to combine elements.
class GroupedDataSet[T] {

  /**
   * Collapses every group into a single element with a [[ReduceFunction]].
   *
   * Flink may pre-aggregate with a combiner before the data is shuffled and applies the
   * function in no particular order, so it should be associative and commutative.
   *
   * @param reducer reduce function applied pairwise to the elements of a group
   * @return a DataSet holding one reduced element per group
   */
  def reduce(reducer: ReduceFunction[T]): DataSet[T]

  /**
   * Collapses every group into a single element by applying `fun` pairwise.
   *
   * @param fun associative, commutative function merging two elements into one
   * @return a DataSet holding one reduced element per group
   */
  def reduce(fun: (T, T) => T): DataSet[T]

  /**
   * Same as `reduce(fun)`, but also tells the optimizer which strategy the
   * pre-aggregating combiner should use.
   *
   * @param fun      associative, commutative function merging two elements into one
   * @param strategy [[CombineHint]] choosing a hash- or sort-based combiner
   * @return a DataSet holding one reduced element per group
   */
  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T]
}
Process entire groups using iterators for complex group-wise computations.
class GroupedDataSet[T] {

  /**
   * Hands each complete group to a [[GroupReduceFunction]], which may emit any number of
   * results through its collector.
   *
   * @tparam R    result element type (needs implicit `TypeInformation` and `ClassTag`)
   * @param reducer group-at-a-time reduce function
   * @return a DataSet with everything the function emitted for all groups
   */
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]

  /**
   * Calls `fun` once per group with an iterator over the group's elements and a collector
   * for emitting zero or more results. The iterator can be consumed only once.
   *
   * @tparam R result element type (needs implicit `TypeInformation` and `ClassTag`)
   * @param fun function reading the group iterator and writing to the collector
   * @return a DataSet with everything emitted for all groups
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Calls `fun` once per group with an iterator over the group's elements and keeps the
   * single value it returns. The iterator can be consumed only once.
   *
   * @tparam R result element type (needs implicit `TypeInformation` and `ClassTag`)
   * @param fun function mapping a group iterator to one result
   * @return a DataSet with exactly one result per group
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
}
Usage Examples:
// Calculate total sales per category.
// Each result is a (category, total) pair, so every total can be attributed to its category.
val categoryTotals = sales
  .groupBy(_.category)
  .reduceGroup { salesInCategory =>
    // Flink only calls the function for non-empty groups, so next() is safe.
    // Folding over the iterator sums the group in a single pass without buffering it.
    val first = salesInCategory.next()
    (first.category, salesInCategory.foldLeft(first.amount)(_ + _.amount))
  }

// Complex group processing
case class CategoryStats(category: String, totalAmount: Double, count: Int, avgAmount: Double)

val categoryStats = sales
  .groupBy(_.category)
  .reduceGroup { salesInCategory =>
    // toList buffers the whole group in memory; acceptable for small groups only.
    val salesList = salesInCategory.toList
    val total = salesList.map(_.amount).sum
    val count = salesList.length
    // Groups are never empty, so head is defined and count > 0.
    CategoryStats(salesList.head.category, total, count, total / count)
  }
Pre-aggregate elements within partitions before the final grouping to improve performance.
class GroupedDataSet[T] {

  /**
   * Combines the elements of each group that live in the same parallel partition,
   * without shuffling data first.
   *
   * The output is only a partial aggregate. A key whose elements are spread over several
   * partitions yields one result per partition. A partition may also contribute more than
   * one result per key, because the combiner can process its input in chunks.
   * Follow this with a full group-wise reduction (for example `groupBy(...).reduceGroup(...)`)
   * to obtain exactly one final result per key.
   *
   * @tparam R type of the combined records (needs implicit `TypeInformation` and `ClassTag`)
   * @param fun function reading a partial group from the iterator and emitting
   *            zero or more combined records to the collector
   * @return a DataSet of partially combined records
   */
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Same as the function variant, but uses a [[GroupCombineFunction]]. The result is
   * likewise partial and normally needs a subsequent full group reduction.
   *
   * @tparam R type of the combined records (needs implicit `TypeInformation` and `ClassTag`)
   * @param combiner group combine function applied per partition
   * @return a DataSet of partially combined records
   */
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}
Convenient aggregation functions for common operations such as sum, min, and max.
/**
 * Field-based aggregations on a grouped DataSet.
 *
 * Restrictions:
 *  - the element type must be a tuple or a case class;
 *  - the DataSet must be grouped by field positions or field names
 *    (e.g. `groupBy(0)` or `groupBy("region")`). A key-selector grouping such as
 *    `groupBy(_.region)` is not supported and makes these methods throw.
 * Fields that are neither grouping keys nor aggregated keep a value that should not be
 * relied upon.
 */
class GroupedDataSet[T] {

  /**
   * Sums the numeric field at position `field` within each group.
   * Shorthand for `aggregate(Aggregations.SUM, field)`.
   *
   * @param field zero-based position of the field to sum
   * @return an [[AggregateDataSet]] that can chain further aggregations
   */
  def sum(field: Int): AggregateDataSet[T]

  /**
   * Sums the numeric field called `field` within each group.
   * Shorthand for `aggregate(Aggregations.SUM, field)`.
   *
   * @param field name of the field to sum
   * @return an [[AggregateDataSet]] that can chain further aggregations
   */
  def sum(field: String): AggregateDataSet[T]

  /**
   * Computes the maximum of the field at position `field` within each group.
   * Only that field is guaranteed to hold the maximum; use `maxBy` to get the whole
   * element that has the maximum value.
   *
   * @param field zero-based position of the field
   * @return an [[AggregateDataSet]] that can chain further aggregations
   */
  def max(field: Int): AggregateDataSet[T]

  /**
   * Computes the maximum of the field called `field` within each group.
   *
   * @param field name of the field
   * @return an [[AggregateDataSet]] that can chain further aggregations
   */
  def max(field: String): AggregateDataSet[T]

  /**
   * Computes the minimum of the field at position `field` within each group.
   * Only that field is guaranteed to hold the minimum; use `minBy` to get the whole
   * element that has the minimum value.
   *
   * @param field zero-based position of the field
   * @return an [[AggregateDataSet]] that can chain further aggregations
   */
  def min(field: Int): AggregateDataSet[T]

  /**
   * Computes the minimum of the field called `field` within each group.
   *
   * @param field name of the field
   * @return an [[AggregateDataSet]] that can chain further aggregations
   */
  def min(field: String): AggregateDataSet[T]

  /**
   * Applies the aggregation `agg` (SUM, MAX or MIN) to the field at position `field`
   * within each group.
   *
   * @param agg   aggregation kind
   * @param field zero-based position of the field
   * @return an [[AggregateDataSet]] that can chain further aggregations
   */
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]

  /**
   * Applies the aggregation `agg` (SUM, MAX or MIN) to the field called `field`
   * within each group.
   *
   * @param agg   aggregation kind
   * @param field name of the field
   * @return an [[AggregateDataSet]] that can chain further aggregations
   */
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}
Select specific elements from each group based on field values.
class GroupedDataSet[T] {

  /**
   * Selects, from each group, the whole element with the smallest value in the given fields.
   * The fields are compared in the order given, so later fields only break ties of
   * earlier ones.
   *
   * Only tuple-like element types (tuples, case classes) are supported, and fields are
   * referenced by zero-based position only; there is no field-name overload.
   * If several elements tie on all fields, any one of them may be returned.
   *
   * @param fields zero-based positions of the comparison fields
   * @return a DataSet with one minimal element per group
   */
  def minBy(fields: Int*): DataSet[T]

  /**
   * Selects, from each group, the whole element with the largest value in the given fields.
   * The fields are compared in the order given, so later fields only break ties of
   * earlier ones.
   *
   * Only tuple-like element types (tuples, case classes) are supported, and fields are
   * referenced by zero-based position only; there is no field-name overload.
   * If several elements tie on all fields, any one of them may be returned.
   *
   * @param fields zero-based positions of the comparison fields
   * @return a DataSet with one maximal element per group
   */
  def maxBy(fields: Int*): DataSet[T]

  /**
   * Returns up to `n` elements of each group.
   *
   * Without a preceding `sortGroup` the elements are chosen arbitrarily. Sort the group
   * first to get, for example, the top-n elements.
   *
   * @param n maximum number of elements per group; must be at least 1
   * @return a DataSet with at most `n` elements per group
   */
  def first(n: Int): DataSet[T]
}
Usage Examples:
import org.apache.flink.api.common.operators.Order

// Find the highest sale in each category.
// maxBy only accepts field positions: `amount` is field 2 of
// Sale(product, category, amount, date).
val highestSalePerCategory = sales
  .groupBy(_.category)
  .maxBy(2)

// Get the top 2 sales per category.
// first(n) alone returns arbitrary elements, so sort each group by amount
// (descending) first. A key-selector sort matches the key-selector grouping.
val top2PerCategory = sales
  .groupBy(_.category)
  .sortGroup(_.amount, Order.DESCENDING)
  .first(2)
Sort elements within each group before processing.
class GroupedDataSet[T] {

  /**
   * Sorts the elements inside each group by the field at position `field`.
   *
   * The order is only observed by group-at-a-time operations such as `reduceGroup` and
   * `first(n)`. Call again to add further sort fields. A field-position sort key cannot be
   * combined with a key-selector grouping (`groupBy(fun)`).
   *
   * @param field zero-based position of the sort field
   * @param order sort direction (ASCENDING or DESCENDING)
   * @return this grouping with the additional in-group sort order
   */
  def sortGroup(field: Int, order: Order): GroupedDataSet[T]

  /**
   * Sorts the elements inside each group by the field called `field`.
   *
   * The order is only observed by group-at-a-time operations such as `reduceGroup` and
   * `first(n)`. Call again to add further sort fields. A field-name sort key cannot be
   * combined with a key-selector grouping (`groupBy(fun)`).
   *
   * @param field name of the sort field
   * @param order sort direction (ASCENDING or DESCENDING)
   * @return this grouping with the additional in-group sort order
   */
  def sortGroup(field: String, order: Order): GroupedDataSet[T]

  /**
   * Sorts the elements inside each group by the key that `fun` extracts.
   *
   * The order is only observed by group-at-a-time operations such as `reduceGroup` and
   * `first(n)`. This variant requires a key-selector grouping (`groupBy(fun)`) and cannot
   * be chained with another `sortGroup`.
   *
   * @tparam K  type of the sort key; an implicit `TypeInformation[K]` must be in scope
   * @param fun sort-key selector
   * @param order sort direction (ASCENDING or DESCENDING)
   * @return this grouping with the in-group sort order
   */
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}
Control how groups are distributed across cluster nodes.
class GroupedDataSet[T] {

  /**
   * Replaces the default partitioning of the groups with a user-supplied [[Partitioner]].
   * The partitioner receives each element's grouping key, so `K` has to be the type of
   * that key.
   *
   * @tparam K          grouping-key type (an implicit `TypeInformation[K]` must be in scope)
   * @param partitioner decides the target partition for every grouping key
   * @return this grouping, distributed with the custom partitioner
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]
}
Chain multiple aggregation operations for comprehensive analytics.
class AggregateDataSet[T] {

  /**
   * Chains another aggregation of kind `agg` on the field at position `field`.
   *
   * @param agg   aggregation kind (SUM, MAX or MIN)
   * @param field zero-based position of the field
   * @return the same aggregation chain, extended by one step
   */
  def and(agg: Aggregations, field: Int): AggregateDataSet[T]

  /**
   * Chains another aggregation of kind `agg` on the field called `field`.
   *
   * @param agg   aggregation kind (SUM, MAX or MIN)
   * @param field name of the field
   * @return the same aggregation chain, extended by one step
   */
  def and(agg: Aggregations, field: String): AggregateDataSet[T]

  /**
   * Chains a sum over the field at position `field`.
   *
   * @param field zero-based position of the field
   * @return the extended aggregation chain
   */
  def andSum(field: Int): AggregateDataSet[T]

  /**
   * Chains a sum over the field called `field`.
   *
   * @param field name of the field
   * @return the extended aggregation chain
   */
  def andSum(field: String): AggregateDataSet[T]

  /**
   * Chains a maximum over the field at position `field`.
   *
   * @param field zero-based position of the field
   * @return the extended aggregation chain
   */
  def andMax(field: Int): AggregateDataSet[T]

  /**
   * Chains a maximum over the field called `field`.
   *
   * @param field name of the field
   * @return the extended aggregation chain
   */
  def andMax(field: String): AggregateDataSet[T]

  /**
   * Chains a minimum over the field at position `field`.
   *
   * @param field zero-based position of the field
   * @return the extended aggregation chain
   */
  def andMin(field: Int): AggregateDataSet[T]

  /**
   * Chains a minimum over the field called `field`.
   *
   * @param field name of the field
   * @return the extended aggregation chain
   */
  def andMin(field: String): AggregateDataSet[T]
}
Usage Examples:
// Multiple aggregations on grouped data
case class SalesRecord(product: String, region: String, quantity: Int, revenue: Double)

val salesData = env.fromElements(
  SalesRecord("Laptop", "North", 10, 9999.90),
  SalesRecord("Phone", "North", 25, 14999.75),
  SalesRecord("Laptop", "South", 15, 14999.85)
)

// Sum both quantity and revenue by region.
// Field aggregations need a field-based grouping: grouping with a key selector
// such as groupBy(_.region) makes sum(...) throw. The `product` field is neither
// a key nor aggregated, so its value in each result should not be relied upon.
val regionStats = salesData
  .groupBy("region")
  .sum("quantity")
  .andSum("revenue")

class GroupedDataSet[T] {
  // Wraps a DataSet[T] together with its grouping keys. It does NOT extend DataSet[T]:
  // apply a group-wise operation (reduce, reduceGroup, sum, first, ...) to get a DataSet back.
}
/**
 * Result of a field aggregation (`sum`, `max`, `min` or `aggregate` on a grouped DataSet).
 * It is an ordinary DataSet[T] that also offers `and`, `andSum`, `andMax` and `andMin`
 * for adding aggregations on further fields.
 */
class AggregateDataSet[T] extends DataSet[T] {
  // Chaining methods are listed with the aggregation API.
}
/** Strategy hint for the combiner that pre-aggregates a grouped `reduce`. */
sealed trait CombineHint

/** The available [[CombineHint]] values. */
object CombineHint {

  /** Combine with a hash table; typically the faster choice when there are few distinct keys. */
  case object HASH extends CombineHint

  /** Combine by sorting the input on the grouping key. */
  case object SORT extends CombineHint
}
/**
 * Kinds of field aggregation accepted by `aggregate` and `and`.
 * The value type is `Aggregations.Aggregations`; bring it into scope with
 * `import Aggregations._` to refer to it as plain `Aggregations`.
 */
object Aggregations extends Enumeration {
  type Aggregations = Value

  /** Sum of the field over the group. */
  val SUM: Value = Value

  /** Largest value of the field in the group. */
  val MAX: Value = Value

  /** Smallest value of the field in the group. */
  val MIN: Value = Value
}
/** Direction of an in-group sort (see `sortGroup`). */
sealed trait Order

/** The available [[Order]] values. */
object Order {

  /** Smallest key first. */
  case object ASCENDING extends Order

  /** Largest key first. */
  case object DESCENDING extends Order
}
/**
 * User-defined assignment of keys to parallel partitions (see `withPartitioner`).
 *
 * @tparam K type of the key being partitioned
 */
abstract class Partitioner[K] {

  /**
   * Chooses the partition for `key`.
   *
   * @param key           the key to place
   * @param numPartitions number of available partitions
   * @return the target partition index; presumably expected to lie in
   *         `[0, numPartitions)` (confirm against the runtime's contract)
   */
  def partition(key: K, numPartitions: Int): Int
}