CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-scala-2-12

Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.

Pending
Overview
Eval results
Files

docs/processing-functions.md

Processing Functions

Low-level processing functions provide access to element processing with timers, state, and side outputs. These are essential for complex event-driven logic and stateful stream processing.

Capabilities

ProcessFunction

Basic processing function for DataStream operations.

/**
 * Basic processing function for DataStream operations.
 *
 * @tparam I Type of the input elements
 * @tparam O Type of the elements emitted through the collector
 */
abstract class ProcessFunction[I, O] {
  /**
   * Process each element from the input stream
   * @param value Input element to process
   * @param ctx Context providing access to timestamp, timers, and side outputs
   * @param out Collector for emitting output elements
   */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /**
   * Called when a timer fires (optional override; the default does nothing)
   * @param timestamp Timestamp of the fired timer
   * @param ctx OnTimerContext providing timer information
   * @param out Collector for emitting output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = {}

  /** Per-invocation information handed to `processElement` and `onTimer`. */
  abstract class Context {
    /**
     * Timestamp of the element currently being processed (or of the firing timer).
     *
     * Declared as the boxed `java.lang.Long` because the value can be `null`,
     * e.g. when the record carries no timestamp. Guard with `Option(...)`
     * before unboxing or doing arithmetic on it.
     */
    def timestamp(): java.lang.Long

    /** Service for querying time and registering/deleting timers. */
    def timerService(): TimerService

    /**
     * Emit a record to the side output identified by `outputTag`.
     * @param outputTag Tag identifying the side output
     * @param value Record to emit
     */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context passed to `onTimer`; adds the time domain of the firing timer. */
  abstract class OnTimerContext extends Context {
    /** Time domain of the timer that fired. */
    def timeDomain(): TimeDomain
  }
}

KeyedProcessFunction

Processing function for KeyedStream operations with access to keys.

/**
 * Processing function for KeyedStream operations with access to keys.
 *
 * @tparam K Type of the key
 * @tparam I Type of the input elements
 * @tparam O Type of the elements emitted through the collector
 */
abstract class KeyedProcessFunction[K, I, O] {
  /**
   * Process each element from the input keyed stream
   * @param value Input element to process
   * @param ctx Context providing access to key, timestamp, timers, and side outputs
   * @param out Collector for emitting output elements
   */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /**
   * Called when a timer fires (optional override; the default does nothing)
   * @param timestamp Timestamp of the fired timer
   * @param ctx OnTimerContext providing timer and key information
   * @param out Collector for emitting output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = {}

  /** Per-invocation information handed to `processElement` and `onTimer`. */
  abstract class Context {
    /**
     * Timestamp of the element currently being processed (or of the firing timer).
     *
     * Declared as the boxed `java.lang.Long` because the value can be `null`,
     * e.g. when the record carries no timestamp. Guard with `Option(...)`
     * before unboxing or doing arithmetic on it.
     */
    def timestamp(): java.lang.Long

    /** Key of the element being processed, or of the timer that fired. */
    def getCurrentKey: K

    /** Service for querying time and registering/deleting timers. */
    def timerService(): TimerService

    /**
     * Emit a record to the side output identified by `outputTag`.
     * @param outputTag Tag identifying the side output
     * @param value Record to emit
     */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context passed to `onTimer`; adds the time domain of the firing timer. */
  abstract class OnTimerContext extends Context {
    /** Time domain of the timer that fired. */
    def timeDomain(): TimeDomain
  }
}

Usage Examples:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * A single measurement reported by a sensor.
 *
 * @param sensorId    Identifier of the reporting sensor
 * @param temperature Measured temperature (the examples render it as °C)
 * @param timestamp   Time of the measurement (presumably epoch milliseconds — confirm with the producer)
 */
case class SensorReading(
  sensorId: String,
  temperature: Double,
  timestamp: Long
)

/**
 * A notification raised for a sensor.
 *
 * @param sensorId  Identifier of the sensor the alert refers to
 * @param message   Human-readable description of the alert
 * @param timestamp Time associated with the alert
 */
case class Alert(
  sensorId: String,
  message: String,
  timestamp: Long
)

/**
 * ProcessFunction example - detect temperature spikes.
 *
 * Emits an [[Alert]] for every reading whose temperature is strictly above
 * `threshold`; all other readings produce no output.
 *
 * @param threshold Temperature above which an alert is raised (defaults to 50.0)
 */
class TemperatureSpikeFunction(threshold: Double = 50.0)
    extends ProcessFunction[SensorReading, Alert] {

  /**
   * @param reading Incoming sensor reading
   * @param ctx     Context of the current element (used only for its timestamp)
   * @param out     Collector receiving the alert, if any
   */
  override def processElement(
    reading: SensorReading,
    ctx: ProcessFunction[SensorReading, Alert]#Context,
    out: Collector[Alert]
  ): Unit = {
    if (reading.temperature > threshold) {
      // ctx.timestamp() is a boxed java.lang.Long and is null when the record
      // carries no timestamp; unboxing it directly would throw an NPE, so fall
      // back to the timestamp stored in the reading itself.
      val alertTime = Option(ctx.timestamp()).fold(reading.timestamp)(_.longValue)
      out.collect(Alert(
        reading.sensorId,
        s"High temperature detected: ${reading.temperature}°C",
        alertTime
      ))
    }
  }
}

/**
 * KeyedProcessFunction example - timeout detection.
 *
 * Keeps exactly one pending event-time timer per key. Every new reading
 * cancels the previous timer and schedules a fresh one `timeoutMs` later, so
 * the timer only fires when no reading arrived for the key within that span.
 * (Registering a timer per element without cancelling the old one would raise
 * an "inactive" alert after every reading, even for active sensors.)
 *
 * @param timeoutMs Inactivity period in milliseconds after which an alert is emitted
 */
class SensorTimeoutFunction(timeoutMs: Long = 60000L)
    extends KeyedProcessFunction[String, SensorReading, Alert] {

  // Keyed state holding the timestamp of the currently registered timer
  // (null when none is pending). Lazy so the runtime context is only touched
  // after the function has been initialised by the runtime.
  private lazy val pendingTimeout: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("pending-timeout", classOf[java.lang.Long])
    )

  /**
   * Re-arms the inactivity timer for the reading's key.
   *
   * @param reading Incoming sensor reading
   * @param ctx     Context giving access to the element timestamp and timer service
   * @param out     Unused here; alerts are only emitted from `onTimer`
   */
  override def processElement(
    reading: SensorReading,
    ctx: KeyedProcessFunction[String, SensorReading, Alert]#Context,
    out: Collector[Alert]
  ): Unit = {
    val timers = ctx.timerService()

    // Cancel the timer scheduled by the previous reading of this key, if any.
    Option(pendingTimeout.value()).foreach(t => timers.deleteEventTimeTimer(t.longValue))

    // ctx.timestamp() may be null when the record has no timestamp attached;
    // fall back to the reading's own timestamp instead of failing with an NPE.
    val eventTime = Option(ctx.timestamp()).fold(reading.timestamp)(_.longValue)
    val timeoutTime = eventTime + timeoutMs

    timers.registerEventTimeTimer(timeoutTime)
    pendingTimeout.update(Long.box(timeoutTime))
  }

  /**
   * Fires when the pending timer was not cancelled by a newer reading.
   *
   * @param timestamp Timestamp of the fired timer
   * @param ctx       Timer context providing the key the timer belongs to
   * @param out       Collector receiving the inactivity alert
   */
  override def onTimer(
    timestamp: Long,
    ctx: KeyedProcessFunction[String, SensorReading, Alert]#OnTimerContext,
    out: Collector[Alert]
  ): Unit = {
    // The timer has fired, so nothing is pending for this key any more.
    pendingTimeout.clear()

    val sensorId = ctx.getCurrentKey
    out.collect(Alert(
      sensorId,
      s"Sensor $sensorId has been inactive for ${timeoutMs / 1000} seconds",
      timestamp
    ))
  }
}

// Apply processing functions
// `env` is assumed to be a StreamExecutionEnvironment created earlier — TODO confirm.
val readings = env
  .fromElements(
    SensorReading("sensor1", 45.0, 1000L),
    SensorReading("sensor2", 55.0, 2000L)
  )
  // Attach event-time timestamps (and ascending watermarks) taken from the
  // readings. Without this the records carry no timestamp, so ctx.timestamp()
  // is null inside the process functions and event-time timers have no
  // meaningful reference point.
  .assignAscendingTimestamps(_.timestamp)

// Apply process function
val alerts = readings.process(new TemperatureSpikeFunction)

// Apply keyed process function (timers and keyed state require a KeyedStream)
val timeoutAlerts = readings
  .keyBy(_.sensorId)
  .process(new SensorTimeoutFunction)

CoProcessFunction

Processing function for ConnectedStreams with two input types.

/**
 * Processing function for ConnectedStreams with two input types.
 *
 * @tparam IN1 Type of the elements of the first input
 * @tparam IN2 Type of the elements of the second input
 * @tparam OUT Type of the elements emitted through the collector
 */
abstract class CoProcessFunction[IN1, IN2, OUT] {
  /**
   * Process element from first input stream
   * @param value Element from first stream
   * @param ctx Context providing access to timers and side outputs
   * @param out Collector for emitting output elements
   */
  def processElement1(value: IN1, ctx: Context, out: Collector[OUT]): Unit

  /**
   * Process element from second input stream
   * @param value Element from second stream
   * @param ctx Context providing access to timers and side outputs
   * @param out Collector for emitting output elements
   */
  def processElement2(value: IN2, ctx: Context, out: Collector[OUT]): Unit

  /**
   * Called when a timer fires (optional override; the default does nothing)
   * @param timestamp Timestamp of the fired timer
   * @param ctx OnTimerContext providing timer information
   * @param out Collector for emitting output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {}

  /** Per-invocation information handed to the process methods and `onTimer`. */
  abstract class Context {
    /**
     * Timestamp of the element currently being processed (or of the firing timer).
     *
     * Declared as the boxed `java.lang.Long` because the value can be `null`,
     * e.g. when the record carries no timestamp. Guard with `Option(...)`
     * before unboxing or doing arithmetic on it.
     */
    def timestamp(): java.lang.Long

    /** Service for querying time and registering/deleting timers. */
    def timerService(): TimerService

    /**
     * Emit a record to the side output identified by `outputTag`.
     * @param outputTag Tag identifying the side output
     * @param value Record to emit
     */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context passed to `onTimer`; adds the time domain of the firing timer. */
  abstract class OnTimerContext extends Context {
    /** Time domain of the timer that fired. */
    def timeDomain(): TimeDomain
  }
}

Timer Service

Service for registering and managing timers in processing functions.

/**
 * Interface through which a processing function reads the current time and
 * manages its timers. Timers are identified by their timestamp and come in
 * two flavours: processing time and event time.
 */
trait TimerService {

  // ---- time queries ----

  /** @return the current processing time, in milliseconds */
  def currentProcessingTime(): Long

  /** @return the current watermark (event time), in milliseconds */
  def currentWatermark(): Long

  // ---- timer registration ----

  /**
   * Schedules a processing-time timer.
   * @param time processing-time timestamp of the timer
   */
  def registerProcessingTimeTimer(time: Long): Unit

  /**
   * Schedules an event-time timer.
   * @param time event-time timestamp of the timer
   */
  def registerEventTimeTimer(time: Long): Unit

  // ---- timer removal ----

  /**
   * Removes a processing-time timer.
   * @param time timestamp of the timer to delete
   */
  def deleteProcessingTimeTimer(time: Long): Unit

  /**
   * Removes an event-time timer.
   * @param time timestamp of the timer to delete
   */
  def deleteEventTimeTimer(time: Long): Unit
}

Usage Examples (CoProcessFunction using the timer service for an order timeout):

import org.apache.flink.streaming.api.functions.co.CoProcessFunction

/**
 * An order placed by a customer.
 *
 * @param id         Identifier of the order
 * @param customerId Identifier of the ordering customer
 * @param amount     Order amount
 * @param timestamp  Time of the order (presumably epoch milliseconds — confirm with the producer)
 */
case class Order(
  id: String,
  customerId: String,
  amount: Double,
  timestamp: Long
)

/**
 * A payment made by a customer.
 *
 * @param id         Identifier of the payment
 * @param customerId Identifier of the paying customer
 * @param amount     Paid amount
 * @param timestamp  Time of the payment (presumably epoch milliseconds — confirm with the producer)
 */
case class Payment(
  id: String,
  customerId: String,
  amount: Double,
  timestamp: Long
)

/**
 * Result record linking an order to the payment that settles it.
 *
 * @param orderId    Identifier of the matched order
 * @param paymentId  Identifier of the matched payment
 * @param customerId Identifier of the customer both belong to
 */
case class OrderPaymentMatch(
  orderId: String,
  paymentId: String,
  customerId: String
)

/**
 * Skeleton CoProcessFunction that pairs payments (second input) with orders
 * (first input) and uses an event-time timer to notice orders that were not
 * paid in time.
 *
 * The matching itself is a placeholder: a complete implementation would keep
 * pending orders in keyed state and look them up when a payment arrives.
 *
 * @param orderTimeoutMs Time in milliseconds after an order before the timeout
 *                       timer fires (defaults to 5 minutes)
 */
class OrderPaymentMatcher(orderTimeoutMs: Long = 300000L)
    extends CoProcessFunction[Order, Payment, OrderPaymentMatch] {

  /**
   * Handles an order: schedules the timeout timer.
   *
   * @param order Incoming order
   * @param ctx   Context giving access to the element timestamp and timer service
   * @param out   Unused here; matches are emitted when payments arrive
   */
  override def processElement1(
    order: Order,
    ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#Context,
    out: Collector[OrderPaymentMatch]
  ): Unit = {
    // Store order and set timeout timer
    // This would typically use state to store pending orders

    // ctx.timestamp() is a boxed java.lang.Long that is null when the record
    // has no timestamp; fall back to the order's own timestamp rather than
    // failing with an NPE during the addition.
    val eventTime = Option(ctx.timestamp()).fold(order.timestamp)(_.longValue)
    val timeoutTime = eventTime + orderTimeoutMs
    ctx.timerService().registerEventTimeTimer(timeoutTime)
  }

  /**
   * Handles a payment: emits a match record.
   *
   * @param payment Incoming payment
   * @param ctx     Context of the current element (unused)
   * @param out     Collector receiving the match
   */
  override def processElement2(
    payment: Payment,
    ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#Context,
    out: Collector[OrderPaymentMatch]
  ): Unit = {
    // Match payment with stored orders
    // This would typically check state for matching orders
    // NOTE: "order1" is a hard-coded placeholder order id, not a real lookup.
    out.collect(OrderPaymentMatch("order1", payment.id, payment.customerId))
  }

  /**
   * Invoked when an order's timeout timer fires.
   *
   * @param timestamp Timestamp of the fired timer
   * @param ctx       Timer context (unused)
   * @param out       Collector (unused; the timeout is only logged)
   */
  override def onTimer(
    timestamp: Long,
    ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#OnTimerContext,
    out: Collector[OrderPaymentMatch]
  ): Unit = {
    // Handle unmatched orders (timeout case)
    println(s"Order timeout at $timestamp")
  }
}

Types

/**
 * Time domain enumeration.
 *
 * Closed set of time domains; the value returned by
 * `OnTimerContext.timeDomain()` is one of the members below.
 */
sealed trait TimeDomain

/** Holder of the two `TimeDomain` members. */
object TimeDomain {

  /** Event-time domain. */
  case object EVENT_TIME extends TimeDomain

  /** Processing-time domain. */
  case object PROCESSING_TIME extends TimeDomain
}

/**
 * Output tag for side outputs.
 *
 * Identifies a side output by name and carries the type information of the
 * records emitted to it.
 *
 * @param id Name identifying the side output
 * @tparam T Type of the records sent to the side output; a `TypeInformation[T]`
 *           must be available implicitly where the tag is created
 */
case class OutputTag[T: TypeInformation](id: String) {
  /**
   * Type information captured by the context bound at construction time.
   * (Left abstract, this member would make the concrete case class
   * impossible to compile or instantiate.)
   */
  def getTypeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
}

// Rich process function with lifecycle methods
// NOTE(review): in the Flink releases I am aware of, ProcessFunction itself
// already provides open/close and getRuntimeContext, and no separate
// RichProcessFunction class is shipped — confirm that this type exists in the
// documented version before relying on it.
abstract class RichProcessFunction[I, O] extends ProcessFunction[I, O] with RichFunction {
  // Lifecycle hook taking a Configuration; the default is a no-op.
  override def open(parameters: Configuration): Unit = {}
  // Lifecycle hook; the default is a no-op.
  override def close(): Unit = {}
  // Accessor for the RuntimeContext (abstract in this sketch).
  def getRuntimeContext: RuntimeContext
  // Setter for the RuntimeContext (abstract in this sketch).
  def setRuntimeContext(t: RuntimeContext): Unit
}

// Rich keyed process function
// NOTE(review): in the Flink releases I am aware of, KeyedProcessFunction
// itself already provides open/close and getRuntimeContext, and no separate
// RichKeyedProcessFunction class is shipped — confirm that this type exists in
// the documented version before relying on it.
abstract class RichKeyedProcessFunction[K, I, O] extends KeyedProcessFunction[K, I, O] with RichFunction {
  // Lifecycle hook taking a Configuration; the default is a no-op.
  override def open(parameters: Configuration): Unit = {}
  // Lifecycle hook; the default is a no-op.
  override def close(): Unit = {}
  // Accessor for the RuntimeContext (abstract in this sketch).
  def getRuntimeContext: RuntimeContext
  // Setter for the RuntimeContext (abstract in this sketch).
  def setRuntimeContext(t: RuntimeContext): Unit
}

// Rich co-process function
// NOTE(review): in the Flink releases I am aware of, CoProcessFunction itself
// already provides open/close and getRuntimeContext — confirm that a separate
// RichCoProcessFunction type exists in the documented version before relying
// on it.
abstract class RichCoProcessFunction[IN1, IN2, OUT] extends CoProcessFunction[IN1, IN2, OUT] with RichFunction {
  // Lifecycle hook taking a Configuration; the default is a no-op.
  override def open(parameters: Configuration): Unit = {}
  // Lifecycle hook; the default is a no-op.
  override def close(): Unit = {}
  // Accessor for the RuntimeContext (abstract in this sketch).
  def getRuntimeContext: RuntimeContext
  // Setter for the RuntimeContext (abstract in this sketch).
  def setRuntimeContext(t: RuntimeContext): Unit
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12

docs

async-io.md

data-streams.md

execution-environment.md

index.md

keyed-streams.md

processing-functions.md

sinks-output.md

stream-connections.md

window-functions.md

windowing.md

tile.json