Streaming Queries

Spark Structured Streaming provides a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It allows you to express streaming computations the same way you would express a batch computation on static data, using the same DataFrame and Dataset APIs.
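
For example, the classic streaming word count uses exactly the same operators as its batch counterpart. A minimal sketch, assuming a socket server on localhost:9999 and the console sink:

// Read lines from a socket, split them into words, and maintain a running count
import org.apache.spark.sql.functions._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val wordCounts = lines
  .select(explode(split(col("value"), " ")).alias("word"))
  .groupBy("word")
  .count()

val wordCountQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()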

Core Streaming Concepts

DataStreamReader

class DataStreamReader {
  def format(source: String): DataStreamReader
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader
  def options(options: scala.collection.Map[String, String]): DataStreamReader
  def options(options: java.util.Map[String, String]): DataStreamReader
  def load(): DataFrame
  def load(path: String): DataFrame
}
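
The builder methods all return the reader, so a source can be configured incrementally. A minimal sketch, assuming a CSV directory and illustrative option values (the schema string uses DDL syntax):

// Configure a CSV file source with a DDL schema string and an options map
val reader = spark.readStream
  .format("csv")
  .schema("id LONG, name STRING, ts TIMESTAMP")
  .options(Map("header" -> "true", "maxFilesPerTrigger" -> "10"))

val csvDF = reader.load("/path/to/csv/files")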

DataStreamWriter

class DataStreamWriter[T] {
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def outputMode(outputMode: String): DataStreamWriter[T]
  def trigger(trigger: Trigger): DataStreamWriter[T]
  def queryName(queryName: String): DataStreamWriter[T]
  def format(source: String): DataStreamWriter[T]
  def partitionBy(colNames: String*): DataStreamWriter[T]
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]
  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
  def options(options: java.util.Map[String, String]): DataStreamWriter[T]
  def start(): StreamingQuery
  def start(path: String): StreamingQuery
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
}
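
The writer builder chains the same way. A minimal sketch combining the common settings (the paths, query name, and partition columns here are placeholders):

import org.apache.spark.sql.streaming.Trigger

// Partitioned Parquet output with an explicit checkpoint and a one-minute trigger
val partitionedQuery = df.writeStream
  .queryName("events_by_month")
  .format("parquet")
  .partitionBy("year", "month")
  .option("path", "/output/events")
  .option("checkpointLocation", "/checkpoint/events")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode("append")
  .start()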

Stream Sources

File Sources

Usage Examples:

// JSON file stream
val jsonStream = spark.readStream
  .format("json")
  .schema(schema)  // Schema is required for file sources
  .option("path", "/path/to/json/files")
  .load()

// CSV file stream
val csvStream = spark.readStream
  .format("csv")
  .schema(csvSchema)
  .option("header", "true")
  .option("path", "/path/to/csv/files")
  .load()

// Parquet file stream
val parquetStream = spark.readStream
  .format("parquet")
  .schema(parquetSchema)
  .load("/path/to/parquet/files")

// Text file stream
val textStream = spark.readStream
  .format("text")
  .option("path", "/path/to/text/files")
  .load()

Kafka Source

// Basic Kafka stream
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")
  .load()

// Kafka with topic pattern subscription
val kafkaPattern = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "latest")
  .load()

// Kafka stream processing
val processedKafka = kafkaStream
  .select(
    col("key").cast("string"),
    col("value").cast("string"),
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp")
  )
  .filter(col("topic") === "important_topic")

Socket Source (for testing)

// Socket stream (testing only)
val socketStream = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

Rate Source (for testing)

// Rate source for load testing
val rateStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100")
  .option("numPartitions", "10")
  .load()

Output Modes

object OutputMode {
  def Append(): OutputMode
  def Complete(): OutputMode
  def Update(): OutputMode
}

Output Mode Usage

import org.apache.spark.sql.streaming.OutputMode

// Append mode - only new rows added to the result table are output
val appendQuery = df.writeStream
  .outputMode(OutputMode.Append())
  .format("console")
  .start()

// Complete mode - the entire result table is output
val completeQuery = df
  .groupBy("category")
  .count()
  .writeStream
  .outputMode(OutputMode.Complete())
  .format("console")
  .start()

// Update mode - only rows updated since the last trigger are output
val updateQuery = df
  .groupBy("id")
  .agg(max("timestamp"))
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()

Triggers

object Trigger {
  def ProcessingTime(interval: String): Trigger
  def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
  def Once(): Trigger
  def Continuous(interval: String): Trigger
  def Continuous(interval: Long, unit: TimeUnit): Trigger
}

Trigger Examples

import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit

// Micro-batch processing
val microBatchQuery = df.writeStream
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .outputMode("append")
  .format("console")
  .start()

// One-time trigger (batch-like)
val onceQuery = df.writeStream
  .trigger(Trigger.Once())
  .outputMode("append")
  .format("parquet")
  .option("path", "/output/path")
  .start()

// Continuous processing (experimental)
val continuousQuery = df.writeStream
  .trigger(Trigger.Continuous("1 second"))
  .outputMode("append")
  .format("console")
  .start()

StreamingQuery Management

trait StreamingQuery {
  def id: UUID
  def runId: UUID
  def name: String
  def sparkSession: SparkSession
  def isActive: Boolean
  def exception: Option[StreamingQueryException]
  def status: StreamingQueryStatus
  def recentProgress: Array[StreamingQueryProgress]
  def lastProgress: StreamingQueryProgress
  
  def awaitTermination(): Unit
  def awaitTermination(timeoutMs: Long): Boolean
  def processAllAvailable(): Unit
  def stop(): Unit
  def explain(): Unit
  def explain(extended: Boolean): Unit
}

Query Lifecycle Management

Usage Examples:

// Start a streaming query
val query = df
  .groupBy("category")
  .count()
  .writeStream
  .queryName("category_counts")
  .outputMode("complete")
  .format("memory")
  .start()

// Monitor query status
println(s"Query ID: ${query.id}")
println(s"Is Active: ${query.isActive}")
println(s"Recent Progress: ${query.recentProgress.length} batches")

// Wait for termination
query.awaitTermination()

// Or wait with timeout
val finished = query.awaitTermination(60000) // 60 seconds
if (!finished) {
  println("Query still running after timeout")
  query.stop()
}

// Process all available data (testing)
query.processAllAvailable()

// Stop the query
query.stop()

// Exception handling
query.exception match {
  case Some(e) => println(s"Query failed: ${e.getMessage}")
  case None => println("Query completed successfully")
}

Multiple Query Management

// Manage multiple streaming queries
import scala.collection.mutable
import org.apache.spark.sql.streaming.StreamingQuery

val queries = mutable.ArrayBuffer[StreamingQuery]()

// Start multiple queries
queries += df1.writeStream.queryName("query1").format("console").start()
queries += df2.writeStream.queryName("query2").format("console").start()
queries += df3.writeStream.queryName("query3").format("console").start()

// Wait for all queries
try {
  queries.foreach(_.awaitTermination())
} catch {
  case e: Exception =>
    println(s"A query failed: ${e.getMessage}")
    queries.foreach(_.stop())
}

// Access all active queries
val allQueries = spark.streams.active
allQueries.foreach { query =>
  println(s"Query: ${query.name}, Active: ${query.isActive}")
}
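
The StreamingQueryManager can also block until any one of the active queries terminates, which is a common driver-side pattern when several sinks run in the same application. A minimal sketch:

// Block until any query terminates (rethrows that query's exception, if any),
// then stop the remaining queries and clear the terminated-query bookkeeping
try {
  spark.streams.awaitAnyTermination()
} finally {
  spark.streams.active.foreach(_.stop())
  spark.streams.resetTerminated()
}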

Stream Sinks

Console Sink

// Basic console output
val consoleQuery = df.writeStream
  .format("console")
  .outputMode("append")
  .start()

// Console with options
val detailedConsole = df.writeStream
  .format("console")
  .option("numRows", "50")
  .option("truncate", "false")
  .outputMode("append")
  .start()

File Sinks

// Parquet sink
val parquetSink = df.writeStream
  .format("parquet")
  .option("path", "/output/parquet")
  .option("checkpointLocation", "/checkpoint/parquet")
  .outputMode("append")
  .start()

// JSON sink with partitioning
val jsonSink = df.writeStream
  .format("json")
  .option("path", "/output/json")
  .option("checkpointLocation", "/checkpoint/json")
  .partitionBy("year", "month")
  .outputMode("append")
  .start()

// Delta sink (requires Delta Lake)
val deltaSink = df.writeStream
  .format("delta")
  .option("path", "/delta/table")
  .option("checkpointLocation", "/checkpoint/delta")
  .outputMode("append")
  .start()

Kafka Sink

// Kafka sink
val kafkaSink = df
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output_topic")
  .option("checkpointLocation", "/checkpoint/kafka")
  .outputMode("append")
  .start()

// Kafka sink with headers (header values must be binary)
val kafkaWithHeaders = df
  .withColumn("headers",
    array(struct(lit("source").alias("key"), lit("spark").cast("binary").alias("value"))))
  .selectExpr(
    "CAST(id AS STRING) AS key",
    "to_json(struct(*)) AS value",
    "headers"
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output_topic")
  .option("checkpointLocation", "/checkpoint/kafka_headers")
  .outputMode("append")
  .start()

Memory Sink (for testing)

// Memory sink for testing
val memoryQuery = df
  .groupBy("category")
  .count()
  .writeStream
  .queryName("memory_table")
  .format("memory")
  .outputMode("complete")
  .start()

// Query the memory table
spark.sql("SELECT * FROM memory_table").show()

Custom Sinks with ForeachWriter

abstract class ForeachWriter[T] extends Serializable {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

Custom ForeachWriter Example

import org.apache.spark.sql.{ForeachWriter, Row}

// Custom writer to database
class DatabaseWriter extends ForeachWriter[Row] {
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  
  def open(partitionId: Long, epochId: Long): Boolean = {
    // Initialize connection
    connection = java.sql.DriverManager.getConnection(
      "jdbc:postgresql://localhost/mydb", "user", "password")
    statement = connection.prepareStatement(
      "INSERT INTO results (id, value, timestamp) VALUES (?, ?, ?)")
    true
  }
  
  def process(value: Row): Unit = {
    // Process each row
    statement.setLong(1, value.getLong(0))
    statement.setString(2, value.getString(1))
    statement.setTimestamp(3, value.getTimestamp(2))
    statement.executeUpdate()
  }
  
  def close(errorOrNull: Throwable): Unit = {
    // Clean up
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

// Use custom writer
val customWriterQuery = df.writeStream
  .foreach(new DatabaseWriter())
  .outputMode("append")
  .start()

ForeachBatch for Custom Logic

// ForeachBatch for custom processing
val foreachBatchQuery = df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    println(s"Processing batch $batchId")
    
    // Custom logic per batch
    if (batchDF.count() > 0) {
      // Write to multiple sinks
      batchDF.write
        .mode("append")
        .parquet(s"/output/batch_$batchId")
      
      // Also update a summary table
      val summary = batchDF
        .groupBy("category")
        .agg(sum("amount").alias("total"))
        .withColumn("batch_id", lit(batchId))
      
      summary.write
        .mode("append")
        .saveAsTable("batch_summaries")
    }
  }
  .outputMode("append")
  .start()

Windowed Aggregations

Time Windows

import org.apache.spark.sql.functions._

// Tumbling window
val tumblingWindow = df
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window(col("timestamp"), "15 minutes"),
    col("category")
  )
  .agg(
    sum("amount").alias("total_amount"),
    count("*").alias("count")
  )

// Sliding window
val slidingWindow = df
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window(col("timestamp"), "15 minutes", "5 minutes"),
    col("category")
  )
  .agg(
    avg("value").alias("avg_value"),
    max("value").alias("max_value")
  )

// Session window (Spark 3.2+)
val sessionWindow = df
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    session_window(col("timestamp"), "30 minutes"),
    col("user_id")
  )
  .agg(
    count("*").alias("events_in_session"),
    sum("duration").alias("total_duration")
  )

Watermarks for Late Data

// Handle late data with watermarks
val lateDataQuery = df
  .withWatermark("event_time", "10 minutes")  // Allow 10 minutes of late data
  .groupBy(
    window(col("event_time"), "5 minutes"),
    col("device_id")
  )
  .agg(
    avg("temperature").alias("avg_temp"),
    count("*").alias("reading_count")
  )
  .writeStream
  .outputMode("append")  // Only complete windows are output
  .format("console")
  .start()

Stream-Stream Joins

// Inner join between two streams
val stream1 = spark.readStream.format("kafka")...
val stream2 = spark.readStream.format("kafka")...

val joined = stream1.alias("s1")
  .join(
    stream2.alias("s2"),
    expr("s1.id = s2.id AND s1.timestamp BETWEEN s2.timestamp - INTERVAL 5 MINUTES AND s2.timestamp + INTERVAL 5 MINUTES")
  )

// Stream-static join
val staticDF = spark.read.parquet("/path/to/static/data")
val streamStaticJoin = streamDF
  .join(staticDF, "key")  // No time constraints needed

// Left outer join with watermarks
val outerJoined = stream1
  .withWatermark("timestamp", "10 minutes")
  .alias("left")
  .join(
    stream2.withWatermark("timestamp", "20 minutes").alias("right"),
    expr("left.id = right.id AND left.timestamp BETWEEN right.timestamp - INTERVAL 5 MINUTES AND right.timestamp + INTERVAL 5 MINUTES"),
    "leftOuter"
  )
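
For outer joins, a result row (matched or null-padded) is emitted only once the watermark guarantees no further match can arrive; the query itself runs in append mode. A minimal sketch of starting it (paths are placeholders):

// Stream-stream joins are written out in append output mode
val joinQuery = outerJoined.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/output/joined")
  .option("checkpointLocation", "/checkpoint/joined")
  .start()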

Stateful Processing

mapGroupsWithState

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class InputEvent(userId: String, action: String, timestamp: Long)
case class UserSession(userId: String, startTime: Long, endTime: Long, actionCount: Int)

def updateUserSession(userId: String,
                      events: Iterator[InputEvent],
                      state: GroupState[UserSession]): UserSession = {

  val currentSession = if (state.exists) state.get else UserSession(userId, Long.MaxValue, 0L, 0)
  val newEvents = events.toSeq

  if (newEvents.isEmpty) {
    // Invoked on a processing-time timeout with no new data:
    // drop the state and emit the session as-is
    state.remove()
    currentSession
  } else {
    val newActionCount = currentSession.actionCount + newEvents.size
    val newStartTime = math.min(currentSession.startTime, newEvents.map(_.timestamp).min)
    val newEndTime = math.max(currentSession.endTime, newEvents.map(_.timestamp).max)

    val updatedSession = UserSession(userId, newStartTime, newEndTime, newActionCount)

    // Refresh the timeout and persist the updated state
    state.setTimeoutDuration("30 minutes")
    state.update(updatedSession)

    updatedSession
  }
}

// Requires spark.implicits._ in scope for the InputEvent encoder
import spark.implicits._

val sessionUpdates = inputStream
  .as[InputEvent]
  .groupByKey(_.userId)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateUserSession)
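
mapGroupsWithState output is written with the update output mode. A minimal sketch of starting the query (the checkpoint path is a placeholder):

// Stateful mapGroupsWithState queries run in update output mode
val sessionQuery = sessionUpdates.writeStream
  .outputMode("update")
  .format("console")
  .option("checkpointLocation", "/checkpoint/sessions")
  .start()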

Monitoring and Debugging

Query Progress

// Monitor query progress
val query = df.writeStream...

// Get progress information
val progress = query.lastProgress
println(s"Batch ID: ${progress.batchId}")
println(s"Input rows: ${progress.inputRowsPerSecond}")
println(s"Processing rate: ${progress.inputRowsPerSecond}")
println(s"Batch duration: ${progress.batchDuration}")

// Historical progress
query.recentProgress.foreach { progress =>
  println(s"Batch ${progress.batchId}: ${progress.inputRowsPerSecond} rows/sec")
}

Streaming Query Status

val status = query.status
println(s"Message: ${status.message}")
println(s"Is trigger active: ${status.isTriggerActive}")
println(s"Is data available: ${status.isDataAvailable}")

Error Handling

// Query with error handling
val resilientQuery = df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    try {
      batchDF.write.mode("append").saveAsTable("output_table")
    } catch {
      case e: Exception =>
        println(s"Failed to process batch $batchId: ${e.getMessage}")
        // Log to error table or external system
        val errorDF = spark.createDataFrame(Seq(
          (batchId, e.getMessage, System.currentTimeMillis())
        )).toDF("batch_id", "error_message", "timestamp")
        errorDF.write.mode("append").saveAsTable("error_log")
    }
  }
  .start()