Scala API for Apache Flink streaming: idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.
—
Low-level process functions handle one element at a time and have access to timers, state, and side outputs. They are the building blocks for complex event-driven logic and stateful stream processing.
ProcessFunction — the basic processing function for DataStream operations.
abstract class ProcessFunction[I, O] {
  /**
   * Process each element from the input stream.
   *
   * @param value Input element to process
   * @param ctx Context providing access to the element timestamp, timers, and side outputs
   * @param out Collector for emitting output elements
   */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /**
   * Called when a timer fires (optional override; the default does nothing).
   *
   * @param timestamp Timestamp of the fired timer
   * @param ctx OnTimerContext providing timer information
   * @param out Collector for emitting output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = {}

  /** Context passed to `processElement`. */
  abstract class Context {
    /**
     * Timestamp of the element being processed, or of the firing timer.
     *
     * Nullable: Flink returns null when the stream carries no timestamps (no timestamp
     * assigner / watermark strategy was applied). Wrap it in Option(...) instead of
     * unboxing it straight into a Scala Long, which would throw a NullPointerException.
     */
    def timestamp(): java.lang.Long

    /**
     * Access to current time and timer registration.
     *
     * Registering or deleting timers is only supported when the function runs on a keyed
     * stream; on a non-keyed stream Flink throws UnsupportedOperationException.
     */
    def timerService(): TimerService

    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context passed to `onTimer`; additionally reports which time domain the timer used. */
  abstract class OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}
Processing function for KeyedStream operations with access to keys.
abstract class KeyedProcessFunction[K, I, O] {
  /**
   * Process each element from the input keyed stream.
   *
   * @param value Input element to process
   * @param ctx Context providing access to the current key, timestamp, timers, and side outputs
   * @param out Collector for emitting output elements
   */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /**
   * Called when a timer fires (optional override; the default does nothing).
   * The context's current key is the key the timer was registered for.
   *
   * @param timestamp Timestamp of the fired timer
   * @param ctx OnTimerContext providing timer and key information
   * @param out Collector for emitting output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = {}

  /** Context passed to `processElement`. */
  abstract class Context {
    /**
     * Timestamp of the element being processed, or of the firing timer.
     *
     * Nullable: Flink returns null when the stream carries no timestamps (no timestamp
     * assigner / watermark strategy was applied). Wrap it in Option(...) instead of
     * unboxing it straight into a Scala Long, which would throw a NullPointerException.
     */
    def timestamp(): java.lang.Long

    /** Key of the element (or timer) currently being processed. */
    def getCurrentKey: K

    /** Access to current time and to timer registration for the current key. */
    def timerService(): TimerService

    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context passed to `onTimer`; additionally reports which time domain the timer used. */
  abstract class OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}
Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
/** One temperature measurement reported by a sensor, with its timestamp. */
case class SensorReading(
  sensorId: String,
  temperature: Double,
  timestamp: Long
)

/** An alert raised for a sensor, carrying a human-readable message and a timestamp. */
case class Alert(
  sensorId: String,
  message: String,
  timestamp: Long
)
// ProcessFunction example - detect readings above a temperature threshold
/**
 * Emits an [[Alert]] for every reading whose temperature is strictly above `threshold`.
 *
 * @param threshold temperature (°C) above which a reading raises an alert. A reading exactly
 *                  at the threshold does not alert. Defaults to 50.0, the value this example
 *                  originally hard-coded, so `new TemperatureSpikeFunction` behaves as before.
 */
class TemperatureSpikeFunction(threshold: Double = 50.0)
    extends ProcessFunction[SensorReading, Alert] {

  override def processElement(
      reading: SensorReading,
      ctx: ProcessFunction[SensorReading, Alert]#Context,
      out: Collector[Alert]
  ): Unit = {
    if (reading.temperature > threshold) {
      // Stamp the alert with the reading's own timestamp. ctx.timestamp() returns a nullable
      // java.lang.Long that is null when no timestamps were assigned (as with a plain
      // fromElements source), and unboxing it would throw a NullPointerException.
      out.collect(Alert(
        reading.sensorId,
        s"High temperature detected: ${reading.temperature}°C",
        reading.timestamp
      ))
    }
  }
}
// KeyedProcessFunction example - timeout detection
/**
 * Emits an [[Alert]] when a sensor (the key) sends no reading for 60 seconds of event time.
 *
 * Each reading moves the sensor's single pending timeout timer forward. The previous timer is
 * deleted, so only genuine inactivity raises an alert.
 */
class SensorTimeoutFunction extends KeyedProcessFunction[String, SensorReading, Alert] {
  import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

  /** Inactivity period, in event-time milliseconds, after which an alert is emitted. */
  private val TimeoutMillis: Long = 60000L

  /**
   * Trigger time of the timeout timer currently pending for this key, or null if none.
   * Declared lazy so it is resolved on first use inside the running task, when the runtime
   * context is available.
   */
  private lazy val pendingTimeout: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("pending-timeout", classOf[java.lang.Long])
    )

  override def processElement(
      reading: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, Alert]#Context,
      out: Collector[Alert]
  ): Unit = {
    val timers = ctx.timerService()
    val previousDeadline = Option(pendingTimeout.value()).map(_.longValue)
    // Cancel the timer armed by this sensor's previous reading. Otherwise every reading would
    // eventually raise an "inactive" alert even while the sensor keeps reporting.
    previousDeadline.foreach(deadline => timers.deleteEventTimeTimer(deadline))
    // Measure inactivity from the newest reading seen so far, so an out-of-order reading cannot
    // pull the deadline earlier. Use the reading's own timestamp: ctx.timestamp() is a nullable
    // java.lang.Long that is null when no timestamps were assigned.
    val deadline =
      math.max(reading.timestamp + TimeoutMillis, previousDeadline.getOrElse(Long.MinValue))
    timers.registerEventTimeTimer(deadline)
    pendingTimeout.update(deadline)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, SensorReading, Alert]#OnTimerContext,
      out: Collector[Alert]
  ): Unit = {
    // Only the latest timer per key survives processElement, so firing means real inactivity.
    pendingTimeout.clear()
    out.collect(Alert(
      ctx.getCurrentKey,
      s"Sensor ${ctx.getCurrentKey} has been inactive for 60 seconds",
      timestamp
    ))
  }
}
// Apply processing functions
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Use each reading's own timestamp as its event time (ascending in this example). This way
// ctx.timestamp() is populated and watermarks advance to drive the event-time timers that
// SensorTimeoutFunction registers.
val readings = env
  .fromElements(
    SensorReading("sensor1", 45.0, 1000L),
    SensorReading("sensor2", 55.0, 2000L)
  )
  .assignAscendingTimestamps(_.timestamp)

// Apply process function
val alerts = readings.process(new TemperatureSpikeFunction)

// Apply keyed process function
val timeoutAlerts = readings
  .keyBy(_.sensorId)
  .process(new SensorTimeoutFunction)

// Print the results, then submit the job: nothing runs until execute() is called.
alerts.print()
timeoutAlerts.print()
env.execute("Sensor alerts")
Processing function for ConnectedStreams with two input types.
abstract class CoProcessFunction[IN1, IN2, OUT] {
  /**
   * Process an element from the first input stream.
   *
   * @param value Element from first stream
   * @param ctx Context providing access to the element timestamp, timers, and side outputs
   * @param out Collector for emitting output elements
   */
  def processElement1(value: IN1, ctx: Context, out: Collector[OUT]): Unit

  /**
   * Process an element from the second input stream.
   *
   * @param value Element from second stream
   * @param ctx Context providing access to the element timestamp, timers, and side outputs
   * @param out Collector for emitting output elements
   */
  def processElement2(value: IN2, ctx: Context, out: Collector[OUT]): Unit

  /**
   * Called when a timer fires (optional override; the default does nothing).
   *
   * @param timestamp Timestamp of the fired timer
   * @param ctx OnTimerContext providing timer information
   * @param out Collector for emitting output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {}

  /** Context passed to `processElement1` and `processElement2`. */
  abstract class Context {
    /**
     * Timestamp of the element being processed, or of the firing timer.
     *
     * Nullable: Flink returns null when the stream carries no timestamps (no timestamp
     * assigner / watermark strategy was applied). Wrap it in Option(...) instead of
     * unboxing it straight into a Scala Long, which would throw a NullPointerException.
     */
    def timestamp(): java.lang.Long

    /**
     * Access to current time and timer registration.
     *
     * Registering or deleting timers requires the connected streams to be keyed; otherwise
     * Flink throws UnsupportedOperationException.
     */
    def timerService(): TimerService

    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context passed to `onTimer`; additionally reports which time domain the timer used. */
  abstract class OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}
Service for registering and managing timers in processing functions.
/**
 * Time queries and timer (de)registration available to process functions.
 *
 * Timers are scoped to the current key and deduplicated: at most one timer exists per key and
 * timestamp. Registering the same time twice therefore yields a single `onTimer` call.
 * Registering or deleting timers is only supported on keyed streams; elsewhere Flink throws
 * UnsupportedOperationException.
 */
trait TimerService {
  /**
   * Current processing (wall-clock) time.
   *
   * @return Current processing time in milliseconds
   */
  def currentProcessingTime(): Long

  /**
   * Current event-time watermark. This is Long.MinValue until the first watermark arrives.
   *
   * @return Current watermark in milliseconds
   */
  def currentWatermark(): Long

  /**
   * Register a timer that fires when processing time passes `time`.
   *
   * @param time Timer timestamp in processing time (milliseconds)
   */
  def registerProcessingTimeTimer(time: Long): Unit

  /**
   * Register a timer that fires when the event-time watermark passes `time`.
   *
   * @param time Timer timestamp in event time (milliseconds)
   */
  def registerEventTimeTimer(time: Long): Unit

  /**
   * Delete the processing-time timer with the given trigger time. This has no effect unless
   * such a timer was registered and has not fired yet.
   *
   * @param time Timer timestamp to delete
   */
  def deleteProcessingTimeTimer(time: Long): Unit

  /**
   * Delete the event-time timer with the given trigger time. This has no effect unless such a
   * timer was registered and has not fired yet.
   *
   * @param time Timer timestamp to delete
   */
  def deleteEventTimeTimer(time: Long): Unit
}
Usage Examples:
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
/** An order placed by a customer. */
case class Order(
  id: String,
  customerId: String,
  amount: Double,
  timestamp: Long
)

/** A payment made by a customer. */
case class Payment(
  id: String,
  customerId: String,
  amount: Double,
  timestamp: Long
)

/** Pairs an order with the payment that settled it, for one customer. */
case class OrderPaymentMatch(
  orderId: String,
  paymentId: String,
  customerId: String
)
/**
 * Matches orders with payments from the same customer.
 *
 * Matching rule used by this example: a payment settles one pending order of the current
 * customer whose amount is exactly equal (compared with ==). Which order is chosen is
 * unspecified when several qualify. Orders left unpaid for
 * OrderPaymentMatcher.PaymentTimeoutMillis of event time go to the
 * OrderPaymentMatcher.UnmatchedOrders side output. Payments with no pending order go to
 * OrderPaymentMatcher.UnmatchedPayments.
 *
 * State and timers are per key, so apply this to connected streams keyed by customer, e.g.
 * `orders.connect(payments).keyBy(_.customerId, _.customerId)`. Flink rejects keyed state
 * and timers on non-keyed streams.
 */
class OrderPaymentMatcher extends CoProcessFunction[Order, Payment, OrderPaymentMatch] {
  import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
  import org.apache.flink.streaming.api.scala.createTypeInformation
  import scala.collection.JavaConverters._
  import OrderPaymentMatcher.{PaymentTimeoutMillis, UnmatchedOrders, UnmatchedPayments}

  /**
   * Unpaid orders of the current customer, keyed by order id. Declared lazy so it is resolved
   * on first use inside the running task, when the runtime context is available.
   */
  private lazy val pendingOrders: MapState[String, Order] =
    getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Order](
        "pending-orders",
        createTypeInformation[String],
        createTypeInformation[Order]
      )
    )

  override def processElement1(
      order: Order,
      ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#Context,
      out: Collector[OrderPaymentMatch]
  ): Unit = {
    pendingOrders.put(order.id, order)
    // Derive the deadline from the order's own timestamp: ctx.timestamp() is a nullable
    // java.lang.Long and is null when no timestamps were assigned upstream.
    ctx.timerService().registerEventTimeTimer(order.timestamp + PaymentTimeoutMillis)
  }

  override def processElement2(
      payment: Payment,
      ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#Context,
      out: Collector[OrderPaymentMatch]
  ): Unit =
    pendingOrders.values().asScala.find(_.amount == payment.amount) match {
      case Some(order) =>
        pendingOrders.remove(order.id)
        // The order's timer is left registered; onTimer ignores orders that are no longer pending.
        out.collect(OrderPaymentMatch(order.id, payment.id, payment.customerId))
      case None =>
        // No pending order to settle (e.g. the payment arrived before its order). Surface it
        // rather than dropping it silently.
        ctx.output(UnmatchedPayments, payment)
    }

  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#OnTimerContext,
      out: Collector[OrderPaymentMatch]
  ): Unit = {
    // Snapshot the expired orders first so state is not modified while it is being iterated.
    val expired = pendingOrders.values().asScala
      .filter(order => order.timestamp + PaymentTimeoutMillis <= timestamp)
      .toList
    expired.foreach { order =>
      pendingOrders.remove(order.id)
      ctx.output(UnmatchedOrders, order)
    }
  }
}

/** Timeout configuration and side-output tags used by OrderPaymentMatcher. */
object OrderPaymentMatcher {
  import org.apache.flink.streaming.api.scala.{createTypeInformation, OutputTag}

  /** How long an order may stay unpaid, in event-time milliseconds (5 minutes). */
  val PaymentTimeoutMillis: Long = 5 * 60 * 1000L

  /** Side output receiving orders that were not paid within PaymentTimeoutMillis. */
  val UnmatchedOrders: OutputTag[Order] = OutputTag[Order]("unmatched-orders")

  /** Side output receiving payments that found no pending order to settle. */
  val UnmatchedPayments: OutputTag[Payment] = OutputTag[Payment]("unmatched-payments")
}
// Time domain enumeration
/**
 * The notion of time a timer belongs to. Reported to timer callbacks through
 * `OnTimerContext.timeDomain()`.
 */
sealed trait TimeDomain

object TimeDomain {

  /** Timers driven by event time. */
  case object EVENT_TIME extends TimeDomain

  /** Timers driven by processing (wall-clock) time. */
  case object PROCESSING_TIME extends TimeDomain
}
// Output tag for side outputs
/**
 * Typed, named identifier of a side output. Pass it to `Context.output` to emit a value there.
 *
 * @param id name of the side output; must be non-null and non-empty
 * @tparam T type of the elements sent to this side output. Its TypeInformation is captured
 *           from the context bound when the tag is created.
 */
case class OutputTag[T: TypeInformation](id: String) {
  // Mirrors Flink's own OutputTag constructor, which rejects null and empty ids.
  require(id != null && id.nonEmpty, "OutputTag id must not be null or empty.")

  /** The TypeInformation supplied through the context bound at construction. */
  def getTypeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
}
// Rich process function with lifecycle methods
/**
 * ProcessFunction that also mixes in RichFunction, adding no-op lifecycle hooks and
 * runtime-context accessors.
 *
 * NOTE(review): Flink's own ProcessFunction already extends AbstractRichFunction, so open,
 * close and getRuntimeContext are available on it directly. Confirm this separate rich class
 * exists in the targeted Flink version before recommending it.
 */
abstract class RichProcessFunction[I, O] extends ProcessFunction[I, O] with RichFunction {
  /** Accessor for the function's RuntimeContext. */
  def getRuntimeContext: RuntimeContext

  /** Setter for the function's RuntimeContext. */
  def setRuntimeContext(t: RuntimeContext): Unit

  /** Initialization hook; does nothing unless overridden. */
  override def open(parameters: Configuration): Unit = ()

  /** Tear-down hook; does nothing unless overridden. */
  override def close(): Unit = ()
}
// Rich keyed process function
/**
 * KeyedProcessFunction that also mixes in RichFunction, adding no-op lifecycle hooks and
 * runtime-context accessors.
 *
 * NOTE(review): Flink's own KeyedProcessFunction already extends AbstractRichFunction, so open,
 * close and getRuntimeContext are available on it directly. Confirm this separate rich class
 * exists in the targeted Flink version before recommending it.
 */
abstract class RichKeyedProcessFunction[K, I, O]
    extends KeyedProcessFunction[K, I, O]
    with RichFunction {
  /** Accessor for the function's RuntimeContext. */
  def getRuntimeContext: RuntimeContext

  /** Setter for the function's RuntimeContext. */
  def setRuntimeContext(t: RuntimeContext): Unit

  /** Initialization hook; does nothing unless overridden. */
  override def open(parameters: Configuration): Unit = ()

  /** Tear-down hook; does nothing unless overridden. */
  override def close(): Unit = ()
}
// Rich co-process function
/**
 * CoProcessFunction that also mixes in RichFunction, adding no-op lifecycle hooks and
 * runtime-context accessors.
 *
 * NOTE(review): Flink's own CoProcessFunction already extends AbstractRichFunction, so open,
 * close and getRuntimeContext are available on it directly. Confirm this separate rich class
 * exists in the targeted Flink version before recommending it.
 */
abstract class RichCoProcessFunction[IN1, IN2, OUT]
    extends CoProcessFunction[IN1, IN2, OUT]
    with RichFunction {
  /** Accessor for the function's RuntimeContext. */
  def getRuntimeContext: RuntimeContext

  /** Setter for the function's RuntimeContext. */
  def setRuntimeContext(t: RuntimeContext): Unit

  /** Initialization hook; does nothing unless overridden. */
  override def open(parameters: Configuration): Unit = ()

  /** Tear-down hook; does nothing unless overridden. */
  override def close(): Unit = ()
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12