Stateful processing for maintaining state across batches, including the updateStateByKey and mapWithState operations used to build stateful streaming applications.
Abstract class for managing state in mapWithState operations, providing methods to check, get, update, and remove state.
/**
 * Abstract class for managing state in mapWithState operations
 * @tparam S - Type of the state
 */
abstract class State[S] {
  /** Check if state exists for the key */
  def exists(): Boolean
  /** Get the state value (throws an exception if it does not exist) */
  def get(): S
  /** Get state as Option */
  def getOption(): Option[S]
  /** Update the state with a new value */
  def update(newState: S): Unit
  /** Remove the state for this key */
  def remove(): Unit
  /** Check if this state is timing out in the current batch */
  def isTimingOut(): Boolean
}
Specification class for configuring mapWithState operations, including mapping functions, initial state, partitioning, and timeout settings.
/**
 * Specification for mapWithState operation configuration
 * @tparam KeyType - Type of keys in the DStream
 * @tparam ValueType - Type of values in the DStream
 * @tparam StateType - Type of the state being maintained
 * @tparam MappedType - Type of mapped output
 */
abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
  /** Set initial state RDD */
  def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]
  /** Set initial state from JavaPairRDD */
  def initialState(rdd: JavaPairRDD[KeyType, StateType]): StateSpec[KeyType, ValueType, StateType, MappedType]
  /** Set number of partitions for state */
  def numPartitions(numPartitions: Int): StateSpec[KeyType, ValueType, StateType, MappedType]
  /** Set custom partitioner for state */
  def partitioner(partitioner: Partitioner): StateSpec[KeyType, ValueType, StateType, MappedType]
  /** Set timeout duration for inactive keys */
  def timeout(timeout: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}
/**
 * Factory object for creating StateSpec instances
 */
object StateSpec {
  /** Create StateSpec with Scala function */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
  /** Create StateSpec with Java Function3 */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: Function3[KeyType, Optional[ValueType], State[StateType], MappedType]
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
  /** Create StateSpec with Java Function4 (includes batch time and optional output) */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: Function4[Time, KeyType, Optional[ValueType], State[StateType], Optional[MappedType]]
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}
Usage Examples:
// Define state mapping function
def updateFunction(key: String, value: Option[Int], state: State[Int]): Int = {
  val currentState = state.getOption().getOrElse(0)
  val newState = currentState + value.getOrElse(0)
  state.update(newState)
  newState
}

// Create StateSpec
val stateSpec = StateSpec.function(updateFunction _)
  .initialState(initialStateRDD)
  .numPartitions(10)
  .timeout(Minutes(10))

// Apply to DStream
val stateDStream = pairDStream.mapWithState(stateSpec)
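Note that MappedType is independent of StateType: the mapping function can emit records of a different shape than the state it maintains. A minimal sketch of this (the function and variable names here are illustrative, not part of the API):
// State is a running Int total, but each batch emits (word, total) pairs
def describeCount(word: String, one: Option[Int], state: State[Int]): (String, Int) = {
  val total = state.getOption().getOrElse(0) + one.getOrElse(0)
  state.update(total)
  (word, total)
}
val described: DStream[(String, Int)] =
  pairDStream.mapWithState(StateSpec.function(describeCount _))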
Advanced stateful operations using StateSpec for efficient state management with configurable timeouts and partitioning.
/**
 * Available on DStream[(K, V)] through implicit conversion
 */
class PairDStreamFunctions[K, V](self: DStream[(K, V)]) {
  /**
   * Map with state operation using StateSpec
   * @param spec - StateSpec configuration
   * @return MapWithStateDStream for further operations
   */
  def mapWithState[StateType, MappedType](
      spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType]
}
/**
 * DStream returned by mapWithState operations
 */
abstract class MapWithStateDStream[KeyType, ValueType, StateType, MappedType]
  extends DStream[MappedType] {
  /** Get current state snapshots as DStream */
  def stateSnapshots(): DStream[(KeyType, StateType)]
}
Usage Examples:
// Running word count with state
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).map((_, 1))

// Define state update function
def updateWordCount(word: String, count: Option[Int], state: State[Int]): Int = {
  val currentCount = state.getOption().getOrElse(0)
  val newCount = currentCount + count.getOrElse(0)
  state.update(newCount)
  newCount
}

// Configure and apply mapWithState
val wordCountsState = words.mapWithState(
  StateSpec.function(updateWordCount _)
    .timeout(Minutes(5))
)

// Get state snapshots
val currentCounts = wordCountsState.stateSnapshots()
currentCounts.print()
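Because stateSnapshots() returns a regular DStream, the full RDD API is available on each snapshot. A small sketch (assuming the currentCounts stream from above) that prints the five most frequent words per batch:
currentCounts.foreachRDD { rdd =>
  // Sort the snapshot by count and print the heaviest keys
  rdd.sortBy(_._2, ascending = false)
    .take(5)
    .foreach { case (word, count) => println(s"$word -> $count") }
}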
Legacy stateful operations for maintaining state across batches using update functions.
/**
 * Available on DStream[(K, V)] through implicit conversion
 */
class PairDStreamFunctions[K, V](self: DStream[(K, V)]) {
  /**
   * Update state by key using update function
   * @param updateFunc - Function to update state given current values and previous state
   * @return DStream of (key, state) pairs
   */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)]
  /**
   * Update state by key with custom partitioner
   */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner
  ): DStream[(K, S)]
  /**
   * Update state by key with specified number of partitions
   */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      numPartitions: Int
  ): DStream[(K, S)]
  /**
   * Update state by key with initial state RDD
   */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner,
      initialRDD: RDD[(K, S)]
  ): DStream[(K, S)]
}
Usage Examples:
// Simple running count
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).map((_, 1))

// Update function for word counting
def updateWordCount(values: Seq[Int], state: Option[Int]): Option[Int] = {
  val currentCount = state.getOrElse(0)
  val newCount = currentCount + values.sum
  Some(newCount)
}

// Apply updateStateByKey
val wordCounts = words.updateStateByKey(updateWordCount)
wordCounts.print()

// With custom partitioner
val wordCountsPartitioned = words.updateStateByKey(
  updateWordCount,
  new HashPartitioner(4)
)

// With initial state
val initialCounts: RDD[(String, Int)] = ssc.sparkContext.parallelize(
  List(("hello", 10), ("world", 5))
)
val wordCountsWithInitial = words.updateStateByKey(
  updateWordCount,
  new HashPartitioner(4),
  initialCounts
)
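updateStateByKey has no built-in timeout, but returning None from the update function removes the key's state, which is the usual way to keep state from growing without bound. A sketch (the pruning rule here, dropping keys that received no data in the current batch, is only an illustration):
def updateAndPrune(values: Seq[Int], state: Option[Int]): Option[Int] = {
  if (values.isEmpty) None // no new data this batch: drop the key's state
  else Some(state.getOrElse(0) + values.sum)
}
val prunedCounts = words.updateStateByKey(updateAndPrune)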
Checkpointing configuration required for stateful operations to enable fault tolerance.
/**
 * StreamingContext methods for checkpoint configuration
 */
class StreamingContext {
  /** Set checkpoint directory for fault tolerance */
  def checkpoint(directory: String): Unit
  /** Get checkpoint directory if set */
  def checkpointDir: Option[String]
}
/**
 * Factory method for creating StreamingContext from checkpoint
 */
object StreamingContext {
  /** Recreate StreamingContext from checkpoint data, or create a new one if no checkpoint exists */
  def getOrCreate(
      checkpointDirectory: String,
      creatingFunc: () => StreamingContext
  ): StreamingContext
}
Usage Examples:
// Enable checkpointing for stateful operations
ssc.checkpoint("hdfs://namenode:9000/checkpoints")

// Or create fault-tolerant streaming context
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(2))
  ssc.checkpoint("hdfs://namenode:9000/checkpoints")
  // Define streaming computation with stateful operations
  val lines = ssc.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" ")).map((_, 1))
  val wordCounts = words.updateStateByKey(updateWordCount)
  wordCounts.print()
  ssc
}

// Get or create from checkpoint
val ssc = StreamingContext.getOrCreate(
  "hdfs://namenode:9000/checkpoints",
  createStreamingContext _
)
Use mapWithState when:
- You need per-key timeouts to expire inactive state automatically
- Only keys with new data should be processed each batch (better throughput for large state)
- The emitted records should differ from the stored state (separate MappedType and StateType)
- You want direct access to the full state via stateSnapshots()
Use updateStateByKey when:
- The update function must run for every key on every batch, even keys with no new data
- You are maintaining compatibility with older Spark Streaming code that predates mapWithState
// Configure state timeout to prevent unbounded state growth
val stateSpec = StateSpec.function(updateFunc _)
  .timeout(Minutes(30)) // Remove inactive state after 30 minutes
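When a timeout is set, each expiring key is passed to the mapping function one final time with isTimingOut() returning true, and calling update() on such a key throws an exception, so the function should guard for it. A sketch (updateWithTimeout is an illustrative name):
def updateWithTimeout(word: String, one: Option[Int], state: State[Int]): (String, Int) = {
  if (state.isTimingOut()) {
    // Last chance to emit the expiring state; do not call update() here
    (word, state.get())
  } else {
    val total = state.getOption().getOrElse(0) + one.getOrElse(0)
    state.update(total)
    (word, total)
  }
}
val timeoutSpec = StateSpec.function(updateWithTimeout _).timeout(Minutes(30))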
// Control partitioning for memory distribution
val stateSpec = StateSpec.function(updateFunc _)
  .numPartitions(100) // Distribute state across 100 partitions

// Use custom partitioner for better locality
import org.apache.spark.HashPartitioner
val stateSpec = StateSpec.function(updateFunc _)
  .partitioner(new HashPartitioner(50))
// Provide initial state to avoid cold start
val initialState: RDD[(String, Int)] = loadInitialState()
val stateSpec = StateSpec.function(updateFunc _)
  .initialState(initialState)
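Putting the pieces together, a minimal end-to-end sketch (the local socket source and /tmp checkpoint path are placeholder choices, not recommendations):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}

object StatefulWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StatefulWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("/tmp/checkpoints") // stateful operations require a checkpoint directory

    val words = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))

    // Running count per word; state is the Int total, output is (word, total)
    def update(word: String, one: Option[Int], state: State[Int]): (String, Int) = {
      val total = state.getOption().getOrElse(0) + one.getOrElse(0)
      state.update(total)
      (word, total)
    }

    words.mapWithState(StateSpec.function(update _).timeout(Minutes(30))).print()

    ssc.start()
    ssc.awaitTermination()
  }
}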