Apache Flink Scala API providing type-safe operations and functional programming paradigms for distributed stream and batch processing applications.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-scala_2-12@1.20.0
The Apache Flink Scala API provides type-safe operations and functional programming paradigms for distributed stream and batch processing applications. It offers case class support, pattern matching, and functional composition patterns for building scalable data processing pipelines.
⚠️ Deprecation Notice: All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should migrate to the Java version of the DataStream and/or Table API.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.20.2</version>
</dependency>
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
For specific functionality:
import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.api.scala.extensions._ // For partial function support
import org.apache.flink.api.scala._
// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// Create dataset from elements
val data: DataSet[String] = env.fromElements("Hello", "World", "from", "Flink")
// Transform data
val wordCounts = data
.flatMap(_.toLowerCase.split("\\W+"))
.filter(_.nonEmpty)
.map((_, 1))
.groupBy(0)
.sum(1)
// Output results
wordCounts.print()
// Execute the program
env.execute("Word Count Example")
// Example with partial functions (requires extensions import)
import org.apache.flink.api.scala.extensions._
case class Sale(region: String, product: String, amount: Double)
val sales = env.fromElements(
Sale("US", "ProductA", 100.0),
Sale("EU", "ProductA", 150.0)
)
val result = sales
.filterWith { case Sale(region, _, _) => region == "US" }
.mapWith { case Sale(region, product, amount) => (product, amount) }
.groupingBy(_._1)
.sum(1)
The Flink Scala API is built around these core concepts:
The entry point for all Flink Scala programs, providing methods to create DataSets and configure execution.
object ExecutionEnvironment {
def getExecutionEnvironment: ExecutionEnvironment
def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}
class ExecutionEnvironment {
def setParallelism(parallelism: Int): Unit
def getParallelism: Int
def getConfig: ExecutionConfig
// Data source creation
def fromElements[T: ClassTag: TypeInformation](data: T*): DataSet[T]
def fromCollection[T: ClassTag: TypeInformation](data: Iterable[T]): DataSet[T]
def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
def readCsvFile[T: ClassTag: TypeInformation](filePath: String, /* ... */): DataSet[T]
def generateSequence(from: Long, to: Long): DataSet[Long]
// Execution
def execute(): JobExecutionResult
def execute(jobName: String): JobExecutionResult
def executeAsync(): JobClient
}
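A minimal sketch of how these environment methods fit together, assuming local execution (the text file path is a placeholder):
import org.apache.flink.api.scala._
// Local environment with an explicit parallelism of 2
val env = ExecutionEnvironment.createLocalEnvironment(2)
val numbers: DataSet[Long] = env.generateSequence(1, 100)
val lines: DataSet[String] = env.readTextFile("/path/to/input.txt") // placeholder path
// print() is an eager sink: it runs the job and prints results on the client
numbers.filter(_ % 2 == 0).print()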
Core data transformation and processing operations on distributed datasets.
class DataSet[T] {
// Basic transformations
def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
def filter(fun: T => Boolean): DataSet[T]
def distinct(): DataSet[T]
// Grouping
def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
def groupBy(fields: Int*): GroupedDataSet[T]
// Aggregations
def reduce(fun: (T, T) => T): DataSet[T]
def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
def sum(field: Int): AggregateDataSet[T]
def max(field: Int): AggregateDataSet[T]
def min(field: Int): AggregateDataSet[T]
// Output operations
def collect(): Seq[T]
def print(): Unit
def count(): Long
def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE): DataSink[T]
}
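A short sketch of the aggregation and output operations listed above (the element values are arbitrary):
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val numbers: DataSet[Int] = env.fromElements(3, 1, 4, 1, 5, 9)
// distinct() removes duplicates; reduce folds the remaining elements pairwise
val total: DataSet[Int] = numbers.distinct().reduce(_ + _)
// collect() and count() are eager actions: each triggers an execution and
// brings its result back to the client program
val values: Seq[Int] = total.collect()
val howMany: Long = numbers.count()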
Joining datasets on keys with various join types and optimization hints.
class DataSet[T] {
def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
}
class UnfinishedJoinOperation[L, R] {
def where[K: TypeInformation](fun: L => K): HalfUnfinishedJoinOperation[L, R]
def where(fields: Int*): HalfUnfinishedJoinOperation[L, R]
}
class JoinDataSet[L, R] {
def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}
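A sketch of the join builder using hypothetical Purchase and Customer case classes; note that equalTo, which completes the key specification, lives on the half-finished join operation even though it is not shown in the listing above:
import org.apache.flink.api.scala._
case class Purchase(id: Int, customerId: Int, amount: Double)
case class Customer(customerId: Int, name: String)
val env = ExecutionEnvironment.getExecutionEnvironment
val purchases = env.fromElements(Purchase(1, 10, 99.0), Purchase(2, 20, 15.5))
val customers = env.fromElements(Customer(10, "Alice"), Customer(20, "Bob"))
// where/equalTo select the join keys on each side; apply maps every matched pair
val joined: DataSet[(String, Double)] = purchases
  .join(customers)
  .where(_.customerId)
  .equalTo(_.customerId)
  .apply((p, c) => (c.name, p.amount))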
Operations available on grouped datasets including sorting and specialized aggregations.
class GroupedDataSet[T] {
def sortGroup(field: Int, order: Order): GroupedDataSet[T]
def reduce(fun: (T, T) => T): DataSet[T]
def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
def sum(field: Int): AggregateDataSet[T]
def max(field: Int): AggregateDataSet[T]
def min(field: Int): AggregateDataSet[T]
def maxBy(fields: Int*): DataSet[T]
def minBy(fields: Int*): DataSet[T]
def first(n: Int): DataSet[T]
}
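A sketch of grouped operations on a simple tuple dataset (values are arbitrary):
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order
val env = ExecutionEnvironment.getExecutionEnvironment
val readings = env.fromElements(("a", 1.0), ("a", 3.0), ("b", 2.0))
// Top reading per key: group on field 0, sort each group by field 1, keep one row
val topPerKey = readings
  .groupBy(0)
  .sortGroup(1, Order.DESCENDING)
  .first(1)
// Full group reduce: the function sees all elements of one group as an Iterator
val counts = readings
  .groupBy(0)
  .reduceGroup { (it: Iterator[(String, Double)]) =>
    val group = it.toSeq
    (group.head._1, group.size)
  }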
Scala-specific type information system for serialization and type safety.
object Types {
// Basic types
val STRING: TypeInformation[String]
val INT: TypeInformation[Int]
val LONG: TypeInformation[Long]
val DOUBLE: TypeInformation[Double]
val BOOLEAN: TypeInformation[Boolean]
// Factory methods
def of[T: TypeInformation]: TypeInformation[T]
def TUPLE[T: TypeInformation]: TypeInformation[T]
def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
}
// Implicit type information generation (macro-based)
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
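A sketch of obtaining type information explicitly, assuming a hypothetical Event case class:
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.Types
case class Event(id: Long, name: String)
// The implicit macro can also be invoked explicitly at a call site
val eventInfo: TypeInformation[Event] = createTypeInformation[Event]
// The Types object exposes constants and factories for common cases
val stringInfo: TypeInformation[String] = Types.STRING
val caseClassInfo: TypeInformation[Event] = Types.CASE_CLASS[Event]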
Specialized DataSet types for partitioned and sorted data operations.
class PartitionSortedDataSet[T] extends DataSet[T] {
def sortPartition(field: Int, order: Order): DataSet[T]
def sortPartition(field: String, order: Order): DataSet[T]
// Note: Cannot chain key selector functions
}
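A sketch of partition-local sorting (values are arbitrary):
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order
val env = ExecutionEnvironment.getExecutionEnvironment
val pairs = env.fromElements((3, "c"), (1, "a"), (2, "b"))
// Sorting is per parallel partition, not global; chaining field positions appends
// a secondary sort key (key selector functions cannot be chained this way)
val sorted = pairs
  .sortPartition(0, Order.ASCENDING)
  .sortPartition(1, Order.DESCENDING)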
Scala-friendly extension methods that accept partial functions for pattern matching.
// Import extensions
import org.apache.flink.api.scala.extensions._
class OnDataSet[T] {
def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
def filterWith(fun: T => Boolean): DataSet[T]
def reduceWith(fun: (T, T) => T): DataSet[T]
def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
}
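A sketch combining the extension methods with a hypothetical LogLine case class; once the extensions are imported, reduceWith is also available on grouped data:
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._
case class LogLine(level: String, message: String)
val env = ExecutionEnvironment.getExecutionEnvironment
val logs = env.fromElements(LogLine("WARN", "disk"), LogLine("WARN", "net"), LogLine("INFO", "ok"))
// Partial functions destructure case classes and tuples directly in each operator
val levelCounts = logs
  .flatMapWith { case LogLine(level, _) => Seq((level, 1)) }
  .groupingBy { case (level, _) => level }
  .reduceWith { case ((level, a), (_, b)) => (level, a + b) }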
Additional utilities for DataSet operations including sampling, partitioning, and indexing.
// Import utilities
import org.apache.flink.api.scala.utils._
class DataSet[T] {
// Available via implicit conversions from utils package
def countElementsPerPartition(): DataSet[(Int, Long)]
def zipWithIndex(): DataSet[(Long, T)]
def zipWithUniqueId(): DataSet[(Long, T)]
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.RNG.nextLong()): DataSet[T]
def sampleWithSize(withReplacement: Boolean, numSamples: Int, seed: Long = Utils.RNG.nextLong()): DataSet[T]
def checksumHashCode(): ChecksumHashCode
}
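A sketch of the utility methods (element values are arbitrary):
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
val env = ExecutionEnvironment.getExecutionEnvironment
val words = env.fromElements("a", "b", "c", "d")
// Attach a consecutive Long index to every element
val indexed: DataSet[(Long, String)] = words.zipWithIndex
// Bernoulli sampling of roughly half the elements, without replacement
val sampled: DataSet[String] = words.sample(false, 0.5)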
// Execution configuration
class ExecutionConfig {
def setParallelism(parallelism: Int): ExecutionConfig
def getParallelism: Int
def enableClosureCleaner(): ExecutionConfig
def disableClosureCleaner(): ExecutionConfig
}
// Job execution result
class JobExecutionResult {
def getNetRuntime: Long // net job runtime in milliseconds
def getAccumulatorResult[T](accumulatorName: String): T
}
// Aggregation types
object Aggregations extends Enumeration {
val SUM, MAX, MIN = Value
}
// File system write modes
object FileSystem {
object WriteMode extends Enumeration {
val NO_OVERWRITE, OVERWRITE = Value
}
}
// Ordering for sorting
object Order extends Enumeration {
val ASCENDING, DESCENDING = Value
}
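A sketch of tuning the ExecutionConfig and reading the JobExecutionResult (the output path is a placeholder):
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
val env = ExecutionEnvironment.getExecutionEnvironment
env.getConfig.setParallelism(4)       // configure via ExecutionConfig
env.getConfig.disableClosureCleaner()
env.fromElements(1, 2, 3)
  .map(_ * 2)
  .writeAsText("/tmp/flink-out", FileSystem.WriteMode.OVERWRITE) // placeholder path
val result = env.execute("configured job")
println(s"net runtime: ${result.getNetRuntime} ms")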
The Flink Scala API can throw these exceptions:
IllegalArgumentException - Invalid parameters or field names
UnsupportedOperationException - Unsupported operations on certain data types
RuntimeException - Runtime execution errors
IOException - File I/O related errors
JobExecutionException - Job execution failures
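A sketch of guarding job submission, assuming flink-clients/flink-runtime (which provide JobExecutionException) are on the classpath; the output path is a placeholder:
import org.apache.flink.api.scala._
import org.apache.flink.runtime.client.JobExecutionException
val env = ExecutionEnvironment.getExecutionEnvironment
env.fromElements(1, 2, 3)
  .map { n => require(n > 0, s"bad value $n"); n * 2 } // user-code errors fail the job
  .writeAsText("/tmp/guarded-out") // placeholder path
try {
  env.execute("guarded job")
} catch {
  case e: JobExecutionException =>
    // The cause chain carries the task-level error from the failed operator
    println(s"Job failed: ${e.getMessage}")
}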