DataStream provides transformation operations for processing stream elements: element-wise mapping, flat-mapping, filtering, and low-level stateful processing, all through type-safe functional interfaces.
class DataStream[T] {
def map[R: TypeInformation](fun: T => R): DataStream[R]
def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]
}
Transform each element to a new element:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.functions.MapFunction
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5)
// Map with lambda function
val doubled = stream.map(_ * 2)
// Map with explicit function
val stringified = stream.map(x => s"Number: $x")
// Map with MapFunction interface
class SquareMapper extends MapFunction[Int, Int] {
override def map(value: Int): Int = value * value
}
val squared = stream.map(new SquareMapper)
// Map with case class transformation
case class User(id: Int, name: String)
val users = env.fromElements(1, 2, 3)
  .map(id => User(id, s"User$id"))
class DataStream[T] {
def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R]
def flatMap[R: TypeInformation](flatMapper: FlatMapFunction[T, R]): DataStream[R]
}
Transform each element to zero or more elements:
import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.FlatMapFunction
val env = StreamExecutionEnvironment.getExecutionEnvironment
val sentences = env.fromElements("Hello world", "How are you", "Flink streaming")
// FlatMap with lambda returning TraversableOnce
val words = sentences.flatMap(_.split("\\s+"))
// FlatMap with lambda using Collector
val wordsWithCollector = sentences.flatMap { (sentence: String, out: Collector[String]) =>
  sentence.split("\\s+").foreach(out.collect) // explicit parameter types help overload resolution
}
// FlatMap with FlatMapFunction
class WordSplitter extends FlatMapFunction[String, String] {
override def flatMap(sentence: String, out: Collector[String]): Unit = {
sentence.toLowerCase.split("\\W+")
.filter(_.nonEmpty)
.foreach(out.collect)
}
}
val cleanWords = sentences.flatMap(new WordSplitter)
// FlatMap for conditional emission
val evenNumbers = env.fromElements(1, 2, 3, 4, 5, 6)
  .flatMap(x => if (x % 2 == 0) Some(x) else None)
class DataStream[T] {
def filter(fun: T => Boolean): DataStream[T]
def filter(filter: FilterFunction[T]): DataStream[T]
}
Filter elements based on predicates:
import org.apache.flink.api.common.functions.FilterFunction
val env = StreamExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Filter with lambda
val evenNumbers = numbers.filter(_ % 2 == 0)
// Filter with complex condition
case class Person(name: String, age: Int)
val people = env.fromElements(
Person("Alice", 25),
Person("Bob", 17),
Person("Charlie", 30)
)
val adults = people.filter(_.age >= 18)
// Filter with FilterFunction
class PositiveFilter extends FilterFunction[Int] {
override def filter(value: Int): Boolean = value > 0
}
val positives = numbers.filter(new PositiveFilter)
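Beyond the core methods, the Scala API extensions add partial-function variants (mapWith, flatMapWith, filterWith) that pair well with case classes; a brief sketch, assuming the extensions import (val name illustrative):
import org.apache.flink.streaming.api.scala.extensions._
// filterWith accepts a partial-function literal, so case classes destructure cleanly
val adultsPf = people.filterWith { case Person(_, age) => age >= 18 }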
class DataStream[T] {
def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
}
Low-level processing with access to timers and state (both require the function to run on a keyed stream):
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
class EventProcessor extends ProcessFunction[String, String] {
override def processElement(
value: String,
ctx: Context,
out: Collector[String]
): Unit = {
// Access current timestamp
val timestamp = ctx.timestamp()
// Access current watermark
val watermark = ctx.timerService().currentWatermark()
// Register timer for 10 seconds from now
ctx.timerService().registerProcessingTimeTimer(
ctx.timerService().currentProcessingTime() + 10000
)
// Emit result
out.collect(s"Processed: $value at $timestamp")
}
override def onTimer(
timestamp: Long,
ctx: OnTimerContext,
out: Collector[String]
): Unit = {
out.collect(s"Timer fired at $timestamp")
}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Timers and keyed state require a keyed stream, so key the input first
val processed = env.socketTextStream("localhost", 9999)
  .keyBy(line => line)
  .process(new EventProcessor)
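Keyed state complements timers once the stream is keyed. A minimal running-count sketch (class name and stream hypothetical), using Flink's KeyedProcessFunction and ValueState:
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
class LineCounter extends KeyedProcessFunction[String, String, (String, Long)] {
  // One counter per key, managed and checkpointed by Flink
  private lazy val countState: ValueState[Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("count", classOf[Long])
  )
  override def processElement(
    value: String,
    ctx: Context,
    out: Collector[(String, Long)]
  ): Unit = {
    // An unset ValueState[Long] reads as 0 here (null unboxes to 0 in Scala)
    val newCount = countState.value() + 1
    countState.update(newCount)
    out.collect((ctx.getCurrentKey, newCount))
  }
}
val counts = env.socketTextStream("localhost", 9999)
  .keyBy(line => line)
  .process(new LineCounter)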
class DataStream[T] {
def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X]
}
case class OutputTag[T](id: String)(implicit val typeInfo: TypeInformation[T])
Route different types of data to side outputs:
import org.apache.flink.streaming.api.scala.OutputTag
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Define output tags
val evenTag = OutputTag[Int]("even-numbers")
val oddTag = OutputTag[Int]("odd-numbers")
class NumberSplitter extends ProcessFunction[Int, String] {
override def processElement(
value: Int,
ctx: Context,
out: Collector[String]
): Unit = {
if (value % 2 == 0) {
ctx.output(evenTag, value)
} else {
ctx.output(oddTag, value)
}
out.collect(s"Processed: $value")
}
}
val mainStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.process(new NumberSplitter)
// Get side output streams
val evenNumbers = mainStream.getSideOutput(evenTag)
val oddNumbers = mainStream.getSideOutput(oddTag)
evenNumbers.print("Even")
oddNumbers.print("Odd")
mainStream.print("Main")class DataStream[T] {
def split(selector: OutputSelector[T]): SplitStream[T]
def split(fun: T => TraversableOnce[String]): SplitStream[T]
}
class SplitStream[T] {
def select(outputNames: String*): DataStream[T]
}
Split streams based on conditions (deprecated and removed in newer Flink releases; prefer side outputs, as sketched after this example):
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import scala.collection.JavaConverters._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Split with OutputSelector
class EvenOddSelector extends OutputSelector[Int] {
override def select(value: Int): java.lang.Iterable[String] = {
if (value % 2 == 0) List("even").asJava else List("odd").asJava
}
}
val splitStream = numbers.split(new EvenOddSelector)
val evenStream = splitStream.select("even")
val oddStream = splitStream.select("odd")
// Split with lambda
val splitStreamLambda = numbers.split(value =>
if (value % 2 == 0) List("even") else List("odd")
)
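When migrating off split/select, the side-output pattern shown earlier is the one-pass replacement; a minimal sketch, reusing the evenTag and oddTag tags from the side-output example above:
// Equivalent routing via a ProcessFunction and side outputs
val routed = numbers.process(new ProcessFunction[Int, Int] {
  override def processElement(value: Int, ctx: Context, out: Collector[Int]): Unit =
    if (value % 2 == 0) ctx.output(evenTag, value) else ctx.output(oddTag, value)
})
val evens = routed.getSideOutput(evenTag)
val odds = routed.getSideOutput(oddTag)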
class DataStream[T] {
def setParallelism(parallelism: Int): DataStream[T]
def setMaxParallelism(maxParallelism: Int): DataStream[T]
def name(name: String): DataStream[T]
def uid(uid: String): DataStream[T]
def setUidHash(hash: String): DataStream[T]
def disableChaining(): DataStream[T]
def startNewChain(): DataStream[T]
def slotSharingGroup(slotSharingGroup: String): DataStream[T]
def setBufferTimeout(timeoutMillis: Long): DataStream[T]
}
Configure transformation operators:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5)
val result = stream
.map(_ * 2)
.name("Doubler") // Set operator name
.uid("doubler-operator") // Set unique ID
.setParallelism(4) // Set parallelism
.disableChaining() // Disable operator chaining
.slotSharingGroup("group1") // Set slot sharing group
.filter(_ > 5)
.name("Filter Large Numbers")
  .startNewChain() // Start new operator chain
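The remaining settings from the signature list follow the same fluent style; a brief sketch with illustrative values:
val tuned = env.fromElements(1, 2, 3, 4, 5)
  .map(_ * 2)
  .uid("tuned-doubler")   // Stable operator ID for savepoint compatibility
  .setMaxParallelism(128) // Upper bound for rescaling (number of key groups)
  .setBufferTimeout(50)   // Flush partially filled network buffers every 50 ms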
For transformations that can fail on malformed input, rich functions expose metrics and lifecycle hooks:
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
class SafeMapper extends RichMapFunction[String, Int] {
  @transient private var parsingErrors: Counter = _
  override def open(parameters: Configuration): Unit = {
    // Register the counter once, in open(), rather than on every record
    parsingErrors = getRuntimeContext.getMetricGroup.counter("parsing-errors")
  }
  override def map(value: String): Int = {
    try {
      value.toInt
    } catch {
      case _: NumberFormatException =>
        parsingErrors.inc()
        -1 // Default value; a side output is often the better choice
    }
  }
}
Route invalid records to a side output instead of collapsing them into a sentinel value:
val validTag = OutputTag[Int]("valid")
val invalidTag = OutputTag[String]("invalid")
class ValidatingProcessor extends ProcessFunction[String, String] {
override def processElement(
value: String,
ctx: Context,
out: Collector[String]
): Unit = {
try {
val number = value.toInt
ctx.output(validTag, number)
out.collect(s"Processed: $number")
} catch {
case _: NumberFormatException =>
ctx.output(invalidTag, value)
}
}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input = env.fromElements("1", "2", "invalid", "4", "not-a-number", "6")
val processed = input.process(new ValidatingProcessor)
val validNumbers = processed.getSideOutput(validTag)
val invalidInputs = processed.getSideOutput(invalidTag)
// Handle valid and invalid data separately
validNumbers.print("Valid")
invalidInputs.print("Invalid")val env = StreamExecutionEnvironment.getExecutionEnvironment
// Operations are chained together by default for efficiency
val stream = env.fromElements(1, 2, 3, 4, 5)
.map(_ * 2) // Chained with filter
.filter(_ > 5) // Chained with map
.map(_ + 1) // Chained with previous operations
// Explicit chaining control
val explicitChaining = env.fromElements(1, 2, 3, 4, 5)
.map(_ * 2)
.startNewChain() // Force new chain
.filter(_ > 5)
.disableChaining() // Disable chaining for this operator
  .map(_ + 1)
// Prefer stateless transformations when possible
val stateless = stream.map(_ * 2) // No state required
// Use process functions judiciously: they can maintain state, which
// requires a keyed stream and adds checkpointing overhead
class StatefulProcessor extends ProcessFunction[Int, Int] {
  // Keyed state would be read and updated here (see the keyed-state sketch above)
  override def processElement(value: Int, ctx: Context, out: Collector[Int]): Unit = {
    // Processing logic
  }
}
A complete text-processing pipeline combining these transformations:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
object TextProcessingPipeline {
case class WordCount(word: String, count: Int)
// Output tags for different types of words
val shortWordsTag = OutputTag[String]("short-words")
val longWordsTag = OutputTag[String]("long-words")
class WordClassifier extends ProcessFunction[String, WordCount] {
override def processElement(
word: String,
ctx: Context,
out: Collector[WordCount]
): Unit = {
val cleanWord = word.toLowerCase.trim
if (cleanWord.length < 4) {
ctx.output(shortWordsTag, cleanWord)
} else if (cleanWord.length > 8) {
ctx.output(longWordsTag, cleanWord)
}
out.collect(WordCount(cleanWord, 1))
}
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Read text input
val textStream = env.socketTextStream("localhost", 9999)
// Process text through transformation pipeline
val words = textStream
.flatMap(_.split("\\W+"))
.filter(_.nonEmpty)
.name("Word Splitter")
.setParallelism(4)
// Classify and count words
val wordCounts = words
.process(new WordClassifier)
.name("Word Classifier")
// Get side outputs
val shortWords = wordCounts.getSideOutput(shortWordsTag)
val longWords = wordCounts.getSideOutput(longWordsTag)
// Print results
wordCounts.print("Word Counts")
shortWords.print("Short Words")
longWords.print("Long Words")
env.execute("Text Processing Pipeline")
}
}
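The classifier emits WordCount(word, 1) records; inside main, a keyed aggregation could turn them into running totals. A sketch using keyBy and sum (val name illustrative):
// Running totals per word; sums the `count` field of WordCount
val runningCounts = wordCounts
  .keyBy(_.word)
  .sum("count")
runningCounts.print("Running Counts")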