tessl install tessl/maven-org-apache-spark--spark-streaming_2-11@1.6.0

Apache Spark Streaming library for processing live data streams with fault tolerance and high throughput.
The StreamingContext is the main entry point for all Spark Streaming functionality. It provides methods to create DStreams from various input sources, manages the streaming application lifecycle, and coordinates the execution of streaming computations.
import org.apache.spark.streaming.{StreamingContext, Duration, Seconds}
import org.apache.spark.{SparkConf, SparkContext}

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)

Create a StreamingContext using an existing SparkContext.
Parameters:
sparkContext - Existing SparkContext to use
batchDuration - Time interval at which streaming data will be divided into batches

Example:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

class StreamingContext(conf: SparkConf, batchDuration: Duration)

Create a StreamingContext by providing configuration for a new SparkContext.
Parameters:
conf - SparkConf object specifying Spark parameters
batchDuration - Time interval for batch processing

Example:
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

class StreamingContext(
master: String,
appName: String,
batchDuration: Duration,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String] = Map()
)

Create a StreamingContext by providing connection details for a new SparkContext.
Parameters:
master - Cluster URL to connect to (e.g. "local[2]", "spark://host:port")
appName - Name for the application, displayed in the cluster web UI
batchDuration - Time interval for batch processing
sparkHome - SPARK_HOME directory on worker nodes (optional)
jars - JAR files to distribute to the cluster (optional)
environment - Environment variables for worker nodes (optional)

Example:
val ssc = new StreamingContext("local[2]", "MyStreamingApp", Seconds(1))def start(): Unit
def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit
def awaitTermination(): Unit
def awaitTermination(timeout: Long): Unit

start() - Start the execution of streams. Must be called after all transformations and output operations are defined.
stop() - Stop the execution of streams.
stopSparkContext - Whether to stop the underlying SparkContext (default: true)
stopGracefully - Whether to stop gracefully by waiting for processing of all received data (default: false)

awaitTermination() - Wait for the execution to stop. Blocks the current thread.
awaitTermination(timeout) - Wait for execution to stop with timeout in milliseconds.
Example:
ssc.start()
ssc.awaitTermination() // Wait until killed manually
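
The stop() method above has no dedicated example; a minimal sketch of a graceful shutdown that keeps the underlying SparkContext alive for further use:

// Wait for all received data to be processed before stopping,
// but leave the SparkContext running.
ssc.stop(stopSparkContext = false, stopGracefully = true)
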
def sparkContext: SparkContext
def remember(duration: Duration): Unit
def checkpoint(directory: String): Unit

sparkContext - Get the associated SparkContext.
remember(duration) - Set the duration for which DStreams will remember each RDD created. This must be set before any input streams are created.
checkpoint(directory) - Set the checkpoint directory for fault tolerance. Required for stateful operations like updateStateByKey.
Example:
ssc.checkpoint("hdfs://checkpoints/my-app")
ssc.remember(Minutes(2))
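
The sparkContext accessor has no example above; a brief sketch of reusing the underlying SparkContext for batch-style work (the lookup path is hypothetical):

// Reuse the SparkContext behind the streaming context, e.g. to load a
// static lookup dataset that the stream will later be joined against.
val sc = ssc.sparkContext
val lookup = sc.textFile("hdfs://data/lookup.txt")
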
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T]
def rawSocketStream[T: ClassTag](
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]

socketTextStream - Create an input stream from a TCP source that reads data as UTF-8 encoded, newline-delimited text.
socketStream - Create an input stream from a TCP source with a custom converter function that turns the raw InputStream into an iterator of records.
rawSocketStream - Create an input stream from a TCP source that receives data as serialized blocks, which can be stored directly without deserialization.
Example:
val lines = ssc.socketTextStream("localhost", 9999)
val binaryData = ssc.rawSocketStream[ByteBuffer]("localhost", 8888)
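
socketStream has no example above; a minimal sketch of a custom converter (the intLines helper, port, and record format are hypothetical):

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel

// Hypothetical converter: read the socket as UTF-8 lines and parse each line as an Int.
def intLines(in: InputStream): Iterator[Int] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null).map(_.trim.toInt)
}

val ints = ssc.socketStream[Int]("localhost", 9999, intLines, StorageLevel.MEMORY_AND_DISK_SER_2)
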
def textFileStream(directory: String): DStream[String]

def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
directory: String
): InputDStream[(K, V)]
def binaryRecordsStream(
directory: String,
recordLength: Int
): DStream[Array[Byte]]

textFileStream - Monitor directory for new text files and read them as a stream.
fileStream - Monitor directory for new files using Hadoop InputFormat.
binaryRecordsStream - Monitor directory for new binary files with fixed record length.
Example:
val textFiles = ssc.textFileStream("hdfs://data/input")
val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat]("/data/csv")
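
binaryRecordsStream has no example above; a small sketch, assuming a hypothetical directory of fixed-length 128-byte records:

// Each element of the stream is one 128-byte record read from new files
// appearing in the monitored directory.
val records = ssc.binaryRecordsStream("hdfs://data/binary", 128)
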
def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean = true,
defaultRDD: RDD[T] = null
): InputDStream[T]
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
def actorStream[T: ClassTag](
props: Props,
name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T]

queueStream - Create input stream from a queue of RDDs, useful for testing.
receiverStream - Create input stream using a custom Receiver.
actorStream - Create input stream using an Akka Actor as the receiver.
Example:
val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
val customReceiver = new MyCustomReceiver()
val receiverStream = ssc.receiverStream(customReceiver)
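
The MyCustomReceiver used above is not defined in this document; a minimal sketch of what such a receiver might look like, extending the Receiver API from org.apache.spark.streaming.receiver:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver: pushes a constant record from a background thread
// until the receiver is stopped.
class MyCustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {
  def onStart(): Unit = {
    new Thread("my-custom-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("record")   // hand one record to Spark for storage
          Thread.sleep(100)
        }
      }
    }.start()
  }
  def onStop(): Unit = {
    // Nothing to clean up; the polling thread exits once isStopped() is true.
  }
}
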
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

Create a unified DStream from multiple input streams of the same type.

Example:
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val combinedStream = ssc.union(Seq(stream1, stream2))

object StreamingContext {
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext
def getActive(): Option[StreamingContext]
def getActiveOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext
}

getOrCreate - Get a StreamingContext from checkpoint data, or create a new one via creatingFunc if the checkpoint doesn't exist.
getActive - Get the currently active StreamingContext, if any.
getActiveOrCreate - Get active StreamingContext or create from checkpoint/function.
Example:
def createContext(): StreamingContext = {
val conf = new SparkConf().setAppName("RecoverableApp")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs://checkpoints")
// Define your streaming computation
ssc
}
val ssc = StreamingContext.getOrCreate("hdfs://checkpoints", createContext _)import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.SparkConf
// Create and configure StreamingContext
val conf = new SparkConf()
.setAppName("MyStreamingApp")
.setMaster("local[2]")
.set("spark.streaming.blockInterval", "200ms")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("checkpoints")
// Create input streams
val lines = ssc.socketTextStream("localhost", 9999)
// Define transformations
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Define output
wordCounts.print()
// Start and wait
ssc.start()
ssc.awaitTermination()

import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.SparkConf
def createStreamingContext(): StreamingContext = {
val conf = new SparkConf().setAppName("FaultTolerantApp")
val ssc = new StreamingContext(conf, Seconds(1))
// Set checkpoint directory
ssc.checkpoint("hdfs://checkpoints/fault-tolerant")
// Define your streaming computation
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
wordCounts.print()
ssc
}
// Recover from checkpoint or create new
val ssc = StreamingContext.getOrCreate(
"hdfs://checkpoints/fault-tolerant",
createStreamingContext _
)
ssc.start()
ssc.awaitTermination()

val ssc = new StreamingContext(conf, Seconds(1))
// Multiple input sources
val kafkaStream = createKafkaStream(ssc) // Custom Kafka stream (sketched after this example)
val socketStream = ssc.socketTextStream("localhost", 9999)
val fileStream = ssc.textFileStream("hdfs://input")
// Union multiple streams
val allStreams = ssc.union(Seq(kafkaStream, socketStream, fileStream))
// Process unified stream
val processed = allStreams
.filter(_.nonEmpty)
.map(processMessage)
processed.print()
ssc.start()
ssc.awaitTermination()
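
The createKafkaStream helper above is not part of this library; one possible sketch, assuming the separate spark-streaming-kafka_2.11 1.6.0 artifact, a local broker, and an "events" topic (all hypothetical):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper matching createKafkaStream above. Returns only the
// message values so the result can be unioned with the other DStream[String] sources.
def createKafkaStream(ssc: StreamingContext): DStream[String] = {
  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
  val topics = Set("events")
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics
  ).map(_._2) // keep the message value, drop the key
}
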
Key Spark Streaming configuration parameters that can be set in SparkConf:

spark.streaming.blockInterval - Interval for receiver block generation (default: 200ms)
spark.streaming.receiver.maxRate - Maximum records per second per receiver (default: unlimited)
spark.streaming.backpressure.enabled - Enable backpressure mechanism (default: false)
spark.streaming.kafka.maxRatePerPartition - Max records per second per Kafka partition
spark.streaming.stopGracefullyOnShutdown - Enable graceful shutdown (default: false)
spark.streaming.unpersist - Force unpersisting of old RDDs (default: true)

Example:
val conf = new SparkConf()
.setAppName("ConfiguredApp")
.set("spark.streaming.blockInterval", "100ms")
.set("spark.streaming.receiver.maxRate", "1000")
.set("spark.streaming.backpressure.enabled", "true")
.set("spark.streaming.stopGracefullyOnShutdown", "true")