Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing
—
Core functionality for creating Kafka-backed Spark streams and RDDs with configurable location strategies and consumer strategies. Provides both streaming (DStream) and batch (RDD) interfaces for consuming Kafka data.
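The location strategies and consumer strategies mentioned above are built from the LocationStrategies and ConsumerStrategies factories in org.apache.spark.streaming.kafka010. A brief sketch of the main options follows; the topic names, hosts, and consumer settings are illustrative:

import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, LocationStrategies}

// Minimal consumer configuration (illustrative values)
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group"
)

// Location strategies: where Kafka consumers are scheduled on the cluster
val preferConsistent = LocationStrategies.PreferConsistent   // spread partitions evenly across executors (usual choice)
val preferBrokers    = LocationStrategies.PreferBrokers      // only when executors run on the Kafka broker nodes
val preferFixed      = LocationStrategies.PreferFixed(
  Map(new TopicPartition("topic1", 0) -> "host-a"))          // pin particular partitions to particular hosts

// Consumer strategies: which topics/partitions are consumed
val subscribe        = ConsumerStrategies.Subscribe[String, String](Array("topic1", "topic2"), kafkaParams)
val subscribePattern = ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("topic.*"), kafkaParams)
val assign           = ConsumerStrategies.Assign[String, String](
  List(new TopicPartition("topic1", 0)), kafkaParams)        // fixed assignment, no consumer-group rebalancing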
Creates a DStream where each Kafka topic/partition corresponds to an RDD partition, enabling efficient parallel processing.
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]

Parameters:
ssc: StreamingContext - The Spark Streaming context
locationStrategy: LocationStrategy - How to schedule consumers (use LocationStrategies.PreferConsistent in most cases)
consumerStrategy: ConsumerStrategy[K, V] - How to create and configure consumers (use ConsumerStrategies.Subscribe in most cases)

Usage Example:
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
// Reuse the SparkContext from an existing SparkSession (referred to here as spark)
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "my-group",
"auto.offset.reset" -> "latest"
)
val topics = Array("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// The stream produces ConsumerRecord objects
stream.foreachRDD { rdd =>
rdd.foreach { record: ConsumerRecord[String, String] =>
println(s"Topic: ${record.topic}, Partition: ${record.partition}, " +
s"Offset: ${record.offset}, Key: ${record.key}, Value: ${record.value}")
}
}
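One common pattern (a sketch building on the stream and ssc above, and assuming kafkaParams sets "enable.auto.commit" to false) is to read the offset ranges backing each batch and commit them back to Kafka only after the batch has been processed:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Works only on the RDD returned directly by the direct stream,
  // before any shuffle or repartition
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Commit the consumed offsets back to Kafka once processing has succeeded
  // (assumes "enable.auto.commit" -> false in kafkaParams)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

// Nothing is consumed until the streaming context is started
ssc.start()
ssc.awaitTermination()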

Creates a DStream with custom per-partition configuration for rate limiting and other settings.
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V],
perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]]

Parameters:
ssc: StreamingContext - The Spark Streaming context
locationStrategy: LocationStrategy - How to schedule consumers
consumerStrategy: ConsumerStrategy[K, V] - How to create and configure consumers
perPartitionConfig: PerPartitionConfig - Per-partition configuration settings

Usage Example:
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition
// Custom per-partition configuration
class CustomPerPartitionConfig extends PerPartitionConfig {
def maxRatePerPartition(topicPartition: TopicPartition): Long = {
// Different rates for different partitions
if (topicPartition.topic() == "high-volume-topic") 1000 else 500
}
override def minRatePerPartition(topicPartition: TopicPartition): Long = 10
}
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
new CustomPerPartitionConfig()
)

Java API for creating direct streams from Kafka.
def createDirectStream[K, V](
jssc: JavaStreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
): JavaInputDStream[ConsumerRecord[K, V]]
def createDirectStream[K, V](
jssc: JavaStreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V],
perPartitionConfig: PerPartitionConfig
): JavaInputDStream[ConsumerRecord[K, V]]

Usage Example (Java):
import org.apache.spark.streaming.kafka010.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
// Reuse the SparkContext from an existing SparkSession (referred to here as spark)
JavaStreamingContext jssc = new JavaStreamingContext(
    JavaSparkContext.fromSparkContext(spark.sparkContext()), Durations.seconds(5));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-group");
kafkaParams.put("auto.offset.reset", "latest");
Collection<String> topics = Arrays.asList("topic1", "topic2");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);

Creates a batch-oriented RDD interface for consuming from Kafka with specified offset ranges for exactly-once semantics.
def createRDD[K, V](
sc: SparkContext,
kafkaParams: java.util.Map[String, Object],
offsetRanges: Array[OffsetRange],
locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]]

Parameters:
sc: SparkContext - The Spark context
kafkaParams: java.util.Map[String, Object] - Kafka configuration parameters (must include "bootstrap.servers")
offsetRanges: Array[OffsetRange] - Offset ranges defining the Kafka data for this RDD
locationStrategy: LocationStrategy - How to schedule consumers

Usage Example:
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
val kafkaParams = new java.util.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.deserializer", classOf[StringDeserializer])
kafkaParams.put("value.deserializer", classOf[StringDeserializer])
kafkaParams.put("group.id", "batch-group")
// Define specific offset ranges to consume
val offsetRanges = Array(
OffsetRange("topic1", 0, 0, 1000), // partition 0, offsets 0-999
OffsetRange("topic1", 1, 500, 1500), // partition 1, offsets 500-1499
OffsetRange("topic2", 0, 0, 2000) // topic2 partition 0, offsets 0-1999
)
val rdd = KafkaUtils.createRDD[String, String](
spark.sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent
)
// Process the RDD
rdd.foreach { record =>
println(s"Consumed: ${record.key} -> ${record.value}")
}

Java API for creating batch-oriented RDDs from Kafka.
def createRDD[K, V](
jsc: JavaSparkContext,
kafkaParams: java.util.Map[String, Object],
offsetRanges: Array[OffsetRange],
locationStrategy: LocationStrategy
): JavaRDD[ConsumerRecord[K, V]]

Usage Example (Java):
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
OffsetRange[] offsetRanges = {
OffsetRange.create("topic1", 0, 0L, 1000L),
OffsetRange.create("topic1", 1, 500L, 1500L)
};
// kafkaParams is the same map built in the Java streaming example above
JavaRDD<ConsumerRecord<String, String>> rdd =
KafkaUtils.createRDD(
jsc,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent()
);

The direct stream supports automatic rate limiting through Spark's backpressure mechanism:
spark.streaming.backpressure.enabled=true to enable backpressure (see the configuration sketch below)
spark.streaming.kafka.maxRatePerPartition to limit messages per second per partition
spark.streaming.kafka.minRatePerPartition to set a minimum processing rate
PerPartitionConfig for fine-grained per-partition control
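A minimal sketch of wiring the rate-limiting properties above into a SparkConf before creating the StreamingContext; the master, application name, and rate values are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Rate limiting is configured on the SparkConf used to build the StreamingContext
val conf = new SparkConf()
  .setMaster("local[*]")                                       // illustrative; normally set via spark-submit
  .setAppName("kafka-direct-stream")                           // illustrative application name
  .set("spark.streaming.backpressure.enabled", "true")         // adapt ingestion rate to processing speed
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")    // cap: messages per second per partition
  .set("spark.streaming.kafka.minRatePerPartition", "10")      // floor even when backpressure throttles

val ssc = new StreamingContext(conf, Seconds(5))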
The stream creation also handles several Kafka-specific edge cases.

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly