Windowing in Flink enables processing of unbounded streams by grouping elements into finite sets called windows. Flink supports both keyed and non-keyed windowing with various window types and time semantics.
class KeyedStream[T, K] {
def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]
def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]
}

Create time-based windows for keyed streams:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)
val sensorData = env.fromElements(
SensorReading("sensor1", 20.0, 1000),
SensorReading("sensor1", 22.0, 2000),
SensorReading("sensor2", 18.0, 1500),
SensorReading("sensor2", 25.0, 2500)
).keyBy(_.sensorId)
// Tumbling time windows (non-overlapping)
val tumblingWindows = sensorData
.timeWindow(Time.minutes(5)) // 5-minute tumbling windows
// Sliding time windows (overlapping)
val slidingWindows = sensorData
.timeWindow(Time.minutes(10), Time.minutes(2)) // 10-minute windows, sliding every 2 minutes
// Process windows
tumblingWindows
.apply((key, window, readings, out) => {
val avgTemp = readings.map(_.temperature).sum / readings.size
out.collect((key, avgTemp, window.getStart, window.getEnd))
})
.print()class KeyedStream[T, K] {
def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]
def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]
}

Create count-based windows:
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Key by parity so that each of the two keys receives five elements.
// Count windows are evaluated per key: keying by `identity` would give every
// value its own key, so windows of size 3 (or 5, sliding by 2) would never
// collect enough elements to fire and the example would print nothing.
val events = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .keyBy(_ % 2)
// Tumbling count windows
val tumblingCountWindows = events
.countWindow(3) // Every 3 elements
// Sliding count windows
val slidingCountWindows = events
.countWindow(5, 2) // 5 elements per window, slide by 2
tumblingCountWindows
.sum(0)
.print("Tumbling Count")
slidingCountWindows
.sum(0)
.print("Sliding Count")class KeyedStream[T, K] {
def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
}

Use custom window assigners:
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)
// Session windows (windows that close after period of inactivity)
val sessionWindows = keyedStream
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
// Global windows (all elements in one window until manually triggered)
val globalWindows = keyedStream
.window(GlobalWindows.create())
// Processing time sliding windows
val processingTimeWindows = keyedStream
.window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(5)))
// Event time tumbling windows
val eventTimeWindows = keyedStream
.window(TumblingEventTimeWindows.of(Time.hours(1)))class DataStream[T] {
def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]
def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]
}

Apply windows to the entire stream (all elements):
val env = StreamExecutionEnvironment.getExecutionEnvironment
val allData = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// All elements in 5-minute tumbling windows
val allTumbling = allData
.timeWindowAll(Time.minutes(5))
// All elements in 10-minute sliding windows
val allSliding = allData
.timeWindowAll(Time.minutes(10), Time.minutes(2))
allTumbling
.sum(0)
.print("All Tumbling")class DataStream[T] {
def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]
def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]
}

Count windows over all elements:
val allCountWindows = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.countWindowAll(4) // Every 4 elements
allCountWindows
.sum(0)
.print("All Count Windows")class DataStream[T] {
def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]
}class WindowedStream[T, K, W <: Window] {
def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W]
}
class AllWindowedStream[T, W <: Window] {
def trigger(trigger: Trigger[_ >: T, _ >: W]): AllWindowedStream[T, W]
}

Control when windows fire:
import org.apache.flink.streaming.api.windowing.triggers._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)
// Custom trigger: calling trigger() replaces the window's default time-based trigger
val customTrigger = keyedStream
.timeWindow(Time.minutes(5))
.trigger(CountTrigger.of(10)) // Fires once 10 elements have arrived for a key; with the default trigger replaced, reaching the window end alone no longer fires
// Processing time trigger
val processingTrigger = keyedStream
.timeWindow(Time.minutes(5))
.trigger(ProcessingTimeTrigger.create())
// Purging trigger (removes elements after firing)
val purgingTrigger = keyedStream
.timeWindow(Time.minutes(5))
.trigger(PurgingTrigger.of(CountTrigger.of(5)))class WindowedStream[T, K, W <: Window] {
def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W]
}

Remove elements from windows before or after the window function is applied:
import org.apache.flink.streaming.api.windowing.evictors._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).keyBy(identity)
// Count evictor: keep only the latest N elements
val countEvictor = keyedStream
.timeWindow(Time.minutes(5))
.evictor(CountEvictor.of(5)) // Keep only latest 5 elements
// Time evictor: keep only elements from last N time units
val timeEvictor = keyedStream
.timeWindow(Time.minutes(5))
.evictor(TimeEvictor.of(Time.minutes(2))) // Keep only last 2 minutes
// Delta evictor: keep elements within threshold
val deltaEvictor = keyedStream
.timeWindow(Time.minutes(5))
.evictor(DeltaEvictor.of(5.0, (a: Int, b: Int) => Math.abs(a - b).toDouble))class WindowedStream[T, K, W <: Window] {
def allowedLateness(lateness: Time): WindowedStream[T, K, W]
}

Handle late-arriving data:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lateDataStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)
// Allow 1 minute of lateness
val windowWithLateness = lateDataStream
.timeWindow(Time.minutes(5))
.allowedLateness(Time.minutes(1))
.sum(0)class WindowedStream[T, K, W <: Window] {
def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W]
}

Route late data to a side output:
import org.apache.flink.streaming.api.scala.OutputTag
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lateDataTag = OutputTag[Int]("late-data")
val mainResult = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)
.timeWindow(Time.minutes(5))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateDataTag)
.sum(0)
// Process late data separately
val lateData = mainResult.getSideOutput(lateDataTag)
lateData.print("Late Data")import org.apache.flink.streaming.api.TimeCharacteristic
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Event time processing (use timestamps in data)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Processing time (use system time when elements arrive)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// Ingestion time (use time when elements enter Flink)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
case class TimestampedEvent(data: String, eventTime: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val timestampedStream = env.fromElements(
TimestampedEvent("event1", 1000),
TimestampedEvent("event2", 2000),
TimestampedEvent("event3", 1500)
)
// Assign watermarks with bounded out-of-orderness
val watermarkedStream = timestampedStream
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[TimestampedEvent](Time.seconds(5)) {
override def extractTimestamp(element: TimestampedEvent): Long = element.eventTime
}
)
// Ascending timestamps (no out-of-order events)
val ascendingStream = timestampedStream
.assignAscendingTimestamps(_.eventTime)class WindowedStream[T, K, W <: Window] {
def sum(position: Int): DataStream[T]
def sum(field: String): DataStream[T]
def min(position: Int): DataStream[T]
def min(field: String): DataStream[T]
def max(position: Int): DataStream[T]
def max(field: String): DataStream[T]
def minBy(position: Int): DataStream[T]
def minBy(field: String): DataStream[T]
def maxBy(position: Int): DataStream[T]
def maxBy(field: String): DataStream[T]
}class WindowedStream[T, K, W <: Window] {
def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
def apply[R: TypeInformation](function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]
}

Apply custom functions to windows:
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.util.Collector
case class WindowResult(key: String, count: Int, avg: Double, window: String)
/**
 * Window function that summarises the temperature readings of one key in one
 * time window as a single [[WindowResult]].
 */
class StatisticsWindowFunction extends WindowFunction[SensorReading, WindowResult, String, TimeWindow] {

  /**
   * Emits one [[WindowResult]] holding the number of readings, their mean
   * temperature and the window bounds rendered as "start-end".
   *
   * @param key      key of the window
   * @param window   window being evaluated; supplies the start and end values
   * @param readings readings collected in the window
   * @param out      collector that receives the single result record
   */
  override def apply(
      key: String,
      window: TimeWindow,
      readings: Iterable[SensorReading],
      out: Collector[WindowResult]
  ): Unit = {
    // One left-to-right pass accumulates both the running sum and the element count.
    val (total, seen) = readings.foldLeft((0.0, 0)) {
      case ((sumSoFar, countSoFar), reading) => (sumSoFar + reading.temperature, countSoFar + 1)
    }
    val bounds = s"${window.getStart}-${window.getEnd}"
    out.collect(WindowResult(key, seen, total / seen, bounds))
  }
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val sensorData = env.fromElements(
SensorReading("sensor1", 20.0, 1000),
SensorReading("sensor1", 25.0, 2000)
).keyBy(_.sensorId)
// Apply custom window function
val windowResults = sensorData
.timeWindow(Time.minutes(5))
.apply(new StatisticsWindowFunction)
// Lambda-based window function
val lambdaResults = sensorData
.timeWindow(Time.minutes(5))
.apply { (key, window, readings, out) =>
val maxTemp = readings.map(_.temperature).max
out.collect((key, maxTemp, window.getStart))
}import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
case class UserActivity(userId: String, activity: String, timestamp: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val userActivities = env.fromElements(
UserActivity("user1", "login", 1000),
UserActivity("user1", "click", 2000),
UserActivity("user1", "scroll", 3000)
)
// Session windows with 10-minute inactivity gap
val sessionWindows = userActivities
.assignAscendingTimestamps(_.timestamp)
.keyBy(_.userId)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.apply { (userId, window, activities, out) =>
val sessionSummary = (
userId,
activities.size,
activities.map(_.activity).mkString(","),
window.getEnd - window.getStart
)
out.collect(sessionSummary)
}import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
/**
 * Custom window assigner for business hours (9 AM to 5 PM).
 *
 * An element whose timestamp falls in the 9-to-17 hour range is assigned to a
 * single [[TimeWindow]] covering 9 AM to 5 PM of that element's day; any other
 * element is assigned to no window at all.
 *
 * NOTE(review): hour and day are derived directly from the raw timestamp, so
 * this presumably expects epoch milliseconds and treats days as UTC days.
 * Confirm against the timestamp source.
 */
class BusinessHoursWindowAssigner extends WindowAssigner[Object, TimeWindow] {

  /**
   * @param element   the element being assigned (not inspected)
   * @param timestamp timestamp of the element
   * @param context   assigner context (not used)
   * @return a one-element collection with the business-hours window, or an
   *         empty collection when the timestamp is outside business hours
   */
  override def assignWindows(
      element: Object,
      timestamp: Long,
      context: WindowAssigner.WindowAssignerContext
  ): java.util.Collection[TimeWindow] = {
    val hourOfDay = (timestamp / 3600000) % 24
    val outsideBusinessHours = hourOfDay < 9 || hourOfDay >= 17
    if (outsideBusinessHours) {
      java.util.Collections.emptyList()
    } else {
      // Truncate to the start of the day, then offset to 9 AM and 5 PM.
      val midnight = timestamp - (timestamp % 86400000)
      val opens = midnight + 9 * 3600000
      val closes = midnight + 17 * 3600000
      java.util.Collections.singletonList(new TimeWindow(opens, closes))
    }
  }

  /** Windows produced by this assigner use the event-time trigger by default. */
  override def getDefaultTrigger(
      env: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
  ): org.apache.flink.streaming.api.windowing.triggers.Trigger[Object, TimeWindow] =
    EventTimeTrigger.create()

  /** Serializer for the [[TimeWindow]] instances this assigner creates. */
  override def getWindowSerializer(
      executionConfig: org.apache.flink.api.common.ExecutionConfig
  ): org.apache.flink.api.common.typeutils.TypeSerializer[TimeWindow] =
    new TimeWindow.Serializer()

  /** This assigner declares itself as event-time based. */
  override def isEventTime: Boolean = true
}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.util.Collector
/**
 * A single web interaction used as input of the analytics example.
 *
 * @param userId    identifier of the user; used as a key and for unique-user counts
 * @param pageUrl   page the event refers to; used as a key and for unique-page counts
 * @param action    kind of interaction (the sample data only uses "view")
 * @param timestamp event timestamp extracted for watermarking (presumably epoch milliseconds; confirm)
 * @param sessionId identifier used to group events when computing session durations
 */
case class WebEvent(
userId: String,
pageUrl: String,
action: String,
timestamp: Long,
sessionId: String
)
/**
 * Per-page, per-window statistics.
 *
 * @param pageUrl            page the statistics refer to
 * @param windowStart        start value of the window
 * @param windowEnd          end value of the window
 * @param uniqueUsers        number of distinct user ids seen
 * @param totalViews         number of events seen
 * @param avgSessionDuration average session duration (same unit as the event timestamps)
 */
case class PageViewStats(
pageUrl: String,
windowStart: Long,
windowEnd: Long,
uniqueUsers: Int,
totalViews: Int,
avgSessionDuration: Double
)
/**
 * Example job computing three windowed views over a small in-memory stream of
 * [[WebEvent]]s: per-page statistics, per-user activity and global counts.
 */
object RealTimeAnalytics {

  /**
   * Aggregates all events of one page within one time window into a single
   * [[PageViewStats]] record.
   */
  class PageViewAnalytics extends WindowFunction[WebEvent, PageViewStats, String, TimeWindow] {

    /**
     * @param pageUrl key of the window (the stream is keyed by page URL)
     * @param window  window being evaluated; supplies the start and end values
     * @param events  events collected for this page in this window
     * @param out     collector that receives the single statistics record
     */
    override def apply(
        pageUrl: String,
        window: TimeWindow,
        events: Iterable[WebEvent],
        out: Collector[PageViewStats]
    ): Unit = {
      val eventList = events.toList
      val uniqueUsers = eventList.map(_.userId).distinct.size
      val totalViews = eventList.size

      // Session duration (simplified): span between the first and last event
      // of each session id inside this window. A strict `values.map` is used
      // instead of `mapValues`, which is a lazy view in Scala 2.12 and
      // deprecated in 2.13.
      val sessionDurations = eventList
        .groupBy(_.sessionId)
        .values
        .map { sessionEvents =>
          val timestamps = sessionEvents.map(_.timestamp)
          timestamps.max - timestamps.min
        }
        .toList

      val avgSessionDuration =
        if (sessionDurations.nonEmpty) sessionDurations.sum.toDouble / sessionDurations.size
        else 0.0

      out.collect(PageViewStats(
        pageUrl,
        window.getStart,
        window.getEnd,
        uniqueUsers,
        totalViews,
        avgSessionDuration
      ))
    }
  }

  /**
   * Builds and runs the example pipeline.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Sample web events
    val webEvents = env.fromElements(
      WebEvent("user1", "/home", "view", 1000, "session1"),
      WebEvent("user2", "/home", "view", 1500, "session2"),
      WebEvent("user1", "/products", "view", 2000, "session1"),
      WebEvent("user3", "/home", "view", 2500, "session3"),
      WebEvent("user2", "/checkout", "view", 3000, "session2")
    )

    // Assign timestamps and watermarks, tolerating 10 seconds of out-of-orderness
    val watermarkedEvents = webEvents
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[WebEvent](Time.seconds(10)) {
          override def extractTimestamp(event: WebEvent): Long = event.timestamp
        }
      )

    // Real-time page view analytics
    val pageViewAnalytics = watermarkedEvents
      .keyBy(_.pageUrl)
      .timeWindow(Time.minutes(5)) // 5-minute windows
      .apply(new PageViewAnalytics)

    // User activity patterns (sliding windows).
    // Lambda parameters are typed explicitly: `apply` is overloaded and its
    // result type cannot be inferred from an untyped collector parameter.
    val userActivityPatterns = watermarkedEvents
      .keyBy(_.userId)
      .timeWindow(Time.minutes(10), Time.minutes(2)) // 10-minute windows, slide every 2 minutes
      .apply { (userId: String,
                window: TimeWindow,
                events: Iterable[WebEvent],
                out: Collector[(String, Long, Double, Int)]) =>
        // Derive the divisor from the window itself so the rate stays correct
        // if the window size above is changed (10.0 for a 10-minute window).
        val windowMinutes = (window.getEnd - window.getStart) / 60000.0
        val actionsPerMinute = events.size.toDouble / windowMinutes
        val uniquePages = events.map(_.pageUrl).toSet.size
        out.collect((userId, window.getStart, actionsPerMinute, uniquePages))
      }

    // Global statistics (all events, not keyed)
    val globalStats = watermarkedEvents
      .timeWindowAll(Time.minutes(1)) // 1-minute global windows
      .apply { (window: TimeWindow,
                events: Iterable[WebEvent],
                out: Collector[(Long, Int, Int, Int)]) =>
        val totalEvents = events.size
        val uniqueUsers = events.map(_.userId).toSet.size
        val uniquePages = events.map(_.pageUrl).toSet.size
        out.collect((window.getStart, totalEvents, uniqueUsers, uniquePages))
      }

    // Print results
    pageViewAnalytics.print("Page View Analytics")
    userActivityPatterns.print("User Activity")
    globalStats.print("Global Stats")

    env.execute("Real-Time Web Analytics")
  }
}