Data Transformations

Core data processing operations for mapping, filtering, reducing, and transforming DataSets. These operations form the foundation of Flink's data processing capabilities.

Capabilities

Map Transformations

Transform each element in a DataSet to a new element, potentially of a different type.

class DataSet[T] {
  /**
   * Applies a transformation function to each element
   * @param fun Transformation function T => R
   * @return DataSet with transformed elements
   */
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  
  /**
   * Applies a MapFunction to each element
   * @param mapper MapFunction implementation
   * @return DataSet with transformed elements
   */
  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
}

Usage Examples:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5)

// Simple transformation
val doubled = numbers.map(_ * 2)

// Type transformation
case class Person(name: String, age: Int)
val people = env.fromElements("Alice,25", "Bob,30")
val persons = people.map { line =>
  val parts = line.split(",")
  Person(parts(0), parts(1).toInt)
}

FlatMap Transformations

Transform each element into zero or more elements, useful for splitting and expanding data.

class DataSet[T] {
  /**
   * Transforms each element into a traversable collection
   * @param fun Function returning TraversableOnce[R]
   * @return DataSet with flattened results
   */
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  
  /**
   * Transforms using a FlatMapFunction with collector
   * @param fun Function using collector to emit results
   * @return DataSet with collected results
   */
  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataSet[R]
  
  /**
   * Applies a FlatMapFunction to each element
   * @param flatMapper FlatMapFunction implementation
   * @return DataSet with flattened results
   */
  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
}

Usage Examples:

val sentences = env.fromElements("Hello world", "Scala Flink", "Data processing")

// Split sentences into words
val words = sentences.flatMap(_.split(" "))

// Generate multiple values per input
val numbers = env.fromElements(1, 2, 3)
val expanded = numbers.flatMap(n => 1 to n)
// Result: [1, 1, 2, 1, 2, 3]

MapPartition Transformations

Transform entire partitions at once, useful for initialization-heavy operations.

class DataSet[T] {
  /**
   * Transforms entire partitions using an iterator
   * @param fun Function transforming Iterator[T] to TraversableOnce[R]
   * @return DataSet with partition-wise transformations
   */
  def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
  
  /**
   * Transforms partitions using iterator and collector
   * @param fun Function with iterator input and collector output
   * @return DataSet with collected partition results
   */
  def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  
  /**
   * Applies a MapPartitionFunction to each partition
   * @param partitionMapper MapPartitionFunction implementation
   * @return DataSet with partition transformations
   */
  def mapPartition[R: TypeInformation: ClassTag](partitionMapper: MapPartitionFunction[T, R]): DataSet[R]
}
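
Usage sketch (assuming the env and import org.apache.flink.api.scala._ from the Map examples above; the per-partition logic is purely illustrative):

import org.apache.flink.util.Collector

val lines = env.fromElements("a", "b", "c", "d")

// Count elements per partition (one result per partition, not per element)
val partitionCounts = lines.mapPartition { (it: Iterator[String]) =>
  Seq(it.size)
}

// Do expensive setup once per partition instead of once per element
val enriched = lines.mapPartition { (it: Iterator[String], out: Collector[String]) =>
  val prefix = "processed-"  // stands in for costly initialization, e.g. opening a connection
  it.foreach(x => out.collect(prefix + x))
}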

Filter Operations

Select elements based on a predicate: elements for which the predicate returns true are kept, and all others are dropped.

class DataSet[T] {
  /**
   * Filters elements using a predicate function
   * @param fun Predicate function returning Boolean
   * @return DataSet with filtered elements
   */
  def filter(fun: T => Boolean): DataSet[T]
  
  /**
   * Filters elements using a FilterFunction
   * @param filter FilterFunction implementation
   * @return DataSet with filtered elements
   */
  def filter(filter: FilterFunction[T]): DataSet[T]
}

Usage Examples:

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Filter even numbers
val evenNumbers = numbers.filter(_ % 2 == 0)

// Filter with complex condition
case class Product(name: String, price: Double, inStock: Boolean)
val products = env.fromElements(
  Product("Laptop", 999.99, true),
  Product("Mouse", 29.99, false),
  Product("Keyboard", 79.99, true)
)

val availableProducts = products.filter(p => p.inStock && p.price < 100)

Reduce Operations

Combine elements pairwise into a single aggregated result. The combining function must be associative, because Flink may apply it incrementally and in parallel.

class DataSet[T] {
  /**
   * Reduces all elements using a combining function
   * @param fun Binary combining function (T, T) => T
   * @return DataSet with single reduced element
   */
  def reduce(fun: (T, T) => T): DataSet[T]
  
  /**
   * Reduces elements using a ReduceFunction
   * @param reducer ReduceFunction implementation
   * @return DataSet with reduced result
   */
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}
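
Usage sketch (assuming the env and import org.apache.flink.api.scala._ from the Map examples above):

val numbers = env.fromElements(1, 2, 3, 4, 5)

// Sum all elements; the function must be associative because Flink may apply it in parallel
val sum = numbers.reduce(_ + _)  // DataSet containing the single element 15

// Keep the alphabetically first word
val words = env.fromElements("pear", "apple", "banana")
val firstWord = words.reduce((a, b) => if (a < b) a else b)  // "apple"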

ReduceGroup Operations

Process all elements of the DataSet (or of each group, when applied after a groupBy) in a single function call, potentially producing a different output type.

class DataSet[T] {
  /**
   * Processes all elements as a group using iterator
   * @param fun Function processing Iterator[T] to produce R
   * @return DataSet with group processing result
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  
  /**
   * Processes groups using iterator and collector
   * @param fun Function with iterator input and collector output
   * @return DataSet with collected group results
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  
  /**
   * Applies a GroupReduceFunction to process groups
   * @param reducer GroupReduceFunction implementation
   * @return DataSet with group reduction results
   */
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}
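
Usage sketch (assuming the env, import org.apache.flink.api.scala._, and org.apache.flink.util.Collector from earlier examples; the statistics computed here are illustrative):

import org.apache.flink.util.Collector

val numbers = env.fromElements(1, 2, 3, 4, 5)

// Produce one summary value for the whole DataSet
val stats = numbers.reduceGroup { (it: Iterator[Int]) =>
  val values = it.toList
  (values.size, values.sum)  // (5, 15)
}

// Emit several results via a Collector
val topTwo = numbers.reduceGroup { (it: Iterator[Int], out: Collector[Int]) =>
  it.toList.sorted.takeRight(2).foreach(v => out.collect(v))
}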

CombineGroup Operations

Pre-aggregate elements within each partition before the final group reduce. The local combine step produces partial results and can substantially reduce the amount of data shuffled over the network.

class DataSet[T] {
  /**
   * Combines elements within partitions using iterator and collector
   * @param fun Function for partition-wise combining
   * @return DataSet with partition-combined results
   */
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  
  /**
   * Applies a GroupCombineFunction for partition-wise combining
   * @param combiner GroupCombineFunction implementation
   * @return DataSet with combined results
   */
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}
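
Usage sketch (assuming the env, the Scala API import, and org.apache.flink.util.Collector from earlier examples; the word-count pairing is illustrative, and a grouped reduce would normally follow to merge the partial results):

import org.apache.flink.util.Collector

val words = env.fromElements("a", "b", "a", "c", "b", "a")

// Locally pre-aggregate counts within each partition; results are partial
val partialCounts = words.combineGroup { (it: Iterator[String], out: Collector[(String, Int)]) =>
  it.toSeq.groupBy(identity).foreach { case (word, occurrences) =>
    out.collect((word, occurrences.size))
  }
}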

Distinct Operations

Remove duplicate elements from DataSets with various key selection strategies.

class DataSet[T] {
  /**
   * Removes all duplicates from the DataSet
   * @return DataSet with unique elements
   */
  def distinct(): DataSet[T]
  
  /**
   * Removes duplicates based on field positions
   * @param fields Field positions to consider for uniqueness
   * @return DataSet with unique elements by fields
   */
  def distinct(fields: Int*): DataSet[T]
  
  /**
   * Removes duplicates based on field names
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return DataSet with unique elements by named fields
   */
  def distinct(firstField: String, otherFields: String*): DataSet[T]
  
  /**
   * Removes duplicates based on key selector function
   * @param fun Key selector function
   * @return DataSet with unique elements by key
   */
  def distinct[K: TypeInformation](fun: T => K): DataSet[T]
}

Usage Examples:

val numbers = env.fromElements(1, 2, 2, 3, 3, 3, 4, 5)

// Remove all duplicates
val unique = numbers.distinct()

// Remove duplicates by key
case class Person(name: String, age: Int, city: String)
val people = env.fromElements(
  Person("Alice", 25, "NYC"),
  Person("Bob", 25, "NYC"),
  Person("Alice", 30, "LA")
)

// Unique by name
val uniqueByName = people.distinct(_.name)

// Unique by age and city
val uniqueByAgeCity = people.distinct(p => (p.age, p.city))

Selection Operations

Select specific elements or subsets from DataSets.

class DataSet[T] {
  /**
   * Selects the first n elements
   * @param n Number of elements to select
   * @return DataSet with first n elements
   */
  def first(n: Int): DataSet[T]
  
  /**
   * Selects elements with minimum values in specified fields
   * @param fields Field positions for minimum comparison
   * @return DataSet with minimum elements
   */
  def minBy(fields: Int*): DataSet[T]
  
  /**
   * Selects elements with maximum values in specified fields
   * @param fields Field positions for maximum comparison
   * @return DataSet with maximum elements
   */
  def maxBy(fields: Int*): DataSet[T]
}
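
Usage sketch (assuming the env and imports from earlier examples, and a tuple DataSet so that positional fields apply):

val sales = env.fromElements(
  ("apples", 30),
  ("bananas", 12),
  ("cherries", 45)
)

// First two elements; which elements these are is not defined unless the data is sorted
val sample = sales.first(2)

// Element with the smallest / largest value in tuple field 1
val lowest = sales.minBy(1)
val highest = sales.maxBy(1)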

Counting and Collection

Get counts and collect elements for inspection. Both methods trigger execution of the program and return their results to the client, so collect() should only be used on small result sets.

class DataSet[T] {
  /**
   * Counts the number of elements in the DataSet
   * @return Number of elements
   */
  def count(): Long
  
  /**
   * Collects all elements to the driver program
   * @return Sequence containing all elements
   */
  def collect(): Seq[T]
}

Usage Examples:

val data = env.fromElements(1, 2, 3, 4, 5)

// Count elements
val elementCount = data.count()
println(s"Total elements: $elementCount")

// Collect results (use carefully with large datasets)
val results = data.map(_ * 2).collect()
results.foreach(println)

Types

trait MapFunction[T, O] extends Function {
  def map(value: T): O
}

trait FlatMapFunction[T, O] extends Function {
  def flatMap(value: T, out: Collector[O]): Unit
}

trait MapPartitionFunction[T, O] extends Function {
  def mapPartition(values: java.lang.Iterable[T], out: Collector[O]): Unit
}

trait FilterFunction[T] extends Function {
  def filter(value: T): Boolean
}

trait ReduceFunction[T] extends Function {
  def reduce(value1: T, value2: T): T
}

trait GroupReduceFunction[T, O] extends Function {
  def reduce(values: java.lang.Iterable[T], out: Collector[O]): Unit
}

trait GroupCombineFunction[T, O] extends Function {
  def combine(values: java.lang.Iterable[T], out: Collector[O]): Unit
}

trait Collector[T] {
  def collect(record: T): Unit
}
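
Usage sketch (the Doubler and Sum classes are illustrative; the function interfaces come from org.apache.flink.api.common.functions and are reused by the Scala API):

import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}

// Explicit MapFunction implementation, equivalent to numbers.map(_ * 2)
class Doubler extends MapFunction[Int, Int] {
  override def map(value: Int): Int = value * 2
}

// Explicit ReduceFunction implementation, equivalent to numbers.reduce(_ + _)
class Sum extends ReduceFunction[Int] {
  override def reduce(value1: Int, value2: Int): Int = value1 + value2
}

val numbers = env.fromElements(1, 2, 3, 4, 5)
val doubled = numbers.map(new Doubler)  // 2, 4, 6, 8, 10
val total = numbers.reduce(new Sum)     // DataSet containing 15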