CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-parent-2-12

Apache Spark - Unified analytics engine for large-scale data processing

Pending
Overview
Eval results
Files

docs/streaming.md

Stream Processing

Structured Streaming provides real-time stream processing with exactly-once fault-tolerance guarantees. Built on the Spark SQL engine for seamless integration with batch processing. Also includes legacy Spark Streaming (DStreams) for micro-batch processing.

Capabilities

Structured Streaming

Modern streaming API built on DataFrame/Dataset, processing data in micro-batches (with an experimental continuous mode) and providing exactly-once fault-tolerance guarantees for supported sources and sinks.

/**
 * Reader for streaming data sources.
 *
 * Builder-style API: every configuration method returns this reader so calls
 * can be chained, ending with a terminal `load()` or format-specific method
 * that produces a streaming DataFrame. Obtained via `spark.readStream`.
 */
class DataStreamReader {
  /** Specify streaming data source format (e.g. "kafka", "parquet", "socket") */
  def format(source: String): DataStreamReader
  /** Add input option */
  def option(key: String, value: String): DataStreamReader
  // Convenience overloads for non-string option values
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader
  /** Add multiple options */
  def options(options: Map[String, String]): DataStreamReader
  /** Set expected schema */
  def schema(schema: StructType): DataStreamReader
  /** Set expected schema from a DDL-formatted string */
  def schema(schemaString: String): DataStreamReader
  
  /** Load streaming DataFrame */
  def load(): DataFrame
  def load(path: String): DataFrame
  
  /** Format-specific methods (each is shorthand for format(...).load(...)) */
  def json(path: String): DataFrame
  def csv(path: String): DataFrame
  def parquet(path: String): DataFrame
  def text(path: String): DataFrame
  def textFile(path: String): Dataset[String]
  // NOTE(review): kafka(...) and socket(...) are not methods of the upstream
  // Spark DataStreamReader API (those sources are selected via format(...));
  // verify against the packaged API before relying on them.
  def kafka(options: Map[String, String]): DataFrame
  def socket(host: String, port: Int): DataFrame
}

/**
 * Writer for streaming data sinks.
 *
 * Builder-style API obtained via `dataset.writeStream`; configuration methods
 * return this writer for chaining, and `start()` (or a format-specific
 * shorthand) launches the streaming query.
 *
 * @tparam T element type of the Dataset being written
 */
class DataStreamWriter[T] {
  /** Set output mode (which result rows are emitted each trigger) */
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  /** Set output mode by name: "append", "complete", or "update" */
  def outputMode(outputMode: String): DataStreamWriter[T]
  /** Set trigger interval */
  def trigger(trigger: Trigger): DataStreamWriter[T]
  /** Specify output format */
  def format(source: String): DataStreamWriter[T]
  /** Add output option */
  def option(key: String, value: String): DataStreamWriter[T]
  // Convenience overloads for non-string option values
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]
  /** Add multiple options */
  def options(options: Map[String, String]): DataStreamWriter[T]
  /** Partition output by columns */
  def partitionBy(colNames: String*): DataStreamWriter[T]
  /** Set query name (used for identification and the memory sink table name) */
  def queryName(queryName: String): DataStreamWriter[T]
  
  /** Start streaming query */
  def start(): StreamingQuery
  def start(path: String): StreamingQuery
  
  /** Format-specific methods (each configures the sink and starts the query) */
  def console(): StreamingQuery
  def json(path: String): StreamingQuery
  def csv(path: String): StreamingQuery
  def parquet(path: String): StreamingQuery
  def text(path: String): StreamingQuery
  // NOTE(review): kafka() and memory(...) are not methods of the upstream
  // Spark DataStreamWriter API (those sinks are selected via format(...));
  // verify against the packaged API before relying on them.
  def kafka(): StreamingQuery
  def memory(queryName: String): StreamingQuery
}

/**
 * Output modes for streaming queries.
 *
 * Controls which rows of the streaming result table are written to the sink
 * at each trigger.
 */
object OutputMode {
  /** Only rows appended to the result table since the last trigger */
  val Append: OutputMode
  /** The entire updated result table at every trigger (aggregation queries) */
  val Complete: OutputMode
  /** Only rows in the result table that changed since the last trigger */
  val Update: OutputMode
}

/**
 * Trigger policies for streaming queries.
 *
 * Determines how often (or in what mode) the query processes new data.
 */
object Trigger {
  /** Run micro-batches at a fixed interval given as a string (e.g. "10 seconds") */
  def ProcessingTime(interval: String): Trigger
  /** Run micro-batches at a fixed interval given as a number plus TimeUnit */
  def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
  /** Process all available data once then stop */
  def Once(): Trigger
  /** Continuous processing with low latency; interval is the checkpoint interval */
  def Continuous(interval: String): Trigger
  def Continuous(interval: Long, unit: TimeUnit): Trigger
}

Usage Examples:

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.functions._

// NOTE: these snippets assume an active SparkSession named `spark`
// (and, for the Kafka example, a StructType named `schema`) in scope.

// Read from file source
val lines = spark.readStream
  .format("text")
  .option("path", "input-directory")
  .load()

// Word count example
val words = lines
  .select(explode(split($"value", " ")).as("word"))
  .groupBy("word")
  .count()

// Write to console
val query = words.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Blocks the calling thread until the query stops or fails
query.awaitTermination()

// Kafka source and sink
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()

// Kafka delivers the payload as binary; cast to string before parsing JSON
val processedDF = kafkaDF
  .select(from_json($"value".cast("string"), schema).as("data"))
  .select("data.*")
  .groupBy("category")
  .count()

val output = processedDF.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .outputMode("update")
  .start()

StreamingQuery Management

Interface for managing and monitoring streaming queries.

/**
 * Handle to a running streaming query, returned by DataStreamWriter.start().
 *
 * Used to monitor progress, block on termination, and stop the query.
 */
abstract class StreamingQuery {
  /** Unique identifier for the query */
  def id: UUID
  /** Name of the query */
  def name: String
  /** Check if query is currently active */
  def isActive: Boolean
  /** Get current status */
  def status: StreamingQueryStatus
  /** Get recent progress updates */
  def recentProgress: Array[StreamingQueryProgress]
  /** Get last progress update */
  def lastProgress: StreamingQueryProgress
  /** Block until query terminates */
  def awaitTermination(): Unit
  /** Block up to timeoutMs; returns whether the query terminated in time */
  def awaitTermination(timeoutMs: Long): Boolean
  /** Stop the query */
  def stop(): Unit
  /** Get exception that caused query to fail (None while healthy) */
  def exception: Option[StreamingQueryException]
  /** Explain the streaming query plan */
  def explain(): Unit
  /** Explain with extended (logical + physical) plan detail */
  def explain(extended: Boolean): Unit
}

/**
 * Manager for streaming queries, available as `spark.streams`.
 *
 * Tracks all queries started on a SparkSession and dispatches lifecycle
 * events to registered listeners.
 */
class StreamingQueryManager {
  /** Get currently active queries */
  def active: Array[StreamingQuery]
  /** Get query by id */
  def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery
  /** Block until any query terminates */
  def awaitAnyTermination(): Unit
  /** Block up to timeoutMs; returns whether a query terminated in time */
  def awaitAnyTermination(timeoutMs: Long): Boolean
  // NOTE(review): stopAll() is not part of the upstream Spark
  // StreamingQueryManager API — verify against the packaged API.
  /** Stop all active queries */
  def stopAll(): Unit
  /** Add listener for query start/progress/termination events */
  def addListener(listener: StreamingQueryListener): Unit
  /** Remove listener */
  def removeListener(listener: StreamingQueryListener): Unit
}

/**
 * Progress information for a single completed trigger of a streaming query.
 *
 * @param id             unique query identifier (stable across restarts)
 * @param runId          identifier for this run of the query
 * @param name           user-specified query name
 * @param timestamp      trigger timestamp
 * @param batchId        id of the processed micro-batch
 * @param batchDuration  wall-clock duration of the batch in milliseconds
 * @param durationMs     per-phase durations keyed by phase name
 * @param eventTime      event-time statistics (e.g. watermark, min/max)
 * @param stateOperators progress of each stateful operator
 * @param sources        per-source read progress
 * @param sink           sink write progress
 */
case class StreamingQueryProgress(
  id: UUID,
  runId: UUID,
  name: String,
  timestamp: String,
  batchId: Long,
  batchDuration: Long,
  durationMs: Map[String, Long],
  eventTime: Map[String, String],
  stateOperators: Array[StateOperatorProgress],
  sources: Array[SourceProgress],
  sink: SinkProgress
)

Window Operations

Time-based windowing for streaming aggregations.

/**
 * Window functions for streaming data.
 *
 * Durations are interval strings such as "10 minutes" or "1 hour".
 * The returned Column is used as a grouping expression in groupBy.
 */
object functions {
  /** Tumbling time window: fixed-size, non-overlapping */
  def window(
    timeColumn: Column,
    windowDuration: String
  ): Column
  
  /** Sliding time window: windows of windowDuration, advancing every slideDuration */
  def window(
    timeColumn: Column,
    windowDuration: String,
    slideDuration: String
  ): Column
  
  /** Sliding time window with start offset relative to the epoch */
  def window(
    timeColumn: Column,
    windowDuration: String,
    slideDuration: String,
    startTime: String
  ): Column
  
  /** Session window: closes after gapDuration of inactivity */
  def session_window(
    timeColumn: Column,
    gapDuration: String
  ): Column
}

Usage Examples:

import org.apache.spark.sql.functions._

// NOTE: these snippets assume a streaming Dataset/DataFrame named `events`
// with a "timestamp" column in scope.

// Tumbling window (10 minute windows)
val windowed = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes"),
    $"category"
  )
  .count()

// Sliding window (10 minute windows, sliding every 5 minutes)
val sliding = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"userId"
  )
  .agg(sum("amount").as("total"))

// Session window (gap-based): a session ends after 20 minutes of inactivity
val sessions = events
  .withWatermark("timestamp", "30 minutes")
  .groupBy(
    $"userId",
    session_window($"timestamp", "20 minutes")
  )
  .count()

Watermarking and Late Data

Handling late-arriving data with watermarks.

/**
 * Watermarking for handling late data.
 *
 * Shown here as an implicit enrichment; in upstream Spark, withWatermark is
 * a method on Dataset itself.
 */
implicit class DatasetWatermark[T](ds: Dataset[T]) {
  /**
   * Define watermark for late data handling.
   *
   * @param eventTime      name of the event-time column
   * @param delayThreshold how late data may arrive (e.g. "10 minutes") before
   *                       being dropped and its state cleaned up
   */
  def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
}

Usage Examples:

// NOTE: these snippets assume a streaming DataFrame named `inputStream`
// with an "eventTime" column in scope.

// Handle late data with watermarking
val result = inputStream
  .withWatermark("eventTime", "10 minutes")  // Allow 10 minutes of late data
  .groupBy(
    window($"eventTime", "5 minutes"),
    $"deviceId"
  )
  .count()

// Watermark affects state cleanup: session state older than the watermark
// (here, 1 hour behind max event time) can be dropped
val stateStream = inputStream
  .withWatermark("eventTime", "1 hour")
  .groupBy($"sessionId")
  .agg(
    min("eventTime").as("sessionStart"),
    max("eventTime").as("sessionEnd"),
    count("*").as("eventCount")
  )

Spark Streaming (Legacy DStreams)

Original streaming API using discretized streams (micro-batches).

/**
 * Main entry point for Spark Streaming (legacy DStream API).
 *
 * @param conf          Spark configuration for the underlying SparkContext
 * @param batchDuration interval at which input is split into micro-batches
 */
class StreamingContext(conf: SparkConf, batchDuration: Duration) {
  /** Create DStream from TCP socket (one element per line of text) */
  def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String]
  
  /** Monitor directory for new text files */
  def textFileStream(directory: String): DStream[String]
  
  /** Monitor directory for new files, read with the given Hadoop InputFormat */
  def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
    directory: String
  ): InputDStream[(K, V)]
  
  /** Create DStream from RDD queue (mainly for testing) */
  def queueStream[T: ClassTag](
    queue: Queue[RDD[T]],
    oneAtATime: Boolean = true
  ): InputDStream[T]
  
  /** Union multiple DStreams of the same type */
  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]
  
  /** Start streaming computation (the DStream graph must be defined first) */
  def start(): Unit
  /** Stop streaming; optionally keep the SparkContext or drain in-flight data */
  def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit
  /** Wait for termination */
  def awaitTermination(): Unit
  /** Wait up to timeout ms; returns whether the context terminated in time */
  def awaitTermination(timeout: Long): Boolean
  /** Set remember duration for RDDs (how long generated RDDs are retained) */
  def remember(duration: Duration): Unit
  /** Set checkpoint directory (required for stateful operations) */
  def checkpoint(directory: String): Unit
}

/**
 * Discretized Stream - a continuous sequence of RDDs, one per batch interval.
 *
 * Transformations are lazy; they define a new DStream and are applied to each
 * batch RDD when the StreamingContext is started.
 */
abstract class DStream[T: ClassTag] {
  /** Transform each element */
  def map[U: ClassTag](mapFunc: T => U): DStream[U]
  /** Transform and flatten */
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]
  /** Filter elements */
  def filter(filterFunc: T => Boolean): DStream[T]
  /** Reduce elements in each RDD (result is a single-element RDD per batch) */
  def reduce(reduceFunc: (T, T) => T): DStream[T]
  /** Count elements in each RDD */
  def count(): DStream[Long]
  /** Union with another DStream */
  def union(that: DStream[T]): DStream[T]
  
  /** Transform with an arbitrary RDD-to-RDD function (escape hatch to RDD API) */
  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
  /** Transform with access to the batch time */
  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
  
  /** Window over the last windowDuration of data (slides every batch) */
  def window(windowDuration: Duration): DStream[T]
  /** Window of windowDuration, advancing every slideDuration */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  /** Reduce over window */
  def reduceByWindow(
    reduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[T]
  /** Count over window */
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
  
  /** Apply function to each RDD (output action; triggers execution) */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
  /** Print first `num` elements from each RDD */
  def print(num: Int = 10): Unit
  /** Save each RDD as a text file named prefix-<time>[.suffix] */
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
  
  /** Cache stream RDDs */
  def cache(): DStream[T]
  /** Persist stream RDDs at the given storage level */
  def persist(level: StorageLevel = StorageLevel.MEMORY_ONLY_SER): DStream[T]
}

DStream Operations for Key-Value Pairs

Additional operations for DStreams of key-value pairs.

/**
 * Additional operations for DStreams of key-value pairs, made available
 * through an implicit conversion.
 */
implicit class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K, V)]) {
  /** Group values by key */
  def groupByKey(): DStream[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
  
  /** Reduce values by key */
  def reduceByKey(func: (V, V) => V): DStream[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
  
  /** Reduce values by key over a sliding window */
  def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[(K, V)]
  
  /** Inner join with another pair DStream, per batch */
  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
  
  /** Left outer join (None where the other stream has no matching key) */
  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
  
  /** Cogroup with another pair DStream */
  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
  
  /**
   * Update state by key: combine each batch's values with the previous state;
   * returning None removes the key's state. Requires checkpointing.
   */
  def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)]
  
  /** Map with state (newer stateful API driven by a StateSpec) */
  def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType]
}

Usage Examples:

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._

// Create StreamingContext with a 1-second batch interval
// (local[2]: one core for the receiver, one for processing)
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
// Checkpoint directory is required by the stateful updateStateByKey below
ssc.checkpoint("checkpoint")

// Socket stream
val lines = ssc.socketTextStream("localhost", 9999)

// Word count
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

// Windowed operations: counts over the last 30s, recomputed every 10s
val windowedWordCounts = pairs
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => (a + b),
    Seconds(30),
    Seconds(10)
  )

// Stateful operations: running count per word across batches
val runningCounts = pairs.updateStateByKey[Int] { (values, state) =>
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

// File stream
val textFiles = ssc.textFileStream("input-directory")
val processedFiles = textFiles
  .flatMap(_.split("\n"))
  .filter(_.nonEmpty)
  .map(line => (line.split(",")(0), 1))
  .reduceByKey(_ + _)

// Start the computation and block until it is stopped or fails
ssc.start()
ssc.awaitTermination()

Time and Duration

Time handling utilities for streaming applications.

/**
 * Duration for streaming intervals, backed by a millisecond count.
 *
 * Supports arithmetic and comparison; construct via the Milliseconds,
 * Seconds, or Minutes factory objects.
 */
case class Duration(private val millis: Long) {
  def +(other: Duration): Duration
  def -(other: Duration): Duration
  def *(times: Int): Duration
  /** Ratio of this duration to another (e.g. window / batch interval) */
  def /(that: Duration): Double
  def <(other: Duration): Boolean
  def <=(other: Duration): Boolean
  def >(other: Duration): Boolean
  def >=(other: Duration): Boolean
  /** This duration expressed in milliseconds */
  def milliseconds: Long
}

/** Duration factory methods (length interpreted as milliseconds) */
object Duration {
  def apply(length: Long): Duration
}

/** Duration of the given number of milliseconds */
object Milliseconds {
  def apply(milliseconds: Long): Duration
}

/** Duration of the given number of seconds */
object Seconds {
  def apply(seconds: Long): Duration
}

/** Duration of the given number of minutes */
object Minutes {
  def apply(minutes: Long): Duration
}

/**
 * Time instance for DStreams (an absolute point in time in milliseconds),
 * e.g. the batch time passed to transform/foreachRDD.
 */
case class Time(private val millis: Long) {
  /** Shift this time forward by a duration */
  def +(other: Duration): Time
  /** Shift this time backward by a duration */
  def -(other: Duration): Time
  /** Elapsed duration between two times */
  def -(other: Time): Duration
  def <(other: Time): Boolean
  def <=(other: Time): Boolean
  def >(other: Time): Boolean
  def >=(other: Time): Boolean
  /** This time expressed as epoch milliseconds */
  def milliseconds: Long
}

Checkpointing and Fault Tolerance

Mechanisms for fault tolerance in streaming applications.

/**
 * Checkpointing utilities (companion object of StreamingContext).
 *
 * getOrCreate recovers a context (and its DStream graph) from checkpoint
 * data if present; otherwise it builds a fresh one via creatingFunc.
 */
object StreamingContext {
  /** Create or recover StreamingContext from checkpoint */
  def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext
  ): StreamingContext
  
  /** Create or recover, using the given Hadoop configuration to read the checkpoint */
  def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration
  ): StreamingContext
}

Usage Examples:

// Checkpointing example: the factory must define the FULL streaming
// computation, since it is only invoked when no checkpoint exists
def createStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointApp")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint("hdfs://checkpoint")
  
  // Define streaming computation
  val lines = ssc.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.print()
  
  ssc
}

// Recovery from checkpoint: reuses checkpointed state if present,
// otherwise calls createStreamingContext to build a new context
val ssc = StreamingContext.getOrCreate("hdfs://checkpoint", createStreamingContext _)
ssc.start()
ssc.awaitTermination()

Error Handling

Common streaming exceptions:

  • StreamingQueryException - Streaming query execution failures
  • AnalysisException - Invalid streaming operations or configurations
  • IllegalStateException - Invalid streaming application state
  • TimeoutException - Query termination timeouts

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12

docs

core.md

deployment.md

graphx.md

index.md

ml.md

sql.md

streaming.md

tile.json