Streaming Operations

Real-time data processing with Structured Streaming. Provides a unified batch and streaming API with exactly-once processing guarantees, fault tolerance, and advanced streaming analytics such as windowed aggregations and stream joins.
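
Quick example (a minimal sketch; assumes an existing SparkSession named spark and a test socket source on localhost:9999):

import org.apache.spark.sql.streaming.Trigger

// Read lines from a socket, count words, and print running counts to the console
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val wordCounts = lines
  .selectExpr("explode(split(value, ' ')) AS word")
  .groupBy("word")
  .count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()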

Capabilities

DataStreamReader

Interface for reading streaming data from various sources.

/**
 * Interface for reading streaming data from various sources
 */
class DataStreamReader {
  /** Specify streaming data source format */
  def format(source: String): DataStreamReader
  
  /** Set schema for the streaming data */
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader
  
  /** Set options for the streaming source */
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader
  def options(options: scala.collection.Map[String, String]): DataStreamReader
  def options(options: java.util.Map[String, String]): DataStreamReader
  
  /** Load streaming data using generic interface */
  def load(): DataFrame
  def load(path: String): DataFrame
  
  /** Built-in streaming sources */
  def text(path: String): DataFrame
  def textFile(path: String): Dataset[String]
  def csv(path: String): DataFrame
  def json(path: String): DataFrame
  def parquet(path: String): DataFrame
  def orc(path: String): DataFrame
  
  /**
   * The Kafka, socket (testing only), and rate (testing only) sources have no
   * dedicated reader methods; select them with format("kafka"), format("socket"),
   * or format("rate") plus the corresponding options, then call load().
   */
}

Usage Examples:

// File-based streaming
val streamingDf = spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", "10")
  .json("path/to/streaming/json/files")

// Kafka streaming
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "latest")
  .load()

// Socket streaming (for testing)
val socketStream = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Rate source (for testing)
val rateStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 100)
  .option("numPartitions", 4)
  .load()

DataStreamWriter

Interface for writing streaming Dataset to various sinks.

/**
 * Interface for writing streaming Dataset to various sinks
 * @tparam T Type of the streaming Dataset
 */
class DataStreamWriter[T] {
  /** Specify output format */
  def format(source: String): DataStreamWriter[T]
  
  /** Set output mode */
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def outputMode(outputMode: String): DataStreamWriter[T]
  
  /** Set trigger for micro-batch processing */
  def trigger(trigger: Trigger): DataStreamWriter[T]
  
  /** Set options for the streaming sink */
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]
  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
  def options(options: java.util.Map[String, String]): DataStreamWriter[T]
  
  /** Partition output by columns */
  def partitionBy(colNames: String*): DataStreamWriter[T]
  
  /** Query name for identification */
  def queryName(queryName: String): DataStreamWriter[T]
  
  /** Start streaming query */
  def start(): StreamingQuery
  def start(path: String): StreamingQuery
  
  /**
   * Built-in sinks (console, memory, Kafka, and the json/parquet/orc/text/csv
   * file formats) have no dedicated writer methods; select them with
   * format(...) plus options such as "path" and "checkpointLocation",
   * then call start().
   */

  /** Custom row-by-row processing */
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

  /** Custom per-micro-batch processing */
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
}

Usage Examples:

// Console output
val query1 = streamingDf.writeStream
  .outputMode("append")
  .format("console")
  .option("numRows", 20)
  .option("truncate", false)
  .start()

// File output with partitioning
val query2 = streamingDf.writeStream
  .format("parquet")
  .outputMode("append")
  .option("path", "output/streaming")
  .option("checkpointLocation", "checkpoints/streaming")
  .partitionBy("year", "month")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Kafka output
val query3 = processedStream
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "checkpoints/kafka-out")
  .outputMode("append")
  .start()

// Custom processing with foreachBatch
val query4 = streamingDf.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    println(s"Processing batch $batchId with ${batchDf.count()} records")
    batchDf.write
      .mode(SaveMode.Append)
      .saveAsTable("streaming_results")
  }
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

Output Modes

Different output modes for streaming queries.

/**
 * Output modes for streaming queries
 */
object OutputMode {
  /** Only append new rows to the result table (string form: "append") */
  def Append(): OutputMode

  /** Output the complete result table on every trigger (string form: "complete") */
  def Complete(): OutputMode

  /** Output only rows updated since the last trigger (string form: "update") */
  def Update(): OutputMode
}
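
Usage Example (a brief sketch; assumes aggregatedDf is a streaming DataFrame with an aggregation, which update mode requires):

import org.apache.spark.sql.streaming.OutputMode

// Typed form; .outputMode("update") is the equivalent string form
val updateQuery = aggregatedDf.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()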

Triggers

Control timing and execution of streaming micro-batches.

/**
 * Triggers for controlling streaming execution
 */
sealed trait Trigger

object Trigger {
  /** Run micro-batches at the given processing-time interval */
  def ProcessingTime(interval: String): Trigger
  def ProcessingTime(interval: Long, unit: TimeUnit): Trigger

  /** Process the available data as a single micro-batch and stop */
  def Once(): Trigger

  /** Process all available data (possibly as multiple micro-batches) and stop */
  def AvailableNow(): Trigger
  
  /** Continuous processing (experimental) */
  def Continuous(interval: String): Trigger
  def Continuous(interval: Long, unit: TimeUnit): Trigger
}

Usage Examples:

import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger

// Process every 30 seconds
val trigger1 = Trigger.ProcessingTime("30 seconds")
val trigger2 = Trigger.ProcessingTime(30, TimeUnit.SECONDS)

// Process once and terminate
val trigger3 = Trigger.Once()

// Process all available data and terminate
val trigger4 = Trigger.AvailableNow()

// Continuous processing (low-latency)
val trigger5 = Trigger.Continuous("1 second")

StreamingQuery

Handle to a running streaming query with control and monitoring capabilities.

/**
 * Handle to a running streaming query
 */
trait StreamingQuery {
  /** Unique identifier for the query */
  def id: UUID
  
  /** Name of the query */
  def name: String
  
  /** Check if query is currently active */
  def isActive: Boolean
  
  /** Block until query terminates */
  def awaitTermination(): Unit
  def awaitTermination(timeoutMs: Long): Boolean
  
  /** Stop the query */
  def stop(): Unit
  
  /** Get the most recent progress update */
  def lastProgress: StreamingQueryProgress
  
  /** Get recent progress updates */
  def recentProgress: Array[StreamingQueryProgress]
  
  /** Get current status */
  def status: StreamingQueryStatus
  
  /** Get exception that caused query to stop (if any) */
  def exception: Option[StreamingQueryException]
  
  /** Explain the query plan */
  def explain(): Unit
  def explain(extended: Boolean): Unit
}
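
Usage Example (a monitoring sketch; assumes query is the StreamingQuery returned by start()):

// Inspect identity and current status
println(s"Query ${query.name} (${query.id}) active: ${query.isActive}")
println(query.status)

// lastProgress is null until the first batch completes, so guard it
Option(query.lastProgress).foreach { p =>
  println(s"Batch ${p.batchId}: ${p.numInputRows} rows at ${p.inputRowsPerSecond} rows/sec")
}

// Wait up to 60 seconds for termination, then stop the query
if (!query.awaitTermination(60000)) {
  query.stop()
}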

StreamingQueryManager

Manager for all StreamingQueries in a SparkSession.

/**
 * Manager for all StreamingQueries in a SparkSession
 */
class StreamingQueryManager {
  /** Get currently active streaming queries */
  def active: Array[StreamingQuery]
  
  /** Get a query by id */
  def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery
  
  /** Block until any one of the active queries terminates */
  def awaitAnyTermination(): Unit
  def awaitAnyTermination(timeoutMs: Long): Boolean

  /** Forget past terminated queries so awaitAnyTermination() can be reused */
  def resetTerminated(): Unit
  
  /** Add listener for streaming query events */
  def addListener(listener: StreamingQueryListener): Unit
  def removeListener(listener: StreamingQueryListener): Unit
}
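
Usage Example (a sketch using spark.streams, the StreamingQueryManager attached to the SparkSession; the anonymous listener overrides the three standard callbacks):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// List currently active queries
spark.streams.active.foreach(q => println(s"${q.name}: ${q.id}"))

// Receive lifecycle and progress events for all queries in this session
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Progress: ${event.progress.numInputRows} rows")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Terminated: ${event.id}")
})

// Block until any active query terminates
spark.streams.awaitAnyTermination()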

ForeachWriter

Custom sink for row-by-row processing.

/**
 * Abstract class for custom streaming sinks
 * @tparam T Type of rows to process
 */
abstract class ForeachWriter[T] extends Serializable {
  /** Called when starting to process a partition */
  def open(partitionId: Long, epochId: Long): Boolean
  
  /** Called for each row */
  def process(value: T): Unit
  
  /** Called when finishing processing a partition */
  def close(errorOrNull: Throwable): Unit
}

Usage Example:

import org.apache.spark.sql.{ForeachWriter, Row}

val customWriter = new ForeachWriter[Row] {
  def open(partitionId: Long, epochId: Long): Boolean = {
    // Initialize resources (e.g., database connection)
    println(s"Opening partition $partitionId for epoch $epochId")
    true
  }
  
  def process(value: Row): Unit = {
    // Process each row
    val id = value.getAs[Long]("id")
    val name = value.getAs[String]("name")
    println(s"Processing record: $id, $name")
    // Write to external system
  }
  
  def close(errorOrNull: Throwable): Unit = {
    // Clean up resources
    if (errorOrNull != null) {
      println(s"Error occurred: ${errorOrNull.getMessage}")
    }
    println("Closing partition")
  }
}

val query = streamingDf.writeStream
  .foreach(customWriter)
  .start()

Streaming Aggregations

Advanced aggregation operations in streaming context.

Time-based aggregations:

import org.apache.spark.sql.functions._

// Window-based aggregations
val windowedCounts = streamingDf
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window(col("timestamp"), "5 minutes", "1 minute"),
    col("category")
  )
  .count()

// Tumbling window
val tumblingWindow = streamingDf
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window(col("timestamp"), "10 minutes"))
  .agg(sum("amount").alias("total_amount"))

// Session window (event-time sessions)
val sessionWindow = streamingDf
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    col("userId"),
    session_window(col("timestamp"), "5 minutes")
  )
  .agg(count("*").alias("events_in_session"))

Stateful operations:

// Global aggregations (require complete or update output mode)
val globalCounts = streamingDf
  .groupBy("category")
  .count()

// With watermarking for late data handling
val withWatermark = streamingDf
  .withWatermark("eventTime", "1 hour")
  .groupBy("userId", window(col("eventTime"), "30 minutes"))
  .agg(sum("amount").alias("total"))
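
Writing a windowed aggregation (a sketch that reuses the withWatermark DataFrame defined above; the console sink and checkpoint path are illustrative):

// Rows later than the 1 hour watermark are dropped; in update mode only
// the window/user groups that changed are emitted on each trigger
val aggQuery = withWatermark.writeStream
  .outputMode("update")
  .format("console")
  .option("checkpointLocation", "checkpoints/agg")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()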

Common Streaming Patterns

Exactly-once processing:

val query = streamingDf
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "s3://bucket/checkpoints/query1")
  .format("delta") // or other ACID-compliant sink
  .start("s3://bucket/output/table")

Error handling and monitoring:

val query = streamingDf.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    try {
      batchDf.write
        .mode(SaveMode.Append)
        .saveAsTable("results")
    } catch {
      case e: Exception =>
        println(s"Error processing batch $batchId: ${e.getMessage}")
        // Log to monitoring system
        throw e  // Re-throw to trigger query restart
    }
  }
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Monitor query progress
val progressInfo = query.lastProgress
println(s"Input rows/sec: ${progressInfo.inputRowsPerSecond}")
println(s"Processing time: ${progressInfo.durationMs}")

Stream-stream joins:

val stream1 = spark.readStream.format("kafka")...
val stream2 = spark.readStream.format("kafka")...

val joinedStream = stream1.alias("s1")
  .join(stream2.alias("s2"),
    expr("s1.key = s2.key AND s1.timestamp >= s2.timestamp - interval 1 hour AND s1.timestamp <= s2.timestamp + interval 1 hour"),
    "inner"
  )
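
For inner stream-stream joins, watermarks on both inputs are optional but recommended so the engine can bound the join state; a sketch of the same join with watermarks (assumes both streams carry a timestamp column):

val boundedJoin = stream1.withWatermark("timestamp", "2 hours").alias("s1")
  .join(
    stream2.withWatermark("timestamp", "2 hours").alias("s2"),
    expr("s1.key = s2.key AND s1.timestamp BETWEEN s2.timestamp - interval 1 hour AND s2.timestamp + interval 1 hour"),
    "inner"
  )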

Stream-static joins:

val staticDf = spark.read.table("reference_data")
val streamingDf = spark.readStream...

val enrichedStream = streamingDf
  .join(staticDf, "key")  // Enrich streaming rows from the static table (broadcast if small)