The Apache Flink Scala API provides type-safe operations and functional programming paradigms for distributed stream and batch processing applications.
DataSet is the main abstraction for distributed data collections in Flink. It provides immutable transformation operations that create new DataSets.
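The examples in this document assume the Scala API implicits are imported and that an ExecutionEnvironment named `env` is in scope; a minimal sketch of that setup and of chaining immutable transformations:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// each transformation returns a new DataSet; the source stays unchanged
val numbers: DataSet[Int] = env.fromElements(1, 2, 3, 4, 5)
val doubled = numbers
  .map(_ * 2)
  .name("double-values")   // label the operator for logs and the web UI
  .setParallelism(2)       // operator-level parallelism override
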
class DataSet[T] {
def getType: TypeInformation[T]
def getExecutionEnvironment: ExecutionEnvironment
def getParallelism: Int
def setParallelism(parallelism: Int): DataSet[T]
def name(name: String): DataSet[T]
def setDescription(description: String): DataSet[T]
}

class DataSet[T] {
def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
def mapPartition[R: TypeInformation: ClassTag](mapPartitionFunction: MapPartitionFunction[T, R]): DataSet[R]
}

val numbers = env.fromElements(1, 2, 3, 4, 5)
// Simple map with lambda
val doubled = numbers.map(_ * 2)
// Map with function
val squared = numbers.map(x => x * x)
// Map to different type
val numberStrings = numbers.map(n => s"Number: $n")
// Map partition with collector (process entire partition at once)
val processed = numbers.mapPartition { (iterator, collector) =>
  val batch = iterator.toList
  batch.foreach(x => collector.collect(x * 10))
}
// Map partition returning collection (alternative approach)
val processedAlt = numbers.mapPartition { iterator =>
  iterator.toList.map(_ * 10)
}

class DataSet[T] {
def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
}

val sentences = env.fromElements("Hello World", "Flink Scala API", "Distributed Processing")
// Split sentences into words
val words = sentences.flatMap(_.split(" "))
// FlatMap with filtering
val longWords = sentences.flatMap(_.split(" ").filter(_.length > 4))
// FlatMap with collections
val chars = words.flatMap(_.toCharArray)

class DataSet[T] {
def filter(fun: T => Boolean): DataSet[T]
def filter(filter: FilterFunction[T]): DataSet[T]
}

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Filter even numbers
val evenNumbers = numbers.filter(_ % 2 == 0)
// Filter with complex condition
val filtered = numbers.filter(x => x > 3 && x < 8)

class DataSet[T] {
def distinct(): DataSet[T]
def distinct(fields: Int*): DataSet[T]
def distinct(firstField: String, otherFields: String*): DataSet[T]
def distinct[K: TypeInformation](fun: T => K): DataSet[T]
}

val duplicates = env.fromElements(1, 2, 2, 3, 3, 3, 4)
// Remove all duplicates
val unique = duplicates.distinct()
// Distinct by specific fields (for tuples/case classes)
val people = env.fromElements(("Alice", 25), ("Bob", 30), ("Alice", 35))
val distinctNames = people.distinct(0) // distinct by first field (name)
// Distinct by key function
case class Person(name: String, age: Int)
val personData = env.fromElements(Person("Alice", 25), Person("Bob", 30), Person("Alice", 35))
val distinctByName = personData.distinct(_.name)

class DataSet[T] {
def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
def sum(field: Int): AggregateDataSet[T]
def sum(field: String): AggregateDataSet[T]
def max(field: Int): AggregateDataSet[T]
def max(field: String): AggregateDataSet[T]
def min(field: Int): AggregateDataSet[T]
def min(field: String): AggregateDataSet[T]
def maxBy(fields: Int*): DataSet[T]
def maxBy(firstField: String, otherFields: String*): DataSet[T]
def minBy(fields: Int*): DataSet[T]
def minBy(firstField: String, otherFields: String*): DataSet[T]
}

val sales = env.fromElements(
("Q1", 1000), ("Q2", 1500), ("Q3", 1200), ("Q4", 1800)
)
// Sum by field index
val totalSales = sales.sum(1)
// Max by field
val maxSales = sales.max(1)
// MaxBy - return entire record with maximum value
val bestQuarter = sales.maxBy(1)
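// The min-side counterparts follow the same pattern (illustrative sketch):
val minSales = sales.min(1)        // aggregate: minimum of field 1
val worstQuarter = sales.minBy(1)  // entire record with the minimum field 1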

class DataSet[T] {
def reduce(fun: (T, T) => T): DataSet[T]
def reduce(reducer: ReduceFunction[T]): DataSet[T]
def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
def reduceGroup[R: TypeInformation: ClassTag](groupReducer: GroupReduceFunction[T, R]): DataSet[R]
def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}

val numbers = env.fromElements(1, 2, 3, 4, 5)
// Simple reduce - sum all numbers
val sum = numbers.reduce(_ + _)
// Reduce with more complex logic
val product = numbers.reduce((a, b) => a * b)
// Reduce group (process all elements together)
val statistics = numbers.reduceGroup { iterator =>
  val values = iterator.toList
  val sum = values.sum
  val count = values.length
  val avg = sum.toDouble / count
  (sum, count, avg)
}
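
combineGroup is declared above but not demonstrated; a sketch using the GroupCombineFunction overload to pre-aggregate partial sums on each partition before a final reduce:

import org.apache.flink.api.common.functions.GroupCombineFunction
import org.apache.flink.util.Collector

// Combine locally first to shrink the data that is shuffled before the final reduce
val partialSums = numbers.combineGroup(new GroupCombineFunction[Int, Int] {
  override def combine(values: java.lang.Iterable[Int], out: Collector[Int]): Unit = {
    var sum = 0
    val it = values.iterator()
    while (it.hasNext) sum += it.next()
    out.collect(sum)
  }
})
val total = partialSums.reduce(_ + _)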

class DataSet[T] {
def partitionByHash(fields: Int*): DataSet[T]
def partitionByHash(firstField: String, otherFields: String*): DataSet[T]
def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
def partitionByRange(fields: Int*): DataSet[T]
def partitionByRange(firstField: String, otherFields: String*): DataSet[T]
def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
def rebalance(): DataSet[T]
}

val data = env.fromElements(("A", 1), ("B", 2), ("C", 3), ("D", 4))
// Hash partition by first field
val hashPartitioned = data.partitionByHash(0)
// Range partition by second field
val rangePartitioned = data.partitionByRange(1)
// Custom partitioning with a user-defined Partitioner[String] (see the MyPartitioner sketch below)
val customPartitioned = data.partitionCustom(new MyPartitioner(), _._1)
// Rebalance (round-robin distribution)
val rebalanced = data.rebalance()
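
MyPartitioner in the snippet above is user-defined; a hypothetical sketch of one possible implementation:

import org.apache.flink.api.common.functions.Partitioner

// route each key to a partition by hashing the key string
class MyPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    (key.hashCode & Int.MaxValue) % numPartitions
}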

class DataSet[T] {
def sortPartition(field: Int, order: Order): DataSet[T]
def sortPartition(field: String, order: Order): DataSet[T]
def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}

import org.apache.flink.api.common.operators.Order
val data = env.fromElements(("Alice", 25), ("Bob", 30), ("Charlie", 20))
// Sort partitions by age in ascending order
val sortedByAge = data.sortPartition(1, Order.ASCENDING)
// Sort by key function
val sortedByName = data.sortPartition(_._1, Order.DESCENDING)

class DataSet[T] {
def collect(): Seq[T]
def count(): Long
def print(): Unit
def printToErr(): Unit
def printOnTaskManager(sinkIdentifier: String): DataSink[T]
}

val numbers = env.fromElements(1, 2, 3, 4, 5)
// Collect to local collection (triggers execution)
val results: Seq[Int] = numbers.collect()
// Count elements
val elementCount: Long = numbers.count()
// Print to console
numbers.print()
// Print to stderr
numbers.printToErr()

class DataSet[T] {
def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE): DataSink[T]
def writeAsCsv(
filePath: String,
rowDelimiter: String = "\n",
fieldDelimiter: String = ",",
writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE
): DataSink[T]
def write[O <: FileOutputFormat[T]](
outputFormat: O,
filePath: String,
writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE
): DataSink[T]
def output(outputFormat: OutputFormat[T]): DataSink[T]
}

val results = env.fromElements("Hello", "World", "Flink")
// Write as text file
results.writeAsText("output/results.txt")
// Write as CSV with custom delimiters
val csvData = env.fromElements(("Alice", 25), ("Bob", 30))
csvData.writeAsCsv("output/people.csv", fieldDelimiter = ";")
// Write with a custom output format (see the MyOutputFormat sketch below)
results.write(new MyOutputFormat(), "output/custom.dat")
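
MyOutputFormat is likewise user-defined; a hypothetical sketch of a minimal FileOutputFormat[String]:

import org.apache.flink.api.common.io.FileOutputFormat

// write each String record on its own line, encoded as UTF-8
class MyOutputFormat extends FileOutputFormat[String] {
  override def writeRecord(record: String): Unit = {
    stream.write(record.getBytes("UTF-8"))
    stream.write('\n')
  }
}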

class DataSet[T] {
def first(n: Int): DataSet[T]
}

val numbers = env.fromElements(10, 5, 8, 3, 1, 9, 2)
// Get first 3 elements (order not guaranteed unless sorted)
val firstThree = numbers.first(3)
// Get the 3 largest values by sorting each partition in descending order first
// (sortPartition sorts within a partition, so this is globally correct only with parallelism 1)
val topThree = numbers.sortPartition(x => x, Order.DESCENDING).first(3)

class DataSet[T] {
def union(other: DataSet[T]): DataSet[T]
}

val set1 = env.fromElements(1, 2, 3)
val set2 = env.fromElements(4, 5, 6)
// Union two datasets
val combined = set1.union(set2)
// Chain multiple unions
val set3 = env.fromElements(7, 8, 9)
val allSets = set1.union(set2).union(set3)

class DataSet[T] {
def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]
}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

val lookupData = env.fromElements(("A", 100), ("B", 200), ("C", 300))
val mainData = env.fromElements("A", "B", "C", "A")
// Register the lookup DataSet as a broadcast variable on the operator that reads it
val enriched = mainData
  .map(new RichMapFunction[String, (String, Int)] {
    var lookupMap: Map[String, Int] = _

    override def open(parameters: Configuration): Unit = {
      import scala.collection.JavaConverters._
      val lookupCollection = getRuntimeContext
        .getBroadcastVariable[(String, Int)]("lookup")
        .asScala
      lookupMap = lookupCollection.toMap
    }

    override def map(value: String): (String, Int) =
      (value, lookupMap.getOrElse(value, 0))
  })
  .withBroadcastSet(lookupData, "lookup")

class DataSet[T] {
def withForwardedFields(forwardedFields: String*): DataSet[T]
def withForwardedFieldsFirst(forwardedFields: String*): DataSet[T]
def withForwardedFieldsSecond(forwardedFields: String*): DataSet[T]
}

val tuples = env.fromElements(("Alice", 25, "Engineer"), ("Bob", 30, "Manager"))
// Hint that field 0 (the name) is forwarded unchanged by the map below
val mapped = tuples
  .map(t => (t._1, t._2 + 1, t._3)) // only increment age
  .withForwardedFields("0")

class DataSet[T] {
def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]
def iterateWithTermination(maxIterations: Int)(
stepFunction: DataSet[T] => (DataSet[T], DataSet[_])
): DataSet[T]
def iterateDelta[R: ClassTag](
workset: DataSet[R],
maxIterations: Int,
keyFields: Array[Int]
)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
def iterateDelta[R: ClassTag](
workset: DataSet[R],
maxIterations: Int,
keyFields: Array[String]
)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
def iterateDelta[R: ClassTag](
workset: DataSet[R],
maxIterations: Int,
keyFields: Array[Int]
)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R], DataSet[_])): DataSet[T]
}

val initial = env.fromElements(1.0)
// Simple bulk iteration - double the value 10 times
val result = initial.iterate(10) { current =>
  current.map(_ * 2)
}
// Iteration with a termination criterion: the loop stops once the termination DataSet is empty
val converged = initial.iterateWithTermination(100) { current =>
  val next = current.map(x => (x + 2.0 / x) / 2.0) // Newton's method for sqrt(2)
  // keep iterating while the estimate's square still differs from 2 by more than the tolerance
  val termination = next.filter(x => Math.abs(x * x - 2.0) > 0.0001)
  (next, termination)
}
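
The delta-iteration overloads above have no example; a minimal sketch with made-up data, where each step updates the solution set by key (field 0) and the loop ends once the workset is empty or maxIterations is reached:

// Solution set: (key, value); workset: the elements still being refined
val solution = env.fromElements((1, 10), (2, 20), (3, 30))
val workset = env.fromElements((1, 3), (2, 1))

val refined = solution.iterateDelta(workset, 10, Array(0)) { (sol, ws) =>
  // decrement each workset value; these records update the solution set by key (field 0)
  val delta = ws.map(t => (t._1, t._2 - 1))
  // keep iterating only for elements that have not reached zero yet
  val nextWorkset = delta.filter(_._2 > 0)
  (delta, nextWorkset)
}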

import org.apache.flink.api.common.operators.Order

object Order extends Enumeration {
val ASCENDING, DESCENDING = Value
}
object FileSystem {
object WriteMode extends Enumeration {
val NO_OVERWRITE, OVERWRITE = Value
}
}
trait DataSink[T] {
def name(name: String): DataSink[T]
def setParallelism(parallelism: Int): DataSink[T]
}
class AggregateDataSet[T](private[flink] val set: DataSet[T], private[flink] val keys: Keys[T], private[flink] val aggregations: Seq[AggregationFunction[_]]) {
def and(agg: Aggregations, field: Int): AggregateDataSet[T]
def and(agg: Aggregations, field: String): AggregateDataSet[T]
def andSum(field: Int): AggregateDataSet[T]
def andSum(field: String): AggregateDataSet[T]
def andMax(field: Int): AggregateDataSet[T]
def andMax(field: String): AggregateDataSet[T]
def andMin(field: Int): AggregateDataSet[T]
def andMin(field: String): AggregateDataSet[T]
}
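
The chained and* methods are not demonstrated above; a sketch with made-up data that computes two aggregates in a single pass:

val quarterly = env.fromElements(("Q1", 1000, 50), ("Q2", 1500, 80), ("Q3", 1200, 60))
// total of field 1 and maximum of field 2, evaluated together
val summary = quarterly.sum(1).andMax(2)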

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-scala-2-12