Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.
—
Specialized functions for processing windowed data with access to window metadata, state, and complete window contents. Essential for complex windowed computations and analytics.
Basic window function interface for processing all elements in a window.
/**
 * Base interface for functions evaluated over keyed (grouped) windows.
 * The function receives the entire window contents in one call when the
 * window fires.
 *
 * @tparam IN  type of the input elements
 * @tparam OUT type of the emitted results
 * @tparam KEY type of the key the stream is grouped by
 * @tparam W   type of window this function is applied to
 */
trait WindowFunction[IN, OUT, KEY, W <: Window] {
/**
* Process all elements in a window.
* @param key The key this window was evaluated for
* @param window The window metadata (e.g. start/end for time windows)
* @param input All elements collected in the window
* @param out Collector for emitting zero or more results
*/
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

Advanced window function with access to context and state.
/**
 * Window function with access to a per-invocation Context, providing window
 * metadata, per-window and global keyed state, and side outputs.
 *
 * @tparam IN  type of the input elements
 * @tparam OUT type of the emitted results
 * @tparam KEY type of the key the stream is grouped by
 * @tparam W   type of window this function is applied to
 */
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
/**
* Process all elements in a window with access to context.
* @param key The key this window was evaluated for
* @param context Context providing window info, state access, and side outputs
* @param elements All elements collected in the window
* @param out Collector for emitting zero or more results
*/
def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
/**
* Clear any per-window state (optional override; no-op by default).
* @param context Context for accessing the state to clear
*/
def clear(context: Context): Unit = {}
// Per-invocation context handed to process()/clear().
abstract class Context {
// The window being evaluated.
def window: W
// Current processing time of the operator, in milliseconds.
def currentProcessingTime: Long
// Current event-time watermark, in milliseconds.
def currentWatermark: Long
// State scoped to the current window (cleared with the window).
def windowState: KeyedStateStore
// State scoped to the current key, shared across all its windows.
def globalState: KeyedStateStore
// Emit a record to the side output identified by the given tag.
def output[X](outputTag: OutputTag[X], value: X): Unit
}
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{WindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
// Input record: one temperature measurement from a single sensor.
case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)
// Aggregated per-window statistics emitted by the example window functions.
case class WindowStats(sensorId: String, windowStart: Long, windowEnd: Long,
count: Int, avgTemp: Double, minTemp: Double, maxTemp: Double)
// WindowFunction example: derive count/avg/min/max temperature statistics
// for each key + window pair and emit them as one WindowStats record.
class SensorStatsWindowFunction extends WindowFunction[SensorReading, WindowStats, String, TimeWindow] {
  override def apply(
    key: String,
    window: TimeWindow,
    input: Iterable[SensorReading],
    out: Collector[WindowStats]
  ): Unit = {
    // Materialize temperatures once; all stats derive from this vector.
    val temps = input.iterator.map(_.temperature).toVector
    val n = temps.size
    val stats = WindowStats(
      key,
      window.getStart,
      window.getEnd,
      n,
      temps.sum / n,
      temps.min,
      temps.max
    )
    out.collect(stats)
  }
}
// ProcessWindowFunction example - with state and side outputs: emits the same
// statistics as SensorStatsWindowFunction, plus an over-temperature alert on
// a side output and a global per-key counter of processed windows.
class AdvancedSensorStatsFunction extends ProcessWindowFunction[SensorReading, WindowStats, String, TimeWindow] {
override def process(
key: String,
context: Context,
elements: Iterable[SensorReading],
out: Collector[WindowStats]
): Unit = {
val readings = elements.toList
val count = readings.size
val temperatures = readings.map(_.temperature)
// NOTE(review): assumes a non-empty window (min/max would throw, avg would
// be NaN otherwise) — Flink presumably only fires non-empty windows; verify.
val avgTemp = temperatures.sum / count
val minTemp = temperatures.min
val maxTemp = temperatures.max
// Emit main result
out.collect(WindowStats(
key, context.window.getStart, context.window.getEnd,
count, avgTemp, minTemp, maxTemp
))
// Emit to side output if temperature is too high
if (maxTemp > 80.0) {
// Tag id must match the one used with getSideOutput on the consumer side.
context.output(
OutputTag[String]("high-temp-alerts"),
s"Sensor $key exceeded 80°C in window ${context.window.getStart}-${context.window.getEnd}"
)
}
// Update global state (count of processed windows)
val globalCounter = context.globalState.getState(
new ValueStateDescriptor[Long]("window-count", classOf[Long])
)
// State is null until first written; Option(...) maps that to 0L.
val currentCount = Option(globalCounter.value()).getOrElse(0L)
globalCounter.update(currentCount + 1)
}
}
// Apply window functions to a small example stream.
// NOTE(review): `env` is assumed to be a StreamExecutionEnvironment created
// earlier in the surrounding documentation — confirm.
val readings = env.fromElements(
SensorReading("sensor1", 20.0, 1000L),
SensorReading("sensor1", 25.0, 2000L),
SensorReading("sensor1", 30.0, 3000L)
).assignAscendingTimestamps(_.timestamp)
// Key by sensor id so windows and state are tracked per sensor.
val keyedReadings = readings.keyBy(_.sensorId)
// Using WindowFunction
val windowStats = keyedReadings
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new SensorStatsWindowFunction)
// Using ProcessWindowFunction
val advancedStats = keyedReadings
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new AdvancedSensorStatsFunction)
// Get side output (tag id matches the one emitted to in process())
val highTempAlerts = advancedStats.getSideOutput(OutputTag[String]("high-temp-alerts"))

Window function for non-keyed streams (all-window operations).
/**
 * Base interface for functions evaluated over non-keyed (windowAll) windows;
 * receives every element of the window in a single call.
 *
 * @tparam IN  type of the input elements
 * @tparam OUT type of the emitted results
 * @tparam W   type of window this function is applied to
 */
trait AllWindowFunction[IN, OUT, W <: Window] {
/**
* Process all elements in a window (non-keyed).
* @param window The window metadata
* @param input All elements collected in the window
* @param out Collector for emitting zero or more results
*/
def apply(window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

Advanced all-window function with context access.
/**
 * Non-keyed window function with access to a per-invocation Context,
 * providing window metadata, state access, and side outputs.
 *
 * @tparam IN  type of the input elements
 * @tparam OUT type of the emitted results
 * @tparam W   type of window this function is applied to
 */
abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] {
/**
* Process all elements in a window with context (non-keyed).
* @param context Context providing window info, state access, and side outputs
* @param elements All elements collected in the window
* @param out Collector for emitting zero or more results
*/
def process(context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
/**
* Clear any per-window state (optional override; no-op by default).
* @param context Context for accessing the state to clear
*/
def clear(context: Context): Unit = {}
// Per-invocation context handed to process()/clear().
abstract class Context {
// The window being evaluated.
def window: W
// Current processing time of the operator, in milliseconds.
def currentProcessingTime: Long
// Current event-time watermark, in milliseconds.
def currentWatermark: Long
// State scoped to the current window (cleared with the window).
def windowState: KeyedStateStore
// State shared across all windows of this operator.
def globalState: KeyedStateStore
// Emit a record to the side output identified by the given tag.
def output[X](outputTag: OutputTag[X], value: X): Unit
}
}

Usage Examples:
import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction}
// Aggregated statistics over a whole (non-keyed) window.
case class GlobalStats(windowStart: Long, windowEnd: Long, totalCount: Int, avgValue: Double)
// AllWindowFunction example: compute the element count and the mean
// temperature across every reading in a (non-keyed) window.
class GlobalStatsFunction extends AllWindowFunction[SensorReading, GlobalStats, TimeWindow] {
  override def apply(
    window: TimeWindow,
    input: Iterable[SensorReading],
    out: Collector[GlobalStats]
  ): Unit = {
    // Collect the temperatures once; count and mean both come from here.
    val temps = input.iterator.map(_.temperature).toVector
    out.collect(
      GlobalStats(window.getStart, window.getEnd, temps.size, temps.sum / temps.size)
    )
  }
}
// ProcessAllWindowFunction example - with side outputs: emits global window
// statistics and routes high-average windows to an anomaly side output.
class AdvancedGlobalStatsFunction extends ProcessAllWindowFunction[SensorReading, GlobalStats, TimeWindow] {
override def process(
context: Context,
elements: Iterable[SensorReading],
out: Collector[GlobalStats]
): Unit = {
val readings = elements.toList
val count = readings.size
// NOTE(review): avg is NaN for an empty window (NaN > 50.0 is false, so no
// anomaly is emitted in that case) — presumably windows never fire empty.
val avgTemp = readings.map(_.temperature).sum / count
out.collect(GlobalStats(context.window.getStart, context.window.getEnd, count, avgTemp))
// Side output for anomalies
if (avgTemp > 50.0) {
context.output(
OutputTag[String]("global-anomalies"),
s"Global average temperature ${avgTemp}°C is above threshold"
)
}
}
}
// Apply all-window functions: windowAll groups every element into the same
// windows regardless of key.
val globalStats = readings
.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
.apply(new GlobalStatsFunction)
val advancedGlobalStats = readings
.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new AdvancedGlobalStatsFunction)

Rich versions of window functions with lifecycle methods and runtime context access.
// Rich variants of the window functions above: each mixes in RichFunction,
// adding open/close lifecycle hooks (no-ops by default) and RuntimeContext
// access for metrics and task information. Subclasses override what they need.
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
extends WindowFunction[IN, OUT, KEY, W] with RichFunction {
override def open(parameters: Configuration): Unit = {}
override def close(): Unit = {}
def getRuntimeContext: RuntimeContext
def setRuntimeContext(t: RuntimeContext): Unit
}
// Rich ProcessWindowFunction (keyed, with Context access).
abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
extends ProcessWindowFunction[IN, OUT, KEY, W] with RichFunction {
override def open(parameters: Configuration): Unit = {}
override def close(): Unit = {}
def getRuntimeContext: RuntimeContext
def setRuntimeContext(t: RuntimeContext): Unit
}
// Rich AllWindowFunction (non-keyed).
abstract class RichAllWindowFunction[IN, OUT, W <: Window]
extends AllWindowFunction[IN, OUT, W] with RichFunction {
override def open(parameters: Configuration): Unit = {}
override def close(): Unit = {}
def getRuntimeContext: RuntimeContext
def setRuntimeContext(t: RuntimeContext): Unit
}
// Rich ProcessAllWindowFunction (non-keyed, with Context access).
abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
extends ProcessAllWindowFunction[IN, OUT, W] with RichFunction {
override def open(parameters: Configuration): Unit = {}
override def close(): Unit = {}
def getRuntimeContext: RuntimeContext
def setRuntimeContext(t: RuntimeContext): Unit
}

Usage Examples:
import org.apache.flink.configuration.Configuration
// RichWindowFunction example: registers a counter metric in open() and
// increments it once per processed window.
class MetricsWindowFunction extends RichWindowFunction[SensorReading, WindowStats, String, TimeWindow] {
// Counter metric; assigned in open() before any window is processed.
private var processedWindows: Counter = _
override def open(parameters: Configuration): Unit = {
processedWindows = getRuntimeContext
.getMetricGroup
.counter("processed-windows")
}
override def apply(
key: String,
window: TimeWindow,
input: Iterable[SensorReading],
out: Collector[WindowStats]
): Unit = {
// Increment metric
processedWindows.inc()
// Process window as usual
val readings = input.toList
val count = readings.size
val avgTemp = readings.map(_.temperature).sum / count
// min/max are 0.0 placeholders — this example focuses on the metric.
out.collect(WindowStats(key, window.getStart, window.getEnd, count, avgTemp, 0.0, 0.0))
}
}

// Window types
// Base type for all windows; concrete windows define their own bounds.
abstract class Window {
// Largest timestamp that still belongs to this window (inclusive).
def maxTimestamp(): Long
}
// Time window covering the half-open span [start, end) in milliseconds.
class TimeWindow(start: Long, end: Long) extends Window {
def getStart: Long = start
def getEnd: Long = end
// end is exclusive, so the last contained timestamp is end - 1.
def maxTimestamp(): Long = end - 1
}
// Window used by global (non-time) assigners: it never closes,
// so its max timestamp is the largest representable Long.
class GlobalWindow extends Window {
def maxTimestamp(): Long = Long.MaxValue
}
// State store for window functions: gives access to keyed state primitives.
// `windowState` instances are scoped to one window; `globalState` instances
// are shared across all windows of the same key.
trait KeyedStateStore {
  /**
   * Value state for a single value per key.
   *
   * Fixed: previously declared as `getState[T](StateDescriptor[T, _]): State`,
   * which neither matches the real Flink API nor this file's own usage
   * (`getState(new ValueStateDescriptor...)` followed by `.value()` and
   * `.update(...)`). Returning ValueState[T] makes that usage type-check.
   */
  def getState[T](stateDescriptor: ValueStateDescriptor[T]): ValueState[T]
  // List state: an appendable list of values per key.
  def getListState[T](stateDescriptor: ListStateDescriptor[T]): ListState[T]
  // Reducing state: values folded together with the descriptor's ReduceFunction.
  def getReducingState[T](stateDescriptor: ReducingStateDescriptor[T]): ReducingState[T]
  // Aggregating state: IN values aggregated through ACC into an OUT result.
  def getAggregatingState[IN, ACC, OUT](stateDescriptor: AggregatingStateDescriptor[IN, ACC, OUT]): AggregatingState[IN, OUT]
  // Map state: a key/value map per stream key.
  def getMapState[UK, UV](stateDescriptor: MapStateDescriptor[UK, UV]): MapState[UK, UV]
}
// State descriptors: name + type metadata used to register/look up state.
// NOTE(review): AggregatingStateDescriptor is referenced by KeyedStateStore
// but not declared in this file — presumably defined elsewhere; verify.
class ValueStateDescriptor[T](name: String, typeClass: Class[T])
class ListStateDescriptor[T](name: String, typeClass: Class[T])
class ReducingStateDescriptor[T](name: String, reduceFunction: ReduceFunction[T], typeClass: Class[T])
class MapStateDescriptor[K, V](name: String, keyClass: Class[K], valueClass: Class[V])
// Collector interface: sink handed to window functions for emitting output.
trait Collector[T] {
// Emit one record downstream.
def collect(record: T): Unit
// Close the collector; no further records may be emitted afterwards.
def close(): Unit
}
// Output tag identifying a side-output stream by id and element type.
// Fixed: `getTypeInfo` was declared without a body, which does not compile in
// a concrete case class; it is now implemented from the TypeInformation
// context bound captured at construction time.
case class OutputTag[T: TypeInformation](id: String) {
  /** Type information for T, taken from the implicit evidence in scope. */
  def getTypeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
}
// Rich function base: lifecycle hooks plus runtime-context wiring.
trait RichFunction {
// Called once before any records are processed; receives job configuration.
def open(parameters: Configuration): Unit
// Called once after the last record has been processed.
def close(): Unit
def getRuntimeContext: RuntimeContext
// Invoked by the runtime to inject the context before open().
def setRuntimeContext(t: RuntimeContext): Unit
}
// Runtime context: information about the parallel task a function runs in.
trait RuntimeContext {
def getTaskName: String
// Entry point for registering metrics (counters, gauges, histograms).
def getMetricGroup: MetricGroup
def getNumberOfParallelSubtasks: Int
// Index of this parallel subtask within the operator.
def getIndexOfThisSubtask: Int
}
// Metrics
// Counter metric: an integer value incremented by the user function.
trait Counter {
// Increment by one.
def inc(): Unit
// Increment by n.
def inc(n: Long): Unit
def getCount: Long
}
// Group under which metrics are registered and addressed by name.
trait MetricGroup {
def counter(name: String): Counter
def gauge[T](name: String, gauge: Gauge[T]): Gauge[T]
def histogram(name: String, histogram: Histogram): Histogram
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12