Spark Structured Streaming provides a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It allows you to express streaming computations the same way you would express a batch computation on static data, using the same DataFrame and Dataset APIs.
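As a minimal sketch of that batch/streaming parity (assuming an active `spark` session and a hypothetical directory of line-delimited text files at /data/events), the same word-count transformation runs unchanged; only the read and write entry points differ:

import org.apache.spark.sql.functions._

// Batch: read the static files once and show the counts
val batchCounts = spark.read.format("text").load("/data/events")
.select(explode(split(col("value"), "\\s+")).alias("word"))
.groupBy("word")
.count()
batchCounts.show()

// Streaming: identical transformation over files arriving in the same directory
val streamingCounts = spark.readStream.format("text").load("/data/events")
.select(explode(split(col("value"), "\\s+")).alias("word"))
.groupBy("word")
.count()

streamingCounts.writeStream
.outputMode("complete") // the aggregated result table is re-emitted in full each trigger
.format("console")
.start()

Streams are created with DataStreamReader (spark.readStream) and written with DataStreamWriter (dataset.writeStream), whose APIs are listed below.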
class DataStreamReader {
def format(source: String): DataStreamReader
def schema(schema: StructType): DataStreamReader
def schema(schemaString: String): DataStreamReader
def option(key: String, value: String): DataStreamReader
def option(key: String, value: Boolean): DataStreamReader
def option(key: String, value: Long): DataStreamReader
def option(key: String, value: Double): DataStreamReader
def options(options: scala.collection.Map[String, String]): DataStreamReader
def options(options: java.util.Map[String, String]): DataStreamReader
def load(): DataFrame
def load(path: String): DataFrame
}

class DataStreamWriter[T] {
def outputMode(outputMode: OutputMode): DataStreamWriter[T]
def outputMode(outputMode: String): DataStreamWriter[T]
def trigger(trigger: Trigger): DataStreamWriter[T]
def queryName(queryName: String): DataStreamWriter[T]
def format(source: String): DataStreamWriter[T]
def partitionBy(colNames: String*): DataStreamWriter[T]
def option(key: String, value: String): DataStreamWriter[T]
def option(key: String, value: Boolean): DataStreamWriter[T]
def option(key: String, value: Long): DataStreamWriter[T]
def option(key: String, value: Double): DataStreamWriter[T]
def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
def options(options: java.util.Map[String, String]): DataStreamWriter[T]
def start(): StreamingQuery
def start(path: String): StreamingQuery
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
}

Usage Examples:
// JSON file stream
val jsonStream = spark.readStream
.format("json")
.schema(schema) // Schema is required for file sources
.option("path", "/path/to/json/files")
.load()
// CSV file stream
val csvStream = spark.readStream
.format("csv")
.schema(csvSchema)
.option("header", "true")
.option("path", "/path/to/csv/files")
.load()
// Parquet file stream
val parquetStream = spark.readStream
.format("parquet")
.schema(parquetSchema)
.load("/path/to/parquet/files")
// Text file stream
val textStream = spark.readStream
.format("text")
.option("path", "/path/to/text/files")
.load()

// Basic Kafka stream
val kafkaStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1,topic2")
.load()
// Kafka with a topic pattern and explicit starting offsets
val kafkaPatternStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "latest") // "endingOffsets" applies only to batch Kafka reads, not streaming
.load()
// Kafka stream processing
val processedKafka = kafkaStream
.select(
col("key").cast("string"),
col("value").cast("string"),
col("topic"),
col("partition"),
col("offset"),
col("timestamp")
)
.filter(col("topic") === "important_topic")// Socket stream (testing only)
val socketStream = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()

// Rate source for load testing
val rateStream = spark.readStream
.format("rate")
.option("rowsPerSecond", "100")
.option("numPartitions", "10")
.load()

object OutputMode {
def Append(): OutputMode
def Complete(): OutputMode
def Update(): OutputMode
}

import org.apache.spark.sql.streaming.OutputMode
// Append mode - only new rows added to the result table are output
val appendQuery = df.writeStream
.outputMode(OutputMode.Append)
.format("console")
.start()
// Complete mode - entire result table is output
val completeQuery = df
.groupBy("category")
.count()
.writeStream
.outputMode(OutputMode.Complete)
.format("console")
.start()
// Update mode - only updated rows in result table
val updateQuery = df
.groupBy("id")
.agg(max("timestamp"))
.writeStream
.outputMode(OutputMode.Update)
.format("console")
.start()

object Trigger {
def ProcessingTime(interval: String): Trigger
def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
def Once(): Trigger
def Continuous(interval: String): Trigger
def Continuous(interval: Long, unit: TimeUnit): Trigger
}

import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit
// Micro-batch processing
val microBatchQuery = df.writeStream
.trigger(Trigger.ProcessingTime("30 seconds"))
.outputMode("append")
.format("console")
.start()
// One-time trigger (batch-like)
val onceQuery = df.writeStream
.trigger(Trigger.Once())
.outputMode("append")
.format("parquet")
.option("path", "/output/path")
.start()
// Continuous processing (experimental)
val continuousQuery = df.writeStream
.trigger(Trigger.Continuous("1 second"))
.outputMode("append")
.format("console")
.start()

class StreamingQuery {
def id: UUID
def runId: UUID
def name: String
def sparkSession: SparkSession
def isActive: Boolean
def exception: Option[StreamingQueryException]
def status: StreamingQueryStatus
def recentProgress: Array[StreamingQueryProgress]
def lastProgress: StreamingQueryProgress
def awaitTermination(): Unit
def awaitTermination(timeoutMs: Long): Boolean
def processAllAvailable(): Unit
def stop(): Unit
def explain(): Unit
def explain(extended: Boolean): Unit
}

Usage Examples:
// Start a streaming query
val query = df
.groupBy("category")
.count()
.writeStream
.queryName("category_counts")
.outputMode("complete")
.format("memory")
.start()
// Monitor query status
println(s"Query ID: ${query.id}")
println(s"Is Active: ${query.isActive}")
println(s"Recent Progress: ${query.recentProgress.length} batches")
// Wait for termination
query.awaitTermination()
// Or wait with timeout
val finished = query.awaitTermination(60000) // 60 seconds
if (!finished) {
println("Query still running after timeout")
query.stop()
}
// Process all available data (testing)
query.processAllAvailable()
// Stop the query
query.stop()
// Exception handling
query.exception match {
case Some(e) => println(s"Query failed: ${e.getMessage}")
case None => println("Query completed successfully")
}

// Manage multiple streaming queries
import scala.collection.mutable
val queries = mutable.ArrayBuffer[StreamingQuery]()
// Start multiple queries
queries += df1.writeStream.queryName("query1").format("console").start()
queries += df2.writeStream.queryName("query2").format("console").start()
queries += df3.writeStream.queryName("query3").format("console").start()
// Wait for all queries
try {
queries.foreach(_.awaitTermination())
} catch {
case e: Exception =>
println(s"A query failed: ${e.getMessage}")
queries.foreach(_.stop())
}
// Access all active queries
val allQueries = spark.streams.active
allQueries.foreach { query =>
println(s"Query: ${query.name}, Active: ${query.isActive}")
}

// Basic console output
val consoleQuery = df.writeStream
.format("console")
.outputMode("append")
.start()
// Console with options
val detailedConsole = df.writeStream
.format("console")
.option("numRows", "50")
.option("truncate", "false")
.outputMode("append")
.start()

// Parquet sink
val parquetSink = df.writeStream
.format("parquet")
.option("path", "/output/parquet")
.option("checkpointLocation", "/checkpoint/parquet")
.outputMode("append")
.start()
// JSON sink with partitioning
val jsonSink = df.writeStream
.format("json")
.option("path", "/output/json")
.option("checkpointLocation", "/checkpoint/json")
.partitionBy("year", "month")
.outputMode("append")
.start()
// Delta sink (requires Delta Lake)
val deltaSink = df.writeStream
.format("delta")
.option("path", "/delta/table")
.option("checkpointLocation", "/checkpoint/delta")
.outputMode("append")
.start()

// Kafka sink
val kafkaSink = df
.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output_topic")
.option("checkpointLocation", "/checkpoint/kafka")
.outputMode("append")
.start()
// Kafka sink with headers
val kafkaWithHeaders = df
.withColumn("headers",
array(struct(lit("source").alias("key"), lit("spark").alias("value"))))
.selectExpr(
"CAST(id AS STRING) AS key",
"to_json(struct(*)) AS value",
"headers"
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output_topic")
.outputMode("append")
.start()// Memory sink for testing
val memoryQuery = df
.groupBy("category")
.count()
.writeStream
.queryName("memory_table")
.format("memory")
.outputMode("complete")
.start()
// Query the memory table
spark.sql("SELECT * FROM memory_table").show()abstract class ForeachWriter[T] extends Serializable {
def open(partitionId: Long, epochId: Long): Boolean
def process(value: T): Unit
def close(errorOrNull: Throwable): Unit
}

import org.apache.spark.sql.{ForeachWriter, Row}
// Custom writer to database
class DatabaseWriter extends ForeachWriter[Row] {
var connection: java.sql.Connection = _
var statement: java.sql.PreparedStatement = _
def open(partitionId: Long, epochId: Long): Boolean = {
// Initialize connection
connection = java.sql.DriverManager.getConnection(
"jdbc:postgresql://localhost/mydb", "user", "password")
statement = connection.prepareStatement(
"INSERT INTO results (id, value, timestamp) VALUES (?, ?, ?)")
true
}
def process(value: Row): Unit = {
// Process each row
statement.setLong(1, value.getLong(0))
statement.setString(2, value.getString(1))
statement.setTimestamp(3, value.getTimestamp(2))
statement.executeUpdate()
}
def close(errorOrNull: Throwable): Unit = {
// Clean up
if (statement != null) statement.close()
if (connection != null) connection.close()
}
}
// Use custom writer
val customWriterQuery = df.writeStream
.foreach(new DatabaseWriter())
.outputMode("append")
.start()

// ForeachBatch for custom processing
val foreachBatchQuery = df.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
println(s"Processing batch $batchId")
// Custom logic per batch
if (batchDF.count() > 0) {
// Write to multiple sinks
batchDF.write
.mode("append")
.parquet(s"/output/batch_$batchId")
// Also update a summary table
val summary = batchDF
.groupBy("category")
.agg(sum("amount").alias("total"))
.withColumn("batch_id", lit(batchId))
summary.write
.mode("append")
.saveAsTable("batch_summaries")
}
}
.outputMode("append")
.start()

import org.apache.spark.sql.functions._
// Tumbling window
val tumblingWindow = df
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "15 minutes"),
col("category")
)
.agg(
sum("amount").alias("total_amount"),
count("*").alias("count")
)
// Sliding window
val slidingWindow = df
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "15 minutes", "5 minutes"),
col("category")
)
.agg(
avg("value").alias("avg_value"),
max("value").alias("max_value")
)
// Session window (Spark 3.2+)
val sessionWindow = df
.withWatermark("timestamp", "10 minutes")
.groupBy(
session_window(col("timestamp"), "30 minutes"),
col("user_id")
)
.agg(
count("*").alias("events_in_session"),
sum("duration").alias("total_duration")
)

// Handle late data with watermarks
val lateDataQuery = df
.withWatermark("event_time", "10 minutes") // Allow 10 minutes of late data
.groupBy(
window(col("event_time"), "5 minutes"),
col("device_id")
)
.agg(
avg("temperature").alias("avg_temp"),
count("*").alias("reading_count")
)
.writeStream
.outputMode("append") // Only complete windows are output
.format("console")
.start()

// Inner join between two streams
val stream1 = spark.readStream.format("kafka")...
val stream2 = spark.readStream.format("kafka")...
val joined = stream1.alias("s1")
.join(
stream2.alias("s2"),
expr("s1.id = s2.id AND s1.timestamp BETWEEN s2.timestamp - INTERVAL 5 MINUTES AND s2.timestamp + INTERVAL 5 MINUTES")
)
// Stream-static join
val staticDF = spark.read.parquet("/path/to/static/data")
val streamStaticJoin = streamDF
.join(staticDF, "key") // No time constraints needed
// Left outer join with watermarks
val outerJoined = stream1
.withWatermark("timestamp", "10 minutes")
.alias("left")
.join(
stream2.withWatermark("timestamp", "20 minutes").alias("right"),
expr("left.id = right.id AND left.timestamp BETWEEN right.timestamp - INTERVAL 5 MINUTES AND right.timestamp + INTERVAL 5 MINUTES"),
"leftOuter"
)

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._ // encoders for .as[InputEvent] and groupByKey
case class InputEvent(userId: String, action: String, timestamp: Long)
case class UserSession(userId: String, startTime: Long, endTime: Long, actionCount: Int)
def updateUserSession(userId: String,
events: Iterator[InputEvent],
state: GroupState[UserSession]): UserSession = {
if (state.hasTimedOut) {
// The group timed out with no new events: emit the final session and clear the state
val finalSession = state.get
state.remove()
finalSession
} else {
val currentSession = if (state.exists) state.get else UserSession(userId, Long.MaxValue, 0L, 0)
val newEvents = events.toSeq
val newActionCount = currentSession.actionCount + newEvents.size
val newStartTime = math.min(currentSession.startTime, newEvents.map(_.timestamp).min)
val newEndTime = math.max(currentSession.endTime, newEvents.map(_.timestamp).max)
val updatedSession = UserSession(userId, newStartTime, newEndTime, newActionCount)
state.update(updatedSession)
// Expire the session after 30 minutes of inactivity
state.setTimeoutDuration("30 minutes")
updatedSession
}
}
val sessionUpdates = inputStream
.as[InputEvent]
.groupByKey(_.userId)
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateUserSession)

// Monitor query progress
val query = df.writeStream...
// Get progress information
val progress = query.lastProgress
println(s"Batch ID: ${progress.batchId}")
println(s"Input rows: ${progress.inputRowsPerSecond}")
println(s"Processing rate: ${progress.inputRowsPerSecond}")
println(s"Batch duration: ${progress.batchDuration}")
// Historical progress
query.recentProgress.foreach { progress =>
println(s"Batch ${progress.batchId}: ${progress.inputRowsPerSecond} rows/sec")
}

// Check the current query status
val status = query.status
println(s"Message: ${status.message}")
println(s"Is trigger active: ${status.isTriggerActive}")
println(s"Is data available: ${status.isDataAvailable}")// Query with error handling
val resilientQuery = df.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
try {
batchDF.write.mode("append").saveAsTable("output_table")
} catch {
case e: Exception =>
println(s"Failed to process batch $batchId: ${e.getMessage}")
// Log to error table or external system
val errorDF = spark.createDataFrame(Seq(
(batchId, e.getMessage, System.currentTimeMillis())
)).toDF("batch_id", "error_message", "timestamp")
errorDF.write.mode("append").saveAsTable("error_log")
}
}
.start()