tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10

A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming

Direct Streaming

Direct streaming creates input streams that query Kafka brokers directly, without using any receiver. Compared to receiver-based streaming, this approach offers lower latency, better throughput, and exactly-once semantics (provided the output operations are idempotent or transactional).

Capabilities

Basic Direct Stream Creation

Creates a direct stream with automatic offset management.

/**
 * Create an input stream that directly pulls messages from Kafka Brokers
 * without using any receiver. This stream can guarantee that each message
 * from Kafka is included in transformations exactly once.
 *
 * @param ssc StreamingContext object
 * @param kafkaParams Kafka configuration parameters. Requires "metadata.broker.list" 
 *                    or "bootstrap.servers" to be set with Kafka broker(s) specified in
 *                    host1:port1,host2:port2 form. If not starting from checkpoint,
 *                    "auto.offset.reset" may be set to "largest" or "smallest"
 * @param topics Names of the topics to consume
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag
](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Set[String]
): InputDStream[(K, V)]

Usage Example:

import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
// assumes a StreamingContext named `streamingContext` is already in scope

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092",
  "auto.offset.reset" -> "largest"
)
val topics = Set("user-events", "purchase-events")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topics
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach { case (key, value) =>
    println(s"Key: $key, Value: $value")
  }
  // Commit offsets to external store if needed
  offsetRanges.foreach(println)
}

Advanced Direct Stream with Custom Message Handler

Creates a direct stream with explicit starting offsets and custom message transformation.

/**
 * Create an input stream that directly pulls messages from Kafka Brokers
 * with explicit starting offsets and custom message handler.
 *
 * @param ssc StreamingContext object
 * @param kafkaParams Kafka configuration parameters
 * @param fromOffsets Per-topic/partition Kafka offsets defining the inclusive starting point
 * @param messageHandler Function for translating each message and metadata into desired type
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @tparam R type returned by messageHandler
 * @return DStream of R
 */
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag
](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  fromOffsets: Map[TopicAndPartition, Long],
  messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R]

Usage Example:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata

val fromOffsets = Map(
  TopicAndPartition("events", 0) -> 1000L,
  TopicAndPartition("events", 1) -> 2000L
)

val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
  s"${mmd.topic}:${mmd.partition}:${mmd.offset} -> ${mmd.key()}:${mmd.message()}"
}

val customStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
  streamingContext, kafkaParams, fromOffsets, messageHandler
)

customStream.print()

Java Direct Stream API

Java-friendly API for direct stream creation.

/**
 * Create an input stream that directly pulls messages from Kafka Brokers
 * without using any receiver (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param keyClass Class of the keys in the Kafka records
 * @param valueClass Class of the values in the Kafka records
 * @param keyDecoderClass Class of the key decoder
 * @param valueDecoderClass Class of the value decoder
 * @param kafkaParams Kafka configuration parameters
 * @param topics Names of the topics to consume
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>> 
JavaPairInputDStream<K, V> createDirectStream(
  JavaStreamingContext jssc,
  Class<K> keyClass,
  Class<V> valueClass,
  Class<KD> keyDecoderClass,
  Class<VD> valueDecoderClass,
  Map<String, String> kafkaParams,
  Set<String> topics
)

Java Usage Example:

import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import kafka.serializer.StringDecoder;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "largest");

Set<String> topics = Collections.singleton("my-topic");

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topics
);

stream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreach(record -> {
        System.out.println("Key: " + record._1 + ", Value: " + record._2);
    });
});

Key Features

Exactly-Once Semantics

  • No receivers: Stream directly queries Kafka brokers
  • Manual offset tracking: Offsets tracked by stream itself, not Zookeeper
  • Failure recovery: Enable checkpointing in StreamingContext for driver failure recovery
  • Idempotent output: Ensure output operations are idempotent for end-to-end exactly-once semantics
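Because a failed batch may be recomputed and its output re-emitted, the output sink must tolerate replays. A minimal sketch of the idea in plain Scala (no Spark dependency; the in-memory `IdempotentSink` store is hypothetical): writes are keyed upserts on `(topic, partition, offset)`, so replaying a batch leaves the store in the same state as writing it once.

```scala
// Idempotent sink sketch: each write is an upsert keyed by
// (topic, partition, offset) -- a natural unique key for a Kafka record --
// so re-running a batch after a failure does not duplicate output.
object IdempotentSink {
  private val store = scala.collection.mutable.Map[(String, Int, Long), String]()

  def write(topic: String, partition: Int, offset: Long, value: String): Unit =
    store((topic, partition, offset)) = value // upsert: replay-safe

  def size: Int = store.size
}

// First attempt at the batch...
IdempotentSink.write("events", 0, 1000L, "a")
IdempotentSink.write("events", 0, 1001L, "b")
// ...then the same batch is replayed after a simulated failure.
IdempotentSink.write("events", 0, 1000L, "a")
IdempotentSink.write("events", 0, 1001L, "b")
```

The same keying works against any store with upsert semantics (e.g. a primary-keyed database table); the offset ranges exposed by `HasOffsetRanges` supply the key.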

Rate Limiting

Configure maximum messages per partition per second using spark.streaming.kafka.maxRatePerPartition:

val conf = new SparkConf()
  .setAppName("KafkaDirectStream")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

Offset Access

Access offset information from generated RDDs:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { offsetRange =>
    println(s"${offsetRange.topic} ${offsetRange.partition} " +
            s"${offsetRange.fromOffset} ${offsetRange.untilOffset}")
  }
}

Configuration Parameters

Required Parameters

Map(
  "metadata.broker.list" -> "host1:port1,host2:port2" // Kafka brokers (NOT zookeeper)
)

Optional Parameters

Map(
  "auto.offset.reset" -> "largest",  // or "smallest"
  "group.id" -> "my-consumer-group",
  "auto.commit.enable" -> "false"    // old-consumer property name; direct streams never commit to Zookeeper anyway
)

Error Handling

  • SparkException: Thrown for connectivity issues, invalid offsets, or configuration problems
  • Offset validation: Automatic validation that requested offsets are available on Kafka brokers
  • Leader discovery: Automatic discovery of partition leaders with fallback handling
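Transient broker errors (for example, during a partition leader re-election) are often worth retrying before failing the batch. A minimal retry helper, sketched in plain Scala; the helper name and fixed-attempt policy are illustrative, not part of this library:

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Retry a call up to `attempts` times; rethrow the last error if all attempts fail.
@tailrec
def withRetries[T](attempts: Int)(body: => T): T =
  Try(body) match {
    case Success(v)                 => v
    case Failure(_) if attempts > 1 => withRetries(attempts - 1)(body)
    case Failure(e)                 => throw e
  }

// Simulated flaky call: fails twice, then succeeds on the third attempt.
var calls = 0
val result = withRetries(3) {
  calls += 1
  if (calls < 3) throw new RuntimeException("leader not available")
  "ok"
}
```

In practice you would wrap the failure-prone output operation inside `foreachRDD` this way, possibly adding a backoff between attempts.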

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10
