KeyedStream represents a partitioned stream where elements with the same key are guaranteed to be processed by the same operator instance. This enables stateful operations, aggregations, and key-based computations.
Built-in aggregation functions for common operations.
/**
* Aggregation operations on keyed streams
*/
class KeyedStream[T, K] {
def sum(field: Int): DataStream[T]
def sum(field: String): DataStream[T]
def min(field: Int): DataStream[T]
def min(field: String): DataStream[T]
def max(field: Int): DataStream[T]
def max(field: String): DataStream[T]
def minBy(field: Int): DataStream[T]
def minBy(field: String): DataStream[T]
def maxBy(field: Int): DataStream[T]
def maxBy(field: String): DataStream[T]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)
val readings = env.fromElements(
SensorReading("sensor1", 20.0, 1000L),
SensorReading("sensor1", 25.0, 2000L),
SensorReading("sensor2", 15.0, 1500L),
SensorReading("sensor2", 18.0, 2500L)
)
val keyedReadings = readings.keyBy(_.sensorId)
// Sum temperatures by sensor
val tempSum = keyedReadings.sum("temperature")
// Get maximum temperature reading per sensor
val maxTemp = keyedReadings.max("temperature")
// Get the reading with maximum temperature per sensor (entire object)
val maxTempReading = keyedReadings.maxBy("temperature")

Custom reduction logic for aggregating elements.
/**
* Reduction operations
*/
class KeyedStream[T, K] {
def reduce(reducer: (T, T) => T): DataStream[T]
def reduce(reducer: ReduceFunction[T]): DataStream[T]
}

Usage Examples:
case class WordCount(word: String, count: Int)
val words = env.fromElements(
WordCount("hello", 1),
WordCount("world", 1),
WordCount("hello", 1),
WordCount("flink", 1)
)
val keyedWords = words.keyBy(_.word)
// Reduce to count words
val wordCounts = keyedWords.reduce((w1, w2) => WordCount(w1.word, w1.count + w2.count))
// Using ReduceFunction
val wordCountsFunc = keyedWords.reduce(new ReduceFunction[WordCount] {
override def reduce(value1: WordCount, value2: WordCount): WordCount = {
WordCount(value1.word, value1.count + value2.count)
}
})

Apply windowing to keyed streams for time-based or count-based operations.
/**
* Window operations on keyed streams
*/
class KeyedStream[T, K] {
def window[W <: Window](assigner: WindowAssigner[T, W]): WindowedStream[T, K, W]
def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]
def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]
def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]
def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]
}

Usage Examples:
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
val keyedReadings = readings.keyBy(_.sensorId)
// Tumbling time window of 5 seconds
val windowedReadings = keyedReadings.timeWindow(Time.seconds(5))
// Sliding time window: 10 second window, sliding every 5 seconds
val slidingWindowedReadings = keyedReadings.timeWindow(Time.seconds(10), Time.seconds(5))
// Count window: trigger every 100 elements
val countWindowedReadings = keyedReadings.countWindow(100)
// Custom window assigner
val customWindowedReadings = keyedReadings.window(TumblingEventTimeWindows.of(Time.minutes(1)))

Apply custom stateful processing logic.
/**
* Process functions for stateful operations
*/
class KeyedStream[T, K] {
def process[R](function: KeyedProcessFunction[K, T, R]): DataStream[R]
def process[R](function: KeyedProcessFunction[K, T, R], outputType: TypeInformation[R]): DataStream[R]
}

Usage Examples:
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
/**
 * Keyed process function over sensor readings, keyed by a `String` key.
 *
 * For every incoming [[SensorReading]] it:
 *  - emits an alert string when the temperature is above 30.0, and
 *  - registers an event-time timer at `value.timestamp + 10000`
 *    (10 seconds later, assuming the timestamp is in milliseconds).
 *
 * When a timer fires, a message naming the firing timestamp and the current key is emitted.
 */
class TemperatureAlertFunction extends KeyedProcessFunction[String, SensorReading, String] {

  /**
   * Handles one reading for the current key.
   *
   * @param value the incoming sensor reading
   * @param ctx   per-element context; used here only to reach the timer service
   * @param out   collector receiving the alert strings
   */
  override def processElement(
      value: SensorReading,
      // Context is an inner class of the generic Java class, so Scala needs a
      // type projection on the fully applied type rather than `KeyedProcessFunction.Context`.
      ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
      out: Collector[String]
  ): Unit = {
    // Custom stateful logic
    if (value.temperature > 30.0) {
      out.collect(s"Alert: High temperature ${value.temperature} from ${value.sensorId}")
    }
    // Set timer for 10 seconds in the future
    ctx.timerService().registerEventTimeTimer(value.timestamp + 10000L)
  }

  /**
   * Invoked when a previously registered timer fires.
   *
   * @param timestamp the timestamp the timer was registered for
   * @param ctx       timer context; exposes the key the timer belongs to
   * @param out       collector receiving the timer message
   */
  override def onTimer(
      timestamp: Long,
      // Same type-projection requirement as for Context above.
      ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
      out: Collector[String]
  ): Unit = {
    out.collect(s"Timer fired at $timestamp for key ${ctx.getCurrentKey}")
  }
}
val alerts = keyedReadings.process(new TemperatureAlertFunction())

Access and manage keyed state for stateful computations.
/**
* State management operations (available in process functions)
*/
trait KeyedProcessFunction[K, I, O] {
def getRuntimeContext: RuntimeContext
// State descriptors and access methods available through getRuntimeContext
}
// State types available
trait ValueState[T]
trait ListState[T]
trait MapState[UK, UV]
trait ReducingState[T]
trait AggregatingState[IN, OUT]

Usage Examples:
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
/**
 * Keyed process function that remembers the last temperature seen per key and
 * emits a message whenever a new reading differs from it by more than 5.0.
 *
 * The state is declared with the boxed `java.lang.Double` type. With a primitive
 * Scala `Double`, an unset state (`null`) is unboxed to `0.0` before `Option(...)`
 * ever sees it, so the null guard would be dead code.
 */
class StatefulProcessFunction extends KeyedProcessFunction[String, SensorReading, String] {

  // Assigned in open(); keyed state handles can only be obtained from the runtime context.
  private var lastTemperature: ValueState[java.lang.Double] = _

  /** Registers the "lastTemperature" value state with the runtime context. */
  override def open(parameters: Configuration): Unit = {
    val descriptor =
      new ValueStateDescriptor[java.lang.Double]("lastTemperature", classOf[java.lang.Double])
    lastTemperature = getRuntimeContext.getState(descriptor)
  }

  /**
   * Compares the reading with the stored temperature, emits a change message if the
   * absolute difference exceeds 5.0, then stores the new temperature.
   *
   * @param value the incoming sensor reading
   * @param ctx   per-element context (unused here)
   * @param out   collector receiving the change messages
   */
  override def processElement(
      value: SensorReading,
      // Context is an inner class of the generic Java class; Scala requires a type projection.
      ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
      out: Collector[String]
  ): Unit = {
    // No stored value yet for this key => fall back to 0.0 (same baseline as before).
    // NOTE(review): this means the first reading per key is compared against 0.0 and
    // may report a change; confirm whether that is intended.
    val lastTemp: Double = Option(lastTemperature.value()).fold(0.0)(_.doubleValue)
    if (math.abs(value.temperature - lastTemp) > 5.0) {
      out.collect(s"Temperature change detected: $lastTemp -> ${value.temperature}")
    }
    lastTemperature.update(value.temperature)
  }
}

Legacy fold operations for custom aggregations.
/**
* Fold operations (deprecated, use process functions instead)
*/
class KeyedStream[T, K] {
@deprecated("Use process functions with state instead", "1.12")
def fold[R](initialValue: R)(folder: (R, T) => R): DataStream[R]
@deprecated("Use process functions with state instead", "1.12")
def fold[R](initialValue: R, folder: FoldFunction[T, R]): DataStream[R]
}

Configure properties of the keyed stream.
/**
* Configuration operations
*/
class KeyedStream[T, K] {
def setParallelism(parallelism: Int): KeyedStream[T, K]
def getParallelism: Int
def setMaxParallelism(maxParallelism: Int): KeyedStream[T, K]
def getMaxParallelism: Int
def name(name: String): KeyedStream[T, K]
def uid(uid: String): KeyedStream[T, K]
def setUidHash(uidHash: String): KeyedStream[T, K]
def disableChaining(): KeyedStream[T, K]
def startNewChain(): KeyedStream[T, K]
def slotSharingGroup(slotSharingGroup: String): KeyedStream[T, K]
}

Join keyed streams based on time intervals.
/**
* Interval join operations
*/
class KeyedStream[T, K] {
def intervalJoin[T2](otherStream: KeyedStream[T2, K]): IntervalJoin[T, T2, K]
}
class IntervalJoin[T1, T2, K] {
def between(lowerBound: Time, upperBound: Time): IntervalJoin[T1, T2, K]
def process[R](function: ProcessJoinFunction[T1, T2, R]): DataStream[R]
}

Usage Examples:
// Both inputs are keyed by orderId so that matching orders and payments share a key.
val orders = env.fromElements(/* order events */).keyBy(_.orderId)
val payments = env.fromElements(/* payment events */).keyBy(_.orderId)
// Join orders and payments within 10 minutes
val joinedStream = orders
  .intervalJoin(payments)
  .between(Time.minutes(-10), Time.minutes(10))
  .process(new ProcessJoinFunction[Order, Payment, OrderPayment] {
    override def processElement(
        left: Order,
        right: Payment,
        // Context is an inner class of the generic Java class, so it must be referenced
        // through a type projection rather than as `ProcessJoinFunction.Context`.
        ctx: ProcessJoinFunction[Order, Payment, OrderPayment]#Context,
        out: Collector[OrderPayment]
    ): Unit = {
      // Emit one combined record per matching (order, payment) pair.
      out.collect(OrderPayment(left, right))
    }
  })
// Main keyed stream class
class KeyedStream[T, K]
// Window-related types
class WindowedStream[T, K, W <: Window]
trait Window
class TimeWindow extends Window
class GlobalWindow extends Window
// Function types
trait ReduceFunction[T]
trait FoldFunction[T, O] // Deprecated
trait KeyedProcessFunction[K, I, O]
trait ProcessJoinFunction[IN1, IN2, OUT]
// State types
trait ValueState[T]
trait ListState[T]
trait MapState[UK, UV]
trait ReducingState[T]
trait AggregatingState[IN, OUT]
// State descriptors
class ValueStateDescriptor[T]
class ListStateDescriptor[T]
class MapStateDescriptor[UK, UV]
class ReducingStateDescriptor[T]
class AggregatingStateDescriptor[IN, ACC, OUT]
// Join types
class IntervalJoin[T1, T2, K]
// Time types
class Time