or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

async-operations.md, data-stream.md, functions.md, index.md, joining.md, keyed-stream.md, stream-execution-environment.md, windowing.md
tile.json

docs/keyed-stream.md

Keyed Stream

KeyedStream represents a partitioned stream where elements with the same key are guaranteed to be processed by the same operator instance. This enables stateful operations, aggregations, and key-based computations.

Capabilities

Aggregation Operations

Built-in aggregation functions for common operations.

/**
 * Built-in aggregation operations on keyed streams.
 *
 * This is a signature sketch, not a compilable definition: the methods have no bodies.
 * Each operation has two overloads: one selecting the aggregated field by `Int`
 * (presumably a positional index — TODO confirm) and one selecting it by field name,
 * e.g. `"temperature"`. Every overload returns a `DataStream[T]`, i.e. records of the
 * input type rather than the bare aggregated value.
 */
class KeyedStream[T, K] {
  // Sum of the selected field, per key.
  def sum(field: Int): DataStream[T]
  def sum(field: String): DataStream[T]
  // Minimum of the selected field, per key (presumably the mirror image of max — confirm).
  def min(field: Int): DataStream[T]
  def min(field: String): DataStream[T]
  // Maximum of the selected field, per key.
  def max(field: Int): DataStream[T]
  def max(field: String): DataStream[T]
  // The whole record holding the minimum of the selected field (presumably the mirror image of maxBy — confirm).
  def minBy(field: Int): DataStream[T]
  def minBy(field: String): DataStream[T]
  // The whole record holding the maximum of the selected field, per key.
  def maxBy(field: Int): DataStream[T]
  def maxBy(field: String): DataStream[T]
}

Usage Examples:

import org.apache.flink.streaming.api.scala._

// One temperature measurement from a sensor; `timestamp` is a Long
// (presumably epoch milliseconds — TODO confirm).
case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

// NOTE: `env` is not defined in this snippet; it is assumed to be a
// StreamExecutionEnvironment that is already in scope.
val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor1", 25.0, 2000L),
  SensorReading("sensor2", 15.0, 1500L),
  SensorReading("sensor2", 18.0, 2500L)
)

// Partition by sensor id so that each sensor is aggregated independently.
val keyedReadings = readings.keyBy(_.sensorId)

// Sum temperatures by sensor
val tempSum = keyedReadings.sum("temperature")

// Get maximum temperature reading per sensor
// NOTE(review): with `max`, fields other than `temperature` in the emitted record may
// not come from the reading that holds the maximum — confirm against the Flink docs.
val maxTemp = keyedReadings.max("temperature")

// Get the reading with maximum temperature per sensor (entire object)
val maxTempReading = keyedReadings.maxBy("temperature")

Reduction Operations

Custom reduction logic for aggregating elements.

/**
 * Reduction operations.
 *
 * Signature sketch only (no method bodies). Both overloads combine two values of
 * type `T` into one `T` and return a `DataStream[T]`.
 */
class KeyedStream[T, K] {
  // Reduce with a plain Scala function `(T, T) => T`.
  def reduce(reducer: (T, T) => T): DataStream[T]
  // Reduce with a `ReduceFunction[T]` instance.
  def reduce(reducer: ReduceFunction[T]): DataStream[T]
}

Usage Examples:

// ReduceFunction is not covered by `org.apache.flink.streaming.api.scala._`,
// so the anonymous-class variant below needs this explicit import.
import org.apache.flink.api.common.functions.ReduceFunction

// A word together with its (partial) occurrence count.
case class WordCount(word: String, count: Int)

// NOTE: `env` is assumed to be a StreamExecutionEnvironment already in scope.
val words = env.fromElements(
  WordCount("hello", 1),
  WordCount("world", 1),
  WordCount("hello", 1),
  WordCount("flink", 1)
)

// Partition by word so that counts are combined per distinct word.
val keyedWords = words.keyBy(_.word)

// Reduce to count words: both inputs share the same key, so w1.word == w2.word.
val wordCounts = keyedWords.reduce((w1, w2) => WordCount(w1.word, w1.count + w2.count))

// Using ReduceFunction: same logic expressed as an explicit function object.
val wordCountsFunc = keyedWords.reduce(new ReduceFunction[WordCount] {
  override def reduce(value1: WordCount, value2: WordCount): WordCount = {
    WordCount(value1.word, value1.count + value2.count)
  }
})

Windowing Operations

Apply windowing to keyed streams for time-based or count-based operations.

/**
 * Window operations on keyed streams.
 *
 * Signature sketch only (no method bodies). Every method returns a
 * `WindowedStream[T, K, W]` keyed by the same key type `K`.
 *
 * NOTE(review): `timeWindow` is reported as deprecated in newer Flink releases in
 * favour of `window(...)` with an explicit assigner — confirm against the Flink
 * version this page targets.
 */
class KeyedStream[T, K] {
  // General form: windows are assigned by the supplied `WindowAssigner`.
  def window[W <: Window](assigner: WindowAssigner[T, W]): WindowedStream[T, K, W]
  // Time-based window of the given size (tumbling, per the usage example).
  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]
  // Time-based window of the given size that advances by `slide` (sliding, per the usage example).
  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]
  // Count-based window over `size` elements.
  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]
  // Count-based window over `size` elements with a `slide` parameter
  // (presumably the number of elements between window starts — confirm).
  def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]
}

Usage Examples:

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows

// Same keyed stream as in the aggregation example; `readings` is the
// DataStream[SensorReading] defined there.
val keyedReadings = readings.keyBy(_.sensorId)

// Tumbling time window of 5 seconds
val windowedReadings = keyedReadings.timeWindow(Time.seconds(5))

// Sliding time window: 10 second window, sliding every 5 seconds
val slidingWindowedReadings = keyedReadings.timeWindow(Time.seconds(10), Time.seconds(5))

// Count window: trigger every 100 elements
val countWindowedReadings = keyedReadings.countWindow(100)

// Custom window assigner: one-minute tumbling windows, as built by TumblingEventTimeWindows.of
val customWindowedReadings = keyedReadings.window(TumblingEventTimeWindows.of(Time.minutes(1)))

Process Functions

Apply custom stateful processing logic.

/**
 * Process functions for stateful operations.
 *
 * Signature sketch only (no method bodies). `R` is the output element type; the
 * function's key type and input type must match the stream's `K` and `T`.
 */
class KeyedStream[T, K] {
  // Apply a KeyedProcessFunction to every element of the keyed stream.
  def process[R](function: KeyedProcessFunction[K, T, R]): DataStream[R]
  // Same, with the output type information passed explicitly.
  // NOTE(review): confirm this overload exists in the Scala API of the targeted Flink version.
  def process[R](function: KeyedProcessFunction[K, T, R], outputType: TypeInformation[R]): DataStream[R]
}

Usage Examples:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

/**
 * Emits an alert string for every reading above 30.0 degrees and registers an
 * event-time timer 10 seconds after each reading's timestamp.
 *
 * Key type is the sensor id (`String`), input is [[SensorReading]], output is `String`.
 */
class TemperatureAlertFunction extends KeyedProcessFunction[String, SensorReading, String] {

  /**
   * Handles a single reading.
   *
   * @param value the incoming reading
   * @param ctx   per-element context; `Context` is an inner class of the generic Java
   *              class, so from Scala it must be written as a type projection (`#Context`)
   * @param out   collector for alert messages
   */
  override def processElement(
    value: SensorReading,
    ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
    out: Collector[String]
  ): Unit = {
    // Custom stateful logic
    if (value.temperature > 30.0) {
      out.collect(s"Alert: High temperature ${value.temperature} from ${value.sensorId}")
    }

    // Set timer for 10 seconds in the future (relative to the reading's own timestamp).
    ctx.timerService().registerEventTimeTimer(value.timestamp + 10000L)
  }

  /**
   * Called when a timer registered in `processElement` fires.
   *
   * @param timestamp the time the timer was registered for
   * @param ctx       timer context giving access to the key the timer belongs to
   * @param out       collector for output messages
   */
  override def onTimer(
    timestamp: Long,
    ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
    out: Collector[String]
  ): Unit = {
    out.collect(s"Timer fired at $timestamp for key ${ctx.getCurrentKey}")
  }
}

// Apply the function to the stream keyed by sensor id; the result carries the
// function's String output (alerts and timer messages).
val alerts = keyedReadings.process(new TemperatureAlertFunction())

State Management

Access and manage keyed state for stateful computations.

/**
 * State management operations (available in process functions).
 *
 * Sketch of the one member relevant to state access; `K`, `I` and `O` are the key,
 * input and output types. Keyed state is obtained by passing a state descriptor to
 * the runtime context, e.g. `getRuntimeContext.getState(descriptor)`.
 */
trait KeyedProcessFunction[K, I, O] {
  // Entry point for obtaining state handles.
  def getRuntimeContext: RuntimeContext
  
  // State descriptors and access methods available through getRuntimeContext
}

// State types available
// Declared here as member-less sketches; only ValueState is exercised by the
// usage example on this page (`value()` / `update(...)`).
trait ValueState[T]
trait ListState[T] 
trait MapState[UK, UV]
trait ReducingState[T]
trait AggregatingState[IN, OUT]

Usage Examples:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

/**
 * Emits a message whenever a sensor's temperature differs by more than 5.0 degrees
 * from the previous reading of the same sensor.
 *
 * The previous temperature is kept in keyed `ValueState`, so each sensor id has its
 * own independent value. The first reading of a key only initialises the state and
 * never produces a message.
 */
class StatefulProcessFunction extends KeyedProcessFunction[String, SensorReading, String] {

  // Boxed java.lang.Double on purpose: an unset ValueState returns null, and a Scala
  // Double would silently unbox that null to 0.0, making "no previous value"
  // indistinguishable from a real 0.0 reading.
  private var lastTemperature: ValueState[java.lang.Double] = _

  /** Registers the state handle; called once before any element is processed. */
  override def open(parameters: Configuration): Unit = {
    val descriptor =
      new ValueStateDescriptor[java.lang.Double]("lastTemperature", classOf[java.lang.Double])
    lastTemperature = getRuntimeContext.getState(descriptor)
  }

  /**
   * Compares the reading with the previously stored temperature for the same key.
   *
   * @param value the incoming reading
   * @param ctx   per-element context; written as a type projection (`#Context`) because
   *              `Context` is an inner class of the generic Java class
   * @param out   collector for change notifications
   */
  override def processElement(
    value: SensorReading,
    ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
    out: Collector[String]
  ): Unit = {
    // Option(...) maps the null of an unset state to None, so the very first
    // reading of a key is not compared against a made-up 0.0 baseline.
    Option(lastTemperature.value()).map(_.doubleValue).foreach { lastTemp =>
      if (math.abs(value.temperature - lastTemp) > 5.0) {
        out.collect(s"Temperature change detected: $lastTemp -> ${value.temperature}")
      }
    }

    lastTemperature.update(value.temperature)
  }
}

Fold Operations (Deprecated)

Legacy fold operations for custom aggregations.

/**
 * Fold operations (deprecated, use process functions instead).
 *
 * Signature sketch only (no method bodies). A fold starts from `initialValue` and
 * combines it with each element via `folder`, producing values of type `R`.
 *
 * NOTE(review): confirm the deprecation version below, and whether `fold` still
 * exists at all in the Flink version this page targets.
 */
class KeyedStream[T, K] {
  // Curried variant taking a plain Scala function `(R, T) => R`.
  @deprecated("Use process functions with state instead", "1.12")
  def fold[R](initialValue: R)(folder: (R, T) => R): DataStream[R]
  
  // Variant taking a `FoldFunction[T, R]` instance.
  @deprecated("Use process functions with state instead", "1.12")
  def fold[R](initialValue: R, folder: FoldFunction[T, R]): DataStream[R]
}

Stream Configuration

Configure properties of the keyed stream.

/**
 * Configuration operations.
 *
 * Signature sketch only (no method bodies). As declared here, every setter returns
 * `KeyedStream[T, K]`, so calls can be chained; the two getters return `Int`.
 *
 * NOTE(review): in Flink these members may be declared on `DataStream` with a
 * `DataStream[T]` return type — confirm before relying on the `KeyedStream` return type.
 */
class KeyedStream[T, K] {
  // Parallelism setter/getter pair.
  def setParallelism(parallelism: Int): KeyedStream[T, K]
  def getParallelism: Int
  // Maximum-parallelism setter/getter pair.
  def setMaxParallelism(maxParallelism: Int): KeyedStream[T, K]
  def getMaxParallelism: Int
  // Naming / identification of the operator.
  def name(name: String): KeyedStream[T, K]
  def uid(uid: String): KeyedStream[T, K]
  def setUidHash(uidHash: String): KeyedStream[T, K]
  // Operator chaining control.
  def disableChaining(): KeyedStream[T, K]
  def startNewChain(): KeyedStream[T, K]
  // Slot sharing group assignment by name.
  def slotSharingGroup(slotSharingGroup: String): KeyedStream[T, K]
}

Interval Joins

Join keyed streams based on time intervals.

/**
 * Interval join operations.
 *
 * Signature sketch only (no method body). Both streams must be keyed by the same key
 * type `K`; `T2` is the element type of the other stream.
 */
class KeyedStream[T, K] {
  // Starts an interval join with another keyed stream; the time bounds are set on the result.
  def intervalJoin[T2](otherStream: KeyedStream[T2, K]): IntervalJoin[T, T2, K]
}

// First step of an interval join: only the time bounds can be set here.
// Signature sketch only (no method bodies).
class IntervalJoin[T1, T2, K] {
  // Sets the time interval, relative to the left element's timestamp, in which a
  // right element must fall to be joined. Returns the bounded join, on which the
  // join function is applied.
  def between(lowerBound: Time, upperBound: Time): IntervalJoined[T1, T2, K]
}

// Second step of an interval join: a join whose bounds have been set via `between`.
class IntervalJoined[T1, T2, K] {
  // Bounds are inclusive by default; these switch the respective bound to exclusive.
  def lowerBoundExclusive(): IntervalJoined[T1, T2, K]
  def upperBoundExclusive(): IntervalJoined[T1, T2, K]
  // Applies the join function to every pair of elements that falls inside the interval.
  def process[R](function: ProcessJoinFunction[T1, T2, R]): DataStream[R]
}

Usage Examples:

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction

// Minimal illustrative event types; extend with real fields as needed.
case class Order(orderId: String)
case class Payment(orderId: String)
case class OrderPayment(order: Order, payment: Payment)

// The element type is given explicitly so that `_.orderId` type-checks even while the
// element list is still a placeholder. Supply real events before running.
// NOTE(review): interval joins work on event time — assign timestamps/watermarks
// before `keyBy` (confirm against the Flink docs for the targeted version).
val orders = env.fromElements[Order](/* order events */).keyBy(_.orderId)
val payments = env.fromElements[Payment](/* payment events */).keyBy(_.orderId)

// Join each order with payments for the same orderId whose timestamp lies within
// 10 minutes before or after the order's timestamp.
val joinedStream = orders
  .intervalJoin(payments)
  .between(Time.minutes(-10), Time.minutes(10))
  .process(new ProcessJoinFunction[Order, Payment, OrderPayment] {
    // `Context` is an inner class of the generic Java class, so from Scala it is
    // referenced through a type projection (`#Context`).
    override def processElement(
      left: Order,
      right: Payment,
      ctx: ProcessJoinFunction[Order, Payment, OrderPayment]#Context,
      out: Collector[OrderPayment]
    ): Unit = {
      out.collect(OrderPayment(left, right))
    }
  })

Types

// Summary listing of the types referenced on this page. All entries are
// member-less sketches, not full definitions.

// Main keyed stream class
// T is the element type, K the key type.
class KeyedStream[T, K]

// Window-related types
// WindowedStream is what window / timeWindow / countWindow return.
class WindowedStream[T, K, W <: Window]
trait Window
// Window type of the streams returned by timeWindow.
class TimeWindow extends Window
// Window type of the streams returned by countWindow.
class GlobalWindow extends Window

// Function types
// Accepted by reduce.
trait ReduceFunction[T]
// Accepted by the deprecated fold.
trait FoldFunction[T, O] // Deprecated
// Accepted by process on a keyed stream.
trait KeyedProcessFunction[K, I, O]
// Accepted by the process step of an interval join.
trait ProcessJoinFunction[IN1, IN2, OUT]

// State types
// ValueState is obtained through getRuntimeContext.getState(descriptor) in the usage example.
trait ValueState[T]
trait ListState[T]
trait MapState[UK, UV] 
trait ReducingState[T]
trait AggregatingState[IN, OUT]

// State descriptors
// One descriptor per state type above (matched by name); ValueStateDescriptor is
// constructed with a state name and a class in the usage example.
class ValueStateDescriptor[T]
class ListStateDescriptor[T]
class MapStateDescriptor[UK, UV]
class ReducingStateDescriptor[T]
class AggregatingStateDescriptor[IN, ACC, OUT]

// Join types
// Returned by intervalJoin on a keyed stream.
class IntervalJoin[T1, T2, K]

// Time types
// Used for window sizes/slides and interval-join bounds, e.g. Time.seconds(5), Time.minutes(1).
class Time