Windowing and Time-Based Processing

Windowing in Flink enables processing of unbounded streams by grouping elements into finite sets called windows. Flink supports both keyed and non-keyed windowing with various window types and time semantics.

Keyed Windowing

Time-Based Windows

class KeyedStream[T, K] {
  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]
  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]
}

Create time-based windows for keyed streams:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

val env = StreamExecutionEnvironment.getExecutionEnvironment

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val sensorData = env.fromElements(
  SensorReading("sensor1", 20.0, 1000),
  SensorReading("sensor1", 22.0, 2000),
  SensorReading("sensor2", 18.0, 1500),
  SensorReading("sensor2", 25.0, 2500)
).keyBy(_.sensorId)

// Tumbling time windows (non-overlapping)
val tumblingWindows = sensorData
  .timeWindow(Time.minutes(5))  // 5-minute tumbling windows

// Sliding time windows (overlapping)
val slidingWindows = sensorData
  .timeWindow(Time.minutes(10), Time.minutes(2))  // 10-minute windows, sliding every 2 minutes

// Process windows
tumblingWindows
  .apply((key, window, readings, out) => {
    val avgTemp = readings.map(_.temperature).sum / readings.size
    out.collect((key, avgTemp, window.getStart, window.getEnd))
  })
  .print()

Count-Based Windows

class KeyedStream[T, K] {
  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]
  def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]
}

Create count-based windows:

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .keyBy(identity)

// Tumbling count windows
val tumblingCountWindows = events
  .countWindow(3)  // Every 3 elements

// Sliding count windows
val slidingCountWindows = events
  .countWindow(5, 2)  // 5 elements per window, slide by 2

tumblingCountWindows
  .sum(0)
  .print("Tumbling Count")

slidingCountWindows
  .sum(0)
  .print("Sliding Count")

Custom Window Assigners

class KeyedStream[T, K] {
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
}

Use custom window assigners:

import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)

// Session windows (windows that close after period of inactivity)
val sessionWindows = keyedStream
  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))

// Global windows (all elements in one window until manually triggered)
val globalWindows = keyedStream
  .window(GlobalWindows.create())

// Processing time sliding windows
val processingTimeWindows = keyedStream
  .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(5)))

// Event time tumbling windows
val eventTimeWindows = keyedStream
  .window(TumblingEventTimeWindows.of(Time.hours(1)))

Non-Keyed Windowing (All Windows)

Time-Based All Windows

class DataStream[T] {
  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]
  def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]
}

Apply windows to the entire stream. Note that non-keyed (all-window) operators gather every element on a single task and therefore run with parallelism 1:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val allData = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// All elements in 5-minute tumbling windows
val allTumbling = allData
  .timeWindowAll(Time.minutes(5))

// All elements in 10-minute sliding windows
val allSliding = allData
  .timeWindowAll(Time.minutes(10), Time.minutes(2))

allTumbling
  .sum(0)
  .print("All Tumbling")

Count-Based All Windows

class DataStream[T] {
  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]
  def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]
}

Count windows over all elements:

val allCountWindows = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .countWindowAll(4)  // Every 4 elements

allCountWindows
  .sum(0)
  .print("All Count Windows")

Custom All Window Assigners

class DataStream[T] {
  def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]
}
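
A minimal sketch (reusing the imports from the keyed examples above): windowAll accepts the same assigners as the keyed window method:

import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5)

// Non-keyed session windows over all elements, closing after
// 30 seconds of processing-time inactivity (runs with parallelism 1)
val allSessionWindows = stream
  .windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))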

Window Configuration

Triggers

class WindowedStream[T, K, W <: Window] {
  def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W]
}

class AllWindowedStream[T, W <: Window] {
  def trigger(trigger: Trigger[_ >: T, _ >: W]): AllWindowedStream[T, W]
}

Control when windows fire:

import org.apache.flink.streaming.api.windowing.triggers._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)

// Count trigger: replaces the window's default trigger, so the window
// fires every 10 elements rather than at the end of the time window
val customTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(CountTrigger.of(10))

// Processing time trigger
val processingTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(ProcessingTimeTrigger.create())

// Purging trigger (removes elements after firing)
val purgingTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(PurgingTrigger.of(CountTrigger.of(5)))

Evictors

class WindowedStream[T, K, W <: Window] {
  def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W]
}

Remove elements from windows before or after function application:

import org.apache.flink.streaming.api.windowing.evictors._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).keyBy(identity)

// Count evictor: keep only the latest N elements
val countEvictor = keyedStream
  .timeWindow(Time.minutes(5))
  .evictor(CountEvictor.of(5))  // Keep only latest 5 elements

// Time evictor: keep only elements from last N time units
val timeEvictor = keyedStream
  .timeWindow(Time.minutes(5))
  .evictor(TimeEvictor.of(Time.minutes(2)))  // Keep only last 2 minutes

// Delta evictor: keep elements within threshold
val deltaEvictor = keyedStream
  .timeWindow(Time.minutes(5))
  .evictor(DeltaEvictor.of(5.0, (a: Int, b: Int) => Math.abs(a - b).toDouble))

Allowed Lateness

class WindowedStream[T, K, W <: Window] {
  def allowedLateness(lateness: Time): WindowedStream[T, K, W]
}

Handle late-arriving data:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lateDataStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)

// Allow 1 minute of lateness
val windowWithLateness = lateDataStream
  .timeWindow(Time.minutes(5))
  .allowedLateness(Time.minutes(1))
  .sum(0)

Side Output for Late Data

class WindowedStream[T, K, W <: Window] {
  def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W]
}

Route late data to side output:

import org.apache.flink.streaming.api.scala.OutputTag

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lateDataTag = OutputTag[Int]("late-data")

val mainResult = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)
  .timeWindow(Time.minutes(5))
  .allowedLateness(Time.minutes(1))
  .sideOutputLateData(lateDataTag)
  .sum(0)

// Process late data separately
val lateData = mainResult.getSideOutput(lateDataTag)
lateData.print("Late Data")

Time Characteristics and Watermarks

Time Characteristics

import org.apache.flink.streaming.api.TimeCharacteristic

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Event time processing (use timestamps in data)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Processing time (use system time when elements arrive)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// Ingestion time (use time when elements enter Flink)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

Watermark Assignment

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

case class TimestampedEvent(data: String, eventTime: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val timestampedStream = env.fromElements(
  TimestampedEvent("event1", 1000),
  TimestampedEvent("event2", 2000),
  TimestampedEvent("event3", 1500)
)

// Assign watermarks with bounded out-of-orderness
val watermarkedStream = timestampedStream
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[TimestampedEvent](Time.seconds(5)) {
      override def extractTimestamp(element: TimestampedEvent): Long = element.eventTime
    }
  )

// Ascending timestamps (no out-of-order events)
val ascendingStream = timestampedStream
  .assignAscendingTimestamps(_.eventTime)

Window Functions

Built-in Aggregations

class WindowedStream[T, K, W <: Window] {
  def sum(position: Int): DataStream[T]
  def sum(field: String): DataStream[T]
  def min(position: Int): DataStream[T]
  def min(field: String): DataStream[T]
  def max(position: Int): DataStream[T]
  def max(field: String): DataStream[T]
  def minBy(position: Int): DataStream[T]
  def minBy(field: String): DataStream[T]
  def maxBy(position: Int): DataStream[T]
  def maxBy(field: String): DataStream[T]
}
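
For illustration, a short sketch on keyed tuples (assuming the usual Scala API imports from earlier). Note the distinction: max returns the maximum value of the given field (the other fields are not guaranteed to come from the same element), while maxBy returns the complete element that holds the maximum:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// (sensorId, temperature) pairs
val pairs = env.fromElements(
  ("sensor1", 20.0),
  ("sensor1", 22.0),
  ("sensor2", 18.0)
).keyBy(_._1)

// Sum of temperatures per sensor and window (position 1 = second tuple field)
pairs.timeWindow(Time.minutes(5)).sum(1).print("Sum")

// Entire element with the highest temperature per sensor and window
pairs.timeWindow(Time.minutes(5)).maxBy(1).print("MaxBy")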

Custom Window Functions

class WindowedStream[T, K, W <: Window] {
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
  def apply[R: TypeInformation](function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]
}

Apply custom functions to windows:

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.util.Collector

case class WindowResult(key: String, count: Int, avg: Double, window: String)

class StatisticsWindowFunction extends WindowFunction[SensorReading, WindowResult, String, TimeWindow] {
  override def apply(
    key: String,
    window: TimeWindow,
    readings: Iterable[SensorReading],
    out: Collector[WindowResult]
  ): Unit = {
    val temperatures = readings.map(_.temperature).toList
    val count = temperatures.size
    val avg = temperatures.sum / count
    val windowInfo = s"${window.getStart}-${window.getEnd}"
    
    out.collect(WindowResult(key, count, avg, windowInfo))
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val sensorData = env.fromElements(
  SensorReading("sensor1", 20.0, 1000),
  SensorReading("sensor1", 25.0, 2000)
).keyBy(_.sensorId)

// Apply custom window function
val windowResults = sensorData
  .timeWindow(Time.minutes(5))
  .apply(new StatisticsWindowFunction)

// Lambda-based window function
val lambdaResults = sensorData
  .timeWindow(Time.minutes(5))
  .apply { (key, window, readings, out) =>
    val maxTemp = readings.map(_.temperature).max
    out.collect((key, maxTemp, window.getStart))
  }

Advanced Windowing Patterns

Session Windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class UserActivity(userId: String, activity: String, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val userActivities = env.fromElements(
  UserActivity("user1", "login", 1000),
  UserActivity("user1", "click", 2000),
  UserActivity("user1", "scroll", 3000)
)

// Session windows with 10-minute inactivity gap
val sessionWindows = userActivities
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.userId)
  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  .apply { (userId, window, activities, out) =>
    val sessionSummary = (
      userId,
      activities.size,
      activities.map(_.activity).mkString(","),
      window.getEnd - window.getStart
    )
    out.collect(sessionSummary)
  }

Custom Window Assigner

import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger

// Custom window assigner for business hours (9 AM to 5 PM)
class BusinessHoursWindowAssigner extends WindowAssigner[Object, TimeWindow] {
  override def assignWindows(
    element: Object,
    timestamp: Long,
    context: WindowAssigner.WindowAssignerContext
  ): java.util.Collection[TimeWindow] = {
    val hour = (timestamp / 3600000) % 24  // Hour of day (UTC, from epoch millis)
    
    if (hour >= 9 && hour < 17) {  // Business hours
      val startOfDay = timestamp - (timestamp % 86400000)  // Midnight UTC of the same day
      val startOfBusinessHours = startOfDay + 9 * 3600000  // 9 AM
      val endOfBusinessHours = startOfDay + 17 * 3600000   // 5 PM
      
      java.util.Collections.singletonList(new TimeWindow(startOfBusinessHours, endOfBusinessHours))
    } else {
      java.util.Collections.emptyList()
    }
  }
  
  override def getDefaultTrigger(
    env: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
  ): org.apache.flink.streaming.api.windowing.triggers.Trigger[Object, TimeWindow] =
    EventTimeTrigger.create()
  
  override def getWindowSerializer(
    executionConfig: org.apache.flink.api.common.ExecutionConfig
  ): org.apache.flink.api.common.typeutils.TypeSerializer[TimeWindow] =
    new TimeWindow.Serializer()
  
  override def isEventTime: Boolean = true
}
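
Assuming the assigner above and the SensorReading case class from earlier, a hedged usage sketch: the custom assigner plugs into window() like any built-in one, and elements outside business hours are simply assigned to no window and dropped:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 9L * 3600000),   // 9 AM UTC on day zero
  SensorReading("sensor1", 22.0, 16L * 3600000)   // 4 PM UTC on day zero
)

val businessHourTotals = readings
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.sensorId)
  .window(new BusinessHoursWindowAssigner)
  .sum("temperature")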

Complete Example: Real-Time Analytics Dashboard

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.util.Collector

case class WebEvent(
  userId: String,
  pageUrl: String,
  action: String,
  timestamp: Long,
  sessionId: String
)

case class PageViewStats(
  pageUrl: String,
  windowStart: Long,
  windowEnd: Long,
  uniqueUsers: Int,
  totalViews: Int,
  avgSessionDuration: Double
)

object RealTimeAnalytics {
  
  class PageViewAnalytics extends WindowFunction[WebEvent, PageViewStats, String, TimeWindow] {
    override def apply(
      pageUrl: String,
      window: TimeWindow,
      events: Iterable[WebEvent],
      out: Collector[PageViewStats]
    ): Unit = {
      val eventList = events.toList
      val uniqueUsers = eventList.map(_.userId).distinct.size
      val totalViews = eventList.size
      
      // Calculate average session duration (simplified)
      val sessionDurations = eventList
        .groupBy(_.sessionId)
        .mapValues(events => {
          val timestamps = events.map(_.timestamp)
          timestamps.max - timestamps.min
        })
      
      val avgSessionDuration = if (sessionDurations.nonEmpty) {
        sessionDurations.values.sum.toDouble / sessionDurations.size
      } else 0.0
      
      out.collect(PageViewStats(
        pageUrl,
        window.getStart,
        window.getEnd,
        uniqueUsers,
        totalViews,
        avgSessionDuration
      ))
    }
  }
  
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    
    // Sample web events
    val webEvents = env.fromElements(
      WebEvent("user1", "/home", "view", 1000, "session1"),
      WebEvent("user2", "/home", "view", 1500, "session2"),
      WebEvent("user1", "/products", "view", 2000, "session1"),
      WebEvent("user3", "/home", "view", 2500, "session3"),
      WebEvent("user2", "/checkout", "view", 3000, "session2")
    )
    
    // Assign watermarks
    val watermarkedEvents = webEvents
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[WebEvent](Time.seconds(10)) {
          override def extractTimestamp(event: WebEvent): Long = event.timestamp
        }
      )
    
    // Real-time page view analytics
    val pageViewAnalytics = watermarkedEvents
      .keyBy(_.pageUrl)
      .timeWindow(Time.minutes(5))  // 5-minute windows
      .apply(new PageViewAnalytics)
    
    // User activity patterns (sliding windows)
    val userActivityPatterns = watermarkedEvents
      .keyBy(_.userId)
      .timeWindow(Time.minutes(10), Time.minutes(2))  // 10-minute windows, slide every 2 minutes
      .apply { (userId, window, events, out) =>
        val actionsPerMinute = events.size.toDouble / 10.0  // Actions per minute
        val uniquePages = events.map(_.pageUrl).toSet.size
        out.collect((userId, window.getStart, actionsPerMinute, uniquePages))
      }
    
    // Global statistics (all events)
    val globalStats = watermarkedEvents
      .timeWindowAll(Time.minutes(1))  // 1-minute global windows
      .apply { (window, events, out) =>
        val totalEvents = events.size
        val uniqueUsers = events.map(_.userId).toSet.size
        val uniquePages = events.map(_.pageUrl).toSet.size
        out.collect((window.getStart, totalEvents, uniqueUsers, uniquePages))
      }
    
    // Print results
    pageViewAnalytics.print("Page View Analytics")
    userActivityPatterns.print("User Activity")
    globalStats.print("Global Stats")
    
    env.execute("Real-Time Web Analytics")
  }
}