Stateful operations for complex streaming analytics with timeout support and watermark handling. Enables sophisticated event-time processing and stateful transformations in Spark Structured Streaming.
Per-group state management for stateful streaming operations.
/**
* Per-group state for streaming operations
* @tparam S State type
*/
trait GroupState[S] extends LogicalGroupState[S] {
/** Check if state exists for this group */
def exists: Boolean
/** Get state value (throws if not exists) */
def get: S
/** Get state as Option */
def getOption: Option[S]
/** Update state with new value */
def update(newState: S): Unit
/** Remove state for this group */
def remove(): Unit
/** Check if group has timed out */
def hasTimedOut: Boolean
}

Processing time and event time timeout configuration for stateful operations.
trait GroupState[S] extends LogicalGroupState[S] {
// Processing time timeouts
/** Set processing time timeout in milliseconds */
def setTimeoutDuration(durationMs: Long): Unit
/** Set processing time timeout with string duration (e.g., "10 minutes") */
def setTimeoutDuration(duration: String): Unit
// Event time timeouts
/** Set event time timeout timestamp in milliseconds */
def setTimeoutTimestamp(timestampMs: Long): Unit
/** Set event time timeout with Date */
def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
}

Access to watermark and processing time information within stateful operations.
trait GroupState[S] extends LogicalGroupState[S] {
/** Get current watermark in milliseconds since epoch */
def getCurrentWatermarkMs(): Long
/** Get current processing time in milliseconds since epoch */
def getCurrentProcessingTimeMs(): Long
}

Different trigger types for controlling streaming query execution timing.
// Process all available data in a single micro-batch, then stop
case object OneTimeTrigger extends Trigger
// Process all available data in possibly multiple micro-batches, then stop
case object AvailableNowTrigger extends Trigger
/**
* Micro-batch processing with fixed intervals
* @param intervalMs Processing interval in milliseconds
*/
case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger
object ProcessingTimeTrigger {
def apply(interval: String): ProcessingTimeTrigger
def apply(interval: java.time.Duration): ProcessingTimeTrigger
def create(interval: Long, unit: java.util.concurrent.TimeUnit): ProcessingTimeTrigger
}
/**
* Continuous processing with low latency
* @param intervalMs Checkpoint interval in milliseconds
*/
case class ContinuousTrigger(intervalMs: Long) extends Trigger
object ContinuousTrigger {
def apply(interval: String): ContinuousTrigger
def apply(interval: java.time.Duration): ContinuousTrigger
def create(interval: Long, unit: java.util.concurrent.TimeUnit): ContinuousTrigger
}

Basic stateful operations:
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.sql.execution.streaming._
// Define state type
case class UserActivityState(
loginCount: Int,
lastSeenTime: Long,
totalSessionTime: Long
)
// Stateful function for mapGroupsWithState
def updateUserActivity(
userId: String,
events: Iterator[UserEvent],
state: GroupState[UserActivityState]
): UserActivityOutput = {
val currentState = if (state.exists) {
state.get
} else {
UserActivityState(0, 0L, 0L)
}
// Process events and update state
val newEvents = events.toSeq
val newLoginCount = currentState.loginCount + newEvents.count(_.eventType == "login")
val latestTime = newEvents.map(_.timestamp).maxOption.getOrElse(currentState.lastSeenTime) // maxOption needs Scala 2.13; on 2.12 use reduceOption(_ max _)
val updatedState = currentState.copy(
loginCount = newLoginCount,
lastSeenTime = latestTime,
totalSessionTime = currentState.totalSessionTime + calculateSessionTime(newEvents)
)
// Update state
state.update(updatedState)
// Return output
UserActivityOutput(userId, updatedState.loginCount, updatedState.lastSeenTime)
}
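To run this function against a stream, key the Dataset and pass it to mapGroupsWithState. A minimal wiring sketch, assuming a streaming Dataset[UserEvent] named events whose rows carry a userId field, with a SparkSession's implicits in scope:
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
import spark.implicits._
val activity = events
  .groupByKey(_.userId)                          // one state instance per user
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateUserActivity)
activity.writeStream
  .outputMode(OutputMode.Update)                 // mapGroupsWithState requires Update output mode
  .format("console")
  .start()

Timeout-based state expiration: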
def updateUserActivityWithTimeout(
userId: String,
events: Iterator[UserEvent],
state: GroupState[UserActivityState]
): Option[UserActivityOutput] = {
// Handle timeout
if (state.hasTimedOut) {
val finalState = state.get
state.remove() // Clean up expired state
return Some(UserActivityOutput(userId, finalState.loginCount, finalState.lastSeenTime, expired = true))
}
val currentState = if (state.exists) {
state.get
} else {
UserActivityState(0, 0L, 0L)
}
// Process new events
val newEvents = events.toSeq
if (newEvents.nonEmpty) {
val updatedState = processEvents(currentState, newEvents)
state.update(updatedState)
// Set timeout for 1 hour of inactivity
state.setTimeoutDuration("1 hour")
Some(UserActivityOutput(userId, updatedState.loginCount, updatedState.lastSeenTime))
} else {
None // No output for this batch
}
}
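Because this function returns an Option, wire it in with flatMapGroupsWithState; note that hasTimedOut can only become true when a timeout configuration is passed there. A sketch under the same assumptions as above:
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
val expiringActivity = events
  .groupByKey(_.userId)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.ProcessingTimeTimeout) {
    (userId: String, evts: Iterator[UserEvent], state: GroupState[UserActivityState]) =>
      updateUserActivityWithTimeout(userId, evts, state).iterator  // Option => 0- or 1-element Iterator
  }

Event time timeout management: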
def updateSessionsWithEventTime(
sessionId: String,
events: Iterator[SessionEvent],
state: GroupState[SessionState]
): Option[SessionOutput] = {
if (state.hasTimedOut) {
val expiredSession = state.get
state.remove()
return Some(SessionOutput(sessionId, expiredSession, expired = true))
}
val currentState = state.getOption.getOrElse(SessionState.empty)
val newEvents = events.toSeq.sortBy(_.eventTime)
if (newEvents.nonEmpty) {
val latestEventTime = newEvents.last.eventTime
val updatedState = processSessionEvents(currentState, newEvents)
// Set event time timeout to 30 minutes after latest event
val timeoutTimestamp = latestEventTime + (30 * 60 * 1000) // 30 minutes in ms
state.setTimeoutTimestamp(timeoutTimestamp)
state.update(updatedState)
Some(SessionOutput(sessionId, updatedState))
} else {
None
}
}
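Event time timeouts additionally require a watermark on the input: Spark rejects an EventTimeTimeout query whose input has no withWatermark. A sketch, assuming sessionEvents is a streaming Dataset[SessionEvent] whose eventTime column is (or has been cast to) a timestamp:
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
val sessions = sessionEvents
  .withWatermark("eventTime", "10 minutes")      // required for EventTimeTimeout
  .groupByKey(_.sessionId)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.EventTimeTimeout) {
    (id: String, evts: Iterator[SessionEvent], state: GroupState[SessionState]) =>
      updateSessionsWithEventTime(id, evts, state).iterator
  }

Working with watermarks: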
def updateWithWatermarkAwareness(
key: String,
events: Iterator[Event],
state: GroupState[EventState]
): Option[EventOutput] = {
val currentWatermark = state.getCurrentWatermarkMs()
val processingTime = state.getCurrentProcessingTimeMs()
println(s"Current watermark: $currentWatermark, Processing time: $processingTime")
// Filter out late events based on watermark
val validEvents = events.filter(_.timestamp >= currentWatermark).toSeq
if (validEvents.nonEmpty) {
val currentState = state.getOption.getOrElse(EventState.empty)
val updatedState = processValidEvents(currentState, validEvents)
state.update(updatedState)
Some(EventOutput(key, updatedState.count, updatedState.lastTimestamp))
} else {
None // All events were too late
}
}

Trigger configuration examples:
import org.apache.spark.sql.execution.streaming._
import java.util.concurrent.TimeUnit
// One-time processing (batch-like)
val oneTimeTrigger = OneTimeTrigger
// Process all available data in batches
val availableNowTrigger = AvailableNowTrigger
// Micro-batch with fixed intervals
val processingTrigger1 = ProcessingTimeTrigger("30 seconds")
val processingTrigger2 = ProcessingTimeTrigger(java.time.Duration.ofMinutes(5))
val processingTrigger3 = ProcessingTimeTrigger.create(10, TimeUnit.SECONDS)
// Continuous processing (low latency)
val continuousTrigger1 = ContinuousTrigger("1 second")
val continuousTrigger2 = ContinuousTrigger(java.time.Duration.ofMillis(500))
val continuousTrigger3 = ContinuousTrigger.create(100, TimeUnit.MILLISECONDS)
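These internal case classes back the public factory methods on org.apache.spark.sql.streaming.Trigger, which is what application code normally uses when starting a query (Trigger.AvailableNow requires Spark 3.3+). A sketch, assuming a streaming DataFrame named df:
import org.apache.spark.sql.streaming.Trigger
val query = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))  // also: Trigger.Once(), Trigger.AvailableNow(), Trigger.Continuous("1 second")
  .start()

State lifecycle management: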
def manageStateLifecycle(
groupKey: String,
values: Iterator[DataPoint],
state: GroupState[AggregationState]
): AggregationResult = {
// Initialize state if first time seeing this group
val currentState = if (state.exists) {
state.get
} else {
AggregationState.initialize()
}
val dataPoints = values.toSeq
if (dataPoints.isEmpty && currentState.isEmpty) {
// No new data and nothing aggregated yet - drop any stored state
if (state.exists) state.remove()
AggregationResult.empty(groupKey)
} else if (dataPoints.nonEmpty) {
// Update state with new data
val updatedState = currentState.aggregate(dataPoints)
if (updatedState.shouldKeep) {
state.update(updatedState)
// Set reasonable timeout
state.setTimeoutDuration("2 hours")
} else {
// State no longer needed
state.remove()
}
AggregationResult(groupKey, updatedState.result)
} else {
// No new data, return current result
AggregationResult(groupKey, currentState.result)
}
}

Error handling in stateful operations:
def robustStatefulUpdate(
key: String,
events: Iterator[Event],
state: GroupState[MyState]
): Option[Output] = {
try {
// Handle timeout first
if (state.hasTimedOut) {
handleTimeout(key, state)
} else {
processEvents(key, events, state)
}
} catch {
case ex: Exception =>
// Log error but don't fail the entire stream
logger.error(s"Error processing group $key", ex)
// Optionally reset state on error
if (state.exists) state.remove()
None // Skip output for this batch
}
}
def handleTimeout(key: String, state: GroupState[MyState]): Option[Output] = {
val finalState = state.getOption
state.remove() // Always clean up on timeout
finalState.map(s => Output(key, s.finalResult, timedOut = true))
}
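
For completeness, a minimal sketch of the processEvents helper assumed above; MyState.empty, MyState.add, and the Output fields are placeholders:
def processEvents(key: String, events: Iterator[Event], state: GroupState[MyState]): Option[Output] = {
  val evts = events.toSeq
  if (evts.isEmpty) {
    None                                 // nothing new; leave state untouched
  } else {
    val updated = state.getOption.getOrElse(MyState.empty).add(evts)
    state.update(updated)
    state.setTimeoutDuration("1 hour")   // re-arm the timeout while the group stays active
    Some(Output(key, updated.finalResult, timedOut = false))
  }
}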