Grouped DataSets are created by applying groupBy operations to DataSets. They provide specialized operations for processing data within groups.
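The examples in this section assume a standard Flink Scala setup; a minimal sketch (the org.apache.flink.api.scala._ implicits must be in scope so TypeInformation can be derived for tuples and case classes):
import org.apache.flink.api.scala._
// Batch execution environment used by every example below
val env = ExecutionEnvironment.getExecutionEnvironment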
class DataSet[T] {
def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
def groupBy(fields: Int*): GroupedDataSet[T]
def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}

case class Sale(region: String, product: String, amount: Double, date: String)
val sales = env.fromElements(
Sale("US", "ProductA", 100.0, "2023-01-01"),
Sale("EU", "ProductA", 150.0, "2023-01-01"),
Sale("US", "ProductB", 200.0, "2023-01-02"),
Sale("EU", "ProductB", 120.0, "2023-01-02")
)
// Group by key function
val groupedByRegion = sales.groupBy(_.region)
// Group by field position (for tuples)
val tuples = env.fromElements(("US", 100), ("EU", 150), ("US", 200))
val groupedByPosition = tuples.groupBy(0)
// Group by field name (for case classes)
val groupedByName = sales.groupBy("region")
// Group by multiple fields
val groupedByRegionAndProduct = sales.groupBy(s => (s.region, s.product))
val groupedByMultipleFields = sales.groupBy("region", "product")
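A GroupedDataSet is only an intermediate handle: nothing runs until a group-wise operation follows. As a quick illustration (countPerRegion is a name introduced here), counting records per region could look like:
// Count the Sale records in each region group
val countPerRegion = groupedByRegion.reduceGroup { in =>
val records = in.toList
(records.head.region, records.size)
}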

class GroupedDataSet[T] {
def sortGroup(field: Int, order: Order): GroupedDataSet[T]
def sortGroup(field: String, order: Order): GroupedDataSet[T]
def sortGroup[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}

import org.apache.flink.api.common.operators.Order
val sales = env.fromElements(
("US", 100, "2023-01-01"),
("US", 200, "2023-01-02"),
("EU", 150, "2023-01-01")
)
// Sort by field position within groups
val sortedByAmount = sales
.groupBy(0) // Group by region
.sortGroup(1, Order.DESCENDING) // Sort by amount within each group
// Sort by field name
case class Transaction(region: String, amount: Double, date: String)
val transactions = env.fromElements(
Transaction("US", 100.0, "2023-01-01"),
Transaction("US", 200.0, "2023-01-02")
)
val sortedTransactions = transactions
.groupBy("region")
.sortGroup("amount", Order.ASCENDING)
// Sort by key function
val sortedByDate = transactions
.groupBy(_.region)
.sortGroup(_.date, Order.DESCENDING)
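Field-based group sorts can be chained to define a secondary order (the first sortGroup call is the primary sort); key-selector sorts, by contrast, generally cannot be combined with further sort keys. A sketch using the transactions data above:
// Primary sort by date, secondary sort by amount, within each region group
val multiSorted = transactions
.groupBy("region")
.sortGroup("date", Order.ASCENDING)
.sortGroup("amount", Order.DESCENDING)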

class GroupedDataSet[T] {
def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
def sum(field: Int): AggregateDataSet[T]
def sum(field: String): AggregateDataSet[T]
def max(field: Int): AggregateDataSet[T]
def max(field: String): AggregateDataSet[T]
def min(field: Int): AggregateDataSet[T]
def min(field: String): AggregateDataSet[T]
}

val salesData = env.fromElements(
("US", "ProductA", 100.0),
("US", "ProductB", 200.0),
("EU", "ProductA", 150.0),
("EU", "ProductB", 120.0)
)
// Sum sales by region
val totalByRegion = salesData
.groupBy(0) // Group by region
.sum(2) // Sum the amount field
// Max sales by region
val maxByRegion = salesData
.groupBy(0)
.max(2)
// Min sales by region
val minByRegion = salesData
.groupBy(0)
.min(2)
// Multiple aggregations (each on its own field; chaining several
// aggregations onto the same field would overwrite that result column)
import org.apache.flink.api.java.aggregation.Aggregations
val qtyAmount = env.fromElements(
("US", 10, 100.0),
("US", 20, 200.0),
("EU", 15, 150.0)
)
val stats = qtyAmount
.groupBy(0)
.aggregate(Aggregations.SUM, 1) // Total quantity per region
.and(Aggregations.MAX, 2) // Highest single amount per region
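The sum/max/min shortcuts return the same AggregateDataSet, so a chain can also start from them via the and* helpers; a shorthand sketch equivalent to the example above:
// Shorthand for SUM on quantity plus MAX on amount
val stats2 = qtyAmount
.groupBy(0)
.sum(1)
.andMax(2)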

class GroupedDataSet[T] {
def maxBy(fields: Int*): DataSet[T]
def maxBy(firstField: String, otherFields: String*): DataSet[T]
def minBy(fields: Int*): DataSet[T]
def minBy(firstField: String, otherFields: String*): DataSet[T]
}

val employeeData = env.fromElements(
("Engineering", "Alice", 95000),
("Engineering", "Bob", 85000),
("Sales", "Charlie", 75000),
("Sales", "David", 80000)
)
// Get highest paid employee per department
val highestPaid = employeeData
.groupBy(0) // Group by department
.maxBy(2) // Max by salary
// Get lowest paid employee per department
val lowestPaid = employeeData
.groupBy(0)
.minBy(2)
// MaxBy with multiple fields (for tie-breaking)
val employeesByName = env.fromElements(
("Dept1", "Alice", 80000, 5), // (dept, name, salary, years)
("Dept1", "Bob", 80000, 3),
("Dept2", "Charlie", 75000, 7)
)
// Max by salary, then by years of experience
val seniorHighestPaid = employeesByName
.groupBy(0)
.maxBy(2, 3) // Max by salary, then by years
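Note how maxBy differs from the max aggregation: max(2) aggregates only the salary column and fills the remaining non-grouped fields from arbitrary records of the group, while maxBy(2) returns one complete, real element per group:
// max: field 2 holds the maximum, but the name field is arbitrary
val maxSalaryPerDept = employeeData.groupBy(0).max(2)
// maxBy: one actual (dept, name, salary) record per group
val topEarnerPerDept = employeeData.groupBy(0).maxBy(2)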

class GroupedDataSet[T] {
def reduce(fun: (T, T) => T): DataSet[T]
def reduce(reducer: ReduceFunction[T]): DataSet[T]
}

val numbers = env.fromElements(
("A", 1), ("A", 2), ("A", 3),
("B", 4), ("B", 5)
)
// Sum numbers within each group
val groupSum = numbers
.groupBy(0)
.reduce((a, b) => (a._1, a._2 + b._2))
// Find maximum within each group
val groupMax = numbers
.groupBy(0)
.reduce((a, b) => if (a._2 > b._2) a else b)
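The ReduceFunction overload behaves identically; the same group sum written against the interface (a sketch):
import org.apache.flink.api.common.functions.ReduceFunction
// Group sum expressed as a ReduceFunction instance
val groupSum2 = numbers
.groupBy(0)
.reduce(new ReduceFunction[(String, Int)] {
override def reduce(a: (String, Int), b: (String, Int)): (String, Int) =
(a._1, a._2 + b._2)
})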

class GroupedDataSet[T] {
def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
def reduceGroup[R: TypeInformation: ClassTag](groupReducer: GroupReduceFunction[T, R]): DataSet[R]
}

val studentGrades = env.fromElements(
("Math", "Alice", 85),
("Math", "Bob", 90),
("Math", "Charlie", 78),
("Science", "Alice", 92),
("Science", "Bob", 88)
)
// Calculate statistics per subject
val subjectStats = studentGrades
.groupBy(0) // Group by subject
.reduceGroup { iterator =>
val records = iterator.toList // Materialize once; the iterator is single-pass
val subject = records.head._1
val grades = records.map(_._3)
val avg = grades.sum.toDouble / grades.length
val max = grades.max
val min = grades.min
val count = grades.length
(subject, avg, max, min, count)
}
// Multiple output per group
val gradeRanges = studentGrades
.groupBy(0)
.reduceGroup { (iterator, collector: Collector[(String, String, Int, String)]) =>
val gradesList = iterator.toList
val subject = gradesList.head._1
gradesList.foreach { case (_, student, grade) =>
val range = grade match {
case g if g >= 90 => "A"
case g if g >= 80 => "B"
case g if g >= 70 => "C"
case _ => "D"
}
collector.collect((subject, student, grade, range))
}
}
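Because reduceGroup sees each group as a whole, pairing it with sortGroup yields ordered iteration; picking the top-scoring student per subject (topStudent is a name introduced here) might look like:
// Highest grade per subject: sort descending, then take the first element
val topStudent = studentGrades
.groupBy(0)
.sortGroup(2, Order.DESCENDING)
.reduceGroup(it => it.next())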

class GroupedDataSet[T] {
def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}

val salesRecords = env.fromElements(
("US", 100), ("US", 200), ("US", 150),
("EU", 120), ("EU", 180), ("EU", 90)
)
// Combine groups efficiently (pre-aggregation)
val combinedSales = salesRecords
.groupBy(0)
.combineGroup { (iterator, collector: Collector[(String, Int, Int)]) =>
val records = iterator.toList
val region = records.head._1
val totalSales = records.map(_._2).sum
val recordCount = records.length
collector.collect((region, totalSales, recordCount))
}
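Keep in mind that combineGroup is only a local pre-aggregation: it may emit several partial results per key (roughly one per partition), so a final grouped reduction is still required for a correct total:
// Finish the pre-aggregated partials with a grouped reduce
val finalSales = combinedSales
.groupBy(0)
.reduce((a, b) => (a._1, a._2 + b._2, a._3 + b._3))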

class GroupedDataSet[T] {
def first(n: Int): DataSet[T]
}

val timestampedData = env.fromElements(
("US", "2023-01-01", 100),
("US", "2023-01-02", 150),
("US", "2023-01-03", 200),
("EU", "2023-01-01", 120),
("EU", "2023-01-02", 180)
)
// Get first 2 records per region
val firstTwo = timestampedData
.groupBy(0) // Group by region
.first(2)
// Get first record per region after sorting
val latestPerRegion = timestampedData
.groupBy(0)
.sortGroup(1, Order.DESCENDING) // Sort by date descending
.first(1) // Get latest (first after descending sort)

While the DataSet API doesn't have built-in windowing, you can simulate windowing using groupBy and custom logic:
case class Event(userId: String, eventType: String, timestamp: Long, value: Double)
val events = env.fromElements(
Event("user1", "click", 1000, 1.0),
Event("user1", "click", 2000, 1.0),
Event("user1", "purchase", 3000, 10.0),
Event("user2", "click", 1500, 1.0),
Event("user2", "purchase", 4000, 15.0)
)
// Group by user and time window (using truncated timestamp)
val windowSize = 3000L // 3 second windows
val windowedEvents = events
.map(e => (e.userId, e.timestamp / windowSize, e)) // Add window key
.groupBy(t => (t._1, t._2)) // Group by user and window index
.reduceGroup { iterator =>
val eventsInWindow = iterator.map(_._3).toList
val userId = eventsInWindow.head.userId
val windowStart = eventsInWindow.head.timestamp / windowSize * windowSize
val totalValue = eventsInWindow.map(_.value).sum
val eventCount = eventsInWindow.length
(userId, windowStart, totalValue, eventCount)
}

import org.apache.flink.api.common.functions.{
ReduceFunction,
GroupReduceFunction,
GroupCombineFunction
}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.util.Collector
// Function interfaces (schematic; in Flink these are Java interfaces)
trait ReduceFunction[T] extends Function {
def reduce(value1: T, value2: T): T
}
trait GroupReduceFunction[IN, OUT] extends Function {
def reduce(values: java.lang.Iterable[IN], out: Collector[OUT]): Unit
}
trait GroupCombineFunction[IN, OUT] extends Function {
def combine(values: java.lang.Iterable[IN], out: Collector[OUT]): Unit
}
// Aggregation types (schematic; Aggregations is a Java enum in Flink)
object Aggregations extends Enumeration {
val SUM, MAX, MIN = Value
}
// Chained aggregations
class AggregateDataSet[T] {
def and(agg: Aggregations, field: Int): AggregateDataSet[T]
def and(agg: Aggregations, field: String): AggregateDataSet[T]
def andSum(field: Int): AggregateDataSet[T]
def andSum(field: String): AggregateDataSet[T]
def andMax(field: Int): AggregateDataSet[T]
def andMax(field: String): AggregateDataSet[T]
def andMin(field: Int): AggregateDataSet[T]
def andMin(field: String): AggregateDataSet[T]
}
// Order enumeration (schematic; Order is a Java enum in Flink)
object Order extends Enumeration {
val ASCENDING, DESCENDING = Value
}

// Prefer groupBy with key functions over field names when possible
case class Record(key: String, value: Int)
val data: DataSet[Record] = // ...
// Type-safe - the key extraction is checked at compile time
val grouped1 = data.groupBy(_.key)
// String-based - the field name is only validated at runtime
val grouped2 = data.groupBy("key")

// Use combineGroup for operations that can be pre-aggregated
// fromCollection avoids packing a million elements into the program as varargs
val largeSales = env.fromCollection((1 to 1000000).map(i => (s"region${i % 10}", i)))
// Efficient - pre-aggregates locally before records cross the network
val combined = largeSales
.groupBy(0)
.combineGroup { (iterator, collector: Collector[(String, Int)]) =>
val (regions, values) = iterator.toList.unzip
collector.collect((regions.head, values.sum))
}
// Less efficient for large datasets - a plain reduceGroup has no combine
// phase, so every record is shipped to the reducer
// (reduce, by contrast, is combined automatically by Flink)
val reduced = largeSales
.groupBy(0)
.reduceGroup { (iterator, collector: Collector[(String, Int)]) =>
val records = iterator.toList
collector.collect((records.head._1, records.map(_._2).sum))
}

// Sort within groups only when necessary
val transactions = env.fromElements(
("US", 100, "2023-01-01"),
("US", 200, "2023-01-02")
)
// If you only need the first N elements per group, follow the sort with first(n)
val topTransactions = transactions
.groupBy(0)
.sortGroup(1, Order.DESCENDING)
.first(5) // Top 5 per group
// If you need all elements in sorted order, forward them explicitly
val allSorted = transactions
.groupBy(0)
.sortGroup(1, Order.DESCENDING)
.reduceGroup { (it, out: Collector[(String, Int, String)]) =>
it.foreach(out.collect) // Emit every element of each group, in sorted order
}