Apache Spark Streaming integration with Kafka 0.10+ providing exactly-once semantics and high-performance data consumption
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kafka-0-10_2-11@2.4.0

Apache Spark Streaming integration with Kafka 0.10+ that provides exactly-once semantics and high-performance real-time data processing. This library offers a direct approach to Kafka integration with better performance and reliability compared to the earlier receiver-based approach.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.8</version>
</dependency>

import org.apache.spark.streaming.kafka010._

For specific components:
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies

Java imports:
import org.apache.spark.streaming.kafka010.*;

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark-streaming-group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Process RDD
rdd.foreach(record => println(s"${record.key}: ${record.value}"))
// Commit offsets after processing
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

The Spark Streaming Kafka integration is built around several key components:
Core functionality for creating Kafka DStreams with exactly-once semantics and configurable consumer strategies.
object KafkaUtils {
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V],
perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]]
}

Batch-oriented interface for consuming specific offset ranges from Kafka with full control over exactly-once semantics.
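For illustration, a minimal batch-read sketch (assuming a SparkContext named sc, the kafkaParams map from the quick start above, and placeholder topic/offset values):

import scala.collection.JavaConverters._
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

// Read offsets [0, 100) from partitions 0 and 1 of "topic1" as a bounded RDD.
val ranges = Array(
  OffsetRange("topic1", 0, 0L, 100L),
  OffsetRange("topic1", 1, 0L, 100L)
)

// createRDD expects a java.util.Map, so convert the Scala kafkaParams map.
val batchRDD = KafkaUtils.createRDD[String, String](
  sc,
  kafkaParams.asJava,
  ranges,
  LocationStrategies.PreferConsistent
)

batchRDD.map(record => (record.key, record.value)).take(10).foreach(println)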
object KafkaUtils {
def createRDD[K, V](
sc: SparkContext,
kafkaParams: java.util.Map[String, Object],
offsetRanges: Array[OffsetRange],
locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]]
}

Control how Kafka consumers are scheduled across Spark executors for optimal performance and data locality.
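A brief sketch of the three strategies (the executor hostnames are placeholders, not part of the library):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// Distribute partitions evenly across available executors (the usual choice).
val consistent = LocationStrategies.PreferConsistent

// Only useful when executors run on the same hosts as the Kafka brokers.
val brokers = LocationStrategies.PreferBrokers

// Pin specific partitions to specific executor hosts.
val fixed = LocationStrategies.PreferFixed(Map(
  new TopicPartition("topic1", 0) -> "executor-host-1",
  new TopicPartition("topic1", 1) -> "executor-host-2"
))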
object LocationStrategies {
def PreferBrokers: LocationStrategy
def PreferConsistent: LocationStrategy
def PreferFixed(hostMap: Map[TopicPartition, String]): LocationStrategy
}

Flexible consumer configuration supporting topic subscription, pattern-based subscription, and partition assignment.
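A sketch of the three strategies, reusing the kafkaParams map from the quick start (topic names and the pattern are illustrative):

import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Subscribe to a fixed list of topics.
val byTopic = ConsumerStrategies.Subscribe[String, String](Array("topic1", "topic2"), kafkaParams)

// Subscribe to every topic whose name matches a regular expression.
val byPattern = ConsumerStrategies.SubscribePattern[String, String](
  Pattern.compile("events\\..*"), kafkaParams)

// Consume an explicit set of topic partitions, bypassing consumer-group assignment.
val byAssignment = ConsumerStrategies.Assign[String, String](
  List(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)), kafkaParams)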
object ConsumerStrategies {
def Subscribe[K, V](
topics: Iterable[String],
kafkaParams: Map[String, Object]
): ConsumerStrategy[K, V]
def SubscribePattern[K, V](
pattern: java.util.regex.Pattern,
kafkaParams: Map[String, Object]
): ConsumerStrategy[K, V]
def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: Map[String, Object]
): ConsumerStrategy[K, V]
}

Precise offset tracking and management for exactly-once processing guarantees and reliable stream processing.
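For example, the offset range behind each micro-batch can be inspected per partition, as long as no shuffle has broken the one-to-one mapping to Kafka partitions (a sketch reusing the stream from the quick start):

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // Offset ranges are only available on the original, unshuffled Kafka RDD.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // iter holds this partition's ConsumerRecords; RDD partition i maps to offsetRanges(i).
    val range: OffsetRange = offsetRanges(TaskContext.get.partitionId())
    println(s"${range.topic}-${range.partition}: offsets ${range.fromOffset} until ${range.untilOffset} (${range.count()} records)")
  }
}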
final class OffsetRange(
val topic: String,
val partition: Int,
val fromOffset: Long,
val untilOffset: Long
) {
def topicPartition(): TopicPartition
def count(): Long
}
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}
trait CanCommitOffsets {
def commitAsync(offsetRanges: Array[OffsetRange]): Unit
def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}

// Kafka consumer record type (from kafka-clients)
import org.apache.kafka.clients.consumer.ConsumerRecord
// Kafka TopicPartition (from kafka-clients)
import org.apache.kafka.common.TopicPartition
// Spark streaming context
import org.apache.spark.streaming.StreamingContext
// Spark context for batch operations
import org.apache.spark.SparkContext

abstract class PerPartitionConfig extends Serializable {
def maxRatePerPartition(topicPartition: TopicPartition): Long
def minRatePerPartition(topicPartition: TopicPartition): Long = 1L
}
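As a sketch, a custom implementation could throttle one known high-volume partition while leaving the others unlimited (topic name and rates are hypothetical); the resulting instance is passed to the createDirectStream overload that accepts a PerPartitionConfig:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.PerPartitionConfig

class ThrottledPartitionConfig extends PerPartitionConfig {
  // 0 means "no limit", matching the spark.streaming.kafka.maxRatePerPartition default below.
  override def maxRatePerPartition(tp: TopicPartition): Long =
    if (tp.topic == "topic1" && tp.partition == 0) 500L else 0L
}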
Key Spark configuration parameters that affect Kafka integration behavior:

// Rate limiting per partition (default: 0 = unlimited)
"spark.streaming.kafka.maxRatePerPartition" -> "1000"
// Minimum rate per partition (default: 1)
"spark.streaming.kafka.minRatePerPartition" -> "1"
// Consumer poll timeout in milliseconds (default: 120000)
"spark.streaming.kafka.consumer.poll.ms" -> "120000"
// Allow non-consecutive offsets (default: false)
"spark.streaming.kafka.allowNonConsecutiveOffsets" -> "false"
// Consumer cache configuration
"spark.streaming.kafka.consumer.cache.enabled" -> "true"
"spark.streaming.kafka.consumer.cache.capacity" -> "64"
"spark.streaming.kafka.consumer.cache.timeout" -> "300s"The Kafka integration automatically modifies certain consumer parameters on executors for reliability:
The Kafka integration automatically modifies certain consumer parameters on executors for reliability:

- enable.auto.commit is always set to false on executors
- auto.offset.reset is set to none on executors
- group.id is prefixed with spark-executor- on executors
- receive.buffer.bytes is set to a minimum of 65536 (KAFKA-3135 workaround)

These modifications ensure proper exactly-once semantics and prevent consumer conflicts between the driver and executors.
The integration can surface exceptions from both the Kafka client and Spark at runtime. Always handle these exceptions appropriately and ensure proper Kafka configuration, especially bootstrap.servers and the consumer group settings.
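A minimal sketch of surfacing failures at the driver, assuming the streamingContext from the configuration example above; the concrete exception types seen in practice depend on broker availability, security settings, and offset state:

import org.apache.kafka.common.KafkaException

try {
  streamingContext.start()
  streamingContext.awaitTermination()
} catch {
  case e: KafkaException =>
    // Client-level Kafka errors, typically caused by invalid consumer configuration.
    System.err.println(s"Kafka error: ${e.getMessage}")
    throw e
  case e: Exception =>
    // Failures that stop the streaming context (including job failures) are rethrown by awaitTermination.
    System.err.println(s"Streaming job failed: ${e.getMessage}")
    throw e
}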