Advanced streaming readers supporting both micro-batch and continuous processing modes, with fault tolerance, exactly-once semantics in micro-batch mode, and efficient partition-based data consumption from Kafka.
Micro-batch reader for structured streaming that processes data in discrete batches with configurable trigger intervals.
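To make the batch lifecycle concrete, the sketch below shows one way the engine could drive a single micro-batch through the methods documented in the class listing that follows. The helper runOneMicroBatch is illustrative rather than Spark's actual MicroBatchExecution logic, and imports for the internal DataSource V2 Offset type are omitted because its package differs across Spark versions.

```scala
import scala.collection.JavaConverters._

// Illustrative driver-side sequence for one micro-batch; `start` is None only for
// the very first batch after (re)starting the query.
def runOneMicroBatch(reader: KafkaMicroBatchReader,
                     start: Option[Offset],
                     end: Offset): Unit = {
  // Bound the batch to a concrete offset range.
  reader.setOffsetRange(start, end)

  // One input partition per Kafka offset range; each is consumed in parallel on an
  // executor (see the partition reader classes below for the per-task read loop).
  val partitions = reader.planInputPartitions().asScala
  println(s"Planned ${partitions.size} partitions with schema ${reader.readSchema()}")

  // Once every task has finished, record the batch as processed so the next batch
  // resumes from `end`.
  reader.commit(end)
}
```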
/**
* Micro-batch reader for Kafka data in structured streaming
*/
class KafkaMicroBatchReader extends MicroBatchReader with Logging {
/**
* Sets offset range for the current micro-batch
* @param start Starting offset (None for initial batch)
* @param end Ending offset for this batch
*/
def setOffsetRange(start: Option[Offset], end: Offset): Unit
/**
* Plans input partitions for the current batch
* @return List of InputPartition objects for parallel processing
*/
def planInputPartitions(): ju.List[InputPartition[InternalRow]]
/**
* Gets the start offset for the current batch
* @return Starting offset or null if none
*/
def getStartOffset: Offset
/**
* Gets the end offset for the current batch
* @return Ending offset for this batch
*/
def getEndOffset: Offset
/**
* Deserializes offset from JSON string
* @param json JSON representation of offset
* @return Deserialized Offset object
*/
def deserializeOffset(json: String): Offset
/**
* Returns schema for Kafka records
* @return StructType defining record schema
*/
def readSchema(): StructType
/**
* Commits the processed offset
* @param end Offset to commit
*/
def commit(end: Offset): Unit
/**
* Stops the reader and releases resources
*/
def stop(): Unit
}

Input partition for micro-batch processing that handles a specific offset range.
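Preferred locations matter because executor-side Kafka consumers are cached and reused (see the reuseKafkaConsumer flag below); keeping a topic partition on the same host avoids recreating consumers. The helper below is a hedged sketch of inspecting the locality of a planned batch, not Spark's scheduler:

```scala
import scala.collection.JavaConverters._

// Counts how many planned partitions prefer each executor host. Purely
// illustrative: the real scheduler uses preferredLocations() for task placement.
def locateBatch(reader: KafkaMicroBatchReader): Map[String, Int] =
  reader.planInputPartitions().asScala
    .collect { case p: KafkaMicroBatchInputPartition => p }
    .flatMap(_.preferredLocations().headOption)
    .groupBy(identity)
    .map { case (host, hits) => host -> hits.size }
```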
/**
* Input partition for micro-batch processing
* @param offsetRange Range of offsets to process
* @param executorKafkaParams Kafka parameters for executors
* @param pollTimeoutMs Timeout for Kafka consumer polls
* @param failOnDataLoss Whether to fail on data loss
* @param reuseKafkaConsumer Whether to reuse consumer instances
*/
case class KafkaMicroBatchInputPartition(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean
) extends InputPartition[InternalRow] {
/**
* Gets preferred executor locations for data locality
* @return Array of preferred executor hostnames
*/
def preferredLocations(): Array[String]
/**
* Creates partition reader for this input partition
* @return InputPartitionReader for processing records
*/
def createPartitionReader(): InputPartitionReader[InternalRow]
}

Partition reader for micro-batch processing that reads records from a specific offset range.
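Given one planned partition, a task simply drains its reader. The sketch below uses only the next()/get()/close() contract documented in the class that follows; the helper drainPartition is illustrative:

```scala
// Illustrative per-task read loop over one planned partition.
def drainPartition(partition: KafkaMicroBatchInputPartition): Long = {
  val reader = partition.createPartitionReader()
  var count = 0L
  try {
    while (reader.next()) {   // false once the offset range is exhausted
      val row = reader.get()  // UnsafeRow with the Kafka record schema; a real task
      count += 1              // would forward it to the downstream operators
    }
  } finally {
    reader.close()            // always release the underlying consumer
  }
  count
}
```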
/**
* Partition reader for micro-batch processing
* @param offsetRange Range of offsets to read
* @param executorKafkaParams Kafka parameters for this executor
* @param pollTimeoutMs Consumer poll timeout
* @param failOnDataLoss Whether to fail on data loss
* @param reuseKafkaConsumer Whether to reuse consumer
*/
case class KafkaMicroBatchInputPartitionReader(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean
) extends InputPartitionReader[InternalRow] with Logging {
/**
* Advances to the next record
* @return true if next record is available
*/
def next(): Boolean
/**
* Gets the current record as UnsafeRow
* @return Current record as UnsafeRow
*/
def get(): UnsafeRow
/**
* Closes the reader and releases resources
*/
def close(): Unit
}

Continuous reader for low-latency streaming with sub-second processing capabilities.
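Continuous processing inverts the batch model: tasks run indefinitely, and the driver only plans partitions, watches for topology changes, and commits epochs. The helpers below are a hedged sketch of that sequence using the methods documented in the class listing that follows; imports for the internal Offset and PartitionOffset types are omitted because their packages vary by Spark version.

```scala
import scala.collection.JavaConverters._

// Illustrative driver-side start-up: resolve the starting position and plan one
// long-running task per Kafka partition.
def startContinuous(reader: KafkaContinuousReader, start: Option[Offset]): Int = {
  reader.setStartOffset(start)            // None resumes from the configured offsets
  reader.planInputPartitions().asScala.size
}

// The driver periodically checks whether Kafka partitions were added or removed;
// if so, the running tasks are torn down and partitions are re-planned.
def maybeReplan(reader: KafkaContinuousReader): Boolean =
  reader.needsReconfiguration()

// At each epoch boundary, the per-partition offsets reported by the tasks are
// merged into a single offset and committed.
def commitEpoch(reader: KafkaContinuousReader,
                partitionOffsets: Array[PartitionOffset]): Unit =
  reader.commit(reader.mergeOffsets(partitionOffsets))
```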
/**
* Continuous reader for Kafka data in structured streaming
*/
class KafkaContinuousReader extends ContinuousReader with Logging {
/**
* Returns schema for Kafka records
* @return StructType defining record schema
*/
def readSchema: StructType
/**
* Sets starting offset for continuous processing
* @param start Starting offset (None to start from beginning)
*/
def setStartOffset(start: Option[Offset]): Unit
/**
* Gets the starting offset
* @return Starting offset or null if none
*/
def getStartOffset(): Offset
/**
* Deserializes offset from JSON string
* @param json JSON representation of offset
* @return Deserialized Offset object
*/
def deserializeOffset(json: String): Offset
/**
* Plans input partitions for continuous processing
* @return List of InputPartition objects
*/
def planInputPartitions(): ju.List[InputPartition[InternalRow]]
/**
* Stops the reader and releases resources
*/
def stop(): Unit
/**
* Commits processed offsets
* @param end Offset to commit
*/
def commit(end: Offset): Unit
/**
* Merges partition offsets into a single offset
* @param offsets Array of partition offsets
* @return Merged offset
*/
def mergeOffsets(offsets: Array[PartitionOffset]): Offset
/**
* Checks if reader reconfiguration is needed
* @return true if reconfiguration is required
*/
def needsReconfiguration(): Boolean
}

Input partition for continuous processing that handles a specific topic partition.
/**
* Input partition for continuous processing
* @param topicPartition Kafka topic partition
* @param startOffset Starting offset for processing
* @param kafkaParams Kafka consumer parameters
* @param pollTimeoutMs Consumer poll timeout
* @param failOnDataLoss Whether to fail on data loss
*/
case class KafkaContinuousInputPartition(
topicPartition: TopicPartition,
startOffset: Long,
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean
) extends ContinuousInputPartition[InternalRow] {
/**
* Creates continuous reader for this partition
* @param offset Starting partition offset
* @return InputPartitionReader for continuous processing
*/
def createContinuousReader(offset: PartitionOffset): InputPartitionReader[InternalRow]
/**
* Creates partition reader for this partition
* @return KafkaContinuousInputPartitionReader instance
*/
def createPartitionReader(): KafkaContinuousInputPartitionReader
}

Partition reader for continuous processing that provides low-latency record consumption.
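A continuous task keeps polling its partition reader instead of stopping at an end offset, and reports its current offset back to the driver at epoch boundaries. The sketch below wires the case class documented above to the reader documented below; the topic name, offsets, and timeout values are placeholders.

```scala
import java.{util => ju}
import org.apache.kafka.common.TopicPartition

// Illustrative long-running task loop: unlike the micro-batch reader, next() waits
// (bounded by pollTimeoutMs) for new records rather than stopping at a fixed end
// offset, and getOffset() is what the driver collects for epoch commits.
def runContinuousTask(kafkaParams: ju.Map[String, Object]): Unit = {
  val partition = KafkaContinuousInputPartition(
    topicPartition = new TopicPartition("low-latency-events", 0),
    startOffset    = 0L,
    kafkaParams    = kafkaParams,
    pollTimeoutMs  = 512L,
    failOnDataLoss = true)

  val reader = partition.createPartitionReader()
  try {
    while (reader.next()) {            // effectively an infinite loop
      val row = reader.get()           // UnsafeRow with the Kafka record schema
      val offset = reader.getOffset()  // current position, reported to the driver
    }
  } finally {
    reader.close()
  }
}
```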
/**
* Partition reader for continuous processing
* @param topicPartition Kafka topic partition to read from
* @param startOffset Starting offset for reading
* @param kafkaParams Kafka consumer parameters
* @param pollTimeoutMs Consumer poll timeout
* @param failOnDataLoss Whether to fail on data loss
*/
class KafkaContinuousInputPartitionReader(
topicPartition: TopicPartition,
startOffset: Long,
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean
) extends ContinuousInputPartitionReader[InternalRow] {
/**
* Advances to the next record
* @return true if next record is available
*/
def next(): Boolean
/**
* Gets the current record as UnsafeRow
* @return Current record as UnsafeRow
*/
def get(): UnsafeRow
/**
* Gets the current partition offset
* @return Current offset for this partition
*/
def getOffset(): KafkaSourcePartitionOffset
/**
* Closes the reader and releases resources
*/
def close(): Unit
}

Legacy streaming source for backward compatibility with the DataSource V1 API.
/**
* Legacy streaming source for reading from Kafka (DataSource V1)
*/
class KafkaSource extends Source with Logging {
/**
* Returns the schema of Kafka records
* @return StructType defining record schema
*/
def schema: StructType
/**
* Gets the maximum available offset
* @return Optional offset representing latest available data
*/
def getOffset: Option[Offset]
/**
* Gets batch data between specified offsets
* @param start Starting offset (None for initial batch)
* @param end Ending offset for this batch
* @return DataFrame containing batch data
*/
def getBatch(start: Option[Offset], end: Offset): DataFrame
/**
* Stops the source and releases resources
*/
def stop(): Unit
}
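For reference, the V1 engine drives this source with a simple poll cycle: ask for the latest available offset, then request the data between the previously processed offset and the new one. The helper below is an illustrative sketch, not Spark's actual StreamExecution loop, and the V1 Offset import is omitted.

```scala
// Illustrative V1 polling cycle. `lastEnd` is the offset processed so far
// (None only before the first batch).
def pollOnce(source: KafkaSource, lastEnd: Option[Offset]): Option[Offset] =
  source.getOffset match {
    case Some(newEnd) if !lastEnd.contains(newEnd) =>
      // DataFrame covering the range (lastEnd, newEnd]
      val batch = source.getBatch(lastEnd, newEnd)
      println(s"Batch schema: ${batch.schema.simpleString}")
      Some(newEnd)
    case _ =>
      lastEnd   // no new data available yet
  }
```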
/**
* Companion object for KafkaSource
*/
object KafkaSource {
/**
* Gets sorted list of executor IDs for locality optimization
* @param sc SparkContext for accessing executor information
* @return Array of executor IDs sorted for consistent assignment
*/
def getSortedExecutorList(sc: SparkContext): Array[String]
}

The micro-batch example below reads from the events topic with a 30-second processing-time trigger and per-trigger rate limiting:

import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit
val microBatchDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "latest")
.option("maxOffsetsPerTrigger", "10000")
.load()
val query = microBatchDF
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.outputMode("append")
.format("console")
.trigger(Trigger.ProcessingTime(30, TimeUnit.SECONDS))
.option("checkpointLocation", "/tmp/checkpoint")
.start()
query.awaitTermination()

For lower latency, the same pattern runs in continuous mode; the trigger argument is the asynchronous checkpoint interval rather than a batch interval:

val continuousDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "low-latency-events")
.option("startingOffsets", "latest")
.load()
val continuousQuery = continuousDF
.selectExpr("CAST(value AS STRING) as json")
.writeStream
.outputMode("append")
.format("console")
.trigger(Trigger.Continuous("1 second"))
.option("checkpointLocation", "/tmp/continuous-checkpoint")
.start()
continuousQuery.awaitTermination()

The legacy DataSource V1 path (KafkaSource) accepts the same options, here supplied partly through an options map:

val legacyDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.options(Map(
"subscribe" -> "legacy-topic",
"startingOffsets" -> "earliest"
))
.load()

Consumer fetch and session behavior is tuned by passing Kafka client properties with the kafka. prefix:

.option("kafka.fetch.min.bytes", "1024")        // Minimum fetch size
.option("kafka.fetch.max.wait.ms", "500")       // Max wait for min bytes
.option("kafka.max.poll.records", "500")        // Records per poll
.option("kafka.session.timeout.ms", "30000")    // Session timeout
.option("kafka.heartbeat.interval.ms", "3000")  // Heartbeat interval

Spark-side parallelism and throughput are controlled with source options:

.option("minPartitions", "10")                  // Minimum Spark partitions
.option("maxOffsetsPerTrigger", "1000000")      // Rate limiting (micro-batch only)

Network buffers and the maximum fetch size are also configurable:

.option("kafka.receive.buffer.bytes", "65536")  // Receive buffer
.option("kafka.send.buffer.bytes", "131072")    // Send buffer
.option("kafka.fetch.max.bytes", "52428800")    // Max fetch size

The streaming sources handle various data loss scenarios (such as deleted topics or aged-out offsets) through the failOnDataLoss option:
// Strict error handling
.option("failOnDataLoss", "true")
// Lenient error handling with warnings
.option("failOnDataLoss", "false")// Consumer poll timeout
.option("kafkaConsumer.pollTimeoutMs", "10000")
// Connection timeout
.option("kafka.request.timeout.ms", "30000")// Checkpoint location for exactly-once processing
.option("checkpointLocation", "/path/to/checkpoint")
// Automatic retry configuration
.option("kafka.retry.backoff.ms", "100")
.option("kafka.reconnect.backoff.ms", "50")val query = df.writeStream
.format("console")
.start()
// Monitor progress
val progress = query.lastProgress
println(s"Input rows per second: ${progress.inputRowsPerSecond}")
println(s"Processing time: ${progress.durationMs}")// Access current offsets
val currentOffsets = query.lastProgress.sources(0).endOffset
println(s"Current offsets: $currentOffsets")| Feature | Micro-batch | Continuous | Legacy |
|---|---|---|---|
| Latency | ~100ms+ | ~1ms | ~100ms+ |
| Throughput | High | Medium | High |
| Fault Tolerance | Full | Full | Full |
| Exactly-once | Yes | At-least-once | Yes |
| State Management | Full | Not supported | Full |
| Aggregations | All | None (map-like queries only) | All |
| API Version | DataSource V2 | DataSource V2 | DataSource V1 |
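Beyond polling lastProgress as shown above, progress (including per-source Kafka offsets) can be observed with Spark's StreamingQueryListener. The sketch below registers a minimal listener on the spark session used in the earlier examples:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Event-driven alternative to polling query.lastProgress: the listener receives a
// progress update per trigger, for micro-batch and continuous queries alike.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"rows/s=${p.inputRowsPerSecond}, " +
      s"endOffsets=${p.sources.map(_.endOffset).mkString(",")}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
})
```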