Apache Flink Scala API providing type-safe distributed stream and batch processing with idiomatic Scala constructs, functional programming features, and seamless runtime integration.
Install via the tessl CLI:

```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-scala_2-10@1.3.0
```

The library enables Scala developers to write data processing applications using Flink's powerful streaming and batch processing capabilities with native Scala types, case classes, pattern matching, and functional transformations.
Add the dependency to your `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.10</artifactId>
  <version>1.3.3</version>
</dependency>
```

Import the API (the wildcard import also provides implicit `TypeInformation` generation):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.{ExecutionEnvironment, DataSet}
```

A minimal batch program:

```scala
import org.apache.flink.api.scala._

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// Create a DataSet from a collection
val data = env.fromElements(1, 2, 3, 4, 5)
// Transform the data
val result = data
  .map(_ * 2)
  .filter(_ > 4)

// Trigger execution and print the results
result.print()
```

The Flink Scala API is built around several core abstractions:
The ExecutionEnvironment is the main entry point for Flink batch programs, providing data source creation and execution control.
```scala
object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}

class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
```
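A minimal sketch of environment setup and job submission; the parallelism value, output path, and job name are illustrative:

```scala
import org.apache.flink.api.scala._

// Local environment for testing, with an explicit (illustrative) parallelism
val env = ExecutionEnvironment.createLocalEnvironment(parallelism = 2)

val numbers = env.fromElements(1, 2, 3)
numbers.writeAsText("/tmp/numbers-out") // illustrative output path

// Sinks like writeAsText are lazy; execute() triggers the job
val result = env.execute("numbers-job")
println(s"Runtime: ${result.getNetRuntime} ms")
```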
Create DataSets from various sources and write results to different output formats.

```scala
class ExecutionEnvironment {
  def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]
  def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def generateSequence(from: Long, to: Long): DataSet[Long]
}

class DataSet[T] {
  def writeAsText(filePath: String): DataSink[T]
  def print(): Unit
  def collect(): Seq[T]
}
```
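A short sketch of reading and collecting data; the input path is illustrative:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Sequence source
val ids: DataSet[Long] = env.generateSequence(1, 100)

// File source (illustrative path)
val lines: DataSet[String] = env.readTextFile("/tmp/input.txt")

// collect() executes the job and returns results to the client
val sample: Seq[Long] = ids.filter(_ % 10 == 0).collect()
```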
Core transformation operations for processing data with functional programming patterns.

```scala
class DataSet[T] {
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filter(fun: T => Boolean): DataSet[T]
  def reduce(fun: (T, T) => T): DataSet[T]
  def distinct(): DataSet[T]
  def union(other: DataSet[T]): DataSet[T]
}
```
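A brief sketch chaining these transformations:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val a = env.fromElements(1, 2, 3, 4)
val b = env.fromElements(3, 4, 5)

val result = a
  .union(b)            // merge the two DataSets
  .distinct()          // remove duplicate elements
  .filter(_ % 2 == 0)  // keep even numbers
  .map(_ * 10)

result.print()
```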
Group data by keys and perform aggregation operations with type-safe field access.

```scala
class DataSet[T] {
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def groupBy(fields: Int*): GroupedDataSet[T]
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}

class GroupedDataSet[T] {
  def reduce(fun: (T, T) => T): DataSet[T]
  def sum(field: Int): AggregateDataSet[T]
  def max(field: Int): AggregateDataSet[T]
  def min(field: Int): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
}
```
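The classic word count illustrates grouping on a tuple field and aggregating another:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val words = env.fromElements("flink", "scala", "flink")

val counts = words
  .map(w => (w, 1))  // pair each word with an initial count
  .groupBy(0)        // group on the word (tuple field 0)
  .sum(1)            // sum the counts (field 1)

counts.print()
```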
Combine multiple DataSets using joins, co-groups, and cross products with flexible key selection.

```scala
class DataSet[T] {
  def join[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
  def cross[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
}

class UnfinishedJoinOperation[T, O] {
  def where[K: TypeInformation](keySelector: T => K): UnfinishedJoinOperationWhere[T, O]
  def where(fields: Int*): UnfinishedJoinOperationWhere[T, O]
}
```
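A sketch of the two-step join flow using key selector functions; the case classes are illustrative:

```scala
import org.apache.flink.api.scala._

case class User(id: Int, name: String)
case class Purchase(userId: Int, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val users     = env.fromElements(User(1, "ada"), User(2, "bob"))
val purchases = env.fromElements(Purchase(1, 10.0), Purchase(1, 5.0))

// where() picks the key on the left side, equalTo() on the right
val joined = users.join(purchases).where(_.id).equalTo(_.userId)

joined.print() // pairs of (User, Purchase) with matching ids
```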
Support for iterative algorithms with both bulk iteration and delta iteration patterns.

```scala
class DataSet[T] {
  def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]
  def iterateWithTermination(maxIterations: Int)(stepFunction: DataSet[T] => (DataSet[T], DataSet[_])): DataSet[T]
  def iterateDelta[R: TypeInformation: ClassTag](
      workset: DataSet[R],
      maxIterations: Int,
      keyFields: Array[String]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
}
```
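A minimal bulk-iteration sketch that increments every element for a fixed number of rounds:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val initial = env.fromElements(0, 0, 0)

// The step function is applied up to 10 times to the evolving DataSet
val iterated = initial.iterate(10) { current =>
  current.map(_ + 1)
}

iterated.print() // every element is now 10
```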
Comprehensive type information system supporting Scala types with macro-based code generation.

```scala
// Implicit type information generation
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]

// Core type information classes
class CaseClassTypeInfo[T] extends TypeInformation[T]
class OptionTypeInfo[T] extends TypeInformation[Option[T]]
class EitherTypeInfo[A, B] extends TypeInformation[Either[A, B]]
class TryTypeInfo[T] extends TypeInformation[Try[T]]
```
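A short sketch of how the macro derives TypeInformation for case classes, including Option fields:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation

case class Event(id: Long, payload: Option[String])

// Explicit derivation via the macro; normally this happens implicitly
val info: TypeInformation[Event] = createTypeInformation[Event]

// The implicit kicks in automatically for source and transformation calls
val env = ExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements(Event(1L, Some("a")), Event(2L, None))
```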
Native integration with Hadoop input/output formats from both the mapred and mapreduce APIs.

```scala
class ExecutionEnvironment {
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
      inputFormat: MapreduceInputFormat[K, V],
      keyClass: Class[K],
      valueClass: Class[V],
      inputPath: String
  ): DataSet[(K, V)]
}
```
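A sketch of reading a file through a Hadoop mapreduce TextInputFormat, assuming the overload shown above; the path is illustrative:

```scala
import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val env = ExecutionEnvironment.getExecutionEnvironment

// Keys are byte offsets into the file, values are lines of text
val pairs: DataSet[(LongWritable, Text)] = env.readHadoopFile(
  new TextInputFormat,
  classOf[LongWritable],
  classOf[Text],
  "hdfs:///tmp/input" // illustrative path
)
```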
The package exposes the following core types:

```scala
// Core execution types
class ExecutionEnvironment
class DataSet[T]
class GroupedDataSet[T]
class CrossDataSet[T, O]
class AggregateDataSet[T]
class CoGroupDataSet[T, O]
// Configuration and results
class ExecutionConfig
class JobExecutionResult
class JobID
// Resource management
class ResourceSpec
// Type information
abstract class TypeInformation[T]
class CaseClassTypeInfo[T] extends TypeInformation[T]
class OptionTypeInfo[T] extends TypeInformation[Option[T]]
class EitherTypeInfo[A, B] extends TypeInformation[Either[A, B]]
// Serialization
abstract class TypeSerializer[T]
class CaseClassSerializer[T] extends TypeSerializer[T]
// Operators and functions
trait MapFunction[T, O]
trait FlatMapFunction[T, O]
trait FilterFunction[T]
trait ReduceFunction[T]
trait GroupReduceFunction[T, O]
trait JoinFunction[T, O, R]
trait CoGroupFunction[T, O, R]
trait CrossFunction[T, O, R]
trait MapPartitionFunction[T, O]
// Aggregation types
class Aggregations
class Order
// Join operations
class UnfinishedJoinOperation[T, O]
class UnfinishedJoinOperationWhere[T, O]
class UnfinishedJoinOperationWhereEqual[T, O]
class JoinDataSet[T, O]
// Output types
class DataSink[T]
trait OutputFormat[T]
class TextOutputFormat[T] extends OutputFormat[T]
// Input types
trait InputFormat[T, S]
class TextInputFormat extends InputFormat[String, FileInputSplit]
// Utilities
object DataSetUtils
class ScalaGauge[T]
// Accumulator types
trait Accumulator[V, R]
class IntCounter extends Accumulator[Int, Int]
class LongCounter extends Accumulator[Long, Long]
class DoubleCounter extends Accumulator[Double, Double]
class Histogram extends Accumulator[Int, java.util.Map[Int, Int]]
class ListAccumulator[T] extends Accumulator[T, java.util.ArrayList[T]]
class IntMaximum extends Accumulator[Int, Int]
class IntMinimum extends Accumulator[Int, Int]
class DoubleMaximum extends Accumulator[Double, Double]
class DoubleMinimum extends Accumulator[Double, Double]
// Rich functions
trait RichFunction
abstract class RichMapFunction[T, O] extends MapFunction[T, O] with RichFunction
abstract class RichFlatMapFunction[T, O] extends FlatMapFunction[T, O] with RichFunction
abstract class RichFilterFunction[T] extends FilterFunction[T] with RichFunction
abstract class RichReduceFunction[T] extends ReduceFunction[T] with RichFunction
abstract class RichGroupReduceFunction[T, O] extends GroupReduceFunction[T, O] with RichFunction
abstract class RichMapPartitionFunction[T, O] extends MapPartitionFunction[T, O] with RichFunction
// Runtime context
trait RuntimeContext
trait BroadcastVariableInitializer[T, C]
// Configuration
class Configuration
```
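As a sketch of how the rich function and accumulator types combine, a RichMapFunction registering an IntCounter (the accumulator name is illustrative):

```scala
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class LineLengthMapper extends RichMapFunction[String, Int] {
  private val linesSeen = new IntCounter()

  override def open(parameters: Configuration): Unit = {
    // Register the accumulator with the runtime context
    getRuntimeContext.addAccumulator("lines-seen", linesSeen)
  }

  override def map(value: String): Int = {
    linesSeen.add(1)
    value.length
  }
}
```

After `env.execute()`, the final count can be read from the returned `JobExecutionResult` via `getAccumulatorResult("lines-seen")`.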