Apache Spark SQL - Streaming

Capabilities

Structured Streaming Query Execution

  • Execute continuous data processing with fault-tolerant, exactly-once semantics using micro-batch processing
  • Handle late-arriving data with configurable watermarking and event-time processing
  • Support for stateful operations including aggregations, joins, and custom state management
  • Process unbounded streams with automatic checkpointing and recovery mechanisms (see the sketch below)
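
A minimal sketch of such a query, assuming an active SparkSession and a hypothetical streaming DataFrame `events` with an `event_time` column (fuller examples follow under Usage Examples):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// Hypothetical input: a streaming DataFrame `events` with an `event_time` timestamp column
val counts = events
  .withWatermark("event_time", "10 minutes")                 // tolerate 10 minutes of late data
  .groupBy(window(col("event_time"), "5 minutes"))
  .count()

val query = counts.writeStream
  .outputMode("update")
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoints/demo") // enables restart and recovery
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()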

Stream Data Sources and Sinks

  • Read from various streaming sources including Kafka, files, sockets, and rate sources for testing
  • Write to multiple sink types including files, Kafka, console, memory, and foreach sinks
  • Support for the append, complete, and update output modes for different use cases (see the sketch below)
  • Handle schema evolution and format changes in streaming data pipelines
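
A small sketch pairing the built-in rate source (handy for testing) with the console sink, assuming an active SparkSession named `spark`:

// Rate source generates (timestamp, value) rows at a fixed rate
val rateStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()

// Console sink in append mode: print each new micro-batch
val consoleQuery = rateStream.writeStream
  .format("console")
  .outputMode("append")
  .option("truncate", "false")
  .start()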

Trigger Management and Processing Control

  • Configure processing triggers including fixed intervals, one-time, continuous, and available-now execution (see the sketch below)
  • Control micro-batch sizing and processing intervals for throughput and latency optimization
  • Support for event-time processing with watermarks for handling out-of-order data
  • Enable backpressure handling and dynamic batch sizing based on cluster capacity
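
The common trigger choices, sketched against a hypothetical streaming DataFrame `df`; a query takes exactly one trigger:

import org.apache.spark.sql.streaming.Trigger

val triggered = df.writeStream
  .format("console")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds")) // micro-batch every 30 seconds
  // .trigger(Trigger.Once())                    // single micro-batch, then stop
  // .trigger(Trigger.AvailableNow())            // drain all available data, then stop
  // .trigger(Trigger.Continuous("1 second"))    // experimental low-latency mode
  .start()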

Stateful Stream Processing

  • Maintain state across micro-batches for complex event processing and session analytics
  • Support for arbitrary stateful processing using mapGroupsWithState and flatMapGroupsWithState (see the sketch below)
  • Handle state expiration and cleanup with configurable timeout policies
  • Enable stateful stream-stream joins with configurable state retention policies
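
A minimal sketch of keyed state with a processing-time timeout, assuming a hypothetical Dataset[(String, Long)] named `keyed`; a fuller session-tracking example appears under Stateful Processing below:

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import spark.implicits._

// Running count of events per key, expired after one hour of inactivity
val runningCounts = keyed
  .groupByKey { case (key, _) => key }
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) {
    (key: String, values: Iterator[(String, Long)], state: GroupState[Long]) =>
      if (state.hasTimedOut) {
        state.remove()
        (key, 0L)
      } else {
        val total = state.getOption.getOrElse(0L) + values.size
        state.update(total)
        state.setTimeoutDuration("1 hour")
        (key, total)
      }
  }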

API Reference

DataStreamReader Class

class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
  // Format specification
  def format(source: String): DataStreamReader
  
  // Schema definition
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader
  
  // Options configuration
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader
  def options(options: scala.collection.Map[String, String]): DataStreamReader
  def options(options: java.util.Map[String, String]): DataStreamReader
  
  // Load operations
  def load(): DataFrame
  def load(path: String): DataFrame
  
  // Format-specific loaders
  def json(path: String): DataFrame
  def csv(path: String): DataFrame
  def parquet(path: String): DataFrame
  def orc(path: String): DataFrame
  def text(path: String): DataFrame
  def textFile(path: String): Dataset[String]
  
  // Kafka, socket, and rate sources have no dedicated loader methods;
  // select them with format("kafka"), format("socket"), or format("rate"),
  // configure their options, and call load()
}

DataStreamWriter[T] Class

class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
  // Format specification
  def format(source: String): DataStreamWriter[T]
  
  // Output mode configuration
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def outputMode(outputMode: String): DataStreamWriter[T]
  
  // Trigger configuration  
  def trigger(trigger: Trigger): DataStreamWriter[T]
  
  // Options configuration
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]
  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
  def options(options: java.util.Map[String, String]): DataStreamWriter[T]
  
  // Partitioning
  def partitionBy(colNames: String*): DataStreamWriter[T]
  
  // Query naming (checkpointing is configured with option("checkpointLocation", ...))
  def queryName(queryName: String): DataStreamWriter[T]
  
  // Start operations
  def start(): StreamingQuery
  def start(path: String): StreamingQuery
  
  // File, Kafka, console, and memory sinks have no dedicated writer methods;
  // select them with format("parquet"), format("kafka"), format("console"),
  // format("memory"), etc., then call start() or start(path)
  
  // Table sink (Spark 3.1+)
  def toTable(tableName: String): StreamingQuery
  
  // Custom sinks (call start() afterwards to launch the query)
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
  def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T]
}

StreamingQuery Interface

trait StreamingQuery {
  // Query identification
  def id: UUID
  def runId: UUID
  def name: String
  
  // Query control (a StreamingQuery is already running; it is created via DataStreamWriter.start())
  def stop(): Unit
  def processAllAvailable(): Unit
  
  // Query state
  def isActive: Boolean
  def awaitTermination(): Unit
  def awaitTermination(timeoutMs: Long): Boolean
  def exception: Option[StreamingQueryException]
  
  // Progress monitoring
  def lastProgress: StreamingQueryProgress
  def recentProgress: Array[StreamingQueryProgress]
  def status: StreamingQueryStatus
  
  // Explain plans
  def explain(): Unit
  def explain(extended: Boolean): Unit
}
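
A short sketch of driving a query through this interface, assuming a StreamingQuery named `query` that was started elsewhere:

// Block for up to 10 minutes; returns true if the query terminated in that window
val finished = query.awaitTermination(10 * 60 * 1000L)

if (!finished) {
  println(s"Still running: ${query.status.message}")
  query.stop()                                       // request the query to stop
}

// Surface a terminal failure, if any
query.exception.foreach(e => println(s"Query failed: ${e.getMessage}"))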

StreamingQueryManager Class

abstract class StreamingQueryManager {
  // Active queries management
  def active: Array[StreamingQuery]
  def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery
  
  // Termination handling
  def awaitAnyTermination(): Unit
  def awaitAnyTermination(timeoutMs: Long): Boolean
  def resetTerminated(): Unit
  
  // Listeners
  def addListener(listener: StreamingQueryListener): Unit
  def removeListener(listener: StreamingQueryListener): Unit
}

Trigger Types

// Base trigger trait
sealed trait Trigger

// Processing time trigger
case class ProcessingTimeTrigger(interval: Long) extends Trigger

object Trigger {
  // Factory methods
  def ProcessingTime(interval: String): Trigger
  def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
  def ProcessingTime(interval: Duration): Trigger
  def Once(): Trigger
  def Continuous(interval: String): Trigger
  def Continuous(interval: Long, unit: TimeUnit): Trigger  
  def Continuous(interval: Duration): Trigger
  def AvailableNow(): Trigger
}

// Specific trigger implementations
case object OnceTrigger extends Trigger
case class ContinuousTrigger(interval: Long) extends Trigger  
case object AvailableNowTrigger extends Trigger

Output Modes

class OutputMode

object OutputMode {
  // Factory methods
  def Append(): OutputMode
  def Complete(): OutputMode
  def Update(): OutputMode
}

State Management

// Group state for stateful operations
abstract class GroupState[S] extends Serializable {
  // State access
  def exists: Boolean
  def get: S
  def getOption: Option[S]
  def update(newState: S): Unit
  def remove(): Unit
  
  // Timeout management
  def setTimeoutDuration(durationMs: Long): Unit
  def setTimeoutDuration(duration: String): Unit
  def setTimeoutTimestamp(timestampMs: Long): Unit
  def setTimeoutTimestamp(timestamp: Date): Unit
  def getCurrentWatermarkMs(): Long
  def getCurrentProcessingTimeMs(): Long
  def hasTimedOut: Boolean
}

// State timeout configuration
sealed trait GroupStateTimeout
case object NoTimeout extends GroupStateTimeout
case object ProcessingTimeTimeout extends GroupStateTimeout
case object EventTimeTimeout extends GroupStateTimeout
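
The timeout mode changes how expiry is driven. A hedged sketch of an event-time timeout with flatMapGroupsWithState, assuming a hypothetical Dataset[Event] named `events` (the Usage Examples below use a processing-time timeout instead):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

case class Event(userId: String, eventTime: java.sql.Timestamp)

val inactiveUsers = events
  .withWatermark("eventTime", "10 minutes")          // event-time timeouts require a watermark
  .groupByKey(_.userId)
  .flatMapGroupsWithState[Long, String](OutputMode.Append(), GroupStateTimeout.EventTimeTimeout) {
    (userId: String, evts: Iterator[Event], state: GroupState[Long]) =>
      if (state.hasTimedOut) {
        state.remove()
        Iterator(s"$userId inactive")                // emitted once the watermark passes the timeout
      } else {
        // Track the latest event time for this user and push the timeout 30 minutes past it
        val maxEventMs = evts.map(_.eventTime.getTime).foldLeft(state.getOption.getOrElse(0L))(math.max)
        state.update(maxEventMs)
        state.setTimeoutTimestamp(maxEventMs + 30 * 60 * 1000L)
        Iterator.empty
      }
  }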

Streaming Query Progress and Status

// Query progress information
class StreamingQueryProgress private[sql] (
    val id: UUID,
    val runId: UUID, 
    val name: String,
    val timestamp: String,
    val batchId: Long,
    val batchDuration: Long,
    val durationMs: Map[String, Long],
    val eventTime: Map[String, String],
    val stateOperators: Array[StateOperatorProgress],
    val sources: Array[SourceProgress],
    val sink: SinkProgress,
    val observedMetrics: Map[String, Row]) extends Serializable {
  
  def inputRowsPerSecond: Double
  def processedRowsPerSecond: Double  
  def prettyJson: String
  def json: String
}

// Query status information
class StreamingQueryStatus private[sql] (
    val message: String,
    val isDataAvailable: Boolean,
    val isTriggerActive: Boolean) extends Serializable

// State operator progress
class StateOperatorProgress private[sql] (
    val operatorName: String,
    val numRowsTotal: Long,
    val numRowsUpdated: Long,
    val memoryUsedBytes: Long,
    val customMetrics: Map[String, Long] = Map.empty) extends Serializable

// Source progress  
class SourceProgress private[sql] (
    val description: String,
    val startOffset: String,
    val endOffset: String,
    val numInputRows: Long,
    val inputRowsPerSecond: Double,
    val processedRowsPerSecond: Double,
    val metrics: Map[String, String] = Map.empty) extends Serializable

// Sink progress
class SinkProgress private[sql] (
    val description: String,
    val numOutputRows: Long,
    val metrics: Map[String, String] = Map.empty) extends Serializable
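
A brief sketch of reading these progress objects from a running query named `query` (assumed to have been started elsewhere):

// Inspect the most recent micro-batch, its state stores, and its sources
Option(query.lastProgress).foreach { p =>
  println(s"batch=${p.batchId} input=${p.inputRowsPerSecond}/s processed=${p.processedRowsPerSecond}/s")
  p.stateOperators.foreach(op => println(s"  state rows=${op.numRowsTotal} memory=${op.memoryUsedBytes}B"))
  p.sources.foreach(s => println(s"  source ${s.description}: ${s.numInputRows} rows"))
}

// Current status plus a JSON dump of recent progress for logging or alerting
println(query.status.message)
query.recentProgress.foreach(p => println(p.json))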

Custom Sinks

// ForeachWriter for custom output
abstract class ForeachWriter[T] extends Serializable {
  // Lifecycle methods
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// There is no public batch-writer class to extend; foreachBatch on DataStreamWriter
// takes a function that receives each micro-batch Dataset together with its batch id:
//   (batchDF: Dataset[T], batchId: Long) => Unit

Watermark Operations

// Watermark functions (in Dataset class)
class Dataset[T] {
  def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
}

// Window functions (in org.apache.spark.sql.functions)
object functions {
  def window(timeColumn: Column, windowDuration: String): Column
  def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
  def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column
  def session_window(timeColumn: Column, gapDuration: String): Column
}

Usage Examples

Basic Streaming Setup

import org.apache.spark.sql.{SparkSession, Dataset, DataFrame}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("Streaming Example")
  .config("spark.sql.streaming.checkpointLocation", "/path/to/checkpoint")
  .getOrCreate()

// Needed for the $"column" syntax used throughout these examples
import spark.implicits._

// Define schema for streaming data
val userActivitySchema = StructType(Array(
  StructField("user_id", StringType, nullable = false),
  StructField("action", StringType, nullable = false),
  StructField("timestamp", TimestampType, nullable = false),
  StructField("session_id", StringType, nullable = false),
  StructField("page_url", StringType, nullable = true),
  StructField("duration", IntegerType, nullable = true)
))

// Read from JSON files
val inputStream = spark.readStream
  .format("json")
  .schema(userActivitySchema)
  .option("maxFilesPerTrigger", "1")
  .load("/path/to/streaming/data")

// Basic transformations
val processedStream = inputStream
  .filter($"action" =!= "heartbeat")
  .withColumn("hour", hour($"timestamp"))
  .withColumn("date", to_date($"timestamp"))

File-based Streaming Sources

// CSV streaming
val csvStream = spark.readStream
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "false")
  .schema(userActivitySchema)
  .load("/path/to/csv/files")

// Parquet streaming
val parquetStream = spark.readStream
  .format("parquet") 
  .schema(userActivitySchema)
  .load("/path/to/parquet/files")

// Text file streaming
val textStream = spark.readStream
  .format("text")
  .load("/path/to/text/files")
  .select(
    regexp_extract($"value", """(\w+)\s+(.+)""", 1).as("level"),
    regexp_extract($"value", """(\w+)\s+(.+)""", 2).as("message"),
    current_timestamp().as("processed_time")
  )

Kafka Integration

// Read from Kafka
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-activity,page-views")
  .option("startingOffsets", "latest") // or "earliest"
  .option("failOnDataLoss", "false")
  .load()

// Parse Kafka messages
val parsedKafkaStream = kafkaStream.select(
  $"topic",
  $"partition", 
  $"offset",
  $"timestamp",
  $"key".cast(StringType).as("message_key"),
  from_json($"value".cast(StringType), userActivitySchema).as("data")
).select($"topic", $"partition", $"offset", $"timestamp", $"message_key", $"data.*")

// Write to Kafka (the Kafka sink requires a "value" column and optionally a "key" column)
val kafkaOutput = processedStream
  .select(to_json(struct("*")).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "processed-activity")
  .option("checkpointLocation", "/path/to/kafka/checkpoint")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

Aggregations and Windowing

// Simple aggregations
val userCounts = inputStream
  .groupBy($"user_id")
  .count()

// Time-based windowing
val windowedCounts = inputStream
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "5 minutes", "1 minute"),
    $"action"
  )
  .agg(
    count("*").as("action_count"),
    // exact distinct counts are not supported in streaming aggregations
    approx_count_distinct("user_id").as("unique_users"),
    avg("duration").as("avg_duration")
  )

// Session windows
val sessionAnalysis = inputStream
  .withWatermark("timestamp", "30 minutes")
  .groupBy(
    $"user_id",
    session_window($"timestamp", "10 minutes")
  )
  .agg(
    count("*").as("actions_in_session"),
    sum("duration").as("total_session_time"),
    collect_list("action").as("session_actions")
  )

// Complex aggregations with multiple time windows
val multiWindowAnalysis = inputStream
  .withWatermark("timestamp", "1 hour")
  .groupBy(
    $"user_id",
    window($"timestamp", "1 hour"),  
    $"action"
  )
  .agg(
    count("*").as("hourly_count"),
    first("timestamp").as("first_action_time"),
    last("timestamp").as("last_action_time")
  )
  .groupBy($"user_id", $"window")
  .agg(
    sum("hourly_count").as("total_actions"),
    collect_map("action", "hourly_count").as("action_breakdown")
  )

Stateful Processing

case class UserSession(
  userId: String,
  startTime: Long,
  lastActivity: Long,
  actionCount: Int,
  totalDuration: Int
)

// Stateful session tracking
val sessionTracking = inputStream
  .groupByKey(_.getString(0)) // Group by user_id
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) {
    (userId: String, values: Iterator[Row], state: GroupState[UserSession]) =>
      
      val events = values.toSeq
      val now = System.currentTimeMillis()
      
      // Get or create session state
      val session = if (state.exists) {
        state.get
      } else {
        UserSession(userId, now, now, 0, 0)
      }
      
      // Update session with new events
      val updatedSession = events.foldLeft(session) { (s, event) =>
        val eventTime = event.getAs[java.sql.Timestamp]("timestamp").getTime
        val duration = Option(event.getAs[Integer]("duration")).map(_.toInt).getOrElse(0)
        
        s.copy(
          lastActivity = math.max(s.lastActivity, eventTime),
          actionCount = s.actionCount + 1,
          totalDuration = s.totalDuration + duration
        )
      }
      
      if (state.hasTimedOut) {
        // Timeout invocation: no new events for this key, close out the session
        state.remove()
        ("session_expired", updatedSession)
      } else {
        state.update(updatedSession)
        // Expire the session after 30 minutes of inactivity
        state.setTimeoutDuration("30 minutes")
        ("session_active", updatedSession)
      }
  }

Stream-Stream Joins

// Define two streams
val clickStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "clicks")
  .load()
  .select(
    from_json($"value".cast("string"), clickSchema).as("data")
  )
  .select($"data.*")
  .withWatermark("timestamp", "2 hours")

val impressionStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") 
  .option("subscribe", "impressions")
  .load()
  .select(
    from_json($"value".cast("string"), impressionSchema).as("data")
  )
  .select($"data.*")
  .withWatermark("timestamp", "3 hours")

// Join streams with time constraints
val joinedStream = impressionStream.join(
  clickStream,
  expr("""
    impression_id = click_impression_id AND
    click_timestamp >= impression_timestamp AND  
    click_timestamp <= impression_timestamp + interval 1 hour
  """),
  joinType = "leftOuter"
)

// Aggregate joined data
val conversionAnalysis = joinedStream
  .withWatermark("impression_timestamp", "1 hour")
  .groupBy(
    window($"impression_timestamp", "10 minutes"),
    $"campaign_id"
  )
  .agg(
    count("impression_id").as("impressions"),
    count("click_impression_id").as("clicks"),
    (count("click_impression_id") * 100.0 / count("impression_id")).as("ctr")
  )

Output Modes and Triggers

// Append mode - only new rows
val appendQuery = processedStream.writeStream
  .outputMode(OutputMode.Append)
  .format("parquet")
  .option("path", "/path/to/output")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

// Complete mode - entire result table
val completeQuery = userCounts.writeStream
  .outputMode(OutputMode.Complete)
  .format("memory")
  .queryName("user_counts_table")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Update mode - only changed rows
val updateQuery = windowedCounts.writeStream
  .outputMode(OutputMode.Update)
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .start()

// Once trigger - single micro-batch
val onceQuery = processedStream.writeStream
  .outputMode(OutputMode.Append)
  .format("json")
  .option("path", "/path/to/batch/output")
  .trigger(Trigger.Once())
  .start()

// Available now trigger - process all available data
val availableNowQuery = processedStream.writeStream
  .outputMode(OutputMode.Append)
  .format("delta")
  .option("path", "/path/to/delta/table")
  .trigger(Trigger.AvailableNow())
  .start()

// Continuous processing (experimental)
val continuousQuery = inputStream
  .filter($"action" === "purchase")
  .writeStream
  .outputMode(OutputMode.Append)
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

Custom Sinks

// Custom ForeachWriter
class DatabaseWriter extends ForeachWriter[Row] {
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // Initialize database connection
    connection = java.sql.DriverManager.getConnection(
      "jdbc:postgresql://localhost/mydb", "user", "password"
    )
    statement = connection.prepareStatement(
      "INSERT INTO user_activity (user_id, action, timestamp) VALUES (?, ?, ?)"
    )
    true
  }
  
  override def process(value: Row): Unit = {
    statement.setString(1, value.getString(0))
    statement.setString(2, value.getString(1))
    statement.setTimestamp(3, value.getTimestamp(2))
    statement.executeUpdate()
  }
  
  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

// Use custom writer
val customSinkQuery = processedStream.writeStream
  .foreach(new DatabaseWriter())
  .option("checkpointLocation", "/path/to/custom/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// ForeachBatch for batch processing
val foreachBatchQuery = processedStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    println(s"Processing batch $batchId with ${batchDF.count()} records")
    
    // Custom processing logic
    batchDF.cache()
    
    // Write to multiple sinks
    batchDF.write
      .mode("append")
      .parquet(s"/path/to/archive/batch_$batchId")
      
    batchDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost/mydb")
      .option("dbtable", "processed_activity")
      .mode("append")
      .save()
    
    batchDF.unpersist()
  }
  .option("checkpointLocation", "/path/to/batch/checkpoint")
  .start()

Query Management and Monitoring

// Collect the queries started above
val queries = Array(appendQuery, completeQuery, updateQuery)

// Monitor query status
queries.foreach { query =>
  println(s"Query ${query.name} - Active: ${query.isActive}")
  if (query.lastProgress != null) {
    println(s"Batch ${query.lastProgress.batchId} processed ${query.lastProgress.inputRowsPerSecond} rows/sec")
  }
}

// Block until any of the registered queries terminates (typically the last statement of the job)
spark.streams.awaitAnyTermination()

// Query manager operations
val activeQueries = spark.streams.active
println(s"Number of active queries: ${activeQueries.length}")

// Get specific query by name
val specificQuery = spark.streams.get("user_counts_table")

// Add streaming query listener (the event classes live in the StreamingQueryListener companion)
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println(s"Query started: ${queryStarted.name}")
  }
  
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println(s"Query terminated: ${queryTerminated.id}")
  }
  
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    val progress = queryProgress.progress
    println(s"Query ${progress.name}: processed ${progress.inputRowsPerSecond} rows/sec")
  }
})

// Graceful shutdown
sys.addShutdownHook {
  println("Stopping streaming queries...")
  spark.streams.active.foreach(_.stop())
  spark.stop()
}