Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable, distributed streaming data processing
Apache Spark's integration with Apache Kafka 0.10 for reliable, distributed streaming data processing. This assembly JAR packages the core Kafka 0.10 streaming library and all of its dependencies into a single deployable JAR file, enabling consumption of data from Kafka topics as Spark DStreams and RDDs. Exactly-once processing semantics are achievable when offsets are tracked and committed alongside idempotent or transactional output operations.

Add the Maven dependency:
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10-assembly_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

Import the whole package:

```scala
import org.apache.spark.streaming.kafka010._
```

For specific functionality:

```scala
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, ConsumerStrategies}
import org.apache.spark.streaming.kafka010.{OffsetRange, HasOffsetRanges, CanCommitOffsets}
```

Creating a direct stream:

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
// assumes an existing SparkSession named `spark`
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",
  "auto.offset.reset" -> "latest",
  // auto-commit is disabled because offsets are committed manually below
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topic1", "topic2")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// Process each micro-batch
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // note: foreach runs on the executors, so output appears in executor logs
  rdd.foreach { record =>
    println(s"${record.key}: ${record.value}")
  }

  // commit offsets back to Kafka once the output has completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
ssc.start()
ssc.awaitTermination()
```

Creating an RDD from a fixed range of offsets:

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

val offsetRanges = Array(
  OffsetRange("topic1", 0, 0, 1000),
  OffsetRange("topic1", 1, 0, 1000)
)

// createRDD expects a java.util.Map, so convert the Scala Map used above
val rdd = KafkaUtils.createRDD[String, String](
  spark.sparkContext,
  kafkaParams.asJava,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process the RDD (foreach runs on the executors)
rdd.foreach { record =>
  println(s"${record.key}: ${record.value}")
}
```

The Kafka 0.10 integration is built around several key components:
Core functionality for creating Kafka-backed Spark streams and RDDs with configurable location strategies and consumer strategies.

```scala
object KafkaUtils {
  // Scala stream creation
  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ): InputDStream[ConsumerRecord[K, V]]

  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    perPartitionConfig: PerPartitionConfig
  ): InputDStream[ConsumerRecord[K, V]]

  // Scala RDD creation
  def createRDD[K, V](
    sc: SparkContext,
    kafkaParams: java.util.Map[String, Object],
    offsetRanges: Array[OffsetRange],
    locationStrategy: LocationStrategy
  ): RDD[ConsumerRecord[K, V]]
}
```

Strategies for scheduling Kafka consumers on executors to optimize performance and network locality.
```scala
object LocationStrategies {
  def PreferBrokers: LocationStrategy
  def PreferConsistent: LocationStrategy
  def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
  def PreferFixed(hostMap: java.util.Map[TopicPartition, String]): LocationStrategy
}
```
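Most applications use PreferConsistent; PreferBrokers only helps when executors run on the same hosts as the Kafka brokers. As a sketch, PreferFixed pins each partition's consumer to a particular executor host (the hostnames below are hypothetical):

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// hypothetical mapping: schedule each partition's consumer on a fixed executor host
val hostMap = Map(
  new TopicPartition("topic1", 0) -> "exec-host-a.example.com",
  new TopicPartition("topic1", 1) -> "exec-host-b.example.com"
)
val locationStrategy = LocationStrategies.PreferFixed(hostMap)
```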
Configuration strategies for creating and managing Kafka consumers with different subscription patterns.

```scala
object ConsumerStrategies {
  // Subscribe to specific topics
  def Subscribe[K, V](
    topics: Iterable[String],
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]

  // Subscribe to topics matching a pattern
  def SubscribePattern[K, V](
    pattern: java.util.regex.Pattern,
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]

  // Assign specific topic partitions
  def Assign[K, V](
    topicPartitions: Iterable[TopicPartition],
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]
}
```
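A brief sketch of the two alternatives to Subscribe, reusing the kafkaParams map from the earlier example (the topic pattern and partition numbers are illustrative):

```scala
import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// subscribe to every topic matching a name pattern (illustrative regex)
val byPattern = ConsumerStrategies.SubscribePattern[String, String](
  Pattern.compile("events-.*"), kafkaParams)

// bypass consumer-group partition assignment and read two explicit partitions
val byAssignment = ConsumerStrategies.Assign[String, String](
  Seq(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)), kafkaParams)
```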
Comprehensive offset range management and commit operations for exactly-once processing semantics.

```scala
final class OffsetRange {
  val topic: String
  val partition: Int
  val fromOffset: Long
  val untilOffset: Long

  def topicPartition(): TopicPartition
  def count(): Long
}

object OffsetRange {
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
```
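The callback overload reports the outcome of an asynchronous commit. A minimal sketch, assuming the stream and offsetRanges from the streaming example above (the failure handling is illustrative):

```scala
import java.util
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
  override def onComplete(
    offsets: util.Map[TopicPartition, OffsetAndMetadata],
    exception: Exception
  ): Unit = {
    // illustrative handling: a failed commit means records may be replayed (at-least-once)
    if (exception != null) {
      println(s"offset commit failed: ${exception.getMessage}")
    }
  }
})
```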
Configuration interface for controlling processing rates and other settings on a per-partition basis.

```scala
abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: TopicPartition): Long
  def minRatePerPartition(topicPartition: TopicPartition): Long
}
```
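A minimal sketch of a custom implementation, passed to the four-argument createDirectStream overload listed above; the topic name and rate limits are hypothetical:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

// hypothetical policy: throttle partitions of "hot-topic" harder than everything else
class ThrottledPerPartitionConfig extends PerPartitionConfig {
  override def maxRatePerPartition(tp: TopicPartition): Long =
    if (tp.topic == "hot-topic") 500L else 5000L
  override def minRatePerPartition(tp: TopicPartition): Long = 1L
}

val throttledStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new ThrottledPerPartitionConfig
)
```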
The integration supports numerous Spark configuration parameters for fine-tuning performance; a sketch of setting them follows the list:

- `spark.streaming.kafka.maxRatePerPartition`: maximum records per second per partition (default: 0 = unlimited)
- `spark.streaming.kafka.minRatePerPartition`: minimum records per second per partition (default: 1)
- `spark.streaming.kafka.consumer.cache.enabled`: enable consumer caching (default: true)
- `spark.streaming.kafka.consumer.cache.maxCapacity`: maximum cached consumers (default: 64)
- `spark.streaming.kafka.consumer.cache.initialCapacity`: initial cache capacity (default: 16)
- `spark.streaming.kafka.consumer.cache.loadFactor`: cache load factor (default: 0.75)
- `spark.streaming.kafka.consumer.poll.ms`: consumer poll timeout in milliseconds (optional)
- `spark.streaming.kafka.allowNonConsecutiveOffsets`: allow non-consecutive offsets (default: false)
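These are ordinary Spark properties; as a sketch, they can be set on the SparkConf before the StreamingContext is created (the values are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// illustrative tuning: cap throughput per partition and lengthen the consumer poll timeout
val conf = new SparkConf()
  .setAppName("kafka-streaming-app")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  .set("spark.streaming.kafka.consumer.poll.ms", "1000")
val tunedSsc = new StreamingContext(conf, Seconds(5))
```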
Referenced types:

```scala
// Core Kafka types (from kafka-clients dependency)
import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

// Spark types
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaInputDStream}

// Java types
import java.util.{Map => JMap, Collection => JCollection}
import java.util.regex.Pattern
import java.{lang => jl}
```