or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.spark/spark-streaming_2.11@1.6.x

docs

dstream-operations.mdindex.mdinput-sources.mdjava-api.mdmonitoring-listeners.mdpaired-dstream-operations.mdreceiver-framework.mdstreaming-context.mdutility-classes.mdwindow-operations.md
tile.json

tessl/maven-org-apache-spark--spark-streaming_2-11

tessl install tessl/maven-org-apache-spark--spark-streaming_2-11@1.6.0

Apache Spark Streaming library for processing live data streams with fault-tolerance and high throughput.

dstream-operations.mddocs/

DStream Operations

DStream (Discretized Stream) is the basic abstraction in Spark Streaming, representing a continuous sequence of RDDs. DStreams provide a comprehensive set of transformations and actions for stream processing, similar to Spark RDD operations but designed for continuous data streams.

Core Imports

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD

Basic Transformations

Map Operations

def map[U: ClassTag](f: T => U): DStream[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): DStream[U]
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservePartitioning: Boolean = false): DStream[U]

map - Transform each element using the provided function.

flatMap - Transform each element and flatten the results.

mapPartitions - Transform entire partitions using the provided function.

Examples:

val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)
val words = lines.flatMap(_.split(" "))
val processedPartitions = stream.mapPartitions(partition => {
  // Process entire partition at once
  partition.map(processElement)
})

Filter and Selection

def filter(f: T => Boolean): DStream[T]
def glom(): DStream[Array[T]]
def repartition(numPartitions: Int): DStream[T]

filter - Return a new DStream containing only elements that satisfy the predicate.

glom - Group all elements of each partition into arrays.

repartition - Change the number of partitions in the DStream.

Examples:

val positiveNumbers = numbers.filter(_ > 0)
val arrays = stream.glom() // Each RDD becomes Array[T]
val repartitioned = stream.repartition(4)

Combining Streams

def union(that: DStream[T]): DStream[T]
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
def transformWith[U: ClassTag, V: ClassTag](
  that: DStream[U], 
  transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V]
def transformWith[U: ClassTag, V: ClassTag](
  that: DStream[U], 
  transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V]

union - Combine this DStream with another DStream of the same type.

transform - Apply arbitrary RDD-to-RDD function to each RDD in the DStream.

transformWith - Apply function that takes two RDDs and returns one RDD.

Examples:

val combined = stream1.union(stream2)

val transformed = stream.transform(rdd => {
  // Apply any RDD operation
  rdd.filter(_.length > 5).map(_.toUpperCase)
})

val joined = stream1.transformWith(stream2, (rdd1, rdd2) => {
  rdd1.union(rdd2).distinct()
})

Reduction Operations

Aggregations

def reduce(f: (T, T) => T): DStream[T]
def count(): DStream[Long]
def countByValue(): DStream[(T, Long)]
def countByValueAndWindow(
  windowDuration: Duration, 
  slideDuration: Duration, 
  numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]

reduce - Aggregate elements using the provided associative function.

count - Count the number of elements in each RDD.

countByValue - Count occurrences of each unique value.

countByValueAndWindow - Count occurrences over a sliding window.

Examples:

val sum = numbers.reduce(_ + _)
val elementCount = stream.count()
val valueCounts = words.countByValue()
val windowCounts = words.countByValueAndWindow(Seconds(10), Seconds(2))

Statistical Operations

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): DStream[U]
def treeAggregate[U: ClassTag](
  zeroValue: U, 
  depth: Int = 2
)(seqOp: (U, T) => U, combOp: (U, U) => U): DStream[U]

aggregate - Aggregate elements using zero value and two functions.

treeAggregate - More efficient aggregation using tree reduction.

Examples:

val stats = numbers.aggregate((0, 0, Int.MaxValue, Int.MinValue))(
  (acc, value) => (acc._1 + value, acc._2 + 1, math.min(acc._3, value), math.max(acc._4, value)),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2, math.min(acc1._3, acc2._3), math.max(acc1._4, acc2._4))
)

Slice Operations

def slice(interval: Interval): Seq[RDD[T]]
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]

slice(interval) - Return all RDDs in this DStream between the given interval.

slice(fromTime, toTime) - Return all RDDs between fromTime and toTime (both inclusive).

Examples:

// Get RDDs from a specific time interval
val interval = new Interval(startTime, endTime)
val rdds = stream.slice(interval)

// Get RDDs between specific times
val historicalRDDs = stream.slice(Time(1000), Time(5000))

Output Operations

Print and Debug

def print(): Unit
def print(num: Int): Unit

print() - Print the first 10 elements of each RDD in the DStream.

print(num) - Print the first num elements of each RDD.

Examples:

wordCounts.print() // Print first 10 elements
wordCounts.print(20) // Print first 20 elements

File Output

def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
  prefix: String, 
  suffix: String = ""
)(implicit fm: ClassTag[F]): Unit

saveAsTextFiles - Save each RDD as text files with given prefix and suffix.

saveAsObjectFiles - Save each RDD as serialized object files.

saveAsHadoopFiles - Save using Hadoop OutputFormat.

Examples:

lines.saveAsTextFiles("output/text", ".txt")
stream.saveAsObjectFiles("output/objects")
pairs.saveAsHadoopFiles[TextOutputFormat[String, Int]]("output/hadoop")

Custom Output

def foreach(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

foreach / foreachRDD - Apply a function to each RDD in the DStream.

foreachRDD(with time) - Apply function with access to batch time.

Examples:

// Basic output operation
wordCounts.foreachRDD { rdd =>
  val results = rdd.collect()
  println(s"Batch results: ${results.mkString(", ")}")
}

// Output with batch time
wordCounts.foreachRDD { (rdd, time) =>
  println(s"Batch time: $time")
  rdd.foreach(println)
}

// Write to database
userActions.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val connection = createConnection()
    partition.foreach { record =>
      insertIntoDatabase(connection, record)
    }
    connection.close()
  }
}

Caching and Persistence

Storage Control

def persist(): DStream.this.type
def persist(level: StorageLevel): DStream.this.type  
def cache(): DStream.this.type
def unpersist(): Unit

persist() - Persist RDDs with default storage level (MEMORY_ONLY_SER).

persist(level) - Persist RDDs with specified storage level.

cache() - Cache RDDs in memory (shorthand for persist(MEMORY_ONLY)).

unpersist() - Remove cached RDDs from memory.

Examples:

val expensiveStream = lines
  .map(expensiveTransformation)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

val cachedStream = frequentlyAccessedStream.cache()

// Later cleanup
expensiveStream.unpersist()

Checkpointing

Fault Tolerance

def checkpoint(interval: Duration): DStream.this.type

checkpoint - Enable periodic checkpointing for fault tolerance.

Examples:

val checkpointedStream = expensiveComputations
  .checkpoint(Seconds(10)) // Checkpoint every 10 seconds

val statefulStream = pairs
  .updateStateByKey(updateFunction)
  .checkpoint(Seconds(30)) // Required for stateful operations

Advanced Operations

Sampling and Partitioning

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): DStream[T]
def coalesce(numPartitions: Int, shuffle: Boolean = false): DStream[T]

sample - Sample fraction of elements from each RDD with optional seed for reproducibility.

coalesce - Reduce number of partitions, optionally with shuffle. More efficient than repartition when reducing partitions.

Examples:

val sampledStream = largeStream.sample(false, 0.1) // 10% sample without replacement
val sampledWithSeed = largeStream.sample(false, 0.1, 42L) // Reproducible sample
val coalescedStream = stream.coalesce(2) // Reduce to 2 partitions without shuffle
val coalescedWithShuffle = stream.coalesce(2, shuffle = true) // With shuffle for better balance

Stream Properties

def context: StreamingContext
def slideDuration: Duration
def dependencies: List[DStream[_]]

context - Get the StreamingContext.

slideDuration - Get the slide duration.

dependencies - Get parent DStreams.

Examples:

val ssc = stream.context
val duration = stream.slideDuration
val parents = stream.dependencies

Usage Examples

Basic Stream Processing

import org.apache.spark.streaming.{StreamingContext, Seconds}

val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)

// Chain multiple transformations  
val wordCounts = lines
  .filter(_.nonEmpty)                    // Remove empty lines
  .flatMap(_.split("\\s+"))             // Split into words
  .map(word => (word.toLowerCase, 1))    // Convert to pairs
  .reduceByKey(_ + _)                    // Count words

wordCounts.print()

Stream with Custom Processing

val processedStream = rawStream
  .map(parseJson)                        // Parse JSON strings
  .filter(_.isDefined)                   // Keep valid records
  .map(_.get)                           // Extract values
  .transform { rdd =>                    // Custom RDD transformation
    rdd.map(enrichWithMetadata)
       .filter(isValidRecord)
  }
  .cache()                              // Cache for multiple outputs

// Multiple outputs from same stream
processedStream.filter(_.priority == "HIGH").saveAsTextFiles("output/high")
processedStream.filter(_.priority == "LOW").saveAsTextFiles("output/low")

Error Handling and Monitoring

val monitoredStream = inputStream
  .transform { (rdd, time) =>
    val count = rdd.count()
    println(s"Batch $time: processing $count records")
    
    if (count == 0) {
      println(s"Warning: Empty batch at $time")
    }
    
    rdd
  }
  .map { record =>
    try {
      processRecord(record)
    } catch {
      case e: Exception =>
        println(s"Error processing record: $e")
        None
    }
  }
  .filter(_.isDefined)
  .map(_.get)

monitoredStream.foreachRDD { (rdd, time) =>
  val successCount = rdd.count()
  println(s"Successfully processed $successCount records at $time")
}

Resource Management

val resourceManagedStream = inputStream
  .mapPartitions { partition =>
    // Initialize resources per partition
    val dbConnection = createDatabaseConnection()
    val httpClient = createHttpClient()
    
    val results = partition.map { record =>
      // Process with resources
      val enriched = enrichFromDatabase(dbConnection, record)
      sendToApi(httpClient, enriched)
    }
    
    // Cleanup resources
    dbConnection.close()
    httpClient.close()
    
    results
  }
  .persist(StorageLevel.MEMORY_AND_DISK_SER) // Persist expensive computation

// Ensure cleanup when done
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    if (shouldCleanup(batchCompleted)) {
      resourceManagedStream.unpersist()
    }
  }
})