The Apache Flink Scala API provides type-safe operations and functional programming constructs for distributed stream and batch processing applications.
Grouped DataSets are created by applying a groupBy operation to a DataSet. They provide specialized operations for processing data within groups; note that a GroupedDataSet is not itself a result and must be consumed by one of the grouped operations described below (aggregation, reduce, reduceGroup, and so on).
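The examples in this section assume the standard batch setup sketched below; the implicit TypeInformation instances come from the org.apache.flink.api.scala package object.
import org.apache.flink.api.scala._
// Batch execution environment shared by the examples that follow
val env = ExecutionEnvironment.getExecutionEnvironment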
class DataSet[T] {
def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
def groupBy(fields: Int*): GroupedDataSet[T]
def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}

case class Sale(region: String, product: String, amount: Double, date: String)
val sales = env.fromElements(
Sale("US", "ProductA", 100.0, "2023-01-01"),
Sale("EU", "ProductA", 150.0, "2023-01-01"),
Sale("US", "ProductB", 200.0, "2023-01-02"),
Sale("EU", "ProductB", 120.0, "2023-01-02")
)
// Group by key function
val groupedByRegion = sales.groupBy(_.region)
// Group by field position (for tuples)
val tuples = env.fromElements(("US", 100), ("EU", 150), ("US", 200))
val groupedByPosition = tuples.groupBy(0)
// Group by field name (for case classes)
val groupedByName = sales.groupBy("region")
// Group by multiple fields
val groupedByRegionAndProduct = sales.groupBy(s => (s.region, s.product))
val groupedByMultipleFields = sales.groupBy("region", "product")

class GroupedDataSet[T] {
def sortGroup(field: Int, order: Order): GroupedDataSet[T]
def sortGroup(field: String, order: Order): GroupedDataSet[T]
def sortGroup[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}

import org.apache.flink.api.common.operators.Order
val sales = env.fromElements(
("US", 100, "2023-01-01"),
("US", 200, "2023-01-02"),
("EU", 150, "2023-01-01")
)
// Sort by field position within groups
val sortedByAmount = sales
.groupBy(0) // Group by region
.sortGroup(1, Order.DESCENDING) // Sort by amount within each group
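Group sorting only takes effect through the group-wise operation that consumes it (sortedByAmount above is still a GroupedDataSet, not a DataSet). A minimal sketch, using the tuple dataset above, that emits each region's amounts in descending order:
// Consume the sorted groups: one (region, amounts) pair per group
val sortedAmountLists = sales
  .groupBy(0)
  .sortGroup(1, Order.DESCENDING)
  .reduceGroup { it =>
    val records = it.toList
    (records.head._1, records.map(_._2)) // amounts arrive sorted descending
  }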
// Sort by field name
case class Transaction(region: String, amount: Double, date: String)
val transactions = env.fromElements(
Transaction("US", 100.0, "2023-01-01"),
Transaction("US", 200.0, "2023-01-02")
)
val sortedTransactions = transactions
.groupBy("region")
.sortGroup("amount", Order.ASCENDING)
// Sort by key function
val sortedByDate = transactions
.groupBy(_.region)
.sortGroup(_.date, Order.DESCENDING)

class GroupedDataSet[T] {
def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
def sum(field: Int): AggregateDataSet[T]
def sum(field: String): AggregateDataSet[T]
def max(field: Int): AggregateDataSet[T]
def max(field: String): AggregateDataSet[T]
def min(field: Int): AggregateDataSet[T]
def min(field: String): AggregateDataSet[T]
}

import org.apache.flink.api.java.aggregation.Aggregations

val salesData = env.fromElements(
("US", "ProductA", 100.0),
("US", "ProductB", 200.0),
("EU", "ProductA", 150.0),
("EU", "ProductB", 120.0)
)
// Sum sales by region
val totalByRegion = salesData
.groupBy(0) // Group by region
.sum(2) // Sum the amount field
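// Note: fields that are neither grouped nor aggregated (the product name,
// field 1, here) keep an arbitrary value from the group in the result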
// Max sales by region
val maxByRegion = salesData
.groupBy(0)
.max(2)
// Min sales by region
val minByRegion = salesData
.groupBy(0)
.min(2)
// Multiple aggregations
val stats = salesData
.groupBy(0)
.aggregate(Aggregations.SUM, 2)
.and(Aggregations.MAX, 2)
.and(Aggregations.MIN, 2)

class GroupedDataSet[T] {
def maxBy(fields: Int*): DataSet[T]
def maxBy(firstField: String, otherFields: String*): DataSet[T]
def minBy(fields: Int*): DataSet[T]
def minBy(firstField: String, otherFields: String*): DataSet[T]
}

val employeeData = env.fromElements(
("Engineering", "Alice", 95000),
("Engineering", "Bob", 85000),
("Sales", "Charlie", 75000),
("Sales", "David", 80000)
)
// Get highest paid employee per department
val highestPaid = employeeData
.groupBy(0) // Group by department
.maxBy(2) // Max by salary
// Get lowest paid employee per department
val lowestPaid = employeeData
.groupBy(0)
.minBy(2)
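The practical difference from max and min is that maxBy and minBy return a complete, actually existing element, whereas the field aggregations only guarantee the aggregated field. A short sketch of the contrast:
// max(2): the salary field is the maximum, but the name field is arbitrary
val maxSalaryPerDept = employeeData.groupBy(0).max(2)
// maxBy(2): the full record of the employee who earns the maximum salary
val topEarnerPerDept = employeeData.groupBy(0).maxBy(2)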
// MaxBy with multiple fields (for tie-breaking)
val employeesByName = env.fromElements(
("Dept1", "Alice", 80000, 5), // (dept, name, salary, years)
("Dept1", "Bob", 80000, 3),
("Dept2", "Charlie", 75000, 7)
)
// Max by salary, then by years of experience
val seniorHighestPaid = employeesByName
.groupBy(0)
.maxBy(2, 3) // Max by salary, then by years

class GroupedDataSet[T] {
def reduce(fun: (T, T) => T): DataSet[T]
def reduce(reducer: ReduceFunction[T]): DataSet[T]
}

val numbers = env.fromElements(
("A", 1), ("A", 2), ("A", 3),
("B", 4), ("B", 5)
)
// Sum numbers within each group
val groupSum = numbers
.groupBy(0)
.reduce((a, b) => (a._1, a._2 + b._2))
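The second overload takes a ReduceFunction instance, which is convenient for reuse; a minimal sketch of the same sum written against that interface:
import org.apache.flink.api.common.functions.ReduceFunction
// Reusable reducer: merges two records of a group into one
class SumReducer extends ReduceFunction[(String, Int)] {
  override def reduce(a: (String, Int), b: (String, Int)): (String, Int) =
    (a._1, a._2 + b._2)
}
val groupSumExplicit = numbers.groupBy(0).reduce(new SumReducer)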
// Find maximum within each group
val groupMax = numbers
.groupBy(0)
.reduce((a, b) => if (a._2 > b._2) a else b)

class GroupedDataSet[T] {
def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
def reduceGroup[R: TypeInformation: ClassTag](groupReducer: GroupReduceFunction[T, R]): DataSet[R]
}

val studentGrades = env.fromElements(
("Math", "Alice", 85),
("Math", "Bob", 90),
("Math", "Charlie", 78),
("Science", "Alice", 92),
("Science", "Bob", 88)
)
// Calculate statistics per subject
val subjectStats = studentGrades
.groupBy(0) // Group by subject
.reduceGroup { iterator =>
  // Materialize the group once: the iterator is single-pass and cannot be re-read
  val records = iterator.toList
  val subject = records.head._1
  val grades = records.map(_._3)
  val avg = grades.sum.toDouble / grades.length
  (subject, avg, grades.max, grades.min, grades.length)
}
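For reuse, the same kind of computation can be written against the GroupReduceFunction interface (the third overload above), where the group arrives as a java.lang.Iterable; a sketch:
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
// Emits (subject, average grade) for each group
class AvgGrade extends GroupReduceFunction[(String, String, Int), (String, Double)] {
  override def reduce(values: java.lang.Iterable[(String, String, Int)],
                      out: Collector[(String, Double)]): Unit = {
    val records = values.asScala.toList
    out.collect((records.head._1, records.map(_._3).sum.toDouble / records.size))
  }
}
val avgPerSubject = studentGrades.groupBy(0).reduceGroup(new AvgGrade)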
// Multiple output per group
val gradeRanges = studentGrades
.groupBy(0)
.reduceGroup { (iterator, collector: Collector[(String, String, Int, String)]) =>
val gradesList = iterator.toList
val subject = gradesList.head._1
gradesList.foreach { case (_, student, grade) =>
val range = grade match {
case g if g >= 90 => "A"
case g if g >= 80 => "B"
case g if g >= 70 => "C"
case _ => "D"
}
collector.collect((subject, student, grade, range))
}
}

class GroupedDataSet[T] {
def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}

val salesRecords = env.fromElements(
("US", 100), ("US", 200), ("US", 150),
("EU", 120), ("EU", 180), ("EU", 90)
)
// Combine groups efficiently (pre-aggregation)
val combinedSales = salesRecords
.groupBy(0)
.combineGroup { (iterator, collector: Collector[(String, Int, Int)]) =>
val records = iterator.toList
val region = records.head._1
val totalSales = records.map(_._2).sum
val recordCount = records.length
collector.collect((region, totalSales, recordCount))
}

Note that the combine step may run independently on partial groups (for example, once per partition), so combineGroup can emit several partial results per key; follow it with a grouped reduce or reduceGroup when exact per-group results are required.

class GroupedDataSet[T] {
def first(n: Int): DataSet[T]
}

val timestampedData = env.fromElements(
("US", "2023-01-01", 100),
("US", "2023-01-02", 150),
("US", "2023-01-03", 200),
("EU", "2023-01-01", 120),
("EU", "2023-01-02", 180)
)
// Get first 2 records per region
val firstTwo = timestampedData
.groupBy(0) // Group by region
.first(2)
// Get first record per region after sorting
val latestPerRegion = timestampedData
.groupBy(0)
.sortGroup(1, Order.DESCENDING) // Sort by date descending
.first(1) // Get latest (first after descending sort)

While the DataSet API has no built-in windowing, you can simulate windows using groupBy and custom logic:
case class Event(userId: String, eventType: String, timestamp: Long, value: Double)
val events = env.fromElements(
Event("user1", "click", 1000, 1.0),
Event("user1", "click", 2000, 1.0),
Event("user1", "purchase", 3000, 10.0),
Event("user2", "click", 1500, 1.0),
Event("user2", "purchase", 4000, 15.0)
)
// Group by user and time window (using truncated timestamp)
val windowSize = 3000L // 3 second windows
val windowedEvents = events
.map(e => (e.userId, e.timestamp / windowSize, e)) // Add window key
.groupBy(e => (e._1, e._2)) // Group by user and window
.reduceGroup { iterator =>
val eventsInWindow = iterator.map(_._3).toList
val userId = eventsInWindow.head.userId
val windowStart = eventsInWindow.head.timestamp / windowSize * windowSize
val totalValue = eventsInWindow.map(_.value).sum
val eventCount = eventsInWindow.length
(userId, windowStart, totalValue, eventCount)
}

import org.apache.flink.api.common.functions.{
ReduceFunction,
GroupReduceFunction,
GroupCombineFunction
}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.util.Collector
// Function interfaces (Java interfaces from flink-core, shown Scala-style for reference)
trait ReduceFunction[T] extends Function {
def reduce(value1: T, value2: T): T
}
trait GroupReduceFunction[IN, OUT] extends Function {
def reduce(values: java.lang.Iterable[IN], out: Collector[OUT]): Unit
}
trait GroupCombineFunction[IN, OUT] extends Function {
def combine(values: java.lang.Iterable[IN], out: Collector[OUT]): Unit
}
// Aggregation types (a Java enum, org.apache.flink.api.java.aggregation.Aggregations, shown Scala-style)
object Aggregations extends Enumeration {
val SUM, MAX, MIN = Value
}
// Chained aggregations
class AggregateDataSet[T] {
def and(agg: Aggregations, field: Int): AggregateDataSet[T]
def and(agg: Aggregations, field: String): AggregateDataSet[T]
def andSum(field: Int): AggregateDataSet[T]
def andSum(field: String): AggregateDataSet[T]
def andMax(field: Int): AggregateDataSet[T]
def andMax(field: String): AggregateDataSet[T]
def andMin(field: Int): AggregateDataSet[T]
def andMin(field: String): AggregateDataSet[T]
}
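The andSum/andMax/andMin shortcuts compose the same way as aggregate(...).and(...); a sketch reusing the salesData tuples from earlier:
// Equivalent to aggregate(Aggregations.SUM, 2).and(Aggregations.MAX, 2).and(Aggregations.MIN, 2)
val statsShort = salesData
  .groupBy(0)
  .sum(2)
  .andMax(2)
  .andMin(2)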
// Order enumeration (a Java enum, org.apache.flink.api.common.operators.Order, shown Scala-style)
object Order extends Enumeration {
val ASCENDING, DESCENDING = Value
}

// Prefer groupBy with key functions over field names when possible
case class Record(key: String, value: Int)
val data: DataSet[Record] = // ...
// Type-safe - key extraction is checked at compile time
val grouped1 = data.groupBy(_.key)
// String-based - the field name is only validated when the plan is built
val grouped2 = data.groupBy("key")

// Use combineGroup for operations that can be pre-aggregated
// Use fromCollection for large generated datasets (fromElements embeds each element in the plan)
val largeSales = env.fromCollection((1 to 1000000).map(i => (s"region${i % 10}", i)))
// Efficient - pre-aggregates locally before sending over network
val combined = largeSales
.groupBy(0)
.combineGroup { (iterator, collector: Collector[(String, Int)]) =>
val (region, values) = iterator.toList.unzip
collector.collect((region.head, values.sum))
}
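Because the combine step can run once per partition, combined may still hold several partial results per region; a final grouped reduce (an assumed follow-up step, not part of the original example) merges them into exact totals:
// Merge partial per-partition results into exact totals per region
val finalTotals = combined
  .groupBy(0)
  .reduce((a, b) => (a._1, a._2 + b._2))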
// A grouped reduce is also pre-aggregated (ReduceFunctions are combinable),
// but gives less explicit control over the combine step
val reduced = largeSales
.groupBy(0)
.reduce((a, b) => (a._1, a._2 + b._2))

// Sort within groups only when necessary
val transactions = env.fromElements(
("US", 100, "2023-01-01"),
("US", 200, "2023-01-02")
)
// If you only need the first N elements per group, combine sortGroup with first(n)
val topTransactions = transactions
.groupBy(0)
.sortGroup(1, Order.DESCENDING)
.first(5) // Keep only the top 5 per group
// If you need all elements sorted, this is fine
val allSorted = transactions
.groupBy(0)
.sortGroup(1, Order.DESCENDING)
.reduceGroup { (it: Iterator[(String, Int, String)], out: Collector[(String, Int, String)]) =>
  it.foreach(out.collect) // Forward every element in sorted order
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-scala-2-12@1.20.2