DataSet is the main abstraction for distributed data collections in Flink's batch API. All transformations are immutable: each operation returns a new DataSet rather than modifying the input.
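The examples in this section assume a batch ExecutionEnvironment and the Scala API implicits in scope, roughly:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment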
class DataSet[T] {
def getType: TypeInformation[T]
def getExecutionEnvironment: ExecutionEnvironment
def getParallelism: Int
def setParallelism(parallelism: Int): DataSet[T]
def name(name: String): DataSet[T]
def setDescription(description: String): DataSet[T]
}
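A minimal sketch of the metadata accessors and setters above (assumes the `env` from the setup at the top; the operator name appears in the Flink web UI and logs):

val doubledNamed = env.fromElements(1, 2, 3)
.map(_ * 2)
.name("double the input") // label the map operator
.setParallelism(2)        // run the map with 2 parallel tasks
println(doubledNamed.getType)        // TypeInformation for Int
println(doubledNamed.getParallelism) // 2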

class DataSet[T] {
def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
def mapPartition[R: TypeInformation: ClassTag](mapPartitionFunction: MapPartitionFunction[T, R]): DataSet[R]
}

val numbers = env.fromElements(1, 2, 3, 4, 5)
// Simple map with lambda
val doubled = numbers.map(_ * 2)
// Map with function
val squared = numbers.map(x => x * x)
// Map to different type
val numberStrings = numbers.map(n => s"Number: $n")
// Map partition with collector (process an entire partition at once)
// (annotating the Collector type helps Scala pick the right overload; requires org.apache.flink.util.Collector)
val processed = numbers.mapPartition { (iterator, collector: Collector[Int]) =>
val batch = iterator.toList
batch.foreach(x => collector.collect(x * 10))
}
// Map partition returning collection (alternative approach)
val processedAlt = numbers.mapPartition { iterator =>
iterator.toList.map(_ * 10)
}

class DataSet[T] {
def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
}

val sentences = env.fromElements("Hello World", "Flink Scala API", "Distributed Processing")
// Split sentences into words
val words = sentences.flatMap(_.split(" "))
// FlatMap with filtering
val longWords = sentences.flatMap(_.split(" ").filter(_.length > 4))
// FlatMap with collections
val chars = words.flatMap(_.toCharArray)

class DataSet[T] {
def filter(fun: T => Boolean): DataSet[T]
def filter(filter: FilterFunction[T]): DataSet[T]
}

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Filter even numbers
val evenNumbers = numbers.filter(_ % 2 == 0)
// Filter with complex condition
val filtered = numbers.filter(x => x > 3 && x < 8)

class DataSet[T] {
def distinct(): DataSet[T]
def distinct(fields: Int*): DataSet[T]
def distinct(firstField: String, otherFields: String*): DataSet[T]
def distinct[K: TypeInformation](fun: T => K): DataSet[T]
}

val duplicates = env.fromElements(1, 2, 2, 3, 3, 3, 4)
// Remove all duplicates
val unique = duplicates.distinct()
// Distinct by specific fields (for tuples/case classes)
val people = env.fromElements(("Alice", 25), ("Bob", 30), ("Alice", 35))
val distinctNames = people.distinct(0) // distinct by first field (name)
// Distinct by key function
case class Person(name: String, age: Int)
val personData = env.fromElements(Person("Alice", 25), Person("Bob", 30), Person("Alice", 35))
val distinctByName = personData.distinct(_.name)

class DataSet[T] {
def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
def sum(field: Int): AggregateDataSet[T]
def sum(field: String): AggregateDataSet[T]
def max(field: Int): AggregateDataSet[T]
def max(field: String): AggregateDataSet[T]
def min(field: Int): AggregateDataSet[T]
def min(field: String): AggregateDataSet[T]
def maxBy(fields: Int*): DataSet[T]
def maxBy(firstField: String, otherFields: String*): DataSet[T]
def minBy(fields: Int*): DataSet[T]
def minBy(firstField: String, otherFields: String*): DataSet[T]
}

val sales = env.fromElements(
("Q1", 1000), ("Q2", 1500), ("Q3", 1200), ("Q4", 1800)
)
// Sum by field index
val totalSales = sales.sum(1)
// Max by field
val maxSales = sales.max(1)
// MaxBy - return entire record with maximum value
val bestQuarter = sales.maxBy(1)

class DataSet[T] {
def reduce(fun: (T, T) => T): DataSet[T]
def reduce(reducer: ReduceFunction[T]): DataSet[T]
def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
def reduceGroup[R: TypeInformation: ClassTag](groupReducer: GroupReduceFunction[T, R]): DataSet[R]
def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}

val numbers = env.fromElements(1, 2, 3, 4, 5)
// Simple reduce - sum all numbers
val sum = numbers.reduce(_ + _)
// Reduce with more complex logic
val product = numbers.reduce((a, b) => a * b)
// Reduce group (process all elements together)
val statistics = numbers.reduceGroup { iterator =>
val values = iterator.toList
val sum = values.sum
val count = values.length
val avg = sum.toDouble / count
(sum, count, avg)
}
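combineGroup pre-aggregates each partition locally before any data is shuffled; Flink gives no guarantee how many elements a single call sees, so it should only produce partial results. A minimal sketch using the GroupCombineFunction variant, reusing `numbers` from the example above:

import org.apache.flink.api.common.functions.GroupCombineFunction
import org.apache.flink.util.Collector

val partialSums = numbers.combineGroup(new GroupCombineFunction[Int, Int] {
  override def combine(values: java.lang.Iterable[Int], out: Collector[Int]): Unit = {
    var sum = 0
    val it = values.iterator()
    while (it.hasNext) sum += it.next()
    out.collect(sum) // emit one partial sum per call
  }
})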

class DataSet[T] {
def partitionByHash(fields: Int*): DataSet[T]
def partitionByHash(firstField: String, otherFields: String*): DataSet[T]
def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
def partitionByRange(fields: Int*): DataSet[T]
def partitionByRange(firstField: String, otherFields: String*): DataSet[T]
def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
def rebalance(): DataSet[T]
}

val data = env.fromElements(("A", 1), ("B", 2), ("C", 3), ("D", 4))
// Hash partition by first field
val hashPartitioned = data.partitionByHash(0)
// Range partition by second field
val rangePartitioned = data.partitionByRange(1)
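// `MyPartitioner` below is a hypothetical example, not part of the Flink API: a custom
// Partitioner[K] maps each key to a partition index.
import org.apache.flink.api.common.functions.Partitioner
class MyPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    Math.abs(key.hashCode) % numPartitions
}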
// Custom partitioning
val customPartitioned = data.partitionCustom(new MyPartitioner(), _._1)
// Rebalance (round-robin distribution)
val rebalanced = data.rebalance()

class DataSet[T] {
def sortPartition(field: Int, order: Order): DataSet[T]
def sortPartition(field: String, order: Order): DataSet[T]
def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}

import org.apache.flink.api.common.operators.Order
val data = env.fromElements(("Alice", 25), ("Bob", 30), ("Charlie", 20))
// Sort partitions by age in ascending order
val sortedByAge = data.sortPartition(1, Order.ASCENDING)
// Sort by key function
val sortedByName = data.sortPartition(_._1, Order.DESCENDING)

class DataSet[T] {
def collect(): Seq[T]
def count(): Long
def print(): Unit
def printToErr(): Unit
def printOnTaskManager(sinkIdentifier: String): DataSink[T]
}

val numbers = env.fromElements(1, 2, 3, 4, 5)
// Collect to local collection (triggers execution)
val results: Seq[Int] = numbers.collect()
// Count elements
val elementCount: Long = numbers.count()
// Print to console
numbers.print()
// Print to stderr
numbers.printToErr()

class DataSet[T] {
def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE): DataSink[T]
def writeAsCsv(
filePath: String,
rowDelimiter: String = "\n",
fieldDelimiter: String = ",",
writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE
): DataSink[T]
def write[O <: FileOutputFormat[T]](
outputFormat: O,
filePath: String,
writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE
): DataSink[T]
def output(outputFormat: OutputFormat[T]): DataSink[T]
}

val results = env.fromElements("Hello", "World", "Flink")
// Write as text file
results.writeAsText("output/results.txt")
// Write as CSV with custom delimiters
val csvData = env.fromElements(("Alice", 25), ("Bob", 30))
csvData.writeAsCsv("output/people.csv", fieldDelimiter = ";")
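// `MyOutputFormat` below is a hypothetical sketch of a custom FileOutputFormat:
// writeRecord is called once per element and writes to the format's open output stream.
import org.apache.flink.api.common.io.FileOutputFormat
class MyOutputFormat extends FileOutputFormat[String] {
  override def writeRecord(record: String): Unit =
    stream.write((record + "\n").getBytes("UTF-8"))
}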
// Write with custom output format
results.write(new MyOutputFormat(), "output/custom.dat")

class DataSet[T] {
def first(n: Int): DataSet[T]
}

val numbers = env.fromElements(10, 5, 8, 3, 1, 9, 2)
// Get first 3 elements (order not guaranteed unless sorted)
val firstThree = numbers.first(3)
// Get 3 elements after sorting each partition descending (a global top 3 only with parallelism 1);
// field positions only work for tuple types, so use a key selector for the plain Int values
val topThree = numbers.sortPartition((x: Int) => x, Order.DESCENDING).first(3)

class DataSet[T] {
def union(other: DataSet[T]): DataSet[T]
}

val set1 = env.fromElements(1, 2, 3)
val set2 = env.fromElements(4, 5, 6)
// Union two datasets
val combined = set1.union(set2)
// Chain multiple unions
val set3 = env.fromElements(7, 8, 9)
val allSets = set1.union(set2).union(set3)

class DataSet[T] {
def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]
}

val lookupData = env.fromElements(("A", 100), ("B", 200), ("C", 300))
val mainData = env.fromElements("A", "B", "C", "A")
// Use lookup data as broadcast variable
val enriched = mainData
.withBroadcastSet(lookupData, "lookup")
.map(new RichMapFunction[String, (String, Int)] {
var lookupMap: Map[String, Int] = _
override def open(parameters: Configuration): Unit = {
import scala.collection.JavaConverters._
val lookupCollection = getRuntimeContext
.getBroadcastVariable[(String, Int)]("lookup")
.asScala
lookupMap = lookupCollection.toMap
}
override def map(value: String): (String, Int) = {
(value, lookupMap.getOrElse(value, 0))
}
})

class DataSet[T] {
def withForwardedFields(forwardedFields: String*): DataSet[T]
def withForwardedFieldsFirst(forwardedFields: String*): DataSet[T]
def withForwardedFieldsSecond(forwardedFields: String*): DataSet[T]
}

val tuples = env.fromElements(("Alice", 25, "Engineer"), ("Bob", 30, "Manager"))
// Hint that the first field is forwarded unchanged; the hint is attached to the operator, i.e. after the map
val mapped = tuples
.map(t => (t._1, t._2 + 1, t._3)) // only increment age
.withForwardedFields("_1") // field _1 (name) passes through unchanged

class DataSet[T] {
def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]
def iterateWithTermination(maxIterations: Int)(
stepFunction: DataSet[T] => (DataSet[T], DataSet[_])
): DataSet[T]
def iterateDelta[R: ClassTag](
workset: DataSet[R],
maxIterations: Int,
keyFields: Array[Int]
)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
def iterateDelta[R: ClassTag](
workset: DataSet[R],
maxIterations: Int,
keyFields: Array[String]
)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
def iterateDelta[R: ClassTag](
workset: DataSet[R],
maxIterations: Int,
keyFields: Array[Int]
)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R], DataSet[_])): DataSet[T]
}

val initial = env.fromElements(1.0)
// Simple bulk iteration - double the value 10 times (1.0 -> 1024.0)
val result = initial.iterate(10) { current =>
current.map(_ * 2)
}
// Iteration with termination criterion: the loop stops when the termination DataSet is empty
val converged = initial.iterateWithTermination(100) { current =>
val next = current.map(x => (x + 2.0 / x) / 2.0) // Newton's method for sqrt(2)
// keep iterating while the estimate is still far from sqrt(2)
val termination = next.filter(x => Math.abs(x * x - 2.0) > 1e-9)
(next, termination)
}
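For iterateDelta, the step function receives the current solution set and the workset and returns the solution-set delta plus the next workset; the iteration ends when the workset becomes empty or maxIterations is reached. A minimal sketch (the data and stop condition are made up for illustration):

val solutionInit = env.fromElements((1, 0), (2, 0), (3, 0))
val worksetInit = env.fromElements((1, 0), (2, 0), (3, 0))
// Key both sets on field 0; elements leave the workset once their value reaches 3
val deltaResult = solutionInit.iterateDelta(worksetInit, 10, Array(0)) { (solution, workset) =>
  val updated = workset.map(t => (t._1, t._2 + 1))
  val nextWorkset = updated.filter(_._2 < 3)
  // `updated` replaces solution-set entries with matching keys; `nextWorkset` drives the next round
  (updated, nextWorkset)
}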

// Supporting types (schematic; Order and FileSystem.WriteMode are Java enums in Flink)
import org.apache.flink.api.common.operators.Order
object Order extends Enumeration {
val ASCENDING, DESCENDING = Value
}
object FileSystem {
object WriteMode extends Enumeration {
val NO_OVERWRITE, OVERWRITE = Value
}
}
trait DataSink[T] {
def name(name: String): DataSink[T]
def setParallelism(parallelism: Int): DataSink[T]
}
class AggregateDataSet[T](private[flink] val set: DataSet[T], private[flink] val keys: Keys[T], private[flink] val aggregations: Seq[AggregationFunction[_]]) {
def and(agg: Aggregations, field: Int): AggregateDataSet[T]
def and(agg: Aggregations, field: String): AggregateDataSet[T]
def andSum(field: Int): AggregateDataSet[T]
def andSum(field: String): AggregateDataSet[T]
def andMax(field: Int): AggregateDataSet[T]
def andMax(field: String): AggregateDataSet[T]
def andMin(field: Int): AggregateDataSet[T]
def andMin(field: String): AggregateDataSet[T]
}
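The `and*` methods chain several aggregations into a single pass. A minimal sketch (assumes the `env` from the setup at the top):

val measurements = env.fromElements(("A", 3, 2.0), ("B", 5, 1.0), ("C", 4, 6.0))
// Sum field 1 and, in the same pass, take the maximum of field 2
val summary = measurements.sum(1).andMax(2)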