tessl install tessl/maven-org-apache-spark--spark-streaming_2-11@1.6.0

Apache Spark Streaming library for processing live data streams with fault tolerance and high throughput.
DStream (Discretized Stream) is the basic abstraction in Spark Streaming: a continuous stream of data represented as a sequence of RDDs, one per batch interval. DStreams provide transformations and output operations similar to Spark RDD operations, but applied to each batch of the stream.
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD

def map[U: ClassTag](f: T => U): DStream[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): DStream[U]
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservePartitioning: Boolean = false): DStream[U]

map - Transform each element using the provided function.
flatMap - Transform each element and flatten the results.
mapPartitions - Transform entire partitions using the provided function.
Examples:
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)
val words = lines.flatMap(_.split(" "))
val processedPartitions = stream.mapPartitions(partition => {
// Process entire partition at once
partition.map(processElement)
})

def filter(f: T => Boolean): DStream[T]
def glom(): DStream[Array[T]]
def repartition(numPartitions: Int): DStream[T]

filter - Return a new DStream containing only elements that satisfy the predicate.
glom - Group all elements of each partition into arrays.
repartition - Change the number of partitions in the DStream.
Examples:
val positiveNumbers = numbers.filter(_ > 0)
val arrays = stream.glom() // Each RDD becomes Array[T]
val repartitioned = stream.repartition(4)

def union(that: DStream[T]): DStream[T]
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
def transformWith[U: ClassTag, V: ClassTag](
that: DStream[U],
transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V]
def transformWith[U: ClassTag, V: ClassTag](
that: DStream[U],
transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V]

union - Combine this DStream with another DStream of the same type.
transform - Apply arbitrary RDD-to-RDD function to each RDD in the DStream.
transformWith - Apply function that takes two RDDs and returns one RDD.
Examples:
val combined = stream1.union(stream2)
val transformed = stream.transform(rdd => {
// Apply any RDD operation
rdd.filter(_.length > 5).map(_.toUpperCase)
})
val joined = stream1.transformWith(stream2, (rdd1, rdd2) => {
rdd1.union(rdd2).distinct()
})

def reduce(f: (T, T) => T): DStream[T]
def count(): DStream[Long]
def countByValue(): DStream[(T, Long)]
def countByValueAndWindow(
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]

reduce - Aggregate the elements of each RDD using the provided associative and commutative function.
count - Count the number of elements in each RDD.
countByValue - Count occurrences of each unique value.
countByValueAndWindow - Count occurrences over a sliding window.
Examples:
val sum = numbers.reduce(_ + _)
val elementCount = stream.count()
val valueCounts = words.countByValue()
val windowCounts = words.countByValueAndWindow(Seconds(10), Seconds(2))

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): DStream[U]
def treeAggregate[U: ClassTag](
zeroValue: U,
depth: Int = 2
)(seqOp: (U, T) => U, combOp: (U, U) => U): DStream[U]

aggregate - Aggregate the elements of each RDD using a zero value, a per-element seqOp, and a combOp that merges partial results.
treeAggregate - More efficient aggregation using tree reduction.
Examples:
val stats = numbers.aggregate((0, 0, Int.MaxValue, Int.MinValue))(
(acc, value) => (acc._1 + value, acc._2 + 1, math.min(acc._3, value), math.max(acc._4, value)),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2, math.min(acc1._3, acc2._3), math.max(acc1._4, acc2._4))
)
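The example above covers aggregate only; a sketch of treeAggregate following the signature listed here, reusing the numbers stream and summing with the default tree depth:

val treeSum = numbers.treeAggregate(0)(
(acc, value) => acc + value, // seqOp: fold each element into a partial sum
(acc1, acc2) => acc1 + acc2 // combOp: merge partial sums across tree levels
)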
def slice(interval: Interval): Seq[RDD[T]]
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]

slice(interval) - Return all RDDs in this DStream within the given interval.
slice(fromTime, toTime) - Return all RDDs between fromTime and toTime (both inclusive).
Examples:
// Get RDDs from a specific time interval
val interval = new Interval(startTime, endTime)
val rdds = stream.slice(interval)
// Get RDDs between specific times
val historicalRDDs = stream.slice(Time(1000), Time(5000))

def print(): Unit
def print(num: Int): Unit

print() - Print the first 10 elements of each RDD in the DStream.
print(num) - Print the first num elements of each RDD.
Examples:
wordCounts.print() // Print first 10 elements
wordCounts.print(20) // Print first 20 elements

def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
suffix: String = ""
)(implicit fm: ClassTag[F]): Unit

saveAsTextFiles - Save each RDD as text files with the given prefix and suffix.
saveAsObjectFiles - Save each RDD as serialized object files.
saveAsHadoopFiles - Save using Hadoop OutputFormat.
Examples:
lines.saveAsTextFiles("output/text", ".txt")
stream.saveAsObjectFiles("output/objects")
pairs.saveAsHadoopFiles[TextOutputFormat[String, Int]]("output/hadoop")

def foreach(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

foreach / foreachRDD - Apply a function to each RDD in the DStream.
foreachRDD(with time) - Apply function with access to batch time.
Examples:
// Basic output operation
wordCounts.foreachRDD { rdd =>
val results = rdd.collect()
println(s"Batch results: ${results.mkString(", ")}")
}
// Output with batch time
wordCounts.foreachRDD { (rdd, time) =>
println(s"Batch time: $time")
rdd.foreach(println)
}
// Write to database
userActions.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
val connection = createConnection()
partition.foreach { record =>
insertIntoDatabase(connection, record)
}
connection.close()
}
}

def persist(): DStream.this.type
def persist(level: StorageLevel): DStream.this.type
def cache(): DStream.this.type
def unpersist(): Unit

persist() - Persist RDDs with the default storage level (MEMORY_ONLY_SER).
persist(level) - Persist RDDs with specified storage level.
cache() - Cache RDDs in memory (shorthand for persist() with the default storage level).
unpersist() - Remove cached RDDs from memory.
Examples:
val expensiveStream = lines
.map(expensiveTransformation)
.persist(StorageLevel.MEMORY_AND_DISK_SER)
val cachedStream = frequentlyAccessedStream.cache()
// Later cleanup
expensiveStream.unpersist()

def checkpoint(interval: Duration): DStream.this.type

checkpoint - Enable periodic checkpointing for fault tolerance.
Examples:
val checkpointedStream = expensiveComputations
.checkpoint(Seconds(10)) // Checkpoint every 10 seconds
val statefulStream = pairs
.updateStateByKey(updateFunction)
.checkpoint(Seconds(30)) // Required for stateful operations
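Checkpoint data is written to a directory configured on the StreamingContext, and that directory must be set before any checkpointed or stateful stream runs (a minimal sketch; the path is a placeholder):

// Required once per application before using checkpointed or stateful streams
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")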
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): DStream[T]
def coalesce(numPartitions: Int, shuffle: Boolean = false): DStream[T]

sample - Sample a fraction of elements from each RDD, with an optional seed for reproducibility.
coalesce - Reduce number of partitions, optionally with shuffle. More efficient than repartition when reducing partitions.
Examples:
val sampledStream = largeStream.sample(false, 0.1) // 10% sample without replacement
val sampledWithSeed = largeStream.sample(false, 0.1, 42L) // Reproducible sample
val coalescedStream = stream.coalesce(2) // Reduce to 2 partitions without shuffle
val coalescedWithShuffle = stream.coalesce(2, shuffle = true) // With shuffle for better balance

def context: StreamingContext
def slideDuration: Duration
def dependencies: List[DStream[_]]

context - Get the StreamingContext.
slideDuration - Get the slide duration.
dependencies - Get parent DStreams.
Examples:
val ssc = stream.context
val duration = stream.slideDuration
val parents = stream.dependencies

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}

val conf = new SparkConf().setAppName("WordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
// Chain multiple transformations
val wordCounts = lines
.filter(_.nonEmpty) // Remove empty lines
.flatMap(_.split("\\s+")) // Split into words
.map(word => (word.toLowerCase, 1)) // Convert to pairs
.reduceByKey(_ + _) // Count words
wordCounts.print()
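No data is processed until the context is started; a typical driver ends by starting the context and blocking:

ssc.start()            // Start receiving and processing data
ssc.awaitTermination() // Block until the streaming job stops or fails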
val processedStream = rawStream
.map(parseJson) // Parse JSON strings
.filter(_.isDefined) // Keep valid records
.map(_.get) // Extract values
.transform { rdd => // Custom RDD transformation
rdd.map(enrichWithMetadata)
.filter(isValidRecord)
}
.cache() // Cache for multiple outputs
// Multiple outputs from same stream
processedStream.filter(_.priority == "HIGH").saveAsTextFiles("output/high")
processedStream.filter(_.priority == "LOW").saveAsTextFiles("output/low")val monitoredStream = inputStream
.transform { (rdd, time) =>
val count = rdd.count()
println(s"Batch $time: processing $count records")
if (count == 0) {
println(s"Warning: Empty batch at $time")
}
rdd
}
.map { record =>
try {
Some(processRecord(record)) // Wrap successes so failures can be filtered out below
} catch {
case e: Exception =>
println(s"Error processing record: $e")
None
}
}
.filter(_.isDefined)
.map(_.get)
monitoredStream.foreachRDD { (rdd, time) =>
val successCount = rdd.count()
println(s"Successfully processed $successCount records at $time")
}

val resourceManagedStream = inputStream
.mapPartitions { partition =>
// Initialize resources per partition
val dbConnection = createDatabaseConnection()
val httpClient = createHttpClient()
// Materialize the mapped records before closing: partition.map is lazy,
// so the resources must stay open until every record has been processed
val results = partition.map { record =>
// Process with resources
val enriched = enrichFromDatabase(dbConnection, record)
sendToApi(httpClient, enriched)
}.toList
// Cleanup resources
dbConnection.close()
httpClient.close()
results.iterator
}
.persist(StorageLevel.MEMORY_AND_DISK_SER) // Persist expensive computation
// Ensure cleanup when done
ssc.addStreamingListener(new StreamingListener {
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
if (shouldCleanup(batchCompleted)) {
resourceManagedStream.unpersist()
}
}
})
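When shutting the application down, the streaming context can be stopped gracefully so in-flight batches complete before resources are released (a minimal sketch using StreamingContext.stop):

// Finish queued batches, then stop the underlying SparkContext as well
ssc.stop(stopSparkContext = true, stopGracefully = true)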