
tessl/pypi-pyspark-streaming

PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python

docs/core-operations.md

Core Streaming Operations

Core functionality for creating and managing Spark Streaming contexts, controlling application lifecycle, and basic DStream operations.

StreamingContext Creation

Primary Constructors

Create StreamingContext with existing SparkContext:

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)

Create StreamingContext with Spark configuration:

class StreamingContext(conf: SparkConf, batchDuration: Duration)

Create StreamingContext with master URL and app name:

class StreamingContext(
  master: String,
  appName: String,
  batchDuration: Duration,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map()
)
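A minimal sketch of this constructor form; the master URL and app name here are placeholders, not values from this package:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Create a context directly from a master URL and app name;
// Spark builds the underlying SparkContext internally.
val ssc = new StreamingContext(
  "local[2]",   // master URL (placeholder; use your cluster URL)
  "ExampleApp", // application name
  Seconds(10)   // batch interval
)
```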

Checkpoint Recovery

Create StreamingContext from checkpoint:

class StreamingContext(path: String)
class StreamingContext(path: String, hadoopConf: Configuration)
class StreamingContext(path: String, sparkContext: SparkContext)

Example checkpoint recovery:

val checkpointDir = "hdfs://checkpoint-dir"

def createStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("MyApp")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // Define your streams and transformations here
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

Lifecycle Management

Starting and Stopping

Start streaming computation:

def start(): Unit

Stop streaming computation:

def stop(stopSparkContext: Boolean = true): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit

Wait for termination:

def awaitTermination(): Unit
def awaitTerminationOrTimeout(timeout: Long): Boolean

Example lifecycle management:

val ssc = new StreamingContext(conf, Seconds(1))

// Define your streams and transformations
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordCounts.print()

// Start the streaming context
ssc.start()

// Add shutdown hook for graceful termination
sys.ShutdownHookThread {
  println("Gracefully stopping Spark Streaming Application")
  ssc.stop(true, true)
  println("Application stopped")
}

// Wait for termination
ssc.awaitTermination()
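As an alternative to blocking indefinitely, `awaitTerminationOrTimeout` supports polling with an external stop condition. A sketch, where `shouldStop` is a hypothetical flag set elsewhere in the application:

```scala
// Hypothetical flag flipped by external signal handling
var shouldStop = false

var terminated = false
while (!terminated && !shouldStop) {
  // Returns true if the context terminated within the timeout (ms)
  terminated = ssc.awaitTerminationOrTimeout(10000)
}
if (!terminated) {
  // Stop requested externally: shut down gracefully
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
```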

State Management

Get current context state:

def getState(): StreamingContextState

StreamingContextState values:

  • INITIALIZED - Context created but not started
  • ACTIVE - Context started and running
  • STOPPED - Context stopped

Access underlying SparkContext:

def sparkContext: SparkContext
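The state can be used to guard lifecycle calls, since calling `start()` twice throws. A sketch, assuming `ssc` is a context created as in the earlier examples:

```scala
import org.apache.spark.streaming.StreamingContextState

// Only start the context if it has not been started yet
if (ssc.getState() == StreamingContextState.INITIALIZED) {
  ssc.start()
}
// After a successful start the context reports ACTIVE
assert(ssc.getState() == StreamingContextState.ACTIVE)
```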

Configuration and Checkpointing

Checkpoint Configuration

Set checkpoint directory:

def checkpoint(directory: String): Unit

Set remember duration for DStreams:

def remember(duration: Duration): Unit

Example checkpoint setup:

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://namenode:9000/checkpoint")
ssc.remember(Minutes(2)) // Remember last 2 minutes of data

Static Context Management

Singleton Context Management

Get currently active StreamingContext:

def getActive(): Option[StreamingContext]

Get or create StreamingContext:

def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext
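Unlike `getOrCreate`, this variant checks for a currently running context rather than a checkpoint directory. A sketch, reusing a factory like the `createStreamingContext` function from the checkpoint-recovery example:

```scala
// Reuse the already-active context if one exists in this JVM,
// otherwise invoke the factory to build a fresh one.
val ssc = StreamingContext.getActiveOrCreate(() => createStreamingContext())
```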

Get or create from checkpoint:

def getOrCreate(
  checkpointPath: String,
  creatingFunc: () => StreamingContext,
  hadoopConf: Configuration = SparkHadoopUtil.get.conf,
  createOnError: Boolean = false
): StreamingContext

Example singleton management:

object StreamingApp {
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("MyStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("checkpoint-dir")
    // Define streaming logic
    ssc
  }

  def main(args: Array[String]): Unit = {
    // This will create new context or recover from checkpoint
    val ssc = StreamingContext.getOrCreate("checkpoint-dir", createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

DStream Union and Transform

Union Operations

Union multiple DStreams:

def union[T](streams: Seq[DStream[T]]): DStream[T]

Example union:

val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)
val stream3 = ssc.textFileStream("/data/input")

val unionStream = ssc.union(Seq(stream1, stream2, stream3))

Transform Operations

Transform multiple DStreams together:

def transform[T](
  dstreams: Seq[DStream[_]],
  transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T]

Example multi-stream transform:

val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)

val combinedStream = ssc.transform(Seq(stream1, stream2)) { (rdds, time) =>
  val rdd1 = rdds(0).asInstanceOf[RDD[String]]
  val rdd2 = rdds(1).asInstanceOf[RDD[String]]
  
  // Custom transformation logic
  rdd1.union(rdd2).filter(_.length > 5)
}

Event Listeners

Adding and Removing Listeners

Add streaming listener:

def addStreamingListener(streamingListener: StreamingListener): Unit

Remove streaming listener:

def removeStreamingListener(streamingListener: StreamingListener): Unit

Example custom listener:

class MyStreamingListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    // processingDelay is an Option[Long] (milliseconds), so unwrap it before printing
    val delayMs = batchInfo.processingDelay.getOrElse(-1L)
    println(s"Batch ${batchInfo.batchTime} completed in ${delayMs}ms")
  }

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    println(s"Receiver error: ${receiverError.receiverInfo.lastErrorMessage}")
  }
}

ssc.addStreamingListener(new MyStreamingListener())

Utility Methods

JAR Management

Get JAR file for a class:

def jarOfClass(cls: Class[_]): Option[String]

Example usage:

val jarPath = StreamingContext.jarOfClass(classOf[MyCustomClass])

Context Access

Access the underlying Spark context:

val sc = ssc.sparkContext
val appName = sc.appName
val master = sc.master

Duration Helpers

Create duration objects:

object Milliseconds {
  def apply(milliseconds: Long): Duration
}

object Seconds {
  def apply(seconds: Long): Duration  
}

object Minutes {
  def apply(minutes: Long): Duration
}

Common duration patterns:

val batchInterval = Seconds(5)           // 5 second batches
val windowSize = Minutes(10)             // 10 minute windows
val slideInterval = Seconds(30)          // 30 second slides
val checkpointInterval = Minutes(2)      // Checkpoint every 2 minutes

Install with Tessl CLI

npx tessl i tessl/pypi-pyspark-streaming
