Apache Flink Scala API
Apache Flink Scala API provides elegant and fluent Scala language bindings for Flink's distributed stream and batch processing framework. This module enables Scala developers to write type-safe data processing applications using idiomatic Scala constructs, including case classes, pattern matching, and functional programming patterns.
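For instance, case classes work directly as Flink data types, and the extensions import (shown under imports below) allows partial functions, so transformation logic can pattern-match on fields. A minimal sketch; Reading is a hypothetical example type:
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

// Hypothetical domain type; any case class works as a Flink data type.
case class Reading(sensor: String, value: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val readings = env.fromElements(Reading("a", 1.0), Reading("b", 2.5))

// The extensions import enables partial functions, so case-class
// fields can be destructured directly via pattern matching.
val doubled = readings.mapWith { case Reading(sensor, value) =>
  Reading(sensor, value * 2)
}

doubled.print()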
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.14.6</version>
</dependency>
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
For type utilities:
import org.apache.flink.api.scala.typeutils.Types
For extension methods (partial function support):
import org.apache.flink.api.scala.extensions._
import org.apache.flink.api.scala._
// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// Create DataSet from collection
val data = env.fromCollection(List(1, 2, 3, 4, 5))
// Transform data
val result = data
.filter(_ > 2)
.map(_ * 2)
.reduce(_ + _)
// Execute and get result
println(result.collect().head) // Prints: 24
The Flink Scala API is built around several key components:
Environment setup, data source creation, and job execution management. The primary entry point for all Flink programs.
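A minimal sketch of typical environment usage (assumes a local run, where getExecutionEnvironment yields a local environment):
import org.apache.flink.api.scala._

// Picks a local or cluster context automatically.
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)

val lines = env.fromElements("first", "second")
lines.print() // print() executes eagerly; file sinks need env.execute()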
object ExecutionEnvironment {
def getExecutionEnvironment: ExecutionEnvironment
def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}
class ExecutionEnvironment {
def setParallelism(parallelism: Int): Unit
def getParallelism: Int
def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]
def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]
def readTextFile(filePath: String): DataSet[String]
def execute(): JobExecutionResult
def execute(jobName: String): JobExecutionResult
}
Core data processing operations including map, filter, reduce, and aggregations. The heart of Flink's data processing capabilities.
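For example, a minimal word count built from these transformations (a sketch; any DataSet of strings works as input):
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val words = env.fromElements("flink", "scala", "flink")

// Map to (word, 1) pairs, group on the word, and sum counts pairwise.
val counts = words
  .map(w => (w, 1))
  .groupBy(_._1)
  .reduce((a, b) => (a._1, a._2 + b._2))

counts.print() // (flink,2) and (scala,1), in some order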
class DataSet[T] {
def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
def filter(fun: T => Boolean): DataSet[T]
def reduce(fun: (T, T) => T): DataSet[T]
def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
def union(other: DataSet[T]): DataSet[T]
}
Group-wise operations and aggregation functions for summarizing and analyzing grouped data.
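A sketch of grouped aggregation over a hypothetical Sale case class, using both the field-name shorthand and a custom reduce:
import org.apache.flink.api.scala._

// Hypothetical record type for illustration.
case class Sale(region: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val sales = env.fromElements(Sale("eu", 10.0), Sale("eu", 5.0), Sale("us", 7.0))

// Field-name aggregation on a grouped case-class DataSet.
val totals = sales.groupBy(_.region).sum("amount")

// Or a pairwise reduce when the aggregate needs custom logic.
val maxPerRegion = sales
  .groupBy(_.region)
  .reduce((a, b) => if (a.amount >= b.amount) a else b)

totals.print()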
class GroupedDataSet[T] {
def reduce(fun: (T, T) => T): DataSet[T]
def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
def sum(field: String): AggregateDataSet[T]
def max(field: String): AggregateDataSet[T]
def min(field: String): AggregateDataSet[T]
def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}
Join, cross, and coGroup operations for combining multiple DataSets.
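A sketch of an inner and a left outer join between two hypothetical case-class DataSets; note that outer joins must supply a join function, since the missing side arrives as null:
import org.apache.flink.api.scala._

// Hypothetical types for illustration.
case class User(id: Int, name: String)
case class Purchase(userId: Int, total: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val users     = env.fromElements(User(1, "ann"), User(2, "bob"))
val purchases = env.fromElements(Purchase(1, 30.0), Purchase(1, 12.5))

// Inner join: where/equalTo take key selectors; the result pairs both sides.
val joined = users.join(purchases).where(_.id).equalTo(_.userId)

// Left outer join: users without purchases see null on the right.
val withTotals = users
  .leftOuterJoin(purchases)
  .where(_.id)
  .equalTo(_.userId)
  .apply { (user, purchase) =>
    (user.name, Option(purchase).map(_.total).getOrElse(0.0))
  }

withTotals.print()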
class DataSet[T] {
def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
def cross[O](other: DataSet[O]): CrossDataSet[T, O]
def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}
Comprehensive type information system and Scala-specific serialization support.
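Following the signatures listed below, type information can be derived implicitly via the macro or composed explicitly (a minimal sketch):
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.Types

// The api.scala._ import brings the createTypeInformation macro into
// scope, so an implicit TypeInformation is derived at compile time.
val intInfo: TypeInformation[Int] = implicitly[TypeInformation[Int]]

// Scala-specific wrappers compose explicit type information.
val optInfo: TypeInformation[Option[String]] = Types.OPTION(Types.of[String])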
object Types {
def of[T: TypeInformation]: TypeInformation[T]
def OPTION[A](valueType: TypeInformation[A]): TypeInformation[Option[A]]
def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
def TRY[A](valueType: TypeInformation[A]): TypeInformation[Try[A]]
def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
}
// Automatic type information generation via macro
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
Control over data distribution and partitioning strategies across the cluster.
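A sketch combining hash partitioning, per-partition sorting, and a custom Partitioner (the modulo-on-hashCode routing is just an illustration):
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.common.operators.Order

val env = ExecutionEnvironment.getExecutionEnvironment
val pairs = env.fromElements(("a", 1), ("b", 2), ("a", 3))

// Hash-partition on the key, then sort each partition locally.
val arranged = pairs
  .partitionByHash(_._1)
  .sortPartition(_._2, Order.ASCENDING)

// A custom partitioner routes keys explicitly.
val byHash = new Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    math.abs(key.hashCode) % numPartitions
}
val custom = pairs.partitionCustom(byHash, _._1)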
class DataSet[T] {
def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
def rebalance(): DataSet[T]
def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}
Reading data from various sources and writing results to different sinks.
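A minimal read-transform-write sketch; the paths are placeholders. Writes are lazy sinks, so the job only runs when execute() is called:
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

val env = ExecutionEnvironment.getExecutionEnvironment

// Placeholder paths; point them at real files.
val lines = env.readTextFile("/path/to/input.txt")

lines
  .filter(_.nonEmpty)
  .writeAsText("/path/to/output", WriteMode.OVERWRITE)

// Sinks are lazy; trigger the job explicitly.
env.execute("copy non-empty lines")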
class ExecutionEnvironment {
def readTextFile(filePath: String): DataSet[String]
def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]
def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}
class DataSet[T] {
def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = null): DataSink[T]
def writeAsCsv(filePath: String): DataSink[T]
def print(): Unit
def collect(): Seq[T]
}
Advanced utilities for sampling, indexing, and data analysis.
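These utilities become available on any DataSet once the implicit conversion is in scope; in Flink's Scala API it lives under org.apache.flink.api.scala.utils. A minimal sketch:
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // brings DataSetUtils into scope

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("a", "b", "c", "d")

// Pair each element with a consecutive index (costs an extra pass).
val indexed: DataSet[(Long, String)] = data.zipWithIndex

// Sample roughly half the elements, without replacement.
val sampled = data.sample(withReplacement = false, fraction = 0.5)

indexed.print()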
implicit class DataSetUtils[T](dataSet: DataSet[T]) {
def zipWithIndex: DataSet[(Long, T)]
def zipWithUniqueId: DataSet[(Long, T)]
def sample(withReplacement: Boolean, fraction: Double): DataSet[T]
def countElementsPerPartition: DataSet[(Int, Long)]
}
trait TypeInformation[T] {
def getTypeClass: Class[T]
def createSerializer(config: ExecutionConfig): TypeSerializer[T]
}
class JobExecutionResult {
def getJobID: JobID
def getNetRuntime: Long
def getNetRuntime(timeUnit: TimeUnit): Long
}
sealed trait Order
object Order {
case object ASCENDING extends Order
case object DESCENDING extends Order
}
abstract class Partitioner[T] {
def partition(key: T, numPartitions: Int): Int
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-scala-2-11