or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

async-operations.md, data-stream.md, functions.md, index.md, joining.md, keyed-stream.md, stream-execution-environment.md, windowing.md
tile.json

docs/windowing.md

Windowing

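Windowing in Flink groups the elements of an unbounded stream into finite sets (windows) so that computations can run over them. Windows are either time-based (using processing time or event time) or count-based. Time-based windows can be tumbling (fixed-size, non-overlapping), sliding (fixed-size, possibly overlapping), or session windows (closed after a gap of inactivity). Count-based windows can be tumbling or sliding.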

Capabilities

WindowedStream Operations

Core operations available on keyed windowed streams; each one turns the windowed stream back into a DataStream.

/**
 * WindowedStream operations for keyed streams.
 *
 * Obtained by windowing a keyed stream, e.g. `stream.keyBy(...).window(assigner)`.
 * Every operation turns the windowed stream back into a `DataStream`.
 *
 * @tparam T element type
 * @tparam K key type
 * @tparam W window type (e.g. `TimeWindow`, `GlobalWindow`)
 */
class WindowedStream[T, K, W <: Window] {
  // Combines two elements of the same key and window into one, incrementally.
  def reduce(reducer: (T, T) => T): DataStream[T]
  // Same as above, taking a Flink `ReduceFunction` instead of a Scala lambda.
  def reduce(reducer: ReduceFunction[T]): DataStream[T]  
  // Incremental aggregation through an accumulator of type ACC; emits one R per window.
  def aggregate[ACC, R](aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R]
  // Prefer `aggregate`, whose accumulators can also be merged.
  def fold[R](initialValue: R)(folder: (R, T) => R): DataStream[R] // Deprecated
  // Receives the key, the window, and all of the window's elements; may emit any number of results.
  def apply[R](function: WindowFunction[T, R, K, W]): DataStream[R]
  // Like `apply`, but with a Context exposing the window, times, state, and side outputs.
  def process[R](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
  // NOTE(review): in the Flink Scala API the result/accumulator type parameters carry an
  // implicit `TypeInformation` context bound (provided by
  // `import org.apache.flink.streaming.api.scala._`); omitted here — confirm per Flink version.
}

Usage Examples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/** A single temperature measurement; `timestamp` is its event time in milliseconds. */
case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor1", 25.0, 2000L),
  SensorReading("sensor1", 22.0, 5000L)
)

// Keyed 5-second event-time tumbling windows. Note that this value is already a
// WindowedStream (not a KeyedStream), so it cannot be windowed a second time.
// An explicit assigner is used instead of the deprecated `timeWindow(...)`, whose
// time semantics depend on the environment's time characteristic.
val keyedReadings = readings
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.sensorId)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))

// Reduce: keep the reading with the maximum temperature in each window
val maxTemp = keyedReadings.reduce { (r1, r2) =>
  if (r1.temperature > r2.temperature) r1 else r2
}

// Apply window function: average temperature per key and window
val avgTemp = keyedReadings.apply(new WindowFunction[SensorReading, (String, Double), String, TimeWindow] {
  override def apply(
    key: String,
    window: TimeWindow,
    input: Iterable[SensorReading],
    out: Collector[(String, Double)]
  ): Unit = {
    // A window is only created when its first element arrives, so it is never empty here.
    val temperatures = input.map(_.temperature).toList
    out.collect((key, temperatures.sum / temperatures.length))
  }
})

// Nothing is executed until env.execute(...) is called.

AllWindowedStream Operations

Operations for non-keyed windowed streams.

/**
 * AllWindowedStream operations for non-keyed streams.
 *
 * Obtained by windowing a stream without `keyBy` (e.g. `timeWindowAll(...)`), so all
 * elements share the same windows. Operations mirror `WindowedStream`, minus the key.
 *
 * @tparam T element type
 * @tparam W window type
 */
class AllWindowedStream[T, W <: Window] {
  // Combines two elements of the same window into one, incrementally.
  def reduce(reducer: (T, T) => T): DataStream[T]
  // Same as above, taking a Flink `ReduceFunction` instead of a Scala lambda.
  def reduce(reducer: ReduceFunction[T]): DataStream[T]  
  // Incremental aggregation through an accumulator of type ACC; emits one R per window.
  def aggregate[ACC, R](aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R]
  // Prefer `aggregate`, whose accumulators can also be merged.
  def fold[R](initialValue: R)(folder: (R, T) => R): DataStream[R] // Deprecated
  // Receives the window and all of its elements; may emit any number of results.
  def apply[R](function: AllWindowFunction[T, R, W]): DataStream[R]
  // Like `apply`, but with a Context exposing the window, state, and side outputs.
  def process[R](function: ProcessAllWindowFunction[T, R, W]): DataStream[R]
  // NOTE(review): non-keyed windows are presumably evaluated by a single parallel task
  // in Flink — confirm before using them on high-volume streams.
}

Usage Examples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Non-keyed windowing: all elements share the same windows instead of being split per key.
// Non-keyed windows are evaluated by a single task (parallelism 1), so prefer keyed
// windows for high-volume streams.
val allReadings = readings
  .assignAscendingTimestamps(_.timestamp)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))

// Calculate global statistics for each 10-second window
val globalStats = allReadings.apply(new AllWindowFunction[SensorReading, String, TimeWindow] {
  override def apply(
    window: TimeWindow,
    values: Iterable[SensorReading],
    out: Collector[String]
  ): Unit = {
    // A window is only created when its first element arrives, so `count` is never 0.
    val temperatures = values.map(_.temperature).toList
    val count = temperatures.length
    val avgTemp = temperatures.sum / count
    out.collect(s"Window ${window.getStart}-${window.getEnd}: $count readings, avg temp: $avgTemp")
  }
})

Window Assigners

Window assigners decide which window(s) each incoming element belongs to; pass one to `window(...)` on a keyed stream.

/**
 * Window assigners
 *
 * Passed to `window(...)` to decide which window(s) each element belongs to.
 * `size`, `slide`, `offset`, and `sessionTimeout` are all `Time` durations.
 */
// Time-based windows.
// Tumbling: fixed-size, non-overlapping windows of length `size`; `offset` shifts the boundaries.
object TumblingEventTimeWindows {
  def of(size: Time): TumblingEventTimeWindows
  def of(size: Time, offset: Time): TumblingEventTimeWindows
}

// Same as TumblingEventTimeWindows, but driven by processing time.
object TumblingProcessingTimeWindows {
  def of(size: Time): TumblingProcessingTimeWindows
  def of(size: Time, offset: Time): TumblingProcessingTimeWindows
}

// Sliding: windows of length `size` starting every `slide`; they overlap when slide < size.
object SlidingEventTimeWindows {
  def of(size: Time, slide: Time): SlidingEventTimeWindows
  def of(size: Time, slide: Time, offset: Time): SlidingEventTimeWindows
}

// Same as SlidingEventTimeWindows, but driven by processing time.
object SlidingProcessingTimeWindows {
  def of(size: Time, slide: Time): SlidingProcessingTimeWindows
  def of(size: Time, slide: Time, offset: Time): SlidingProcessingTimeWindows
}

// Session: windows separated by an inactivity gap of `sessionTimeout` (event time).
object EventTimeSessionWindows {
  def withGap(sessionTimeout: Time): EventTimeSessionWindows
}

// Same as EventTimeSessionWindows, but driven by processing time.
object ProcessingTimeSessionWindows {
  def withGap(sessionTimeout: Time): ProcessingTimeSessionWindows
}

// Single global window (not count-based by itself): pair it with a trigger
// (see Window Triggers) to build count-based windows.
object GlobalWindows {
  def create(): GlobalWindows
}

Usage Examples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Event-time assigners need record timestamps and watermarks. Without them, Flink
// rejects records that carry no timestamp, and event-time windows never fire.
val keyedStream = readings
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.sensorId)

// Tumbling event time windows of 1 minute
val tumblingWindow = keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)))

// Sliding event time windows: 10 minutes size, 5 minutes slide
val slidingWindow = keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))

// Session windows closed after 30 seconds without elements
val sessionWindow = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(30)))

// Count windows: GlobalWindows puts all elements of a key into one window whose default
// trigger never fires, so a trigger must be attached. PurgingTrigger clears the window
// after each firing, which gives tumbling windows of 100 elements per key
// (equivalent to keyedStream.countWindow(100)).
val countWindow = keyedStream
  .window(GlobalWindows.create())
  .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](100)))

Window Functions

Custom window functions for processing windowed data.

/**
 * Window function interfaces
 *
 * `WindowFunction` / `AllWindowFunction` receive all elements of a window at once.
 * `ProcessWindowFunction` / `ProcessAllWindowFunction` receive a `Context` instead of the
 * bare window; it additionally exposes current times, state stores, and side outputs.
 */
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  // Called with the key, the window, all of the window's elements, and a Collector
  // through which any number of results can be emitted.
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

trait AllWindowFunction[IN, OUT, W <: Window] {
  // Non-keyed counterpart of WindowFunction.apply: same shape, without the key.
  def apply(window: W, values: Iterable[IN], out: Collector[OUT]): Unit
}

// NOTE(review): in the Flink Scala API, ProcessWindowFunction / ProcessAllWindowFunction and
// their Context types are declared as abstract classes rather than traits — confirm.
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  // Called with the key, a Context for the window, all of its elements, and a Collector.
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  
  // Information and services available while processing one window.
  trait Context {
    // The window being evaluated.
    def window: W
    def currentProcessingTime: Long
    def currentWatermark: Long
    // Presumably keyed state scoped to this window only — confirm against Flink docs.
    def windowState: KeyedStateStore
    // Presumably keyed state shared across all windows of the key — confirm against Flink docs.
    def globalState: KeyedStateStore
    // Emits `value` to the side output identified by `outputTag`.
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

trait ProcessAllWindowFunction[IN, OUT, W <: Window] {
  // Non-keyed counterpart of ProcessWindowFunction.process: same shape, without the key.
  def process(context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  
  // NOTE(review): confirm that this non-keyed Context exposes the time accessors below
  // in the Flink version being documented.
  trait Context {
    def window: W
    def currentProcessingTime: Long  
    def currentWatermark: Long
    def windowState: KeyedStateStore
    def globalState: KeyedStateStore
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

Usage Examples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * Custom window function with rich context.
 *
 * Emits one summary line per key and window: element count, average/min/max
 * temperature, the window bounds, and the current watermark when the function runs.
 */
class TemperatureStatsFunction extends ProcessWindowFunction[SensorReading, String, String, TimeWindow] {

  override def process(
    key: String,
    context: Context,
    elements: Iterable[SensorReading],
    out: Collector[String]
  ): Unit = {
    // Extract temperatures once instead of re-mapping for every statistic.
    // A window is only created when its first element arrives, so min/max are safe.
    val temperatures = elements.map(_.temperature).toList
    val count = temperatures.length
    val avgTemp = temperatures.sum / count
    val minTemp = temperatures.min
    val maxTemp = temperatures.max

    val windowStart = context.window.getStart
    val windowEnd = context.window.getEnd
    val watermark = context.currentWatermark

    out.collect(s"Sensor $key: Window [$windowStart-$windowEnd], " +
               s"Count: $count, Avg: $avgTemp, Min: $minTemp, Max: $maxTemp, " +
               s"Watermark: $watermark")
  }
}

// `keyedReadings` is already a WindowedStream and cannot be windowed again,
// so window the keyed stream directly.
val stats = readings
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.sensorId)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .process(new TemperatureStatsFunction())

Aggregate Functions

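Efficient, incremental aggregation: an AggregateFunction folds each element into an accumulator as it arrives, so only the accumulator (not every element) has to be kept per window.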

/**
 * Aggregate function interface
 *
 * Aggregates input elements into an accumulator of type `ACC` and derives a result
 * of type `OUT` from it.
 *
 * @tparam IN  input element type
 * @tparam ACC accumulator (intermediate aggregate) type
 * @tparam OUT result type
 */
trait AggregateFunction[IN, ACC, OUT] {
  // Creates a new, empty accumulator.
  def createAccumulator(): ACC
  // Folds one input element into the accumulator and returns the updated accumulator.
  def add(value: IN, accumulator: ACC): ACC
  // Computes the final result from the accumulator.
  def getResult(accumulator: ACC): OUT
  // Combines two accumulators into one (presumably used when partial aggregates,
  // e.g. of merging session windows, are combined — confirm against Flink docs).
  def merge(a: ACC, b: ACC): ACC
}

Usage Examples:

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Custom aggregate function that incrementally averages temperatures.
 *
 * The accumulator is `(sum of temperatures, number of readings)`. Each reading is
 * folded in as it arrives, so the window never has to keep the raw elements.
 */
class AverageAggregateFunction extends AggregateFunction[SensorReading, (Double, Long), Double] {

  override def createAccumulator(): (Double, Long) = (0.0, 0L)

  override def add(reading: SensorReading, accumulator: (Double, Long)): (Double, Long) =
    (accumulator._1 + reading.temperature, accumulator._2 + 1L)

  // Yields NaN for an empty accumulator (0.0 / 0). A fired window always contains
  // at least one reading, so this only matters if called outside windowing.
  override def getResult(accumulator: (Double, Long)): Double =
    accumulator._1 / accumulator._2

  // Combines partial aggregates (e.g. when session windows merge).
  override def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) =
    (a._1 + b._1, a._2 + b._2)
}

// `keyedReadings` is already a WindowedStream and cannot be windowed again,
// so window the keyed stream directly.
val avgTemperature = readings
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.sensorId)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .aggregate(new AverageAggregateFunction())

Window Triggers

Custom triggers control when a window is evaluated (fired) and when its contents are purged.

/**
 * Window triggers.
 *
 * A trigger decides when a window's function is evaluated (fired) and when the
 * window's contents are discarded (purged). Each callback returns a TriggerResult.
 */
abstract class Trigger[T, W <: Window] {
  /** Called for every element added to `window`. */
  def onElement(element: T, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult

  /** Called when a processing-time timer registered through `ctx` fires. */
  def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult

  /** Called when an event-time timer registered through `ctx` fires. */
  def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult

  /** Whether this trigger supports merging windows; merging assigners such as session windows require `true`. */
  def canMerge(): Boolean = false

  /** Called when windows are merged; only invoked when `canMerge()` returns true. */
  def onMerge(window: W, ctx: OnMergeContext): Unit =
    throw new UnsupportedOperationException("This trigger does not support merging.")

  /** Releases any state and timers the trigger still holds for `window`. */
  def clear(window: W, ctx: TriggerContext): Unit

  /** Services available to trigger callbacks (state-access methods omitted here). */
  trait TriggerContext {
    def getCurrentProcessingTime: Long
    def getCurrentWatermark: Long
    def registerProcessingTimeTimer(time: Long): Unit
    def registerEventTimeTimer(time: Long): Unit
    def deleteProcessingTimeTimer(time: Long): Unit
    def deleteEventTimeTimer(time: Long): Unit
  }

  /** Context passed to `onMerge`. */
  trait OnMergeContext extends TriggerContext
}

/** What a trigger callback asks the window operator to do (declaration order as in Flink). */
enum TriggerResult {
  case CONTINUE        // do nothing
  case FIRE_AND_PURGE  // evaluate the window function, then discard the window contents
  case FIRE            // evaluate the window function and keep the contents
  case PURGE           // discard the window contents without evaluating
}

Evictors

Custom evictors remove elements from a window, either before or after the window function is applied.

/**
 * Window evictors
 *
 * Both methods see the window's elements, each paired with its timestamp as a
 * `TimestampedValue`. The window's current element count and the window itself are
 * passed alongside them.
 *
 * NOTE(review): in Flink, `elements` is a `java.lang.Iterable` and elements are evicted by
 * removing them through `elements.iterator()` — confirm before implementing.
 */
trait Evictor[T, W <: Window] {
  // Invoked before the window function is applied.
  def evictBefore(elements: Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
  // Invoked after the window function is applied.
  def evictAfter(elements: Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
}

Types

// Window stream types
class WindowedStream[T, K, W <: Window]
class AllWindowedStream[T, W <: Window]

// Window types
trait Window {
  /** Largest timestamp that still belongs to this window. */
  def maxTimestamp(): Long
}

/** A time interval [start, end): `start` is inclusive, `end` is exclusive. */
class TimeWindow(start: Long, end: Long) extends Window {
  def getStart: Long = start
  def getEnd: Long = end
  // `end` is exclusive, so the last timestamp inside the window is end - 1.
  def maxTimestamp(): Long = end - 1
}

/** The single window used by GlobalWindows; it never ends. */
class GlobalWindow extends Window {
  def maxTimestamp(): Long = Long.MaxValue
}

// Window assigners
trait WindowAssigner[T, W <: Window]
class TumblingEventTimeWindows extends WindowAssigner[Object, TimeWindow]
class TumblingProcessingTimeWindows extends WindowAssigner[Object, TimeWindow]
class SlidingEventTimeWindows extends WindowAssigner[Object, TimeWindow]
class SlidingProcessingTimeWindows extends WindowAssigner[Object, TimeWindow]
class EventTimeSessionWindows extends WindowAssigner[Object, TimeWindow]
class ProcessingTimeSessionWindows extends WindowAssigner[Object, TimeWindow]
class GlobalWindows extends WindowAssigner[Object, GlobalWindow]

// Function types
trait WindowFunction[IN, OUT, KEY, W <: Window]
trait AllWindowFunction[IN, OUT, W <: Window]
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window]
trait ProcessAllWindowFunction[IN, OUT, W <: Window]
trait AggregateFunction[IN, ACC, OUT]

// Control types
abstract class Trigger[T, W <: Window]
enum TriggerResult { case CONTINUE, FIRE_AND_PURGE, FIRE, PURGE }
trait Evictor[T, W <: Window]
class TimestampedValue[T]

// Time type
class Time