A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming
---
Direct streaming creates input streams that directly query Kafka brokers without using any receiver. This approach provides exactly-once semantics, lower latency, and better throughput compared to receiver-based streaming.
Creates a direct stream with automatic offset management.
/**
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver. This stream can guarantee that each message
* from Kafka is included in transformations exactly once.
*
* @param ssc StreamingContext object
* @param kafkaParams Kafka configuration parameters. Requires "metadata.broker.list"
* or "bootstrap.servers" to be set with Kafka broker(s) specified in
* host1:port1,host2:port2 form. If not starting from checkpoint,
* "auto.offset.reset" may be set to "largest" or "smallest"
* @param topics Names of the topics to consume
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/
def createDirectStream[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag
](
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)]

Usage Example:
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
val kafkaParams = Map[String, String](
"metadata.broker.list" -> "localhost:9092",
"auto.offset.reset" -> "largest"
)
val topics = Set("user-events", "purchase-events")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
streamingContext, kafkaParams, topics
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreach { case (key, value) =>
println(s"Key: $key, Value: $value")
}
// Commit offsets to external store if needed
offsetRanges.foreach(println)
}

Creates a direct stream with explicit starting offsets and custom message transformation.
/**
* Create an input stream that directly pulls messages from Kafka Brokers
* with explicit starting offsets and custom message handler.
*
* @param ssc StreamingContext object
* @param kafkaParams Kafka configuration parameters
* @param fromOffsets Per-topic/partition Kafka offsets defining the inclusive starting point
* @param messageHandler Function for translating each message and metadata into desired type
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @tparam R type returned by messageHandler
* @return DStream of R
*/
def createDirectStream[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag,
R: ClassTag
](
ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R]

Usage Example:
import kafka.common.TopicAndPartition
val fromOffsets = Map(
TopicAndPartition("events", 0) -> 1000L,
TopicAndPartition("events", 1) -> 2000L
)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
s"${mmd.topic}:${mmd.partition}:${mmd.offset} -> ${mmd.key()}:${mmd.message()}"
}
val customStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
streamingContext, kafkaParams, fromOffsets, messageHandler
)
customStream.print()

Java-friendly API for direct stream creation.
/**
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver (Java API).
*
* @param jssc JavaStreamingContext object
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
* @param keyDecoderClass Class of the key decoder
* @param valueDecoderClass Class of the value decoder
* @param kafkaParams Kafka configuration parameters
* @param topics Names of the topics to consume
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
JavaPairInputDStream<K, V> createDirectStream(
JavaStreamingContext jssc,
Class<K> keyClass,
Class<V> valueClass,
Class<KD> keyDecoderClass,
Class<VD> valueDecoderClass,
Map<String, String> kafkaParams,
Set<String> topics
)

Java Usage Example:
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "largest");
Set<String> topics = Collections.singleton("my-topic");
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topics
);
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreach(record -> {
System.out.println("Key: " + record._1 + ", Value: " + record._2);
});
});

Configure the maximum number of messages per partition per second using spark.streaming.kafka.maxRatePerPartition:
val conf = new SparkConf()
.setAppName("KafkaDirectStream")
.set("spark.streaming.kafka.maxRatePerPartition", "1000")

Access offset information from generated RDDs:
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach { offsetRange =>
println(s"${offsetRange.topic} ${offsetRange.partition} " +
s"${offsetRange.fromOffset} ${offsetRange.untilOffset}")
}
}

Required broker configuration:
Map(
"metadata.broker.list" -> "host1:port1,host2:port2" // Kafka brokers (NOT zookeeper)
)

Optional consumer configuration:
Map(
"auto.offset.reset" -> "largest", // or "smallest"
"group.id" -> "my-consumer-group",
"enable.auto.commit" -> "false"
)

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10