Spark SQL API module providing core SQL data types, rows, and foundational APIs for Spark SQL operations.
Stateful operations for complex streaming analytics with timeout support and watermark handling. Enables sophisticated event-time processing and stateful transformations in Spark Structured Streaming.
Per-group state management for stateful streaming operations.
/**
* Per-group state for streaming operations.
*
* This excerpt lists only the members that read, write and remove the state of a single
* group, plus the timeout flag. Signatures only; no implementations are shown here.
*
* @tparam S State type
*/
trait GroupState[S] extends LogicalGroupState[S] {
/** Check if state exists for this group. */
def exists: Boolean
/**
* Get state value (throws if not exists).
* Guard the call with `exists`, or use `getOption`, when the state may be absent.
* NOTE(review): the exception type is not shown in this excerpt — confirm against the Spark Scaladoc.
*/
def get: S
/** Get state as Option, so callers can supply a default with `getOrElse` instead of checking `exists`. */
def getOption: Option[S]
/** Update state with new value. */
def update(newState: S): Unit
/** Remove state for this group. */
def remove(): Unit
/** Check if group has timed out. */
def hasTimedOut: Boolean
}
Processing time and event time timeout configuration for stateful operations.
// Excerpt of GroupState listing only the timeout-configuration members (signatures only).
trait GroupState[S] extends LogicalGroupState[S] {
// Processing time timeouts
/** Set processing time timeout in milliseconds. */
def setTimeoutDuration(durationMs: Long): Unit
/** Set processing time timeout with string duration (e.g., "10 minutes"). */
def setTimeoutDuration(duration: String): Unit
// Event time timeouts
/**
* Set event time timeout timestamp in milliseconds.
* NOTE(review): presumably milliseconds since epoch, on the same scale as the event times — confirm.
*/
def setTimeoutTimestamp(timestampMs: Long): Unit
/** Set event time timeout with Date. */
def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
}
Access to watermark and processing time information within stateful operations.
// Excerpt of GroupState listing only the clock/watermark accessors (signatures only).
trait GroupState[S] extends LogicalGroupState[S] {
/**
* Get current watermark in milliseconds since epoch.
* NOTE(review): presumably only meaningful when the query defines a watermark — confirm.
*/
def getCurrentWatermarkMs(): Long
/** Get current processing time in milliseconds since epoch. */
def getCurrentProcessingTimeMs(): Long
}
Different trigger types for controlling streaming query execution timing.
// Process all available data once and stop.
// Parameterless, hence a case object: reference it directly as `OneTimeTrigger`.
case object OneTimeTrigger extends Trigger
// Process all available data in multiple batches.
// Parameterless, hence a case object: reference it directly as `AvailableNowTrigger`.
case object AvailableNowTrigger extends Trigger
/**
* Micro-batch processing with fixed intervals.
* @param intervalMs Processing interval in milliseconds
*/
case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger
// Companion factories (signatures only): alternative ways to express the interval.
object ProcessingTimeTrigger {
/** Build from an interval string, e.g. "30 seconds". */
def apply(interval: String): ProcessingTimeTrigger
/** Build from a `java.time.Duration`. */
def apply(interval: java.time.Duration): ProcessingTimeTrigger
/** Build from a numeric interval expressed in the given time unit. */
def create(interval: Long, unit: java.util.concurrent.TimeUnit): ProcessingTimeTrigger
}
/**
* Continuous processing with low latency.
* @param intervalMs Checkpoint interval in milliseconds
*/
case class ContinuousTrigger(intervalMs: Long) extends Trigger
// Companion factories (signatures only): alternative ways to express the checkpoint interval.
object ContinuousTrigger {
/** Build from an interval string, e.g. "1 second". */
def apply(interval: String): ContinuousTrigger
/** Build from a `java.time.Duration`. */
def apply(interval: java.time.Duration): ContinuousTrigger
/** Build from a numeric interval expressed in the given time unit. */
def create(interval: Long, unit: java.util.concurrent.TimeUnit): ContinuousTrigger
}
Basic stateful operations:
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.sql.execution.streaming._
// Define state type
// Per-user state carried between batches by the stateful functions in these examples.
case class UserActivityState(
// Running count of events whose eventType is "login"
loginCount: Int,
// Timestamp taken from the user's events (unit not shown here — presumably epoch ms; confirm)
lastSeenTime: Long,
// Accumulated result of calculateSessionTime over every processed batch
totalSessionTime: Long
)
// Stateful function for mapGroupsWithState
/**
 * Folds one batch of events for a user into that user's running activity state.
 *
 * The stored state (or a zeroed state for a user seen for the first time) is combined
 * with the batch, written back through `state.update`, and summarised in the result.
 *
 * @param userId grouping key of the events
 * @param events the events of this batch for `userId`; consumed exactly once
 * @param state  handle on the per-user state
 * @return the login count and last-seen time after applying this batch
 */
def updateUserActivity(
    userId: String,
    events: Iterator[UserEvent],
    state: GroupState[UserActivityState]
): UserActivityOutput = {
  val previous = state.getOption.getOrElse(UserActivityState(0, 0L, 0L))

  // An Iterator can be traversed only once, but the batch is needed for several passes.
  val newEvents = events.toVector
  val newLoginCount = previous.loginCount + newEvents.count(_.eventType == "login")

  // foldLeft rather than maxOption: maxOption only exists from Scala 2.13 on, while this
  // artifact targets Scala 2.12. Seeding the fold with the stored value also keeps
  // lastSeenTime from moving backwards when a batch contains only older events.
  val latestTime = newEvents.foldLeft(previous.lastSeenTime) { (latest, event) =>
    math.max(latest, event.timestamp)
  }

  val updatedState = previous.copy(
    loginCount = newLoginCount,
    lastSeenTime = latestTime,
    totalSessionTime = previous.totalSessionTime + calculateSessionTime(newEvents)
  )

  // Persist before building the output so the stored state and the result always agree.
  state.update(updatedState)

  UserActivityOutput(userId, updatedState.loginCount, updatedState.lastSeenTime)
}
Timeout-based state expiration:
/**
 * Variant of the user-activity update that expires idle users.
 *
 * On a timeout call the stored state is emitted one last time (flagged `expired`) and
 * removed. Otherwise a non-empty batch is merged into the state and the inactivity
 * timeout is re-armed; an empty batch produces no output.
 *
 * @param userId grouping key of the events
 * @param events the events of this batch for `userId`
 * @param state  handle on the per-user state and its timeout
 * @return the output for this invocation, or `None` when there is nothing to emit
 */
def updateUserActivityWithTimeout(
    userId: String,
    events: Iterator[UserEvent],
    state: GroupState[UserActivityState]
): Option[UserActivityOutput] =
  if (state.hasTimedOut) {
    // Timeout path: read the final state before dropping it.
    val lastKnown = state.get
    state.remove()
    Some(UserActivityOutput(userId, lastKnown.loginCount, lastKnown.lastSeenTime, expired = true))
  } else {
    val priorState = if (state.exists) state.get else UserActivityState(0, 0L, 0L)
    val batch = events.toSeq
    if (batch.isEmpty) {
      None // nothing arrived for this user in this batch
    } else {
      val nextState = processEvents(priorState, batch)
      state.update(nextState)
      // Re-arm the timeout so the user expires after 1 hour without new events.
      state.setTimeoutDuration("1 hour")
      Some(UserActivityOutput(userId, nextState.loginCount, nextState.lastSeenTime))
    }
  }
Event time timeout management:
/**
 * Session tracking driven by an event-time timeout.
 *
 * On a timeout call the stored session is emitted (flagged `expired`) and removed.
 * Otherwise the batch is sorted by event time, merged into the session, and the timeout
 * is moved to 30 minutes after the latest event of the batch.
 *
 * @param sessionId grouping key of the events
 * @param events    the events of this batch for `sessionId`
 * @param state     handle on the per-session state and its timeout
 * @return the session output, or `None` when the batch is empty
 */
def updateSessionsWithEventTime(
    sessionId: String,
    events: Iterator[SessionEvent],
    state: GroupState[SessionState]
): Option[SessionOutput] =
  if (state.hasTimedOut) {
    val closedSession = state.get
    state.remove()
    Some(SessionOutput(sessionId, closedSession, expired = true))
  } else {
    val previous = state.getOption.getOrElse(SessionState.empty)
    val ordered = events.toSeq.sortBy(_.eventTime)
    if (ordered.isEmpty) {
      None
    } else {
      // The batch is sorted ascending, so the last element carries the latest event time.
      val latestEventTime = ordered.last.eventTime
      val nextState = processSessionEvents(previous, ordered)
      // 30 minutes in ms past the latest event
      val timeoutTimestamp = latestEventTime + (30 * 60 * 1000)
      state.setTimeoutTimestamp(timeoutTimestamp)
      state.update(nextState)
      Some(SessionOutput(sessionId, nextState))
    }
  }
Working with watermarks:
/**
 * Updates per-key state using only the events that are not older than the current watermark.
 *
 * The watermark and processing time are read from `state` and printed; events whose
 * timestamp lies before the watermark are discarded before the state is touched.
 *
 * @param key    grouping key of the events
 * @param events the events of this batch for `key`
 * @param state  handle on the per-key state and the query clocks
 * @return the output for the updated state, or `None` when every event was discarded
 */
def updateWithWatermarkAwareness(
    key: String,
    events: Iterator[Event],
    state: GroupState[EventState]
): Option[EventOutput] = {
  val currentWatermark = state.getCurrentWatermarkMs()
  val processingTime = state.getCurrentProcessingTimeMs()
  println(s"Current watermark: $currentWatermark, Processing time: $processingTime")

  // Keep only events at or after the watermark; anything earlier counts as late.
  val onTime = events.filter(event => event.timestamp >= currentWatermark).toSeq

  if (onTime.isEmpty) {
    None // every event in this batch was behind the watermark
  } else {
    val previous = state.getOption.getOrElse(EventState.empty)
    val nextState = processValidEvents(previous, onTime)
    state.update(nextState)
    Some(EventOutput(key, nextState.count, nextState.lastTimestamp))
  }
}
Trigger configuration examples:
import org.apache.spark.sql.execution.streaming._
import java.util.concurrent.TimeUnit
// One-time processing (batch-like): case object, used without arguments
val oneTimeTrigger = OneTimeTrigger
// Process all available data in batches: case object, used without arguments
val availableNowTrigger = AvailableNowTrigger
// Micro-batch with fixed intervals: the same kind of trigger built three ways
// (interval string, java.time.Duration, numeric value + TimeUnit)
val processingTrigger1 = ProcessingTimeTrigger("30 seconds")
val processingTrigger2 = ProcessingTimeTrigger(java.time.Duration.ofMinutes(5))
val processingTrigger3 = ProcessingTimeTrigger.create(10, TimeUnit.SECONDS)
// Continuous processing (low latency): the argument is the checkpoint interval,
// again given as string, Duration, or numeric value + TimeUnit
val continuousTrigger1 = ContinuousTrigger("1 second")
val continuousTrigger2 = ContinuousTrigger(java.time.Duration.ofMillis(500))
val continuousTrigger3 = ContinuousTrigger.create(100, TimeUnit.MILLISECONDS)
State lifecycle management:
/**
 * Walks a group's aggregation state through its lifecycle: create, update, expire or drop.
 *
 * - New data: aggregate it into the state; keep the state (and arm a 2 hour timeout) only
 *   while `shouldKeep` holds, otherwise remove it. The aggregated result is returned either way.
 * - No data and an empty state: remove any stored state and return the empty result.
 * - No data but a non-empty state: return the current result unchanged.
 *
 * @param groupKey grouping key of the data points
 * @param values   the data points of this batch for `groupKey`
 * @param state    handle on the per-group aggregation state
 * @return the aggregation result for this invocation
 */
def manageStateLifecycle(
    groupKey: String,
    values: Iterator[DataPoint],
    state: GroupState[AggregationState]
): AggregationResult = {
  // Start from the stored state, or a freshly initialised one for a new group.
  val existing = if (state.exists) state.get else AggregationState.initialize()
  val batch = values.toSeq

  if (batch.nonEmpty) {
    val merged = existing.aggregate(batch)
    if (merged.shouldKeep) {
      state.update(merged)
      state.setTimeoutDuration("2 hours")
    } else {
      // The aggregate says the state is no longer needed.
      state.remove()
    }
    AggregationResult(groupKey, merged.result)
  } else if (existing.isEmpty) {
    // Nothing new and nothing worth keeping: drop whatever is stored.
    if (state.exists) state.remove()
    AggregationResult.empty(groupKey)
  } else {
    // Nothing new: report the result of the state as it stands.
    AggregationResult(groupKey, existing.result)
  }
}
Error handling in stateful operations:
/**
 * Wraps the per-group update so that a failure in one group does not fail the stream.
 *
 * Timeout invocations are routed to `handleTimeout`, all others to `processEvents`.
 * If either throws a non-fatal error, the error is logged, the group's state is removed,
 * and no output is produced for this invocation.
 *
 * Only non-fatal errors are recovered: `InterruptedException`, JVM errors such as
 * `OutOfMemoryError`, and control-flow throwables propagate, so a cancelled task
 * still stops instead of being reported as a skipped group.
 *
 * @param key    grouping key of the events
 * @param events the events of this batch for `key`
 * @param state  handle on the per-group state
 * @return the output of the update, or `None` after a recovered failure
 */
def robustStatefulUpdate(
    key: String,
    events: Iterator[Event],
    state: GroupState[MyState]
): Option[Output] = {
  // Local import keeps this snippet self-contained.
  import scala.util.control.NonFatal

  try {
    // Handle timeout first
    if (state.hasTimedOut) {
      handleTimeout(key, state)
    } else {
      processEvents(key, events, state)
    }
  } catch {
    case NonFatal(ex) =>
      // Log error but don't fail the entire stream
      logger.error(s"Error processing group $key", ex)
      // Reset the state: after a failed update it may no longer be trustworthy.
      if (state.exists) state.remove()
      None // Skip output for this batch
  }
}
/**
 * Finishes a group whose state has timed out.
 *
 * The state is captured first and then removed unconditionally; a final output flagged
 * `timedOut` is produced only when there was state to report.
 *
 * @param key   grouping key of the expired group
 * @param state handle on the per-group state
 * @return the final output, or `None` when no state was stored
 */
def handleTimeout(key: String, state: GroupState[MyState]): Option[Output] = {
  val lastSnapshot = state.getOption
  // Always clean up on timeout, whether or not there was anything to emit.
  state.remove()
  for (last <- lastSnapshot) yield Output(key, last.finalResult, timedOut = true)
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-sql-api-2-12