or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

async-operations.md, data-stream.md, functions.md, index.md, joining.md, keyed-stream.md, stream-execution-environment.md, windowing.md
tile.json

docs/functions.md

Functions

User-defined functions (UDFs) allow you to implement custom processing logic in Flink. The Scala API provides various function interfaces for different use cases, from simple transformations to complex stateful processing.

Capabilities

Process Functions

Core process functions for custom stream processing logic.

/**
 * ProcessFunction for single stream processing.
 *
 * `processElement` is called for each input element and may emit zero or more
 * results through the supplied `Collector`. `onTimer` is called for timers
 * registered through `Context.timerService()`; its default body does nothing.
 *
 * NOTE(review): the method signatures refer to `ProcessFunction.Context`, but
 * `Context` is declared below as an inner trait and no companion object is
 * shown. Treat this as an API sketch rather than compilable Scala. Also,
 * Flink's own `ProcessFunction.Context` does not appear to expose `element()`;
 * confirm against the targeted Flink version.
 *
 * @tparam I type of the input elements
 * @tparam O type of the emitted elements
 */
trait ProcessFunction[I, O] {
  /** Handles one input element; results are emitted through `out`. */
  def processElement(value: I, ctx: ProcessFunction.Context, out: Collector[O]): Unit
  
  /** Called when a timer fires at `timestamp`; a no-op unless overridden. */
  def onTimer(timestamp: Long, ctx: ProcessFunction.OnTimerContext, out: Collector[O]): Unit = {}
  
  /** Per-element context: element timestamp, timer access and tagged (side) outputs. */
  trait Context {
    def element(): I
    def timestamp(): Long
    def timerService(): TimerService
    // Emits `value` to the output identified by `outputTag` instead of `out`.
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
  
  /** Context passed to `onTimer`; additionally reports which clock the timer used. */
  trait OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}

/**
 * KeyedProcessFunction for keyed stream processing.
 *
 * Same shape as `ProcessFunction`, but the contexts also expose the key of the
 * element (or timer) currently being processed via `getCurrentKey`.
 *
 * NOTE(review): as with `ProcessFunction`, `KeyedProcessFunction.Context` is
 * referenced without a companion object, so this is an API sketch. The usage
 * examples on this page override `open` and call `getRuntimeContext` on
 * subclasses of this trait, which this sketch does not declare. Confirm the
 * real class hierarchy in the targeted Flink version.
 *
 * @tparam K type of the key
 * @tparam I type of the input elements
 * @tparam O type of the emitted elements
 */
trait KeyedProcessFunction[K, I, O] {
  /** Handles one input element of the current key; results go to `out`. */
  def processElement(value: I, ctx: KeyedProcessFunction.Context, out: Collector[O]): Unit
  
  /** Called when a timer registered for the current key fires; no-op by default. */
  def onTimer(timestamp: Long, ctx: KeyedProcessFunction.OnTimerContext, out: Collector[O]): Unit = {}
  
  /** Per-element context, including the key of the element being processed. */
  trait Context {
    def getCurrentKey: K
    def element(): I
    def timestamp(): Long
    def timerService(): TimerService
    // Emits `value` to the output identified by `outputTag` instead of `out`.
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
  
  /** Context passed to `onTimer`; additionally reports which clock the timer used. */
  trait OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}

Usage Examples:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.TimeDomain

/**
 * One temperature measurement reported by a sensor.
 *
 * @param sensorId    identifier of the reporting sensor (used as the key in the examples)
 * @param temperature measured temperature; the examples print it as °C
 * @param timestamp   time of the reading in milliseconds (examples add 10000 for "10 seconds")
 */
case class SensorReading(
  sensorId: String,
  temperature: Double,
  timestamp: Long
)

// Simple process function
/**
 * Emits an alert line for every reading hotter than 30.0 and schedules an
 * event-time timer for each reading. When a timer fires, it emits a line
 * naming the timer's timestamp.
 *
 * The timer is placed 10000 ms after the reading's own event timestamp, not
 * 10 s after the current wall-clock time.
 *
 * NOTE(review): Flink documents timers as supported only on keyed streams, so
 * this function presumably needs to be applied after `keyBy`. Confirm before
 * relying on `onTimer`.
 */
class TemperatureAlertFunction extends ProcessFunction[SensorReading, String] {

  override def processElement(
    reading: SensorReading,
    ctx: ProcessFunction.Context,
    out: Collector[String]
  ): Unit = {
    val isTooHot = reading.temperature > 30.0
    if (isTooHot) {
      val alert = s"High temperature alert: ${reading.temperature}°C from ${reading.sensorId}"
      out.collect(alert)
    }

    // Event-time deadline: 10000 ms after the reading's own timestamp.
    val fireAt = reading.timestamp + 10000
    ctx.timerService().registerEventTimeTimer(fireAt)
  }

  override def onTimer(
    timestamp: Long,
    ctx: ProcessFunction.OnTimerContext,
    out: Collector[String]
  ): Unit =
    out.collect(s"Timer fired at $timestamp")
}

// Keyed process function with state
/**
 * Flags sudden temperature jumps per sensor key.
 *
 * Keeps the previous reading's temperature in keyed state. It emits a message
 * when a new reading differs from that value by more than 5.0, then stores the
 * new temperature.
 *
 * The state is typed as boxed `java.lang.Double` so that a missing value stays
 * `null`, and `Option(...)` can distinguish "no previous reading" from a real
 * 0.0. With a Scala `Double` state, a missing value was silently unboxed to 0.0.
 * The first reading of every sensor was then compared against 0.0 and reported
 * as a spike whenever its temperature exceeded 5.0.
 */
class TemperatureMonitorFunction extends KeyedProcessFunction[String, SensorReading, String] {

  private var lastTemperature: ValueState[java.lang.Double] = _

  /** Obtains the keyed "lastTemp" state handle. */
  override def open(parameters: Configuration): Unit = {
    val descriptor = new ValueStateDescriptor[java.lang.Double]("lastTemp", classOf[java.lang.Double])
    lastTemperature = getRuntimeContext.getState(descriptor)
  }

  /**
   * Compares `reading` with the previous temperature of the current key, if
   * there is one. Emits a spike message when they differ by more than 5.0,
   * then remembers the new temperature.
   */
  override def processElement(
    reading: SensorReading,
    ctx: KeyedProcessFunction.Context,
    out: Collector[String]
  ): Unit = {
    val currentTemp = reading.temperature

    // null => first reading for this key, so there is nothing to compare against.
    Option(lastTemperature.value()).map(_.doubleValue).foreach { previousTemp =>
      if (math.abs(currentTemp - previousTemp) > 5.0) {
        out.collect(s"Temperature spike detected for ${ctx.getCurrentKey}: $previousTemp -> $currentTemp")
      }
    }

    lastTemperature.update(currentTemp)
  }
}

Window Functions

Functions for processing windowed data.

/**
 * WindowFunction for processing window contents.
 *
 * Receives the key, the window and all elements buffered for that window, and
 * emits results through `out`.
 *
 * @tparam IN  type of the buffered input elements
 * @tparam OUT type of the emitted results
 * @tparam KEY type of the key the stream is partitioned by
 * @tparam W   concrete window type, e.g. `TimeWindow`
 */
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

/**
 * ProcessWindowFunction with rich context.
 *
 * Like `WindowFunction`, but instead of only the window it receives a
 * `Context`. The context exposes window metadata, the current clocks, state
 * stores and tagged outputs.
 *
 * @tparam IN  type of the buffered input elements
 * @tparam OUT type of the emitted results
 * @tparam KEY type of the key the stream is partitioned by
 * @tparam W   concrete window type
 */
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  
  /** Evaluation context for one window of one key. */
  trait Context {
    def window: W
    def currentProcessingTime: Long
    def currentWatermark: Long
    // Per Flink's documentation: keyed state scoped to this window only.
    def windowState: KeyedStateStore
    // Per Flink's documentation: keyed state shared by all windows of the key.
    def globalState: KeyedStateStore
    // Emits `value` to the output identified by `outputTag` instead of `out`.
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

/**
 * AllWindowFunction for non-keyed windows.
 *
 * Same as `WindowFunction` but without a key parameter: it receives the window
 * and its buffered values, and emits results through `out`.
 *
 * @tparam IN  type of the buffered input elements
 * @tparam OUT type of the emitted results
 * @tparam W   concrete window type
 */
trait AllWindowFunction[IN, OUT, W <: Window] {
  def apply(window: W, values: Iterable[IN], out: Collector[OUT]): Unit
}

/**
 * ProcessAllWindowFunction for non-keyed windows with rich context.
 *
 * Key-less counterpart of `ProcessWindowFunction`; its `Context` declares the
 * same members.
 *
 * @tparam IN  type of the buffered input elements
 * @tparam OUT type of the emitted results
 * @tparam W   concrete window type
 */
trait ProcessAllWindowFunction[IN, OUT, W <: Window] {
  def process(context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  
  /** Evaluation context for one window. */
  trait Context {
    def window: W
    def currentProcessingTime: Long
    def currentWatermark: Long
    // NOTE(review): semantics of keyed state stores on a non-keyed window are
    // not shown here; confirm against the Flink documentation.
    def windowState: KeyedStateStore
    def globalState: KeyedStateStore
    // Emits `value` to the output identified by `outputTag` instead of `out`.
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

Usage Examples:

import org.apache.flink.streaming.api.scala.function.{WindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Simple window function
/**
 * Summarises each window of one sensor's readings as a single line. The line
 * contains the window bounds, the element count, and the average, minimum and
 * maximum temperature.
 *
 * Temperatures are extracted once instead of re-mapping the input for every
 * statistic. An empty window emits nothing: `min`/`max` would throw on it and
 * the average would be NaN.
 */
class TemperatureStatsFunction extends WindowFunction[SensorReading, String, String, TimeWindow] {

  override def apply(
    sensorId: String,
    window: TimeWindow,
    input: Iterable[SensorReading],
    out: Collector[String]
  ): Unit = {
    val temps = input.iterator.map(_.temperature).toVector

    if (temps.nonEmpty) {
      val count = temps.length
      val avgTemp = temps.sum / count
      val minTemp = temps.min
      val maxTemp = temps.max

      out.collect(s"Sensor $sensorId: Window [${window.getStart}-${window.getEnd}], " +
                  s"Count: $count, Avg: $avgTemp, Min: $minTemp, Max: $maxTemp")
    }
  }
}

// Process window function with state
/**
 * Emits per-window temperature statistics for one sensor. Each line is
 * numbered with a running count of the windows evaluated so far for that
 * sensor.
 *
 * Each line carries the window bounds, count/avg/min/max of the readings, and
 * the watermark at evaluation time. An empty window emits nothing and does not
 * advance the counter.
 *
 * The running count is kept in `context.globalState`, which Flink shares across
 * all windows of the current key. The count was previously kept in
 * `context.windowState`, which is scoped to the single window being evaluated.
 * That made the number restart for every new window, contradicting the
 * cross-window intent, and the per-window state was never cleared.
 */
class ProcessTemperatureStatsFunction extends ProcessWindowFunction[SensorReading, String, String, TimeWindow] {

  override def process(
    sensorId: String,
    context: Context,
    elements: Iterable[SensorReading],
    out: Collector[String]
  ): Unit = {
    val readings = elements.toList

    // min/max are undefined for an empty window, so there is nothing to report.
    if (readings.nonEmpty) {
      val stats = calculateStats(readings)

      // Access window metadata
      val windowStart = context.window.getStart
      val windowEnd = context.window.getEnd
      val watermark = context.currentWatermark

      // Cross-window counter: global (per-key) state, not per-window state.
      // Boxed Long so that "no count yet" is visible as null.
      val windowCountDescriptor =
        new ValueStateDescriptor[java.lang.Long]("windowCount", classOf[java.lang.Long])
      val windowCount = context.globalState.getState(windowCountDescriptor)
      val currentCount = Option(windowCount.value()).map(_.longValue).getOrElse(0L) + 1
      windowCount.update(currentCount)

      out.collect(s"Sensor $sensorId: Window #$currentCount [$windowStart-$windowEnd], " +
                  s"Stats: $stats, Watermark: $watermark")
    }
  }

  /** Formats count, average, min and max temperature; `readings` must be non-empty. */
  private def calculateStats(readings: List[SensorReading]): String = {
    val temps = readings.map(_.temperature)
    s"Count: ${temps.length}, Avg: ${temps.sum / temps.length}, Min: ${temps.min}, Max: ${temps.max}"
  }
}

Rich Functions

Rich variants of the functions above, which add lifecycle methods (`open`/`close`) and access to the runtime context (state, metrics, subtask information).

/**
 * RichFunction base trait.
 *
 * Adds lifecycle hooks and runtime-context access to a function. `open` and
 * `close` default to no-ops. The usage examples call `getRuntimeContext` to
 * obtain keyed state handles and metric groups.
 *
 * NOTE(review): newer Flink releases (1.19+) deprecate `open(Configuration)` in
 * favour of `open(OpenContext)`; confirm which one the targeted version calls.
 */
trait RichFunction {
  // Initialisation hook; the examples create state handles and metrics here.
  def open(parameters: Configuration): Unit = {}
  // Teardown hook for releasing resources acquired in `open`.
  def close(): Unit = {}
  def getRuntimeContext: RuntimeContext
  def setRuntimeContext(context: RuntimeContext): Unit
  def getIterationRuntimeContext: IterationRuntimeContext
}

/**
 * Rich process functions: the process-function sketches above mixed with
 * `RichFunction`.
 *
 * NOTE(review): other examples on this page (e.g. TemperatureMonitorFunction)
 * already override `open` and call `getRuntimeContext` on a plain
 * `KeyedProcessFunction`. Confirm that these separate Rich* classes exist in the
 * targeted Flink version before depending on them.
 */
abstract class RichProcessFunction[I, O] extends ProcessFunction[I, O] with RichFunction
abstract class RichKeyedProcessFunction[K, I, O] extends KeyedProcessFunction[K, I, O] with RichFunction

/**
 * Rich window functions: each window-function sketch above mixed with
 * `RichFunction`, one class per window-function flavour.
 */
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window] extends WindowFunction[IN, OUT, KEY, W] with RichFunction
abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window] extends ProcessWindowFunction[IN, OUT, KEY, W] with RichFunction
abstract class RichAllWindowFunction[IN, OUT, W <: Window] extends AllWindowFunction[IN, OUT, W] with RichFunction
abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window] extends ProcessAllWindowFunction[IN, OUT, W] with RichFunction

Usage Examples:

import org.apache.flink.streaming.api.scala.function.RichProcessFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.metrics.Counter
import org.apache.flink.configuration.Configuration

/**
 * Counts and reports readings hotter than 30.0, and reports which parallel
 * subtask processed each element.
 *
 * For every element, in order:
 *  - if the reading is above 30.0, increments the `temperature.high_temp_count`
 *    counter and emits a "High temperature" line;
 *  - stores the current processing time in `lastProcessingTime`;
 *  - emits a "Processed by subtask" line.
 *
 * NOTE(review): `lastProcessingTime` comes from `getRuntimeContext.getState`,
 * i.e. keyed state. Flink only provides keyed state on a keyed stream, so this
 * function presumably must run after `keyBy`; confirm. The state is written
 * here but never read; the unused local that previously read it was removed.
 */
class RichTemperatureProcessor extends RichProcessFunction[SensorReading, String] {

  private var highTempCount: Counter = _
  private var lastProcessingTime: ValueState[Long] = _

  /** Registers the `temperature.high_temp_count` counter and the state handle. */
  override def open(parameters: Configuration): Unit = {
    // Initialize metrics
    highTempCount = getRuntimeContext
      .getMetricGroup
      .addGroup("temperature")
      .counter("high_temp_count")

    // Initialize state
    val descriptor = new ValueStateDescriptor[Long]("lastProcessingTime", classOf[Long])
    lastProcessingTime = getRuntimeContext.getState(descriptor)
  }

  /** Nothing is acquired in `open` that needs explicit release. */
  override def close(): Unit = {
    // Cleanup resources if needed
  }

  override def processElement(
    reading: SensorReading,
    ctx: ProcessFunction.Context,
    out: Collector[String]
  ): Unit = {
    val currentTime = ctx.timerService().currentProcessingTime()

    if (reading.temperature > 30.0) {
      highTempCount.inc()
      out.collect(s"High temperature: ${reading.temperature}°C")
    }

    // Remember when this element was processed (write-only; see class note).
    lastProcessingTime.update(currentTime)

    // Runtime context information; the subtask index is presumably 0-based.
    val subtaskIndex = getRuntimeContext.getIndexOfThisSubtask
    val parallelism = getRuntimeContext.getNumberOfParallelSubtasks

    out.collect(s"Processed by subtask $subtaskIndex/$parallelism at time $currentTime")
  }
}

Stateful Functions

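Functions that maintain keyed state across elements. Keyed state is scoped to the key of the element being processed, so these functions must run on a keyed stream (after `keyBy`).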

/**
 * StatefulFunction marker trait.
 *
 * Declares no members; it only tags a type.
 * NOTE(review): nothing else on this page extends or references it. Confirm
 * that it exists in the targeted API before relying on it.
 */
trait StatefulFunction

/**
 * State descriptors for different state types.
 *
 * Each descriptor names a piece of state and describes its element type(s). The
 * examples pass them to `getRuntimeContext.getState` / `getListState` /
 * `getMapState` / `getReducingState` to obtain the matching state handle.
 * Constructor parameters are not declared as `val`s here, so this sketch does
 * not expose them as members.
 */
// Single value per key.
class ValueStateDescriptor[T](name: String, typeClass: Class[T])
// Appendable list of values per key.
class ListStateDescriptor[T](name: String, typeClass: Class[T])
// Key/value map per key; `keyClass` / `valueClass` describe the map's own entries.
class MapStateDescriptor[UK, UV](name: String, keyClass: Class[UK], valueClass: Class[UV])
// Single value per key, combined on `add` via `reduceFunction`.
class ReducingStateDescriptor[T](name: String, reduceFunction: ReduceFunction[T], typeClass: Class[T])
// Accumulator per key, updated on `add` via `aggregateFunction`; only the
// accumulator class is passed here.
class AggregatingStateDescriptor[IN, ACC, OUT](
  name: String,
  aggregateFunction: AggregateFunction[IN, ACC, OUT],
  accClass: Class[ACC]
)

Usage Examples:

import org.apache.flink.api.common.state._
import org.apache.flink.api.common.functions.ReduceFunction

/**
 * Demonstrates the main keyed state types while counting words.
 *
 * For every incoming word it:
 *  - increments a per-key count (`ValueState`) and emits `(word, count)`;
 *  - appends the word to a per-key history (`ListState`);
 *  - records the element timestamp under the word (`MapState`), when one exists;
 *  - adds 1 to a reducing sum (`ReducingState`).
 *
 * Elements without a timestamp are not recorded in the map. In Flink,
 * `Context.timestamp()` returns a nullable `java.lang.Long`, and unboxing a
 * null into the `Long`-valued map threw a NullPointerException for such
 * elements (e.g. in processing-time pipelines).
 *
 * NOTE(review): `wordHistory` is never cleared, so it grows with every element
 * of a key; add a cleanup/TTL strategy before using this pattern in production.
 */
class StatefulWordCounter extends RichKeyedProcessFunction[String, String, (String, Long)] {

  private var wordCount: ValueState[Long] = _
  private var wordHistory: ListState[String] = _
  private var wordTimestamps: MapState[String, Long] = _
  private var totalCount: ReducingState[Long] = _

  /** Creates the four keyed state handles used by `processElement`. */
  override def open(parameters: Configuration): Unit = {
    // Value state
    wordCount = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("wordCount", classOf[Long])
    )

    // List state
    wordHistory = getRuntimeContext.getListState(
      new ListStateDescriptor[String]("wordHistory", classOf[String])
    )

    // Map state
    wordTimestamps = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Long]("wordTimestamps", classOf[String], classOf[Long])
    )

    // Reducing state: values are combined by summation
    totalCount = getRuntimeContext.getReducingState(
      new ReducingStateDescriptor[Long](
        "totalCount",
        new ReduceFunction[Long] {
          override def reduce(value1: Long, value2: Long): Long = value1 + value2
        },
        classOf[Long]
      )
    )
  }

  override def processElement(
    word: String,
    ctx: KeyedProcessFunction.Context,
    out: Collector[(String, Long)]
  ): Unit = {
    // Update value state. For a ValueState[Long] a missing value is unboxed to 0L,
    // so the Option is always Some; the default only documents the intent.
    val currentCount = Option(wordCount.value()).getOrElse(0L) + 1
    wordCount.update(currentCount)

    // Update list state
    wordHistory.add(word)

    // Update map state, but only when the element actually carries a timestamp.
    Option(ctx.timestamp()).foreach(ts => wordTimestamps.put(word, ts))

    // Update reducing state
    totalCount.add(1L)

    out.collect((word, currentCount))
  }
}

Timer Service

Service for registering and managing timers in process functions.

/**
 * TimerService for managing timers.
 *
 * Exposes the two clocks (processing time and the current watermark) and lets
 * a function register or delete timers on either clock. The examples obtain it
 * via `ctx.timerService()`, and registered timers are delivered to `onTimer`.
 * All times are `Long` values; the examples use milliseconds.
 */
trait TimerService {
  def currentProcessingTime(): Long
  def currentWatermark(): Long
  def registerProcessingTimeTimer(time: Long): Unit
  def registerEventTimeTimer(time: Long): Unit
  // Deletes a timer previously registered for exactly `time` on the same clock.
  def deleteProcessingTimeTimer(time: Long): Unit
  def deleteEventTimeTimer(time: Long): Unit
}

/**
 * TimeDomain enumeration: identifies which clock a fired timer belongs to.
 *
 * `OnTimerContext.timeDomain()` reports `EVENT_TIME` for timers registered with
 * `registerEventTimeTimer` and `PROCESSING_TIME` for timers registered with
 * `registerProcessingTimeTimer`.
 *
 * Written as a Scala 3 `enum`; the previous body used Java enum syntax (no
 * `case`), which does not parse as Scala. Constant names are kept so that
 * `TimeDomain.EVENT_TIME` / `TimeDomain.PROCESSING_TIME` still resolve.
 */
enum TimeDomain {
  case EVENT_TIME, PROCESSING_TIME
}

Usage Examples:

/**
 * Stores the latest reading per key and schedules two timers for every
 * reading: one on event time, one on processing time.
 *
 * When a timer fires it reports which clock fired and the last reading. On an
 * event-time timer it also drops the stored reading if it is more than
 * 300000 ms older than the timer.
 *
 * Staleness is judged only on the event-time clock. `reading.timestamp` is an
 * event timestamp, while a processing-time timer's `timestamp` is wall-clock
 * time. Comparing the two (as the previous version did) could clear state on
 * every processing-time timer whenever event time lags wall-clock time, e.g.
 * when replaying historical data.
 */
class TimerBasedProcessor extends KeyedProcessFunction[String, SensorReading, String] {

  private var lastReading: ValueState[SensorReading] = _

  /** Obtains the keyed "lastReading" state handle. */
  override def open(parameters: Configuration): Unit = {
    lastReading = getRuntimeContext.getState(
      new ValueStateDescriptor[SensorReading]("lastReading", classOf[SensorReading])
    )
  }

  override def processElement(
    reading: SensorReading,
    ctx: KeyedProcessFunction.Context,
    out: Collector[String]
  ): Unit = {
    // Store current reading
    lastReading.update(reading)

    // Register timer for 30 seconds after the reading's event timestamp
    val timerTime = reading.timestamp + 30000
    ctx.timerService().registerEventTimeTimer(timerTime)

    // Register processing time timer for 1 minute after the current wall-clock time
    val processingTimer = ctx.timerService().currentProcessingTime() + 60000
    ctx.timerService().registerProcessingTimeTimer(processingTimer)

    out.collect(s"Processed reading: $reading")
  }

  override def onTimer(
    timestamp: Long,
    ctx: KeyedProcessFunction.OnTimerContext,
    out: Collector[String]
  ): Unit = {
    val key = ctx.getCurrentKey
    val timeDomain = ctx.timeDomain()
    // May be null if nothing was stored yet or the state was cleared earlier.
    val lastReadingValue = lastReading.value()

    timeDomain match {
      case TimeDomain.EVENT_TIME =>
        out.collect(s"Event time timer fired for $key at $timestamp. Last reading: $lastReadingValue")
      case TimeDomain.PROCESSING_TIME =>
        out.collect(s"Processing time timer fired for $key at $timestamp. Last reading: $lastReadingValue")
    }

    // Only compare timestamps that live on the same (event-time) clock.
    val isStale =
      timeDomain == TimeDomain.EVENT_TIME &&
        lastReadingValue != null &&
        timestamp - lastReadingValue.timestamp > 300000
    if (isStale) {
      lastReading.clear()
    }
  }
}

Types

// Index of the type names used on this page. These are name-only declarations
// for quick reference, not compilable definitions: several repeat the fuller
// definitions given earlier on the page, and `enum TimeDomain` lists no cases.

// Core function types
trait ProcessFunction[I, O]
trait KeyedProcessFunction[K, I, O]
trait WindowFunction[IN, OUT, KEY, W <: Window]
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window]
trait AllWindowFunction[IN, OUT, W <: Window]
trait ProcessAllWindowFunction[IN, OUT, W <: Window]

// Rich function types
trait RichFunction
abstract class RichProcessFunction[I, O]
abstract class RichKeyedProcessFunction[K, I, O]
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
abstract class RichAllWindowFunction[IN, OUT, W <: Window]
abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]

// State types (handles returned by getRuntimeContext.get*State in the examples)
trait ValueState[T]
trait ListState[T]
trait MapState[UK, UV]
trait ReducingState[T]
trait AggregatingState[IN, OUT]

// State descriptors
class ValueStateDescriptor[T]
class ListStateDescriptor[T]
class MapStateDescriptor[UK, UV]
class ReducingStateDescriptor[T]
class AggregatingStateDescriptor[IN, ACC, OUT]

// Context and service types
trait TimerService
enum TimeDomain
trait KeyedStateStore
class Configuration
trait RuntimeContext
trait IterationRuntimeContext
trait Counter
trait MetricGroup

// Window types
// NOTE(review): TimeWindow/GlobalWindow are used as `W <: Window` above; their
// relationship to Window is not shown in this index.
trait Window
class TimeWindow
class GlobalWindow

// Utility types
trait Collector[T]
class OutputTag[T]