The Kafka connector supports both micro-batch and continuous processing modes, a variety of trigger types, and detailed source metrics.
V2 DataSource micro-batch streaming implementation for Kafka with comprehensive trigger support.
/**
* V2 DataSource micro-batch streaming implementation for Kafka
* Provides reliable streaming with exactly-once semantics
*/
class KafkaMicroBatchStream extends MicroBatchStream
with SupportsTriggerAvailableNow
with ReportsSourceMetrics {
/** Returns the initial offset for starting the stream */
def initialOffset(): Offset
/** Returns the latest available offset */
def latestOffset(): Offset
/** Returns latest offset with read limit applied */
def latestOffset(startOffset: Offset, readLimit: ReadLimit): Offset
/** Plans input partitions for the given offset range */
def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]
/** Creates reader factory for reading partitions */
def createReaderFactory(): PartitionReaderFactory
/** Deserializes offset from checkpoint */
def deserializeOffset(json: String): Offset
/** Commits processed offset */
def commit(end: Offset): Unit
/** Stops the stream and releases resources */
def stop(): Unit
/** Returns stream metrics */
def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String]
/** Prepares stream for Trigger.AvailableNow mode */
def prepareForTriggerAvailableNow(): Unit
}

Usage Examples:
// Basic micro-batch streaming
val microBatchStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.load()
val query = microBatchStream
.writeStream
.outputMode("append")
.format("console")
.trigger(Trigger.ProcessingTime("30 seconds")) // Micro-batch every 30 seconds
.start()
// Trigger.AvailableNow for processing all available data
val availableNowQuery = microBatchStream
.writeStream
.outputMode("append")
.format("parquet")
.option("path", "/output/path")
.trigger(Trigger.AvailableNow()) // Process all available data then stop
.start()
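
The Kafka source exposes key and value as binary columns, so a typical micro-batch pipeline casts and parses them before writing. A minimal sketch, assuming a hypothetical JSON payload schema and checkpoint path:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Hypothetical schema for the JSON documents carried in the Kafka value column
val eventSchema = new StructType()
.add("eventId", StringType)
.add("eventTime", TimestampType)
.add("payload", StringType)

val parsedEvents = microBatchStream
.select(from_json(col("value").cast("string"), eventSchema).as("event"))
.select("event.*")

val parsedQuery = parsedEvents
.writeStream
.outputMode("append")
.format("console")
.option("checkpointLocation", "/checkpoint/parsed-events") // hypothetical path
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()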

V2 DataSource continuous streaming implementation for low-latency processing.
/**
* V2 DataSource continuous streaming implementation
* Provides low-latency processing with at-least-once semantics
*/
class KafkaContinuousStream extends ContinuousStream {
/** Merges partition offsets into a single offset */
def mergeOffsets(offsets: Array[PartitionOffset]): Offset
/** Returns initial offset for continuous processing */
def initialOffset(): Offset
/** Deserializes offset from checkpoint */
def deserializeOffset(json: String): Offset
/** Plans continuous reader tasks */
def planInputPartitions(start: Offset): Array[InputPartition]
/** Creates continuous reader factory */
def createContinuousReaderFactory(): ContinuousPartitionReaderFactory
/** Commits processed offset */
def commit(end: Offset): Unit
/** Stops continuous processing */
def stop(): Unit
}

Usage Examples:
// Continuous streaming for low-latency processing
val continuousStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "real-time-events")
.load()
val continuousQuery = continuousStream
.writeStream
.outputMode("append")
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "processed-events")
.trigger(Trigger.Continuous("1 second")) // Continuous with 1-second checkpoint
.start()
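
Continuous mode runs only stateless, map-like operations (projections and filters); aggregations, joins, and other stateful operators are not supported. A minimal sketch of a supported transformation on the stream above (output topic and checkpoint path are illustrative):
import org.apache.spark.sql.functions._

// Stateless projection and filter: the kind of plan continuous mode supports
val filteredContinuousQuery = continuousStream
.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
.filter(col("value").isNotNull)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "filtered-events") // hypothetical output topic
.option("checkpointLocation", "/checkpoint/continuous") // hypothetical path
.trigger(Trigger.Continuous("1 second"))
.start()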

Legacy V1 DataSource streaming implementation, maintained for backward compatibility.
/**
* Legacy streaming source for reading from Kafka (DataSource V1)
* Maintained for backward compatibility
*/
class KafkaSource extends Source with SupportsTriggerAvailableNow {
/** Returns the schema of the source */
def schema: StructType
/** Gets the current offset */
def getOffset: Option[Offset]
/** Gets batch DataFrame for the given offset range */
def getBatch(start: Option[Offset], end: Offset): DataFrame
/** Stops the source */
def stop(): Unit
/** Returns default read limit */
def getDefaultReadLimit: ReadLimit
/** Gets latest offset with read limit */
def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset
/** Reports latest offset for monitoring */
def reportLatestOffset(): streaming.Offset
/** Prepares for Trigger.AvailableNow */
def prepareForTriggerAvailableNow(): Unit
}
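
The legacy source is not instantiated directly; Spark chooses between the V1 and V2 implementations internally. A sketch of forcing the V1 path, assuming your Spark version still exposes the spark.sql.streaming.disabledV2MicroBatchReaders setting:
// Disable the V2 micro-batch reader for the Kafka provider so the legacy KafkaSource is used
// (assumes spark.sql.streaming.disabledV2MicroBatchReaders exists in your Spark version)
spark.conf.set(
"spark.sql.streaming.disabledV2MicroBatchReaders",
"org.apache.spark.sql.kafka010.KafkaSourceProvider")

// The read API is unchanged; only the internal source implementation differs
val legacyStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.load()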

Regular micro-batch processing at fixed intervals:
// Process every 30 seconds
.trigger(Trigger.ProcessingTime("30 seconds"))
// Process every 5 minutes
.trigger(Trigger.ProcessingTime("5 minutes"))
// Process as fast as possible
.trigger(Trigger.ProcessingTime("0 seconds"))Process all currently available data then stop:
// Process all available data once
val query = kafkaStream
.writeStream
.outputMode("append")
.format("delta")
.option("path", "/delta/table/path")
.trigger(Trigger.AvailableNow())
.start()
// Wait for completion
query.awaitTermination()

Low-latency continuous processing:
// Continuous processing with 1-second checkpoints
.trigger(Trigger.Continuous("1 second"))
// Continuous processing with 10-second checkpoints
.trigger(Trigger.Continuous("10 seconds"))Process one micro-batch then stop:
// Process exactly one batch
.trigger(Trigger.Once())

Control the amount of data processed per trigger:
// Limit to 1000 offsets per trigger across all partitions
val rateLimitedStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "high-volume-topic")
.option("maxOffsetsPerTrigger", "1000")
.load()

// Ensure at least 100 offsets are processed per trigger
val minRateStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "low-volume-topic")
.option("minOffsetsPerTrigger", "100")
.load()

// Maximum delay between triggers when minOffsetsPerTrigger is not met
val delayedStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "variable-volume-topic")
.option("minOffsetsPerTrigger", "500")
.option("maxTriggerDelay", "60s") // Wait max 60 seconds
.load()
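
The maxOffsetsPerTrigger limit also composes with Trigger.AvailableNow: the backlog is drained in multiple bounded micro-batches before the query stops. A sketch of a bounded backfill run (topic, output path, and checkpoint path are illustrative):
val backfillQuery = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "high-volume-topic")
.option("maxOffsetsPerTrigger", "1000") // keeps each micro-batch bounded
.load()
.writeStream
.outputMode("append")
.format("parquet")
.option("path", "/output/backfill") // hypothetical path
.option("checkpointLocation", "/checkpoint/backfill") // hypothetical path
.trigger(Trigger.AvailableNow())
.start()

backfillQuery.awaitTermination()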

The streaming sources provide comprehensive metrics:
// Available metrics from ReportsSourceMetrics
def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {
// Returns metrics including:
// - offsetOutOfRange: Number of out-of-range offsets
// - dataLoss: Number of data loss events
// - numRecords: Number of records processed
// - estimatedOffsetsToProcess: Estimated remaining offsets
}

Usage Examples:
// Access metrics through streaming query
val query = kafkaStream
.writeStream
.outputMode("append")
.format("console")
.start()
// Get metrics
val progress = query.lastProgress
val inputMetrics = progress.sources(0) // First source metrics
println(s"Records processed: ${inputMetrics.numInputRows}")
println(s"Processing rate: ${inputMetrics.inputRowsPerSecond}")The connector exposes custom metrics for monitoring:
// OffsetOutOfRangeMetric
class OffsetOutOfRangeMetric extends CustomSumMetric {
def name(): String = "offsetOutOfRange"
def description(): String = "estimated number of fetched offsets out of range"
}
// DataLossMetric
class DataLossMetric extends CustomSumMetric {
def name(): String = "dataLoss"
def description(): String = "number of data loss error"
}

Accessing Custom Metrics:
// Custom metrics are included in source metrics
val customMetrics = query.lastProgress.sources(0).metrics
val offsetOutOfRange = customMetrics.getOrDefault("offsetOutOfRange", "0")
val dataLoss = customMetrics.getOrDefault("dataLoss", "0")
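
For continuous monitoring, the same metrics can be observed from a StreamingQueryListener on every progress update. A minimal sketch (the logging format is illustrative):
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val source = event.progress.sources(0) // first (Kafka) source
    val outOfRange = source.metrics.getOrDefault("offsetOutOfRange", "0")
    val dataLoss = source.metrics.getOrDefault("dataLoss", "0")
    println(s"batch=${event.progress.batchId} rows=${source.numInputRows} " +
      s"offsetOutOfRange=$outOfRange dataLoss=$dataLoss")
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})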

Streaming queries automatically maintain checkpoints for fault tolerance:
val faultTolerantQuery = kafkaStream
.writeStream
.outputMode("append")
.format("delta")
.option("path", "/output/path")
.option("checkpointLocation", "/checkpoint/path") // Essential for production
.start()
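
Note that options such as startingOffsets are consulted only when a query starts without a checkpoint; on restart, processing resumes from the checkpointed offsets regardless of that option:
val resumedStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest") // applies only to the first run; restarts resume from the checkpoint
.load()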

Configure how to handle data loss scenarios:
// Fail query on data loss (default, recommended for critical data)
val strictStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "critical-events")
.option("failOnDataLoss", "true") // Default
.load()
// Continue processing despite data loss (for non-critical data)
val lenientStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "logs")
.option("failOnDataLoss", "false") // Continue on data loss
.load()

Control the minimum number of Spark partitions; when minPartitions exceeds the number of Kafka topic partitions, the connector splits large Kafka partitions into smaller offset ranges to increase read parallelism:
// Ensure at least 10 Spark partitions for parallelism
val partitionedStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "large-topic")
.option("minPartitions", "10")
.load()

Configure consumer polling and offset-fetch retry behavior:
// Configure consumer cache settings
val configuredStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("kafkaConsumer.pollTimeoutMs", "5000") // Poll timeout
.option("fetchOffset.numRetries", "5") // Retry attempts
.option("fetchOffset.retryIntervalMs", "1000") // Retry interval
.load()

// Balance latency vs throughput
val optimizedStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("maxOffsetsPerTrigger", "50000") // Larger batches for higher throughput
.load()
.repartition(20) // Increase parallelism for processing

// Optimize Kafka consumer settings
val tunedStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("kafka.fetch.min.bytes", "1048576") // 1MB min fetch
.option("kafka.fetch.max.wait.ms", "500") // 500ms max wait
.option("kafka.max.poll.records", "10000") // More records per poll
.load()

// Automatic restart on failure
val resilientQuery = kafkaStream
.writeStream
.outputMode("append")
.format("console")
.option("checkpointLocation", "/checkpoint/path")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
// Monitor for exceptions
resilientQuery.exception.foreach { ex =>
println(s"Query failed with exception: ${ex.getMessage}")
// Implement custom restart logic
}
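
A common pattern is a supervised restart loop that rebuilds the query after a failure and relies on the checkpoint to resume from the last committed offsets. A sketch (the retry policy and helper function are illustrative, not part of the connector):
// Illustrative supervised restart loop: rebuild the query after failures,
// resuming from the checkpoint each time
def startQuery() = kafkaStream
.writeStream
.outputMode("append")
.format("console")
.option("checkpointLocation", "/checkpoint/path")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()

var keepRunning = true
while (keepRunning) {
  val runningQuery = startQuery()
  try {
    runningQuery.awaitTermination()
    keepRunning = false // query stopped cleanly
  } catch {
    case ex: Exception =>
      println(s"Query failed, restarting from checkpoint: ${ex.getMessage}")
      Thread.sleep(10000) // simple fixed backoff before restarting
  }
}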

// Graceful shutdown handling
sys.addShutdownHook {
println("Shutting down streaming query...")
resilientQuery.stop()
resilientQuery.awaitTermination(30000) // Wait up to 30 seconds
}