The Apache Flink Scala API provides a rich set of transformation operations that enable functional programming patterns with type safety. All transformations are lazy: they build up a directed acyclic graph (DAG) of operators that is executed only when the program is submitted, for example via env.execute() or an eager sink such as collect() or print().
Transform each element using a function, producing exactly one output element per input element.
class DataSet[T] {
// Using function literal
def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
// Using MapFunction
def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
// Using RichMapFunction with RuntimeContext access
def map[R: TypeInformation: ClassTag](mapper: RichMapFunction[T, R]): DataSet[R]
}
Transform each element into zero, one, or more output elements.
class DataSet[T] {
// Using function returning TraversableOnce
def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
// Using function with Collector
def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataSet[R]
// Using FlatMapFunction
def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
// Using RichFlatMapFunction with RuntimeContext access
def flatMap[R: TypeInformation: ClassTag](flatMapper: RichFlatMapFunction[T, R]): DataSet[R]
}
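For instance, the Collector variant lets one input emit any number of outputs imperatively. A minimal sketch (the word-splitting logic is illustrative, not part of the API):
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
val env = ExecutionEnvironment.getExecutionEnvironment
val lines = env.fromElements("to be or", "not to be")
// Emit one output element per word
val words = lines.flatMap { (line: String, out: Collector[String]) =>
  line.split(" ").foreach(out.collect)
}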
Keep only elements that satisfy a predicate function.
class DataSet[T] {
// Using boolean function
def filter(fun: T => Boolean): DataSet[T]
// Using FilterFunction
def filter(filter: FilterFunction[T]): DataSet[T]
// Using RichFilterFunction with RuntimeContext access
def filter(filter: RichFilterFunction[T]): DataSet[T]
}
Transform entire partitions at once, useful for expensive initialization operations.
class DataSet[T] {
// Using function on Iterator
def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
// Using function with Collector
def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
// Using MapPartitionFunction
def mapPartition[R: TypeInformation: ClassTag](partitionMapper: MapPartitionFunction[T, R]): DataSet[R]
}
Transform elements with access to broadcast variables for enrichment or lookup operations.
class DataSet[T] {
def mapWithBroadcastSet[R: TypeInformation: ClassTag](
fun: (T, Iterable[_]) => R,
broadcastSetName: String
): DataSet[R]
}
class DataSet[T] {
def flatMapWithBroadcastSet[R: TypeInformation: ClassTag](
fun: (T, Iterable[_], Collector[R]) => Unit,
broadcastSetName: String
): DataSet[R]
}
class DataSet[T] {
def mapPartitionWithBroadcastSet[R: TypeInformation: ClassTag](
fun: (Iterator[T], Iterable[_], Collector[R]) => Unit,
broadcastSetName: String
): DataSet[R]
}
class DataSet[T] {
def flatMapPartitionWithBroadcastSet[R: TypeInformation: ClassTag](
fun: (Iterator[T], Iterable[_], Collector[R]) => Unit,
broadcastSetName: String
): DataSet[R]
}
class DataSet[T] {
def filterWithBroadcastSet(
fun: (T, Iterable[_]) => Boolean,
broadcastSetName: String
): DataSet[T]
}
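For instance, a sketch of the filterWithBroadcastSet variant declared above, assuming these convenience wrappers look up the broadcast set by the name under which it was attached with withBroadcastSet (names and data here are illustrative; the stock Flink route is a RichFunction with getBroadcastVariable, shown further below):
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val whitelist = env.fromElements("USD", "EUR")
val txs = env.fromElements(("USD", 100.0), ("GBP", 73.0))
// Keep only transactions whose currency appears in the broadcast whitelist
val allowed = txs
  .filterWithBroadcastSet((tx: (String, Double), bc: Iterable[_]) => bc.exists(_ == tx._1), "whitelist")
  .withBroadcastSet(whitelist, "whitelist")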
Combine elements using an associative and commutative function.
class DataSet[T] {
// Using function
def reduce(fun: (T, T) => T): DataSet[T]
// Using ReduceFunction
def reduce(reducer: ReduceFunction[T]): DataSet[T]
// Using RichReduceFunction with RuntimeContext access
def reduce(reducer: RichReduceFunction[T]): DataSet[T]
}
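For example, summing a DataSet of Ints:
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5)
// The function must be associative and commutative: Flink may combine partial results in any order
val sum = numbers.reduce(_ + _)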
Combine multiple DataSets into one containing all elements.
class DataSet[T] {
def union(other: DataSet[T]): DataSet[T]
}
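Note that union keeps duplicates; chain distinct afterwards to drop them. A quick sketch:
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val a = env.fromElements(1, 2, 3)
val b = env.fromElements(3, 4, 5)
val all = a.union(b) // 1, 2, 3, 3, 4, 5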
Remove duplicate elements based on equality or key extraction.
class DataSet[T] {
// Distinct by element equality
def distinct(): DataSet[T]
// Distinct by key function
def distinct[K: TypeInformation](fun: T => K): DataSet[T]
// Distinct by field positions
def distinct(fields: Int*): DataSet[T]
// Distinct by field names
def distinct(firstField: String, otherFields: String*): DataSet[T]
}
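For example, with a case class (field names work for case classes and POJOs, field positions for tuples):
import org.apache.flink.api.scala._
case class User(name: String, city: String)
val env = ExecutionEnvironment.getExecutionEnvironment
val users = env.fromElements(User("Alice", "NY"), User("Alice", "London"))
val byName = users.distinct(_.name) // one element survives
val byCity = users.distinct("city") // both survive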
Sort elements within each partition.
class DataSet[T] {
// Sort by key function
def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
// Sort by field position
def sortPartition(field: Int, order: Order): DataSet[T]
// Sort by field name
def sortPartition(field: String, order: Order): DataSet[T]
}
Control how data is distributed across parallel instances.
class DataSet[T] {
// Hash partitioning
def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
def partitionByHash(fields: Int*): DataSet[T]
def partitionByHash(firstField: String, otherFields: String*): DataSet[T]
// Range partitioning
def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
def partitionByRange(fields: Int*): DataSet[T]
def partitionByRange(firstField: String, otherFields: String*): DataSet[T]
// Custom partitioning
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fields: Int*): DataSet[T]
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], firstField: String, otherFields: String*): DataSet[T]
// Round-robin rebalancing
def rebalance(): DataSet[T]
}
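A minimal partitionCustom sketch that routes each key to subtask key mod numPartitions (the partitioner logic is illustrative):
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.Partitioner
val env = ExecutionEnvironment.getExecutionEnvironment
val modPartitioner = new Partitioner[Int] {
  // Deterministic routing; must return a value in [0, numPartitions)
  override def partition(key: Int, numPartitions: Int): Int = math.abs(key % numPartitions)
}
val routed = env.fromElements(1, 2, 3, 4).partitionCustom(modPartitioner, x => x)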
Extract a random sample of elements from the DataSet.
class DataSet[T] {
// Sample with fraction
def sample(withReplacement: Boolean, fraction: Double): DataSet[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataSet[T]
}
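For example, a reproducible ~10% sample. Note that in the Flink distribution itself these helpers come from DataSetUtils and become available on DataSet via import org.apache.flink.api.scala.utils._:
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromCollection(1 to 100)
// Each element is kept independently with probability ~0.1; the seed fixes the draw
val tenPercent = data.sample(withReplacement = false, fraction = 0.1, seed = 42L)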
Get the first n elements.
class DataSet[T] {
def first(n: Int): DataSet[T]
}
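Without a preceding sort or grouping, which n elements are returned is arbitrary:
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val firstThree = env.fromElements(5, 3, 8, 1, 9).first(3)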
class DataSet[T] {
// Set operation name for debugging
def name(name: String): DataSet[T]
// Set parallelism for this operation
def setParallelism(parallelism: Int): DataSet[T]
def getParallelism: Int
}
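A quick sketch of per-operator naming and parallelism; the name shows up in logs and the web UI:
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val doubled = env.fromElements(1, 2, 3)
  .map(_ * 2).name("double-values")
  .setParallelism(4)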
class DataSet[T] {
// Get resource specifications
def minResources: ResourceSpec
def preferredResources: ResourceSpec
// Set resource requirements
def setResources(minResources: ResourceSpec, preferredResources: ResourceSpec): DataSet[T]
}
class DataSet[T] {
// Add broadcast variable
def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]
}
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Map: square each number
val squares = numbers.map(x => x * x)
// Filter: keep only even numbers
val evens = numbers.filter(_ % 2 == 0)
// FlatMap: split range into individual numbers
val ranges = env.fromElements("1-3", "4-6", "7-9")
val flatMapped = ranges.flatMap { range =>
val parts = range.split("-")
val start = parts(0).toInt
val end = parts(1).toInt
start to end
}
import org.apache.flink.api.scala._
case class Person(name: String, age: Int, city: String)
val env = ExecutionEnvironment.getExecutionEnvironment
val people = env.fromElements(
Person("Alice", 25, "New York"),
Person("Bob", 30, "London"),
Person("Charlie", 35, "Paris")
)
// Map: extract names
val names = people.map(_.name)
// Filter: adults only
val adults = people.filter(_.age >= 18)
// Transform: create summary
val summaries = people.map(p => s"${p.name} (${p.age}) from ${p.city}")
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order
val env = ExecutionEnvironment.getExecutionEnvironment
case class Sale(product: String, amount: Double, region: String)
val sales = env.fromElements(
Sale("laptop", 1000.0, "US"),
Sale("phone", 500.0, "EU"),
Sale("tablet", 300.0, "US")
)
// Partition by region for co-location
val partitioned = sales.partitionByHash(_.region)
// Sort within partitions by amount
val sorted = sales.sortPartition(_.amount, Order.DESCENDING)
// Rebalance for load distribution
val rebalanced = sales.rebalance()
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._
val env = ExecutionEnvironment.getExecutionEnvironment
// Lookup data
val exchangeRates = env.fromElements(
("USD", 1.0),
("EUR", 0.85),
("GBP", 0.73)
)
// Transaction data
val transactions = env.fromElements(
("USD", 100.0),
("EUR", 85.0),
("GBP", 73.0)
)
// Convert each amount to USD by reading the broadcast rates in a RichMapFunction
class RateConverter extends RichMapFunction[(String, Double), Double] {
  private var rates: Map[String, Double] = _
  override def open(parameters: Configuration): Unit = {
    // Materialize the broadcast set once per parallel task
    rates = getRuntimeContext
      .getBroadcastVariable[(String, Double)]("rates")
      .asScala.toMap
  }
  override def map(tx: (String, Double)): Double = tx._2 / rates(tx._1)
}
val convertedTransactions = transactions
  .map(new RateConverter())
  .withBroadcastSet(exchangeRates, "rates")
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("hello", "world", "flink", "scala")
// Expensive initialization per partition
// A stand-in for a resource that is costly to create (e.g. a connection or a parser)
class ExpensiveResource {
  def process(s: String): String = s.toUpperCase
}
def initializeExpensiveResource(): ExpensiveResource = new ExpensiveResource

val processed = data.mapPartition { iter =>
  // Expensive setup runs once per partition, not once per element
  val expensiveResource = initializeExpensiveResource()
  // Process all elements in the partition
  iter.map(element => expensiveResource.process(element))
}
Rich functions provide access to the RuntimeContext for advanced features like accumulators, broadcast variables, and execution parameters.
// Rich function base classes
abstract class RichMapFunction[T, O] extends MapFunction[T, O] with RichFunction
abstract class RichFlatMapFunction[T, O] extends FlatMapFunction[T, O] with RichFunction
abstract class RichFilterFunction[T] extends FilterFunction[T] with RichFunction
abstract class RichReduceFunction[T] extends ReduceFunction[T] with RichFunction
abstract class RichGroupReduceFunction[T, O] extends GroupReduceFunction[T, O] with RichFunction
abstract class RichMapPartitionFunction[T, O] extends MapPartitionFunction[T, O] with RichFunction
// RichFunction interface
trait RichFunction {
def getRuntimeContext: RuntimeContext
def open(parameters: Configuration): Unit = {}
def close(): Unit = {}
def setRuntimeContext(t: RuntimeContext): Unit
}
// RuntimeContext interface
trait RuntimeContext {
def getExecutionConfig: ExecutionConfig
def getJobId: JobID
def getTaskName: String
def getTaskNameWithSubtasks: String
def getNumberOfParallelSubtasks: Int
def getIndexOfThisSubtask: Int
def getAttemptNumber: Int
// Accumulators
def getAccumulator[V, A <: Serializable](name: String): Accumulator[V, A]
def getIntCounter(name: String): IntCounter
def getLongCounter(name: String): LongCounter
def getDoubleCounter(name: String): DoubleCounter
def getHistogram(name: String): Histogram
// Broadcast variables
def getBroadcastVariable[T](name: String): util.List[T]
def getBroadcastVariableWithInitializer[T, C](name: String, initializer: BroadcastVariableInitializer[T, C]): C
}
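For instance, a rich function can inspect its own parallel instance through the RuntimeContext; a small sketch that tags each record with the subtask that processed it:
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichMapFunction
val env = ExecutionEnvironment.getExecutionEnvironment
class TagWithSubtask extends RichMapFunction[String, String] {
  override def map(value: String): String = {
    val ctx = getRuntimeContext
    s"$value @ subtask ${ctx.getIndexOfThisSubtask + 1}/${ctx.getNumberOfParallelSubtasks}"
  }
}
val tagged = env.fromElements("a", "b", "c").map(new TagWithSubtask)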
import org.apache.flink.api.scala._
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import java.util
val env = ExecutionEnvironment.getExecutionEnvironment
// Rich function with accumulator access
class CountingMapper extends RichMapFunction[String, String] {
var counter: IntCounter = _
override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext.getIntCounter("processed-records")
}
override def map(value: String): String = {
counter.add(1)
value.toUpperCase
}
}
val data = env.fromElements("hello", "world", "flink")
val result = data.map(new CountingMapper())
// Rich function with broadcast variables
class EnrichingMapper extends RichMapFunction[String, String] {
var broadcastData: util.List[String] = _
override def open(parameters: Configuration): Unit = {
broadcastData = getRuntimeContext.getBroadcastVariable[String]("lookup-data")
}
override def map(value: String): String = {
// Use broadcast data for enrichment
val enrichment = if (broadcastData.contains(value)) " [FOUND]" else " [NOT_FOUND]"
value + enrichment
}
}
val lookupData = env.fromElements("hello", "flink")
val enriched = data
.map(new EnrichingMapper())
.withBroadcastSet(lookupData, "lookup-data")