PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python
—
Comprehensive transformation operations for processing streaming data, including mapping, filtering, windowing, aggregation, joins, and arbitrary RDD-level transforms. Note that the signatures below come from the underlying Scala DStream API on which the Python bindings are built.
Transform each element:
def map[U: ClassTag](mapFunc: T => U): DStream[U]

Transform each element with partition information:
def mapPartitions[U: ClassTag](
  mapPartFunc: Iterator[T] => Iterator[U],
  preservePartitioning: Boolean = false
): DStream[U]

Example map operations:
val lines = ssc.socketTextStream("localhost", 9999)
val lengths = lines.map(_.length)
val upperCase = lines.map(_.toUpperCase)
// MapPartitions for batch processing
val batchProcessed = lines.mapPartitions { iter =>
  val batch = iter.toList
  processBatch(batch).iterator
}

Transform and flatten elements:
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]

Example word splitting:
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split("\\s+"))
val nonEmptyWords = lines.flatMap(_.split("\\s+").filter(_.nonEmpty))

Filter elements based on predicate:
def filter(filterFunc: T => Boolean): DStream[T]

Example filtering:
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)
val evenNumbers = numbers.filter(_ % 2 == 0)
val positiveNumbers = numbers.filter(_ > 0)

Reduce elements in each RDD:
def reduce(reduceFunc: (T, T) => T): DStream[T]

Count elements in each RDD:
def count(): DStream[Long]

Count by value:
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)]

Example aggregations:
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)
val sum = numbers.reduce(_ + _)
val count = numbers.count()
val histogram = numbers.countByValue()
sum.print()
count.print()
histogram.print()

Change number of partitions:
def repartition(numPartitions: Int): DStream[T]

Group the elements of each partition into a single array:
def glom(): DStream[Array[T]]

Example partitioning:
val lines = ssc.socketTextStream("localhost", 9999)
val repartitioned = lines.repartition(4)
val grouped = lines.glom() // Group elements within partitions

Create windowed DStream:
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

Example windowing:
val lines = ssc.socketTextStream("localhost", 9999)
// 30-second windows sliding every 10 seconds
val windowedLines = lines.window(Seconds(30), Seconds(10))
val windowCounts = windowedLines.count()

Reduce over windows:
def reduceByWindow(
  reduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]

Optimized windowed reduction with inverse function:
def reduceByWindow(
  reduceFunc: (T, T) => T,
  invReduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]

Example windowed reductions:
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)
// Sum over 1-minute windows, sliding every 10 seconds
val windowSum = numbers.reduceByWindow(_ + _, Minutes(1), Seconds(10))
// Optimized version with inverse function for subtraction
val optimizedSum = numbers.reduceByWindow(
  _ + _,      // Add function
  _ - _,      // Inverse (subtract) function
  Minutes(1), // Window duration
  Seconds(10) // Slide duration
) // Note: the inverse-function variant requires checkpointing to be enabled

Count elements over windows:
def countByWindow(
  windowDuration: Duration,
  slideDuration: Duration
): DStream[Long]

Count by value over windows:
def countByValueAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]

Map values while preserving keys:
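The windowed counting operations above can be sketched as follows (a sketch, not from the source; both operations are built on an inverse reduce, so checkpointing must be enabled):

```scala
ssc.checkpoint("/tmp/checkpoint") // required by countByWindow / countByValueAndWindow
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split("\\s+"))
// Total number of words in each 30-second window, sliding every 10 seconds
val totalCounts = words.countByWindow(Seconds(30), Seconds(10))
// Per-word counts over the same window
val perWordCounts = words.countByValueAndWindow(Seconds(30), Seconds(10))
totalCounts.print()
perWordCounts.print()
```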
def mapValues[U: ClassTag](f: V => U): DStream[(K, U)] // On DStream[(K, V)]

FlatMap values:
def flatMapValues[U: ClassTag](f: V => TraversableOnce[U]): DStream[(K, U)]

Example key-value transformations:
val pairs = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(":")(0), line.split(":")(1)))
val upperValues = pairs.mapValues(_.toUpperCase)
val wordValues = pairs.flatMapValues(_.split("\\s+"))

Group by key:
def groupByKey(): DStream[(K, Iterable[V])] // On DStream[(K, V)]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]

Example grouping:
val keyValuePairs = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.charAt(0).toString, line))
val grouped = keyValuePairs.groupByKey()
grouped.foreachRDD { rdd =>
  rdd.collect().foreach { case (key, values) =>
    println(s"Key: $key, Count: ${values.size}")
  }
}

Reduce by key:
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]

Combine by key:
def combineByKey[C: ClassTag](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiner: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true
): DStream[(K, C)]

Example reductions:
val wordCounts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)
// Advanced combine operation for computing averages
val scores = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1).toDouble))
val averages = scores.combineByKey(
  (score: Double) => (score, 1), // Create combiner
  (acc: (Double, Int), score: Double) => (acc._1 + score, acc._2 + 1), // Merge value
  (acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // Merge combiners
  new HashPartitioner(ssc.sparkContext.defaultParallelism) // Partitioner is required on DStreams
).mapValues { case (sum, count) => sum / count }

Group by key over windows:
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int
): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  partitioner: Partitioner
): DStream[(K, Iterable[V])]

Reduce by key over windows:
def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration
): DStream[(K, V)]
def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]
def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int
): DStream[(K, V)]
def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration,
  partitioner: Partitioner
): DStream[(K, V)]

Optimized windowed reduction with inverse function:
def reduceByKeyAndWindow(
  reduceFunc: (V, V) => V,
  invReduceFunc: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]

Inner join:
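The windowed pair operations above can be sketched as follows (a sketch, not from the source; the inverse-function variant requires checkpointing to be enabled):

```scala
ssc.checkpoint("/tmp/checkpoint") // needed for the inverse-function variant
val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
// Grouped values per key over a 30-second window, sliding every 10 seconds
val windowedGroups = pairs.groupByKeyAndWindow(Seconds(30), Seconds(10))
// Word counts over a 1-minute window, recomputed from scratch each slide
val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, Minutes(1), Seconds(10))
// Incremental version: adds the entering slide and subtracts the leaving one
val incrementalCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Minutes(1), Seconds(10))
```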
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]

Left outer join:
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def leftOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (V, Option[W]))]

Right outer join:
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def rightOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (Option[V], W))]

Full outer join:
def fullOuterJoin[W: ClassTag](
  other: DStream[(K, W)]
): DStream[(K, (Option[V], Option[W]))]
def fullOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (Option[V], Option[W]))]

Example joins:
val userActions = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1))) // (userId, action)
val userProfiles = ssc.socketTextStream("localhost", 9998)
  .map(line => (line.split(",")(0), line.split(",")(1))) // (userId, profile)
val enrichedActions = userActions.join(userProfiles)
val optionalEnriched = userActions.leftOuterJoin(userProfiles)

Union multiple DStreams:
def union(that: DStream[T]): DStream[T]

Static union method:
def union[T](streams: Seq[DStream[T]]): DStream[T] // On StreamingContext

Example union:
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)
val stream3 = ssc.textFileStream("/data/input")
// Union two streams
val combined = stream1.union(stream2)
// Union multiple streams
val allCombined = ssc.union(Seq(stream1, stream2, stream3))

Apply arbitrary RDD transformations:
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

Transform with another DStream:
def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V]
def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V]

Example transforms:
val lines = ssc.socketTextStream("localhost", 9999)
// Apply complex RDD operations
val processed = lines.transform { rdd =>
  rdd.filter(_.nonEmpty)
    .map(_.toLowerCase)
    .zipWithIndex()
    .filter(_._2 % 2 == 0)
    .map(_._1)
}
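transform also covers the common pattern of combining a stream with a static dataset (a sketch, not from the source; `blacklistRDD` is a hypothetical precomputed RDD):

```scala
// Drop lines that appear in a static blacklist by joining against it per batch
val blacklistRDD = ssc.sparkContext.parallelize(Seq("spam", "junk")).map((_, true))
val cleaned = lines.transform { rdd =>
  rdd.map((_, ()))
    .leftOuterJoin(blacklistRDD)
    .collect { case (line, (_, None)) => line } // keep only non-blacklisted lines
}
```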
// Transform with time information
val timestamped = lines.transform { (rdd, time) =>
  rdd.map(line => s"${time.milliseconds}: $line")
}
// Transform with another stream
val stream2 = ssc.socketTextStream("localhost", 9998)
val combined = lines.transformWith(stream2) { (rdd1, rdd2) =>
  rdd1.union(rdd2).distinct()
}

Cache DStream:
def cache(): DStream[T]
def persist(): DStream[T]
def persist(level: StorageLevel): DStream[T]

Enable checkpointing:
def checkpoint(interval: Duration): DStream[T]

Example persistence:
val expensiveStream = lines
  .map(expensiveTransformation)
  .cache() // Cache for reuse
val checkpointedStream = expensiveStream
  .checkpoint(Seconds(10)) // Checkpoint every 10 seconds

Get RDDs from time range:
def slice(interval: Interval): Seq[RDD[T]]
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]

Example slicing:
val stream = ssc.socketTextStream("localhost", 9999)
// Get RDDs from last 30 seconds
val currentTime = new Time(System.currentTimeMillis())
val past30Seconds = currentTime - Seconds(30)
val recentRDDs = stream.slice(past30Seconds, currentTime)

Install with Tessl CLI:
npx tessl i tessl/pypi-pyspark-streaming
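For orientation, the operations above fit into a minimal driver program like the following sketch (assumes the spark-streaming dependency is on the classpath; `local[2]` leaves one thread for the socket receiver and one for processing):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    // At least two local threads: one for the receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCountApp")
    val ssc = new StreamingContext(conf, Seconds(1)) // 1-second batch interval

    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
    counts.print()

    ssc.start()            // Start receiving and processing
    ssc.awaitTermination() // Block until stopped or failed
  }
}
```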