The Apache Flink Scala API for DataStream processing provides type-safe, functional programming constructs for building streaming data-processing applications.
Windowing in Flink enables processing of unbounded streams by grouping elements into finite sets called windows. Flink supports both keyed and non-keyed windowing with various window types and time semantics.
/** API summary (signatures only): time windows on a keyed stream. */
class KeyedStream[T, K] {
  /** Tumbling (non-overlapping) time windows of length `size`. */
  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]
  /** Sliding (overlapping) time windows of length `size`, a new one starting every `slide`. */
  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]
}
Create time-based windows for keyed streams:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val sensorData = env.fromElements(
  SensorReading("sensor1", 20.0, 1000),
  SensorReading("sensor1", 22.0, 2000),
  SensorReading("sensor2", 18.0, 1500),
  SensorReading("sensor2", 25.0, 2500)
).keyBy(_.sensorId)

// Tumbling time windows (non-overlapping)
val tumblingWindows = sensorData
  .timeWindow(Time.minutes(5)) // 5-minute tumbling windows

// Sliding time windows (overlapping)
val slidingWindows = sensorData
  .timeWindow(Time.minutes(10), Time.minutes(2)) // 10-minute windows, sliding every 2 minutes

// Process windows: emit (sensorId, average temperature, window start, window end).
// The lambda parameters are typed explicitly: the result type R of `apply` occurs
// only in `Collector[R]`, so it cannot be inferred from an untyped lambda.
tumblingWindows
  .apply { (key: String, window: TimeWindow, readings: Iterable[SensorReading],
            out: Collector[(String, Double, Long, Long)]) =>
    val avgTemp = readings.map(_.temperature).sum / readings.size
    out.collect((key, avgTemp, window.getStart, window.getEnd))
  }
  .print()

/** API summary (signatures only): count windows on a keyed stream. */
class KeyedStream[T, K] {
  /** Tumbling count windows: one window per `size` elements of a key. */
  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]
  /** Sliding count windows of `size` elements per key, advancing by `slide` elements. */
  def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]
}
Create count-based windows:
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Key by parity so each key collects several elements. Keying by `identity`
// would give every key exactly one element, so no count window would ever fill.
val events = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .keyBy(_ % 2)

// Tumbling count windows
val tumblingCountWindows = events
  .countWindow(3) // Every 3 elements per key

// Sliding count windows
val slidingCountWindows = events
  .countWindow(5, 2) // 5 elements per window, slide by 2

tumblingCountWindows
  .sum(0)
  .print("Tumbling Count")

slidingCountWindows
  .sum(0)
  .print("Sliding Count")

/** API summary (signature only): window a keyed stream with any assigner. */
class KeyedStream[T, K] {
  /** Windows the keyed stream using the given `WindowAssigner`. */
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
}
Use custom window assigners:
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)

// NOTE: the event-time assigners below (EventTimeSessionWindows,
// TumblingEventTimeWindows) need element timestamps and watermarks. This sample
// stream has none; assign them (e.g. assignTimestampsAndWatermarks or
// assignAscendingTimestamps) before keyBy when running such a job.

// Session windows (windows that close after period of inactivity)
val sessionWindows = keyedStream
  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))

// Global windows (all elements of a key in one window; it never fires
// unless a custom trigger is set)
val globalWindows = keyedStream
  .window(GlobalWindows.create())

// Processing time sliding windows (10 minutes long, starting every 5 minutes)
val processingTimeWindows = keyedStream
  .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(5)))

// Event time tumbling windows (1 hour)
val eventTimeWindows = keyedStream
  .window(TumblingEventTimeWindows.of(Time.hours(1)))

/** API summary (signatures only): time windows over a non-keyed stream. */
class DataStream[T] {
  /** Tumbling time windows over all elements of the stream. */
  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]
  /** Sliding time windows over all elements of the stream. */
  def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]
}
Apply windows to entire stream (all elements):
val env = StreamExecutionEnvironment.getExecutionEnvironment
val allData = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// All elements in 5-minute tumbling windows
val allTumbling = allData
  .timeWindowAll(Time.minutes(5))

// All elements in 10-minute sliding windows, sliding every 2 minutes
val allSliding = allData
  .timeWindowAll(Time.minutes(10), Time.minutes(2))

allTumbling
  .sum(0)
  .print("All Tumbling")

/** API summary (signatures only): count windows over a non-keyed stream. */
class DataStream[T] {
  /** Tumbling count windows: one window per `size` elements of the stream. */
  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]
  /** Sliding count windows of `size` elements, advancing by `slide` elements. */
  def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]
}
Count windows over all elements:
// With 10 inputs, two windows fill (1-4 and 5-8); elements 9 and 10 never
// complete a window and are not emitted.
val allCountWindows = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .countWindowAll(4) // Every 4 elements

allCountWindows
  .sum(0)
  .print("All Count Windows")

/** API summary (signature only): window a non-keyed stream with any assigner. */
class DataStream[T] {
  /** Windows all elements of the stream using the given `WindowAssigner`. */
  def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]
}

/** API summary (signature only): custom triggers on keyed windows. */
class WindowedStream[T, K, W <: Window] {
  /** Replaces the window assigner's default trigger with `trigger`. */
  def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W]
}

/** API summary (signature only): custom triggers on non-keyed windows. */
class AllWindowedStream[T, W <: Window] {
  /** Replaces the window assigner's default trigger with `trigger`. */
  def trigger(trigger: Trigger[_ >: T, _ >: W]): AllWindowedStream[T, W]
}
Control when windows fire:
import org.apache.flink.streaming.api.windowing.triggers._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)

// Count trigger: fire each time the window has collected 10 more elements.
// `trigger(...)` REPLACES the window's default time-based trigger, so this window
// no longer fires at its end time; a window with fewer than 10 elements never fires.
val customTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(CountTrigger.of(10)) // Fire every 10 elements (not at window end)

// Processing time trigger: fire when processing time passes the window end
val processingTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(ProcessingTimeTrigger.create())

// Purging trigger (removes the window contents after each firing of the wrapped trigger)
val purgingTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(PurgingTrigger.of(CountTrigger.of(5)))

/** API summary (signature only): evictors on keyed windows. */
class WindowedStream[T, K, W <: Window] {
  /** Sets an evictor that removes elements from the window around the window function call. */
  def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W]
}
Remove elements from windows before or after function application:
import org.apache.flink.streaming.api.windowing.evictors._
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).keyBy(identity)

// Count evictor: keep only the latest N elements
val countEvictor = keyedStream
  .timeWindow(Time.minutes(5))
  .evictor(CountEvictor.of(5)) // Keep only latest 5 elements

// Time evictor: keep only elements whose timestamp lies within the given interval
// of the newest element in the window (relies on element timestamps)
val timeEvictor = keyedStream
  .timeWindow(Time.minutes(5))
  .evictor(TimeEvictor.of(Time.minutes(2))) // Keep only the last 2 minutes

// Delta evictor: evict elements whose delta to the newest element reaches the threshold.
// DeltaFunction is a Java interface; Scala 2.10/2.11 do not convert lambdas to Java
// interfaces, so it is implemented explicitly.
val deltaEvictor = keyedStream
  .timeWindow(Time.minutes(5))
  .evictor(DeltaEvictor.of(5.0, new DeltaFunction[Int] {
    override def getDelta(oldDataPoint: Int, newDataPoint: Int): Double =
      Math.abs(oldDataPoint - newDataPoint).toDouble
  }))

/** API summary (signature only): allowed lateness on keyed windows. */
class WindowedStream[T, K, W <: Window] {
  /** Keeps window state for `lateness` past the window end so late elements still update it. */
  def allowedLateness(lateness: Time): WindowedStream[T, K, W]
}
Handle late-arriving data:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lateDataStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)

// Allow 1 minute of lateness: elements arriving up to 1 minute (by watermark)
// after the window end are still added and cause the window to fire again.
// Allowed lateness only has an effect on event-time windows.
val windowWithLateness = lateDataStream
  .timeWindow(Time.minutes(5))
  .allowedLateness(Time.minutes(1))
  .sum(0)

/** API summary (signature only): late-data side output on keyed windows. */
class WindowedStream[T, K, W <: Window] {
  /** Routes elements that arrive after the allowed lateness to the side output `outputTag`. */
  def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W]
}
Route late data to side output:
import org.apache.flink.streaming.api.scala.OutputTag

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lateDataTag = OutputTag[Int]("late-data")

val mainResult = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)
  .timeWindow(Time.minutes(5))
  .allowedLateness(Time.minutes(1))
  .sideOutputLateData(lateDataTag)
  .sum(0)

// Process late data separately
val lateData = mainResult.getSideOutput(lateDataTag)
lateData.print("Late Data")

import org.apache.flink.streaming.api.TimeCharacteristic

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Pick exactly ONE time characteristic: each call overwrites the previous setting,
// so the alternatives are shown commented out.
// Event time processing (use timestamps in data)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Alternatively, processing time (use system time when elements arrive):
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// Alternatively, ingestion time (use time when elements enter Flink):
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

case class TimestampedEvent(data: String, eventTime: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Deliberately out of order: event3 (1500) arrives after event2 (2000).
val timestampedStream = env.fromElements(
  TimestampedEvent("event1", 1000),
  TimestampedEvent("event2", 2000),
  TimestampedEvent("event3", 1500)
)

// Assign watermarks with bounded out-of-orderness: the watermark trails the
// highest timestamp seen so far by 5 seconds, so event3 is still on time.
val watermarkedStream = timestampedStream
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[TimestampedEvent](Time.seconds(5)) {
      override def extractTimestamp(element: TimestampedEvent): Long = element.eventTime
    }
  )

// Ascending timestamps (no out-of-order events): only valid when timestamps never
// decrease. `timestampedStream` above violates that, so use an already-sorted source.
val ascendingStream = env.fromElements(
  TimestampedEvent("event1", 1000),
  TimestampedEvent("event3", 1500),
  TimestampedEvent("event2", 2000)
).assignAscendingTimestamps(_.eventTime)

/**
 * API summary (signatures only): built-in window aggregations, addressed by
 * tuple position or field name. `min`/`max` aggregate the given field only;
 * `minBy`/`maxBy` return the element that holds the extreme value.
 */
class WindowedStream[T, K, W <: Window] {
  def sum(position: Int): DataStream[T]
  def sum(field: String): DataStream[T]
  def min(position: Int): DataStream[T]
  def min(field: String): DataStream[T]
  def max(position: Int): DataStream[T]
  def max(field: String): DataStream[T]
  def minBy(position: Int): DataStream[T]
  def minBy(field: String): DataStream[T]
  def maxBy(position: Int): DataStream[T]
  def maxBy(field: String): DataStream[T]
}

/** API summary (signatures only): custom window functions. */
class WindowedStream[T, K, W <: Window] {
  /** Applies a `WindowFunction` to the key, window and contents of each fired window. */
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
  /** Same as above, with a Scala function `(key, window, elements, collector) => Unit`. */
  def apply[R: TypeInformation](function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]
}
Apply custom functions to windows:
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.util.Collector

case class WindowResult(key: String, count: Int, avg: Double, window: String)

/** Emits, per key and window, the reading count, mean temperature and "start-end" window bounds. */
class StatisticsWindowFunction extends WindowFunction[SensorReading, WindowResult, String, TimeWindow] {
  override def apply(
    key: String,
    window: TimeWindow,
    readings: Iterable[SensorReading],
    out: Collector[WindowResult]
  ): Unit = {
    val temperatures = readings.map(_.temperature).toList
    val count = temperatures.size
    val avg = temperatures.sum / count
    val windowInfo = s"${window.getStart}-${window.getEnd}"
    out.collect(WindowResult(key, count, avg, windowInfo))
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val sensorData = env.fromElements(
  SensorReading("sensor1", 20.0, 1000),
  SensorReading("sensor1", 25.0, 2000)
).keyBy(_.sensorId)

// Apply custom window function
val windowResults = sensorData
  .timeWindow(Time.minutes(5))
  .apply(new StatisticsWindowFunction)

// Lambda-based window function. Parameter types are explicit because the result
// type R of `apply` appears only in `Collector[R]` and cannot be inferred otherwise.
val lambdaResults = sensorData
  .timeWindow(Time.minutes(5))
  .apply { (key: String, window: TimeWindow, readings: Iterable[SensorReading],
            out: Collector[(String, Double, Long)]) =>
    val maxTemp = readings.map(_.temperature).max
    out.collect((key, maxTemp, window.getStart))
  }

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows

case class UserActivity(userId: String, activity: String, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val userActivities = env.fromElements(
  UserActivity("user1", "login", 1000),
  UserActivity("user1", "click", 2000),
  UserActivity("user1", "scroll", 3000)
)

// Session windows with 10-minute inactivity gap
val sessionWindows = userActivities
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.userId)
  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  .apply { (userId: String, window: TimeWindow, activities: Iterable[UserActivity],
            out: Collector[(String, Int, String, Long)]) =>
    val sessionSummary = (
      userId,
      activities.size,
      activities.map(_.activity).mkString(","),
      // Window span in ms; for session windows this includes the trailing gap.
      window.getEnd - window.getStart
    )
    out.collect(sessionSummary)
  }

import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger

/**
 * Custom window assigner for business hours (9 AM to 5 PM).
 *
 * Each element whose timestamp falls between 09:00 and 17:00 of its day is put into
 * that day's single [09:00, 17:00) window. The hour is derived from the
 * timestamp by plain epoch-millisecond arithmetic, i.e. in UTC (assumes epoch
 * millis — confirm for your source). Elements outside business hours are assigned
 * no window and are therefore dropped.
 */
class BusinessHoursWindowAssigner extends WindowAssigner[Object, TimeWindow] {
  override def assignWindows(
    element: Object,
    timestamp: Long,
    context: WindowAssigner.WindowAssignerContext
  ): java.util.Collection[TimeWindow] = {
    val hourMs = 3600000L
    val dayMs = 24 * hourMs
    val hour = (timestamp / hourMs) % 24 // Hour of day (UTC)
    if (hour >= 9 && hour < 17) { // Business hours
      val startOfDay = timestamp - (timestamp % dayMs) // Start of day (UTC)
      val startOfBusinessHours = startOfDay + 9 * hourMs // 9 AM
      val endOfBusinessHours = startOfDay + 17 * hourMs // 5 PM
      java.util.Collections.singletonList(new TimeWindow(startOfBusinessHours, endOfBusinessHours))
    } else {
      // Outside business hours: no window, so the element is discarded.
      java.util.Collections.emptyList()
    }
  }

  /** Fires each window once the watermark passes its end. */
  override def getDefaultTrigger(env: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment): org.apache.flink.streaming.api.windowing.triggers.Trigger[Object, TimeWindow] = {
    EventTimeTrigger.create()
  }

  override def getWindowSerializer(executionConfig: org.apache.flink.api.common.ExecutionConfig): org.apache.flink.api.common.typeutils.TypeSerializer[TimeWindow] = {
    new TimeWindow.Serializer()
  }

  override def isEventTime: Boolean = true
}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.util.Collector

case class WebEvent(
  userId: String,
  pageUrl: String,
  action: String,
  timestamp: Long,
  sessionId: String
)

case class PageViewStats(
  pageUrl: String,
  windowStart: Long,
  windowEnd: Long,
  uniqueUsers: Int,
  totalViews: Int,
  avgSessionDuration: Double
)

/** Complete example: page-view, per-user and global statistics over event-time windows. */
object RealTimeAnalytics {

  /** Per page and window: distinct users, total views and mean (simplified) session duration in ms. */
  class PageViewAnalytics extends WindowFunction[WebEvent, PageViewStats, String, TimeWindow] {
    override def apply(
      pageUrl: String,
      window: TimeWindow,
      events: Iterable[WebEvent],
      out: Collector[PageViewStats]
    ): Unit = {
      val eventList = events.toList
      val uniqueUsers = eventList.map(_.userId).distinct.size
      val totalViews = eventList.size

      // Calculate average session duration (simplified): for every session seen on
      // this page in this window, the span between its first and last event.
      // Built strictly (not via `mapValues`, which yields a lazily recomputed view).
      val sessionDurations: List[Long] = eventList
        .groupBy(_.sessionId)
        .values
        .map { sessionEvents =>
          val timestamps = sessionEvents.map(_.timestamp)
          timestamps.max - timestamps.min
        }
        .toList

      val avgSessionDuration =
        if (sessionDurations.nonEmpty) sessionDurations.sum.toDouble / sessionDurations.size
        else 0.0

      out.collect(PageViewStats(
        pageUrl,
        window.getStart,
        window.getEnd,
        uniqueUsers,
        totalViews,
        avgSessionDuration
      ))
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Sample web events
    val webEvents = env.fromElements(
      WebEvent("user1", "/home", "view", 1000, "session1"),
      WebEvent("user2", "/home", "view", 1500, "session2"),
      WebEvent("user1", "/products", "view", 2000, "session1"),
      WebEvent("user3", "/home", "view", 2500, "session3"),
      WebEvent("user2", "/checkout", "view", 3000, "session2")
    )

    // Assign watermarks (tolerating up to 10 seconds of out-of-orderness)
    val watermarkedEvents = webEvents
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[WebEvent](Time.seconds(10)) {
          override def extractTimestamp(event: WebEvent): Long = event.timestamp
        }
      )

    // Real-time page view analytics
    val pageViewAnalytics = watermarkedEvents
      .keyBy(_.pageUrl)
      .timeWindow(Time.minutes(5)) // 5-minute windows
      .apply(new PageViewAnalytics)

    // User activity patterns (sliding windows); lambda parameters are typed explicitly
    // so that the result type of `apply` can be inferred.
    val userActivityPatterns = watermarkedEvents
      .keyBy(_.userId)
      .timeWindow(Time.minutes(10), Time.minutes(2)) // 10-minute windows, slide every 2 minutes
      .apply { (userId: String, window: TimeWindow, events: Iterable[WebEvent],
                out: Collector[(String, Long, Double, Int)]) =>
        // Derive the rate from the actual window length rather than a hard-coded 10.
        val windowMinutes = (window.getEnd - window.getStart) / 60000.0
        val actionsPerMinute = events.size / windowMinutes // Actions per minute
        val uniquePages = events.map(_.pageUrl).toSet.size
        out.collect((userId, window.getStart, actionsPerMinute, uniquePages))
      }

    // Global statistics (all events)
    val globalStats = watermarkedEvents
      .timeWindowAll(Time.minutes(1)) // 1-minute global windows
      .apply { (window: TimeWindow, events: Iterable[WebEvent],
                out: Collector[(Long, Int, Int, Int)]) =>
        val totalEvents = events.size
        val uniqueUsers = events.map(_.userId).toSet.size
        val uniquePages = events.map(_.pageUrl).toSet.size
        out.collect((window.getStart, totalEvents, uniqueUsers, uniquePages))
      }

    // Print results
    pageViewAnalytics.print("Page View Analytics")
    userActivityPatterns.print("User Activity")
    globalStats.print("Global Stats")

    env.execute("Real-Time Web Analytics")
  }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-10