CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-scala-2-12

Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.

Pending
Overview
Eval results
Files

window-functions.mddocs/

Window Functions

Specialized functions for processing windowed data with access to window metadata, state, and complete window contents. Essential for complex windowed computations and analytics.

Capabilities

WindowFunction

Basic window function interface for processing all elements in a window.

/**
 * Basic window function, applied once per key and window when the window fires,
 * with the complete window contents as an iterable.
 *
 * @tparam IN  type of the input elements
 * @tparam OUT type of the emitted results
 * @tparam KEY type of the key the stream is partitioned by
 * @tparam W   type of window this function is applied to
 */
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  /**
   * Process all elements in a window
   * @param key The key of the window
   * @param window The window metadata
   * @param input All elements in the window
   * @param out Collector for emitting results
   */
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

ProcessWindowFunction

Advanced window function with access to context and state.

/**
 * Window function invoked once per key and window with the full element
 * iterable plus a Context exposing window metadata, current time/watermark,
 * per-window and global keyed state, and side-output emission.
 *
 * @tparam IN  type of the input elements
 * @tparam OUT type of the emitted results
 * @tparam KEY type of the key the stream is partitioned by
 * @tparam W   type of window this function is applied to
 */
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  /**
   * Process all elements in a window with access to context
   * @param key The key of the window
   * @param context Context providing window info and state access
   * @param elements All elements in the window
   * @param out Collector for emitting results
   */
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  
  /**
   * Clear any per-window state (optional override)
   * @param context Context for accessing state
   */
  def clear(context: Context): Unit = {}
  
  abstract class Context {
    /** Metadata of the window being evaluated. */
    def window: W
    /** Current processing (wall-clock) time in milliseconds. */
    def currentProcessingTime: Long
    /** Current event-time watermark in milliseconds. */
    def currentWatermark: Long  
    /** State scoped to the current key AND window; clean up in clear(). */
    def windowState: KeyedStateStore
    /** State scoped to the current key only, shared across all its windows. */
    def globalState: KeyedStateStore
    /** Emit a record to the side output identified by the given tag. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

Usage Examples:

// ValueStateDescriptor import added: the ProcessWindowFunction example below
// constructs one, but it was previously missing so the snippet did not compile.
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{WindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Input record: one temperature sample from a sensor; timestamp is event time in ms.
case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)
// Output record: per-key, per-window temperature summary.
case class WindowStats(sensorId: String, windowStart: Long, windowEnd: Long, 
                      count: Int, avgTemp: Double, minTemp: Double, maxTemp: Double)

// WindowFunction example - calculate window statistics
/** Summarizes one key's window: element count plus average/min/max temperature. */
class SensorStatsWindowFunction extends WindowFunction[SensorReading, WindowStats, String, TimeWindow] {

  override def apply(
    key: String,
    window: TimeWindow,
    input: Iterable[SensorReading],
    out: Collector[WindowStats]
  ): Unit = {
    // Materialize temperatures once; windows only fire with >= 1 element.
    val temps = input.map(_.temperature).toVector
    val n = temps.size
    val summary = WindowStats(
      key,
      window.getStart,
      window.getEnd,
      n,
      temps.sum / n,
      temps.min,
      temps.max
    )
    out.collect(summary)
  }
}

// ProcessWindowFunction example - with state and side outputs
/**
 * ProcessWindowFunction demonstrating side outputs and global keyed state:
 * emits per-window temperature statistics, raises a side-output alert when the
 * window's max temperature exceeds 80 degrees, and counts processed windows in
 * global (cross-window) state.
 */
class AdvancedSensorStatsFunction extends ProcessWindowFunction[SensorReading, WindowStats, String, TimeWindow] {

  // Fix: create the side-output tag and the state descriptor once per function
  // instance instead of allocating fresh ones on every window firing — the
  // pattern recommended by the Flink documentation.
  private val highTempAlerts = OutputTag[String]("high-temp-alerts")
  private val windowCountDescriptor =
    new ValueStateDescriptor[Long]("window-count", classOf[Long])

  override def process(
    key: String,
    context: Context,
    elements: Iterable[SensorReading],
    out: Collector[WindowStats]
  ): Unit = {
    val readings = elements.toList
    val count = readings.size
    val temperatures = readings.map(_.temperature)
    val avgTemp = temperatures.sum / count
    val minTemp = temperatures.min
    val maxTemp = temperatures.max

    // Emit main result
    out.collect(WindowStats(
      key, context.window.getStart, context.window.getEnd,
      count, avgTemp, minTemp, maxTemp
    ))

    // Emit to side output if temperature is too high
    if (maxTemp > 80.0) {
      context.output(
        highTempAlerts,
        s"Sensor $key exceeded 80°C in window ${context.window.getStart}-${context.window.getEnd}"
      )
    }

    // Update global state (count of processed windows).
    // value() is null before the first update, hence the Option wrap.
    val globalCounter = context.globalState.getState(windowCountDescriptor)
    val currentCount = Option(globalCounter.value()).getOrElse(0L)
    globalCounter.update(currentCount + 1)
  }
}

// Apply window functions
// Bounded demo source of three readings; assigns event-time timestamps and
// ascending watermarks from the `timestamp` field.
// NOTE(review): assumes `env` is a StreamExecutionEnvironment created earlier — confirm.
val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor1", 25.0, 2000L),
  SensorReading("sensor1", 30.0, 3000L)
).assignAscendingTimestamps(_.timestamp)

// Partition the stream by sensor id so windows are evaluated per sensor.
val keyedReadings = readings.keyBy(_.sensorId)

// Using WindowFunction
// 10-second event-time tumbling windows summarized by SensorStatsWindowFunction.
val windowStats = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .apply(new SensorStatsWindowFunction)

// Using ProcessWindowFunction
val advancedStats = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .process(new AdvancedSensorStatsFunction)

// Get side output
// The tag's id and type must match the OutputTag emitted to inside the function.
val highTempAlerts = advancedStats.getSideOutput(OutputTag[String]("high-temp-alerts"))

AllWindowFunction

Window function for non-keyed streams (all-window operations).

/**
 * Basic window function for non-keyed (windowAll) streams; applied once per
 * window with the complete window contents.
 *
 * @tparam IN  type of the input elements
 * @tparam OUT type of the emitted results
 * @tparam W   type of window this function is applied to
 */
trait AllWindowFunction[IN, OUT, W <: Window] {
  /**
   * Process all elements in a window (non-keyed)
   * @param window The window metadata
   * @param input All elements in the window
   * @param out Collector for emitting results
   */
  def apply(window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

ProcessAllWindowFunction

Advanced all-window function with context access.

/**
 * All-window variant of ProcessWindowFunction for non-keyed streams: invoked
 * once per window with every element plus a Context for window metadata,
 * time/watermark, state, and side-output emission.
 *
 * @tparam IN  type of the input elements
 * @tparam OUT type of the emitted results
 * @tparam W   type of window this function is applied to
 */
abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] {
  /**
   * Process all elements in a window with context (non-keyed)
   * @param context Context providing window info and state access
   * @param elements All elements in the window
   * @param out Collector for emitting results
   */
  def process(context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  
  /**
   * Clear any per-window state (optional override)
   * @param context Context for accessing state
   */
  def clear(context: Context): Unit = {}
  
  abstract class Context {
    /** Metadata of the window being evaluated. */
    def window: W
    /** Current processing (wall-clock) time in milliseconds. */
    def currentProcessingTime: Long
    /** Current event-time watermark in milliseconds. */
    def currentWatermark: Long
    /** State scoped to the current window; clean up in clear(). */
    def windowState: KeyedStateStore
    /** State shared across all windows of this operator. */
    def globalState: KeyedStateStore
    /** Emit a record to the side output identified by the given tag. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

Usage Examples:

import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction}

// Output record for all-window (non-keyed) aggregations across every sensor.
case class GlobalStats(windowStart: Long, windowEnd: Long, totalCount: Int, avgValue: Double)

// AllWindowFunction example - global statistics
/** Aggregates every reading in a non-keyed window into one GlobalStats record. */
class GlobalStatsFunction extends AllWindowFunction[SensorReading, GlobalStats, TimeWindow] {

  override def apply(
    window: TimeWindow,
    input: Iterable[SensorReading],
    out: Collector[GlobalStats]
  ): Unit = {
    // Windows only fire with at least one element, so the division is safe.
    val temps = input.map(_.temperature).toSeq
    val total = temps.size
    out.collect(GlobalStats(window.getStart, window.getEnd, total, temps.sum / total))
  }
}

// ProcessAllWindowFunction example - with side outputs
/**
 * ProcessAllWindowFunction emitting one GlobalStats record per window and
 * reporting an anomaly on a side output when the window-wide average
 * temperature exceeds 50 degrees.
 */
class AdvancedGlobalStatsFunction extends ProcessAllWindowFunction[SensorReading, GlobalStats, TimeWindow] {

  // Fix: reusable side-output tag — one instance per function rather than a
  // fresh allocation on every window firing (Flink-recommended practice).
  private val globalAnomalies = OutputTag[String]("global-anomalies")

  override def process(
    context: Context,
    elements: Iterable[SensorReading],
    out: Collector[GlobalStats]
  ): Unit = {
    val readings = elements.toList
    val count = readings.size
    val avgTemp = readings.map(_.temperature).sum / count

    out.collect(GlobalStats(context.window.getStart, context.window.getEnd, count, avgTemp))

    // Side output for anomalies
    if (avgTemp > 50.0) {
      context.output(
        globalAnomalies,
        s"Global average temperature ${avgTemp}°C is above threshold"
      )
    }
  }
}

// Apply all-window functions
// Non-keyed 1-minute tumbling windows over the whole stream
// (all-window operators run with parallelism 1).
val globalStats = readings
  .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
  .apply(new GlobalStatsFunction)

val advancedGlobalStats = readings
  .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
  .process(new AdvancedGlobalStatsFunction)

Rich Window Functions

Rich versions of window functions with lifecycle methods and runtime context access.

/** WindowFunction with RichFunction lifecycle (open/close) and runtime-context access. */
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window] 
  extends WindowFunction[IN, OUT, KEY, W] with RichFunction {
  
  override def open(parameters: Configuration): Unit = {}
  override def close(): Unit = {}
  def getRuntimeContext: RuntimeContext
  def setRuntimeContext(t: RuntimeContext): Unit
}

/** ProcessWindowFunction with RichFunction lifecycle (open/close) and runtime-context access. */
abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window] 
  extends ProcessWindowFunction[IN, OUT, KEY, W] with RichFunction {
  
  override def open(parameters: Configuration): Unit = {}
  override def close(): Unit = {}
  def getRuntimeContext: RuntimeContext
  def setRuntimeContext(t: RuntimeContext): Unit
}

/** AllWindowFunction with RichFunction lifecycle (open/close) and runtime-context access. */
abstract class RichAllWindowFunction[IN, OUT, W <: Window] 
  extends AllWindowFunction[IN, OUT, W] with RichFunction {
  
  override def open(parameters: Configuration): Unit = {}
  override def close(): Unit = {}
  def getRuntimeContext: RuntimeContext
  def setRuntimeContext(t: RuntimeContext): Unit
}

/** ProcessAllWindowFunction with RichFunction lifecycle (open/close) and runtime-context access. */
abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window] 
  extends ProcessAllWindowFunction[IN, OUT, W] with RichFunction {
  
  override def open(parameters: Configuration): Unit = {}
  override def close(): Unit = {}
  def getRuntimeContext: RuntimeContext
  def setRuntimeContext(t: RuntimeContext): Unit
}

Usage Examples:

import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter // fix: Counter was used below but never imported

/**
 * Rich window function demonstrating metric registration in open():
 * counts fired windows while emitting per-window temperature statistics.
 */
class MetricsWindowFunction extends RichWindowFunction[SensorReading, WindowStats, String, TimeWindow] {
  // Registered once per parallel subtask in open(); not checkpointed state.
  private var processedWindows: Counter = _
  
  override def open(parameters: Configuration): Unit = {
    processedWindows = getRuntimeContext
      .getMetricGroup
      .counter("processed-windows")
  }
  
  override def apply(
    key: String,
    window: TimeWindow,
    input: Iterable[SensorReading],
    out: Collector[WindowStats]
  ): Unit = {
    // Increment metric
    processedWindows.inc()
    
    // Process window as usual
    val readings = input.toList
    val count = readings.size
    val temperatures = readings.map(_.temperature)
    val avgTemp = temperatures.sum / count
    
    // Fix: report the real min/max temperatures instead of the hard-coded 0.0
    // placeholders, keeping every WindowStats field meaningful.
    out.collect(WindowStats(
      key, window.getStart, window.getEnd,
      count, avgTemp, temperatures.min, temperatures.max
    ))
  }
}

Types

// Window types
/** Root of the window hierarchy; a window knows the largest timestamp it covers. */
abstract class Window {
  def maxTimestamp(): Long
}

/** Time-interval window over [start, end); its maximum timestamp is end - 1. */
class TimeWindow(start: Long, end: Long) extends Window {
  /** Inclusive start of the interval, in milliseconds. */
  def getStart: Long = start
  /** Exclusive end of the interval, in milliseconds. */
  def getEnd: Long = end
  override def maxTimestamp(): Long = end - 1
}

/** Single window covering the entire stream; it never ends on its own. */
class GlobalWindow extends Window {
  override def maxTimestamp(): Long = Long.MaxValue
}

// State store for window functions
trait KeyedStateStore {
  // Fix: getState previously took StateDescriptor[T, _] and returned the bare
  // `State` supertype, which exposes neither value() nor update() — yet the
  // usage examples call both. Align the signature with the sibling methods
  // below and with the actual Flink KeyedStateStore API.
  def getState[T](stateDescriptor: ValueStateDescriptor[T]): ValueState[T]
  def getListState[T](stateDescriptor: ListStateDescriptor[T]): ListState[T]
  def getReducingState[T](stateDescriptor: ReducingStateDescriptor[T]): ReducingState[T]
  def getAggregatingState[IN, ACC, OUT](stateDescriptor: AggregatingStateDescriptor[IN, ACC, OUT]): AggregatingState[IN, OUT]
  def getMapState[UK, UV](stateDescriptor: MapStateDescriptor[UK, UV]): MapState[UK, UV]
}

// State descriptors — name identifies the state; the Class argument carries type info.
class ValueStateDescriptor[T](name: String, typeClass: Class[T])
class ListStateDescriptor[T](name: String, typeClass: Class[T])
class ReducingStateDescriptor[T](name: String, reduceFunction: ReduceFunction[T], typeClass: Class[T])
// Added: referenced by KeyedStateStore.getAggregatingState but previously undeclared.
class AggregatingStateDescriptor[IN, ACC, OUT](name: String, aggFunction: AggregateFunction[IN, ACC, OUT], stateType: Class[ACC])
class MapStateDescriptor[K, V](name: String, keyClass: Class[K], valueClass: Class[V])

// Collector interface
/** Receives records emitted by a user function. */
trait Collector[T] {
  /** Emit one record downstream. */
  def collect(record: T): Unit
  /** Close the collector; no records may be emitted afterwards. */
  def close(): Unit
}

// Output tag for side outputs. Tags are matched by id and element type when
// retrieving a side-output stream via getSideOutput.
// Fix: getTypeInfo was declared without a body inside a concrete case class,
// which does not compile; materialize the TypeInformation context bound.
case class OutputTag[T: TypeInformation](id: String) {
  def getTypeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
}

// Rich function base
/** Mixin giving user functions lifecycle hooks and access to the runtime context. */
trait RichFunction {
  /** Called once before the first element is processed; initialize resources/metrics here. */
  def open(parameters: Configuration): Unit
  /** Called once after the last element; release resources here. */
  def close(): Unit
  def getRuntimeContext: RuntimeContext
  def setRuntimeContext(t: RuntimeContext): Unit
}

// Runtime context
/** Per-subtask runtime information available to rich functions. */
trait RuntimeContext {
  def getTaskName: String
  /** Entry point for registering user-defined metrics. */
  def getMetricGroup: MetricGroup
  def getNumberOfParallelSubtasks: Int
  /** 0-based index of this parallel subtask. */
  def getIndexOfThisSubtask: Int
}

// Metrics
/** Monotonically increasing count metric. */
trait Counter {
  /** Increment the count by one. */
  def inc(): Unit
  /** Increment the count by n. */
  def inc(n: Long): Unit
  /** Current count value. */
  def getCount: Long
}

/** Registry for user-defined metrics, obtained via RuntimeContext.getMetricGroup. */
trait MetricGroup {
  def counter(name: String): Counter
  // NOTE(review): Gauge and Histogram are referenced here but not declared
  // anywhere on this page — confirm against flink-metrics-core.
  def gauge[T](name: String, gauge: Gauge[T]): Gauge[T]
  def histogram(name: String, histogram: Histogram): Histogram
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12

docs

async-io.md

data-streams.md

execution-environment.md

index.md

keyed-streams.md

processing-functions.md

sinks-output.md

stream-connections.md

window-functions.md

windowing.md

tile.json