CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-scala-2-12

Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.

Pending
Overview
Eval results
Files

docs/windowing.md

Windowing Operations

WindowedStream enables bounded computations on infinite streams by grouping elements into finite windows based on time or count. This is essential for aggregations and batch-style processing on streaming data.

Capabilities

Window Configuration

Configure window behavior for late data handling and triggering.

class WindowedStream[T, K, W <: Window] {
  /**
   * Set allowed lateness for late arriving elements
   * @param lateness Maximum allowed lateness
   * @return WindowedStream with lateness configuration
   */
  def allowedLateness(lateness: Time): WindowedStream[T, K, W]
  
  /**
   * Send late data to a side output
   * @param outputTag Tag for late data side output
   * @return WindowedStream with late data handling
   */
  def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W]
  
  /**
   * Set a custom trigger for window firing
   * @param trigger Custom trigger implementation
   * @return WindowedStream with custom trigger
   */
  def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W]
  
  /**
   * Set an evictor for removing elements from windows
   * @param evictor Custom evictor implementation
   * @return WindowedStream with custom evictor
   */
  def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W]
}

Reduction Operations

Apply reduction functions to combine elements within windows.

class WindowedStream[T, K, W <: Window] {
  /**
   * Reduce elements in each window using a ReduceFunction
   * @param function Reduce function to combine elements
   * @return DataStream with reduced window results
   */
  def reduce(function: ReduceFunction[T]): DataStream[T]
  
  /**
   * Reduce elements in each window using a function
   * @param function Function to combine two elements
   * @return DataStream with reduced window results
   */
  def reduce(function: (T, T) => T): DataStream[T]
  
  /**
   * Reduce with pre-aggregation and window function
   * @param preAggregator ReduceFunction for pre-aggregation
   * @param function WindowFunction for final processing
   * @return DataStream with window function results
   */
  def reduce[R: TypeInformation](
    preAggregator: ReduceFunction[T], 
    function: WindowFunction[T, R, K, W]
  ): DataStream[R]
  
  /**
   * Reduce with pre-aggregation and process window function
   * @param preAggregator ReduceFunction for pre-aggregation
   * @param function ProcessWindowFunction for final processing
   * @return DataStream with process window function results
   */
  def reduce[R: TypeInformation](
    preAggregator: ReduceFunction[T], 
    function: ProcessWindowFunction[T, R, K, W]
  ): DataStream[R]
}

Usage Examples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor1", 25.0, 2000L),
  SensorReading("sensor1", 22.0, 3000L)
)

val keyedReadings = readings
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.sensorId)

// Simple reduction - get maximum temperature per window
val maxTemps = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .reduce((r1, r2) => if (r1.temperature > r2.temperature) r1 else r2)

// Reduction with window function - add window info
val maxTempsWithWindow = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .reduce(
    (r1: SensorReading, r2: SensorReading) => if (r1.temperature > r2.temperature) r1 else r2,
    (key: String, window: TimeWindow, readings: Iterable[SensorReading], out: Collector[(String, Double, Long, Long)]) => {
      val maxReading = readings.head
      out.collect((key, maxReading.temperature, window.getStart, window.getEnd))
    }
  )

Aggregation Operations

Apply aggregate functions for more complex computations within windows.

class WindowedStream[T, K, W <: Window] {
  /**
   * Apply an AggregateFunction to window elements
   * @param aggregateFunction Function for incremental aggregation
   * @return DataStream with aggregation results
   */
  def aggregate[ACC: TypeInformation, R: TypeInformation](
    aggregateFunction: AggregateFunction[T, ACC, R]
  ): DataStream[R]
  
  /**
   * Apply aggregation with window function
   * @param aggregateFunction Function for incremental aggregation
   * @param windowFunction Function for final window processing
   * @return DataStream with window function results
   */
  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
    aggregateFunction: AggregateFunction[T, ACC, V], 
    windowFunction: WindowFunction[V, R, K, W]
  ): DataStream[R]
  
  /**
   * Apply aggregation with process window function
   * @param aggregateFunction Function for incremental aggregation
   * @param windowFunction ProcessWindowFunction for final processing
   * @return DataStream with process window function results
   */
  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
    aggregateFunction: AggregateFunction[T, ACC, V], 
    windowFunction: ProcessWindowFunction[V, R, K, W]
  ): DataStream[R]
}

Usage Examples:

import org.apache.flink.api.common.functions.AggregateFunction

// Define a custom aggregate function for average temperature
class AverageAggregateFunction extends AggregateFunction[SensorReading, (Double, Int), Double] {
  override def createAccumulator(): (Double, Int) = (0.0, 0)
  
  override def add(value: SensorReading, accumulator: (Double, Int)): (Double, Int) = 
    (accumulator._1 + value.temperature, accumulator._2 + 1)
  
  override def getResult(accumulator: (Double, Int)): Double = 
    accumulator._1 / accumulator._2
  
  override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = 
    (a._1 + b._1, a._2 + b._2)
}

// Apply aggregate function
val avgTemps = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .aggregate(new AverageAggregateFunction)

Window Functions

Apply functions that operate on complete window contents.

class WindowedStream[T, K, W <: Window] {
  /**
   * Apply a WindowFunction to all elements in each window
   * @param function WindowFunction implementation
   * @return DataStream with window function results
   */
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
  
  /**
   * Apply a function using closure syntax
   * @param function Function with (key, window, elements, collector) parameters
   * @return DataStream with function results
   */
  def apply[R: TypeInformation](
    function: (K, W, Iterable[T], Collector[R]) => Unit
  ): DataStream[R]
  
  /**
   * Apply a ProcessWindowFunction for advanced processing
   * @param function ProcessWindowFunction implementation
   * @return DataStream with process function results
   */
  def process[R: TypeInformation](
    function: ProcessWindowFunction[T, R, K, W]
  ): DataStream[R]
}

Usage Examples:

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Window function to collect all readings with window metadata
val allReadingsWithWindow = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .apply(new WindowFunction[SensorReading, (String, List[Double], Long, Long), String, TimeWindow] {
    override def apply(
      key: String, 
      window: TimeWindow, 
      input: Iterable[SensorReading], 
      out: Collector[(String, List[Double], Long, Long)]
    ): Unit = {
      val temperatures = input.map(_.temperature).toList
      out.collect((key, temperatures, window.getStart, window.getEnd))
    }
  })

// Using closure syntax
val readingCounts = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .apply { (key: String, window: TimeWindow, readings: Iterable[SensorReading], out: Collector[(String, Int)]) =>
    out.collect((key, readings.size))
  }

Window Assigners

Window assigners determine how elements are grouped into windows.

// Tumbling time windows
object TumblingEventTimeWindows {
  def of(size: Time): TumblingEventTimeWindows
  def of(size: Time, offset: Time): TumblingEventTimeWindows
}

object TumblingProcessingTimeWindows {
  def of(size: Time): TumblingProcessingTimeWindows
  def of(size: Time, offset: Time): TumblingProcessingTimeWindows
}

// Sliding time windows
object SlidingEventTimeWindows {
  def of(size: Time, slide: Time): SlidingEventTimeWindows
  def of(size: Time, slide: Time, offset: Time): SlidingEventTimeWindows
}

object SlidingProcessingTimeWindows {
  def of(size: Time, slide: Time): SlidingProcessingTimeWindows
  def of(size: Time, slide: Time, offset: Time): SlidingProcessingTimeWindows
}

// Session windows
object EventTimeSessionWindows {
  def withGap(sessionTimeout: Time): EventTimeSessionWindows
  def withDynamicGap(sessionWindowTimeGapExtractor: SessionWindowTimeGapExtractor[Any]): EventTimeSessionWindows
}

object ProcessingTimeSessionWindows {
  def withGap(sessionTimeout: Time): ProcessingTimeSessionWindows
  def withDynamicGap(sessionWindowTimeGapExtractor: SessionWindowTimeGapExtractor[Any]): ProcessingTimeSessionWindows
}

// Global windows (for count-based operations)
object GlobalWindows {
  def create(): GlobalWindows
}

Usage Examples:

import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time

// Tumbling windows - non-overlapping fixed-size windows
val tumblingWindow = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))

// Sliding windows - overlapping windows  
val slidingWindow = keyedReadings
  .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(2)))

// Session windows - dynamic windows based on inactivity gaps
val sessionWindow = keyedReadings
  .window(EventTimeSessionWindows.withGap(Time.minutes(30)))

Types

// Window types
abstract class Window {
  def maxTimestamp(): Long
}

class TimeWindow(start: Long, end: Long) extends Window {
  def getStart: Long
  def getEnd: Long
  def maxTimestamp(): Long = end - 1
}

class GlobalWindow extends Window {
  def maxTimestamp(): Long = Long.MaxValue
}

// Aggregate function interface
trait AggregateFunction[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, accumulator: ACC): ACC
  def getResult(accumulator: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Window function interfaces
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  def clear(context: Context): Unit = {}
  
  abstract class Context {
    def window: W
    def currentProcessingTime: Long
    def currentWatermark: Long
    def windowState: KeyedStateStore
    def globalState: KeyedStateStore
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

// Trigger interface for custom window firing
abstract class Trigger[T, W <: Window] {
  def onElement(element: T, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult
  def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult
  def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult
  def clear(window: W, ctx: TriggerContext): Unit
}

// Evictor interface for removing elements
trait Evictor[T, W <: Window] {
  def evictBefore(elements: java.lang.Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
  def evictAfter(elements: java.lang.Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
}

// Trigger results
sealed trait TriggerResult
object TriggerResult {
  case object CONTINUE extends TriggerResult
  case object FIRE extends TriggerResult
  case object PURGE extends TriggerResult
  case object FIRE_AND_PURGE extends TriggerResult
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12

docs

async-io.md

data-streams.md

execution-environment.md

index.md

keyed-streams.md

processing-functions.md

sinks-output.md

stream-connections.md

window-functions.md

windowing.md

tile.json