A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming

## Installation

```bash
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10@1.6.0
```

Apache Spark Streaming Kafka Assembly provides seamless integration between Apache Kafka and Spark Streaming, enabling real-time data processing from Kafka topics. This shaded JAR assembly bundles all necessary dependencies to avoid version conflicts and supports both the receiver-based and the direct (no-receiver) streaming approaches; the direct approach provides exactly-once processing semantics.

## Usage

Add the shaded assembly JAR to your classpath:

```
spark-streaming-kafka-assembly_2.10-1.6.3.jar
```

Or depend on the Maven coordinates:

```
org.apache.spark:spark-streaming-kafka-assembly_2.10:1.6.3
```

Scala imports:

```scala
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.{StringDecoder, DefaultDecoder}
```

Java imports:

```java
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.kafka.Broker;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
```

### Direct stream (no receivers)

```scala
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092",
  "auto.offset.reset" -> "largest"
)
val topics = Set("my-topic")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topics
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Process the data
  rdd.foreach(println)
}
```
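
The examples in this section assume an existing `StreamingContext`. A minimal sketch of the surrounding setup (the application name and batch interval are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("KafkaStreamingApp")
val streamingContext = new StreamingContext(conf, Seconds(10))

// ... create the Kafka stream and wire up processing here ...

streamingContext.start()
streamingContext.awaitTermination()
```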

### Receiver-based stream (legacy)

```scala
// Consumer configuration for the kafkaParams-based overload (see API below)
val kafkaParams = Map[String, String](
"zookeeper.connect" -> "localhost:2181",
"group.id" -> "my-consumer-group"
)
val topicMap = Map("my-topic" -> 1)
val stream = KafkaUtils.createStream(streamingContext, "localhost:2181", "my-consumer-group", topicMap)
stream.print()The Spark Streaming Kafka integration is built around several key components:

### KafkaUtils.createDirectStream

Creates input streams that directly pull messages from Kafka brokers without receivers, providing exactly-once semantics and manual offset control.

```scala
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Set[String]
): InputDStream[(K, V)]

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  fromOffsets: Map[TopicAndPartition, Long],
  messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R]
```
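
The second overload starts from caller-supplied offsets and lets a message handler shape the stream's element type. A minimal sketch, assuming the `streamingContext` from the usage section; the topic name, partitions, and offsets are illustrative:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

// Resume each partition from an explicitly stored offset (illustrative values)
val fromOffsets = Map(
  TopicAndPartition("my-topic", 0) -> 42L,
  TopicAndPartition("my-topic", 1) -> 17L
)

// Map each raw Kafka record to the stream's element type R; here, keep only the value
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd.message()

val values = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
  streamingContext, kafkaParams, fromOffsets, messageHandler
)
```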

### KafkaUtils.createStream

Creates input streams using long-running receivers that pull messages from Kafka brokers through ZooKeeper coordination (legacy approach).

```scala
def createStream[K, V, U <: Decoder[K], T <: Decoder[V]](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Map[String, Int],
  storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)]
```
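
This overload takes explicit consumer properties and a storage level for received blocks. A minimal sketch using the ZooKeeper parameters from the usage example (the timeout setting is an illustrative extra):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  streamingContext,
  Map(
    "zookeeper.connect" -> "localhost:2181",
    "group.id" -> "my-consumer-group",
    "zookeeper.connection.timeout.ms" -> "10000"
  ),
  Map("my-topic" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER_2 // replicate received blocks for fault tolerance
)
```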

### KafkaUtils.createRDD

Creates RDDs from Kafka using specific offset ranges for batch processing scenarios.

```scala
def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange]
): RDD[(K, V)]

def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange],
  leaders: Map[TopicAndPartition, Broker],
  messageHandler: MessageAndMetadata[K, V] => R
): RDD[R]
```
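
Because each `OffsetRange` pins an exact `[fromOffset, untilOffset)` slice of a partition, `createRDD` suits backfill and reprocessing jobs. A minimal sketch, assuming an existing `sparkContext`; the broker address, topic, and offsets are illustrative:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

// One range per topic-partition; untilOffset is exclusive
val offsetRanges = Array(
  OffsetRange("my-topic", 0, 0L, 100L),
  OffsetRange("my-topic", 1, 0L, 100L)
)

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sparkContext, kafkaParams, offsetRanges
)
println(s"Read ${rdd.count()} messages")
```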
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
JavaPairInputDStream<K, V> createDirectStream(
JavaStreamingContext jssc,
Class<K> keyClass,
Class<V> valueClass,
Class<KD> keyDecoderClass,
Class<VD> valueDecoderClass,
Map<String, String> kafkaParams,
Set<String> topics
)Utilities for managing Kafka offsets, including offset range representation and cluster interaction helpers.

### Offset management

Utilities for managing Kafka offsets, including offset range representation and cluster interaction helpers.

```scala
final class OffsetRange(
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long
)

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}
```
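
Offset ranges captured from a direct stream can be persisted externally to support restarts without data loss; where they are stored is up to the application. A minimal sketch that logs each consumed range, assuming the `stream` from the direct-stream usage example:

```scala
import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  // The cast must happen on the first RDD in the chain, before any shuffle
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    // Persisting (topic, partition, untilOffset) enables resuming later
    // via the fromOffsets overload of createDirectStream
    println(s"${r.topic}[${r.partition}] ${r.fromOffset} -> ${r.untilOffset}")
  }
}
```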

### Type parameters

```scala
// K  - Type of Kafka message key
// V  - Type of Kafka message value
// KD - Type of Kafka message key decoder (extends kafka.serializer.Decoder[K])
// VD - Type of Kafka message value decoder (extends kafka.serializer.Decoder[V])
// R  - Type returned by the message handler function
```

### Related types

```scala
// From the Kafka library
kafka.common.TopicAndPartition
kafka.message.MessageAndMetadata[K, V]
kafka.serializer.Decoder[T]
kafka.serializer.StringDecoder
kafka.serializer.DefaultDecoder

// Core Spark classes
org.apache.spark.streaming.StreamingContext
org.apache.spark.SparkContext
org.apache.spark.rdd.RDD[T]
org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
org.apache.spark.storage.StorageLevel

// Spark Streaming Kafka classes (from this package)
org.apache.spark.streaming.kafka.OffsetRange
org.apache.spark.streaming.kafka.Broker
org.apache.spark.streaming.kafka.HasOffsetRanges
```