A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming
---
Receiver-based streaming creates input streams that use long-running receivers to pull messages from Kafka brokers. This is the legacy approach that relies on Zookeeper for coordination and consumer group management.
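Each receiver occupies one executor core, so a single receiver-based stream can become a throughput bottleneck. A common remedy, documented in the Spark Streaming programming guide, is to create several input streams in the same consumer group and union them. A minimal sketch, assuming a local Zookeeper and illustrative topic/group names (requires a running Spark and Kafka/Zookeeper setup):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("kafka-receiver-union")
val ssc = new StreamingContext(conf, Seconds(10))

val zkQuorum = "localhost:2181"   // assumption: local Zookeeper
val groupId  = "union-group"      // assumption: illustrative group id
val topics   = Map("user-events" -> 1)

// Create several receivers in the same group and union them so
// consumption is spread across multiple executor cores.
val numReceivers = 3
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, groupId, topics,
    StorageLevel.MEMORY_AND_DISK_SER_2)
}
val unioned = ssc.union(streams)

unioned.map(_._2).count().print()
ssc.start()
ssc.awaitTermination()
```

Each `createStream` call here produces its own receiver; the union behaves as a single DStream downstream.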
Creates a receiver-based stream with String key/value types.
/**
* Create an input stream that pulls messages from Kafka Brokers using receivers.
*
* @param ssc StreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
* @return DStream of (Kafka message key, Kafka message value)
*/
def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[(String, String)]

Usage Example:
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
val zkQuorum = "localhost:2181"
val groupId = "my-consumer-group"
val topics = Map("user-events" -> 1, "purchase-events" -> 2)
val stream = KafkaUtils.createStream(
  streamingContext, zkQuorum, groupId, topics, StorageLevel.MEMORY_AND_DISK_SER_2
)

stream.foreachRDD { rdd =>
  rdd.foreach { case (key, value) =>
    println(s"Received: $key -> $value")
  }
}

Creates a receiver-based stream with custom key/value types and decoders.
/**
* Create an input stream that pulls messages from Kafka Brokers using receivers
* with custom key/value types.
*
* @param ssc StreamingContext object
* @param kafkaParams Map of kafka configuration parameters
* @param topics Map of (topic_name -> numPartitions) to consume
* @param storageLevel Storage level to use for storing the received objects
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam U type of Kafka message key decoder
* @tparam T type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[K]: ClassTag, T <: Decoder[V]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)]

Usage Example:
import kafka.serializer.{StringDecoder, DefaultDecoder}
val kafkaParams = Map[String, String](
"zookeeper.connect" -> "localhost:2181",
"group.id" -> "my-consumer-group",
"zookeeper.connection.timeout.ms" -> "10000"
)
val topics = Map("binary-data" -> 1)
val binaryStream = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  streamingContext, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER
)

binaryStream.foreachRDD { rdd =>
  rdd.foreach { case (keyBytes, valueBytes) =>
    println(s"Key length: ${keyBytes.length}, Value length: ${valueBytes.length}")
  }
}

Java-friendly API for receiver-based stream creation.
/**
* Create an input stream that pulls messages from Kafka Brokers using receivers (Java API).
*
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume
* @return DStream of (Kafka message key, Kafka message value)
*/
public static JavaPairReceiverInputDStream<String, String> createStream(
    JavaStreamingContext jssc,
    String zkQuorum,
    String groupId,
    Map<String, Integer> topics
)

/**
 * Create an input stream with custom storage level (Java API).
 */
public static JavaPairReceiverInputDStream<String, String> createStream(
    JavaStreamingContext jssc,
    String zkQuorum,
    String groupId,
    Map<String, Integer> topics,
    StorageLevel storageLevel
)

Java Usage Example:
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.storage.StorageLevel;
Map<String, Integer> topics = new HashMap<>();
topics.put("my-topic", 1);
JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(
    jssc,
    "localhost:2181",
    "my-consumer-group",
    topics,
    StorageLevel.MEMORY_AND_DISK_SER_2()
);

stream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        System.out.println("Key: " + record._1 + ", Value: " + record._2);
    });
});

Java API with custom key/value types and decoders.
/**
* Create an input stream with custom types and decoders (Java API).
*
* @param jssc JavaStreamingContext object
* @param keyTypeClass Key type of DStream
* @param valueTypeClass Value type of DStream
* @param keyDecoderClass Type of kafka key decoder
* @param valueDecoderClass Type of kafka value decoder
* @param kafkaParams Map of kafka configuration parameters
* @param topics Map of (topic_name -> numPartitions) to consume
* @param storageLevel RDD storage level
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam U type of Kafka message key decoder
* @tparam T type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/
public static <K, V, U extends Decoder<K>, T extends Decoder<V>>
JavaPairReceiverInputDStream<K, V> createStream(
    JavaStreamingContext jssc,
    Class<K> keyTypeClass,
    Class<V> valueTypeClass,
    Class<U> keyDecoderClass,
    Class<T> valueDecoderClass,
    Map<String, String> kafkaParams,
    Map<String, Integer> topics,
    StorageLevel storageLevel
)

For zero data loss, receiver-based streams should be paired with the write-ahead log:

spark.streaming.receiver.writeAheadLog.enable=true

Common storage levels for different reliability/performance trade-offs:
StorageLevel.MEMORY_AND_DISK_SER_2   // Default: memory + disk, serialized, replicated
StorageLevel.MEMORY_AND_DISK_SER     // Memory + disk, serialized, not replicated
StorageLevel.MEMORY_ONLY_SER_2       // Memory only, serialized, replicated
StorageLevel.DISK_ONLY_2             // Disk only, replicated

Typical kafkaParams for the receiver-based API:

Map(
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-consumer-group",
  "zookeeper.connection.timeout.ms" -> "10000",
  "zookeeper.session.timeout.ms" -> "10000",
  "zookeeper.sync.time.ms" -> "2000",
  "auto.commit.interval.ms" -> "1000"
)

// Map of topic name to number of threads/partitions to consume
Map(
  "topic1" -> 1,  // 1 thread for topic1
  "topic2" -> 2,  // 2 threads for topic2
  "topic3" -> 4   // 4 threads for topic3
)

For new applications, consider using direct streaming instead:
// Old receiver-based approach
val receiverStream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)

// New direct streaming approach
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics.keySet
)

Benefits of migration:
- Simplified parallelism: one RDD partition per Kafka partition, with no need to create and union multiple input streams
- Efficiency: zero data loss without the overhead of a write-ahead log
- Exactly-once semantics: offsets are tracked in Spark checkpoints rather than Zookeeper, avoiding duplicate consumption after failures
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10
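Since this artifact is a shaded assembly, it is typically passed whole to spark-submit rather than resolved as individual dependencies. A hedged sketch; the jar filename, version, and application class are assumptions:

```shell
spark-submit \
  --class com.example.MyStreamingApp \
  --master local[4] \
  --jars spark-streaming-kafka-assembly_2.10-1.6.3.jar \
  my-streaming-app.jar
```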