tessl/maven-org-apache-flink--flink-scala-2-12

Apache Flink Scala API providing type-safe operations and functional programming paradigms for distributed stream and batch processing applications.


Extensions Package - Partial Functions Support

The Flink Scala API extensions package provides partial function support for DataSet operations, enabling more idiomatic Scala pattern matching and case class decomposition directly in transformation functions.

Overview

The extensions package provides variants of the standard transformation methods, suffixed with -With, that accept pattern-matching anonymous functions. Because the regular DataSet methods are overloaded, a bare { case ... } block cannot always be passed to them directly; the -With variants avoid that ambiguity, so you can use pattern matching syntax in transformations without wrapping it in an explicit function.

Importing Extensions

import org.apache.flink.api.scala.extensions._

This import adds implicit conversions that extend DataSet and related classes with partial function methods.
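The pattern-matching function literals these extensions accept are ordinary Scala. A minimal plain-Scala sketch (no Flink required, with a hypothetical User class) of the { case ... } style that the -With methods enable on a DataSet:

```scala
// Plain Scala only: a { case ... } block is a pattern-matching
// anonymous function, so it can decompose a case class inline.
case class User(name: String, age: Int)

val users = List(User("Alice", 25), User("Bob", 30))

// The same style that mapWith/filterWith accept on a DataSet:
val upper = users.map { case User(name, _) => name.toUpperCase }
val young = users.filter { case User(_, age) => age < 30 }
```

On plain collections this works out of the box; the extensions exist to make the same syntax available on Flink's overloaded DataSet methods.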

DataSet Extensions

OnDataSet Extension Methods

class OnDataSet[T] {
  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filterWith(fun: T => Boolean): DataSet[T]
  def reduceWith(fun: (T, T) => T): DataSet[T]
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def sortingBy[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}

Usage Examples

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

// Batch execution environment used by all examples below
val env = ExecutionEnvironment.getExecutionEnvironment

case class Person(name: String, age: Int, city: String)
case class Sale(product: String, amount: Double, region: String)

val people = env.fromElements(
  Person("Alice", 25, "NYC"),
  Person("Bob", 30, "LA"),
  Person("Charlie", 35, "NYC")
)

val sales = env.fromElements(
  Sale("ProductA", 100.0, "US"),
  Sale("ProductB", 200.0, "EU"),
  Sale("ProductA", 150.0, "US")
)

// Pattern matching with mapWith
val names = people.mapWith {
  case Person(name, _, _) => name.toUpperCase
}

// Filter with pattern matching
val youngPeople = people.filterWith {
  case Person(_, age, _) => age < 30
}

// FlatMap with pattern matching
val cityLetters = people.flatMapWith {
  case Person(_, _, city) => city.toCharArray
}

// Group by with pattern matching
val groupedByCity = people.groupingBy {
  case Person(_, _, city) => city
}

// Group by product, then summarize each group with reduceGroupWith
val salesSummary = sales
  .groupingBy(_.product)
  .reduceGroupWith { salesStream =>
    val salesList = salesStream.toList
    val product = salesList.head.product
    val totalAmount = salesList.map(_.amount).sum
    val regions = salesList.map(_.region).distinct
    (product, totalAmount, regions.mkString(","))
  }

GroupedDataSet Extensions

OnGroupedDataSet Extension Methods

class OnGroupedDataSet[T] {
  def reduceWith(fun: (T, T) => T): DataSet[T]
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def sortingBy[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}

Usage Examples

val sales = env.fromElements(
  ("ProductA", 100), ("ProductB", 200), ("ProductA", 150), ("ProductB", 50)
)

val groupedSales = sales.groupingBy(_._1)

// Reduce with pattern matching
val productTotals = groupedSales.reduceWith {
  case ((product, amount1), (_, amount2)) => (product, amount1 + amount2)
}

// Reduce each group to summary statistics
val productStats = groupedSales.reduceGroupWith { salesStream =>
  val salesList = salesStream.toList
  val product = salesList.head._1
  val amounts = salesList.map(_._2)
  val total = amounts.sum
  val average = total.toDouble / amounts.length // toDouble avoids integer division
  val max = amounts.max
  (product, total, average, max)
}
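The per-group statistics can be checked against plain Scala collections. A small sketch (no Flink required) using groupBy on a List with the same sample data, producing a hypothetical statsByProduct map:

```scala
// Plain-Scala analogue of the grouped statistics above
val sales = List(("ProductA", 100), ("ProductB", 200), ("ProductA", 150), ("ProductB", 50))

val statsByProduct = sales.groupBy(_._1).map { case (product, group) =>
  val amounts = group.map(_._2)
  val total = amounts.sum
  // (total, average, max) per product; toDouble avoids integer division
  product -> (total, total.toDouble / amounts.length, amounts.max)
}
```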

Cross Product Extensions

OnCrossDataSet Extension Methods

class OnCrossDataSet[L, R] {
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}

Usage Examples

val left = env.fromElements(("A", 1), ("B", 2))
val right = env.fromElements(("X", 10), ("Y", 20))

val crossed = left
  .cross(right)
  .applyWith {
    case ((leftKey, leftVal), (rightKey, rightVal)) => 
      s"$leftKey-$rightKey: ${leftVal * rightVal}"
  }
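Cross is simply the cartesian product of the two datasets. With the same inputs, the example's logic can be sketched as a plain for-comprehension (no Flink required):

```scala
val left = List(("A", 1), ("B", 2))
val right = List(("X", 10), ("Y", 20))

// Cartesian product with tuple decomposition, mirroring applyWith above
val crossed = for {
  (leftKey, leftVal) <- left
  (rightKey, rightVal) <- right
} yield s"$leftKey-$rightKey: ${leftVal * rightVal}"
```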

CoGroup Extensions

OnCoGroupDataSet Extension Methods

class OnCoGroupDataSet[L, R] {
  def applyWith[O: TypeInformation: ClassTag](
    fun: (Stream[L], Stream[R]) => O
  ): DataSet[O]
}

Usage Examples

val left = env.fromElements(("A", 1), ("B", 2), ("A", 3))
val right = env.fromElements(("A", 10), ("C", 30), ("A", 20))

val coGrouped = left
  .coGroup(right)
  .where(_._1)
  .equalTo(_._1)
  .applyWith {
    case (leftStream, rightStream) =>
      val leftList = leftStream.toList
      val rightList = rightStream.toList
      val key = if (leftList.nonEmpty) leftList.head._1 else rightList.head._1
      val leftSum = leftList.map(_._2).sum
      val rightSum = rightList.map(_._2).sum
      (key, leftSum, rightSum)
  }
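CoGroup produces one output per key from either side, handing the function all matching elements from both inputs (possibly none on one side). A plain-Scala sketch of that semantics with the same sample data:

```scala
val left = List(("A", 1), ("B", 2), ("A", 3))
val right = List(("A", 10), ("C", 30), ("A", 20))

val leftGroups = left.groupBy(_._1)
val rightGroups = right.groupBy(_._1)

// One output row per key present on either side, as coGroup does
val coGrouped = (leftGroups.keySet ++ rightGroups.keySet).toList.sorted.map { key =>
  val leftSum = leftGroups.getOrElse(key, Nil).map(_._2).sum
  val rightSum = rightGroups.getOrElse(key, Nil).map(_._2).sum
  (key, leftSum, rightSum)
}
```

Note that key "B" appears only on the left and "C" only on the right, yet both still produce a row, unlike an inner join.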

Join Extensions

OnJoinFunctionAssigner Extension Methods

class OnJoinFunctionAssigner[L, R] {
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}

Usage Examples

val employees = env.fromElements(
  ("Alice", "Engineering"), ("Bob", "Sales"), ("Charlie", "Engineering")
)
val salaries = env.fromElements(
  ("Alice", 75000), ("Bob", 65000), ("Charlie", 80000)
)

val employeeData = employees
  .join(salaries)
  .where(_._1)
  .equalTo(_._1)
  .applyWith {
    case ((name, dept), (_, salary)) => (name, dept, salary)
  }
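The join above is an inner equi-join on the first tuple field. A plain-Scala sketch of the same pairing (the salaryByName lookup map is an illustrative stand-in for the second dataset):

```scala
val employees = List(("Alice", "Engineering"), ("Bob", "Sales"), ("Charlie", "Engineering"))
val salaryByName = Map("Alice" -> 75000, "Bob" -> 65000, "Charlie" -> 80000)

// Inner join: keep only employees with a matching salary record
val employeeData = employees.flatMap { case (name, dept) =>
  salaryByName.get(name).map(salary => (name, dept, salary))
}
```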

Advanced Pattern Matching Examples

Working with Complex Case Classes

case class Order(id: Int, customerId: String, items: List[String], total: Double)
case class Customer(id: String, name: String, region: String)

val orders = env.fromElements(
  Order(1, "C001", List("A", "B"), 100.0),
  Order(2, "C002", List("C", "D", "E"), 250.0),
  Order(3, "C001", List("F"), 50.0)
)

val customers = env.fromElements(
  Customer("C001", "Alice Corp", "US"),
  Customer("C002", "Bob Industries", "EU")
)

// Extract order details with pattern matching
val orderSummary = orders.mapWith {
  case Order(id, customerId, items, total) => 
    (id, customerId, items.length, total)
}

// Filter orders by complex criteria
val largeOrders = orders.filterWith {
  case Order(_, _, items, total) => items.length > 2 && total > 200
}

// Join with pattern matching
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .applyWith {
    case (Customer(_, name, region), Order(orderId, _, items, total)) =>
      (orderId, name, region, items.length, total)
  }

Working with Tuples and Collections

val tupleData = env.fromElements(
  ("A", List(1, 2, 3), Map("x" -> 10)),
  ("B", List(4, 5), Map("y" -> 20, "z" -> 30)),
  ("C", List(6), Map("x" -> 40))
)

// Pattern match on complex tuple structure
val extracted = tupleData.flatMapWith {
  case (key, numbers, mapping) =>
    for {
      num <- numbers
      (mapKey, mapValue) <- mapping
    } yield (key, mapKey, num * mapValue)
}

// Filter with nested pattern matching
val filtered = tupleData.filterWith {
  case (_, numbers, mapping) => 
    numbers.sum > 5 && mapping.values.max > 15
}
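The nested decomposition in the flatMapWith example works identically on plain collections. A runnable check of that logic (single-entry maps keep the output order deterministic):

```scala
val rows = List(
  ("A", List(1, 2, 3), Map("x" -> 10)),
  ("C", List(6), Map("x" -> 40))
)

// Same pattern match and for-comprehension as the flatMapWith example
val extracted = rows.flatMap { case (key, numbers, mapping) =>
  for {
    num <- numbers
    (mapKey, mapValue) <- mapping
  } yield (key, mapKey, num * mapValue)
}
```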

Key Advantages

  1. Pattern Matching Support: Use Scala's powerful pattern matching directly in transformations
  2. Case Class Decomposition: Extract fields from case classes without explicit getters
  3. Tuple Deconstruction: Break down tuples in a readable way
  4. Collection Pattern Matching: Match on collection structures and contents
  5. Cleaner Code: More concise and idiomatic Scala code

Important Notes

  • Extensions require explicit import: import org.apache.flink.api.scala.extensions._
  • All methods have -With suffix to avoid conflicts with regular DataSet methods
  • Full type information is still required for transformations
  • Performance is equivalent to regular DataSet operations - extensions are purely syntactic
  • Particularly useful for complex data structures and ETL operations

The extensions package makes the Flink Scala API more idiomatic for Scala developers, especially when working with complex data structures where pattern matching keeps transformations concise.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-scala-2-12@1.20.2
