Real-time data processing with Structured Streaming. Provides a unified batch and streaming API with end-to-end exactly-once processing guarantees (via checkpointing), fault tolerance, and streaming analytics such as windowed aggregations, watermarking, and stream joins.
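A minimal end-to-end sketch tying these pieces together (assumes `spark` is an active SparkSession; the schema, input directory, and checkpoint path are illustrative placeholders):
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}
// Read JSON files as they arrive, aggregate per category, and print results to the console
val inputSchema = new StructType()          // hypothetical schema for the incoming records
  .add("category", StringType)
  .add("amount", DoubleType)
val events = spark.readStream
  .schema(inputSchema)                      // file sources require an explicit schema
  .json("data/incoming")                    // placeholder input directory
val query = events.groupBy("category").count()
  .writeStream
  .outputMode("complete")                   // re-emit the full aggregate on every trigger
  .format("console")
  .option("checkpointLocation", "checkpoints/demo")  // enables recovery after failures
  .start()
query.awaitTermination()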
Interface for reading streaming data from various sources.
/**
* Interface for reading streaming data from various sources
*/
class DataStreamReader {
/** Specify streaming data source format */
def format(source: String): DataStreamReader
/** Set schema for the streaming data */
def schema(schema: StructType): DataStreamReader
def schema(schemaString: String): DataStreamReader
/** Set options for the streaming source */
def option(key: String, value: String): DataStreamReader
def option(key: String, value: Boolean): DataStreamReader
def option(key: String, value: Long): DataStreamReader
def option(key: String, value: Double): DataStreamReader
def options(options: scala.collection.Map[String, String]): DataStreamReader
def options(options: java.util.Map[String, String]): DataStreamReader
/** Load streaming data using generic interface */
def load(): DataFrame
def load(path: String): DataFrame
/** Built-in streaming sources */
def text(path: String): DataFrame
def textFile(path: String): Dataset[String]
def csv(path: String): DataFrame
def json(path: String): DataFrame
def parquet(path: String): DataFrame
def orc(path: String): DataFrame
/** Kafka source: select with format("kafka") plus options such as kafka.bootstrap.servers and subscribe */
/** Socket source (for testing): format("socket") with host and port options */
/** Rate source (for testing): format("rate") with rowsPerSecond and numPartitions options */
}
Usage Examples:
// File-based streaming
val streamingDf = spark.readStream
.schema(schema)
.option("maxFilesPerTrigger", "10")
.json("path/to/streaming/json/files")
// Kafka streaming
val kafkaStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "latest")
.load()
// Socket streaming (for testing)
val socketStream = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Rate source (for testing)
val rateStream = spark.readStream
.format("rate")
.option("rowsPerSecond", 100)
.option("numPartitions", 4)
.load()
Interface for writing streaming Dataset to various sinks.
/**
* Interface for writing streaming Dataset to various sinks
* @tparam T Type of the streaming Dataset
*/
class DataStreamWriter[T] {
/** Specify output format */
def format(source: String): DataStreamWriter[T]
/** Set output mode */
def outputMode(outputMode: OutputMode): DataStreamWriter[T]
def outputMode(outputMode: String): DataStreamWriter[T]
/** Set trigger for micro-batch processing */
def trigger(trigger: Trigger): DataStreamWriter[T]
/** Set options for the streaming sink */
def option(key: String, value: String): DataStreamWriter[T]
def option(key: String, value: Boolean): DataStreamWriter[T]
def option(key: String, value: Long): DataStreamWriter[T]
def option(key: String, value: Double): DataStreamWriter[T]
def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
def options(options: java.util.Map[String, String]): DataStreamWriter[T]
/** Partition output by columns */
def partitionBy(colNames: String*): DataStreamWriter[T]
/** Query name for identification */
def queryName(queryName: String): DataStreamWriter[T]
/** Start streaming query */
def start(): StreamingQuery
def start(path: String): StreamingQuery
/** Built-in sinks are selected via format(...) rather than dedicated methods: */
/** Console sink: format("console"), with numRows and truncate options */
/** Kafka sink: format("kafka"), with kafka.bootstrap.servers and topic options */
/** File sinks: format("json" / "parquet" / "orc" / "text" / "csv"), started with start(path) */
/** Custom row-by-row processing */
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
/** Memory sink (for testing): format("memory") together with queryName(...) */
}
Usage Examples:
// Console output
val query1 = streamingDf.writeStream
.outputMode("append")
.format("console")
.option("numRows", 20)
.option("truncate", false)
.start()
// File output with partitioning
val query2 = streamingDf.writeStream
.format("parquet")
.outputMode("append")
.option("path", "output/streaming")
.option("checkpointLocation", "checkpoints/streaming")
.partitionBy("year", "month")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
// Kafka output
val query3 = processedStream
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.option("checkpointLocation", "checkpoints/kafka-out")
.outputMode("append")
.start()
// Custom processing with foreachBatch
val query4 = streamingDf.writeStream
.foreachBatch { (batchDf: DataFrame, batchId: Long) =>
println(s"Processing batch $batchId with ${batchDf.count()} records")
batchDf.write
.mode(SaveMode.Append)
.saveAsTable("streaming_results")
}
.trigger(Trigger.ProcessingTime("1 minute"))
.start()
Different output modes for streaming queries.
/**
* Output modes for streaming queries
*/
object OutputMode {
/** Only append new rows to the result table (string form: "append") */
def Append(): OutputMode
/** Recompute and output the complete result table on every trigger (string form: "complete") */
def Complete(): OutputMode
/** Output only the rows updated since the last trigger (string form: "update") */
def Update(): OutputMode
}
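Usage Example (a brief sketch; streamingDf is assumed to be a streaming DataFrame with a category column, as in the earlier examples):
import org.apache.spark.sql.streaming.OutputMode
// Complete mode re-emits the whole aggregated result table on every trigger
val completeQuery = streamingDf
  .groupBy("category")
  .count()
  .writeStream
  .outputMode(OutputMode.Complete())   // equivalent to .outputMode("complete")
  .format("console")
  .start()
Control timing and execution of streaming micro-batches.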
/**
* Triggers for controlling streaming execution
*/
sealed trait Trigger
object Trigger {
/** Run micro-batches at a fixed interval (the default trigger is ProcessingTime with interval 0, i.e. as fast as possible) */
def ProcessingTime(interval: String): Trigger
def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
/** Process all available data in a single micro-batch, then stop */
def Once(): Trigger
/** Process all available data, possibly across multiple micro-batches, then stop */
def AvailableNow(): Trigger
/** Continuous processing (experimental) */
def Continuous(interval: String): Trigger
def Continuous(interval: Long, unit: TimeUnit): Trigger
}
Usage Examples:
import java.util.concurrent.TimeUnit
// Process every 30 seconds
val trigger1 = Trigger.ProcessingTime("30 seconds")
val trigger2 = Trigger.ProcessingTime(30, TimeUnit.SECONDS)
// Process once and terminate
val trigger3 = Trigger.Once()
// Process all available data and terminate
val trigger4 = Trigger.AvailableNow()
// Continuous processing (low-latency)
val trigger5 = Trigger.Continuous("1 second")
Handle to a running streaming query with control and monitoring capabilities.
/**
* Handle to a running streaming query
*/
trait StreamingQuery {
/** Unique identifier for the query */
def id: UUID
/** Name of the query */
def name: String
/** Check if query is currently active */
def isActive: Boolean
/** Block until query terminates */
def awaitTermination(): Unit
def awaitTermination(timeoutMs: Long): Boolean
/** Stop the query */
def stop(): Unit
/** Get the most recent progress update */
def lastProgress: StreamingQueryProgress
/** Get recent progress updates */
def recentProgress: Array[StreamingQueryProgress]
/** Get current status */
def status: StreamingQueryStatus
/** Get exception that caused query to stop (if any) */
def exception: Option[StreamingQueryException]
/** Explain the query plan */
def explain(): Unit
def explain(extended: Boolean): Unit
}
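Usage Example (a minimal sketch; query is assumed to be the handle returned by start() in the earlier examples):
// Inspect and control a running query
println(s"Query ${query.name} (${query.id}) active: ${query.isActive}")
query.explain()                      // print the streaming query plan
if (!query.awaitTermination(30000))  // wait up to 30 seconds; false means it is still running
  query.stop()
Manager for all StreamingQueries in a SparkSession.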
/**
* Manager for all StreamingQueries in a SparkSession
*/
class StreamingQueryManager {
/** Get currently active streaming queries */
def active: Array[StreamingQuery]
/** Get a query by id */
def get(id: UUID): StreamingQuery
def get(id: String): StreamingQuery
/** Block until all queries terminate */
def awaitAnyTermination(): Unit
def awaitAnyTermination(timeoutMs: Long): Boolean
/** Forget past terminated queries so awaitAnyTermination() can wait for new terminations */
def resetTerminated(): Unit
/** Add listener for streaming query events */
def addListener(listener: StreamingQueryListener): Unit
def removeListener(listener: StreamingQueryListener): Unit
}
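Usage Example (a brief sketch using spark.streams, the StreamingQueryManager attached to the SparkSession):
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
// List active queries and their current status
spark.streams.active.foreach(q => println(s"${q.name}: ${q.status.message}"))
// React to query lifecycle events
spark.streams.addListener(new StreamingQueryListener {
  def onQueryStarted(e: QueryStartedEvent): Unit = println(s"Started ${e.id}")
  def onQueryProgress(e: QueryProgressEvent): Unit = println(e.progress.prettyJson)
  def onQueryTerminated(e: QueryTerminatedEvent): Unit = println(s"Terminated ${e.id}")
})
// Block until any active query terminates
spark.streams.awaitAnyTermination()
Custom sink for row-by-row processing.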
/**
* Abstract class for custom streaming sinks
* @tparam T Type of rows to process
*/
abstract class ForeachWriter[T] extends Serializable {
/** Called when starting to process a partition */
def open(partitionId: Long, epochId: Long): Boolean
/** Called for each row */
def process(value: T): Unit
/** Called when finishing processing a partition */
def close(errorOrNull: Throwable): Unit
}
Usage Example:
import org.apache.spark.sql.{ForeachWriter, Row}
val customWriter = new ForeachWriter[Row] {
def open(partitionId: Long, epochId: Long): Boolean = {
// Initialize resources (e.g., database connection)
println(s"Opening partition $partitionId for epoch $epochId")
true
}
def process(value: Row): Unit = {
// Process each row
val id = value.getAs[Long]("id")
val name = value.getAs[String]("name")
println(s"Processing record: $id, $name")
// Write to external system
}
def close(errorOrNull: Throwable): Unit = {
// Clean up resources
if (errorOrNull != null) {
println(s"Error occurred: ${errorOrNull.getMessage}")
}
println("Closing partition")
}
}
val query = streamingDf.writeStream
.foreach(customWriter)
.start()
Advanced aggregation operations in streaming context.
Time-based aggregations:
import org.apache.spark.sql.functions._
// Window-based aggregations
val windowedCounts = streamingDf
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "5 minutes", "1 minute"),
col("category")
)
.count()
// Tumbling window
val tumblingWindow = streamingDf
.withWatermark("timestamp", "10 minutes")
.groupBy(window(col("timestamp"), "10 minutes"))
.agg(sum("amount").alias("total_amount"))
// Session window (event-time sessions)
val sessionWindow = streamingDf
.withWatermark("timestamp", "10 minutes")
.groupBy(
col("userId"),
session_window(col("timestamp"), "5 minutes")
)
.agg(count("*").alias("events_in_session"))
Stateful operations:
// Global aggregations without a watermark (require complete or update output mode)
val globalCounts = streamingDf
.groupBy("category")
.count()
// With watermarking for late data handling
val withWatermark = streamingDf
.withWatermark("eventTime", "1 hour")
.groupBy("userId", window(col("eventTime"), "30 minutes"))
.agg(sum("amount").alias("total"))
Exactly-once processing:
val query = streamingDf
.writeStream
.outputMode("append")
.option("checkpointLocation", "s3://bucket/checkpoints/query1")
.format("delta") // or other ACID-compliant sink
.start("s3://bucket/output/table")Error handling and monitoring:
val query = streamingDf.writeStream
.foreachBatch { (batchDf: DataFrame, batchId: Long) =>
try {
batchDf.write
.mode(SaveMode.Append)
.saveAsTable("results")
} catch {
case e: Exception =>
println(s"Error processing batch $batchId: ${e.getMessage}")
// Log to monitoring system
throw e // Re-throw to trigger query restart
}
}
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
// Monitor query progress
val progressInfo = query.lastProgress   // null until the first micro-batch completes
println(s"Input rows/sec: ${progressInfo.inputRowsPerSecond}")
println(s"Batch durations by phase (ms): ${progressInfo.durationMs}")
Stream-stream joins:
val stream1 = spark.readStream.format("kafka")...
val stream2 = spark.readStream.format("kafka")...
// Watermarks on both streams are recommended so join state can be cleaned up
val joinedStream = stream1.alias("s1")
.join(stream2.alias("s2"),
expr("s1.key = s2.key AND s1.timestamp >= s2.timestamp - interval 1 hour AND s1.timestamp <= s2.timestamp + interval 1 hour"),
"inner"
)
Stream-static joins:
val staticDf = spark.read.table("reference_data")
val streamingDf = spark.readStream...
val enrichedStream = streamingDf
.join(staticDf, "key") // Stream-static join; the static side may be broadcast if small enough