Streaming Operations

Streaming source and sink functionality for real-time data processing with Apache Kafka, including advanced configuration options.

Capabilities

Streaming Source Configuration

Configure streaming reads from Kafka with various performance and reliability options.

Rate Limiting

Control the rate of data consumption to prevent overwhelming downstream processing.

/**
 * Maximum number of offsets to process per trigger
 * Helps prevent memory issues and provides backpressure control
 * @param limit - Number of offsets per trigger (default: unlimited)
 */
.option("maxOffsetsPerTrigger", "1000")

Usage Example:

import org.apache.spark.sql.streaming.Trigger

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("maxOffsetsPerTrigger", "5000")  // Process max 5000 messages per batch
  .load()

val query = df
  .selectExpr("CAST(value AS STRING) as message")
  .writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .outputMode("append")
  .format("console")
  .start()

Partition Control

Configure minimum partitions for reading to improve parallelism.

/**
 * Minimum number of partitions for the streaming DataFrame
 * Useful when the Kafka topic has fewer partitions than the desired Spark parallelism
 * @param partitions - Minimum partition count (must be positive)
 */
.option("minPartitions", "8")

Usage Example:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "single-partition-topic")
  .option("minPartitions", "10")  // Create 10 Spark partitions from 1 Kafka partition
  .load()

Data Loss Handling

Configure behavior when data is no longer available in Kafka.

/**
 * Control behavior when data loss is detected
 * @param policy - "true" (fail query) or "false" (continue with warning)
 * Default: "true" (fail on data loss)
 */
.option("failOnDataLoss", "false")

Usage Examples:

// Fail query when data is lost (default behavior)
val strictDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-events")
  .option("failOnDataLoss", "true")  // Explicit - this is the default
  .load()

// Continue processing despite data loss (with warnings)
val lenientDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("failOnDataLoss", "false")  // Continue on data loss
  .load()

Consumer Performance Tuning

Advanced options for tuning Kafka consumer performance.

/**
 * Consumer poll timeout in milliseconds
 * How long to wait for data in each consumer.poll() call
 * @param timeout - Timeout in milliseconds (default: 60000)
 */
.option("kafkaConsumer.pollTimeoutMs", "30000")

/**
 * Number of retries for offset fetching operations
 * @param retries - Number of retry attempts (default: 3)
 */
.option("fetchOffset.numRetries", "5")

/**
 * Interval between offset fetch retry attempts
 * @param interval - Retry interval in milliseconds (default: 1000)
 */
.option("fetchOffset.retryIntervalMs", "2000")

Usage Example:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("kafkaConsumer.pollTimeoutMs", "5000")    // 5 second poll timeout
  .option("fetchOffset.numRetries", "10")           // Retry offset fetch 10 times
  .option("fetchOffset.retryIntervalMs", "500")     // 500ms between retries
  .load()
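Other Kafka consumer properties can generally be forwarded by prefixing them with kafka., apart from a few that Spark manages itself (for example the deserializers and enable.auto.commit). A hedged sketch with illustrative values:

// Forward standard consumer properties with the "kafka." prefix (illustrative values)
val tunedDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("kafka.fetch.min.bytes", "65536")              // Wait for at least 64 KB per fetch...
  .option("kafka.fetch.max.wait.ms", "500")              // ...but no longer than 500 ms
  .option("kafka.max.partition.fetch.bytes", "1048576")  // Cap bytes fetched per partition
  .load()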

Streaming Sink (Writing)

Write streaming data to Kafka topics with various configuration options.

Basic Streaming Write

/**
 * Write streaming DataFrame to Kafka
 * Requires a 'value' column; 'key', 'topic', 'partition', and 'headers' columns are optional
 */
dataFrame.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")                     // Default topic (optional if a topic column is present)
  .option("checkpointLocation", "/path/to/checkpoint") // Required for the Kafka sink
  .start()

Usage Example:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val inputDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-events")
  .load()

// Transform and write back to Kafka
val processedDF = inputDF
  .selectExpr("CAST(value AS STRING) as json_str")
  .select(from_json(col("json_str"), inputSchema).as("data"))
  .select("data.*")
  .filter("status = 'active'")
  .select(to_json(struct("*")).cast("binary").as("value"))

val query = processedDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "processed-events")
  .option("checkpointLocation", "/tmp/checkpoints/processed-events")  // Required for the Kafka sink (illustrative path)
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
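If per-record routing is needed, the sink also honors optional key and topic columns on each row; when the topic option is omitted, every record is written to the topic named in its topic column (a topic option, if set, overrides the column). A minimal sketch, reusing inputDF and inputSchema from the example above and assuming an illustrative events-by-user topic and a userId field in the payload:

// Route each record by key and per-row topic column (illustrative names)
val keyedDF = inputDF
  .selectExpr("CAST(value AS STRING) as json_str")
  .select(from_json(col("json_str"), inputSchema).as("data"))
  .select(
    col("data.userId").cast("string").as("key"),  // Kafka message key (drives partitioning)
    to_json(col("data")).as("value"),             // Required value column
    lit("events-by-user").as("topic")             // Per-row destination topic
  )

val keyedQuery = keyedDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/tmp/checkpoints/events-by-user")  // Illustrative path
  .start()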

Advanced Streaming Write Options

/**
 * Advanced write options for streaming
 */
.option("kafka.acks", "all")                    // Producer acknowledgment level
.option("kafka.retries", "5")                   // Producer retry count
.option("kafka.batch.size", "16384")            // Batch size for producer
.option("kafka.linger.ms", "10")                // Linger time for batching
.option("kafka.buffer.memory", "33554432")      // Producer buffer memory

Usage Example:

val query = processedDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "high-throughput-topic")
  .option("checkpointLocation", "/tmp/checkpoints/high-throughput")  // Required for the Kafka sink (illustrative path)
  .option("kafka.acks", "1")                     // Faster acknowledgment
  .option("kafka.batch.size", "32768")           // Larger batches
  .option("kafka.linger.ms", "50")               // Wait longer for batching
  .option("kafka.compression.type", "snappy")    // Enable compression
  .outputMode("append")
  .start()

Micro-batch Stream Processing

Configuration specific to micro-batch streaming execution.

/**
 * Micro-batch stream implementation
 * Processes data in small batches with exactly-once semantics
 */
class KafkaMicroBatchStream {
  /**
   * Convert scan to micro-batch stream
   * @param checkpointLocation - Location for checkpoint storage
   * @return MicroBatchStream instance
   */
  def toMicroBatchStream(checkpointLocation: String): MicroBatchStream
}

Usage Example:

import org.apache.spark.sql.streaming.Trigger

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// Micro-batch processing with explicit trigger
val query = df
  .selectExpr("CAST(value AS STRING) as message")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 minutes"))      // Micro-batch every 5 minutes
  .option("checkpointLocation", "/tmp/checkpoint")    // Required for streaming
  .start()
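The same micro-batch path can also run as a one-shot job (for example, a scheduled catch-up run); a brief sketch using Trigger.Once(), which is listed under Types below:

// One-shot micro-batch: process all data available at start, then stop
val onceQuery = df
  .selectExpr("CAST(value AS STRING) as message")
  .writeStream
  .format("console")
  .trigger(Trigger.Once())                              // Single micro-batch execution
  .option("checkpointLocation", "/tmp/checkpoint-once") // Illustrative path
  .start()

onceQuery.awaitTermination()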

Continuous Stream Processing

Configuration for continuous streaming execution (experimental).

/**
 * Continuous stream implementation
 * Provides lower latency but with at-least-once semantics
 */
class KafkaContinuousStream {
  /**
   * Convert scan to continuous stream
   * @param checkpointLocation - Location for checkpoint storage  
   * @return ContinuousStream instance
   */
  def toContinuousStream(checkpointLocation: String): ContinuousStream
}

Usage Example:

import org.apache.spark.sql.streaming.Trigger

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "low-latency-events")
  .load()

// Continuous processing for low latency
val query = df
  .selectExpr("CAST(value AS STRING) as message")
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))           // Continuous with 1s checkpoints
  .option("checkpointLocation", "/tmp/continuous")    // Required
  .start()

Stream Monitoring and Metrics

Monitor streaming query progress and performance.

/**
 * Access streaming query metrics and progress
 */
val query = df.writeStream./* ... */.start()

// Monitor progress
query.lastProgress      // Latest batch progress
query.status           // Current query status  
query.recentProgress   // Recent progress history

// Query management
query.stop()           // Stop the query
query.awaitTermination()  // Wait for completion
query.awaitTermination(timeoutMs)  // Wait with timeout

Usage Example:

val query = df
  .selectExpr("CAST(value AS STRING) as message")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

// Monitor in a separate thread
new Thread(() => {
  while (query.isActive) {
    val progress = query.lastProgress      // Null until the first batch completes
    if (progress != null) {
      println(s"Batch ${progress.batchId}: ${progress.inputRowsPerSecond} rows/sec")
    }
    Thread.sleep(10000)
  }
}).start()

// Wait for termination
query.awaitTermination()
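For push-based monitoring, Spark also exposes a StreamingQueryListener that can be registered on the session's StreamingQueryManager and receives events for every query; a minimal sketch:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Receive lifecycle and progress events for all queries on this SparkSession
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Batch ${event.progress.batchId}: ${event.progress.numInputRows} rows")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
})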

Types

import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, ContinuousStream}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Streaming implementations
class KafkaMicroBatchStream extends MicroBatchStream
class KafkaContinuousStream extends ContinuousStream

// Trigger types for different execution modes
object Trigger {
  def ProcessingTime(interval: String): Trigger      // Micro-batch with time interval
  def ProcessingTime(interval: Long): Trigger        // Micro-batch with duration
  def Once(): Trigger                                // Single micro-batch execution
  def Continuous(interval: String): Trigger          // Continuous processing
}

// Query monitoring types
trait StreamingQuery {
  def lastProgress: StreamingQueryProgress           // Latest progress information
  def recentProgress: Array[StreamingQueryProgress]  // Recent progress history
  def status: StreamingQueryStatus                   // Current status
  def isActive: Boolean                              // Whether query is running
}