User-defined functions (UDFs) allow you to implement custom processing logic in Flink. The Scala API provides various function interfaces for different use cases, from simple transformations to complex stateful processing.
Core process functions for custom stream processing logic.
/**
 * ProcessFunction for single stream processing.
 *
 * `processElement` is called for each input element and may emit results through
 * `out`; `onTimer` is called when a timer registered via `ctx.timerService()` fires
 * and does nothing unless overridden.
 *
 * The context types are nested inside the trait and are referenced with a type
 * projection (`ProcessFunction[I, O]#Context`). There is no `ProcessFunction`
 * companion object, so the path `ProcessFunction.Context` would not resolve.
 * Implementations override with the concrete projection, e.g.
 * `ctx: ProcessFunction[SensorReading, String]#Context`.
 *
 * @tparam I type of the input elements
 * @tparam O type of the emitted elements
 */
trait ProcessFunction[I, O] {
  def processElement(
      value: I,
      ctx: ProcessFunction[I, O]#Context,
      out: Collector[O]
  ): Unit

  def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[I, O]#OnTimerContext,
      out: Collector[O]
  ): Unit = {}

  /** Per-element context handed to `processElement`. */
  trait Context {
    // NOTE(review): Flink's own ProcessFunction.Context has no element() accessor — confirm.
    def element(): I
    /** Timestamp of the current element. */
    def timestamp(): Long
    /** Access to the current time and to timer registration/deletion. */
    def timerService(): TimerService
    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context handed to `onTimer`; additionally reports the fired timer's time domain. */
  trait OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}
/**
 * KeyedProcessFunction for keyed stream processing.
 *
 * Like [[ProcessFunction]], but the context also exposes the key of the element
 * (or timer) currently being handled via `getCurrentKey`.
 *
 * Context types are referenced by type projection (`KeyedProcessFunction[K, I, O]#Context`)
 * because there is no companion object, so `KeyedProcessFunction.Context` would not
 * resolve. Implementations override with e.g.
 * `ctx: KeyedProcessFunction[String, SensorReading, String]#Context`.
 *
 * @tparam K type of the key
 * @tparam I type of the input elements
 * @tparam O type of the emitted elements
 */
trait KeyedProcessFunction[K, I, O] {
  def processElement(
      value: I,
      ctx: KeyedProcessFunction[K, I, O]#Context,
      out: Collector[O]
  ): Unit

  def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[K, I, O]#OnTimerContext,
      out: Collector[O]
  ): Unit = {}

  /** Per-element context handed to `processElement`. */
  trait Context {
    /** Key of the element (or timer) currently being handled. */
    def getCurrentKey: K
    // NOTE(review): Flink's own KeyedProcessFunction.Context has no element() accessor — confirm.
    def element(): I
    /** Timestamp of the current element. */
    def timestamp(): Long
    /** Access to the current time and to timer registration/deletion. */
    def timerService(): TimerService
    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context handed to `onTimer`; additionally reports the fired timer's time domain. */
  trait OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}
// Usage Examples:
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.TimeDomain
/**
 * A single sensor measurement used by the examples below.
 *
 * @param sensorId    identifier of the sensor that produced the reading
 * @param temperature measured temperature in °C
 * @param timestamp   timestamp of the reading, in milliseconds
 */
case class SensorReading(
    sensorId: String,
    temperature: Double,
    timestamp: Long
)
// Simple process function
/**
 * Emits an alert for every reading above 30.0 °C and schedules an event-time timer
 * 10 seconds (10000 ms) after the reading's timestamp; `onTimer` reports each firing.
 *
 * NOTE: Flink only supports timers when the function runs on a KeyedStream. On a
 * non-keyed stream the `registerEventTimeTimer` call below throws
 * UnsupportedOperationException, so apply this function after `keyBy` (or use a
 * KeyedProcessFunction).
 */
class TemperatureAlertFunction extends ProcessFunction[SensorReading, String] {
  override def processElement(
      reading: SensorReading,
      ctx: ProcessFunction[SensorReading, String]#Context,
      out: Collector[String]
  ): Unit = {
    if (reading.temperature > 30.0) {
      out.collect(s"High temperature alert: ${reading.temperature}°C from ${reading.sensorId}")
    }
    // Set timer for 10 seconds later (reading timestamps are in milliseconds)
    ctx.timerService().registerEventTimeTimer(reading.timestamp + 10000)
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[SensorReading, String]#OnTimerContext,
      out: Collector[String]
  ): Unit = {
    out.collect(s"Timer fired at $timestamp")
  }
}
// Keyed process function with state
/**
 * Per-key spike detector: emits a message when a reading differs from the previous
 * reading of the same key by more than 5.0 °C, then stores the current reading as
 * the new reference value.
 *
 * The state holds a boxed `java.lang.Double`, so "no previous reading" is observable
 * as `null`. With `ValueState[scala.Double]`, the empty state's `null` is unboxed to
 * `0.0`, `Option(...)` is always `Some`, and the first reading of every key was
 * compared against 0.0 (a spurious spike).
 */
class TemperatureMonitorFunction extends KeyedProcessFunction[String, SensorReading, String] {
  private var lastTemperature: ValueState[java.lang.Double] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor =
      new ValueStateDescriptor[java.lang.Double]("lastTemp", classOf[java.lang.Double])
    lastTemperature = getRuntimeContext.getState(descriptor)
  }

  override def processElement(
      reading: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
      out: Collector[String]
  ): Unit = {
    val currentTemp = reading.temperature
    // Only compare when this key already has a previous reading.
    Option(lastTemperature.value()).map(_.doubleValue()).foreach { previousTemp =>
      if (math.abs(currentTemp - previousTemp) > 5.0) {
        out.collect(s"Temperature spike detected for ${ctx.getCurrentKey}: $previousTemp -> $currentTemp")
      }
    }
    lastTemperature.update(currentTemp)
  }
}
// Functions for processing windowed data.
/**
 * WindowFunction for processing window contents.
 *
 * `apply` receives the key, the window, and the window's elements, and emits
 * results through `out`.
 *
 * @tparam IN  type of the window's elements
 * @tparam OUT type of the emitted results
 * @tparam KEY type of the key
 * @tparam W   window type, bounded by [[Window]]
 */
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(
      key: KEY,
      window: W,
      input: Iterable[IN],
      out: Collector[OUT]
  ): Unit
}
/**
 * ProcessWindowFunction with rich context.
 *
 * `process` receives the key, a [[Context]] describing the window being evaluated,
 * and the window's elements, and emits results through `out`.
 *
 * @tparam IN  type of the window's elements
 * @tparam OUT type of the emitted results
 * @tparam KEY type of the key
 * @tparam W   window type, bounded by [[Window]]
 */
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit

  /**
   * Hook for deleting state kept in `context.windowState` once the window is purged.
   * It is a no-op unless overridden, and mirrors Flink's `ProcessWindowFunction.clear`.
   * Without it, per-window state outlives its window.
   */
  def clear(context: Context): Unit = {}

  /** Window metadata and state accessors available to `process` and `clear`. */
  trait Context {
    /** The window being evaluated. */
    def window: W
    /** Current processing time. */
    def currentProcessingTime: Long
    /** Current event-time watermark. */
    def currentWatermark: Long
    /** Keyed state scoped to the current key AND the current window. */
    def windowState: KeyedStateStore
    /** Keyed state scoped to the current key only, shared by all of its windows. */
    def globalState: KeyedStateStore
    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}
/**
 * AllWindowFunction for non-keyed windows.
 *
 * Same shape as [[WindowFunction]] minus the key: `apply` receives the window and
 * its elements and emits results through `out`.
 *
 * @tparam IN  type of the window's elements
 * @tparam OUT type of the emitted results
 * @tparam W   window type, bounded by [[Window]]
 */
trait AllWindowFunction[IN, OUT, W <: Window] {
  def apply(
      window: W,
      values: Iterable[IN],
      out: Collector[OUT]
  ): Unit
}
/**
 * ProcessAllWindowFunction for non-keyed windows with rich context.
 *
 * Same shape as [[ProcessWindowFunction]] minus the key.
 *
 * @tparam IN  type of the window's elements
 * @tparam OUT type of the emitted results
 * @tparam W   window type, bounded by [[Window]]
 */
trait ProcessAllWindowFunction[IN, OUT, W <: Window] {
  def process(context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit

  /**
   * Hook for deleting state kept in `context.windowState` once the window is purged.
   * It is a no-op unless overridden, and mirrors Flink's `ProcessAllWindowFunction.clear`.
   */
  def clear(context: Context): Unit = {}

  /** Window metadata and state accessors available to `process` and `clear`. */
  trait Context {
    /** The window being evaluated. */
    def window: W
    /** Current processing time. */
    def currentProcessingTime: Long
    /** Current event-time watermark. */
    def currentWatermark: Long
    /** State scoped to the current window. */
    def windowState: KeyedStateStore
    /** State shared by all windows. */
    def globalState: KeyedStateStore
    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}
// Usage Examples:
import org.apache.flink.streaming.api.scala.function.{WindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
// Simple window function
/**
 * Emits one summary line per sensor and window: window bounds, reading count, and
 * average / minimum / maximum temperature.
 *
 * An empty `input` (possible, e.g., when an evictor removed every element) produces
 * no output. Previously it produced a NaN average and then an exception from `min`.
 */
class TemperatureStatsFunction extends WindowFunction[SensorReading, String, String, TimeWindow] {
  override def apply(
      sensorId: String,
      window: TimeWindow,
      input: Iterable[SensorReading],
      out: Collector[String]
  ): Unit = {
    // Extract the temperatures once instead of re-mapping the readings per statistic.
    val temps = input.iterator.map(_.temperature).toVector
    if (temps.nonEmpty) {
      val count = temps.length
      val avgTemp = temps.sum / count
      val minTemp = temps.min
      val maxTemp = temps.max
      out.collect(s"Sensor $sensorId: Window [${window.getStart}-${window.getEnd}], " +
        s"Count: $count, Avg: $avgTemp, Min: $minTemp, Max: $maxTemp")
    }
  }
}
// Process window function with state
/**
 * Emits per-window statistics for a sensor, together with the window bounds, the
 * current watermark, and a running count of the windows evaluated so far for that
 * sensor.
 *
 * The counter lives in `context.globalState` (per-key state shared by all windows of
 * the key). `context.windowState` is scoped to a single window, so a counter kept
 * there restarts for every window and never counts across windows. Empty windows
 * produce no output and do not advance the counter.
 */
class ProcessTemperatureStatsFunction extends ProcessWindowFunction[SensorReading, String, String, TimeWindow] {
  override def process(
      sensorId: String,
      context: Context,
      elements: Iterable[SensorReading],
      out: Collector[String]
  ): Unit = {
    val readings = elements.toList
    if (readings.nonEmpty) {
      val stats = calculateStats(readings)
      // Access window metadata
      val windowStart = context.window.getStart
      val windowEnd = context.window.getEnd
      val watermark = context.currentWatermark
      // Cross-window information must live in global (per-key) state, not window state.
      // Boxed Long so an empty state is observable as null instead of being unboxed to 0.
      val windowCountDescriptor =
        new ValueStateDescriptor[java.lang.Long]("windowCount", classOf[java.lang.Long])
      val windowCount = context.globalState.getState(windowCountDescriptor)
      val currentCount = Option(windowCount.value()).fold(0L)(_.longValue()) + 1
      windowCount.update(currentCount)
      out.collect(s"Sensor $sensorId: Window #$currentCount [$windowStart-$windowEnd], " +
        s"Stats: $stats, Watermark: $watermark")
    }
  }

  /** Formats count / avg / min / max of the temperatures; callers pass a non-empty list. */
  private def calculateStats(readings: List[SensorReading]): String = {
    val temps = readings.map(_.temperature)
    s"Count: ${temps.length}, Avg: ${temps.sum / temps.length}, Min: ${temps.min}, Max: ${temps.max}"
  }
}
// Rich versions of functions with lifecycle methods and runtime context access.
/**
 * RichFunction base trait.
 *
 * Adds runtime-context access and lifecycle hooks to a function. `open` and `close`
 * are no-ops unless overridden; the examples use `open` to create state handles and
 * metrics.
 */
trait RichFunction {
  /** Runtime context of this function instance (state, metrics, subtask information). */
  def getRuntimeContext: RuntimeContext

  /** Installs the runtime context returned by `getRuntimeContext`. */
  def setRuntimeContext(context: RuntimeContext): Unit

  /** Runtime context for iterative jobs. */
  def getIterationRuntimeContext: IterationRuntimeContext

  /** Initialization hook; does nothing by default. */
  def open(parameters: Configuration): Unit = {}

  /** Tear-down hook; does nothing by default. */
  def close(): Unit = {}
}
/**
 * Rich process functions: [[ProcessFunction]] and [[KeyedProcessFunction]] combined
 * with the [[RichFunction]] lifecycle hooks and runtime-context access. The
 * runtime-context accessors remain abstract here.
 */
abstract class RichProcessFunction[I, O]
  extends ProcessFunction[I, O]
  with RichFunction

abstract class RichKeyedProcessFunction[K, I, O]
  extends KeyedProcessFunction[K, I, O]
  with RichFunction
/**
 * Rich window functions: each window-function trait combined with [[RichFunction]],
 * adding `open` / `close` and runtime-context access.
 */
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
  extends WindowFunction[IN, OUT, KEY, W]
  with RichFunction

abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
  extends ProcessWindowFunction[IN, OUT, KEY, W]
  with RichFunction

abstract class RichAllWindowFunction[IN, OUT, W <: Window]
  extends AllWindowFunction[IN, OUT, W]
  with RichFunction

abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
  extends ProcessAllWindowFunction[IN, OUT, W]
  with RichFunction
// Usage Examples:
import org.apache.flink.streaming.api.scala.function.RichProcessFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.metrics.Counter
import org.apache.flink.configuration.Configuration
/**
 * Counts readings above 30.0 °C in the `high_temp_count` counter of the
 * `temperature` metric group and emits an alert for each of them. It also records
 * the processing time of the latest element in state and reports which parallel
 * subtask handled each element.
 *
 * NOTE: `getRuntimeContext.getState` creates keyed state, which Flink only permits
 * when the function runs on a KeyedStream.
 */
class RichTemperatureProcessor extends RichProcessFunction[SensorReading, String] {
  private var highTempCount: Counter = _
  private var lastProcessingTime: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // Initialize metrics
    highTempCount = getRuntimeContext
      .getMetricGroup
      .addGroup("temperature")
      .counter("high_temp_count")
    // Initialize state
    val descriptor = new ValueStateDescriptor[Long]("lastProcessingTime", classOf[Long])
    lastProcessingTime = getRuntimeContext.getState(descriptor)
  }

  override def close(): Unit = {
    // Cleanup resources if needed
  }

  override def processElement(
      reading: SensorReading,
      ctx: ProcessFunction[SensorReading, String]#Context,
      out: Collector[String]
  ): Unit = {
    val currentTime = ctx.timerService().currentProcessingTime()
    if (reading.temperature > 30.0) {
      highTempCount.inc()
      out.collect(s"High temperature: ${reading.temperature}°C")
    }
    // Update processing time (the previous value was read but never used, so it is not read)
    lastProcessingTime.update(currentTime)
    // Access runtime context information
    val subtaskIndex = getRuntimeContext.getIndexOfThisSubtask
    val parallelism = getRuntimeContext.getNumberOfParallelSubtasks
    out.collect(s"Processed by subtask $subtaskIndex/$parallelism at time $currentTime")
  }
}
// Functions that maintain state across elements.
/**
 * Marker trait for functions that keep state across elements.
 * It declares no members.
 */
trait StatefulFunction {}
/**
 * State descriptors for the different state types. Each descriptor names a piece
 * of state and carries the type information (and, where needed, the combining
 * function) used to create it.
 */
/** Describes a single-value state holding a `T`. */
class ValueStateDescriptor[T](
    name: String,
    typeClass: Class[T]
)

/** Describes a list state whose elements are `T`. */
class ListStateDescriptor[T](
    name: String,
    typeClass: Class[T]
)

/** Describes a map state from `UK` keys to `UV` values. */
class MapStateDescriptor[UK, UV](
    name: String,
    keyClass: Class[UK],
    valueClass: Class[UV]
)

/** Describes a state whose added values are combined with `reduceFunction`. */
class ReducingStateDescriptor[T](
    name: String,
    reduceFunction: ReduceFunction[T],
    typeClass: Class[T]
)

/** Describes a state that aggregates `IN` values into an `ACC` accumulator, producing `OUT`. */
class AggregatingStateDescriptor[IN, ACC, OUT](
    name: String,
    aggregateFunction: AggregateFunction[IN, ACC, OUT],
    accClass: Class[ACC]
)
// Usage Examples:
import org.apache.flink.api.common.state._
import org.apache.flink.api.common.functions.ReduceFunction
/**
 * Counts occurrences per key (presumably the stream is keyed by the word itself) and
 * emits `(word, count)` for every element, while demonstrating each keyed state type.
 *
 * All state here is keyed. `totalCount` is therefore a per-key count just like
 * `wordCount`, not a total across words, and `wordHistory` appends every occurrence
 * without bound.
 */
class StatefulWordCounter extends RichKeyedProcessFunction[String, String, (String, Long)] {
  private var wordCount: ValueState[Long] = _
  private var wordHistory: ListState[String] = _
  private var wordTimestamps: MapState[String, Long] = _
  private var totalCount: ReducingState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // Value state
    wordCount = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("wordCount", classOf[Long])
    )
    // List state
    wordHistory = getRuntimeContext.getListState(
      new ListStateDescriptor[String]("wordHistory", classOf[String])
    )
    // Map state
    wordTimestamps = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Long]("wordTimestamps", classOf[String], classOf[Long])
    )
    // Reducing state
    totalCount = getRuntimeContext.getReducingState(
      new ReducingStateDescriptor[Long](
        "totalCount",
        new ReduceFunction[Long] {
          override def reduce(value1: Long, value2: Long): Long = value1 + value2
        },
        classOf[Long]
      )
    )
  }

  override def processElement(
      word: String,
      ctx: KeyedProcessFunction[String, String, (String, Long)]#Context,
      out: Collector[(String, Long)]
  ): Unit = {
    // Update value state
    val currentCount = Option(wordCount.value()).getOrElse(0L) + 1
    wordCount.update(currentCount)
    // Update list state
    wordHistory.add(word)
    // Update map state. Flink's Context.timestamp() returns a nullable java.lang.Long
    // (null when the element carries no timestamp); skip the entry instead of an NPE.
    Option(ctx.timestamp()).foreach(ts => wordTimestamps.put(word, ts))
    // Update reducing state
    totalCount.add(1L)
    out.collect((word, currentCount))
  }
}
// Service for registering and managing timers in process functions.
/**
 * TimerService for managing timers.
 *
 * Reports the current processing time and watermark, and registers or deletes
 * timers in either time domain. All times are `Long` values; the examples use
 * milliseconds.
 */
trait TimerService {
  // Processing time

  /** Current processing time. */
  def currentProcessingTime(): Long
  /** Registers a processing-time timer for `time`. */
  def registerProcessingTimeTimer(time: Long): Unit
  /** Deletes the processing-time timer registered for `time`. */
  def deleteProcessingTimeTimer(time: Long): Unit

  // Event time

  /** Current event-time watermark. */
  def currentWatermark(): Long
  /** Registers an event-time timer for `time`. */
  def registerEventTimeTimer(time: Long): Unit
  /** Deletes the event-time timer registered for `time`. */
  def deleteEventTimeTimer(time: Long): Unit
}
/**
 * Time domain of a fired timer, reported by `OnTimerContext.timeDomain()`.
 *
 * `enum X { A, B }` is Java syntax: Scala 2 has no `enum`, and a Scala 3 enum needs
 * `case`. The domain is modelled as a sealed trait with case objects instead. This
 * compiles on both Scala versions, keeps `case TimeDomain.EVENT_TIME =>` patterns
 * working, and lets the compiler check `match` exhaustiveness.
 */
sealed trait TimeDomain

object TimeDomain {
  /** Timer registered via `TimerService.registerEventTimeTimer`. */
  case object EVENT_TIME extends TimeDomain

  /** Timer registered via `TimerService.registerProcessingTimeTimer`. */
  case object PROCESSING_TIME extends TimeDomain

  /** All time domains, in declaration order. */
  val values: Seq[TimeDomain] = Seq(EVENT_TIME, PROCESSING_TIME)
}
// Usage Examples:
/**
 * Stores the latest reading per key and registers two timers per element: an
 * event-time timer 30 s after the reading's timestamp and a processing-time timer
 * 60 s after the current processing time.
 *
 * On each firing it reports the key, the time domain, and the last reading. It
 * clears the stored reading once it is more than 300000 ms (5 minutes) older than
 * the firing timer's timestamp.
 */
class TimerBasedProcessor extends KeyedProcessFunction[String, SensorReading, String] {
  private var lastReading: ValueState[SensorReading] = _

  override def open(parameters: Configuration): Unit = {
    lastReading = getRuntimeContext.getState(
      new ValueStateDescriptor[SensorReading]("lastReading", classOf[SensorReading])
    )
  }

  override def processElement(
      reading: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
      out: Collector[String]
  ): Unit = {
    // Store current reading
    lastReading.update(reading)
    // Register timer for 30 seconds later (event time)
    val timerTime = reading.timestamp + 30000
    ctx.timerService().registerEventTimeTimer(timerTime)
    // Register processing time timer for 1 minute later.
    // NOTE(review): this registers a new processing-time timer for every element;
    // consider coalescing timestamps or deleting the previous timer on hot keys.
    val processingTimer = ctx.timerService().currentProcessingTime() + 60000
    ctx.timerService().registerProcessingTimeTimer(processingTimer)
    out.collect(s"Processed reading: $reading")
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
      out: Collector[String]
  ): Unit = {
    val key = ctx.getCurrentKey
    // May be null when the state is empty; the messages below print it as-is.
    val lastReadingValue = lastReading.value()
    ctx.timeDomain() match {
      case TimeDomain.EVENT_TIME =>
        out.collect(s"Event time timer fired for $key at $timestamp. Last reading: $lastReadingValue")
      case TimeDomain.PROCESSING_TIME =>
        out.collect(s"Processing time timer fired for $key at $timestamp. Last reading: $lastReadingValue")
    }
    // Clean up state if needed.
    // NOTE(review): for processing-time timers `timestamp` is wall-clock time while
    // `reading.timestamp` is the reading's own timestamp; this assumes both share a clock.
    if (Option(lastReadingValue).exists(r => timestamp - r.timestamp > 300000)) {
      lastReading.clear()
    }
  }
}
// Core function types
/** Per-element processing of a stream; input `I`, output `O`. */
trait ProcessFunction[I, O]
/** Per-element processing of a keyed stream with key type `K`. */
trait KeyedProcessFunction[K, I, O]
/** Keyed window evaluation. */
trait WindowFunction[IN, OUT, KEY, W <: Window]
/** Keyed window evaluation with a rich window context. */
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window]
/** Non-keyed window evaluation. */
trait AllWindowFunction[IN, OUT, W <: Window]
/** Non-keyed window evaluation with a rich window context. */
trait ProcessAllWindowFunction[IN, OUT, W <: Window]
// Rich function types
/** Lifecycle hooks plus runtime-context access. */
trait RichFunction
/** Rich variants of the process functions. */
abstract class RichProcessFunction[I, O]
abstract class RichKeyedProcessFunction[K, I, O]
/** Rich variants of the keyed window functions. */
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
/** Rich variants of the non-keyed window functions. */
abstract class RichAllWindowFunction[IN, OUT, W <: Window]
abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
// State types
/** Holds a single value. */
trait ValueState[T]
/** Holds a list of values. */
trait ListState[T]
/** Holds a map from `UK` keys to `UV` values. */
trait MapState[UK, UV]
/** Combines added values with a reduce function. */
trait ReducingState[T]
/** Aggregates added `IN` values into an `OUT` result. */
trait AggregatingState[IN, OUT]
// State descriptors
/** Describes a [[ValueState]]. */
class ValueStateDescriptor[T]
/** Describes a [[ListState]]. */
class ListStateDescriptor[T]
/** Describes a [[MapState]]. */
class MapStateDescriptor[UK, UV]
/** Describes a [[ReducingState]]. */
class ReducingStateDescriptor[T]
/** Describes an [[AggregatingState]]; `ACC` is the accumulator type. */
class AggregatingStateDescriptor[IN, ACC, OUT]
// Context and service types
trait TimerService
// A bare `enum TimeDomain` is not valid Scala (Scala 2 has no `enum`; a Scala 3 enum
// needs at least one `case`), so the closed set of time domains is listed as a sealed trait.
sealed trait TimeDomain
trait KeyedStateStore
class Configuration
trait RuntimeContext
trait IterationRuntimeContext
trait Counter
trait MetricGroup
// Window types
trait Window
// Both concrete window types must be subtypes of Window so they satisfy the
// `W <: Window` bounds of the window-function traits (e.g. `..., TimeWindow]`).
class TimeWindow extends Window
class GlobalWindow extends Window
// Utility types
/** Sink a function emits its results into (`out.collect(...)`). */
trait Collector[T]
/** Identifies a side output of element type `T` (`ctx.output(tag, value)`). */
class OutputTag[T]