DataSet Operations

DataSet[T] is the main abstraction for distributed data collections in Flink's batch API. DataSets are immutable: every transformation leaves its input unchanged and returns a new DataSet.

Basic Properties

class DataSet[T] {
  def getType: TypeInformation[T]
  def getExecutionEnvironment: ExecutionEnvironment
  def getParallelism: Int
  def setParallelism(parallelism: Int): DataSet[T]
  def name(name: String): DataSet[T]
  def setDescription(description: String): DataSet[T]
}
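
A brief usage sketch (assuming the standard import org.apache.flink.api.scala._ for implicit TypeInformation, and an ExecutionEnvironment obtained as described in execution-environment.md):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// name() and setParallelism() return the DataSet, so they can be chained
val numbers = env.fromElements(1, 2, 3, 4, 5)
  .name("numbers-source")
  .setParallelism(2)

println(numbers.getParallelism) // 2
println(numbers.getType)        // TypeInformation for Int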

Basic Transformations

Map Operations

class DataSet[T] {
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
  def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
  def mapPartition[R: TypeInformation: ClassTag](mapPartitionFunction: MapPartitionFunction[T, R]): DataSet[R]
}

Usage Examples

val numbers = env.fromElements(1, 2, 3, 4, 5)

// Simple map with lambda
val doubled = numbers.map(_ * 2)

// Map with function
val squared = numbers.map(x => x * x)

// Map to different type
val numberStrings = numbers.map(n => s"Number: $n")

// Map partition with a Collector (process an entire partition at once);
// annotate the Collector so the result type can be inferred
import org.apache.flink.util.Collector

val processed = numbers.mapPartition { (iterator, collector: Collector[Int]) =>
  val batch = iterator.toList
  batch.foreach(x => collector.collect(x * 10))
}

// Map partition returning collection (alternative approach)
val processedAlt = numbers.mapPartition { iterator =>
  iterator.toList.map(_ * 10)
}

FlatMap Operations

class DataSet[T] {
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
}

val sentences = env.fromElements("Hello World", "Flink Scala API", "Distributed Processing")

// Split sentences into words
val words = sentences.flatMap(_.split(" "))

// FlatMap with filtering
val longWords = sentences.flatMap(_.split(" ").filter(_.length > 4))

// FlatMap with collections
val chars = words.flatMap(_.toCharArray)

Filter Operations

class DataSet[T] {
  def filter(fun: T => Boolean): DataSet[T]
  def filter(filter: FilterFunction[T]): DataSet[T]
}

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Filter even numbers
val evenNumbers = numbers.filter(_ % 2 == 0)

// Filter with complex condition
val filtered = numbers.filter(x => x > 3 && x < 8)

Distinct Operations

class DataSet[T] {
  def distinct(): DataSet[T]
  def distinct(fields: Int*): DataSet[T]
  def distinct(firstField: String, otherFields: String*): DataSet[T]
  def distinct[K: TypeInformation](fun: T => K): DataSet[T]
}

val duplicates = env.fromElements(1, 2, 2, 3, 3, 3, 4)

// Remove all duplicates
val unique = duplicates.distinct()

// Distinct by specific fields (for tuples/case classes)
val people = env.fromElements(("Alice", 25), ("Bob", 30), ("Alice", 35))
val distinctNames = people.distinct(0) // distinct by first field (name)

// Distinct by key function
case class Person(name: String, age: Int)
val personData = env.fromElements(Person("Alice", 25), Person("Bob", 30), Person("Alice", 35))
val distinctByName = personData.distinct(_.name)

Aggregation Operations

Basic Aggregations

class DataSet[T] {
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
  def sum(field: Int): AggregateDataSet[T]
  def sum(field: String): AggregateDataSet[T]
  def max(field: Int): AggregateDataSet[T]
  def max(field: String): AggregateDataSet[T]
  def min(field: Int): AggregateDataSet[T]
  def min(field: String): AggregateDataSet[T]
  def maxBy(fields: Int*): DataSet[T]
  def maxBy(firstField: String, otherFields: String*): DataSet[T]
  def minBy(fields: Int*): DataSet[T]
  def minBy(firstField: String, otherFields: String*): DataSet[T]
}

val sales = env.fromElements(
  ("Q1", 1000), ("Q2", 1500), ("Q3", 1200), ("Q4", 1800)
)

// Sum by field index
val totalSales = sales.sum(1)

// Max by field
val maxSales = sales.max(1)

// MaxBy - return entire record with maximum value
val bestQuarter = sales.maxBy(1)

Reduce Operations

class DataSet[T] {
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  def reduceGroup[R: TypeInformation: ClassTag](groupReducer: GroupReduceFunction[T, R]): DataSet[R]
  def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}

val numbers = env.fromElements(1, 2, 3, 4, 5)

// Simple reduce - sum all numbers
val sum = numbers.reduce(_ + _)

// Reduce with more complex logic
val product = numbers.reduce((a, b) => a * b)

// Reduce group (process all elements together)
val statistics = numbers.reduceGroup { iterator =>
  val values = iterator.toList
  val sum = values.sum
  val count = values.length
  val avg = sum.toDouble / count
  (sum, count, avg)
}
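
combineGroup is a best-effort pre-aggregation: Flink may invoke it per partition and gives no guarantee that all elements are seen in a single call, so its partial results typically need a final reduce. A minimal sketch using the GroupCombineFunction overload, reusing the numbers DataSet from above:

import org.apache.flink.api.common.functions.GroupCombineFunction
import org.apache.flink.util.Collector

// Emit one partial sum per combine call; a final reduce merges the partials
val partialSums = numbers.combineGroup(new GroupCombineFunction[Int, Int] {
  override def combine(values: java.lang.Iterable[Int], out: Collector[Int]): Unit = {
    var sum = 0
    val it = values.iterator()
    while (it.hasNext) sum += it.next()
    out.collect(sum)
  }
})

val totalSum = partialSums.reduce(_ + _)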

Partitioning Operations

class DataSet[T] {
  def partitionByHash(fields: Int*): DataSet[T]
  def partitionByHash(firstField: String, otherFields: String*): DataSet[T]
  def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
  def partitionByRange(fields: Int*): DataSet[T]
  def partitionByRange(firstField: String, otherFields: String*): DataSet[T]
  def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
  def rebalance(): DataSet[T]
}

val data = env.fromElements(("A", 1), ("B", 2), ("C", 3), ("D", 4))

// Hash partition by first field
val hashPartitioned = data.partitionByHash(0)

// Range partition by second field
val rangePartitioned = data.partitionByRange(1)

// Custom partitioning
val customPartitioned = data.partitionCustom(new MyPartitioner(), _._1)

// Rebalance (round-robin distribution)
val rebalanced = data.rebalance()
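
MyPartitioner above is not defined in this document. A minimal sketch of what it might look like, assuming Flink's org.apache.flink.api.common.functions.Partitioner interface:

import org.apache.flink.api.common.functions.Partitioner

// Hypothetical custom partitioner: routes each string key to a partition by its hash
class MyPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    ((key.hashCode % numPartitions) + numPartitions) % numPartitions
}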

Sort Partition

class DataSet[T] {
  def sortPartition(field: Int, order: Order): DataSet[T]
  def sortPartition(field: String, order: Order): DataSet[T]
  def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}

import org.apache.flink.api.common.operators.Order

val data = env.fromElements(("Alice", 25), ("Bob", 30), ("Charlie", 20))

// Sort partitions by age in ascending order
val sortedByAge = data.sortPartition(1, Order.ASCENDING)

// Sort by key function
val sortedByName = data.sortPartition(_._1, Order.DESCENDING)

Output Operations

Collect and Print

class DataSet[T] {
  def collect(): Seq[T]
  def count(): Long
  def print(): Unit
  def printToErr(): Unit
  def printOnTaskManager(sinkIdentifier: String): DataSink[T]
}

val numbers = env.fromElements(1, 2, 3, 4, 5)

// Collect to local collection (triggers execution)
val results: Seq[Int] = numbers.collect()

// Count elements
val elementCount: Long = numbers.count()

// Print to console
numbers.print()

// Print to stderr
numbers.printToErr()

File Output

class DataSet[T] {
  def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE): DataSink[T]
  def writeAsCsv(
    filePath: String,
    rowDelimiter: String = "\n",
    fieldDelimiter: String = ",",
    writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE
  ): DataSink[T]
  def write[O <: FileOutputFormat[T]](
    outputFormat: O,
    filePath: String,
    writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE
  ): DataSink[T]
  def output(outputFormat: OutputFormat[T]): DataSink[T]
}

val results = env.fromElements("Hello", "World", "Flink")

// Write as text file
results.writeAsText("output/results.txt")

// Write as CSV with custom delimiters
val csvData = env.fromElements(("Alice", 25), ("Bob", 30))
csvData.writeAsCsv("output/people.csv", fieldDelimiter = ";")

// Write with custom output format
results.write(new MyOutputFormat(), "output/custom.dat")
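
Note that MyOutputFormat above is a placeholder, and that file sinks are lazy: unlike print(), collect(), and count(), they only run when the job is executed. A small sketch showing an explicit write mode and sink configuration (paths and names are illustrative):

import org.apache.flink.core.fs.FileSystem.WriteMode

// Overwrite an existing file instead of failing with the default NO_OVERWRITE
results
  .writeAsText("output/results.txt", WriteMode.OVERWRITE)
  .setParallelism(1) // produce a single output file

// File sinks execute only when the program is submitted
env.execute("write results")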

First Operations

class DataSet[T] {
  def first(n: Int): DataSet[T]
}

val numbers = env.fromElements(10, 5, 8, 3, 1, 9, 2)

// Get first 3 elements (order not guaranteed unless sorted)
val firstThree = numbers.first(3)

// Get the 3 largest values: sort in descending order, then take the first 3.
// sortPartition only sorts each partition locally, so use parallelism 1 for a global order.
val topThree = numbers
  .sortPartition(x => x, Order.DESCENDING)
  .setParallelism(1)
  .first(3)

Union Operations

class DataSet[T] {
  def union(other: DataSet[T]): DataSet[T]
}

val set1 = env.fromElements(1, 2, 3)
val set2 = env.fromElements(4, 5, 6)

// Union two datasets
val combined = set1.union(set2)

// Chain multiple unions
val set3 = env.fromElements(7, 8, 9)
val allSets = set1.union(set2).union(set3)

Configuration and Optimization

Broadcast Variables

class DataSet[T] {
  def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]
}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

val lookupData = env.fromElements(("A", 100), ("B", 200), ("C", 300))
val mainData = env.fromElements("A", "B", "C", "A")

// Register the lookup data as a broadcast variable on the operator that reads it:
// withBroadcastSet must be called on the result of the map, not on its input
val enriched = mainData
  .map(new RichMapFunction[String, (String, Int)] {
    var lookupMap: Map[String, Int] = _

    override def open(parameters: Configuration): Unit = {
      import scala.collection.JavaConverters._
      val lookupCollection = getRuntimeContext
        .getBroadcastVariable[(String, Int)]("lookup")
        .asScala
      lookupMap = lookupCollection.toMap
    }

    override def map(value: String): (String, Int) = {
      (value, lookupMap.getOrElse(value, 0))
    }
  })
  .withBroadcastSet(lookupData, "lookup")

Field Forwarding Hints

class DataSet[T] {
  def withForwardedFields(forwardedFields: String*): DataSet[T]
  def withForwardedFieldsFirst(forwardedFields: String*): DataSet[T]
  def withForwardedFieldsSecond(forwardedFields: String*): DataSet[T]
}

val tuples = env.fromElements(("Alice", 25, "Engineer"), ("Bob", 30, "Manager"))

// Hint that first field is forwarded unchanged
val mapped = tuples
  .withForwardedFields("0") // field 0 (name) is forwarded
  .map(t => (t._1, t._2 + 1, t._3)) // only increment age

Iteration Operations

class DataSet[T] {
  def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]
  def iterateWithTermination(maxIterations: Int)(
    stepFunction: DataSet[T] => (DataSet[T], DataSet[_])
  ): DataSet[T]
  def iterateDelta[R: ClassTag](
    workset: DataSet[R], 
    maxIterations: Int, 
    keyFields: Array[Int]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
  def iterateDelta[R: ClassTag](
    workset: DataSet[R], 
    maxIterations: Int, 
    keyFields: Array[String]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
  def iterateDelta[R: ClassTag](
    workset: DataSet[R], 
    maxIterations: Int, 
    keyFields: Array[Int]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R], DataSet[_])): DataSet[T]
}

val initial = env.fromElements(1.0)

// Simple bulk iteration: double the value 10 times (result: 1.0 * 2^10 = 1024.0)
val result = initial.iterate(10) { current =>
  current.map(_ * 2)
}

// Iteration with termination criterion: Newton's method for sqrt(2).
// The iteration stops when the termination DataSet becomes empty,
// so it stays non-empty while the estimate has not yet converged.
val converged = initial.iterateWithTermination(100) { current =>
  val next = current.map(x => (x + 2.0 / x) / 2.0)
  val termination = next.filter(x => Math.abs(x * x - 2.0) > 0.0001)
  (next, termination)
}
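
iterateDelta has no example above; a minimal sketch with assumed toy data (countdown counters keyed by field 0) could look like this. The delta returned from the step function replaces solution-set entries with the same key, and the iteration stops early once the workset becomes empty:

val initialSolution = env.fromElements((1, 3), (2, 5), (3, 1))
val initialWorkset = initialSolution

val drained = initialSolution.iterateDelta(initialWorkset, 20, Array(0)) {
  (solution, workset) =>
    // decrement every active counter; these updates form the solution-set delta
    val delta = workset.map(t => (t._1, t._2 - 1))
    // keep only counters that are still positive in the next workset
    val nextWorkset = delta.filter(_._2 > 0)
    (delta, nextWorkset)
}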

Types

// Schematic representations; in Flink these are the Java enums
// org.apache.flink.api.common.operators.Order and org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.api.common.operators.Order

object Order extends Enumeration {
  val ASCENDING, DESCENDING = Value
}

object FileSystem {
  object WriteMode extends Enumeration {
    val NO_OVERWRITE, OVERWRITE = Value
  }
}

trait DataSink[T] {
  def name(name: String): DataSink[T]
  def setParallelism(parallelism: Int): DataSink[T]
}

class AggregateDataSet[T](
  private[flink] val set: DataSet[T],
  private[flink] val keys: Keys[T],
  private[flink] val aggregations: Seq[AggregationFunction[_]]
) {
  def and(agg: Aggregations, field: Int): AggregateDataSet[T]
  def and(agg: Aggregations, field: String): AggregateDataSet[T]
  def andSum(field: Int): AggregateDataSet[T]
  def andSum(field: String): AggregateDataSet[T]
  def andMax(field: Int): AggregateDataSet[T]
  def andMax(field: String): AggregateDataSet[T]
  def andMin(field: Int): AggregateDataSet[T]
  def andMin(field: String): AggregateDataSet[T]
}
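
The and*/andSum methods chain further aggregations so several are computed in one pass. A small usage sketch, assuming the returned AggregateDataSet can be consumed like a regular DataSet (fields that are not aggregated keep an arbitrary value from the input):

val quarterly = env.fromElements(("Q1", 1000, 55), ("Q2", 1500, 60), ("Q3", 1200, 48))

// Total of field 1 and maximum of field 2, computed in a single pass
val totals = quarterly.sum(1).andMax(2)
totals.print()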