Streaming source and sink functionality for real-time data processing with Apache Kafka, including advanced configuration options.
Configure streaming reads from Kafka with various performance and reliability options.
Control the rate of data consumption to prevent overwhelming downstream processing.
/**
* Maximum number of offsets to process per trigger
* Helps prevent memory issues and provides backpressure control
* @param limit - Number of offsets per trigger (default: unlimited)
*/
.option("maxOffsetsPerTrigger", "1000")Usage Example:
import org.apache.spark.sql.streaming.Trigger

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "high-volume-topic")
.option("maxOffsetsPerTrigger", "5000") // Process max 5000 messages per batch
.load()
val query = df
.selectExpr("CAST(value AS STRING) as message")
.writeStream
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode("append")
.format("console")
.start()

With maxOffsetsPerTrigger set to 5000 and a 10-second trigger, consumption is capped at roughly 500 records per second across all of the topic's partitions.

Configure minimum partitions for reading to improve parallelism.
/**
* Minimum number of partitions for the streaming DataFrame
 * Useful when the Kafka topic has fewer partitions than the desired Spark parallelism
* @param partitions - Minimum partition count (must be positive)
*/
.option("minPartitions", "8")Usage Example:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "single-partition-topic")
.option("minPartitions", "10") // Create 10 Spark partitions from 1 Kafka partition
.load()

Configure behavior when data is no longer available in Kafka.
/**
* Control behavior when data loss is detected
* @param policy - "true" (fail query) or "false" (continue with warning)
* Default: "true" (fail on data loss)
*/
.option("failOnDataLoss", "false")Usage Examples:
// Fail query when data is lost (default behavior)
val strictDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "critical-events")
.option("failOnDataLoss", "true") // Explicit - this is the default
.load()
// Continue processing despite data loss (with warnings)
val lenientDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "logs")
.option("failOnDataLoss", "false") // Continue on data loss
.load()

Advanced options for tuning Kafka consumer performance.
/**
* Consumer poll timeout in milliseconds
* How long to wait for data in each consumer.poll() call
* @param timeout - Timeout in milliseconds (default: 60000)
*/
.option("kafkaConsumer.pollTimeoutMs", "30000")
/**
* Number of retries for offset fetching operations
* @param retries - Number of retry attempts (default: 3)
*/
.option("fetchOffset.numRetries", "5")
/**
* Interval between offset fetch retry attempts
* @param interval - Retry interval in milliseconds (default: 1000)
*/
.option("fetchOffset.retryIntervalMs", "2000")Usage Example:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("kafkaConsumer.pollTimeoutMs", "5000") // 5 second poll timeout
.option("fetchOffset.numRetries", "10") // Retry offset fetch 10 times
.option("fetchOffset.retryIntervalMs", "500") // 500ms between retries
.load()

Write streaming data to Kafka topics with various configuration options.
/**
* Write streaming DataFrame to Kafka
* Requires 'value' column and optionally 'key', 'topic', 'partition', 'headers' columns
*/
dataFrame.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic") // Default topic (optional if topic column present)
.option("checkpointLocation", "/path/to/checkpoint") // Required when streaming to the Kafka sink
.start()

Usage Example:
import org.apache.spark.sql.functions.{col, from_json, struct, to_json}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{StringType, StructType}

// Illustrative schema for the incoming JSON payload; replace with your own fields
val inputSchema = new StructType()
  .add("id", StringType)
  .add("status", StringType)

val inputDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-events")
.load()
// Transform and write back to Kafka
val processedDF = inputDF
.selectExpr("CAST(value AS STRING) as json_str")
.select(from_json(col("json_str"), inputSchema).as("data"))
.select("data.*")
.filter("status = 'active'")
.select(to_json(struct("*")).cast("binary").as("value"))
val query = processedDF
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "processed-events")
.outputMode("append")
.trigger(Trigger.ProcessingTime("30 seconds"))
.option("checkpointLocation", "/tmp/processed-events-checkpoint") // Kafka sink requires a checkpoint location
.start()
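The sink can also key and route records per row using the optional 'key' and 'topic' columns described above. A minimal sketch building on the inputDF from the previous example; the routing rule, topic names, and checkpoint path are illustrative:

import org.apache.spark.sql.functions.{col, lit, when}

// Per-row routing: records containing "error" go to one topic, everything else to another
val routedDF = inputDF
  .selectExpr("CAST(key AS STRING) AS k", "CAST(value AS STRING) AS v")
  .select(
    col("k").as("key"),                                   // optional 'key' column
    when(col("v").contains("error"), lit("error-events")) // optional 'topic' column
      .otherwise(lit("normal-events")).as("topic"),       // overrides any "topic" option
    col("v").as("value"))

val routedQuery = routedDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/tmp/routed-events-checkpoint")
  .start() // no default "topic" option needed: every row carries its own topic column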

/**
 * Advanced write options for streaming
*/
.option("kafka.acks", "all") // Producer acknowledgment level
.option("kafka.retries", "5") // Producer retry count
.option("kafka.batch.size", "16384") // Batch size for producer
.option("kafka.linger.ms", "10") // Linger time for batching
.option("kafka.buffer.memory", "33554432") // Producer buffer memoryUsage Example:
val query = processedDF
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "high-throughput-topic")
.option("kafka.acks", "1") // Faster acknowledgment
.option("kafka.batch.size", "32768") // Larger batches
.option("kafka.linger.ms", "50") // Wait longer for batching
.option("kafka.compression.type", "snappy") // Enable compression
.outputMode("append")
.option("checkpointLocation", "/tmp/high-throughput-checkpoint") // Kafka sink requires a checkpoint location
.start()

Configuration specific to micro-batch streaming execution.
/**
* Micro-batch stream implementation
* Processes data in small batches with exactly-once semantics
*/
class KafkaMicroBatchStream {
/**
* Convert scan to micro-batch stream
* @param checkpointLocation - Location for checkpoint storage
* @return MicroBatchStream instance
*/
def toMicroBatchStream(checkpointLocation: String): MicroBatchStream
}

Usage Example:
import org.apache.spark.sql.streaming.Trigger
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.load()
// Micro-batch processing with explicit trigger
val query = df
.selectExpr("CAST(value AS STRING) as message")
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("5 minutes")) // Micro-batch every 5 minutes
.option("checkpointLocation", "/tmp/checkpoint") // Required for streaming
.start()
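A micro-batch query can also run as a one-shot job with Trigger.Once(), which processes whatever offsets are currently available as a single micro-batch and then stops; a minimal sketch with an illustrative topic name and checkpoint path:

import org.apache.spark.sql.streaming.Trigger

// Drain the currently available offsets in one micro-batch, then terminate
val onceQuery = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "backfill-topic")
  .load()
  .selectExpr("CAST(value AS STRING) as message")
  .writeStream
  .format("console")
  .trigger(Trigger.Once())
  .option("checkpointLocation", "/tmp/once-checkpoint") // lets a later run resume from committed offsets
  .start()

onceQuery.awaitTermination()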
Configuration for continuous streaming execution (experimental).

/**
 * Continuous stream implementation
 * Provides lower latency but with at-least-once semantics
 * Supports only map-like operations (e.g. select, filter); aggregations are not supported
 */
class KafkaContinuousStream {
/**
* Convert scan to continuous stream
* @param checkpointLocation - Location for checkpoint storage
* @return ContinuousStream instance
*/
def toContinuousStream(checkpointLocation: String): ContinuousStream
}

Usage Example:
import org.apache.spark.sql.streaming.Trigger
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "low-latency-events")
.load()
// Continuous processing for low latency
val query = df
.selectExpr("CAST(value AS STRING) as message")
.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second")) // Continuous with 1s checkpoints
.option("checkpointLocation", "/tmp/continuous") // Required
.start()

Monitor streaming query progress and performance.
/**
* Access streaming query metrics and progress
*/
val query = df.writeStream /* ... */ .start()
// Monitor progress
query.lastProgress // Latest batch progress
query.status // Current query status
query.recentProgress // Recent progress history
// Query management
query.stop() // Stop the query
query.awaitTermination() // Wait for completion
query.awaitTermination(timeoutMs) // Wait with timeout

Usage Example:
val query = df
.selectExpr("CAST(value AS STRING) as message")
.writeStream
.outputMode("append")
.format("console")
.start()
// Monitor in separate thread
new Thread(() => {
  while (query.isActive) {
    val progress = query.lastProgress
    if (progress != null) { // lastProgress is null until the first batch completes
      println(s"Batch ${progress.batchId}: ${progress.inputRowsPerSecond} rows/sec")
    }
    Thread.sleep(10000)
  }
}).start()
// Wait for termination
query.awaitTermination()

Key types and interfaces exposed by the Kafka streaming connector:

import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, ContinuousStream}
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryProgress, StreamingQueryStatus, Trigger}
// Streaming implementations
class KafkaMicroBatchStream extends MicroBatchStream
class KafkaContinuousStream extends ContinuousStream
// Trigger types for different execution modes
object Trigger {
def ProcessingTime(interval: String): Trigger // Micro-batch at a fixed interval, e.g. "10 seconds"
def ProcessingTime(intervalMs: Long): Trigger // Micro-batch with the interval in milliseconds
def Once(): Trigger // Single micro-batch execution
def Continuous(interval: String): Trigger // Continuous processing
}
// Query monitoring types
trait StreamingQuery {
def lastProgress: StreamingQueryProgress // Latest progress information
def recentProgress: Array[StreamingQueryProgress] // Recent progress history
def status: StreamingQueryStatus // Current status
def isActive: Boolean // Whether query is running
}
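A small sketch of inspecting status and recentProgress on a running query; it assumes query is an active StreamingQuery such as the one started in the monitoring example above:

// Current status: whether the trigger is active and whether data is available
val st = query.status
println(s"status=${st.message}, dataAvailable=${st.isDataAvailable}, triggerActive=${st.isTriggerActive}")

// Recent progress history: one entry per completed micro-batch
query.recentProgress.foreach { p =>
  println(s"batch=${p.batchId} inputRows=${p.numInputRows} processed/sec=${p.processedRowsPerSecond}")
}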