A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming
- Does it follow best practices? Pending
- Impact (the risk profile of this skill): Pending
- Eval scenarios: Pending (no eval scenarios have been run)
Apache Spark Streaming Kafka Assembly provides seamless integration between Apache Kafka and Spark Streaming, enabling real-time data processing from Kafka topics. This shaded JAR assembly includes all necessary dependencies to avoid version conflicts, supporting both receiver-based and direct (no-receiver) streaming approaches with exactly-once processing semantics.
Add the assembly JAR to the classpath: `spark-streaming-kafka-assembly_2.10-1.6.3.jar`, or pull it in with the Maven coordinates `org.apache.spark:spark-streaming-kafka-assembly_2.10:1.6.3`.

Scala imports:

```scala
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.{StringDecoder, DefaultDecoder}
```

Java imports:

```java
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.kafka.Broker;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
```

Direct (no-receiver) stream:

```scala
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092",
  "auto.offset.reset" -> "largest"
)
val topics = Set("my-topic")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topics
)
stream.foreachRDD { rdd =>
  // Record which offsets this batch covers before processing
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Process the data
  rdd.foreach(println)
}
```

Receiver-based stream (legacy approach, coordinated through Zookeeper):

```scala
// kafkaParams is shown for reference; the overload used below takes the
// Zookeeper quorum and group id directly. Another createStream overload
// accepts kafkaParams plus a StorageLevel.
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-consumer-group"
)
val topicMap = Map("my-topic" -> 1)
val stream = KafkaUtils.createStream(streamingContext, "localhost:2181", "my-consumer-group", topicMap)
stream.print()
```

The Spark Streaming Kafka integration is built around several key components:
KafkaUtils.createDirectStream creates input streams that pull messages directly from Kafka brokers without receivers, providing exactly-once semantics and manual offset control.

```scala
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Set[String]
): InputDStream[(K, V)]

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  fromOffsets: Map[TopicAndPartition, Long],
  messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R]
```

KafkaUtils.createStream creates input streams using long-running receivers that pull messages from Kafka brokers through Zookeeper coordination (legacy approach).

```scala
def createStream[K, V, U <: Decoder[K], T <: Decoder[V]](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Map[String, Int],
  storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)]
```

KafkaUtils.createRDD creates RDDs from Kafka using specific offset ranges for batch processing scenarios.
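An offset range covers the half-open interval [fromOffset, untilOffset), so a batch's message count is just the sum of the per-partition differences. A minimal sketch of that arithmetic in plain Java (the class below is an illustrative stand-in for `org.apache.spark.streaming.kafka.OffsetRange`; no Spark on the classpath required):

```java
// Illustrative stand-in for org.apache.spark.streaming.kafka.OffsetRange.
public class OffsetRangeSketch {
    final String topic;
    final int partition;
    final long fromOffset;   // inclusive
    final long untilOffset;  // exclusive

    OffsetRangeSketch(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }

    // Number of messages covered by this range.
    long count() {
        return untilOffset - fromOffset;
    }

    public static void main(String[] args) {
        OffsetRangeSketch[] ranges = {
            new OffsetRangeSketch("my-topic", 0, 100L, 250L), // 150 messages
            new OffsetRangeSketch("my-topic", 1, 40L, 40L)    // empty: nothing new
        };
        long total = 0;
        for (OffsetRangeSketch r : ranges) total += r.count();
        System.out.println("batch size: " + total); // batch size: 150
    }
}
```

The real OffsetRange class exposes an equivalent count() helper, so the same arithmetic applies to the ranges recovered via HasOffsetRanges.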
```scala
def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange]
): RDD[(K, V)]

def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange],
  leaders: Map[TopicAndPartition, Broker],
  messageHandler: MessageAndMetadata[K, V] => R
): RDD[R]
```

A complete Java API provides type-safe wrappers for all of the Scala functionality, supporting both streaming and batch processing scenarios.
```java
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
JavaPairInputDStream<K, V> createDirectStream(
  JavaStreamingContext jssc,
  Class<K> keyClass,
  Class<V> valueClass,
  Class<KD> keyDecoderClass,
  Class<VD> valueDecoderClass,
  Map<String, String> kafkaParams,
  Set<String> topics
)
```

Utilities for managing Kafka offsets include the offset range representation and cluster interaction helpers.
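These utilities are the hook for exactly-once output: store each batch's offset ranges atomically together with its results, and resume from the stored offsets on restart. A plain-Java sketch of that bookkeeping (the HashMap stands in for a transactional store; names are illustrative, no Spark required):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of exactly-once bookkeeping: offsets committed together with results.
public class OffsetStoreSketch {
    private final Map<String, Long> committed = new HashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Commit a processed batch: in a real job this write shares one
    // transaction with the write of the batch's results.
    public void commit(String topic, int partition, long untilOffset) {
        committed.put(key(topic, partition), untilOffset);
    }

    // On restart, this value seeds the fromOffsets map passed to the
    // createDirectStream overload that takes explicit starting offsets.
    public long resumePoint(String topic, int partition) {
        return committed.getOrDefault(key(topic, partition), 0L);
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.commit("my-topic", 0, 500L);
        System.out.println(store.resumePoint("my-topic", 0)); // 500
        System.out.println(store.resumePoint("my-topic", 1)); // 0: start from the beginning
    }
}
```

Because untilOffset is exclusive, resuming from the committed value reads the next unprocessed message, never a duplicate of a committed batch.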
```scala
final class OffsetRange(
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long
)

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}
```

Type parameters used throughout the API:

```scala
// K  - Type of Kafka message key
// V  - Type of Kafka message value
// KD - Type of Kafka message key decoder (extends kafka.serializer.Decoder[K])
// VD - Type of Kafka message value decoder (extends kafka.serializer.Decoder[V])
// R  - Type returned by message handler function
```

Related classes:

```scala
// From the Kafka library
kafka.common.TopicAndPartition
kafka.message.MessageAndMetadata[K, V]
kafka.serializer.Decoder[T]
kafka.serializer.StringDecoder
kafka.serializer.DefaultDecoder

// Core Spark classes
org.apache.spark.streaming.StreamingContext
org.apache.spark.SparkContext
org.apache.spark.rdd.RDD[T]
org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
org.apache.spark.storage.StorageLevel

// Spark Streaming Kafka classes (from this package)
org.apache.spark.streaming.kafka.OffsetRange
org.apache.spark.streaming.kafka.Broker
org.apache.spark.streaming.kafka.HasOffsetRanges
```
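To tie the type parameters together: the KD/VD decoders turn raw Kafka bytes into K/V values, and kafka.serializer.Decoder[T] is essentially a single fromBytes method. A simplified Java mirror of that contract (UTF-8 is assumed here, whereas the real StringDecoder reads its encoding from configuration; class names are illustrative):

```java
import java.nio.charset.StandardCharsets;

// Simplified mirror of kafka.serializer.Decoder[T]: raw bytes in, T out.
interface Decoder<T> {
    T fromBytes(byte[] bytes);
}

// Mirrors kafka.serializer.StringDecoder (encoding fixed to UTF-8 here).
class StringDecoderSketch implements Decoder<String> {
    public String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Mirrors kafka.serializer.DefaultDecoder: pass the bytes through untouched.
class DefaultDecoderSketch implements Decoder<byte[]> {
    public byte[] fromBytes(byte[] bytes) {
        return bytes;
    }
}

public class DecoderDemo {
    public static void main(String[] args) {
        String value = new StringDecoderSketch()
            .fromBytes("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(value); // hello
    }
}
```

This is why createDirectStream[String, String, StringDecoder, StringDecoder] in the earlier example yields (String, String) pairs: each decoder fixes how one side of the Kafka message is materialized.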