Grouped DataSet Operations

A GroupedDataSet is created by applying a groupBy operation to a DataSet. It provides specialized operations for processing the records within each group.
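
The examples in this document assume a standard batch execution environment with the Scala API implicits in scope:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment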

Creating Grouped DataSets

class DataSet[T] {
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def groupBy(fields: Int*): GroupedDataSet[T] 
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}

Grouping Examples

case class Sale(region: String, product: String, amount: Double, date: String)

val sales = env.fromElements(
  Sale("US", "ProductA", 100.0, "2023-01-01"),
  Sale("EU", "ProductA", 150.0, "2023-01-01"), 
  Sale("US", "ProductB", 200.0, "2023-01-02"),
  Sale("EU", "ProductB", 120.0, "2023-01-02")
)

// Group by key function
val groupedByRegion = sales.groupBy(_.region)

// Group by field position (for tuples)
val tuples = env.fromElements(("US", 100), ("EU", 150), ("US", 200))
val groupedByPosition = tuples.groupBy(0)

// Group by field name (for case classes)
val groupedByName = sales.groupBy("region")

// Group by multiple fields
val groupedByRegionAndProduct = sales.groupBy(s => (s.region, s.product))
val groupedByMultipleFields = sales.groupBy("region", "product")

Sorting Within Groups

class GroupedDataSet[T] {
  def sortGroup(field: Int, order: Order): GroupedDataSet[T]
  def sortGroup(field: String, order: Order): GroupedDataSet[T]
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}

Sorting Examples

import org.apache.flink.api.common.operators.Order

val sales = env.fromElements(
  ("US", 100, "2023-01-01"),
  ("US", 200, "2023-01-02"),
  ("EU", 150, "2023-01-01")
)

// Sort by field position within groups
val sortedByAmount = sales
  .groupBy(0) // Group by region
  .sortGroup(1, Order.DESCENDING) // Sort by amount within each group

// Sort by field name
case class Transaction(region: String, amount: Double, date: String)
val transactions = env.fromElements(
  Transaction("US", 100.0, "2023-01-01"),
  Transaction("US", 200.0, "2023-01-02")
)

val sortedTransactions = transactions
  .groupBy("region")
  .sortGroup("amount", Order.ASCENDING)

// Sort by key function
val sortedByDate = transactions
  .groupBy(_.region)
  .sortGroup(_.date, Order.DESCENDING)

Aggregation Operations

Basic Aggregations

class GroupedDataSet[T] {
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
  def sum(field: Int): AggregateDataSet[T]
  def sum(field: String): AggregateDataSet[T]
  def max(field: Int): AggregateDataSet[T]
  def max(field: String): AggregateDataSet[T]
  def min(field: Int): AggregateDataSet[T]
  def min(field: String): AggregateDataSet[T]
}

Aggregation Examples

import org.apache.flink.api.java.aggregation.Aggregations

val salesData = env.fromElements(
  ("US", "ProductA", 100.0),
  ("US", "ProductB", 200.0),
  ("EU", "ProductA", 150.0),
  ("EU", "ProductB", 120.0)
)

// Sum sales by region
val totalByRegion = salesData
  .groupBy(0) // Group by region
  .sum(2)     // Sum the amount field

// Max sales by region  
val maxByRegion = salesData
  .groupBy(0)
  .max(2)

// Min sales by region
val minByRegion = salesData
  .groupBy(0) 
  .min(2)

// Multiple aggregations - each aggregated field is replaced in place,
// so chain aggregations on different fields
val orders = env.fromElements(
  ("US", 2, 100.0), // (region, quantity, amount)
  ("US", 5, 200.0),
  ("EU", 3, 150.0)
)

val stats = orders
  .groupBy(0)
  .aggregate(Aggregations.SUM, 1) // Total quantity per region
  .and(Aggregations.MAX, 2)       // Maximum amount per region

MaxBy and MinBy

class GroupedDataSet[T] {
  def maxBy(fields: Int*): DataSet[T]
  def maxBy(firstField: String, otherFields: String*): DataSet[T]
  def minBy(fields: Int*): DataSet[T]
  def minBy(firstField: String, otherFields: String*): DataSet[T]
}
val employeeData = env.fromElements(
  ("Engineering", "Alice", 95000),
  ("Engineering", "Bob", 85000),
  ("Sales", "Charlie", 75000),
  ("Sales", "David", 80000)
)

// Get highest paid employee per department
val highestPaid = employeeData
  .groupBy(0) // Group by department
  .maxBy(2)   // Max by salary

// Get lowest paid employee per department
val lowestPaid = employeeData
  .groupBy(0)
  .minBy(2)

// MaxBy with multiple fields (for tie-breaking)
val employeesByName = env.fromElements(
  ("Dept1", "Alice", 80000, 5),  // (dept, name, salary, years)
  ("Dept1", "Bob", 80000, 3),
  ("Dept2", "Charlie", 75000, 7)
)

// Max by salary, then by years of experience
val seniorHighestPaid = employeesByName
  .groupBy(0)
  .maxBy(2, 3) // Max by salary, then by years

Reduce Operations

Simple Reduce

class GroupedDataSet[T] {
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}
val numbers = env.fromElements(
  ("A", 1), ("A", 2), ("A", 3),
  ("B", 4), ("B", 5)
)

// Sum numbers within each group
val groupSum = numbers
  .groupBy(0)
  .reduce((a, b) => (a._1, a._2 + b._2))

// Find maximum within each group
val groupMax = numbers
  .groupBy(0)
  .reduce((a, b) => if (a._2 > b._2) a else b)
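
The ReduceFunction overload behaves identically. As a minimal sketch, the same group sum written against the interface:

import org.apache.flink.api.common.functions.ReduceFunction

val groupSumExplicit = numbers
  .groupBy(0)
  .reduce(new ReduceFunction[(String, Int)] {
    override def reduce(a: (String, Int), b: (String, Int)): (String, Int) =
      (a._1, a._2 + b._2) // Keep the key, add the values
  })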

Group Reduce

class GroupedDataSet[T] {
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  def reduceGroup[R: TypeInformation: ClassTag](groupReducer: GroupReduceFunction[T, R]): DataSet[R]
}
val studentGrades = env.fromElements(
  ("Math", "Alice", 85),
  ("Math", "Bob", 90),
  ("Math", "Charlie", 78),
  ("Science", "Alice", 92),
  ("Science", "Bob", 88)
)

// Calculate statistics per subject
val subjectStats = studentGrades
  .groupBy(0) // Group by subject
  .reduceGroup { iterator =>
    // Materialize the iterator first - it can only be traversed once
    val records = iterator.toList
    val subject = records.head._1
    val grades = records.map(_._3)
    val avg = grades.sum.toDouble / grades.length
    (subject, avg, grades.max, grades.min, grades.length)
  }

// Multiple output per group
val gradeRanges = studentGrades
  .groupBy(0)
  .reduceGroup { (iterator, collector) =>
    val gradesList = iterator.toList
    val subject = gradesList.head._1
    
    gradesList.foreach { case (_, student, grade) =>
      val range = grade match {
        case g if g >= 90 => "A"
        case g if g >= 80 => "B"
        case g if g >= 70 => "C"
        case _ => "D"
      }
      collector.collect((subject, student, grade, range))
    }
  }

Combine Group

class GroupedDataSet[T] {
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}
val salesRecords = env.fromElements(
  ("US", 100), ("US", 200), ("US", 150),
  ("EU", 120), ("EU", 180), ("EU", 90)
)

// Combine groups efficiently (pre-aggregation)
val combinedSales = salesRecords
  .groupBy(0)
  .combineGroup { (iterator, collector) =>
    val records = iterator.toList
    val region = records.head._1
    val totalSales = records.map(_._2).sum
    val recordCount = records.length
    collector.collect((region, totalSales, recordCount))
  }
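
Note that combineGroup is only a pre-aggregation hint: Flink may invoke the combiner on partial groups (for example, once per partition), so it can emit more than one record per key. To obtain a single final record per key, follow it with a grouped reduce:

// Merge the partial (region, total, count) results into final ones
val finalSales = combinedSales
  .groupBy(0)
  .reduce((a, b) => (a._1, a._2 + b._2, a._3 + b._3))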

First Operations

class GroupedDataSet[T] {
  def first(n: Int): DataSet[T]
}
val timestampedData = env.fromElements(
  ("US", "2023-01-01", 100),
  ("US", "2023-01-02", 150),
  ("US", "2023-01-03", 200),
  ("EU", "2023-01-01", 120),
  ("EU", "2023-01-02", 180)
)

// Get first 2 records per region (an arbitrary selection unless the group is sorted)
val firstTwo = timestampedData
  .groupBy(0) // Group by region
  .first(2)

// Get first record per region after sorting
val latestPerRegion = timestampedData
  .groupBy(0)
  .sortGroup(1, Order.DESCENDING) // Sort by date descending
  .first(1) // Get latest (first after desc sort)

Window Operations on Groups

While the DataSet API has no built-in windowing, you can simulate windows using groupBy and custom logic:

case class Event(userId: String, eventType: String, timestamp: Long, value: Double)

val events = env.fromElements(
  Event("user1", "click", 1000, 1.0),
  Event("user1", "click", 2000, 1.0),
  Event("user1", "purchase", 3000, 10.0),
  Event("user2", "click", 1500, 1.0),
  Event("user2", "purchase", 4000, 15.0)
)

// Group by user and time window (using truncated timestamp)
val windowSize = 3000L // 3 second windows
val windowedEvents = events
  .map(e => (e.userId, e.timestamp / windowSize, e)) // Add window key
  .groupBy(e => (e._1, e._2)) // Group by user and window
  .reduceGroup { iterator =>
    val eventsInWindow = iterator.map(_._3).toList
    val userId = eventsInWindow.head.userId
    val windowStart = eventsInWindow.head.timestamp / windowSize * windowSize
    val totalValue = eventsInWindow.map(_.value).sum
    val eventCount = eventsInWindow.length
    (userId, windowStart, totalValue, eventCount)
  }
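
As with any DataSet program, calling a sink such as print() triggers execution:

windowedEvents.print() // Prints (userId, windowStart, totalValue, eventCount) records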

Types

import org.apache.flink.api.common.functions.{
  ReduceFunction, 
  GroupReduceFunction, 
  GroupCombineFunction
}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.util.Collector

// Function interfaces
trait ReduceFunction[T] extends Function {
  def reduce(value1: T, value2: T): T
}

trait GroupReduceFunction[IN, OUT] extends Function {
  def reduce(values: java.lang.Iterable[IN], out: Collector[OUT]): Unit
}

trait GroupCombineFunction[IN, OUT] extends Function {
  def combine(values: java.lang.Iterable[IN], out: Collector[OUT]): Unit
}

// Aggregation types (a Java enum in org.apache.flink.api.java.aggregation,
// shown here schematically)
object Aggregations extends Enumeration {
  val SUM, MAX, MIN = Value
}

// Chained aggregations
class AggregateDataSet[T] {
  def and(agg: Aggregations, field: Int): AggregateDataSet[T]
  def and(agg: Aggregations, field: String): AggregateDataSet[T]
  def andSum(field: Int): AggregateDataSet[T]
  def andSum(field: String): AggregateDataSet[T]
  def andMax(field: Int): AggregateDataSet[T]
  def andMax(field: String): AggregateDataSet[T]
  def andMin(field: Int): AggregateDataSet[T]
  def andMin(field: String): AggregateDataSet[T]
}

// Order enumeration (a Java enum in org.apache.flink.api.common.operators,
// shown here schematically)
object Order extends Enumeration {
  val ASCENDING, DESCENDING = Value
}
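
The shorthand methods chain the same way as aggregate/and. For example, using the orders dataset from the aggregation examples:

// Total quantity and maximum amount per region in one pass
val totalAndMax = orders
  .groupBy(0)
  .sum(1)
  .andMax(2)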

Performance Considerations

Efficient Grouping

// Prefer key selector functions for compile-time safety
case class Record(key: String, value: Int)
val data: DataSet[Record] = // ...

// Type-safe - the key extraction is checked by the compiler
val grouped1 = data.groupBy(_.key)

// String expression keys are resolved when the plan is built;
// a mistyped field name fails only at runtime
val grouped2 = data.groupBy("key")

Combining vs Reducing

// Use combineGroup for group-wide operations that can be pre-aggregated
val largeSales = env.fromCollection((1 to 1000000).map(i => (s"region${i % 10}", i)))

// Efficient - pre-aggregates locally before data is shuffled over the network
val combined = largeSales
  .groupBy(0)
  .combineGroup { (iterator, collector) =>
    val (regions, values) = iterator.toList.unzip
    collector.collect((regions.head, values.sum))
  }

// Less efficient for large groups - a plain reduceGroup has no combiner,
// so every record is shipped to the reducer before aggregation
val reduced = largeSales
  .groupBy(0)
  .reduceGroup { (iterator, collector: Collector[(String, Int)]) =>
    val records = iterator.toList
    collector.collect((records.head._1, records.map(_._2).sum))
  }

// Note: a grouped reduce with a ReduceFunction is combinable by default
// and already pre-aggregates incrementally

Sorting Optimization

// Sort within groups only when necessary
val transactions = env.fromElements(
  ("US", 100, "2023-01-01"),
  ("US", 200, "2023-01-02")
)

// If you only need the first N elements per group, combine sortGroup with
// first(n); the combiner can truncate each group early instead of
// materializing a full sort
val topTransactions = transactions
  .groupBy(0)
  .sortGroup(1, Order.DESCENDING)
  .first(5) // Top 5 per group

// If you need all elements in sorted order, forward them through a collector
val allSorted = transactions
  .groupBy(0)
  .sortGroup(1, Order.DESCENDING)
  .reduceGroup { (it, out: Collector[(String, Int, String)]) =>
    it.foreach(out.collect) // Emit every element in sorted order
  }