tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly

Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing

docs/stream-creation.md

Stream Creation

Core functionality for creating Kafka-backed Spark streams and RDDs with configurable location strategies and consumer strategies. Provides both streaming (DStream) and batch (RDD) interfaces for consuming Kafka data.

Capabilities

Direct Stream Creation (Scala)

Creates a DStream where each Kafka topic/partition corresponds to an RDD partition, enabling efficient parallel processing.

def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]

Parameters:

  • ssc: StreamingContext - The Spark Streaming context
  • locationStrategy: LocationStrategy - How to schedule consumers (use LocationStrategies.PreferConsistent in most cases)
  • consumerStrategy: ConsumerStrategy[K, V] - How to create and configure consumers (use ConsumerStrategies.Subscribe in most cases)
  • Returns: InputDStream[ConsumerRecord[K, V]] - Stream of Kafka consumer records

Usage Example:

import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

// Assumes an existing SparkSession named spark
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",
  "auto.offset.reset" -> "latest"
)

val topics = Array("topic1", "topic2")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// The stream produces ConsumerRecord objects
stream.foreachRDD { rdd =>
  rdd.foreach { record: ConsumerRecord[String, String] =>
    println(s"Topic: ${record.topic}, Partition: ${record.partition}, " +
            s"Offset: ${record.offset}, Key: ${record.key}, Value: ${record.value}")
  }
}

Direct Stream Creation with Per-Partition Config (Scala)

Creates a DStream with custom per-partition configuration for rate limiting and other settings.

def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V],
  perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]]

Parameters:

  • ssc: StreamingContext - The Spark Streaming context
  • locationStrategy: LocationStrategy - How to schedule consumers
  • consumerStrategy: ConsumerStrategy[K, V] - How to create and configure consumers
  • perPartitionConfig: PerPartitionConfig - Per-partition configuration settings
  • Returns: InputDStream[ConsumerRecord[K, V]] - Stream of Kafka consumer records

Usage Example:

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Custom per-partition configuration
class CustomPerPartitionConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    // Different rates for different partitions
    if (topicPartition.topic() == "high-volume-topic") 1000 else 500
  }
  
  override def minRatePerPartition(topicPartition: TopicPartition): Long = 10
}

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new CustomPerPartitionConfig()
)

Direct Stream Creation (Java)

Java API for creating direct streams from Kafka.

def createDirectStream[K, V](
  jssc: JavaStreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V]
): JavaInputDStream[ConsumerRecord[K, V]]

def createDirectStream[K, V](
  jssc: JavaStreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V],
  perPartitionConfig: PerPartitionConfig
): JavaInputDStream[ConsumerRecord[K, V]]

Usage Example (Java):

import org.apache.spark.streaming.kafka010.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

// Assumes an existing SparkSession named spark
JavaStreamingContext jssc = new JavaStreamingContext(
  new JavaSparkContext(spark.sparkContext()), Durations.seconds(5));

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-group");
kafkaParams.put("auto.offset.reset", "latest");

Collection<String> topics = Arrays.asList("topic1", "topic2");

JavaInputDStream<ConsumerRecord<String, String>> stream = 
  KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

RDD Creation (Scala)

Creates an RDD over an explicit set of Kafka offset ranges, providing a batch interface with deterministic boundaries suitable for exactly-once processing.

def createRDD[K, V](
  sc: SparkContext,
  kafkaParams: java.util.Map[String, Object],
  offsetRanges: Array[OffsetRange],
  locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]]

Parameters:

  • sc: SparkContext - The Spark context
  • kafkaParams: java.util.Map[String, Object] - Kafka configuration parameters (must include "bootstrap.servers")
  • offsetRanges: Array[OffsetRange] - Offset ranges defining the Kafka data for this RDD
  • locationStrategy: LocationStrategy - How to schedule consumers
  • Returns: RDD[ConsumerRecord[K, V]] - RDD of Kafka consumer records

Usage Example:

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = new java.util.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.deserializer", classOf[StringDeserializer])
kafkaParams.put("value.deserializer", classOf[StringDeserializer])
kafkaParams.put("group.id", "batch-group")

// Define specific offset ranges to consume
val offsetRanges = Array(
  OffsetRange("topic1", 0, 0, 1000),    // partition 0, offsets 0-999
  OffsetRange("topic1", 1, 500, 1500),  // partition 1, offsets 500-1499
  OffsetRange("topic2", 0, 0, 2000)     // topic2 partition 0, offsets 0-1999
)

val rdd = KafkaUtils.createRDD[String, String](
  spark.sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process the RDD
rdd.foreach { record =>
  println(s"Consumed: ${record.key} -> ${record.value}")
}

RDD Creation (Java)

Java API for creating batch-oriented RDDs from Kafka.

def createRDD[K, V](
  jsc: JavaSparkContext,
  kafkaParams: java.util.Map[String, Object],
  offsetRanges: Array[OffsetRange],
  locationStrategy: LocationStrategy
): JavaRDD[ConsumerRecord[K, V]]

Usage Example (Java):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

OffsetRange[] offsetRanges = {
  OffsetRange.create("topic1", 0, 0L, 1000L),
  OffsetRange.create("topic1", 1, 500L, 1500L)
};

// kafkaParams: the same Kafka configuration map defined in the Java streaming example above
JavaRDD<ConsumerRecord<String, String>> rdd = 
  KafkaUtils.createRDD(
    jsc,
    kafkaParams,
    offsetRanges,
    LocationStrategies.PreferConsistent()
  );

Rate Control

The direct stream supports automatic rate limiting through Spark's backpressure mechanism (a configuration sketch follows this list):

  • Set spark.streaming.backpressure.enabled=true to enable backpressure
  • Set spark.streaming.kafka.maxRatePerPartition to limit messages per second per partition
  • Set spark.streaming.kafka.minRatePerPartition to set minimum processing rate
  • Use custom PerPartitionConfig for fine-grained per-partition control
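
A minimal sketch of wiring these settings into a SparkConf before creating the StreamingContext; the application name and the specific rate values are illustrative assumptions, not defaults:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-rate-control-example")  // illustrative name
  // Let Spark adapt the ingestion rate to the observed processing rate
  .set("spark.streaming.backpressure.enabled", "true")
  // At most 1000 records per second per Kafka partition
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // Never throttle below 10 records per second per partition
  .set("spark.streaming.kafka.minRatePerPartition", "10")

val ssc = new StreamingContext(conf, Seconds(5))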

Error Handling

The stream creation handles several Kafka-specific edge cases:

  • KAFKA-3370 Workaround: Handles NoOffsetForPartitionException when auto.offset.reset=none
  • Parameter Validation: Automatically overrides problematic Kafka parameters on executors (for example, forcing enable.auto.commit to false)
  • Consumer Caching: Manages consumer lifecycle and caching for performance
  • Offset Management: Ensures proper offset handling for exactly-once semantics (a typical commit pattern is sketched below)
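
Offset handling surfaces through the HasOffsetRanges and CanCommitOffsets interfaces on the direct stream and its RDDs. A minimal sketch of the typical read-process-commit pattern, reusing the stream from the Scala example above and with a placeholder processing step; note that committing offsets after output gives at-least-once delivery unless the output itself is idempotent or transactional:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Offset ranges are exposed only on the RDDs produced by the direct stream
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Placeholder: process the micro-batch
  rdd.foreach(record => println(s"${record.key} -> ${record.value}"))

  // Commit the consumed offsets back to Kafka once processing has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}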

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly
