
tessl/pypi-pyspark-streaming

PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python


DStream Transformations

Comprehensive transformation operations for processing streaming data, including mapping, filtering, windowing, aggregations, and advanced operations.

Basic Transformations

Map Operations

Transform each element:

def map[U: ClassTag](mapFunc: T => U): DStream[U]

Transform elements one partition at a time:

def mapPartitions[U: ClassTag](
  mapPartFunc: Iterator[T] => Iterator[U],
  preservePartitioning: Boolean = false
): DStream[U]

Example map operations:

val lines = ssc.socketTextStream("localhost", 9999)
val lengths = lines.map(_.length)
val upperCase = lines.map(_.toUpperCase)

// mapPartitions for per-partition batch processing
// (processBatch is a user-defined function that handles a whole batch)
val batchProcessed = lines.mapPartitions { iter =>
  val batch = iter.toList
  processBatch(batch).iterator
}

FlatMap Operations

Transform and flatten elements:

def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]

Example word splitting:

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split("\\s+"))
val nonEmptyWords = lines.flatMap(_.split("\\s+").filter(_.nonEmpty))

Filter Operations

Filter elements based on predicate:

def filter(filterFunc: T => Boolean): DStream[T]

Example filtering:

val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)
val evenNumbers = numbers.filter(_ % 2 == 0)
val positiveNumbers = numbers.filter(_ > 0)

Aggregation Operations

Reduce Operations

Reduce elements in each RDD:

def reduce(reduceFunc: (T, T) => T): DStream[T]

Count elements in each RDD:

def count(): DStream[Long]

Count by value:

def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)]

Example aggregations:

val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

val sum = numbers.reduce(_ + _)
val count = numbers.count()
val histogram = numbers.countByValue()

sum.print()
count.print()  
histogram.print()

Partitioning Operations

Repartition

Change number of partitions:

def repartition(numPartitions: Int): DStream[T]

Glom

Group the elements of each partition into an array:

def glom(): DStream[Array[T]]

Example partitioning:

val lines = ssc.socketTextStream("localhost", 9999)
val repartitioned = lines.repartition(4)
val grouped = lines.glom() // Group elements within partitions

Window Operations

Basic Windowing

Create windowed DStream:

def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

Example windowing:

val lines = ssc.socketTextStream("localhost", 9999)

// 30-second windows sliding every 10 seconds
val windowedLines = lines.window(Seconds(30), Seconds(10))
val windowCounts = windowedLines.count()

Windowed Reductions

Reduce over windows:

def reduceByWindow(
  reduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]

Optimized windowed reduction with inverse function:

def reduceByWindow(
  reduceFunc: (T, T) => T,
  invReduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]

Example windowed reductions:

val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Sum over 1-minute windows, sliding every 10 seconds
val windowSum = numbers.reduceByWindow(_ + _, Minutes(1), Seconds(10))

// Optimized version with inverse function for subtraction
val optimizedSum = numbers.reduceByWindow(
  _ + _,           // Add function
  _ - _,           // Inverse (subtract) function  
  Minutes(1),      // Window duration
  Seconds(10)      // Slide duration
)
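The inverse function exists so that each new window can be computed incrementally: add the batches entering the window and subtract the batches leaving it, instead of re-reducing every batch in the window. A minimal plain-Scala sketch of that bookkeeping (names are hypothetical; this is not the Spark implementation):

```scala
// Sketch of the inverse-function optimization, with no Spark involved.
// newTotal = oldTotal + (entering batch sums) - (leaving batch sums)
object IncrementalWindowSum {
  def slide(oldTotal: Int, entering: Seq[Int], leaving: Seq[Int]): Int =
    leaving.foldLeft(entering.foldLeft(oldTotal)(_ + _))(_ - _)

  def main(args: Array[String]): Unit = {
    // Window covered batches with sums 1, 2, 3 (total 6);
    // the slide drops the batch with sum 1 and adds one with sum 4.
    val newTotal = slide(oldTotal = 6, entering = Seq(4), leaving = Seq(1))
    println(newTotal) // 2 + 3 + 4 = 9
  }
}
```

This is why the inverse overload only pays off for invertible functions like addition; operations such as `max` have no inverse and must use the plain `reduceByWindow`.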

Windowed Counting

Count elements over windows:

def countByWindow(
  windowDuration: Duration,
  slideDuration: Duration
): DStream[Long]

Count by value over windows:

def countByValueAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]
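The semantics of `countByValueAndWindow` can be sketched in plain Scala, treating each inner `Seq` as one batch and sliding a window of `windowBatches` batches one batch at a time (the object and method names below are hypothetical, not part of the Spark API):

```scala
// Plain-Scala sketch of countByValueAndWindow semantics (no Spark needed).
object WindowedCounts {
  def countByValueAndWindow[T](batches: Seq[Seq[T]], windowBatches: Int): Seq[Map[T, Long]] =
    batches.sliding(windowBatches).map { window =>
      // Flatten the batches in this window and count occurrences of each value
      window.flatten.groupBy(identity).map { case (v, occ) => v -> occ.size.toLong }
    }.toSeq

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("a", "b"), Seq("a"), Seq("b", "b"))
    println(countByValueAndWindow(batches, windowBatches = 2))
    // window 1 covers [a, b, a] -> Map(a -> 2, b -> 1)
    // window 2 covers [a, b, b] -> Map(a -> 1, b -> 2)
  }
}
```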

Pair DStream Operations

Key-Value Transformations

Map values while preserving keys:

def mapValues[U: ClassTag](f: V => U): DStream[(K, U)]  // On DStream[(K, V)]

FlatMap values:

def flatMapValues[U: ClassTag](f: V => TraversableOnce[U]): DStream[(K, U)]

Example key-value transformations:

val pairs = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(":")(0), line.split(":")(1)))

val upperValues = pairs.mapValues(_.toUpperCase)
val wordValues = pairs.flatMapValues(_.split("\\s+"))

Grouping Operations

Group by key:

def groupByKey(): DStream[(K, Iterable[V])]  // On DStream[(K, V)]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]

Example grouping:

val keyValuePairs = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.charAt(0).toString, line))

val grouped = keyValuePairs.groupByKey()
grouped.foreachRDD { rdd =>
  rdd.collect().foreach { case (key, values) =>
    println(s"Key: $key, Count: ${values.size}")
  }
}

Reduction Operations

Reduce by key:

def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]

Combine by key:

def combineByKey[C: ClassTag](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiner: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true
): DStream[(K, C)]

Example reductions:

val wordCounts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)

// Advanced combine operation for computing averages
val scores = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1).toDouble))

val averages = scores.combineByKey(
  (score: Double) => (score, 1),                    // Create combiner
  (acc: (Double, Int), score: Double) => (acc._1 + score, acc._2 + 1),  // Merge value
  (acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2),  // Merge combiners
  new HashPartitioner(ssc.sparkContext.defaultParallelism)  // Partitioner (required by this signature)
).mapValues { case (sum, count) => sum / count }
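The three combiner functions can be exercised on plain Scala collections to confirm the average logic, with no streaming context involved (a standalone check; the object name is hypothetical):

```scala
// Standalone check of the average-combiner logic on plain collections.
object AverageCombinerCheck {
  type Acc = (Double, Int) // running (sum, count)

  val createCombiner: Double => Acc = score => (score, 1)
  val mergeValue: (Acc, Double) => Acc = (acc, s) => (acc._1 + s, acc._2 + 1)
  val mergeCombiners: (Acc, Acc) => Acc = (a, b) => (a._1 + b._1, a._2 + b._2)

  // Combine the values within one simulated partition
  def combine(values: Seq[Double]): Acc =
    values.tail.foldLeft(createCombiner(values.head))(mergeValue)

  def main(args: Array[String]): Unit = {
    // Two simulated partitions for the same key, merged at the end
    val partition1 = combine(Seq(90.0, 80.0))
    val partition2 = combine(Seq(70.0))
    val merged = mergeCombiners(partition1, partition2)
    println(merged._1 / merged._2) // (90 + 80 + 70) / 3 = 80.0
  }
}
```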

Windowed Key-Value Operations

Group by key over windows:

def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int
): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  partitioner: Partitioner
): DStream[(K, Iterable[V])]

Reduce by key over windows:

def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration
): DStream[(K, V)]

def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]

def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int
): DStream[(K, V)]

def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration,
  partitioner: Partitioner
): DStream[(K, V)]

Optimized windowed reduction:

def reduceByKeyAndWindow(
  reduceFunc: (V, V) => V,
  invReduceFunc: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]
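The per-key windowed reduction can likewise be sketched in plain Scala: each batch is a `Seq` of key-value pairs, and each window flattens its batches and reduces the values per key (names below are hypothetical, not Spark API):

```scala
// Plain-Scala sketch of reduceByKeyAndWindow semantics (no Spark needed).
object WindowedReduceByKey {
  def reduceByKeyAndWindow[K, V](batches: Seq[Seq[(K, V)]],
                                 windowBatches: Int)(func: (V, V) => V): Seq[Map[K, V]] =
    batches.sliding(windowBatches).map { window =>
      // Group the window's pairs by key and reduce each key's values
      window.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }
    }.toSeq

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("a" -> 1, "b" -> 2), Seq("a" -> 3), Seq("b" -> 4))
    println(reduceByKeyAndWindow(batches, windowBatches = 2)(_ + _))
    // window 1 -> Map(a -> 4, b -> 2)
    // window 2 -> Map(a -> 3, b -> 4)
  }
}
```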

Join Operations

Basic Joins

Inner join:

def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]

Left outer join:

def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def leftOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (V, Option[W]))]

Right outer join:

def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def rightOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (Option[V], W))]

Full outer join:

def fullOuterJoin[W: ClassTag](
  other: DStream[(K, W)]
): DStream[(K, (Option[V], Option[W]))]
def fullOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (Option[V], Option[W]))]

Example joins:

val userActions = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1))) // (userId, action)

val userProfiles = ssc.socketTextStream("localhost", 9998)  
  .map(line => (line.split(",")(0), line.split(",")(1))) // (userId, profile)

val enrichedActions = userActions.join(userProfiles)
val optionalEnriched = userActions.leftOuterJoin(userProfiles)

Union Operations

Union multiple DStreams:

def union(that: DStream[T]): DStream[T]

Union multiple streams via the StreamingContext:

def union[T](streams: Seq[DStream[T]]): DStream[T]  // On StreamingContext

Example union:

val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)
val stream3 = ssc.textFileStream("/data/input")

// Union two streams
val combined = stream1.union(stream2)

// Union multiple streams
val allCombined = ssc.union(Seq(stream1, stream2, stream3))

Transform Operations

RDD-level Transformations

Apply arbitrary RDD transformations:

def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

Transform with another DStream:

def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V]

def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V]

Example transforms:

val lines = ssc.socketTextStream("localhost", 9999)

// Apply complex RDD operations
val processed = lines.transform { rdd =>
  rdd.filter(_.nonEmpty)
     .map(_.toLowerCase)
     .zipWithIndex()
     .filter(_._2 % 2 == 0)
     .map(_._1)
}

// Transform with time information
val timestamped = lines.transform { (rdd, time) =>
  rdd.map(line => s"${time.milliseconds}: $line")
}

// Transform with another stream
val stream2 = ssc.socketTextStream("localhost", 9998)
val combined = lines.transformWith(stream2) { (rdd1, rdd2) =>
  rdd1.union(rdd2).distinct()
}

Utility Transformations

Cache and Persistence

Cache DStream:

def cache(): DStream[T]
def persist(): DStream[T]
def persist(level: StorageLevel): DStream[T]

Checkpointing

Enable checkpointing:

def checkpoint(interval: Duration): DStream[T]

Example persistence:

val expensiveStream = lines
  .map(expensiveTransformation)
  .cache()  // Cache for reuse

val checkpointedStream = expensiveStream
  .checkpoint(Seconds(10))  // Checkpoint every 10 seconds

Slicing

Get RDDs from time range:

def slice(interval: Interval): Seq[RDD[T]]
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]

Example slicing:

val stream = ssc.socketTextStream("localhost", 9999)

// Get RDDs from last 30 seconds
val currentTime = new Time(System.currentTimeMillis())
val past30Seconds = currentTime - Seconds(30)
val recentRDDs = stream.slice(past30Seconds, currentTime)

Install with Tessl CLI

npx tessl i tessl/pypi-pyspark-streaming
