or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-types.mdencoders.mderror-handling.mdindex.mdrow-operations.mdstreaming-operations.mdutilities.md
tile.json

streaming-operations.mddocs/

Streaming Operations

Stateful operations for complex streaming analytics with timeout support and watermark handling. Enables sophisticated event-time processing and stateful transformations in Spark Structured Streaming.

Capabilities

Group State Management

Per-group state management for stateful streaming operations.

/**
 * Per-group state for streaming operations
 * @tparam S State type
 */
trait GroupState[S] extends LogicalGroupState[S] {
  /** Check if state exists for this group */
  def exists: Boolean
  
  /** Get state value (throws if not exists) */
  def get: S
  
  /** Get state as Option */
  def getOption: Option[S]
  
  /** Update state with new value */
  def update(newState: S): Unit
  
  /** Remove state for this group */
  def remove(): Unit
  
  /** Check if group has timed out */
  def hasTimedOut: Boolean
}

Timeout Management

Processing time and event time timeout configuration for stateful operations.

trait GroupState[S] extends LogicalGroupState[S] {
  // Processing time timeouts
  /** Set processing time timeout in milliseconds */
  def setTimeoutDuration(durationMs: Long): Unit
  
  /** Set processing time timeout with string duration (e.g., "10 minutes") */
  def setTimeoutDuration(duration: String): Unit
  
  // Event time timeouts  
  /** Set event time timeout timestamp in milliseconds */
  def setTimeoutTimestamp(timestampMs: Long): Unit
  
  /** Set event time timeout with Date */
  def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
}

Time Information Access

Access to watermark and processing time information within stateful operations.

trait GroupState[S] extends LogicalGroupState[S] {
  /** Get current watermark in milliseconds since epoch */
  def getCurrentWatermarkMs(): Long
  
  /** Get current processing time in milliseconds since epoch */
  def getCurrentProcessingTimeMs(): Long
}

Trigger Types

Different trigger types for controlling streaming query execution timing.

// Process all available data once and stop
case object OneTimeTrigger extends Trigger

// Process all available data in multiple batches
case object AvailableNowTrigger extends Trigger

/**
 * Micro-batch processing with fixed intervals
 * @param intervalMs Processing interval in milliseconds
 */
case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger

object ProcessingTimeTrigger {
  def apply(interval: String): ProcessingTimeTrigger
  def apply(interval: java.time.Duration): ProcessingTimeTrigger
  def create(interval: Long, unit: java.util.concurrent.TimeUnit): ProcessingTimeTrigger
}

/**
 * Continuous processing with low latency
 * @param intervalMs Checkpoint interval in milliseconds
 */
case class ContinuousTrigger(intervalMs: Long) extends Trigger

object ContinuousTrigger {
  def apply(interval: String): ContinuousTrigger  
  def apply(interval: java.time.Duration): ContinuousTrigger
  def create(interval: Long, unit: java.util.concurrent.TimeUnit): ContinuousTrigger
}

Usage Examples

Basic stateful operations:

import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.sql.execution.streaming._

// Define state type
case class UserActivityState(
  loginCount: Int,
  lastSeenTime: Long,
  totalSessionTime: Long
)

// Stateful function for mapGroupsWithState
def updateUserActivity(
  userId: String,
  events: Iterator[UserEvent], 
  state: GroupState[UserActivityState]
): UserActivityOutput = {
  
  val currentState = if (state.exists) {
    state.get
  } else {
    UserActivityState(0, 0L, 0L)
  }
  
  // Process events and update state
  val newEvents = events.toSeq
  val newLoginCount = currentState.loginCount + newEvents.count(_.eventType == "login")
  val latestTime = newEvents.map(_.timestamp).maxOption.getOrElse(currentState.lastSeenTime)
  
  val updatedState = currentState.copy(
    loginCount = newLoginCount,
    lastSeenTime = latestTime,
    totalSessionTime = currentState.totalSessionTime + calculateSessionTime(newEvents)
  )
  
  // Update state
  state.update(updatedState)
  
  // Return output
  UserActivityOutput(userId, updatedState.loginCount, updatedState.lastSeenTime)
}

Timeout-based state expiration:

def updateUserActivityWithTimeout(
  userId: String,
  events: Iterator[UserEvent],
  state: GroupState[UserActivityState]
): Option[UserActivityOutput] = {
  
  // Handle timeout
  if (state.hasTimedOut) {
    val finalState = state.get
    state.remove() // Clean up expired state
    return Some(UserActivityOutput(userId, finalState.loginCount, finalState.lastSeenTime, expired = true))
  }
  
  val currentState = if (state.exists) {
    state.get  
  } else {
    UserActivityState(0, 0L, 0L)
  }
  
  // Process new events
  val newEvents = events.toSeq
  if (newEvents.nonEmpty) {
    val updatedState = processEvents(currentState, newEvents)
    state.update(updatedState)
    
    // Set timeout for 1 hour of inactivity
    state.setTimeoutDuration("1 hour")
    
    Some(UserActivityOutput(userId, updatedState.loginCount, updatedState.lastSeenTime))
  } else {
    None // No output for this batch
  }
}

Event time timeout management:

def updateSessionsWithEventTime(
  sessionId: String,
  events: Iterator[SessionEvent],
  state: GroupState[SessionState]
): Option[SessionOutput] = {
  
  if (state.hasTimedOut) {
    val expiredSession = state.get
    state.remove()
    return Some(SessionOutput(sessionId, expiredSession, expired = true))
  }
  
  val currentState = state.getOption.getOrElse(SessionState.empty)
  val newEvents = events.toSeq.sortBy(_.eventTime)
  
  if (newEvents.nonEmpty) {
    val latestEventTime = newEvents.last.eventTime
    val updatedState = processSessionEvents(currentState, newEvents)
    
    // Set event time timeout to 30 minutes after latest event
    val timeoutTimestamp = latestEventTime + (30 * 60 * 1000) // 30 minutes in ms
    state.setTimeoutTimestamp(timeoutTimestamp)
    
    state.update(updatedState)
    Some(SessionOutput(sessionId, updatedState))
  } else {
    None
  }
}

Working with watermarks:

def updateWithWatermarkAwareness(
  key: String,
  events: Iterator[Event],
  state: GroupState[EventState]
): Option[EventOutput] = {
  
  val currentWatermark = state.getCurrentWatermarkMs()
  val processingTime = state.getCurrentProcessingTimeMs()
  
  println(s"Current watermark: $currentWatermark, Processing time: $processingTime")
  
  // Filter out late events based on watermark
  val validEvents = events.filter(_.timestamp >= currentWatermark).toSeq
  
  if (validEvents.nonEmpty) {
    val currentState = state.getOption.getOrElse(EventState.empty)
    val updatedState = processValidEvents(currentState, validEvents)
    
    state.update(updatedState)
    Some(EventOutput(key, updatedState.count, updatedState.lastTimestamp))
  } else {
    None // All events were too late
  }
}

Trigger configuration examples:

import org.apache.spark.sql.execution.streaming._
import java.util.concurrent.TimeUnit

// One-time processing (batch-like)
val oneTimeTrigger = OneTimeTrigger

// Process all available data in batches  
val availableNowTrigger = AvailableNowTrigger

// Micro-batch with fixed intervals
val processingTrigger1 = ProcessingTimeTrigger("30 seconds")
val processingTrigger2 = ProcessingTimeTrigger(java.time.Duration.ofMinutes(5))
val processingTrigger3 = ProcessingTimeTrigger.create(10, TimeUnit.SECONDS)

// Continuous processing (low latency)
val continuousTrigger1 = ContinuousTrigger("1 second")
val continuousTrigger2 = ContinuousTrigger(java.time.Duration.ofMillis(500))
val continuousTrigger3 = ContinuousTrigger.create(100, TimeUnit.MILLISECONDS)

State lifecycle management:

def manageStateLifecycle(
  groupKey: String,
  values: Iterator[DataPoint],
  state: GroupState[AggregationState]
): AggregationResult = {
  
  // Initialize state if first time seeing this group
  val currentState = if (state.exists) {
    state.get
  } else {
    AggregationState.initialize()
  }
  
  val dataPoints = values.toSeq
  
  if (dataPoints.isEmpty && currentState.isEmpty) {
    // No data and no existing state - remove if exists
    if (state.exists) state.remove()
    AggregationResult.empty(groupKey)
  } else if (dataPoints.nonEmpty) {
    // Update state with new data
    val updatedState = currentState.aggregate(dataPoints)
    
    if (updatedState.shouldKeep) {
      state.update(updatedState)
      // Set reasonable timeout
      state.setTimeoutDuration("2 hours")
    } else {
      // State no longer needed
      state.remove()
    }
    
    AggregationResult(groupKey, updatedState.result)
  } else {
    // No new data, return current result
    AggregationResult(groupKey, currentState.result)
  }
}

Error handling in stateful operations:

def robustStatefulUpdate(
  key: String,
  events: Iterator[Event], 
  state: GroupState[MyState]
): Option[Output] = {
  
  try {
    // Handle timeout first
    if (state.hasTimedOut) {
      handleTimeout(key, state)
    } else {
      processEvents(key, events, state)
    }
  } catch {
    case ex: Exception =>
      // Log error but don't fail the entire stream
      logger.error(s"Error processing group $key", ex)
      
      // Optionally reset state on error
      if (state.exists) state.remove()
      
      None // Skip output for this batch
  }
}

def handleTimeout(key: String, state: GroupState[MyState]): Option[Output] = {
  val finalState = state.getOption
  state.remove() // Always clean up on timeout
  
  finalState.map(s => Output(key, s.finalResult, timedOut = true))
}