Apache Flink Scala API providing type-safe operations and functional programming paradigms for distributed stream and batch processing applications.
The Flink Scala API extensions package provides partial function support for DataSet operations, enabling more idiomatic Scala pattern matching and case class decomposition directly in transformation functions.
The extensions package provides methods with the -With suffix that accept partial functions, allowing you to use pattern matching syntax directly in transformations without wrapping in explicit functions.
import org.apache.flink.api.scala.extensions._

This import adds implicit conversions that extend DataSet and related classes with partial-function methods.
// Extension methods added to DataSet[T] by importing
// org.apache.flink.api.scala.extensions._. Each -With variant accepts a
// plain function — including a partial-function literal, so `case` pattern
// matching can be used directly — instead of a Flink rich-function class.
// (Signature listing for documentation; bodies live in the Flink sources.)
class OnDataSet[T] {
  // Element-wise transformation (counterpart of DataSet.map).
  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  // Transforms a whole partition at once, exposed as a Stream[T].
  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  // One-to-many transformation (counterpart of DataSet.flatMap).
  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  // Keeps only the elements for which the predicate holds.
  def filterWith(fun: T => Boolean): DataSet[T]
  // Pairwise reduction over the whole data set.
  def reduceWith(fun: (T, T) => T): DataSet[T]
  // Reduces all elements, exposed as a Stream[T], to a single value.
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  // Groups elements by a key extracted from each element.
  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  // Sorts by the extracted key in the given order.
  def sortingBy[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._
// A person record used by the extension-method examples below.
case class Person(
    name: String,
    age: Int,
    city: String)

// A single sale: product identifier, sale amount, and sales region.
case class Sale(
    product: String,
    amount: Double,
    region: String)
// Sample inputs for the pattern-matching examples below.
// NOTE(review): `env` is the enclosing ExecutionEnvironment, created
// outside this snippet (e.g. ExecutionEnvironment.getExecutionEnvironment).
val people = env.fromElements(
Person("Alice", 25, "NYC"),
Person("Bob", 30, "LA"),
Person("Charlie", 35, "NYC")
)
val sales = env.fromElements(
Sale("ProductA", 100.0, "US"),
Sale("ProductB", 200.0, "EU"),
Sale("ProductA", 150.0, "US")
)
// Pattern matching with mapWith
// Destructure each Person and emit the upper-cased name.
val names = people.mapWith {
case Person(name, _, _) => name.toUpperCase
}
// Filter with pattern matching
// Keep only people younger than 30.
val youngPeople = people.filterWith {
case Person(_, age, _) => age < 30
}
// FlatMap with pattern matching
// Emit every character of each person's city (one-to-many).
val cityLetters = people.flatMapWith {
case Person(_, _, city) => city.toCharArray
}
// Group by with pattern matching
// Group people by their city field.
val groupedByCity = people.groupingBy {
case Person(_, _, city) => city
}
// Complex pattern matching example
// Group the Sale records by product, then summarise each group in one pass:
// (product, total amount, comma-separated distinct regions).
val salesSummary = sales
  .groupingBy(_.product)
  .reduceGroupWith { salesStream =>
    val salesList = salesStream.toList
    // NOTE(review): assumes the group-reduce is only invoked for non-empty
    // groups, so `head` is safe here — confirm against Flink semantics.
    val product = salesList.head.product
    val totalAmount = salesList.map(_.amount).sum
    val regions = salesList.map(_.region).distinct
    (product, totalAmount, regions.mkString(","))
  }

// Extension methods added to GroupedDataSet[T] by the extensions import.
// (Signature listing for documentation; bodies live in the Flink sources.)
class OnGroupedDataSet[T] {
  // Pairwise reduction within each group.
  def reduceWith(fun: (T, T) => T): DataSet[T]
  // Reduces each group, exposed as a Stream[T], to a single value.
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  // Sorts the elements within each group by the extracted key.
  def sortingBy[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}

// Tuple-based sales data: (product, amount), grouped by product.
val sales = env.fromElements(
  ("ProductA", 100), ("ProductB", 200), ("ProductA", 150), ("ProductB", 50)
)
val groupedSales = sales.groupingBy(_._1)

// Reduce with pattern matching: sum the amounts per product. The pattern
// decomposes both input tuples; the product key is taken from the first.
val productTotals = groupedSales.reduceWith {
  case ((product, amount1), (_, amount2)) => (product, amount1 + amount2)
}
// Reduce group with pattern matching and complex logic
// Per-product statistics: (product, total, average, max).
val productStats = groupedSales.reduceGroupWith { salesStream =>
  val salesList = salesStream.toList
  // NOTE(review): assumes groups are non-empty, so `head` is safe.
  val product = salesList.head._1
  val amounts = salesList.map(_._2)
  val total = amounts.sum
  // Use Double division: the original `total / amounts.length` divided two
  // Ints and silently truncated the average.
  val average = total.toDouble / amounts.length
  val max = amounts.max
  (product, total, average, max)
}

// Extension method added to a cross (Cartesian product) of two data sets.
// (Signature listing for documentation; body lives in the Flink sources.)
class OnCrossDataSet[L, R] {
  // Combines each (left, right) pair into a single output value.
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}

// Sample left-hand input for the cross example below.
val left = env.fromElements(("A", 1), ("B", 2))
val right = env.fromElements(("X", 10), ("Y", 20))

// Cartesian product of `left` and `right`; the partial function
// destructures both tuples of every pair into a formatted string.
val crossed = left
  .cross(right)
  .applyWith {
    case ((leftKey, leftVal), (rightKey, rightVal)) =>
      s"$leftKey-$rightKey: ${leftVal * rightVal}"
  }

// Extension method added to a co-group operation; the function receives
// the matching elements of both sides as Streams.
// (Signature listing for documentation; body lives in the Flink sources.)
class OnCoGroupDataSet[L, R] {
  def applyWith[O: TypeInformation: ClassTag](
      fun: (Stream[L], Stream[R]) => O
  ): DataSet[O]
}

// Sample left-hand input for the co-group example below.
val left = env.fromElements(("A", 1), ("B", 2), ("A", 3))
val right = env.fromElements(("A", 10), ("C", 30), ("A", 20))

// Co-group both sides on their first tuple field and combine each key's
// elements into (key, sum of left amounts, sum of right amounts).
val coGrouped = left
  .coGroup(right)
  .where(_._1)
  .equalTo(_._1)
  .applyWith {
    case (leftStream, rightStream) =>
      val leftList = leftStream.toList
      val rightList = rightStream.toList
      // NOTE(review): assumes at least one side is non-empty per invocation
      // (a key absent from both sides never produces a call), so the
      // fallback to rightList.head cannot fail — confirm against Flink docs.
      val key = if (leftList.nonEmpty) leftList.head._1 else rightList.head._1
      val leftSum = leftList.map(_._2).sum
      val rightSum = rightList.map(_._2).sum
      (key, leftSum, rightSum)
  }

// Extension method added to a join once `where`/`equalTo` are specified.
// (Signature listing for documentation; body lives in the Flink sources.)
class OnJoinFunctionAssigner[L, R] {
  // Combines each matched (left, right) pair into one output value.
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}

// Sample (name, department) records for the join example below.
val employees = env.fromElements(
  ("Alice", "Engineering"), ("Bob", "Sales"), ("Charlie", "Engineering")
)
// Sample (name, salary) records joined against `employees`.
val salaries = env.fromElements(
  ("Alice", 75000), ("Bob", 65000), ("Charlie", 80000)
)

// Join on the name field; the pattern drops the duplicated key from the
// right-hand tuple, yielding (name, department, salary).
val employeeData = employees
  .join(salaries)
  .where(_._1)
  .equalTo(_._1)
  .applyWith {
    case ((name, dept), (_, salary)) => (name, dept, salary)
  }

// An order with its line items; `total` is the order's monetary value.
// NOTE(review): this example's `Order` shadows the Flink sort-order type of
// the same name used by `sortingBy` above — fine in isolation, but the two
// cannot share a scope.
case class Order(id: Int, customerId: String, items: List[String], total: Double)
// A customer master record, joined on `id`.
case class Customer(id: String, name: String, region: String)
// Sample orders and customers for the case-class join examples below.
val orders = env.fromElements(
  Order(1, "C001", List("A", "B"), 100.0),
  Order(2, "C002", List("C", "D", "E"), 250.0),
  Order(3, "C001", List("F"), 50.0)
)
val customers = env.fromElements(
  Customer("C001", "Alice Corp", "US"),
  Customer("C002", "Bob Industries", "EU")
)

// Extract order details with pattern matching:
// flatten each Order into (id, customerId, itemCount, total).
val orderSummary = orders.mapWith {
  case Order(id, customerId, items, total) =>
    (id, customerId, items.length, total)
}

// Filter orders by complex criteria:
// more than two items AND a total above 200.
val largeOrders = orders.filterWith {
  case Order(_, _, items, total) => items.length > 2 && total > 200
}

// Join customers to their orders; the patterns destructure both case
// classes directly in the join function.
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .applyWith {
    case (Customer(_, name, region), Order(orderId, _, items, total)) =>
      (orderId, name, region, items.length, total)
  }

// Nested structures: (key, list of numbers, map) tuples, used by the
// examples that follow.
val tupleData = env.fromElements(
  ("A", List(1, 2, 3), Map("x" -> 10)),
  ("B", List(4, 5), Map("y" -> 20, "z" -> 30)),
  ("C", List(6), Map("x" -> 40))
)
// Pattern match on complex tuple structure
// For every element, emit one record per (number, map-entry) combination:
// (key, mapKey, num * mapValue).
val extracted = tupleData.flatMapWith {
case (key, numbers, mapping) =>
// The for-comprehension crosses `numbers` with the entries of `mapping`.
for {
num <- numbers
(mapKey, mapValue) <- mapping
} yield (key, mapKey, num * mapValue)
}
// Filter with nested pattern matching
// Keep elements whose numbers sum above 5 AND whose largest map value
// exceeds 15.
// NOTE(review): `mapping.values.max` throws on an empty map — safe for the
// sample data above, which always has at least one entry.
val filtered = tupleData.filterWith {
case (_, numbers, mapping) =>
numbers.sum > 5 && mapping.values.max > 15
}

import org.apache.flink.api.scala.extensions._

The extension methods use the -With suffix to avoid conflicts with regular DataSet methods. The extensions package makes the Flink Scala API more idiomatic and easier to work with for Scala developers, especially when dealing with complex data structures that benefit from pattern-matching capabilities.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-scala-2-12