Scala API for Apache Flink streaming: idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.
—
KeyedStream represents a partitioned stream where elements are grouped by key, enabling stateful operations, aggregations, and windowing. This is essential for maintaining state per key and performing keyed computations.
Access key type information and stream metadata.
/** A partitioned stream whose elements are grouped by a key of type K. */
class KeyedStream[T, K] {

  /**
   * Get the type information for the key.
   *
   * @return TypeInformation for key type K
   */
  def getKeyType: TypeInformation[K]
}

Built-in aggregation functions for common operations.
/** Built-in per-key aggregations on a partitioned stream. */
class KeyedStream[T, K] {

  /**
   * Reduce elements using a reduction function.
   *
   * @param reducer Function to combine two elements
   * @return DataStream with reduced elements per key
   */
  def reduce(reducer: ReduceFunction[T]): DataStream[T]

  /**
   * Reduce elements using a function.
   *
   * @param fun Function to combine two elements
   * @return DataStream with reduced elements per key
   */
  def reduce(fun: (T, T) => T): DataStream[T]

  /**
   * Sum numeric field by position.
   *
   * @param position Field position for summation
   * @return DataStream with summed values per key
   */
  def sum(position: Int): DataStream[T]

  /**
   * Sum numeric field by name.
   *
   * @param field Field name for summation
   * @return DataStream with summed values per key
   */
  def sum(field: String): DataStream[T]

  /**
   * Get maximum value by field position.
   *
   * @param position Field position for maximum
   * @return DataStream with maximum values per key
   */
  def max(position: Int): DataStream[T]

  /**
   * Get maximum value by field name.
   *
   * @param field Field name for maximum
   * @return DataStream with maximum values per key
   */
  def max(field: String): DataStream[T]

  /**
   * Get minimum value by field position.
   *
   * @param position Field position for minimum
   * @return DataStream with minimum values per key
   */
  def min(position: Int): DataStream[T]

  /**
   * Get minimum value by field name.
   *
   * @param field Field name for minimum
   * @return DataStream with minimum values per key
   */
  def min(field: String): DataStream[T]

  /**
   * Get element with maximum value by field position.
   *
   * @param position Field position for maximum element
   * @return DataStream with maximum elements per key
   */
  def maxBy(position: Int): DataStream[T]

  /**
   * Get element with maximum value by field name.
   *
   * @param field Field name for maximum element
   * @return DataStream with maximum elements per key
   */
  def maxBy(field: String): DataStream[T]

  /**
   * Get element with minimum value by field position.
   *
   * @param position Field position for minimum element
   * @return DataStream with minimum elements per key
   */
  def minBy(position: Int): DataStream[T]

  /**
   * Get element with minimum value by field name.
   *
   * @param field Field name for minimum element
   * @return DataStream with minimum elements per key
   */
  def minBy(field: String): DataStream[T]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor1", 25.0, 2000L),
  SensorReading("sensor2", 15.0, 1500L),
  SensorReading("sensor2", 18.0, 2500L)
)

val keyedReadings = readings.keyBy(_.sensorId)

// Sum temperatures by sensor
val totalTemps = keyedReadings.sum("temperature")

// Get maximum temperature per sensor
val maxTemps = keyedReadings.max("temperature")

// Get reading with maximum temperature per sensor
val maxTempReadings = keyedReadings.maxBy("temperature")

// Custom reduction - average temperature
val avgTemps = keyedReadings.reduce((r1, r2) =>
  SensorReading(r1.sensorId, (r1.temperature + r2.temperature) / 2, math.max(r1.timestamp, r2.timestamp))
)

Apply time or count-based windows to keyed streams.
/** Windowing operations on a partitioned stream. */
class KeyedStream[T, K] {

  /**
   * Apply time-based tumbling windowing (deprecated).
   *
   * @param size Window size
   * @return WindowedStream for aggregations
   */
  @deprecated("Use window(TumblingEventTimeWindows.of(size))", "1.12.0")
  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]

  /**
   * Apply time-based sliding windowing (deprecated).
   *
   * @param size Window size
   * @param slide Slide interval
   * @return WindowedStream for aggregations
   */
  @deprecated("Use window(SlidingEventTimeWindows.of(size, slide))", "1.12.0")
  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]

  /**
   * Apply count-based windowing.
   *
   * @param size Window size (number of elements)
   * @return WindowedStream for aggregations
   */
  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]

  /**
   * Apply sliding count-based windowing.
   *
   * @param size Window size (number of elements)
   * @param slide Slide size (number of elements)
   * @return WindowedStream for aggregations
   */
  def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]

  /**
   * Apply custom windowing.
   *
   * @param assigner Window assigner implementation
   * @return WindowedStream for aggregations
   */
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val keyedReadings = readings.keyBy(_.sensorId)

// Count-based window - every 5 readings per sensor
val countWindow = keyedReadings
  .countWindow(5)
  .reduce((r1, r2) => SensorReading(r1.sensorId, math.max(r1.temperature, r2.temperature), r2.timestamp))

// Time-based window - 1 minute tumbling windows
val timeWindow = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .reduce((r1, r2) => SensorReading(r1.sensorId, (r1.temperature + r2.temperature) / 2, r2.timestamp))

Manage per-key state for complex processing logic.
/** Stateful map/flatMap/filter operations with per-key state. */
class KeyedStream[T, K] {

  /**
   * Map with per-key state management.
   *
   * @param fun Function with state access: (value, state) => (result, newState)
   * @return DataStream with stateful mapping results
   */
  def mapWithState[R: TypeInformation, S: TypeInformation](
    fun: (T, Option[S]) => (R, Option[S])
  ): DataStream[R]

  /**
   * FlatMap with per-key state management.
   *
   * @param fun Function with state access: (value, state) => (results, newState)
   * @return DataStream with stateful flatMap results
   */
  def flatMapWithState[R: TypeInformation, S: TypeInformation](
    fun: (T, Option[S]) => (TraversableOnce[R], Option[S])
  ): DataStream[R]

  /**
   * Filter with per-key state management.
   *
   * @param fun Function with state access: (value, state) => (keep, newState)
   * @return DataStream with stateful filtering results
   */
  def filterWithState[S: TypeInformation](
    fun: (T, Option[S]) => (Boolean, Option[S])
  ): DataStream[T]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._

case class Event(key: String, value: Int, timestamp: Long)

val events = env.fromElements(
  Event("A", 1, 1000L),
  Event("A", 2, 2000L),
  Event("B", 5, 1500L),
  Event("A", 3, 3000L)
)

val keyedEvents = events.keyBy(_.key)

// Count events per key with state
val eventCounts = keyedEvents.mapWithState[Int, Int] { (event, count) =>
  val newCount = count.getOrElse(0) + 1
  (newCount, Some(newCount))
}

// Running sum with state
val runningSums = keyedEvents.mapWithState[Int, Int] { (event, sum) =>
  val newSum = sum.getOrElse(0) + event.value
  (newSum, Some(newSum))
}

// Filter events based on count state (only keep first 3 events per key)
val limitedEvents = keyedEvents.filterWithState[Int] { (event, count) =>
  val currentCount = count.getOrElse(0) + 1
  (currentCount <= 3, Some(currentCount))
}

Apply custom processing logic with access to timers and state.
/** Low-level processing on a partitioned stream. */
class KeyedStream[T, K] {

  /**
   * Apply a KeyedProcessFunction for low-level processing.
   *
   * @param keyedProcessFunction ProcessFunction implementation with key access
   * @return DataStream with processed results
   */
  def process[R: TypeInformation](
    keyedProcessFunction: KeyedProcessFunction[K, T, R]
  ): DataStream[R]
}

Make keyed state queryable from external applications.
/** Expose keyed state for external queries. */
class KeyedStream[T, K] {

  /**
   * Make stream queryable with default state descriptor.
   *
   * @param queryableStateName Name for queryable state
   * @return QueryableStateStream for external queries
   */
  def asQueryableState(queryableStateName: String): QueryableStateStream[K, T]

  /**
   * Make stream queryable with custom ValueStateDescriptor.
   *
   * @param queryableStateName Name for queryable state
   * @param stateDescriptor State descriptor for value state
   * @return QueryableStateStream for external queries
   */
  def asQueryableState(
    queryableStateName: String,
    stateDescriptor: ValueStateDescriptor[T]
  ): QueryableStateStream[K, T]

  /**
   * Make stream queryable with ReducingStateDescriptor.
   *
   * @param queryableStateName Name for queryable state
   * @param stateDescriptor State descriptor for reducing state
   * @return QueryableStateStream for external queries
   */
  def asQueryableState(
    queryableStateName: String,
    stateDescriptor: ReducingStateDescriptor[T]
  ): QueryableStateStream[K, T]
}

Join with another keyed stream within a time interval.
/** Interval-join entry point on a partitioned stream. */
class KeyedStream[T, K] {

  /**
   * Create an interval join with another keyed stream.
   *
   * @param otherStream Other keyed stream to join with (must share key type K)
   * @return IntervalJoin for configuring join parameters
   */
  def intervalJoin[OTHER](otherStream: KeyedStream[OTHER, K]): IntervalJoin[T, OTHER, K]
}
// IntervalJoin configuration
class IntervalJoin[IN1, IN2, KEY] {

  /**
   * Define the time interval for the join.
   *
   * @param lowerBound Lower bound of the time interval (may be negative)
   * @param upperBound Upper bound of the time interval
   * @return IntervalJoined for processing configuration
   */
  def between(lowerBound: Time, upperBound: Time): IntervalJoined[IN1, IN2, KEY]
}
/** Configured interval join awaiting a processing function. */
class IntervalJoined[IN1, IN2, KEY] {

  /**
   * Make lower bound exclusive.
   *
   * @return IntervalJoined with exclusive lower bound
   */
  def lowerBoundExclusive(): IntervalJoined[IN1, IN2, KEY]

  /**
   * Make upper bound exclusive.
   *
   * @return IntervalJoined with exclusive upper bound
   */
  def upperBoundExclusive(): IntervalJoined[IN1, IN2, KEY]

  /**
   * Process joined elements with a ProcessJoinFunction.
   *
   * @param processJoinFunction Function to process joined elements
   * @return DataStream with join results
   */
  def process[OUT: TypeInformation](
    processJoinFunction: ProcessJoinFunction[IN1, IN2, OUT]
  ): DataStream[OUT]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.util.Collector

case class Order(id: String, customerId: String, amount: Double, timestamp: Long)
case class Payment(id: String, customerId: String, amount: Double, timestamp: Long)
case class OrderPayment(orderId: String, paymentId: String, customerId: String, timestamp: Long)

val orders = env.fromElements(
  Order("o1", "c1", 100.0, 1000L),
  Order("o2", "c2", 200.0, 2000L)
).keyBy(_.customerId)

val payments = env.fromElements(
  Payment("p1", "c1", 100.0, 1100L),
  Payment("p2", "c2", 200.0, 2100L)
).keyBy(_.customerId)

// Join orders and payments within 5 minutes
val joined = orders
  .intervalJoin(payments)
  .between(Time.minutes(-5), Time.minutes(5))
  .process(new ProcessJoinFunction[Order, Payment, OrderPayment] {
    override def processElement(
      left: Order,
      right: Payment,
      ctx: ProcessJoinFunction[Order, Payment, OrderPayment]#Context,
      out: Collector[OrderPayment]
    ): Unit = {
      out.collect(OrderPayment(left.id, right.id, left.customerId, math.max(left.timestamp, right.timestamp)))
    }
  })

// Reduce function interface
/**
 * Combines two values of the same type into one.
 *
 * Implementations must be deterministic so repeated application
 * over a keyed stream yields a stable aggregate.
 */
trait ReduceFunction[T] {

  /**
   * Combine two values into a single value of the same type.
   *
   * @param value1 First value
   * @param value2 Second value
   * @return Combined value
   */
  def reduce(value1: T, value2: T): T
}
// Process function for keyed streams
abstract class KeyedProcessFunction[K, I, O] {

  /** Process one input element; may emit zero or more outputs via `out`. */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /** Called when a previously registered timer fires; no-op by default. */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = {}

  /** Per-element context: timestamp, current key, timers, and side outputs. */
  abstract class Context {
    def timestamp(): Long
    def getCurrentKey: K
    def timerService(): TimerService
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context available in onTimer; also exposes the firing time domain. */
  abstract class OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}
// Process join function for interval joins
abstract class ProcessJoinFunction[IN1, IN2, OUT] {

  /** Process one joined pair; may emit zero or more outputs via `out`. */
  def processElement(left: IN1, right: IN2, ctx: Context, out: Collector[OUT]): Unit

  /** Timestamps of both joined elements and the current watermark. */
  abstract class Context {
    def getLeftTimestamp: Long
    def getRightTimestamp: Long
    def getCurrentWatermark: Long
  }
}
// Queryable state stream
class QueryableStateStream[K, V] {
  // Name under which the state was registered via asQueryableState.
  def getQueryableStateName: String
  // Type information for the key, needed by external query clients.
  def getKeyType: TypeInformation[K]
  // Type information for the stored value.
  def getValueType: TypeInformation[V]
}
// Timer service for processing functions
trait TimerService {
  /** Current processing time in epoch milliseconds. */
  def currentProcessingTime(): Long
  /** Current event-time watermark. */
  def currentWatermark(): Long
  /** Register a timer to fire at the given processing time. */
  def registerProcessingTimeTimer(time: Long): Unit
  /** Register a timer to fire when the watermark passes the given event time. */
  def registerEventTimeTimer(time: Long): Unit
  /** Delete a previously registered processing-time timer, if it exists. */
  def deleteProcessingTimeTimer(time: Long): Unit
  /** Delete a previously registered event-time timer, if it exists. */
  def deleteEventTimeTimer(time: Long): Unit
}
// Time domain enum: distinguishes event-time from processing-time timers.
sealed trait TimeDomain

object TimeDomain {
  /** Timer fired based on the event-time watermark. */
  case object EVENT_TIME extends TimeDomain
  /** Timer fired based on the machine's wall clock. */
  case object PROCESSING_TIME extends TimeDomain
}
// State descriptors for queryable state
/** Describes value state: a single value per key. */
class ValueStateDescriptor[T](name: String, typeInfo: TypeInformation[T])

/** Describes reducing state: values combined per key via the given ReduceFunction. */
class ReducingStateDescriptor[T](name: String, reduceFunction: ReduceFunction[T], typeInfo: TypeInformation[T])

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12