Apache Spark Structured Streaming integration with Apache Kafka providing comprehensive data source and sink capabilities for both batch and streaming workloads.
Comprehensive offset tracking and management system for precise control over Kafka data consumption boundaries, supporting both streaming and batch processing with fault tolerance and exactly-once semantics.
Defines desired offset range limits for consuming data from Kafka partitions.
/**
* Base trait for offset range limits
*/
sealed trait KafkaOffsetRangeLimit
/**
* Binds to earliest available offsets in partitions
*/
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
/**
* Binds to latest available offsets in partitions
*/
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
/**
* Binds to specific offset positions per partition
* @param partitionOffsets Map of TopicPartition to specific offset
*/
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

object KafkaOffsetRangeLimit {
/** Indicates resolution to latest offset */
val LATEST: Long = -1L
/** Indicates resolution to earliest offset */
val EARLIEST: Long = -2L
}

Custom offset implementation for Kafka sources containing partition-to-offset mappings.
/**
* Custom offset for Kafka source containing partition-to-offset mappings
* @param partitionToOffsets Map of TopicPartition to offset
*/
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
/** JSON representation of partition offsets */
def json: String
}
/**
* Companion object for KafkaSourceOffset
*/
object KafkaSourceOffset {
/**
* Extracts partition offsets from generic offset
* @param offset Generic offset to extract from
* @return Map of TopicPartition to offset
*/
def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long]
/**
* Creates offset from topic-partition-offset tuples
* @param offsetTuples Variable arguments of (topic, partition, offset)
* @return KafkaSourceOffset instance
*/
def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset
/**
* Creates offset from JSON representation
* @param offset Serialized offset in JSON format
* @return KafkaSourceOffset instance
*/
def apply(offset: SerializedOffset): KafkaSourceOffset
}

/**
* Represents offset for a specific partition in continuous streaming
* @param topicPartition The topic partition
* @param partitionOffset The offset within the partition
*/
case class KafkaSourcePartitionOffset(
topicPartition: TopicPartition,
partitionOffset: Long
) extends PartitionOffset

Component for reading offset information from Kafka using the KafkaConsumer API.
/**
* Component for reading offset information from Kafka
*/
class KafkaOffsetReader extends Logging {
/**
* Closes connection to Kafka brokers
*/
def close(): Unit
/**
* Fetches topic partitions based on consumer strategy
* @return Set of TopicPartition objects
*/
def fetchTopicPartitions(): Set[TopicPartition]
/**
* Resolves specific partition offsets
* @param partitionOffsets Map of partitions to offsets (may contain special values)
* @param reportDataLoss Function to report data loss
* @return KafkaSourceOffset with resolved offsets
*/
def fetchSpecificOffsets(
partitionOffsets: Map[TopicPartition, Long],
reportDataLoss: String => Unit
): KafkaSourceOffset
/**
* Fetches earliest available offsets for all partitions
* @return Map of TopicPartition to earliest offset
*/
def fetchEarliestOffsets(): Map[TopicPartition, Long]
/**
* Fetches latest available offsets
* @param knownOffsets Previously known offsets for comparison
* @return Map of TopicPartition to latest offset
*/
def fetchLatestOffsets(knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap
/**
* Fetches earliest offsets for specific partitions
* @param newPartitions Set of partitions to fetch offsets for
* @return Map of TopicPartition to earliest offset
*/
def fetchEarliestOffsets(newPartitions: Set[TopicPartition]): Map[TopicPartition, Long]
}
/**
* Companion object for KafkaOffsetReader
*/
object KafkaOffsetReader {
/**
* Returns the fixed schema for Kafka records
* @return StructType defining Kafka record schema
*/
def kafkaSchema: StructType
}

Calculates offset ranges based on minPartitions configuration for parallelism optimization.
/**
* Calculates offset ranges based on minPartitions configuration
* @param minPartitions Minimum number of partitions to create
*/
class KafkaOffsetRangeCalculator(minPartitions: Option[Int]) {
/**
* Calculates offset ranges with preferred executor locations
* @param fromOffsets Starting offsets per partition
* @param untilOffsets Ending offsets per partition
* @param executorLocations Available executor locations
* @return Sequence of KafkaOffsetRange objects
*/
def getRanges(
fromOffsets: Map[TopicPartition, Long],
untilOffsets: Map[TopicPartition, Long],
executorLocations: Seq[String]
): Seq[KafkaOffsetRange]
}
/**
* Companion object for KafkaOffsetRangeCalculator
*/
object KafkaOffsetRangeCalculator {
/**
* Creates calculator from DataSource options
* @param options DataSource options containing minPartitions
* @return KafkaOffsetRangeCalculator instance
*/
def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator
}
/**
* Represents an offset range for a topic partition with preferred executor location
* @param topicPartition The topic partition
* @param fromOffset Starting offset (inclusive)
* @param untilOffset Ending offset (exclusive)
* @param preferredLoc Preferred executor location for locality
*/
case class KafkaOffsetRange(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String]
) {
/** Lazy-computed size of the offset range */
lazy val size: Long = untilOffset - fromOffset
}

// Start from earliest available offsets
.option("startingOffsets", "earliest")
// Start from latest available offsets
.option("startingOffsets", "latest")
// Start from specific offsets (JSON format)
.option("startingOffsets", """{"topic1":{"0":123,"1":456},"topic2":{"0":789}}""")

// Read until latest available offsets
.option("endingOffsets", "latest")
// Read until specific offsets (JSON format)
.option("endingOffsets", """{"topic1":{"0":500,"1":600},"topic2":{"0":800}}""")

// Fail query on data loss (default)
.option("failOnDataLoss", "true")
// Continue processing on data loss
.option("failOnDataLoss", "false")

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", """{"events":{"0":1000,"1":2000}}""")
.option("failOnDataLoss", "false")
.load()

val batchDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "transactions")
.option("startingOffsets", "earliest")
.option("endingOffsets", """{"transactions":{"0":5000,"1":6000}}""")
.load()

val query = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "clickstream")
.option("startingOffsets", "latest")
.option("maxOffsetsPerTrigger", "10000")
.load()
.writeStream
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()

Spark automatically checkpoints processed offsets for exactly-once processing:
val query = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.load()
.writeStream
.option("checkpointLocation", "/path/to/checkpoint")
.start()

The system detects and handles various data loss scenarios:
// Strict mode - fail on any data loss
.option("failOnDataLoss", "true")
// Lenient mode - log warnings and continue
.option("failOnDataLoss", "false")

// Minimum partitions for parallel processing
.option("minPartitions", "20")
// Rate limiting for streaming
.option("maxOffsetsPerTrigger", "1000000")

// Kafka consumer buffer sizes
.option("kafka.receive.buffer.bytes", "65536")
.option("kafka.fetch.max.bytes", "52428800")
.option("kafka.max.poll.records", "500")

{
"topic1": [0, 1, 2],
"topic2": [0, 1]
}

{
"topic1": {
"0": 1000,
"1": 2000,
"2": 3000
},
"topic2": {
"0": 500,
"1": 600
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-sql-kafka-0-10-2-11