Windowing in Flink allows you to group stream elements into finite sets for computation. Windows can be time-based (processing time or event time) or count-based, and can be tumbling, sliding, or session windows.
Core operations available on windowed streams.
/**
 * WindowedStream operations for keyed streams.
 *
 * Signature listing only (members are shown without bodies). Every operation
 * returns a regular `DataStream`.
 *
 * @tparam T element type of the stream
 * @tparam K key type
 * @tparam W window type, bounded by `Window`
 */
class WindowedStream[T, K, W <: Window] {
  /** Combines the elements of a window pairwise with a Scala function. */
  def reduce(reducer: (T, T) => T): DataStream[T]

  /** Same operation, taking a `ReduceFunction` instead of a Scala function. */
  def reduce(reducer: ReduceFunction[T]): DataStream[T]

  /** Aggregates a window through an accumulator of type `ACC` into a result of type `R`. */
  def aggregate[ACC, R](aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R]

  /** Folds the window's elements into `initialValue`. */
  def fold[R](initialValue: R)(folder: (R, T) => R): DataStream[R] // Deprecated

  /** Evaluates the window with a `WindowFunction`. */
  def apply[R](function: WindowFunction[T, R, K, W]): DataStream[R]

  /** Evaluates the window with a `ProcessWindowFunction`. */
  def process[R](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/**
 * A single temperature measurement reported by a sensor.
 *
 * @param sensorId    identifier of the reporting sensor
 * @param temperature measured temperature
 * @param timestamp   time of the reading; the examples pass it to `assignAscendingTimestamps`
 */
case class SensorReading(
  sensorId: String,
  temperature: Double,
  timestamp: Long
)
// Sample input: three readings from "sensor1" with ascending timestamps
// (1000, 2000 and 5000).
// `env` is not defined in this snippet and must already be in scope —
// presumably a StreamExecutionEnvironment; confirm.
val readings = env.fromElements(
SensorReading("sensor1", 20.0, 1000L),
SensorReading("sensor1", 25.0, 2000L),
SensorReading("sensor1", 22.0, 5000L)
)
// Assigns timestamps from the `timestamp` field, keys by sensor id and opens
// 5-second time windows.
// NOTE: because `timeWindow` is the last call in the chain, `keyedReadings`
// is already a windowed stream despite its name; the examples that follow
// call `reduce` and `apply` on it directly.
val keyedReadings = readings
.assignAscendingTimestamps(_.timestamp)
.keyBy(_.sensorId)
.timeWindow(Time.seconds(5))
// Reduce: keep the reading with the highest temperature in each window
val maxTemp = keyedReadings.reduce { (left, right) =>
  // When neither side is strictly hotter, the right-hand reading is kept.
  if (left.temperature > right.temperature) left else right
}
// Apply window function: calculate average temperature
val avgTemp = keyedReadings.apply(
  new WindowFunction[SensorReading, (String, Double), String, TimeWindow] {
    /**
     * Emits one `(key, average temperature)` pair for the window.
     *
     * @param key    key of the window being evaluated
     * @param window the window itself (unused here)
     * @param input  readings that fell into the window
     * @param out    collector receiving the result
     */
    override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[SensorReading],
      out: Collector[(String, Double)]
    ): Unit = {
      // Named `temperatures` so the outer `readings` stream is not shadowed.
      val temperatures = input.map(_.temperature).toList
      val avgTemperature = temperatures.sum / temperatures.length
      out.collect((key, avgTemperature))
    }
  }
)

Operations for non-keyed windowed streams.
/**
 * AllWindowedStream operations for non-keyed streams.
 *
 * Signature listing only (members are shown without bodies). The operations
 * mirror those of the keyed variant, but the function types carry no key
 * parameter. Every operation returns a regular `DataStream`.
 *
 * @tparam T element type of the stream
 * @tparam W window type, bounded by `Window`
 */
class AllWindowedStream[T, W <: Window] {
  /** Combines the elements of a window pairwise with a Scala function. */
  def reduce(reducer: (T, T) => T): DataStream[T]

  /** Same operation, taking a `ReduceFunction` instead of a Scala function. */
  def reduce(reducer: ReduceFunction[T]): DataStream[T]

  /** Aggregates a window through an accumulator of type `ACC` into a result of type `R`. */
  def aggregate[ACC, R](aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R]

  /** Folds the window's elements into `initialValue`. */
  def fold[R](initialValue: R)(folder: (R, T) => R): DataStream[R] // Deprecated

  /** Evaluates the window with an `AllWindowFunction`. */
  def apply[R](function: AllWindowFunction[T, R, W]): DataStream[R]

  /** Evaluates the window with a `ProcessAllWindowFunction`. */
  def process[R](function: ProcessAllWindowFunction[T, R, W]): DataStream[R]
}

Usage Examples:
// Non-keyed windowing: process all elements together
// There is no `keyBy` step: timestamps are assigned from the `timestamp`
// field and `timeWindowAll` opens 10-second windows over the whole stream.
val allReadings = readings
.assignAscendingTimestamps(_.timestamp)
.timeWindowAll(Time.seconds(10))
// Calculate global statistics
val globalStats = allReadings.apply(
  new AllWindowFunction[SensorReading, String, TimeWindow] {
    /**
     * Emits one summary line per window: window bounds, number of readings
     * and their average temperature.
     *
     * @param window the window being evaluated
     * @param values readings that fell into the window
     * @param out    collector receiving the summary line
     */
    override def apply(
      window: TimeWindow,
      values: Iterable[SensorReading],
      out: Collector[String]
    ): Unit = {
      // Named `temperatures` so the outer `readings` stream is not shadowed.
      val temperatures = values.map(_.temperature).toList
      val count = temperatures.length
      val avgTemp = temperatures.sum / count
      out.collect(s"Window ${window.getStart}-${window.getEnd}: $count readings, avg temp: $avgTemp")
    }
  }
)

Different types of window assigners for creating windows.
/**
* Window assigners
*/
// Time-based windows
// Factory for the tumbling event-time window assigner.
// `size` is the window length (the usage example passes Time.minutes(1) for
// one-minute windows). What `offset` does is not described in this document —
// presumably it shifts window alignment; confirm against the Flink docs.
object TumblingEventTimeWindows {
def of(size: Time): TumblingEventTimeWindows
def of(size: Time, offset: Time): TumblingEventTimeWindows
}
// Processing-time counterpart, with the same two factory overloads.
object TumblingProcessingTimeWindows {
def of(size: Time): TumblingProcessingTimeWindows
def of(size: Time, offset: Time): TumblingProcessingTimeWindows
}
// Factory for the sliding event-time window assigner.
// Takes the window `size` and the `slide` interval (the usage example uses a
// 10-minute size with a 5-minute slide). What `offset` does is not described
// in this document — presumably it shifts window alignment; confirm.
object SlidingEventTimeWindows {
def of(size: Time, slide: Time): SlidingEventTimeWindows
def of(size: Time, slide: Time, offset: Time): SlidingEventTimeWindows
}
// Processing-time counterpart, with the same two factory overloads.
object SlidingProcessingTimeWindows {
def of(size: Time, slide: Time): SlidingProcessingTimeWindows
def of(size: Time, slide: Time, offset: Time): SlidingProcessingTimeWindows
}
// Factory for the event-time session window assigner.
// `sessionTimeout` is the gap that ends a session (the usage example uses a
// 30-second timeout).
object EventTimeSessionWindows {
def withGap(sessionTimeout: Time): EventTimeSessionWindows
}
// Processing-time counterpart, with the same factory method.
object ProcessingTimeSessionWindows {
def withGap(sessionTimeout: Time): ProcessingTimeSessionWindows
}
// Count-based windows
/**
 * Factory for the `GlobalWindows` assigner. The usage example presents it as
 * the basis for count windows "with triggers".
 */
object GlobalWindows {
  def create(): GlobalWindows
}

Usage Examples:
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
// Event-time windows need element timestamps, so assign them before keying.
val keyedStream = readings
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.sensorId)

// Tumbling event time windows of 1 minute
val tumblingWindow = keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)))

// Sliding event time windows: 10 minutes size, 5 minutes slide
val slidingWindow = keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))

// Session windows with 30 seconds timeout
val sessionWindow = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(30)))

// Count windows using GlobalWindows with triggers.
// A global window never fires on its own, so a trigger must be attached;
// this one fires and purges after every 100 elements per key.
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
val countWindow = keyedStream
  .window(GlobalWindows.create())
  .trigger(PurgingTrigger.of(CountTrigger.of(100)))

Custom window functions for processing windowed data.
/**
* Window function interfaces
*/
// Window function for keyed windows: `apply` receives the key, the window,
// the window's elements and a collector for the results.
trait WindowFunction[IN, OUT, KEY, W <: Window] {
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}
// Window function for non-keyed windows: same shape as the keyed variant but
// without a key parameter.
trait AllWindowFunction[IN, OUT, W <: Window] {
def apply(window: W, values: Iterable[IN], out: Collector[OUT]): Unit
}
// Keyed window function whose `process` receives a Context instead of the
// window itself; the window is reached through `context.window`.
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
// Context handed to `process`.
trait Context {
def window: W
def currentProcessingTime: Long
def currentWatermark: Long
// Two KeyedStateStore handles. Judging by the names, one is scoped to the
// window and one is shared across windows — confirm against the Flink docs.
def windowState: KeyedStateStore
def globalState: KeyedStateStore
// Emits `value` under the given OutputTag — presumably a side output; confirm.
def output[X](outputTag: OutputTag[X], value: X): Unit
}
}
/**
 * Non-keyed counterpart of `ProcessWindowFunction`: `process` has no key
 * parameter and reaches the window through `context.window`.
 */
trait ProcessAllWindowFunction[IN, OUT, W <: Window] {
  def process(context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit

  /** Context handed to `process`. */
  trait Context {
    def window: W

    // NOTE(review): these two accessors are listed here as on the keyed
    // ProcessWindowFunction.Context, but the non-keyed Context in the actual
    // Flink API may not provide them — confirm before relying on them.
    def currentProcessingTime: Long
    def currentWatermark: Long

    def windowState: KeyedStateStore
    def globalState: KeyedStateStore
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

Usage Examples:
// Custom window function with rich context
/**
 * Summarises one sensor's window as a single line of text: element count,
 * average, minimum and maximum temperature, plus the window bounds and the
 * watermark read from the context.
 */
class TemperatureStatsFunction
    extends ProcessWindowFunction[SensorReading, String, String, TimeWindow] {

  override def process(
    key: String,
    context: Context,
    elements: Iterable[SensorReading],
    out: Collector[String]
  ): Unit = {
    // Extract the temperatures once and derive every statistic from them.
    val temperatures = elements.toList.map(_.temperature)
    val count = temperatures.length
    val avgTemp = temperatures.sum / count
    val (minTemp, maxTemp) = (temperatures.min, temperatures.max)

    val span = context.window
    val (windowStart, windowEnd) = (span.getStart, span.getEnd)
    val watermark = context.currentWatermark

    out.collect(s"Sensor $key: Window [$windowStart-$windowEnd], " +
      s"Count: $count, Avg: $avgTemp, Min: $minTemp, Max: $maxTemp, " +
      s"Watermark: $watermark")
  }
}
// `keyedReadings` is already windowed (its definition ends in
// `.timeWindow(Time.seconds(5))`) and a windowed stream has no `timeWindow`
// method, so the one-minute windows are built from the raw stream instead.
val stats = readings
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.sensorId)
  .timeWindow(Time.minutes(1))
  .process(new TemperatureStatsFunction())

Efficient incremental aggregation functions.
/**
 * Aggregate function interface.
 *
 * Signature listing only. An aggregation is described by four operations on
 * an accumulator of type `ACC`.
 *
 * @tparam IN  type of the values being aggregated
 * @tparam ACC accumulator type
 * @tparam OUT result type
 */
trait AggregateFunction[IN, ACC, OUT] {
  /** Creates a fresh accumulator. */
  def createAccumulator(): ACC

  /** Returns the accumulator that results from adding `value` to `accumulator`. */
  def add(value: IN, accumulator: ACC): ACC

  /** Derives the result from an accumulator. */
  def getResult(accumulator: ACC): OUT

  /** Combines two accumulators into one. */
  def merge(a: ACC, b: ACC): ACC
}

Usage Examples:
// Custom aggregate function for calculating average
/**
 * Averages sensor temperatures incrementally. The accumulator is a
 * `(sum of temperatures, number of readings)` pair.
 */
class AverageAggregateFunction
    extends AggregateFunction[SensorReading, (Double, Long), Double] {

  /** Starts from a zero sum and a zero count. */
  override def createAccumulator(): (Double, Long) = (0.0, 0L)

  /** Adds one reading's temperature to the sum and increments the count. */
  override def add(reading: SensorReading, accumulator: (Double, Long)): (Double, Long) = {
    val (sum, count) = accumulator
    (sum + reading.temperature, count + 1L)
  }

  /** Divides the accumulated sum by the accumulated count. */
  override def getResult(accumulator: (Double, Long)): Double = {
    val (sum, count) = accumulator
    sum / count
  }

  /** Combines two partial accumulators component-wise. */
  override def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) =
    (a, b) match {
      case ((sumA, countA), (sumB, countB)) => (sumA + sumB, countA + countB)
    }
}
// `keyedReadings` is already windowed (its definition ends in
// `.timeWindow(Time.seconds(5))`) and a windowed stream has no `timeWindow`
// method, so the one-minute windows are built from the raw stream instead.
val avgTemperature = readings
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.sensorId)
  .timeWindow(Time.minutes(1))
  .aggregate(new AverageAggregateFunction())

Custom triggers for controlling when windows fire.
/**
* Window triggers
*
* Signature listing only. Three callbacks return a TriggerResult: one per
* incoming element, one for a processing-time event and one for an
* event-time event. `clear` returns nothing.
*/
abstract class Trigger[T, W <: Window] {
def onElement(element: T, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult
def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult
def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult
// Presumably invoked when the window is removed so the trigger can drop its state — confirm.
def clear(window: W, ctx: TriggerContext): Unit
}
// Result returned by the Trigger callbacks. The exact semantics of each value
// are not described in this document; see the Flink trigger documentation.
enum TriggerResult {
  CONTINUE, FIRE_AND_PURGE, FIRE, PURGE
}

Custom evictors for removing elements from windows.
/**
 * Window evictors.
 *
 * Signature listing only. Both hooks receive the window's elements wrapped in
 * `TimestampedValue`, a `size` (presumably the element count — confirm), the
 * window and an evictor context. Judging by the names, `evictBefore` runs
 * before the window function and `evictAfter` after it — confirm against the
 * Flink docs.
 */
trait Evictor[T, W <: Window] {
  def evictBefore(elements: Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
  def evictAfter(elements: Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
}

// Window stream types
// Summary declarations only: member listings for most of these types appear
// in the sections earlier in this document.
class WindowedStream[T, K, W <: Window]
class AllWindowedStream[T, W <: Window]
// Window types
trait Window {
def maxTimestamp(): Long
}
// A window described by a `start` and an `end` value.
class TimeWindow(start: Long, end: Long) extends Window
class GlobalWindow extends Window
// Window assigners
// Every time-based assigner below produces TimeWindow; GlobalWindows is the
// only assigner listed that produces GlobalWindow.
trait WindowAssigner[T, W <: Window]
class TumblingEventTimeWindows extends WindowAssigner[Object, TimeWindow]
class TumblingProcessingTimeWindows extends WindowAssigner[Object, TimeWindow]
class SlidingEventTimeWindows extends WindowAssigner[Object, TimeWindow]
class SlidingProcessingTimeWindows extends WindowAssigner[Object, TimeWindow]
class EventTimeSessionWindows extends WindowAssigner[Object, TimeWindow]
class ProcessingTimeSessionWindows extends WindowAssigner[Object, TimeWindow]
class GlobalWindows extends WindowAssigner[Object, GlobalWindow]
// Function types
trait WindowFunction[IN, OUT, KEY, W <: Window]
trait AllWindowFunction[IN, OUT, W <: Window]
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window]
trait ProcessAllWindowFunction[IN, OUT, W <: Window]
trait AggregateFunction[IN, ACC, OUT]
// Control types
abstract class Trigger[T, W <: Window]
enum TriggerResult
trait Evictor[T, W <: Window]
// Element wrapper used in the Evictor signatures.
class TimestampedValue[T]
// Time type
// Duration value passed to the window assigner factories and to
// timeWindow/timeWindowAll in the examples (e.g. Time.seconds(5)).
class Time