Apache Flink Scala API module that provides elegant and fluent Scala language bindings for Flink's distributed stream and batch processing framework
```bash
npx @tessl/cli install tessl/maven-org-apache-flink--flink-scala_2-11@1.14.0
```

Apache Flink Scala API provides elegant and fluent Scala language bindings for Flink's distributed stream and batch processing framework. This module enables Scala developers to write type-safe data processing applications using idiomatic Scala constructs, including case classes, pattern matching, and functional programming patterns.
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.14.6</version>
</dependency>
```

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
```

For type utilities:

```scala
import org.apache.flink.api.scala.typeutils.Types
```

For extension methods (partial function support):

```scala
import org.apache.flink.api.scala.extensions._
```

```scala
import org.apache.flink.api.scala._

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// Create DataSet from collection
val data = env.fromCollection(List(1, 2, 3, 4, 5))
// Transform data
val result = data
.filter(_ > 2)
.map(_ * 2)
.reduce(_ + _)
// Execute and get result
println(result.collect().head) // Prints: 24
```

The Flink Scala API is built around several key components:
Environment setup, data source creation, and job execution management. The primary entry point for all Flink programs.

```scala
object ExecutionEnvironment {
def getExecutionEnvironment: ExecutionEnvironment
def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}
class ExecutionEnvironment {
def setParallelism(parallelism: Int): Unit
def getParallelism: Int
def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]
def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]
def readTextFile(filePath: String): DataSet[String]
def execute(): JobExecutionResult
def execute(jobName: String): JobExecutionResult
}
```
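A minimal sketch of the environment lifecycle using the calls listed above; the sample data and the parallelism value are illustrative:

```scala
import org.apache.flink.api.scala._

// Obtain the environment for the current context (local JVM or cluster)
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2) // illustrative parallelism

// In-memory sources are convenient for tests and examples
val words: DataSet[String] = env.fromElements("flink", "scala", "api")
val numbers: DataSet[Int]  = env.fromCollection(1 to 100)

// collect() and print() trigger execution immediately; execute() is only
// needed when the program ends in file or custom sinks
words.map(_.toUpperCase).print()
println(numbers.filter(_ % 2 == 0).collect().size) // 50
```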
Core data processing operations including map, filter, reduce, and aggregations. The heart of Flink's data processing capabilities.

```scala
class DataSet[T] {
def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
def filter(fun: T => Boolean): DataSet[T]
def reduce(fun: (T, T) => T): DataSet[T]
def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
def union(other: DataSet[T]): DataSet[T]
}
```
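A short sketch chaining the transformations above; the `Visit` case class and its values are made up for illustration:

```scala
import org.apache.flink.api.scala._

// Hypothetical record type used only for this example
case class Visit(url: String, durationSec: Int)

val env = ExecutionEnvironment.getExecutionEnvironment
val visits = env.fromElements(
  Visit("/home", 12), Visit("/docs", 95), Visit("/home", 33))

// Each call returns a new, lazily evaluated DataSet
val longVisitTimePerUrl = visits
  .filter(_.durationSec > 30)            // drop short visits
  .map(v => (v.url, v.durationSec))      // project to (url, duration)
  .groupBy(_._1)                         // group by url
  .reduce((a, b) => (a._1, a._2 + b._2)) // sum durations per url

longVisitTimePerUrl.print()
```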
Group-wise operations and aggregation functions for summarizing and analyzing grouped data.

```scala
class GroupedDataSet[T] {
def reduce(fun: (T, T) => T): DataSet[T]
def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
def sum(field: String): AggregateDataSet[T]
def max(field: String): AggregateDataSet[T]
def min(field: String): AggregateDataSet[T]
def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}
```
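A sketch of grouped aggregation; the `Sale` case class is illustrative, and the field-name aggregation assumes case-class fields can be addressed by name as in the signatures above:

```scala
import org.apache.flink.api.scala._

// Hypothetical sales record
case class Sale(region: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val sales = env.fromElements(Sale("eu", 10.0), Sale("us", 25.0), Sale("eu", 7.5))

// Built-in aggregation on a named case-class field
val totalPerRegion = sales.groupBy(_.region).sum("amount")

// reduceGroup gives full access to each group as an iterator
val topSalePerRegion = sales
  .groupBy(_.region)
  .reduceGroup(group => group.maxBy(_.amount))

totalPerRegion.print()
topSalePerRegion.print()
```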
Join, cross, and coGroup operations for combining multiple DataSets.

```scala
class DataSet[T] {
def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
def cross[O](other: DataSet[O]): CrossDataSet[T, O]
def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}
```
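A sketch of completing the unfinished join operations above, assuming the usual `where`/`equalTo`/`apply` key-and-result steps of Flink's join builder; the tuple data is illustrative:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val users  = env.fromElements((1, "ada"), (2, "grace")) // (userId, name)
val counts = env.fromElements((1, 42), (2, 7), (3, 99)) // (userId, pageviews)

// Inner equi-join on the first tuple field of each side
val joined = users.join(counts)
  .where(0)
  .equalTo(0)
  .apply((u, c) => (u._2, c._2)) // (name, pageviews)

// Outer joins keep unmatched rows; the missing side arrives as null
val withUnknownUsers = users.rightOuterJoin(counts)
  .where(0)
  .equalTo(0)
  .apply((u, c) => (Option(u).map(_._2).getOrElse("unknown"), c._2))

joined.print()
withUnknownUsers.print()
```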
Comprehensive type information system and Scala-specific serialization support.

```scala
object Types {
def of[T: TypeInformation]: TypeInformation[T]
def OPTION[A](valueType: TypeInformation[A]): TypeInformation[Option[A]]
def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
def TRY[A](valueType: TypeInformation[A]): TypeInformation[Try[A]]
def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
}
// Automatic type information generation via macro
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
```
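A sketch of how type information is typically obtained: the macro derives `TypeInformation` implicitly once `org.apache.flink.api.scala._` is imported, while the `Types` factories build it explicitly for Scala container types (the `Event` case class is illustrative):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical record type
case class Event(id: Long, payload: Option[String])

// Derived implicitly by the createTypeInformation macro
val eventInfo: TypeInformation[Event] = implicitly[TypeInformation[Event]]

// Built explicitly with the Types factory methods
val optionInfo = Types.OPTION(Types.of[String])
val eitherInfo = Types.EITHER(Types.of[String], Types.of[Int])
```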
Control over data distribution and partitioning strategies across the cluster.

```scala
class DataSet[T] {
def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
def rebalance(): DataSet[T]
def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}
```
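A sketch of explicit repartitioning; the data is illustrative, and `Order` is assumed to be the ascending/descending enumeration from flink-core listed in the types section below:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

val env = ExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements((3, "c"), (1, "a"), (2, "b"), (1, "d"))

// Hash-partition by key so equal keys end up in the same parallel instance,
// then sort each partition locally by that key
val clustered = events
  .partitionByHash(_._1)
  .sortPartition(_._1, Order.ASCENDING)

// rebalance() redistributes elements round-robin to even out skew
val balanced = events.rebalance().map(_._2)

clustered.print()
balanced.print()
```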
Reading data from various sources and writing results to different sinks.

```scala
class ExecutionEnvironment {
def readTextFile(filePath: String): DataSet[String]
def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]
def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}
class DataSet[T] {
def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = null): DataSink[T]
def writeAsCsv(filePath: String): DataSink[T]
def print(): Unit
def collect(): Seq[T]
}
```
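A sketch of a complete read-transform-write job; the file paths are placeholders. Note that `print()` and `collect()` trigger execution immediately, whereas file sinks only run when `execute()` is called:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Placeholder paths; point these at real locations
val lines = env.readTextFile("/tmp/input.txt")

val wordCounts = lines
  .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map(word => (word, 1))
  .groupBy(_._1)
  .reduce((a, b) => (a._1, a._2 + b._2))

// Defining a sink is lazy; execute() actually runs the job
wordCounts.writeAsCsv("/tmp/word-counts")
env.execute("word count")
```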
Advanced utilities for sampling, indexing, and data analysis.

```scala
implicit class DataSetUtils[T](dataSet: DataSet[T]) {
def zipWithIndex: DataSet[(Long, T)]
def zipWithUniqueId: DataSet[(Long, T)]
def sample(withReplacement: Boolean, fraction: Double): DataSet[T]
def countElementsPerPartition: DataSet[(Int, Long)]
}
```
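A sketch of the utility extensions, assuming they come into scope via `import org.apache.flink.api.scala.utils._`:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // enables zipWithIndex, sample, ...

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("a", "b", "c", "d")

// Assign consecutive indices (requires an extra counting pass)
val indexed: DataSet[(Long, String)] = data.zipWithIndex

// Draw an approximately 50% sample without replacement
val sampled = data.sample(withReplacement = false, fraction = 0.5)

indexed.print()
sampled.print()
```

The remaining declarations are the supporting types referenced throughout the API: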
```scala
trait TypeInformation[T] {
def getTypeClass: Class[T]
def createSerializer(config: ExecutionConfig): TypeSerializer[T]
}
class JobExecutionResult {
def getJobID: JobID
def getNetRuntime: Long
def getNetRuntime(timeUnit: TimeUnit): Long
}
sealed trait Order
object Order {
case object ASCENDING extends Order
case object DESCENDING extends Order
}
abstract class Partitioner[T] {
def partition(key: T, numPartitions: Int): Int
}
```
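A sketch of a custom partitioner wired up with `partitionCustom`; the parity-based routing is illustrative, assumes a parallelism of at least two, and imports the partitioner base type from flink-core:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.Partitioner

// Route even keys to partition 0 and odd keys to partition 1
class ParityPartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int = math.abs(key % 2)
}

val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2) // the parity scheme needs at least two partitions

val routed = env
  .fromElements(1, 2, 3, 4, 5, 6)
  .partitionCustom(new ParityPartitioner, (n: Int) => n)

routed.print()
```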