CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-scala-2-10

Apache Flink Scala API providing type-safe distributed stream and batch processing with idiomatic Scala constructs, functional programming features, and seamless runtime integration.

Overview
Eval results
Files

docs/grouping-aggregation.md

Grouping and Aggregation

Apache Flink Scala API provides powerful grouping and aggregation operations with type-safe field access and functional programming patterns. Data can be grouped by keys and then aggregated using built-in functions or custom reduce operations.

Grouping Operations

GroupBy

Group elements by key for subsequent aggregation operations.

class DataSet[T] {
  // Group by key function
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  
  // Group by field positions (for tuples/case classes)
  def groupBy(fields: Int*): GroupedDataSet[T]
  
  // Group by field names (for case classes)
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}

Aggregation Operations

Basic Aggregations

Perform common aggregation operations on grouped data.

class GroupedDataSet[T] {
  // Sum values by field
  def sum(field: Int): DataSet[T]
  def sum(field: String): DataSet[T]
  
  // Find maximum values by field
  def max(field: Int): DataSet[T]
  def max(field: String): DataSet[T]
  
  // Find minimum values by field  
  def min(field: Int): DataSet[T]
  def min(field: String): DataSet[T]
  
  // Find records with maximum field values
  def maxBy(fields: Int*): DataSet[T]
  def maxBy(firstField: String, otherFields: String*): DataSet[T]
  
  // Find records with minimum field values
  def minBy(fields: Int*): DataSet[T]
  def minBy(firstField: String, otherFields: String*): DataSet[T]
}

Generic Aggregation

class GroupedDataSet[T] {
  // Generic aggregation with Aggregations enum
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}

// Chain multiple aggregations on the AggregateDataSet returned by aggregate()
class AggregateDataSet[T] {
  def and(agg: Aggregations, field: Int): AggregateDataSet[T]
  def and(agg: Aggregations, field: String): AggregateDataSet[T]
}

// Direct aggregation on ungrouped DataSet
class DataSet[T] {
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}

Partitioning Configuration

class GroupedDataSet[T] {
  // Configure custom partitioner for grouping
  def withPartitioner[K : TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]
}

Reduce Operations

Basic Reduce

class GroupedDataSet[T] {
  // Reduce groups using function
  def reduce(fun: (T, T) => T): DataSet[T]
  
  // Reduce groups using ReduceFunction
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}

Group Reduce

Transform entire groups at once, providing access to all elements in each group.

class GroupedDataSet[T] {
  // Process entire group with function returning single result
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  
  // Process entire group with function returning multiple results
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
  
  // Process entire group with Collector for output
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  
  // Process entire group with GroupReduceFunction
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}

// Direct group reduce on an ungrouped DataSet (processes the entire DataSet as a single group)
class DataSet[T] {
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}

Combine Functions

Pre-aggregate data locally before shuffling for better performance.

class GroupedDataSet[T] {
  // Combine with function returning single result
  def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  
  // Combine with function returning multiple results
  def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
  
  // Combine with Collector for output
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  
  // Combine with GroupCombineFunction
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}

Sorted Grouping

Sort Groups

Sort elements within each group before processing.

class GroupedDataSet[T] {
  // Sort by key function
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): SortedGrouping[T]
  
  // Sort by field position
  def sortGroup(field: Int, order: Order): SortedGrouping[T]
  
  // Sort by field name
  def sortGroup(field: String, order: Order): SortedGrouping[T]
}

class SortedGrouping[T] {
  // Chain additional sort keys
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): SortedGrouping[T]
  def sortGroup(field: Int, order: Order): SortedGrouping[T]
  def sortGroup(field: String, order: Order): SortedGrouping[T]
  
  // Reduce sorted groups
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}

First N Elements

Get the first n elements from each group.

class GroupedDataSet[T] {
  def first(n: Int): DataSet[T]
}

Usage Examples

Basic Grouping and Aggregation

import org.apache.flink.api.scala._

case class Sale(product: String, region: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val sales = env.fromElements(
  Sale("laptop", "US", 1000.0),
  Sale("phone", "US", 500.0),
  Sale("laptop", "EU", 800.0),
  Sale("phone", "EU", 450.0),
  Sale("tablet", "US", 300.0)
)

// Group by product and sum amounts
val productSales = sales
  .groupBy(_.product)
  .sum("amount")

// Group by region and find max sale
val maxSalesByRegion = sales
  .groupBy(_.region)
  .maxBy("amount")

Multiple Field Grouping

import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation.Aggregations

case class Transaction(date: String, product: String, region: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val transactions = env.fromElements(
  Transaction("2023-01-01", "laptop", "US", 1000.0),
  Transaction("2023-01-01", "laptop", "EU", 800.0),
  Transaction("2023-01-02", "laptop", "US", 1200.0)
)

// Group by multiple fields
val dailyProductSales = transactions
  .groupBy(t => (t.date, t.product))
  .sum("amount")

// Group by field names
val regionProductSales = transactions
  .groupBy("region", "product")
  .aggregate(Aggregations.SUM, "amount")

Custom Reduce Operations

import org.apache.flink.api.scala._

case class Employee(name: String, department: String, salary: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val employees = env.fromElements(
  Employee("Alice", "Engineering", 80000),
  Employee("Bob", "Engineering", 75000),
  Employee("Charlie", "Sales", 60000),
  Employee("Diana", "Sales", 65000)
)

// Find highest paid employee per department
val topEarners = employees
  .groupBy(_.department)
  .reduce((e1, e2) => if (e1.salary > e2.salary) e1 else e2)

// Calculate average salary per department
case class DeptAverage(department: String, avgSalary: Double, count: Int)

val avgSalaries = employees
  .groupBy(_.department)
  .reduceGroup { employees =>
    val list = employees.toList
    val dept = list.head.department
    val total = list.map(_.salary).sum
    val count = list.length
    DeptAverage(dept, total / count, count)
  }

Group Reduce with Multiple Results

import org.apache.flink.api.scala._

case class Order(customer: String, product: String, quantity: Int, price: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val orders = env.fromElements(
  Order("Alice", "laptop", 1, 1000.0),
  Order("Alice", "mouse", 2, 20.0),
  Order("Bob", "phone", 1, 500.0),
  Order("Bob", "case", 1, 15.0)
)

// Create customer summary with multiple metrics
case class CustomerSummary(customer: String, totalSpent: Double, itemCount: Int, avgOrderValue: Double)

val customerSummaries = orders
  .groupBy(_.customer)
  .reduceGroup { orders =>
    val orderList = orders.toList
    val customer = orderList.head.customer
    val totalSpent = orderList.map(o => o.quantity * o.price).sum
    val itemCount = orderList.map(_.quantity).sum
    val avgOrderValue = totalSpent / orderList.length
    CustomerSummary(customer, totalSpent, itemCount, avgOrderValue)
  }

Sorted Grouping

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

case class Score(player: String, game: String, score: Int, timestamp: Long)

val env = ExecutionEnvironment.getExecutionEnvironment
val scores = env.fromElements(
  Score("Alice", "game1", 100, 1000L),
  Score("Alice", "game1", 150, 2000L),
  Score("Alice", "game1", 120, 3000L),
  Score("Bob", "game1", 90, 1500L),
  Score("Bob", "game1", 110, 2500L)
)

// Get best score per player (latest timestamp in case of ties)
// Note: only one KeySelector-based sortGroup may be applied per grouping,
// so sort on a composite key instead of chaining two sortGroup calls
val bestScores = scores
  .groupBy(_.player)
  .sortGroup(s => (s.score, s.timestamp), Order.DESCENDING)
  .first(1)

// Get top 3 scores per player
val top3Scores = scores
  .groupBy(_.player)
  .sortGroup(_.score, Order.DESCENDING)
  .first(3)

Chained Aggregations

import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation.Aggregations

case class Metrics(region: String, product: String, sales: Double, profit: Double, units: Int)

val env = ExecutionEnvironment.getExecutionEnvironment
val metrics = env.fromElements(
  Metrics("US", "laptop", 1000.0, 200.0, 1),
  Metrics("US", "phone", 500.0, 100.0, 1),
  Metrics("EU", "laptop", 800.0, 150.0, 1)
)

// Aggregate multiple fields
val regionalSummary = metrics
  .groupBy(_.region)
  .aggregate(Aggregations.SUM, "sales")
  .and(Aggregations.SUM, "profit")
  .and(Aggregations.SUM, "units")

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-scala-2-10

docs

data-sources-sinks.md

execution-environment.md

grouping-aggregation.md

hadoop-integration.md

index.md

iterations.md

joins-cogroups.md

transformations.md

type-system.md

tile.json