PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core functionality for creating and managing Spark Streaming contexts, controlling application lifecycle, and basic DStream operations.
Create StreamingContext with existing SparkContext:
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
Create StreamingContext with Spark configuration:
class StreamingContext(conf: SparkConf, batchDuration: Duration)
Create StreamingContext with master URL and app name:
class StreamingContext(
master: String,
appName: String,
batchDuration: Duration,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String] = Map()
)
Create StreamingContext from checkpoint:
class StreamingContext(path: String)
class StreamingContext(path: String, hadoopConf: Configuration)
class StreamingContext(path: String, sparkContext: SparkContext)
Example checkpoint recovery:
val checkpointDir = "hdfs://checkpoint-dir"
def createStreamingContext(): StreamingContext = {
val conf = new SparkConf().setAppName("MyApp")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint(checkpointDir)
// Define your streams and transformations here
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
Start streaming computation:
def start(): Unit
Stop streaming computation:
def stop(stopSparkContext: Boolean = true): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
Wait for termination:
def awaitTermination(): Unit
def awaitTerminationOrTimeout(timeout: Long): Boolean
Example lifecycle management:
val ssc = new StreamingContext(conf, Seconds(1))
// Define your streams and transformations
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordCounts.print()
// Start the streaming context
ssc.start()
// Add shutdown hook for graceful termination
sys.ShutdownHookThread {
println("Gracefully stopping Spark Streaming Application")
ssc.stop(true, true)
println("Application stopped")
}
// Wait for termination
ssc.awaitTermination()
Get current context state:
def getState(): StreamingContextState
StreamingContextState values:
INITIALIZED - Context created but not started
ACTIVE - Context started and running
STOPPED - Context stopped
Access underlying SparkContext:
def sparkContext: SparkContext
Set checkpoint directory:
def checkpoint(directory: String): Unit
Set remember duration for DStreams:
def remember(duration: Duration): Unit
Example checkpoint setup:
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://namenode:9000/checkpoint")
ssc.remember(Minutes(2)) // Remember last 2 minutes of data
Get currently active StreamingContext:
def getActive(): Option[StreamingContext]
Get or create StreamingContext:
def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext
Get or create from checkpoint:
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext
Example singleton management:
object StreamingApp {
def createContext(): StreamingContext = {
val conf = new SparkConf().setAppName("MyStreamingApp")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("checkpoint-dir")
// Define streaming logic
ssc
}
def main(args: Array[String]): Unit = {
// This will create new context or recover from checkpoint
val ssc = StreamingContext.getOrCreate("checkpoint-dir", createContext _)
ssc.start()
ssc.awaitTermination()
}
}
Union multiple DStreams:
def union[T](streams: Seq[DStream[T]]): DStream[T]
Example union:
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)
val stream3 = ssc.textFileStream("/data/input")
val unionStream = ssc.union(Seq(stream1, stream2, stream3))
Transform multiple DStreams together:
def transform[T](
dstreams: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T]
Example multi-stream transform:
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)
val combinedStream = ssc.transform(Seq(stream1, stream2)) { (rdds, time) =>
val rdd1 = rdds(0).asInstanceOf[RDD[String]]
val rdd2 = rdds(1).asInstanceOf[RDD[String]]
// Custom transformation logic
rdd1.union(rdd2).filter(_.length > 5)
}
Add streaming listener:
def addStreamingListener(streamingListener: StreamingListener): Unit
Remove streaming listener:
def removeStreamingListener(streamingListener: StreamingListener): Unit
Example custom listener:
class MyStreamingListener extends StreamingListener {
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
val batchInfo = batchCompleted.batchInfo
println(s"Batch ${batchInfo.batchTime} completed in ${batchInfo.processingDelay}ms")
}
override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
println(s"Receiver error: ${receiverError.receiverInfo.lastErrorMessage}")
}
}
ssc.addStreamingListener(new MyStreamingListener())
Get JAR file for a class:
def jarOfClass(cls: Class[_]): Option[String]
Example usage:
val jarPath = StreamingContext.jarOfClass(classOf[MyCustomClass])
Access the underlying Spark context:
val sc = ssc.sparkContext
val appName = sc.appName
val master = sc.master
Create duration objects:
object Milliseconds {
def apply(milliseconds: Long): Duration
}
object Seconds {
def apply(seconds: Long): Duration
}
object Minutes {
def apply(minutes: Long): Duration
}
Common duration patterns:
val batchInterval = Seconds(5) // 5 second batches
val windowSize = Minutes(10) // 10 minute windows
val slideInterval = Seconds(30) // 30 second slides
val checkpointInterval = Minutes(2) // Checkpoint every 2 minutes
Install with Tessl CLI
npx tessl i tessl/pypi-pyspark-streaming