Apache Flink Scala API providing type-safe distributed stream and batch processing with idiomatic Scala constructs, functional programming features, and seamless runtime integration.
Apache Flink Scala API provides powerful grouping and aggregation operations with type-safe field access and functional programming patterns. Data can be grouped by keys and then aggregated using built-in functions or custom reduce operations.
Group elements by key for subsequent aggregation operations.
// DataSet: the core batch abstraction. Each groupBy overload returns a
// GroupedDataSet[T] on which aggregations and group transforms are applied.
class DataSet[T] {
// Group by a key-selector function; TypeInformation for K is normally
// supplied implicitly via `import org.apache.flink.api.scala._`
def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
// Group by one or more 0-based field positions (tuples / case classes)
def groupBy(fields: Int*): GroupedDataSet[T]
// Group by one or more field names (case classes); at least one name required
def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}

Perform common aggregation operations on grouped data.
class GroupedDataSet[T] {
// Sum the given field within each group (0-based position or field name).
// NOTE(review): for sum/max/min the non-aggregated fields of the emitted
// record come from an arbitrary element of the group — confirm against
// the Flink aggregation docs before relying on them.
def sum(field: Int): DataSet[T]
def sum(field: String): DataSet[T]
// Maximum value of the given field within each group
def max(field: Int): DataSet[T]
def max(field: String): DataSet[T]
// Minimum value of the given field within each group
def min(field: Int): DataSet[T]
def min(field: String): DataSet[T]
// Emit the complete record holding the maximum of the given field(s);
// unlike max(), the whole returned element is a real input record
def maxBy(fields: Int*): DataSet[T]
def maxBy(firstField: String, otherFields: String*): DataSet[T]
// Emit the complete record holding the minimum of the given field(s)
def minBy(fields: Int*): DataSet[T]
def minBy(firstField: String, otherFields: String*): DataSet[T]
}

class GroupedDataSet[T] {
// Generic aggregation: apply an Aggregations value (SUM, MIN, MAX) to the
// given field; returns an AggregateDataSet so further aggregations on other
// fields can be chained into the same pass with and(...)
def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
// Chain additional aggregations onto the same pass over the data.
// NOTE(review): AggregateDataSet is a top-level class in Flink; it is shown
// nested here only for reading convenience — verify against the API docs.
class AggregateDataSet[T] {
def and(agg: Aggregations, field: Int): AggregateDataSet[T]
def and(agg: Aggregations, field: String): AggregateDataSet[T]
}
}
// Direct aggregation on an ungrouped DataSet: treats the whole DataSet as
// a single group and aggregates the given field across all elements
class DataSet[T] {
def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}

class GroupedDataSet[T] {
// Use a custom Partitioner to distribute grouping keys across parallel
// instances instead of the default hash partitioning
def withPartitioner[K : TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]
}

class GroupedDataSet[T] {
// Pairwise-reduce each group to a single element with a binary function;
// the function should be associative (application order is not defined)
def reduce(fun: (T, T) => T): DataSet[T]
// Same, via Flink's ReduceFunction interface (useful for Java interop)
def reduce(reducer: ReduceFunction[T]): DataSet[T]
}

Transform entire groups at once, providing access to all elements in each group.
class GroupedDataSet[T] {
// Reduce a whole group at once: the function sees an Iterator over all
// elements of one group and returns a single result of type R
def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
// Variant returning zero or more results per group
def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
// Variant emitting results through a Collector (push style)
def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
// Variant using Flink's GroupReduceFunction interface
def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}
// Group reduce on an ungrouped DataSet: the entire DataSet forms one group
class DataSet[T] {
// NOTE(review): `aggregateGroup` does not appear in the standard Flink
// DataSet API documentation — confirm it exists in this Flink version;
// `reduceGroup` is the canonical entry point.
def aggregateGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}

Pre-aggregate data locally before shuffling for better performance.
class GroupedDataSet[T] {
// Best-effort local pre-aggregation: runs on each worker before data is
// shuffled, cutting network traffic. NOTE(review): Flink gives no guarantee
// how many times (if at all) the combiner is invoked per group — the result
// must be a valid partial aggregate, not a final one.
def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
// Variant returning zero or more partial results
def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
// Variant emitting partial results through a Collector
def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
// Variant using Flink's GroupCombineFunction interface
def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}

Sort elements within each group before processing.
class GroupedDataSet[T] {
// Sort elements within each group (before the group reaches reduce /
// reduceGroup) by a key-selector function; order is Order.ASCENDING or
// Order.DESCENDING
def sortGroup[K: TypeInformation](fun: T => K, order: Order): SortedGrouping[T]
// Sort by 0-based field position (tuples / case classes)
def sortGroup(field: Int, order: Order): SortedGrouping[T]
// Sort by field name (case classes)
def sortGroup(field: String, order: Order): SortedGrouping[T]
}
class SortedGrouping[T] {
// Chain secondary sort keys, applied after the earlier ones.
// NOTE(review): Flink rejects chaining a key-selector sort with additional
// sort keys — field-position/name sorts chain freely, but the function
// overload is only valid as the sole sort key; verify against Flink docs.
def sortGroup[K: TypeInformation](fun: T => K, order: Order): SortedGrouping[T]
def sortGroup(field: Int, order: Order): SortedGrouping[T]
def sortGroup(field: String, order: Order): SortedGrouping[T]
// Reduce the sorted groups; the sort order is visible to the reducer
def reduce(fun: (T, T) => T): DataSet[T]
def reduce(reducer: ReduceFunction[T]): DataSet[T]
def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}

Get the first n elements from each group.
// Take the first n elements of each group; which elements are "first" is
// arbitrary unless the grouping was sorted with sortGroup beforehand
class GroupedDataSet[T] {
def first(n: Int): DataSet[T]
}

import org.apache.flink.api.scala._
// A sale record: product name, sales region, and sale amount
case class Sale(product: String, region: String, amount: Double)
val env = ExecutionEnvironment.getExecutionEnvironment
val sales = env.fromElements(
Sale("laptop", "US", 1000.0),
Sale("phone", "US", 500.0),
Sale("laptop", "EU", 800.0),
Sale("phone", "EU", 450.0),
Sale("tablet", "US", 300.0)
)
// Total amount per product: one output record per product with `amount`
// replaced by the group sum
val productSales = sales
.groupBy(_.product)
.sum("amount")
// Per region, emit the complete Sale record with the highest amount (maxBy)
val maxSalesByRegion = sales
.groupBy(_.region)
.maxBy("amount")

import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation.Aggregations
// A transaction record with a composite natural key (date, product, region)
case class Transaction(date: String, product: String, region: String, amount: Double)
val env = ExecutionEnvironment.getExecutionEnvironment
val transactions = env.fromElements(
Transaction("2023-01-01", "laptop", "US", 1000.0),
Transaction("2023-01-01", "laptop", "EU", 800.0),
Transaction("2023-01-02", "laptop", "US", 1200.0)
)
// Composite key via a key-selector returning a tuple: groups by (date, product)
val dailyProductSales = transactions
.groupBy(t => (t.date, t.product))
.sum("amount")
// Composite key via field names; the aggregate(...) call requires
// `import org.apache.flink.api.java.aggregation.Aggregations` to be in scope
val regionProductSales = transactions
.groupBy("region", "product")
.aggregate(Aggregations.SUM, "amount")

import org.apache.flink.api.scala._
// An employee record: name, department, and annual salary
case class Employee(name: String, department: String, salary: Double)
val env = ExecutionEnvironment.getExecutionEnvironment
val employees = env.fromElements(
Employee("Alice", "Engineering", 80000),
Employee("Bob", "Engineering", 75000),
Employee("Charlie", "Sales", 60000),
Employee("Diana", "Sales", 65000)
)
// Highest-paid employee per department: pairwise reduce keeps the record
// with the larger salary (ties keep the second argument)
val topEarners = employees
.groupBy(_.department)
.reduce((e1, e2) => if (e1.salary > e2.salary) e1 else e2)
// Average salary per department, computed from the whole group at once
case class DeptAverage(department: String, avgSalary: Double, count: Int)
val avgSalaries = employees
.groupBy(_.department)
.reduceGroup { employees =>
// Materialize the group; safe because Flink never passes an empty group,
// so head and the division below cannot fail
val list = employees.toList
val dept = list.head.department
val total = list.map(_.salary).sum
val count = list.length
DeptAverage(dept, total / count, count)
}
}

import org.apache.flink.api.scala._
// An order line: customer, product, quantity ordered, and unit price
case class Order(customer: String, product: String, quantity: Int, price: Double)
val env = ExecutionEnvironment.getExecutionEnvironment
val orders = env.fromElements(
Order("Alice", "laptop", 1, 1000.0),
Order("Alice", "mouse", 2, 20.0),
Order("Bob", "phone", 1, 500.0),
Order("Bob", "case", 1, 15.0)
)
// Derive several metrics per customer in one reduceGroup pass
case class CustomerSummary(customer: String, totalSpent: Double, itemCount: Int, avgOrderValue: Double)
val customerSummaries = orders
.groupBy(_.customer)
.reduceGroup { orders =>
// Groups are never empty in Flink, so head/division are safe here
val orderList = orders.toList
val customer = orderList.head.customer
val totalSpent = orderList.map(o => o.quantity * o.price).sum
val itemCount = orderList.map(_.quantity).sum
val avgOrderValue = totalSpent / orderList.length
CustomerSummary(customer, totalSpent, itemCount, avgOrderValue)
}

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order
// A game score event: player, game id, points scored, and event time
case class Score(player: String, game: String, score: Int, timestamp: Long)
val env = ExecutionEnvironment.getExecutionEnvironment
val scores = env.fromElements(
Score("Alice", "game1", 100, 1000L),
Score("Alice", "game1", 150, 2000L),
Score("Alice", "game1", 120, 3000L),
Score("Bob", "game1", 90, 1500L),
Score("Bob", "game1", 110, 2500L)
)
// Best score per player (latest timestamp wins ties).
// Flink does not allow chaining a second key-selector sortGroup after the
// first, so sort once by a composite tuple key: descending tuple order is
// score-descending with timestamp-descending as the tie-breaker.
val bestScores = scores
.groupBy(_.player)
.sortGroup(s => (s.score, s.timestamp), Order.DESCENDING)
.first(1)
// Top 3 scores per player
val top3Scores = scores
.groupBy(_.player)
.sortGroup(_.score, Order.DESCENDING)
.first(3)

import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation.Aggregations
// A per-sale metrics record used to demonstrate chained aggregations
case class Metrics(region: String, product: String, sales: Double, profit: Double, units: Int)
val env = ExecutionEnvironment.getExecutionEnvironment
val metrics = env.fromElements(
Metrics("US", "laptop", 1000.0, 200.0, 1),
Metrics("US", "phone", 500.0, 100.0, 1),
Metrics("EU", "laptop", 800.0, 150.0, 1)
)
// One pass, several aggregations: start with aggregate(...) and chain
// further fields with and(...); the output sums each field per region
val regionalSummary = metrics
.groupBy(_.region)
.aggregate(Aggregations.SUM, "sales")
.and(Aggregations.SUM, "profit")
.and(Aggregations.SUM, "units")

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-scala-2-10