Advanced streaming operations: windowed computations, stateful transformations, and complex processing patterns for streaming analytics and temporal data analysis.
Apply operations over sliding windows of data for temporal aggregations and time-based analytics. Both the window and slide durations must be multiples of the source DStream's batch interval.
abstract class DStream[T] {
  /** Create windowed DStream; the slide duration defaults to the batch interval */
  def window(windowDuration: Duration): DStream[T]
  /** Create windowed DStream with custom slide duration */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  /** Reduce elements over a sliding window */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[T]
  /** Incremental reduce with inverse function for efficiency */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      invReduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[T]
  /** Count elements in sliding window */
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
  /** Count occurrences of each distinct value in a sliding window */
  def countByValueAndWindow(
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(T, Long)]
  /** Count occurrences of each distinct value with a custom partition count */
  def countByValueAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int
  ): DStream[(T, Long)]
}

Usage Examples:
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream

val ssc: StreamingContext = // active StreamingContext with a defined batch interval
val numbers: DStream[Int] = // stream of integers
// Basic windowing - collect data over 30 seconds, slide every 10 seconds
val windowed = numbers.window(Seconds(30), Seconds(10))
// Reduce over window - sum all numbers in window
val windowSum = numbers.reduceByWindow(_ + _, Seconds(30), Seconds(10))
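// The inverse-reduce variants below maintain window state across batches,
// so checkpointing must be enabled first (the directory is a placeholder):
ssc.checkpoint("/tmp/streaming-checkpoint")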
// Incremental reduce for efficiency
val efficientSum = numbers.reduceByWindow(
  _ + _,       // reduce function: adds elements entering the window
  _ - _,       // inverse reduce function: removes elements leaving the window
  Seconds(30), // window duration
  Seconds(10)  // slide duration
)
// Count elements in window
val windowCounts = numbers.countByWindow(Seconds(30), Seconds(10))
// Window operations on text
val words: DStream[String] = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val wordCounts = words.countByValueAndWindow(Seconds(30), Seconds(10))
wordCounts.print()

Windowed operations specifically for key-value pairs, enabling temporal aggregations by key.
class PairDStreamFunctions[K, V] {
  /** Group by key over sliding window */
  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
  /** Group by key with custom slide duration */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, Iterable[V])]
  /** Group by key with custom partitioner */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner
  ): DStream[(K, Iterable[V])]
  /** Group by key with partition count */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int
  ): DStream[(K, Iterable[V])]
  /** Reduce by key over sliding window */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, V)]
  /** Reduce by key with custom partitioner */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner
  ): DStream[(K, V)]
  /** Incremental reduce by key with inverse function */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, V)]
  /** Incremental reduce with custom partitioner */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner
  ): DStream[(K, V)]
  /** Incremental reduce with partition count and filtering */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int,
      filterFunc: ((K, V)) => Boolean
  ): DStream[(K, V)]
}

Usage Examples:
val pairs: DStream[(String, Int)] = words.map(word => (word, 1))
// Group words by key over 1-minute windows
val groupedByWindow = pairs.groupByKeyAndWindow(Minutes(1), Seconds(10))
// Word count over sliding window
val wordCountsWindow = pairs.reduceByKeyAndWindow(_ + _, Minutes(1), Seconds(10))
// Efficient incremental word counting (requires checkpointing, as noted above)
val efficientWordCounts = pairs.reduceByKeyAndWindow(
  _ + _,      // add new words
  _ - _,      // subtract old words
  Minutes(1), // window duration
  Seconds(10) // slide duration
)
// Window with filtering: with an inverse reduce, keys whose count drops to
// zero would otherwise linger in state, so keep only words with count > 5
val filteredCounts = pairs.reduceByKeyAndWindow(
  _ + _,
  _ - _,
  Minutes(1),
  Seconds(10),
  10, // numPartitions
  { case (word, count) => count > 5 }
)

Maintain state across batches for streaming computations that require memory of past events. All stateful transformations require a checkpoint directory to be configured.
class PairDStreamFunctions[K, V] {
  /** Update state by key using update function */
  def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  /** Update state with custom partitioner */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner
  ): DStream[(K, S)]
  /** Update state with partition count */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      numPartitions: Int
  ): DStream[(K, S)]
  /** Update state with initial RDD */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner,
      initialRDD: RDD[(K, S)]
  ): DStream[(K, S)]
  /** Advanced stateful operations using mapWithState */
  def mapWithState[StateType, MappedType](
      spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType]
}

Usage Examples:
// Running word count with updateStateByKey
val runningCounts = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}
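// Returning None from the update function removes the key's state entirely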
// Session tracking example
case class Session(startTime: Long, lastSeen: Long, eventCount: Int)
val sessionUpdates = userEvents.updateStateByKey[Session] {
  (events: Seq[UserEvent], session: Option[Session]) =>
    if (events.nonEmpty) {
      val now = System.currentTimeMillis()
      session match {
        case Some(s) =>
          // Update existing session
          Some(s.copy(lastSeen = now, eventCount = s.eventCount + events.length))
        case None =>
          // New session
          Some(Session(now, now, events.length))
      }
    } else {
      // No new events - drop the session if it has timed out
      session.filter(s => System.currentTimeMillis() - s.lastSeen < 300000) // 5 min timeout
    }
}
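// Seeding state with an initial RDD, using the initialRDD variant listed
// above (the seed pairs are illustrative placeholders):
import org.apache.spark.HashPartitioner
val initialCounts = ssc.sparkContext.parallelize(Seq(("hello", 1), ("world", 1)))
val seededCounts = pairs.updateStateByKey[Int](
  (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0)),
  new HashPartitioner(8),
  initialCounts
)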
// Complex state with custom partitioner
val partitioner = new HashPartitioner(8)
val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
val statefulStream = pairs.updateStateByKey(updateFunc, partitioner)

mapWithState provides more efficient and flexible stateful operations, with timeout support and better performance than updateStateByKey.
/**
 * Specification for mapWithState operation
 */
class StateSpec[KeyType, ValueType, StateType, MappedType] private (
    mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
) {
  /** Set initial state RDD */
  def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]
  /** Set number of partitions */
  def numPartitions(numPartitions: Int): StateSpec[KeyType, ValueType, StateType, MappedType]
  /** Set custom partitioner */
  def partitioner(partitioner: Partitioner): StateSpec[KeyType, ValueType, StateType, MappedType]
  /** Set timeout duration for idle keys */
  def timeout(idleDuration: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}

object StateSpec {
  /** Create StateSpec with mapping function */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}
/**
 * State object for managing state in mapWithState operations
 */
abstract class State[S] {
  /** Check if state exists */
  def exists(): Boolean
  /** Get current state value (throws if the state does not exist) */
  def get(): S
  /** Get current state as an Option (None if it does not exist) */
  def getOption(): Option[S]
  /** Update state with new value */
  def update(newState: S): Unit
  /** Remove state */
  def remove(): Unit
  /** Check if state is timing out */
  def isTimingOut(): Boolean
}
/**
 * DStream returned by mapWithState operation
 */
class MapWithStateDStream[K, V, StateType, MappedType] extends DStream[MappedType] {
  /** Get snapshots of current state */
  def stateSnapshots(): DStream[(K, StateType)]
}

Usage Examples:
import org.apache.spark.streaming._

// Efficient session tracking with mapWithState
val trackSessions = StateSpec.function(
  (userId: String, event: Option[UserEvent], state: State[Session]) => {
    val currentTime = System.currentTimeMillis()
    event match {
      case Some(e) =>
        // Update or create session
        if (state.exists()) {
          val session = state.get()
          state.update(session.copy(lastSeen = currentTime, eventCount = session.eventCount + 1))
          (userId, session.eventCount + 1) // return updated event count
        } else {
          val newSession = Session(currentTime, currentTime, 1)
          state.update(newSession)
          (userId, 1)
        }
      case None =>
        // The function is called with no value only when the key is timing
        // out; Spark removes timed-out state automatically, and calling
        // state.remove() here would throw
        if (state.isTimingOut()) {
          (userId, -1) // indicate session ended
        } else {
          (userId, 0) // not expected: mapWithState fires only on new data or timeout
        }
    }
  }
).timeout(Minutes(30)) // 30-minute timeout
val sessionEvents: MapWithStateDStream[String, UserEvent, Session, (String, Int)] =
  userEvents.mapWithState(trackSessions)
// Word count with mapWithState
val wordCountSpec = StateSpec.function(
  (word: String, count: Option[Int], state: State[Int]) => {
    val currentCount = count.getOrElse(0)
    val previousCount = state.getOption().getOrElse(0)
    val newCount = currentCount + previousCount
    state.update(newCount)
    (word, newCount)
  }
).numPartitions(10)
val wordCounts: MapWithStateDStream[String, Int, Int, (String, Int)] = pairs.mapWithState(wordCountSpec)
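// StateSpec builders can be chained; initialState seeds prior counts
// (the seed RDD below is an illustrative placeholder):
val seededSpec = wordCountSpec.initialState(
  ssc.sparkContext.parallelize(Seq(("hello", 5), ("world", 3)))
)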
// Get snapshot of current state
val currentState: DStream[(String, Int)] = wordCounts.stateSnapshots()
currentState.print()

Apply arbitrary RDD transformations to DStreams for custom processing logic.
abstract class DStream[T] {
  /** Transform DStream using RDD operations */
  def transform[U](transformFunc: RDD[T] => RDD[U]): DStream[U]
  /** Transform with access to batch time */
  def transform[U](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
  /** Transform together with another DStream */
  def transformWith[U, V](
      other: DStream[U],
      transformFunc: (RDD[T], RDD[U]) => RDD[V]
  ): DStream[V]
  /** Transform with another DStream and time access */
  def transformWith[U, V](
      other: DStream[U],
      transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
  ): DStream[V]
}

class StreamingContext {
  /** Transform multiple DStreams together; the function also receives the batch time */
  def transform[T](
      dstreams: Seq[DStream[_]],
      transformFunc: (Seq[RDD[_]], Time) => RDD[T]
  ): DStream[T]
}

Usage Examples:
val lines: DStream[String] = ssc.textFileStream("/path/to/files")
// Custom transformation with RDD operations
val processed = lines.transform { rdd =>
  // Use any RDD operation
  rdd.filter(_.nonEmpty)
    .map(_.split(","))
    .filter(_.length >= 3)
    .map(fields => (fields(0), fields(1).toInt))
    .reduceByKey(_ + _)
}
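// transform is also the standard way to join each batch against a static
// reference dataset (the blacklist RDD here is a hypothetical example):
// val blacklist: RDD[(String, Int)] = ...
// val cleaned = processed.transform(rdd => rdd.subtractByKey(blacklist))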
// Transform with time information
val timeAware = lines.transform { (rdd, time) =>
  println(s"Processing batch at time: $time")
  rdd.map(line => s"$time: $line")
}
// Transform two DStreams together
val stream1: DStream[String] = // first stream
val stream2: DStream[Int] = // second stream
val combined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[Int]) => {
  val count1 = rdd1.count()
  val count2 = rdd2.count()
  rdd1.sparkContext.parallelize(Seq(s"Stream1: $count1, Stream2: $count2"))
})
// Multi-stream transformation; the transform function also receives the batch time
val multiTransform = ssc.transform(Seq(stream1, stream2), (rdds: Seq[RDD[_]], time: Time) => {
  val rdd1 = rdds(0).asInstanceOf[RDD[String]]
  val rdd2 = rdds(1).asInstanceOf[RDD[Int]]
  // Key both RDDs by element index, join on the index, and keep the paired values
  rdd1.zipWithIndex().map(_.swap).join(rdd2.zipWithIndex().map(_.swap)).map(_._2)
})

Complex streaming patterns for sophisticated data processing scenarios.
import scala.reflect.ClassTag

/**
 * Pattern: Deduplication over a time window
 */
def deduplicateOverWindow[T: ClassTag](
    stream: DStream[T],
    windowDuration: Duration,
    slideDuration: Duration
)(keyFunc: T => String): DStream[T] = {
  stream.window(windowDuration, slideDuration)
    .map(item => (keyFunc(item), item))
    .groupByKey()
    .map(_._2.head) // Take first occurrence
}
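// Note: reduceByKey((first, _) => first) would achieve the same result
// without materializing every duplicate per key, as groupByKey does.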
/**
 * Pattern: Top-K elements by window
 */
def topKByWindow[T: ClassTag](
    stream: DStream[T],
    k: Int,
    windowDuration: Duration,
    slideDuration: Duration
)(implicit ord: Ordering[T]): DStream[Array[T]] = {
  stream.window(windowDuration, slideDuration)
    .transform { rdd =>
      // takeOrdered returns an Array on the driver, so wrap it in a
      // one-element RDD to produce one top-K array per batch
      rdd.sparkContext.parallelize(Seq(rdd.takeOrdered(k)(ord.reverse)))
    }
}
/**
 * Pattern: Threshold-based alerting
 */
def thresholdAlert[K, V](
    stream: DStream[(K, V)],
    threshold: V
)(implicit ord: Ordering[V]): DStream[(K, V)] = {
  stream.filter { case (_, value) => ord.gteq(value, threshold) }
}

Usage Examples:
// Deduplication example
val events: DStream[Event] = // stream of events
val uniqueEvents = deduplicateOverWindow(events, Minutes(5), Seconds(30))(_.id)
// Top-K pattern
val scores: DStream[Int] = // stream of scores
val topScores = topKByWindow(scores, 10, Minutes(1), Seconds(10))
// Threshold alerting
val metrics: DStream[(String, Double)] = // stream of metrics
val alerts = thresholdAlert(metrics, 90.0) // alert when value >= 90
alerts.foreachRDD { rdd =>
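  // collect() pulls the whole batch to the driver; acceptable here only
  // because alerts are assumed to be sparse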
  rdd.collect().foreach { case (metric, value) =>
    println(s"ALERT: $metric exceeded threshold with value $value")
  }
}