Core data processing operations of Flink's Scala DataSet API for mapping, filtering, reducing, and transforming DataSets. These operations form the foundation of batch data processing in Flink.
Transform each element in a DataSet to a new element, potentially of a different type.
class DataSet[T] {

  /**
   * Maps every element of this DataSet through a plain Scala function.
   *
   * @tparam R element type of the resulting DataSet
   * @param fun function applied to each element, `T => R`
   * @return a DataSet holding one transformed element per input element
   */
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]

  /**
   * Maps every element of this DataSet through a user-supplied [[MapFunction]].
   *
   * @tparam R element type of the resulting DataSet
   * @param mapper the MapFunction implementation invoked for each element
   * @return a DataSet holding one transformed element per input element
   */
  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
}
Usage Examples:
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5)
// Same element type in and out: multiply every number by two
val doubled = numbers.map(n => n * 2)
// Different element type out: parse "name,age" strings into Person records
case class Person(name: String, age: Int)
val people = env.fromElements("Alice,25", "Bob,30")
val persons = people.map { line =>
  val fields = line.split(",")
  val name = fields(0)
  val age = fields(1).toInt
  Person(name, age)
}
Transform each element into zero or more elements, useful for splitting and expanding data.
class DataSet[T] {

  /**
   * Expands every element into zero or more output elements produced by a
   * plain Scala function; the returned collections are flattened.
   *
   * @tparam R element type of the resulting DataSet
   * @param fun function mapping one element to a `TraversableOnce[R]`
   * @return a DataSet of all produced elements, flattened
   */
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]

  /**
   * Expands every element by letting a function push results into a [[Collector]].
   *
   * @tparam R element type of the resulting DataSet
   * @param fun function receiving an element plus the collector it emits into
   * @return a DataSet of everything emitted through the collector
   */
  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataSet[R]

  /**
   * Expands every element using a user-supplied [[FlatMapFunction]].
   *
   * @tparam R element type of the resulting DataSet
   * @param flatMapper the FlatMapFunction implementation invoked for each element
   * @return a DataSet of all elements the function emits
   */
  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
}
Usage Examples:
val sentences = env.fromElements("Hello world", "Scala Flink", "Data processing")
// One sentence in, several words out
val words = sentences.flatMap(sentence => sentence.split(" "))
// Each number n expands into the sequence 1..n
val numbers = env.fromElements(1, 2, 3)
val expanded = numbers.flatMap(n => 1.to(n))
// Result: [1, 1, 2, 1, 2, 3]
Transform entire partitions at once, useful for initialization-heavy operations.
class DataSet[T] {

  /**
   * Processes a whole partition at once: the function receives an iterator
   * over the partition's elements and returns the output for that partition.
   *
   * @tparam R element type of the resulting DataSet
   * @param fun function from `Iterator[T]` to a `TraversableOnce[R]`
   * @return a DataSet of the outputs produced for all partitions
   */
  def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]

  /**
   * Processes a whole partition at once, emitting results through a [[Collector]].
   *
   * @tparam R element type of the resulting DataSet
   * @param fun function receiving the partition iterator and the output collector
   * @return a DataSet of everything emitted for all partitions
   */
  def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Processes each partition with a user-supplied [[MapPartitionFunction]].
   *
   * @tparam R element type of the resulting DataSet
   * @param partitionMapper the MapPartitionFunction implementation
   * @return a DataSet of everything the function emits for all partitions
   */
  def mapPartition[R: TypeInformation: ClassTag](partitionMapper: MapPartitionFunction[T, R]): DataSet[R]
}
Select elements based on predicates, removing elements that don't match criteria.
class DataSet[T] {

  /**
   * Keeps only the elements for which the predicate returns `true`.
   *
   * @param fun predicate applied to each element
   * @return a DataSet of the same element type containing the retained elements
   */
  def filter(fun: T => Boolean): DataSet[T]

  /**
   * Keeps only the elements accepted by a user-supplied [[FilterFunction]].
   *
   * @param filter the FilterFunction implementation deciding which elements to retain
   * @return a DataSet of the same element type containing the retained elements
   */
  def filter(filter: FilterFunction[T]): DataSet[T]
}
Usage Examples:
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Filter even numbers
val evenNumbers = numbers.filter(_ % 2 == 0)
// Filter with complex condition
// Named `Item` rather than `Product` so it does not shadow `scala.Product`,
// the trait every case class (this one included) extends.
case class Item(name: String, price: Double, inStock: Boolean)
val products = env.fromElements(
  Item("Laptop", 999.99, inStock = true),
  Item("Mouse", 29.99, inStock = false),
  Item("Keyboard", 79.99, inStock = true)
)
// Keep only in-stock items cheaper than 100 (here: just the Keyboard)
val availableProducts = products.filter(p => p.inStock && p.price < 100)
Combine elements using associative operations to produce aggregated results.
class DataSet[T] {

  /**
   * Folds all elements pairwise into one using a binary combining function.
   * The combining function is expected to be associative.
   *
   * @param fun function merging two elements into one, `(T, T) => T`
   * @return a DataSet containing the single element that remains after combining
   */
  def reduce(fun: (T, T) => T): DataSet[T]

  /**
   * Folds all elements pairwise into one using a user-supplied [[ReduceFunction]].
   *
   * @param reducer the ReduceFunction implementation merging two elements into one
   * @return a DataSet containing the reduced result
   */
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}
Process all elements together, potentially producing different output types.
class DataSet[T] {

  /**
   * Hands all elements to a function at once via an iterator and wraps the
   * single value it returns in the resulting DataSet.
   *
   * @tparam R element type of the resulting DataSet
   * @param fun function turning an `Iterator[T]` into one `R`
   * @return a DataSet containing the value produced by `fun`
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

  /**
   * Hands all elements to a function at once; results are emitted through a [[Collector]].
   *
   * @tparam R element type of the resulting DataSet
   * @param fun function receiving the element iterator and the output collector
   * @return a DataSet of everything emitted through the collector
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Processes the elements as a group with a user-supplied [[GroupReduceFunction]].
   *
   * @tparam R element type of the resulting DataSet
   * @param reducer the GroupReduceFunction implementation
   * @return a DataSet of everything the function emits
   */
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}
Pre-aggregate elements within partitions before final grouping, improving performance.
class DataSet[T] {

  /**
   * Pre-combines the elements of each partition, emitting partial results
   * through a [[Collector]]; meant to shrink data ahead of a final reduction.
   *
   * @tparam R element type of the resulting DataSet
   * @param fun function receiving a partition's element iterator and the output collector
   * @return a DataSet of the partially combined results of all partitions
   */
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Pre-combines the elements of each partition with a user-supplied [[GroupCombineFunction]].
   *
   * @tparam R element type of the resulting DataSet
   * @param combiner the GroupCombineFunction implementation
   * @return a DataSet of the partially combined results of all partitions
   */
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}
Remove duplicate elements from DataSets with various key selection strategies.
class DataSet[T] {

  /**
   * Drops duplicate elements, comparing whole elements.
   *
   * @return a DataSet in which each distinct element appears once
   */
  def distinct(): DataSet[T]

  /**
   * Drops duplicates as judged by the values at the given field positions;
   * one element is kept per distinct combination of those fields.
   *
   * @param fields positions of the fields that together define uniqueness
   * @return a DataSet with one element per distinct field combination
   */
  def distinct(fields: Int*): DataSet[T]

  /**
   * Drops duplicates as judged by the values of the named fields.
   *
   * @param firstField name of the first field that defines uniqueness
   * @param otherFields names of any further fields that define uniqueness
   * @return a DataSet with one element per distinct combination of the named fields
   */
  def distinct(firstField: String, otherFields: String*): DataSet[T]

  /**
   * Keeps one element per distinct key extracted by a function.
   *
   * @tparam K type of the extracted key
   * @param fun function extracting the key that defines uniqueness
   * @return a DataSet with one element per distinct key
   */
  def distinct[K: TypeInformation](fun: T => K): DataSet[T]
}
Usage Examples:
val numbers = env.fromElements(1, 2, 2, 3, 3, 3, 4, 5)
// Whole-element deduplication
val unique = numbers.distinct()
// Key-based deduplication on a case class
case class Person(name: String, age: Int, city: String)
val people = env.fromElements(
  Person("Alice", 25, "NYC"),
  Person("Bob", 25, "NYC"),
  Person("Alice", 30, "LA")
)
// One Person per name
val uniqueByName = people.distinct(person => person.name)
// One Person per (age, city) pair
val uniqueByAgeCity = people.distinct(person => (person.age, person.city))
Select specific elements or subsets from DataSets.
class DataSet[T] {

  /**
   * Selects `n` elements of this DataSet.
   *
   * No ordering is implied: on an unsorted DataSet, which `n` elements are
   * returned is arbitrary. "First" does not mean smallest or earliest.
   *
   * @param n number of elements to select
   * @return a DataSet containing (at most) `n` elements
   */
  def first(n: Int): DataSet[T]

  /**
   * Selects the element holding the minimum value in the given fields.
   *
   * When several field positions are given, they are compared
   * lexicographically: the first field decides, and later fields only break
   * ties. If several elements share the minimum, an arbitrary one is picked.
   * Fields are addressed by position, so this applies to tuple and case-class
   * element types.
   *
   * @param fields field positions, in comparison priority order
   * @return a DataSet containing the single minimal element
   */
  def minBy(fields: Int*): DataSet[T]

  /**
   * Selects the element holding the maximum value in the given fields.
   *
   * When several field positions are given, they are compared
   * lexicographically: the first field decides, and later fields only break
   * ties. If several elements share the maximum, an arbitrary one is picked.
   * Fields are addressed by position, so this applies to tuple and case-class
   * element types.
   *
   * @param fields field positions, in comparison priority order
   * @return a DataSet containing the single maximal element
   */
  def maxBy(fields: Int*): DataSet[T]
}
Get counts and collect elements for inspection.
class DataSet[T] {

  /**
   * Counts the elements of this DataSet.
   *
   * This is an eager action, not a lazy transformation: calling it executes
   * the program built so far in order to obtain the count.
   *
   * @return the number of elements
   */
  def count(): Long

  /**
   * Returns all elements of this DataSet to the driver program.
   *
   * This is an eager action: calling it executes the program built so far.
   * The entire result is materialized in the driver's memory, so use it only
   * when the result is known to be small.
   *
   * @return a sequence containing every element
   */
  def collect(): Seq[T]
}
Usage Examples:
val data = env.fromElements(1, 2, 3, 4, 5)
// Count elements
val elementCount = data.count()
println(s"Total elements: $elementCount")
// Collect results (use carefully with large datasets)
val results = data.map(_ * 2).collect()
results.foreach(println)

/**
 * User function for `DataSet.map`: turns one input element into one output element.
 *
 * Declared as a trait rather than an abstract class, mirroring Flink's Java
 * interface, so implementations remain free to extend another base class.
 *
 * @tparam T input element type
 * @tparam O output element type
 */
trait MapFunction[T, O] extends Function {
  /**
   * @param value the input element
   * @return the transformed element
   */
  @throws[Exception]
  def map(value: T): O
}
/**
 * User function for `DataSet.flatMap`: turns one input element into zero or
 * more output elements, emitted through `out`.
 *
 * Declared as a trait rather than an abstract class, mirroring Flink's Java
 * interface, so implementations remain free to extend another base class.
 *
 * @tparam T input element type
 * @tparam O output element type
 */
trait FlatMapFunction[T, O] extends Function {
  /**
   * @param value the input element
   * @param out collector receiving the produced elements
   */
  @throws[Exception]
  def flatMap(value: T, out: Collector[O]): Unit
}
/**
 * User function for `DataSet.mapPartition`: receives all elements of one
 * partition and emits any number of results through `out`.
 *
 * Declared as a trait rather than an abstract class, mirroring Flink's Java
 * interface, so implementations remain free to extend another base class.
 *
 * @tparam T input element type
 * @tparam O output element type
 */
trait MapPartitionFunction[T, O] extends Function {
  /**
   * @param values the elements of one partition
   * @param out collector receiving the produced elements
   */
  @throws[Exception]
  def mapPartition(values: java.lang.Iterable[T], out: Collector[O]): Unit
}
/**
 * User function for `DataSet.filter`: decides whether an element is retained.
 *
 * Declared as a trait rather than an abstract class, mirroring Flink's Java
 * interface, so implementations remain free to extend another base class.
 *
 * @tparam T element type
 */
trait FilterFunction[T] extends Function {
  /**
   * @param value the element to test
   * @return `true` to keep the element, `false` to drop it
   */
  @throws[Exception]
  def filter(value: T): Boolean
}
/**
 * User function for `DataSet.reduce`: merges two elements into one of the same type.
 *
 * Declared as a trait rather than an abstract class, mirroring Flink's Java
 * interface, so implementations remain free to extend another base class.
 *
 * @tparam T element type
 */
trait ReduceFunction[T] extends Function {
  /**
   * @param value1 the first element to merge
   * @param value2 the second element to merge
   * @return the merged element
   */
  @throws[Exception]
  def reduce(value1: T, value2: T): T
}
/**
 * User function for `DataSet.reduceGroup`: receives a whole group of
 * elements and emits any number of results through `out`.
 *
 * Declared as a trait rather than an abstract class, mirroring Flink's Java
 * interface, so implementations remain free to extend another base class.
 *
 * @tparam T input element type
 * @tparam O output element type
 */
trait GroupReduceFunction[T, O] extends Function {
  /**
   * @param values the elements of the group
   * @param out collector receiving the produced elements
   */
  @throws[Exception]
  def reduce(values: java.lang.Iterable[T], out: Collector[O]): Unit
}
/**
 * User function for `DataSet.combineGroup`: pre-combines a batch of elements
 * and emits partial results through `out`.
 *
 * Declared as a trait rather than an abstract class, mirroring Flink's Java
 * interface, so implementations remain free to extend another base class.
 *
 * @tparam T input element type
 * @tparam O output element type
 */
trait GroupCombineFunction[T, O] extends Function {
  /**
   * @param values the elements to combine
   * @param out collector receiving the partial results
   */
  @throws[Exception]
  def combine(values: java.lang.Iterable[T], out: Collector[O]): Unit
}
/**
 * Output sink handed to user functions for emitting records.
 *
 * @tparam T type of the records accepted by [[collect]]
 */
trait Collector[T] {

  /**
   * Emits one record.
   *
   * @param record the record to emit
   */
  def collect(record: T): Unit
}