or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-sources-sinks.mdexecution-environment.mdgrouping-aggregation.mdhadoop-integration.mdindex.mditerations.mdjoins-cogroups.mdtransformations.mdtype-system.md
tile.json

transformations.mddocs/

Transformations

Apache Flink Scala API provides a rich set of transformation operations that enable functional programming patterns with type safety. All transformations are lazy and form a directed acyclic graph (DAG) that is executed when an action is triggered.

Basic Transformations

Map

Transform each element using a function, producing exactly one output element per input element.

class DataSet[T] {
  // Using function literal
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  
  // Using MapFunction  
  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
  
  // Using RichMapFunction with RuntimeContext access
  def map[R: TypeInformation: ClassTag](mapper: RichMapFunction[T, R]): DataSet[R]
}

FlatMap

Transform each element into zero, one, or more output elements.

class DataSet[T] {
  // Using function returning TraversableOnce
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  
  // Using function with Collector
  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataSet[R]
  
  // Using FlatMapFunction
  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
  
  // Using RichFlatMapFunction with RuntimeContext access
  def flatMap[R: TypeInformation: ClassTag](flatMapper: RichFlatMapFunction[T, R]): DataSet[R]
}

Filter

Keep only elements that satisfy a predicate function.

class DataSet[T] {
  // Using boolean function
  def filter(fun: T => Boolean): DataSet[T]
  
  // Using FilterFunction
  def filter(filter: FilterFunction[T]): DataSet[T]
  
  // Using RichFilterFunction with RuntimeContext access
  def filter(filter: RichFilterFunction[T]): DataSet[T]
}

Partition-wise Transformations

MapPartition

Transform entire partitions at once, useful for expensive initialization operations.

class DataSet[T] {
  // Using function on Iterator
  def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
  
  // Using function with Collector
  def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
  
  // Using MapPartitionFunction
  def mapPartition[R: TypeInformation: ClassTag](partitionMapper: MapPartitionFunction[T, R]): DataSet[R]
}

Transformations with Broadcast Variables

MapWithBroadcastSet

Transform elements with access to broadcast variables for enrichment or lookup operations.

class DataSet[T] {
  def mapWithBroadcastSet[R: TypeInformation: ClassTag](
    fun: (T, Iterable[_]) => R, 
    broadcastSetName: String
  ): DataSet[R]
}

FlatMapWithBroadcastSet

class DataSet[T] {
  def flatMapWithBroadcastSet[R: TypeInformation: ClassTag](
    fun: (T, Iterable[_], Collector[R]) => Unit, 
    broadcastSetName: String
  ): DataSet[R]
}

MapPartitionWithBroadcastSet

class DataSet[T] {
  def mapPartitionWithBroadcastSet[R: TypeInformation: ClassTag](
    fun: (Iterator[T], Iterable[_], Collector[R]) => Unit, 
    broadcastSetName: String
  ): DataSet[R]
}

FlatMapPartitionWithBroadcastSet

class DataSet[T] {
  def flatMapPartitionWithBroadcastSet[R: TypeInformation: ClassTag](
    fun: (Iterator[T], Iterable[_], Collector[R]) => Unit, 
    broadcastSetName: String
  ): DataSet[R]
}

FilterWithBroadcastSet

class DataSet[T] {
  def filterWithBroadcastSet(
    fun: (T, Iterable[_]) => Boolean, 
    broadcastSetName: String
  ): DataSet[T]
}

Reduce Operations

Reduce

Combine elements using an associative and commutative function.

class DataSet[T] {
  // Using function
  def reduce(fun: (T, T) => T): DataSet[T]
  
  // Using ReduceFunction
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
  
  // Using RichReduceFunction with RuntimeContext access
  def reduce(reducer: RichReduceFunction[T]): DataSet[T]
}

Set Operations

Union

Combine multiple DataSets into one containing all elements.

class DataSet[T] {
  def union(other: DataSet[T]*): DataSet[T]
}

Distinct

Remove duplicate elements based on equality or key extraction.

class DataSet[T] {
  // Distinct by element equality
  def distinct(): DataSet[T]
  
  // Distinct by key function
  def distinct[K: TypeInformation](fun: T => K): DataSet[T]
  
  // Distinct by field positions
  def distinct(fields: Int*): DataSet[T]
  
  // Distinct by field names  
  def distinct(firstField: String, otherFields: String*): DataSet[T]
}

Sorting and Partitioning

SortPartition

Sort elements within each partition.

class DataSet[T] {
  // Sort by key function
  def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
  
  // Sort by field position
  def sortPartition(field: Int, order: Order): DataSet[T]
  
  // Sort by field name
  def sortPartition(field: String, order: Order): DataSet[T]
}

Partitioning Operations

Control how data is distributed across parallel instances.

class DataSet[T] {
  // Hash partitioning
  def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
  def partitionByHash(fields: Int*): DataSet[T]
  def partitionByHash(firstField: String, otherFields: String*): DataSet[T]
  
  // Range partitioning  
  def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
  def partitionByRange(fields: Int*): DataSet[T]
  def partitionByRange(firstField: String, otherFields: String*): DataSet[T]
  
  // Custom partitioning
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fields: Int*): DataSet[T]
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], firstField: String, otherFields: String*): DataSet[T]
  
  // Round-robin rebalancing
  def rebalance(): DataSet[T]
}

Sampling

Sample

Extract a random sample of elements from the DataSet.

class DataSet[T] {
  // Sample with fraction
  def sample(withReplacement: Boolean, fraction: Double): DataSet[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataSet[T]
}

First

Get the first n elements.

class DataSet[T] {
  def first(n: Int): DataSet[T]
}

Configuration Operations

Naming and Parallelism

class DataSet[T] {
  // Set operation name for debugging
  def name(name: String): DataSet[T]
  
  // Set parallelism for this operation
  def setParallelism(parallelism: Int): DataSet[T]
  def getParallelism: Int
}

Resource Configuration

class DataSet[T] {
  // Get resource specifications
  def minResources: ResourceSpec
  def preferredResources: ResourceSpec
  
  // Set resource requirements
  def setResources(minResources: ResourceSpec, preferredResources: ResourceSpec): DataSet[T]
}

Broadcast Variables

class DataSet[T] {
  // Add broadcast variable
  def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]
}

Usage Examples

Basic Transformations

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Map: square each number
val squares = numbers.map(x => x * x)

// Filter: keep only even numbers
val evens = numbers.filter(_ % 2 == 0)

// FlatMap: split range into individual numbers
val ranges = env.fromElements("1-3", "4-6", "7-9")
val flatMapped = ranges.flatMap { range =>
  val parts = range.split("-")
  val start = parts(0).toInt
  val end = parts(1).toInt
  start to end
}

Working with Case Classes

import org.apache.flink.api.scala._

case class Person(name: String, age: Int, city: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val people = env.fromElements(
  Person("Alice", 25, "New York"),
  Person("Bob", 30, "London"), 
  Person("Charlie", 35, "Paris")
)

// Map: extract names
val names = people.map(_.name)

// Filter: adults only  
val adults = people.filter(_.age >= 18)

// Transform: create summary
val summaries = people.map(p => s"${p.name} (${p.age}) from ${p.city}")

Partition Operations

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

val env = ExecutionEnvironment.getExecutionEnvironment

case class Sale(product: String, amount: Double, region: String)

val sales = env.fromElements(
  Sale("laptop", 1000.0, "US"),
  Sale("phone", 500.0, "EU"),
  Sale("tablet", 300.0, "US")
)

// Partition by region for co-location
val partitioned = sales.partitionByHash(_.region)

// Sort within partitions by amount
val sorted = sales.sortPartition(_.amount, Order.DESCENDING)

// Rebalance for load distribution
val rebalanced = sales.rebalance()

Broadcast Variables

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Lookup data
val exchangeRates = env.fromElements(
  ("USD", 1.0),
  ("EUR", 0.85), 
  ("GBP", 0.73)
)

// Transaction data
val transactions = env.fromElements(
  ("USD", 100.0),
  ("EUR", 85.0),
  ("GBP", 73.0)
)

// Convert to USD using broadcast variable
val convertedTransactions = transactions
  .map { case (currency, amount) =>
    // This would access broadcast variable in real implementation
    amount * 1.0 // Simplified example
  }
  .withBroadcastSet(exchangeRates, "rates")

MapPartition for Expensive Operations

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("hello", "world", "flink", "scala")

// Expensive initialization per partition
val processed = data.mapPartition { iter =>
  // Expensive setup (done once per partition)
  val expensiveResource = initializeExpensiveResource()
  
  // Process all elements in partition
  iter.map { element =>
    expensiveResource.process(element)
  }
}

def initializeExpensiveResource(): AnyRef = {
  // Simulate expensive initialization
  new AnyRef
}

Rich Functions

Rich functions provide access to the RuntimeContext for advanced features like accumulators, broadcast variables, and execution parameters.

Rich Function Types

// Rich function base classes
abstract class RichMapFunction[T, O] extends MapFunction[T, O] with RichFunction
abstract class RichFlatMapFunction[T, O] extends FlatMapFunction[T, O] with RichFunction  
abstract class RichFilterFunction[T] extends FilterFunction[T] with RichFunction
abstract class RichReduceFunction[T] extends ReduceFunction[T] with RichFunction
abstract class RichGroupReduceFunction[T, O] extends GroupReduceFunction[T, O] with RichFunction
abstract class RichMapPartitionFunction[T, O] extends MapPartitionFunction[T, O] with RichFunction

// RichFunction interface
trait RichFunction {
  def getRuntimeContext: RuntimeContext
  def open(parameters: Configuration): Unit = {}
  def close(): Unit = {}
  def setRuntimeContext(t: RuntimeContext): Unit
}

// RuntimeContext interface  
trait RuntimeContext {
  def getExecutionConfig: ExecutionConfig
  def getJobId: JobID
  def getTaskName: String
  def getTaskNameWithSubtasks: String
  def getNumberOfParallelSubtasks: Int
  def getIndexOfThisSubtask: Int
  def getAttemptNumber: Int
  
  // Accumulators
  def getAccumulator[V, A](name: String): A
  def getIntCounter(name: String): IntCounter
  def getLongCounter(name: String): LongCounter
  def getDoubleCounter(name: String): DoubleCounter
  def getHistogram(name: String): Histogram
  
  // Broadcast variables
  def getBroadcastVariable[T](name: String): util.List[T]
  def getBroadcastVariableWithInitializer[T, C](name: String, initializer: BroadcastVariableInitializer[T, C]): C
}

Rich Function Usage Examples

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

val env = ExecutionEnvironment.getExecutionEnvironment

// Rich function with accumulator access
class CountingMapper extends RichMapFunction[String, String] {
  var counter: IntCounter = _
  
  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext.getIntCounter("processed-records")
  }
  
  override def map(value: String): String = {
    counter.add(1)
    value.toUpperCase
  }
}

val data = env.fromElements("hello", "world", "flink")
val result = data.map(new CountingMapper())

// Rich function with broadcast variables
class EnrichingMapper extends RichMapFunction[String, String] {
  var broadcastData: util.List[String] = _
  
  override def open(parameters: Configuration): Unit = {
    broadcastData = getRuntimeContext.getBroadcastVariable("lookup-data")
  }
  
  override def map(value: String): String = {
    // Use broadcast data for enrichment
    val enrichment = if (broadcastData.contains(value)) " [FOUND]" else " [NOT_FOUND]"
    value + enrichment
  }
}

val lookupData = env.fromElements("hello", "flink")
val enriched = data
  .map(new EnrichingMapper())
  .withBroadcastSet(lookupData, "lookup-data")