Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.
—
WindowedStream enables bounded computations on infinite streams by grouping elements into finite windows based on time or count. This is essential for aggregations and batch-style processing on streaming data.
Configure window behavior for late data handling and triggering.
class WindowedStream[T, K, W <: Window] {
/**
 * Set allowed lateness for late arriving elements.
 * Elements arriving after the watermark passes the window end, but within
 * the allowed lateness, still produce (possibly updated) window results.
 * @param lateness Maximum allowed lateness
 * @return WindowedStream with lateness configuration
 */
def allowedLateness(lateness: Time): WindowedStream[T, K, W]
/**
 * Send late data to a side output.
 * Elements too late even for the allowed lateness are emitted to the tagged
 * side output instead of being silently dropped.
 * @param outputTag Tag for late data side output
 * @return WindowedStream with late data handling
 */
def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W]
/**
 * Set a custom trigger for window firing, replacing the assigner's default.
 * @param trigger Custom trigger implementation
 * @return WindowedStream with custom trigger
 */
def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W]
/**
 * Set an evictor for removing elements from windows before/after the
 * window function runs.
 * @param evictor Custom evictor implementation
 * @return WindowedStream with custom evictor
 */
def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W]
}
Apply reduction functions to combine elements within windows.
class WindowedStream[T, K, W <: Window] {
/**
 * Reduce elements in each window using a ReduceFunction
 * @param function Reduce function to combine elements
 * @return DataStream with reduced window results
 */
def reduce(function: ReduceFunction[T]): DataStream[T]
/**
 * Reduce elements in each window using a function
 * @param function Function to combine two elements
 * @return DataStream with reduced window results
 */
def reduce(function: (T, T) => T): DataStream[T]
/**
 * Reduce with pre-aggregation and window function.
 * The pre-aggregator combines elements incrementally as they arrive; the
 * window function then receives a single element (the reduce result).
 * @param preAggregator ReduceFunction for pre-aggregation
 * @param function WindowFunction for final processing
 * @return DataStream with window function results
 */
def reduce[R: TypeInformation](
preAggregator: ReduceFunction[T],
function: WindowFunction[T, R, K, W]
): DataStream[R]
/**
 * Reduce with pre-aggregation and process window function.
 * As above, but the final step has access to window metadata and state
 * through the ProcessWindowFunction Context.
 * @param preAggregator ReduceFunction for pre-aggregation
 * @param function ProcessWindowFunction for final processing
 * @return DataStream with process window function results
 */
def reduce[R: TypeInformation](
preAggregator: ReduceFunction[T],
function: ProcessWindowFunction[T, R, K, W]
): DataStream[R]
}
Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)
val readings = env.fromElements(
SensorReading("sensor1", 20.0, 1000L),
SensorReading("sensor1", 25.0, 2000L),
SensorReading("sensor1", 22.0, 3000L)
)
val keyedReadings = readings
.assignAscendingTimestamps(_.timestamp)
.keyBy(_.sensorId)
// Simple reduction - get maximum temperature per window
val maxTemps = keyedReadings
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce((r1, r2) => if (r1.temperature > r2.temperature) r1 else r2)
// Reduction with window function - add window info
val maxTempsWithWindow = keyedReadings
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce(
(r1: SensorReading, r2: SensorReading) => if (r1.temperature > r2.temperature) r1 else r2,
(key: String, window: TimeWindow, readings: Iterable[SensorReading], out: Collector[(String, Double, Long, Long)]) => {
// With a pre-aggregator the window function receives exactly one element
// (the reduce result), so head is safe here.
val maxReading = readings.head
out.collect((key, maxReading.temperature, window.getStart, window.getEnd))
}
)
Apply aggregate functions for more complex computations within windows.
class WindowedStream[T, K, W <: Window] {
/**
 * Apply an AggregateFunction to window elements.
 * Aggregation is incremental: elements are folded into an accumulator of
 * type ACC as they arrive, so the full window is never buffered.
 * @param aggregateFunction Function for incremental aggregation
 * @return DataStream with aggregation results
 */
def aggregate[ACC: TypeInformation, R: TypeInformation](
aggregateFunction: AggregateFunction[T, ACC, R]
): DataStream[R]
/**
 * Apply aggregation with window function.
 * The window function receives the single aggregated value together with
 * the key and window metadata.
 * @param aggregateFunction Function for incremental aggregation
 * @param windowFunction Function for final window processing
 * @return DataStream with window function results
 */
def aggregate[ACC: TypeInformation, R: TypeInformation](
aggregateFunction: AggregateFunction[T, ACC, R],
windowFunction: WindowFunction[R, R, K, W]
): DataStream[R]
/**
 * Apply aggregation with process window function.
 * V is the aggregation output fed into the ProcessWindowFunction, which
 * then produces the final result type R.
 * @param aggregateFunction Function for incremental aggregation
 * @param windowFunction ProcessWindowFunction for final processing
 * @return DataStream with process window function results
 */
def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
aggregateFunction: AggregateFunction[T, ACC, V],
windowFunction: ProcessWindowFunction[V, R, K, W]
): DataStream[R]
}
Usage Examples:
import org.apache.flink.api.common.functions.AggregateFunction
// Define a custom aggregate function for average temperature.
// Accumulator is (running sum of temperatures, element count).
class AverageAggregateFunction extends AggregateFunction[SensorReading, (Double, Int), Double] {
override def createAccumulator(): (Double, Int) = (0.0, 0)
override def add(value: SensorReading, accumulator: (Double, Int)): (Double, Int) =
(accumulator._1 + value.temperature, accumulator._2 + 1)
// NOTE: windows fire only when they contain elements, so count is >= 1 here.
override def getResult(accumulator: (Double, Int)): Double =
accumulator._1 / accumulator._2
// Combines partial results, e.g. when session windows merge.
override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) =
(a._1 + b._1, a._2 + b._2)
}
// Apply aggregate function
val avgTemps = keyedReadings
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new AverageAggregateFunction)
Apply functions that operate on complete window contents.
class WindowedStream[T, K, W <: Window] {
/**
 * Apply a WindowFunction to all elements in each window.
 * Unlike reduce/aggregate, the full window contents are buffered and
 * passed to the function at once.
 * @param function WindowFunction implementation
 * @return DataStream with window function results
 */
def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
/**
 * Apply a function using closure syntax
 * @param function Function with (key, window, elements, collector) parameters
 * @return DataStream with function results
 */
def apply[R: TypeInformation](
function: (K, W, Iterable[T], Collector[R]) => Unit
): DataStream[R]
/**
 * Apply a ProcessWindowFunction for advanced processing with access to
 * window metadata, per-window and per-key state, and side outputs.
 * @param function ProcessWindowFunction implementation
 * @return DataStream with process function results
 */
def process[R: TypeInformation](
function: ProcessWindowFunction[T, R, K, W]
): DataStream[R]
}
Usage Examples:
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
// Window function to collect all readings with window metadata
val allReadingsWithWindow = keyedReadings
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.apply(new WindowFunction[SensorReading, (String, List[Double], Long, Long), String, TimeWindow] {
override def apply(
key: String,
window: TimeWindow,
input: Iterable[SensorReading],
out: Collector[(String, List[Double], Long, Long)]
): Unit = {
val temperatures = input.map(_.temperature).toList
out.collect((key, temperatures, window.getStart, window.getEnd))
}
})
// Using closure syntax
val readingCounts = keyedReadings
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.apply { (key: String, window: TimeWindow, readings: Iterable[SensorReading], out: Collector[(String, Int)]) =>
out.collect((key, readings.size))
}
Window assigners determine how elements are grouped into windows.
// Tumbling time windows
// Fixed-size, non-overlapping windows; the optional offset shifts window
// alignment (e.g. to start daily windows at a non-UTC midnight).
object TumblingEventTimeWindows {
def of(size: Time): TumblingEventTimeWindows
def of(size: Time, offset: Time): TumblingEventTimeWindows
}
object TumblingProcessingTimeWindows {
def of(size: Time): TumblingProcessingTimeWindows
def of(size: Time, offset: Time): TumblingProcessingTimeWindows
}
// Sliding time windows
// Fixed-size windows evaluated every `slide`; when slide < size the windows
// overlap and each element is assigned to multiple windows.
object SlidingEventTimeWindows {
def of(size: Time, slide: Time): SlidingEventTimeWindows
def of(size: Time, slide: Time, offset: Time): SlidingEventTimeWindows
}
object SlidingProcessingTimeWindows {
def of(size: Time, slide: Time): SlidingProcessingTimeWindows
def of(size: Time, slide: Time, offset: Time): SlidingProcessingTimeWindows
}
// Session windows
// A window closes after a gap of inactivity; withDynamicGap derives the gap
// per element from the extractor.
object EventTimeSessionWindows {
def withGap(sessionTimeout: Time): EventTimeSessionWindows
// Generic in the element type: SessionWindowTimeGapExtractor is invariant,
// so a [Any]-typed parameter would reject extractors written for concrete
// element types. Matches the Flink API's generic withDynamicGap[T].
def withDynamicGap[T](sessionWindowTimeGapExtractor: SessionWindowTimeGapExtractor[T]): EventTimeSessionWindows
}
object ProcessingTimeSessionWindows {
def withGap(sessionTimeout: Time): ProcessingTimeSessionWindows
def withDynamicGap[T](sessionWindowTimeGapExtractor: SessionWindowTimeGapExtractor[T]): ProcessingTimeSessionWindows
}
// Global windows (for count-based operations)
// All elements of a key go into a single never-ending window; a custom
// trigger must be set for results to ever be emitted.
object GlobalWindows {
def create(): GlobalWindows
}
Usage Examples:
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
// Tumbling windows - non-overlapping fixed-size windows
val tumblingWindow = keyedReadings
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
// Sliding windows - overlapping windows (10-minute window, evaluated every 2 minutes)
val slidingWindow = keyedReadings
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(2)))
// Session windows - dynamic windows based on inactivity gaps
val sessionWindow = keyedReadings
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
// Window types
// Base class for all window kinds; maxTimestamp() is the largest timestamp
// that still belongs to the window.
abstract class Window {
def maxTimestamp(): Long
}
// Time interval window [start, end): getEnd is exclusive, hence maxTimestamp = end - 1.
class TimeWindow(start: Long, end: Long) extends Window {
def getStart: Long
def getEnd: Long
def maxTimestamp(): Long = end - 1
}
// Single window covering the whole stream; never ends on its own (Long.MaxValue).
class GlobalWindow extends Window {
def maxTimestamp(): Long = Long.MaxValue
}
// Aggregate function interface
// Incremental aggregation: elements are folded into an accumulator (ACC) as
// they arrive, so the runtime never buffers the full window contents.
trait AggregateFunction[IN, ACC, OUT] {
// Fresh accumulator for a new window.
def createAccumulator(): ACC
// Fold one element into the accumulator, returning the updated accumulator.
def add(value: IN, accumulator: ACC): ACC
// Convert the final accumulator into the window result.
def getResult(accumulator: ACC): OUT
// Combine two accumulators (needed e.g. when session windows merge).
def merge(a: ACC, b: ACC): ACC
}
// Window function interfaces
// Receives all elements of a window at once, with the key and window metadata.
trait WindowFunction[IN, OUT, KEY, W <: Window] {
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}
// Like WindowFunction, but the Context additionally exposes time, state and
// side outputs.
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
// Called once per window firing with all buffered elements.
def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
// Called when the window is purged; release per-window state here. No-op by default.
def clear(context: Context): Unit = {}
abstract class Context {
def window: W
def currentProcessingTime: Long
def currentWatermark: Long
// State scoped to this window instance.
def windowState: KeyedStateStore
// State scoped to the key, shared across all windows of that key.
def globalState: KeyedStateStore
// Emit a value to the given side output.
def output[X](outputTag: OutputTag[X], value: X): Unit
}
}
// Trigger interface for custom window firing
// Decides, per element and per timer, whether a window's result should be
// emitted (FIRE), its contents dropped (PURGE), both, or nothing (CONTINUE).
abstract class Trigger[T, W <: Window] {
// Called for every element added to the window.
def onElement(element: T, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult
// Called when a registered processing-time timer fires.
def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult
// Called when a registered event-time timer fires.
def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult
// Release any state the trigger kept for this window.
def clear(window: W, ctx: TriggerContext): Unit
}
// Evictor interface for removing elements
// Optionally removes elements from the window buffer around the window
// function invocation; note the java.lang.Iterable (Java-interop API).
trait Evictor[T, W <: Window] {
// Invoked before the window function runs; remove elements via the iterator.
def evictBefore(elements: java.lang.Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
// Invoked after the window function runs.
def evictAfter(elements: java.lang.Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
}
// Trigger results
// CONTINUE: do nothing; FIRE: emit the window result (contents retained);
// PURGE: clear the window contents without emitting; FIRE_AND_PURGE: emit
// the result, then clear the contents.
sealed trait TriggerResult
object TriggerResult {
case object CONTINUE extends TriggerResult
case object FIRE extends TriggerResult
case object PURGE extends TriggerResult
case object FIRE_AND_PURGE extends TriggerResult
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12