Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing
Install with:

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly@4.0.0

Apache Spark's integration with Apache Kafka 0.10 for reliable distributed streaming data processing. This assembly JAR packages the core Kafka 0.10 streaming library and all of its dependencies into a single deployable JAR file, enabling consumption of data from Kafka topics as Spark DStreams and RDDs with exactly-once processing semantics.
Or add the Maven dependency directly:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10-assembly_2.13</artifactId>
  <version>4.0.0</version>
</dependency>

Import the package with:

import org.apache.spark.streaming.kafka010._

For specific functionality:
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, ConsumerStrategies}
import org.apache.spark.streaming.kafka010.{OffsetRange, HasOffsetRanges, CanCommitOffsets}

A complete example that consumes Kafka topics as a DStream:

import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",
  "auto.offset.reset" -> "latest",
  // Disable auto commit so offsets can be committed manually after each batch
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// Process the stream
stream.foreachRDD { rdd =>
  // Capture offset ranges from the source RDD before applying any transformations
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach { record =>
    println(s"${record.key}: ${record.value}")
  }
  // Commit offsets once the batch's output has been produced
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
ssc.start()
ssc.awaitTermination()

Creating an RDD for a fixed range of offsets:

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._
val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("topic1", 0, 0, 1000),
  OffsetRange("topic1", 1, 0, 1000)
)
val rdd = KafkaUtils.createRDD[String, String](
  spark.sparkContext,
  kafkaParams.asJava, // createRDD expects a java.util.Map
  offsetRanges,
  LocationStrategies.PreferConsistent
)
// Process the RDD
rdd.foreach { record =>
  println(s"${record.key}: ${record.value}")
}

The Kafka 0.10 integration is built around several key components:
KafkaUtils provides the core functionality for creating Kafka-backed Spark streams and RDDs with configurable location and consumer strategies.
object KafkaUtils {
  // Scala Stream Creation
  def createDirectStream[K, V](
      ssc: StreamingContext,
      locationStrategy: LocationStrategy,
      consumerStrategy: ConsumerStrategy[K, V]
  ): InputDStream[ConsumerRecord[K, V]]

  def createDirectStream[K, V](
      ssc: StreamingContext,
      locationStrategy: LocationStrategy,
      consumerStrategy: ConsumerStrategy[K, V],
      perPartitionConfig: PerPartitionConfig
  ): InputDStream[ConsumerRecord[K, V]]

  // Scala RDD Creation
  def createRDD[K, V](
      sc: SparkContext,
      kafkaParams: java.util.Map[String, Object],
      offsetRanges: Array[OffsetRange],
      locationStrategy: LocationStrategy
  ): RDD[ConsumerRecord[K, V]]
}

LocationStrategies determine how Kafka consumers are scheduled on executors to optimize performance and network locality.
object LocationStrategies {
  def PreferBrokers: LocationStrategy
  def PreferConsistent: LocationStrategy
  def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
  def PreferFixed(hostMap: java.util.Map[TopicPartition, String]): LocationStrategy
}
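Most applications use PreferConsistent, which spreads partitions evenly across executors; PreferBrokers only helps when executors run on the same hosts as the Kafka brokers. Below is a minimal sketch of PreferFixed, assuming hypothetical executor host names (any partition missing from the map falls back to a consistent placement):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// Hypothetical executor hosts for specific partitions
val hostMap = Map(
  new TopicPartition("topic1", 0) -> "executor-host-1.example.com",
  new TopicPartition("topic1", 1) -> "executor-host-2.example.com"
)

val fixedLocations = LocationStrategies.PreferFixed(hostMap)

// Used in place of PreferConsistent when creating the stream:
// KafkaUtils.createDirectStream[String, String](ssc, fixedLocations, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))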
ConsumerStrategies provide configuration strategies for creating and managing Kafka consumers with different subscription patterns.

object ConsumerStrategies {
  // Subscribe to specific topics
  def Subscribe[K, V](
      topics: Iterable[String],
      kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]

  // Subscribe to topics matching a pattern
  def SubscribePattern[K, V](
      pattern: java.util.regex.Pattern,
      kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]

  // Assign specific topic partitions
  def Assign[K, V](
      topicPartitions: Iterable[TopicPartition],
      kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]
}
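A short sketch of the other two subscription modes, reusing the kafkaParams map from the streaming example; the topic pattern and partition numbers are illustrative:

import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Subscribe to every topic whose name matches a regular expression
val patternStrategy =
  ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("events-.*"), kafkaParams)

// Bypass group subscription and read fixed topic partitions
val assignStrategy =
  ConsumerStrategies.Assign[String, String](
    List(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)),
    kafkaParams
  )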
OffsetRange, HasOffsetRanges, and CanCommitOffsets provide offset range management and commit operations for exactly-once processing semantics.

final class OffsetRange {
  val topic: String
  val partition: Int
  val fromOffset: Long
  val untilOffset: Long
  def topicPartition(): TopicPartition
  def count(): Long
}

object OffsetRange {
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
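A sketch that inspects and commits offset ranges on the stream from the earlier example; the OffsetCommitCallback used here is the standard kafka-clients interface and is optional:

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Log what each partition contributed to this batch
  offsetRanges.foreach { range =>
    println(s"${range.topic}-${range.partition}: ${range.fromOffset} -> ${range.untilOffset} (${range.count()} records)")
  }

  // Commit asynchronously and observe failures through the callback
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
    override def onComplete(
        offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
        exception: Exception): Unit = {
      if (exception != null) println(s"Offset commit failed: ${exception.getMessage}")
    }
  })
}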
PerPartitionConfig is the configuration interface for controlling processing rates and other settings on a per-partition basis.

abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: TopicPartition): Long
  def minRatePerPartition(topicPartition: TopicPartition): Long
}
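A hedged sketch of a custom PerPartitionConfig that throttles one hypothetical hot partition harder than the rest, passed through the createDirectStream overload shown above (rate values are illustrative, not recommendations):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

class ThrottledConfig extends PerPartitionConfig {
  // Hypothetical limits: partition 0 of topic1 is read more slowly than everything else
  override def maxRatePerPartition(tp: TopicPartition): Long =
    if (tp.topic == "topic1" && tp.partition == 0) 500L else 5000L

  override def minRatePerPartition(tp: TopicPartition): Long = 1L
}

val throttledStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new ThrottledConfig
)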
The integration supports numerous Spark configuration parameters for fine-tuning performance (a configuration sketch follows this list):

spark.streaming.kafka.maxRatePerPartition: Maximum records per second per partition (default: 0 = unlimited)
spark.streaming.kafka.minRatePerPartition: Minimum records per second per partition (default: 1)
spark.streaming.kafka.consumer.cache.enabled: Enable consumer caching (default: true)
spark.streaming.kafka.consumer.cache.maxCapacity: Maximum cached consumers (default: 64)
spark.streaming.kafka.consumer.cache.initialCapacity: Initial cache capacity (default: 16)
spark.streaming.kafka.consumer.cache.loadFactor: Cache load factor (default: 0.75)
spark.streaming.kafka.consumer.poll.ms: Consumer poll timeout in milliseconds (optional)
spark.streaming.kafka.allowNonConsecutiveOffsets: Allow non-consecutive offsets (default: false)
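These are ordinary Spark properties, so they can be set on the SparkConf (or via spark-submit --conf) before the StreamingContext is created; the values below are purely illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-streaming-example")
  // Cap throughput at 1000 records/sec per partition and tune the consumer cache
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")

val tunedSsc = new StreamingContext(conf, Seconds(5))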
Commonly used types referenced by the API:

// Core Kafka types (from kafka-clients dependency)
import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
// Spark types
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaInputDStream}
// Java types
import java.util.{Map => JMap, Collection => JCollection}
import java.util.regex.Pattern
import java.{lang => jl}