Stream Transformations and Operations

DataStream[T] provides transformation operations for processing stream elements: element-wise mapping, filtering, and low-level stateful processing, all through type-safe functional interfaces.

Basic Transformations

Map Operations

class DataStream[T] {
  def map[R: TypeInformation](fun: T => R): DataStream[R]
  def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]
}

Transform each element to a new element:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.functions.MapFunction

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5)

// Map with lambda function
val doubled = stream.map(_ * 2)

// Map with explicit function
val stringified = stream.map(x => s"Number: $x")

// Map with MapFunction interface
class SquareMapper extends MapFunction[Int, Int] {
  override def map(value: Int): Int = value * value
}
val squared = stream.map(new SquareMapper)

// Map with case class transformation
case class User(id: Int, name: String)
val users = env.fromElements(1, 2, 3)
  .map(id => User(id, s"User$id"))
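
The R: TypeInformation context bound on map is satisfied implicitly by the createTypeInformation macro that the wildcard import org.apache.flink.streaming.api.scala._ brings into scope. Because the context bound desugars to an implicit second parameter list, the type information can also be materialized and passed by hand; a minimal sketch (the val names are illustrative):

import org.apache.flink.api.common.typeinfo.TypeInformation

// createTypeInformation comes from the scala API wildcard import
val intInfo: TypeInformation[Int] = createTypeInformation[Int]

// Supply the implicit TypeInformation argument explicitly
val doubledExplicit = stream.map((x: Int) => x * 2)(intInfo)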

FlatMap Operations

class DataStream[T] {
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
  def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R]
  def flatMap[R: TypeInformation](flatMapper: FlatMapFunction[T, R]): DataStream[R]
}

Transform each element to zero or more elements:

import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.FlatMapFunction

val env = StreamExecutionEnvironment.getExecutionEnvironment
val sentences = env.fromElements("Hello world", "How are you", "Flink streaming")

// FlatMap with lambda returning TraversableOnce
val words = sentences.flatMap(_.split("\\s+"))

// FlatMap with lambda using Collector
val wordsWithCollector = sentences.flatMap { (sentence: String, out: Collector[String]) =>
  sentence.split("\\s+").foreach(out.collect)
}

// FlatMap with FlatMapFunction
class WordSplitter extends FlatMapFunction[String, String] {
  override def flatMap(sentence: String, out: Collector[String]): Unit = {
    sentence.toLowerCase.split("\\W+")
      .filter(_.nonEmpty)
      .foreach(out.collect)
  }
}
val cleanWords = sentences.flatMap(new WordSplitter)

// FlatMap for conditional emission
val evenNumbers = env.fromElements(1, 2, 3, 4, 5, 6)
  .flatMap(x => if (x % 2 == 0) Some(x) else None)

Filter Operations

class DataStream[T] {
  def filter(fun: T => Boolean): DataStream[T]
  def filter(filter: FilterFunction[T]): DataStream[T]
}

Filter elements based on predicates:

import org.apache.flink.api.common.functions.FilterFunction

val env = StreamExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Filter with lambda
val evenNumbers = numbers.filter(_ % 2 == 0)

// Filter with complex condition
case class Person(name: String, age: Int)
val people = env.fromElements(
  Person("Alice", 25),
  Person("Bob", 17),
  Person("Charlie", 30)
)
val adults = people.filter(_.age >= 18)

// Filter with FilterFunction
class PositiveFilter extends FilterFunction[Int] {
  override def filter(value: Int): Boolean = value > 0
}
val positives = numbers.filter(new PositiveFilter)

Advanced Transformations

Process Functions

class DataStream[T] {
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
}

Low-level processing with access to timers and state:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class EventProcessor extends ProcessFunction[String, String] {
  override def processElement(
    value: String,
    ctx: ProcessFunction[String, String]#Context,
    out: Collector[String]
  ): Unit = {
    // Access current timestamp
    val timestamp = ctx.timestamp()
    
    // Access current watermark
    val watermark = ctx.timerService().currentWatermark()
    
    // Register timer for 10 seconds from now
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 10000
    )
    
    // Emit result
    out.collect(s"Processed: $value at $timestamp")
  }
  
  override def onTimer(
    timestamp: Long,
    ctx: ProcessFunction[String, String]#OnTimerContext,
    out: Collector[String]
  ): Unit = {
    out.collect(s"Timer fired at $timestamp")
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Timers are only supported on keyed streams, so key the input first
val processed = env.socketTextStream("localhost", 9999)
  .keyBy(line => line)
  .process(new EventProcessor)
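
registerEventTimeTimer is the event-time counterpart: the timer fires once the watermark passes the registered timestamp. A minimal sketch, assuming a keyed stream with timestamps and watermarks assigned upstream (otherwise ctx.timestamp() is null); the class name is illustrative:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class TimeoutAlert extends ProcessFunction[String, String] {
  override def processElement(
    value: String,
    ctx: ProcessFunction[String, String]#Context,
    out: Collector[String]
  ): Unit = {
    // Fires once the watermark passes 10 seconds after this element's timestamp
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10000)
    out.collect(value)
  }

  override def onTimer(
    timestamp: Long,
    ctx: ProcessFunction[String, String]#OnTimerContext,
    out: Collector[String]
  ): Unit = {
    out.collect(s"Event-time timer fired at $timestamp")
  }
}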

Side Outputs

OutputTag and Side Output Streams

class DataStream[T] {
  def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X]
}

case class OutputTag[T](id: String)(implicit val typeInfo: TypeInformation[T])

Route different types of data to side outputs:

import org.apache.flink.streaming.api.scala.OutputTag

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Define output tags
val evenTag = OutputTag[Int]("even-numbers")
val oddTag = OutputTag[Int]("odd-numbers")

class NumberSplitter extends ProcessFunction[Int, String] {
  override def processElement(
    value: Int,
    ctx: ProcessFunction[Int, String]#Context,
    out: Collector[String]
  ): Unit = {
    if (value % 2 == 0) {
      ctx.output(evenTag, value)
    } else {
      ctx.output(oddTag, value)
    }
    out.collect(s"Processed: $value")
  }
}

val mainStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .process(new NumberSplitter)

// Get side output streams
val evenNumbers = mainStream.getSideOutput(evenTag)
val oddNumbers = mainStream.getSideOutput(oddTag)

evenNumbers.print("Even")
oddNumbers.print("Odd")
mainStream.print("Main")

Stream Splitting (Legacy)

Split and Select Operations

class DataStream[T] {
  def split(selector: OutputSelector[T]): SplitStream[T]
  def split(fun: T => TraversableOnce[String]): SplitStream[T]
}

class SplitStream[T] {
  def select(outputNames: String*): DataStream[T]
}

Split streams based on conditions (deprecated, use side outputs instead):

import org.apache.flink.streaming.api.collector.selector.OutputSelector
import scala.collection.JavaConverters._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Split with OutputSelector
class EvenOddSelector extends OutputSelector[Int] {
  override def select(value: Int): java.lang.Iterable[String] = {
    if (value % 2 == 0) List("even").asJava else List("odd").asJava
  }
}

val splitStream = numbers.split(new EvenOddSelector)
val evenStream = splitStream.select("even")
val oddStream = splitStream.select("odd")

// Split with lambda
val splitStreamLambda = numbers.split(value =>
  if (value % 2 == 0) List("even") else List("odd")
)

Stream Configuration

Operator Configuration

class DataStream[T] {
  def setParallelism(parallelism: Int): DataStream[T]
  def setMaxParallelism(maxParallelism: Int): DataStream[T]
  def name(name: String): DataStream[T]
  def uid(uid: String): DataStream[T]
  def setUidHash(hash: String): DataStream[T]
  def disableChaining(): DataStream[T]
  def startNewChain(): DataStream[T]
  def slotSharingGroup(slotSharingGroup: String): DataStream[T]
  def setBufferTimeout(timeoutMillis: Long): DataStream[T]
}

Configure transformation operators:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5)

val result = stream
  .map(_ * 2)
  .name("Doubler")                    // Set operator name
  .uid("doubler-operator")            // Set unique ID
  .setParallelism(4)                  // Set parallelism
  .disableChaining()                  // Disable operator chaining
  .slotSharingGroup("group1")         // Set slot sharing group
  .filter(_ > 5)
  .name("Filter Large Numbers")
  .startNewChain()                    // Start new operator chain
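
setMaxParallelism and setBufferTimeout from the signature list above follow the same fluent pattern; a brief sketch (the values are illustrative):

val tuned = stream
  .map(_ * 2)
  .setMaxParallelism(128)   // Upper bound for later rescaling
  .setBufferTimeout(50)     // Flush partially filled network buffers every 50 ms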

Error Handling in Transformations

Exception Handling

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

class SafeMapper extends RichMapFunction[String, Int] {
  @transient private var parsingErrors: Counter = _

  override def open(parameters: Configuration): Unit = {
    // Register the metric once per task instead of on every element
    parsingErrors = getRuntimeContext.getMetricGroup.counter("parsing-errors")
  }

  override def map(value: String): Int = {
    try {
      value.toInt
    } catch {
      case _: NumberFormatException =>
        parsingErrors.inc()
        -1  // Fall back to a sentinel value
    }
  }
}

Dead Letter Queue Pattern

val validTag = OutputTag[Int]("valid")
val invalidTag = OutputTag[String]("invalid")

class ValidatingProcessor extends ProcessFunction[String, String] {
  override def processElement(
    value: String,
    ctx: ProcessFunction[String, String]#Context,
    out: Collector[String]
  ): Unit = {
    try {
      val number = value.toInt
      ctx.output(validTag, number)
      out.collect(s"Processed: $number")
    } catch {
      case _: NumberFormatException =>
        ctx.output(invalidTag, value)
    }
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input = env.fromElements("1", "2", "invalid", "4", "not-a-number", "6")

val processed = input.process(new ValidatingProcessor)
val validNumbers = processed.getSideOutput(validTag)
val invalidInputs = processed.getSideOutput(invalidTag)

// Handle valid and invalid data separately
validNumbers.print("Valid")
invalidInputs.print("Invalid")

Performance Considerations

Operator Fusion and Chaining

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Operations are chained together by default for efficiency
val stream = env.fromElements(1, 2, 3, 4, 5)
  .map(_ * 2)      // Chained with filter
  .filter(_ > 5)   // Chained with map
  .map(_ + 1)      // Chained with previous operations

// Explicit chaining control
val explicitChaining = env.fromElements(1, 2, 3, 4, 5)
  .map(_ * 2)
  .startNewChain()  // Force new chain
  .filter(_ > 5)
  .disableChaining()  // Disable chaining for this operator
  .map(_ + 1)
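
Chaining can also be disabled for the whole job on the environment, typically only for debugging:

val debugEnv = StreamExecutionEnvironment.getExecutionEnvironment
debugEnv.disableOperatorChaining()  // No operator in this job will be chained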

Memory and State Considerations

// Prefer stateless transformations when possible
val stateless = stream.map(_ * 2)  // No state required

// Use process functions judiciously as they can maintain state
class StatefulProcessor extends ProcessFunction[Int, Int] {
  // On a keyed stream, a process function can maintain state per key
  override def processElement(
    value: Int,
    ctx: ProcessFunction[Int, Int]#Context,
    out: Collector[Int]
  ): Unit = {
    // Processing logic
  }
}
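
For contrast, a minimal sketch of what per-key state looks like, using a KeyedProcessFunction with ValueState. The running-sum logic and names are illustrative; it requires a keyed stream:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class RunningSum extends KeyedProcessFunction[Int, Int, Int] {
  @transient private var sum: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    // One state cell per key, checkpointed by Flink
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("sum", createTypeInformation[Int]))
  }

  override def processElement(
    value: Int,
    ctx: KeyedProcessFunction[Int, Int, Int]#Context,
    out: Collector[Int]
  ): Unit = {
    val next = sum.value() + value  // null unboxes to 0 for the first element
    sum.update(next)
    out.collect(next)
  }
}

// Usage: keyed by value modulo 2, emits a running sum per key
// env.fromElements(1, 2, 3, 4).keyBy(_ % 2).process(new RunningSum).print()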

Complete Example: Text Processing Pipeline

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

object TextProcessingPipeline {
  
  case class WordCount(word: String, count: Int)
  
  // Output tags for different types of words
  val shortWordsTag = OutputTag[String]("short-words")
  val longWordsTag = OutputTag[String]("long-words")
  
  class WordClassifier extends ProcessFunction[String, WordCount] {
    override def processElement(
      word: String,
      ctx: ProcessFunction[String, WordCount]#Context,
      out: Collector[WordCount]
    ): Unit = {
      val cleanWord = word.toLowerCase.trim
      
      if (cleanWord.length < 4) {
        ctx.output(shortWordsTag, cleanWord)
      } else if (cleanWord.length > 8) {
        ctx.output(longWordsTag, cleanWord)
      }
      
      out.collect(WordCount(cleanWord, 1))
    }
  }
  
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    // Read text input
    val textStream = env.socketTextStream("localhost", 9999)
    
    // Process text through transformation pipeline
    val words = textStream
      .flatMap(_.split("\\W+"))
      .filter(_.nonEmpty)
      .name("Word Splitter")
      .setParallelism(4)
    
    // Classify and count words
    val wordCounts = words
      .process(new WordClassifier)
      .name("Word Classifier")
    
    // Get side outputs
    val shortWords = wordCounts.getSideOutput(shortWordsTag)
    val longWords = wordCounts.getSideOutput(longWordsTag)
    
    // Print results
    wordCounts.print("Word Counts")
    shortWords.print("Short Words")
    longWords.print("Long Words")
    
    env.execute("Text Processing Pipeline")
  }
}
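
The pipeline reads from a socket source, so for a local test something must be listening on port 9999 before the job starts; nc -lk 9999 is the usual stand-in. Results are printed per classification stream with the given prefixes.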