Apache Flink Scala API module that provides elegant and fluent Scala language bindings for Flink's distributed stream and batch processing framework
Core data processing operations for mapping, filtering, reducing, and transforming DataSets. These operations form the foundation of Flink's data processing capabilities.
Transform each element in a DataSet to a new element, potentially of a different type.
class DataSet[T] {
/**
* Applies a transformation function to each element
* @param fun Transformation function T => R
* @return DataSet with transformed elements
*/
def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
/**
* Applies a MapFunction to each element
* @param mapper MapFunction implementation
* @return DataSet with transformed elements
*/
def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
}

Usage Examples:
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5)
// Simple transformation
val doubled = numbers.map(_ * 2)
// Type transformation
case class Person(name: String, age: Int)
val people = env.fromElements("Alice,25", "Bob,30")
val persons = people.map { line =>
val parts = line.split(",")
Person(parts(0), parts(1).toInt)
}

Transform each element into zero or more elements, useful for splitting and expanding data.
class DataSet[T] {
/**
* Transforms each element into a traversable collection
* @param fun Function returning TraversableOnce[R]
* @return DataSet with flattened results
*/
def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
/**
* Transforms using a FlatMapFunction with collector
* @param fun Function using collector to emit results
* @return DataSet with collected results
*/
def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataSet[R]
/**
* Applies a FlatMapFunction to each element
* @param flatMapper FlatMapFunction implementation
* @return DataSet with flattened results
*/
def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
}

Usage Examples:
val sentences = env.fromElements("Hello world", "Scala Flink", "Data processing")
// Split sentences into words
val words = sentences.flatMap(_.split(" "))
// Generate multiple values per input
val numbers = env.fromElements(1, 2, 3)
val expanded = numbers.flatMap(n => 1 to n)
// Result: [1, 1, 2, 1, 2, 3]

Transform entire partitions at once, useful for initialization-heavy operations.
class DataSet[T] {
/**
* Transforms entire partitions using an iterator
* @param fun Function transforming Iterator[T] to TraversableOnce[R]
* @return DataSet with partition-wise transformations
*/
def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
/**
* Transforms partitions using iterator and collector
* @param fun Function with iterator input and collector output
* @return DataSet with collected partition results
*/
def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
/**
* Applies a MapPartitionFunction to each partition
* @param partitionMapper MapPartitionFunction implementation
* @return DataSet with partition transformations
*/
def mapPartition[R: TypeInformation: ClassTag](partitionMapper: MapPartitionFunction[T, R]): DataSet[R]
}

Select elements based on predicates, removing elements that don't match criteria.
class DataSet[T] {
/**
* Filters elements using a predicate function
* @param fun Predicate function returning Boolean
* @return DataSet with filtered elements
*/
def filter(fun: T => Boolean): DataSet[T]
/**
* Filters elements using a FilterFunction
* @param filter FilterFunction implementation
* @return DataSet with filtered elements
*/
def filter(filter: FilterFunction[T]): DataSet[T]
}

Usage Examples:
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Filter even numbers
val evenNumbers = numbers.filter(_ % 2 == 0)
// Filter with complex condition
case class Product(name: String, price: Double, inStock: Boolean)
val products = env.fromElements(
Product("Laptop", 999.99, true),
Product("Mouse", 29.99, false),
Product("Keyboard", 79.99, true)
)
val availableProducts = products.filter(p => p.inStock && p.price < 100)

Combine elements using associative operations to produce aggregated results.
class DataSet[T] {
/**
* Reduces all elements using a combining function
* @param fun Binary combining function (T, T) => T
* @return DataSet with single reduced element
*/
def reduce(fun: (T, T) => T): DataSet[T]
/**
* Reduces elements using a ReduceFunction
* @param reducer ReduceFunction implementation
* @return DataSet with reduced result
*/
def reduce(reducer: ReduceFunction[T]): DataSet[T]
}

Process all elements together, potentially producing different output types.
class DataSet[T] {
/**
* Processes all elements as a group using iterator
* @param fun Function processing Iterator[T] to produce R
* @return DataSet with group processing result
*/
def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
/**
* Processes groups using iterator and collector
* @param fun Function with iterator input and collector output
* @return DataSet with collected group results
*/
def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
/**
* Applies a GroupReduceFunction to process groups
* @param reducer GroupReduceFunction implementation
* @return DataSet with group reduction results
*/
def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}

Pre-aggregate elements within partitions before final grouping, improving performance.
class DataSet[T] {
/**
* Combines elements within partitions using iterator and collector
* @param fun Function for partition-wise combining
* @return DataSet with partition-combined results
*/
def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
/**
* Applies a GroupCombineFunction for partition-wise combining
* @param combiner GroupCombineFunction implementation
* @return DataSet with combined results
*/
def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}

Remove duplicate elements from DataSets with various key selection strategies.
class DataSet[T] {
/**
* Removes all duplicates from the DataSet
* @return DataSet with unique elements
*/
def distinct(): DataSet[T]
/**
* Removes duplicates based on field positions
* @param fields Field positions to consider for uniqueness
* @return DataSet with unique elements by fields
*/
def distinct(fields: Int*): DataSet[T]
/**
* Removes duplicates based on field names
* @param firstField First field name
* @param otherFields Additional field names
* @return DataSet with unique elements by named fields
*/
def distinct(firstField: String, otherFields: String*): DataSet[T]
/**
* Removes duplicates based on key selector function
* @param fun Key selector function
* @return DataSet with unique elements by key
*/
def distinct[K: TypeInformation](fun: T => K): DataSet[T]
}

Usage Examples:
val numbers = env.fromElements(1, 2, 2, 3, 3, 3, 4, 5)
// Remove all duplicates
val unique = numbers.distinct()
// Remove duplicates by key
case class Person(name: String, age: Int, city: String)
val people = env.fromElements(
Person("Alice", 25, "NYC"),
Person("Bob", 25, "NYC"),
Person("Alice", 30, "LA")
)
// Unique by name
val uniqueByName = people.distinct(_.name)
// Unique by age and city
val uniqueByAgeCity = people.distinct(p => (p.age, p.city))

Select specific elements or subsets from DataSets.
class DataSet[T] {
/**
* Selects the first n elements
* @param n Number of elements to select
* @return DataSet with first n elements
*/
def first(n: Int): DataSet[T]
/**
* Selects elements with minimum values in specified fields
* @param fields Field positions for minimum comparison
* @return DataSet with minimum elements
*/
def minBy(fields: Int*): DataSet[T]
/**
* Selects elements with maximum values in specified fields
* @param fields Field positions for maximum comparison
* @return DataSet with maximum elements
*/
def maxBy(fields: Int*): DataSet[T]
}

Get counts and collect elements for inspection.
class DataSet[T] {
/**
* Counts the number of elements in the DataSet
* @return Number of elements
*/
def count(): Long
/**
* Collects all elements to the driver program
* @return Sequence containing all elements
*/
def collect(): Seq[T]
}

Usage Examples:
val data = env.fromElements(1, 2, 3, 4, 5)
// Count elements
val elementCount = data.count()
println(s"Total elements: $elementCount")
// Collect results (use carefully with large datasets)
val results = data.map(_ * 2).collect()
results.foreach(println)

abstract class MapFunction[T, O] extends Function {
def map(value: T): O
}
abstract class FlatMapFunction[T, O] extends Function {
def flatMap(value: T, out: Collector[O]): Unit
}
abstract class MapPartitionFunction[T, O] extends Function {
def mapPartition(values: java.lang.Iterable[T], out: Collector[O]): Unit
}
abstract class FilterFunction[T] extends Function {
def filter(value: T): Boolean
}
abstract class ReduceFunction[T] extends Function {
def reduce(value1: T, value2: T): T
}
abstract class GroupReduceFunction[T, O] extends Function {
def reduce(values: java.lang.Iterable[T], out: Collector[O]): Unit
}
abstract class GroupCombineFunction[T, O] extends Function {
def combine(values: java.lang.Iterable[T], out: Collector[O]): Unit
}
trait Collector[T] {
def collect(record: T): Unit
}

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-scala-2-11