DStream (Discretized Stream) is the core abstraction in Spark Streaming representing a continuous stream of data. Internally, a DStream is represented as a sequence of RDDs (Resilient Distributed Datasets), where each RDD contains data for a specific time interval.
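The usage examples throughout this reference assume a StreamingContext named ssc. A minimal setup sketch (the master URL, application name, and 10-second batch interval are illustrative choices):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative configuration: local mode with 2 threads, 10-second batches
val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamExamples")
val ssc = new StreamingContext(conf, Seconds(10))

// Each batch interval produces one RDD in the resulting DStream
val lines = ssc.socketTextStream("localhost", 9999)

// ... transformations and output operations on the DStream go here ...

ssc.start()            // Start receiving and processing data
ssc.awaitTermination() // Block until the streaming application is stopped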
Core transformation operations that modify each element or the structure of the DStream.
/**
* Transform each element using a function
* @param mapFunc - Function to apply to each element
* @returns New DStream with transformed elements
*/
def map[U: ClassTag](mapFunc: T => U): DStream[U]
/**
* Transform each element to zero or more elements
* @param flatMapFunc - Function returning a collection for each element
* @returns New DStream with flattened results
*/
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]
/**
* Filter elements based on a predicate
* @param filterFunc - Function returning true for elements to keep
* @returns New DStream with filtered elements
*/
def filter(filterFunc: T => Boolean): DStream[T]
/**
* Group elements within each RDD into arrays
* @returns DStream where each element becomes an array of elements
*/
def glom(): DStream[Array[T]]
/**
* Transform each partition using a function
* @param mapPartFunc - Function to transform an iterator of elements
* @param preservePartitioning - Whether to preserve the partitioner
* @returns New DStream with transformed partitions
*/
def mapPartitions[U: ClassTag](
    mapPartFunc: Iterator[T] => Iterator[U],
    preservePartitioning: Boolean = false
): DStream[U]
Usage Examples:
val lines = ssc.socketTextStream("localhost", 9999)
// Map transformation
val lengths = lines.map(_.length)
// FlatMap transformation
val words = lines.flatMap(_.split(" "))
// Filter transformation
val longLines = lines.filter(_.length > 10)
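// MapPartitions transformation (the uppercase step is just illustrative)
val upper = lines.mapPartitions(iter => iter.map(_.toUpperCase))
// Glom transformation: each partition becomes an array
val grouped = lines.glom()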
// Combined transformations
val wordCounts = lines
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
More complex transformation operations including custom RDD transformations.
/**
* Transform each RDD using a custom function
* @param transformFunc - Function to transform each RDD
* @returns New DStream with transformed RDDs
*/
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
/**
* Transform each RDD with access to time information
* @param transformFunc - Function receiving RDD and time
* @returns New DStream with transformed RDDs
*/
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
/**
* Transform two DStreams together
* @param other - Another DStream to combine with
* @param transformFunc - Function to transform both RDDs together
* @returns New DStream with combined transformation
*/
def transformWith[U: ClassTag, V: ClassTag](
    other: DStream[U],
    transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V]
/**
* Transform two DStreams with time information
* @param other - Another DStream to combine with
* @param transformFunc - Function receiving both RDDs and time
* @returns New DStream with combined transformation
*/
def transformWith[U: ClassTag, V: ClassTag](
    other: DStream[U],
    transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V]
/**
* Repartition the DStream to specified number of partitions
* @param numPartitions - Number of partitions for output
* @returns Repartitioned DStream
*/
def repartition(numPartitions: Int): DStream[T]
/**
* Union this DStream with another DStream of same type
* @param that - DStream to union with
* @returns Combined DStream containing data from both streams
*/
def union(that: DStream[T]): DStream[T]
Control how DStream data is stored and cached across operations.
/**
* Persist DStream RDDs with specified storage level
* @param level - Storage level (MEMORY_ONLY, MEMORY_AND_DISK, etc.)
* @returns This DStream for method chaining
*/
def persist(level: StorageLevel): DStream[T]
/**
* Cache DStream RDDs in memory
* @returns This DStream for method chaining
*/
def cache(): DStream[T]
/**
* Enable checkpointing for this DStream
* @param interval - Interval between checkpoints
* @returns This DStream for method chaining
*/
def checkpoint(interval: Duration): DStream[T]
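Usage sketch (reusing the lines and words streams from the examples above; the storage level, checkpoint directory, and checkpoint interval are illustrative):

import org.apache.spark.storage.StorageLevel

// Keep a stream that feeds several downstream operations in serialized in-memory form
words.persist(StorageLevel.MEMORY_ONLY_SER)
// cache() persists with the default storage level
val cachedLines = lines.cache()
// DStream checkpointing also requires a checkpoint directory on the StreamingContext
ssc.checkpoint("checkpoints/")
words.checkpoint(Seconds(30))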
Operations that aggregate data within each batch.
/**
* Reduce elements in each RDD using a function
* @param reduceFunc - Associative and commutative reduce function
* @returns DStream with one element per RDD (the reduced result)
*/
def reduce(reduceFunc: (T, T) => T): DStream[T]
/**
* Count elements in each RDD
* @returns DStream of Long values representing counts
*/
def count(): DStream[Long]
/**
* Count occurrences of each unique element
* @param numPartitions - Number of partitions for the result
* @returns DStream of (element, count) pairs
*/
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)]
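Usage sketch (reusing the words stream from the examples above):

// Number of words in each batch
val wordsPerBatch = words.count()
// Occurrences of each distinct word within each batch
val wordFrequencies = words.countByValue()
// Longest word in each batch (reduce yields a single-element RDD per batch)
val longestWord = words.reduce((a, b) => if (a.length >= b.length) a else b)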
Time-based windowing operations for analyzing data across multiple batches.
/**
* Create windowed DStream
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window (optional, defaults to batch interval)
* @returns DStream containing data from the specified window
*/
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
/**
* Count elements over a sliding window
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window
* @returns DStream of window counts
*/
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
/**
* Count occurrences of each element over a sliding window
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window
* @param numPartitions - Number of partitions for result (optional)
* @returns DStream of (element, count) pairs over the window
*/
def countByValueAndWindow(
    windowDuration: Duration,
    slideDuration: Duration,
    numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]
/**
* Reduce elements over a sliding window
* @param reduceFunc - Associative and commutative reduce function
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window
* @returns DStream with reduced results over windows
*/
def reduceByWindow(
    reduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
): DStream[T]
/**
* Reduce elements over sliding window with inverse function for efficiency
* @param reduceFunc - Associative reduce function
* @param invReduceFunc - Inverse of the reduce function
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window
* @returns DStream with reduced results over windows
*/
def reduceByWindow(
    reduceFunc: (T, T) => T,
    invReduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
): DStream[T]
Actions that send data to external systems or trigger computation.
/**
* Apply function to each RDD in the DStream
* @param foreachFunc - Function to apply to each RDD
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
/**
* Apply function to each RDD with time information
* @param foreachFunc - Function receiving RDD and time
*/
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
/**
* Print first num elements of each RDD to console
* @param num - Number of elements to print (default 10)
*/
def print(num: Int = 10): Unit
/**
* Save DStream as text files
* @param prefix - Prefix for output file names
* @param suffix - Suffix for output file names (optional)
*/
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
/**
* Save DStream as object files (serialized)
* @param prefix - Prefix for output file names
* @param suffix - Suffix for output file names (optional)
*/
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
Usage Examples:
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
// Window operations
val windowedWords = words.window(Seconds(30), Seconds(10))
val windowedCounts = words.countByWindow(Seconds(30), Seconds(10))
// Reduce over windows
val windowedWordCount = words
  .map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
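// Reduce over a window without keys: longest word in the last 30 seconds
val longestInWindow = words.reduceByWindow(
  (a, b) => if (a.length >= b.length) a else b,
  Seconds(30), Seconds(10)
)
// Per-word counts over the window
val windowedValueCounts = words.countByValueAndWindow(Seconds(30), Seconds(10))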
// Output operations
words.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}
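// When writing to an external system, avoid collect(): iterate partitions on the executors
words.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // open a connection to the external system here (once per partition, on the executor)
    partition.foreach(record => println(record))
    // close the connection here
  }
}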
windowedWordCount.print()
windowedWordCount.saveAsTextFiles("output/wordcount")
Helper methods for accessing DStream data and metadata.
/**
* Get RDDs for a specific time range
* @param fromTime - Start time (inclusive)
* @param toTime - End time (exclusive)
* @returns Sequence of RDDs in the time range
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]
/**
* Get RDDs for a specific time interval
* @param interval - Time interval to retrieve
* @returns Sequence of RDDs in the interval
*/
def slice(interval: Interval): Seq[RDD[T]]
/**
* Compute the RDD for a specific time
* @param validTime - Time for which to compute the RDD
* @returns RDD containing data for the specified time
*/
def compute(validTime: Time): Option[RDD[T]]
// DStream properties
def slideDuration: Duration // Sliding interval of the DStream
def dependencies: List[DStream[_]] // Parent DStreams this DStream depends on
def generatedRDDs: HashMap[Time, RDD[T]] // Cache of generated RDDs
def rememberDuration: Duration // How long to remember RDDs
def storageLevel: StorageLevel // Storage level for persistence
def mustCheckpoint: Boolean // Whether this DStream requires checkpointing
def checkpointDuration: Duration // Checkpoint interval
Advanced Usage:
// Custom transformation with RDD operations
import org.apache.spark.sql.SQLContext
val processed = lines.transform { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val df = rdd.toDF("line")
  df.filter($"line".contains("ERROR"))
    .select($"line")
    .rdd
    .map(_.getString(0))
}
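// transform can also join the stream against a static reference RDD
// (the blacklist contents here are illustrative)
val blacklist = ssc.sparkContext.parallelize(Seq("spammer1", "spammer2")).map(word => (word, true))
val filtered = words
  .map((_, 1))
  .transform { rdd =>
    rdd.leftOuterJoin(blacklist)
      .filter { case (_, (_, flagged)) => flagged.isEmpty } // keep words not in the blacklist
      .map { case (word, count) => (word, count._1) }
  }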
// Complex windowed operations
val complexWindow = words
  .map((_, 1))
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b, // Add new values
    (a: Int, b: Int) => a - b, // Remove old values
    Seconds(30),               // Window duration
    Seconds(10)                // Slide duration
  )
  .filter(_._2 > 5) // Filter low counts
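// Note: the inverse-function variant requires checkpointing to be enabled,
// e.g. ssc.checkpoint("checkpoints/") before starting the context (directory is illustrative)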
// Combining multiple streams with transformWith
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val combined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[String]) => {
rdd1.intersection(rdd2) // Find common elements
})
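// Union and repartition: merge the two streams and rebalance into 8 partitions (count is illustrative)
val merged = stream1.union(stream2).repartition(8)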