Core Streaming Operations

Core streaming functionality providing the essential components for building streaming applications, including StreamingContext creation, DStream transformations, and output operations.

Capabilities

StreamingContext

The main entry point for Spark Streaming functionality. Creates and manages streaming computations.

/**
 * Main entry point for Spark Streaming functionality
 */
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  /** Start the streaming computation */
  def start(): Unit
  
  /** Stop the streaming computation (uses spark.streaming.stopSparkContextByDefault config) */
  def stop(): Unit
  
  /** Stop the streaming computation with option to stop SparkContext */
  def stop(stopSparkContext: Boolean): Unit
  
  /** Stop with graceful shutdown and optional SparkContext stop */
  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
  
  /** Wait for the streaming context to terminate */
  def awaitTermination(): Unit
  
  /** Wait for termination or timeout */
  def awaitTerminationOrTimeout(timeout: Long): Boolean
  
  /** Get current state of the streaming context */
  def getState(): StreamingContextState
  
  /** Set checkpoint directory for fault tolerance */
  def checkpoint(directory: String): Unit
  
  /** Set how long DStreams should remember their RDDs */
  def remember(duration: Duration): Unit
  
  /** Get the underlying SparkContext */
  def sparkContext: SparkContext
  
  /** Add a streaming listener for event notifications */
  def addStreamingListener(streamingListener: StreamingListener): Unit
  
  /** Remove a streaming listener */
  def removeStreamingListener(streamingListener: StreamingListener): Unit
}

Alternative Constructors:

/** Create from SparkConf */
def this(conf: SparkConf, batchDuration: Duration)

/** Create with master and app name */  
def this(master: String, appName: String, batchDuration: Duration)

/** Create with master, app name, batch duration, and Spark home */
def this(master: String, appName: String, batchDuration: Duration, sparkHome: String)

/** Create with master, app name, batch duration, Spark home, and JAR files */
def this(master: String, appName: String, batchDuration: Duration, sparkHome: String, jars: Seq[String])

/** Create with master, app name, batch duration, Spark home, JARs, and environment */
def this(master: String, appName: String, batchDuration: Duration, sparkHome: String, jars: Seq[String], environment: Map[String, String])

/** Restore from checkpoint */
def this(path: String)

/** Restore from checkpoint with Hadoop configuration */
def this(path: String, hadoopConf: Configuration)

/** Restore from checkpoint with existing SparkContext */
def this(path: String, sparkContext: SparkContext)
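As a quick sketch (the application name, host, and checkpoint path below are placeholders), the master/app-name and checkpoint-restore constructors can be used directly:

import org.apache.spark.streaming._

// Local mode with two worker threads and 1-second batches
val sscLocal = new StreamingContext("local[2]", "ExampleApp", Seconds(1))

// Recreate a context from a previously written checkpoint directory
val sscRestored = new StreamingContext("/path/to/checkpoint")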

Companion Object Methods:

object StreamingContext {
  /** Get currently active StreamingContext */
  def getActive(): Option[StreamingContext]
  
  /** Get active context or create new one */
  def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext
  
  /** Create from checkpoint or use creating function */
  def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext): StreamingContext
  
  /** Create from checkpoint, reading it with the given Hadoop configuration, or use creating function */
  def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext, hadoopConf: Configuration): StreamingContext
  
  /** Create from checkpoint or use creating function, optionally creating a new context if reading the checkpoint fails */
  def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext, hadoopConf: Configuration, createOnError: Boolean): StreamingContext
}

Usage Examples:

import org.apache.spark._
import org.apache.spark.streaming._

// Create with SparkContext and batch duration
val conf = new SparkConf().setAppName("MyStreamingApp")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

// Create directly from SparkConf
val ssc2 = new StreamingContext(conf, Seconds(2))

// Create with checkpoint recovery
val ssc3 = StreamingContext.getOrCreate("/path/to/checkpoint", () => {
  val conf = new SparkConf().setAppName("RecoverableApp")
  new StreamingContext(conf, Seconds(1))
})

// Configure and start
ssc.checkpoint("/path/to/checkpoint")
ssc.start()
ssc.awaitTermination()
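A minimal sketch of the listener and timed-wait APIs listed above; the BatchInfo fields used in the listener (batchTime, numRecords, processingDelay) are assumptions based on the scheduler API:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Log basic metrics as each batch completes
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"Batch ${info.batchTime}: ${info.numRecords} records, " +
      s"processing delay ${info.processingDelay.getOrElse(-1L)} ms")
  }
})

// Wait up to 60 seconds; awaitTerminationOrTimeout returns false if the timeout elapsed
if (!ssc.awaitTerminationOrTimeout(60000)) {
  println(s"Context still running, state = ${ssc.getState()}")
}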

DStream[T]

Discretized Stream - represents a continuous sequence of RDDs. The fundamental abstraction in Spark Streaming.

/**
 * Discretized Stream - represents a continuous sequence of RDDs
 * @tparam T the type of elements in the stream
 */
abstract class DStream[T] {
  /** The StreamingContext associated with this DStream */
  def context: StreamingContext
  
  /** The slide duration of this DStream */
  def slideDuration: Duration
  
  /** List of parent DStreams on which this DStream depends */
  def dependencies: List[DStream[_]]
  
  /** Persist RDDs in this DStream with the default storage level */
  def persist(): DStream[T]
  
  /** Persist RDDs with specific storage level */
  def persist(level: StorageLevel): DStream[T]
  
  /** Cache RDDs in memory */
  def cache(): DStream[T]
  
  /** Enable periodic checkpointing */
  def checkpoint(interval: Duration): DStream[T]
}

Basic Transformations:

abstract class DStream[T] {
  /** Transform each element using the provided function */
  def map[U](mapFunc: T => U): DStream[U]
  
  /** Transform each element and flatten results */
  def flatMap[U](flatMapFunc: T => TraversableOnce[U]): DStream[U]
  
  /** Filter elements based on predicate */
  def filter(filterFunc: T => Boolean): DStream[T]
  
  /** Transform each partition independently */
  def mapPartitions[U](mapPartFunc: Iterator[T] => Iterator[U]): DStream[U]
  
  /** Group elements of each partition into an array */
  def glom(): DStream[Array[T]]
  
  /** Repartition RDDs in the DStream */
  def repartition(numPartitions: Int): DStream[T]
  
  /** Union with another DStream */
  def union(that: DStream[T]): DStream[T]
}
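A short sketch of union, repartition, and glom (host names and ports are placeholders):

// Merge two streams of the same element type, then rebalance partitions
val streamA: DStream[String] = ssc.socketTextStream("host1", 9999)
val streamB: DStream[String] = ssc.socketTextStream("host2", 9999)
val merged = streamA.union(streamB).repartition(8)

// glom turns each partition into an array, e.g. to inspect partition sizes
val partitionSizes: DStream[Int] = merged.glom().map(_.length)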

Reduction Operations:

abstract class DStream[T] {
  /** Reduce elements using associative and commutative function */
  def reduce(reduceFunc: (T, T) => T): DStream[T]
  
  /** Count number of elements in each RDD */
  def count(): DStream[Long]
  
  /** Count occurrences of each unique value */
  def countByValue(): DStream[(T, Long)]
}
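For example (a sketch; the socket source is a placeholder input):

val words: DStream[String] = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Per-batch totals and value frequencies
val totalWords: DStream[Long] = words.count()
val wordFrequencies: DStream[(String, Long)] = words.countByValue()

// reduce needs an associative, commutative function; here: keep the longest word
val longestWord: DStream[String] = words.reduce((a, b) => if (a.length >= b.length) a else b)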

Output Operations:

abstract class DStream[T] {
  /** Print first 10 elements of each RDD */
  def print(): Unit
  
  /** Print first num elements of each RDD */ 
  def print(num: Int): Unit
  
  /** Apply function to each RDD in the DStream */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
  
  /** Apply function to each RDD with timestamp */
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
  
  /** Save as object files with prefix and suffix */
  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
  
  /** Save as text files with prefix and suffix */
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
}
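A sketch of the file-saving output (the source and output prefix are placeholders); each batch interval produces output named "prefix-&lt;batch time in ms&gt;.suffix":

val events: DStream[String] = ssc.socketTextStream("localhost", 9999)

// Write each batch out as text files under the given prefix
events.saveAsTextFiles("hdfs:///tmp/streaming/events", "txt")

// foreachRDD is the general-purpose output hook; take() is used here only because
// batches are assumed to be small enough to sample on the driver
events.foreachRDD { rdd =>
  rdd.take(5).foreach(println)
}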

Advanced Transformations:

abstract class DStream[T] {
  /** Transform using arbitrary RDD operation */
  def transform[U](transformFunc: RDD[T] => RDD[U]): DStream[U]
  
  /** Transform with access to timestamp */
  def transform[U](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
  
  /** Transform with another DStream */
  def transformWith[U, V](other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]
  
  /** Transform with another DStream and timestamp access */
  def transformWith[U, V](other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]): DStream[V]
}
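A sketch of transformWith, joining the per-batch RDDs of two streams (sources and ports are placeholders):

import org.apache.spark.rdd.RDD

val clicks: DStream[(String, Int)] =
  ssc.socketTextStream("localhost", 9999).map(line => (line, 1))
val impressions: DStream[(String, Int)] =
  ssc.socketTextStream("localhost", 9998).map(line => (line, 1))

// transformWith exposes both batches as plain RDDs, so any RDD operation can be used
val joinedBatches = clicks.transformWith(impressions,
  (clickRdd: RDD[(String, Int)], viewRdd: RDD[(String, Int)]) => clickRdd.join(viewRdd))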

Utility Methods:

abstract class DStream[T] {
  /** Retrieve RDDs in specified time interval */
  def slice(interval: Interval): Seq[RDD[T]]
  
  /** Retrieve RDDs between specified times */
  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]
}

Usage Examples:

import org.apache.spark.streaming._

// Basic transformations
val lines: DStream[String] = ssc.textFileStream("/path/to/files")
val words = lines.flatMap(_.split(" "))
val filtered = words.filter(_.length > 3)
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Persistence and checkpointing
val important = lines.filter(_.contains("ERROR")).cache()
important.checkpoint(Seconds(30))

// Custom transformations
val processed = lines.transform { rdd =>
  rdd.mapPartitions(partition => {
    // Custom partition-level processing
    partition.map(_.toUpperCase)
  })
}

// Output operations
wordCounts.print(20)
wordCounts.foreachRDD { (rdd, time) =>
  println(s"Batch time: $time")
  rdd.take(10).foreach(println)
}

PairDStreamFunctions[K, V]

Enhanced functionality for DStreams of key-value pairs, providing operations like grouping, joins, and aggregations.

/**
 * Extra functionality available on DStreams of (key, value) pairs
 * Available as implicit conversion from DStream[(K, V)]
 */
class PairDStreamFunctions[K, V](self: DStream[(K, V)]) {
  /** Group values by key */
  def groupByKey(): DStream[(K, Iterable[V])]
  
  /** Group by key with specific number of partitions */
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
  
  /** Group by key using custom partitioner */
  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
  
  /** Reduce values by key */
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]
  
  /** Reduce by key with specific partitions */
  def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)]
  
  /** Reduce by key with custom partitioner */
  def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
  
  /** Combine values by key with custom combiners (a partitioner is required) */
  def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true
  ): DStream[(K, C)]
  
  /** Transform values while preserving keys */
  def mapValues[U](mapValuesFunc: V => U): DStream[(K, U)]
  
  /** Flat map values while preserving keys */
  def flatMapValues[U](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]
}
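A sketch of combineByKey and the value-preserving transforms (the socket source and partition count are placeholders):

import org.apache.spark.HashPartitioner

val pairs: DStream[(String, Int)] =
  ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1))

// Per-key (sum, count) per batch, then reduced to a per-key average
val sumsAndCounts = pairs.combineByKey[(Int, Int)](
  v => (v, 1),
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2),
  new HashPartitioner(4)
)
val averages: DStream[(String, Double)] =
  sumsAndCounts.mapValues { case (sum, count) => sum.toDouble / count }

// flatMapValues expands each value while keeping its key
val repeated = pairs.flatMapValues(v => Seq.fill(v)("x"))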

Join Operations:

class PairDStreamFunctions[K, V] {
  /** Inner join with another pair DStream */
  def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]
  
  /** Left outer join */
  def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
  
  /** Right outer join */
  def rightOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
  
  /** Full outer join */
  def fullOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
  
  /** Cogroup with another pair DStream */
  def cogroup[W](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
}

Usage Examples:

import org.apache.spark.HashPartitioner

val pairs: DStream[(String, Int)] = words.map(word => (word, 1))

// Basic operations
val wordCounts = pairs.reduceByKey(_ + _)
val labeledCounts = pairs.mapValues(count => s"count=$count")
val grouped = pairs.groupByKey()

// Joins
val pairs2: DStream[(String, Double)] = pairs.mapValues(_.toDouble) // placeholder for another pair DStream
val joined = pairs.join(pairs2) // DStream[(String, (Int, Double))]
val leftJoined = pairs.leftOuterJoin(pairs2)

// Custom partitioning
val customPartitioner = new HashPartitioner(4)
val partitioned = pairs.reduceByKey(_ + _, customPartitioner)
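
The remaining join-style operations follow the same pattern (continuing the variables above; a sketch):

// cogroup keeps every value for a key from both streams
val cogrouped = pairs.cogroup(pairs2)        // DStream[(String, (Iterable[Int], Iterable[Double]))]

// fullOuterJoin keeps keys that appear in either stream
val fullJoined = pairs.fullOuterJoin(pairs2) // DStream[(String, (Option[Int], Option[Double]))]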