Grouping and Aggregation

Group-wise operations and aggregation functions for analyzing and summarizing grouped data. These operations are essential for data analytics and reporting workflows.

Capabilities

DataSet Grouping

Create grouped DataSets for group-wise operations using various key selection strategies.

class DataSet[T] {
  /**
   * Groups elements by key selector function
   * @param fun Key selector function
   * @return GroupedDataSet for group-wise operations
   */
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  
  /**
   * Groups elements by field positions
   * @param fields Field positions to group by
   * @return GroupedDataSet for group-wise operations
   */
  def groupBy(fields: Int*): GroupedDataSet[T]
  
  /**
   * Groups elements by field names
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return GroupedDataSet for group-wise operations
   */
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}

Usage Examples:

import org.apache.flink.api.scala._

case class Sale(product: String, category: String, amount: Double, date: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val sales = env.fromElements(
  Sale("Laptop", "Electronics", 999.99, "2023-01-15"),
  Sale("Phone", "Electronics", 599.99, "2023-01-16"),
  Sale("Chair", "Furniture", 149.99, "2023-01-15"),
  Sale("Desk", "Furniture", 299.99, "2023-01-16")
)

// Group by category
val byCategory = sales.groupBy(_.category)

// Group by multiple fields
val byCategoryAndDate = sales.groupBy(s => (s.category, s.date))

// Group by field positions (for tuples/case classes)
val salesTuples = sales.map(s => (s.category, s.amount))
val groupedTuples = salesTuples.groupBy(0) // Group by first field

Group Reductions

Apply reduction operations within each group to combine elements.

class GroupedDataSet[T] {
  /**
   * Reduces elements within each group using a combining function
   * @param fun Binary combining function
   * @return DataSet with one reduced element per group
   */
  def reduce(fun: (T, T) => T): DataSet[T]
  
  /**
   * Reduces with combine hint for optimization
   * @param fun Binary combining function
   * @param strategy Combine strategy hint
   * @return DataSet with reduced elements per group
   */
  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T]
  
  /**
   * Reduces using a ReduceFunction
   * @param reducer ReduceFunction implementation
   * @return DataSet with reduced elements
   */
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}
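
Usage Example (a minimal sketch reusing the Sale data from above; the CombineHint import path is an assumption based on the standard Flink distribution):

// Sum amounts per category by repeatedly combining two sales into one;
// the non-aggregated fields (product, date) are kept from the left element
val totalPerCategory = sales
  .groupBy(_.category)
  .reduce((a, b) => a.copy(amount = a.amount + b.amount))

// Same reduction with an explicit hash-based combine hint
// (assumed import path for CombineHint)
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint

val totalPerCategoryHashed = sales
  .groupBy(_.category)
  .reduce((a, b) => a.copy(amount = a.amount + b.amount), CombineHint.HASH)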

Group Processing

Process entire groups using iterators for complex group-wise computations.

class GroupedDataSet[T] {
  /**
   * Processes each group using an iterator
   * @param fun Function processing group iterator to produce result
   * @return DataSet with group processing results
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  
  /**
   * Processes groups using iterator and collector
   * @param fun Function with group iterator and result collector
   * @return DataSet with collected group results
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  
  /**
   * Applies GroupReduceFunction to process groups
   * @param reducer GroupReduceFunction implementation
   * @return DataSet with group reduction results
   */
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}

Usage Examples:

// Calculate total sales per category as (category, total) pairs
val categoryTotals = sales
  .groupBy(_.category)
  .reduceGroup { group =>
    val list = group.toList
    (list.head.category, list.map(_.amount).sum)
  }

// Complex group processing
case class CategoryStats(category: String, totalAmount: Double, count: Int, avgAmount: Double)

val categoryStats = sales
  .groupBy(_.category)
  .reduceGroup { salesInCategory =>
    val salesList = salesInCategory.toList
    val total = salesList.map(_.amount).sum
    val count = salesList.length
    CategoryStats(salesList.head.category, total, count, total / count)
  }

Group Combining

Pre-aggregate elements within partitions before final grouping for better performance.

class GroupedDataSet[T] {
  /**
   * Combines elements within partitions before final grouping
   * @param fun Function for partition-wise combining with collector
   * @return DataSet with partition-combined results
   */
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  
  /**
   * Applies GroupCombineFunction for partition-wise combining
   * @param combiner GroupCombineFunction implementation
   * @return DataSet with combined results
   */
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}
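
Usage Example (a minimal sketch reusing the Sale data from above): combineGroup emits partial per-category sums within each partition, which a subsequent groupBy and reduce merge into final totals.

import org.apache.flink.util.Collector

// Pre-aggregate (category, partial sum) pairs inside each partition
val partialTotals = sales
  .groupBy(_.category)
  .combineGroup { (in: Iterator[Sale], out: Collector[(String, Double)]) =>
    val buffered = in.toList
    out.collect((buffered.head.category, buffered.map(_.amount).sum))
  }

// Merge the partial sums into one final total per category
val categoryTotalSums = partialTotals
  .groupBy(0)
  .reduce((a, b) => (a._1, a._2 + b._2))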

Built-in Aggregations

Convenient aggregation functions for common operations like sum, min, max.

class GroupedDataSet[T] {
  /**
   * Sums values in specified field across each group
   * @param field Field position to sum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def sum(field: Int): AggregateDataSet[T]
  
  /**
   * Sums values in named field across each group
   * @param field Field name to sum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def sum(field: String): AggregateDataSet[T]
  
  /**
   * Finds maximum values in specified field across each group
   * @param field Field position for maximum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def max(field: Int): AggregateDataSet[T]
  
  /**
   * Finds maximum values in named field across each group
   * @param field Field name for maximum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def max(field: String): AggregateDataSet[T]
  
  /**
   * Finds minimum values in specified field across each group
   * @param field Field position for minimum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def min(field: Int): AggregateDataSet[T]
  
  /**
   * Finds minimum values in named field across each group
   * @param field Field name for minimum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def min(field: String): AggregateDataSet[T]
  
  /**
   * Applies specified aggregation to field across each group
   * @param agg Aggregation type (SUM, MAX, MIN)
   * @param field Field position for aggregation
   * @return AggregateDataSet for chaining additional aggregations
   */
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  
  /**
   * Applies specified aggregation to named field across each group
   * @param agg Aggregation type (SUM, MAX, MIN)
   * @param field Field name for aggregation
   * @return AggregateDataSet for chaining additional aggregations
   */
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}
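
Usage Example (a minimal sketch reusing the Sale data from above; the Aggregations import path is an assumption based on the standard Flink distribution):

import org.apache.flink.api.java.aggregation.Aggregations

// Sum by field position on a tuple DataSet
val amountByCategory = sales
  .map(s => (s.category, s.amount))
  .groupBy(0)
  .sum(1)

// Maximum by field name on the case class DataSet
val maxAmountPerCategory = sales
  .groupBy("category")
  .max("amount")

// Generic form using the Aggregations enumeration
val minAmountPerCategory = sales
  .groupBy("category")
  .aggregate(Aggregations.MIN, "amount")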

Element Selection

Select specific elements from each group based on field values.

class GroupedDataSet[T] {
  /**
   * Selects elements with minimum values in specified fields from each group
   * @param fields Field positions for minimum comparison
   * @return DataSet with minimum elements per group
   */
  def minBy(fields: Int*): DataSet[T]
  
  /**
   * Selects elements with maximum values in specified fields from each group  
   * @param fields Field positions for maximum comparison
   * @return DataSet with maximum elements per group
   */
  def maxBy(fields: Int*): DataSet[T]
  
  /**
   * Selects first n elements from each group
   * @param n Number of elements to select per group
   * @return DataSet with first n elements per group
   */
  def first(n: Int): DataSet[T]
}

Usage Examples:

// Find highest sale in each category (field position 2 = amount)
val highestSalePerCategory = sales
  .groupBy(_.category)
  .maxBy(2)

// Get top 2 sales per category
val top2PerCategory = sales
  .groupBy(_.category)
  .first(2)

Group Sorting

Sort elements within each group before processing.

class GroupedDataSet[T] {
  /**
   * Sorts elements within each group by field position
   * @param field Field position for sorting
   * @param order Sort order (ASCENDING or DESCENDING)
   * @return Sorted GroupedDataSet
   */
  def sortGroup(field: Int, order: Order): GroupedDataSet[T]
  
  /**
   * Sorts elements within each group by field name
   * @param field Field name for sorting
   * @param order Sort order (ASCENDING or DESCENDING)
   * @return Sorted GroupedDataSet
   */
  def sortGroup(field: String, order: Order): GroupedDataSet[T]
  
  /**
   * Sorts elements within each group using key selector
   * @param fun Key selector function for sorting
   * @param order Sort order (ASCENDING or DESCENDING)
   * @return Sorted GroupedDataSet
   */
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}
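
Usage Example (a minimal sketch reusing the Sale data from above; Order is assumed to come from org.apache.flink.api.common.operators.Order):

import org.apache.flink.api.common.operators.Order

// Sort each category's sales by amount (descending), then keep the top entry.
// Note: sorting with a key selector requires the grouping to use a key selector too.
val topSalePerCategory = sales
  .groupBy(_.category)
  .sortGroup(_.amount, Order.DESCENDING)
  .first(1)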

Custom Partitioning

Control how groups are distributed across cluster nodes.

class GroupedDataSet[T] {
  /**
   * Uses custom partitioner for group distribution
   * @param partitioner Custom partitioner implementation
   * @return GroupedDataSet with custom partitioning
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]
}
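
Usage Example (a minimal sketch; CategoryPartitioner is a hypothetical partitioner, and the Partitioner import path is an assumption based on the standard Flink distribution):

import org.apache.flink.api.common.functions.Partitioner

// Hypothetical partitioner that assigns each category key to a partition
// by hashing the category name (kept non-negative via double modulo)
class CategoryPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    ((key.hashCode % numPartitions) + numPartitions) % numPartitions
}

val partitionedCategoryTotals = sales
  .groupBy(_.category)
  .withPartitioner(new CategoryPartitioner)
  .reduce((a, b) => a.copy(amount = a.amount + b.amount))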

Chained Aggregations

AggregateDataSet Operations

Chain multiple aggregation operations for comprehensive analytics.

class AggregateDataSet[T] {
  /**
   * Adds additional aggregation to the chain
   * @param agg Aggregation type
   * @param field Field position for aggregation
   * @return AggregateDataSet for further chaining
   */
  def and(agg: Aggregations, field: Int): AggregateDataSet[T]
  
  /**
   * Adds additional aggregation to the chain by field name
   * @param agg Aggregation type
   * @param field Field name for aggregation
   * @return AggregateDataSet for further chaining
   */
  def and(agg: Aggregations, field: String): AggregateDataSet[T]
  
  /**
   * Adds sum aggregation to the chain
   * @param field Field position to sum
   * @return AggregateDataSet for further chaining
   */
  def andSum(field: Int): AggregateDataSet[T]
  
  /**
   * Adds maximum aggregation to the chain
   * @param field Field position for maximum
   * @return AggregateDataSet for further chaining
   */
  def andMax(field: Int): AggregateDataSet[T]
  
  /**
   * Adds minimum aggregation to the chain
   * @param field Field position for minimum
   * @return AggregateDataSet for further chaining
   */
  def andMin(field: Int): AggregateDataSet[T]
  
  /**
   * Adds sum aggregation to the chain by field name
   * @param field Field name to sum
   * @return AggregateDataSet for further chaining
   */
  def andSum(field: String): AggregateDataSet[T]
  
  /**
   * Adds maximum aggregation to the chain by field name
   * @param field Field name for maximum
   * @return AggregateDataSet for further chaining
   */
  def andMax(field: String): AggregateDataSet[T]
  
  /**
   * Adds minimum aggregation to the chain by field name
   * @param field Field name for minimum
   * @return AggregateDataSet for further chaining
   */
  def andMin(field: String): AggregateDataSet[T]
}

Usage Examples:

// Multiple aggregations on grouped data
case class SalesRecord(product: String, region: String, quantity: Int, revenue: Double)

val salesData = env.fromElements(
  SalesRecord("Laptop", "North", 10, 9999.90),
  SalesRecord("Phone", "North", 25, 14999.75),
  SalesRecord("Laptop", "South", 15, 14999.85)
)

// Sum both quantity and revenue by region
val regionStats = salesData
  .groupBy(_.region)
  .sum("quantity")
  .andSum("revenue")

Types

class GroupedDataSet[T] {
  // Inherits from DataSet[T] and adds grouping-specific operations
}

class AggregateDataSet[T] extends DataSet[T] {
  // Represents result of aggregation operations with chaining capabilities
}

sealed trait CombineHint
object CombineHint {
  case object HASH extends CombineHint
  case object SORT extends CombineHint
}

object Aggregations extends Enumeration {
  type Aggregations = Value
  val SUM, MAX, MIN = Value
}

sealed trait Order
object Order {
  case object ASCENDING extends Order
  case object DESCENDING extends Order
}

abstract class Partitioner[T] {
  def partition(key: T, numPartitions: Int): Int
}