tessl/maven-org-apache-spark--spark-streaming-kafka-0-10_2-11

Apache Spark streaming integration with Kafka 0.10+ providing exactly-once semantics and high-performance data consumption

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-streaming-kafka-0-10_2.11@2.4.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kafka-0-10_2-11@2.4.0


Spark Streaming Kafka 0.10+ Integration

Apache Spark Streaming integration with Kafka 0.10+ that provides exactly-once semantics and high-performance real-time data processing. This library offers a direct approach to Kafka integration with better performance and reliability compared to earlier receiver-based approaches.

Package Information

  • Package Name: spark-streaming-kafka-0-10_2.11
  • Package Type: Maven
  • Language: Scala (with Java API support)
  • Group ID: org.apache.spark
  • Version: 2.4.8
  • Installation:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.8</version>
    </dependency>
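
    For sbt builds, the equivalent dependency (same Scala 2.11 artifact, so no %% cross-version expansion) should be:

    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.4.8"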

Core Imports

import org.apache.spark.streaming.kafka010._

For specific components:

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies

Java imports:

import org.apache.spark.streaming.kafka010.*;

Basic Usage

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Process RDD
  rdd.foreach(record => println(s"${record.key}: ${record.value}"))
  // Commit offsets after processing
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
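
Nothing is consumed until the streaming context is started; a typical driver ends with the standard StreamingContext lifecycle calls:

streamingContext.start()            // begin consuming and scheduling micro-batches
streamingContext.awaitTermination() // block until the job is stopped or fails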

Architecture

The Spark Streaming Kafka integration is built around several key components:

  • KafkaUtils: Primary factory object for creating Kafka RDDs and DStreams
  • DirectKafkaInputDStream: Core streaming implementation providing exactly-once semantics
  • KafkaRDD: Batch-oriented RDD for precise offset control
  • Location Strategies: Control consumer placement for optimal performance
  • Consumer Strategies: Flexible consumer configuration (Subscribe, SubscribePattern, Assign)
  • Offset Management: Precise offset control and commit functionality
  • Rate Control: Per-partition rate limiting and backpressure support

Capabilities

Stream Creation

Core functionality for creating Kafka DStreams with exactly-once semantics and configurable consumer strategies.

object KafkaUtils {
  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ): InputDStream[ConsumerRecord[K, V]]
  
  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    perPartitionConfig: PerPartitionConfig
  ): InputDStream[ConsumerRecord[K, V]]
}
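
For context, a minimal sketch of how the streamingContext used above might be built before calling createDirectStream; the app name, master, and 5-second batch interval are illustrative, and kafkaParams is the map from Basic Usage:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf().setAppName("kafka-direct-stream").setMaster("local[2]")
val streamingContext = new StreamingContext(conf, Seconds(5)) // 5-second micro-batches

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Array("topic1"), kafkaParams)
)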

See docs/stream-creation.md for details.

Batch Processing

Batch-oriented interface for consuming specific offset ranges from Kafka with full control over exactly-once semantics.

object KafkaUtils {
  def createRDD[K, V](
    sc: SparkContext,
    kafkaParams: java.util.Map[String, Object],
    offsetRanges: Array[OffsetRange],
    locationStrategy: LocationStrategy
  ): RDD[ConsumerRecord[K, V]]
}
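
A hedged sketch of batch consumption, reusing the kafkaParams map from Basic Usage and assuming an existing SparkContext; the topic name and offset bounds are illustrative:

import scala.collection.JavaConverters._
import org.apache.spark.streaming.kafka010._

// Each OffsetRange pins a topic, partition, and half-open [fromOffset, untilOffset) range
val offsetRanges = Array(
  OffsetRange("topic1", 0, 0L, 100L),
  OffsetRange("topic1", 1, 0L, 100L)
)

val rdd = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams.asJava, // createRDD takes a java.util.Map
  offsetRanges,
  LocationStrategies.PreferConsistent
)

rdd.map(record => (record.key, record.value)).collect().foreach(println)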

See docs/batch-processing.md for details.

Location Strategies

Control how Kafka consumers are scheduled across Spark executors for optimal performance and data locality.

object LocationStrategies {
  def PreferBrokers: LocationStrategy
  def PreferConsistent: LocationStrategy
  def PreferFixed(hostMap: Map[TopicPartition, String]): LocationStrategy
}
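
PreferConsistent is the usual default; PreferBrokers only makes sense when executors run on the same hosts as the Kafka brokers; PreferFixed pins partitions to hosts explicitly. A brief sketch, where the host names are hypothetical:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// Spread partitions evenly across available executors (the common default)
val consistent = LocationStrategies.PreferConsistent

// Pin specific partitions to specific executor hosts
val hostMap = Map(
  new TopicPartition("topic1", 0) -> "executor-host-1",
  new TopicPartition("topic1", 1) -> "executor-host-2"
)
val fixed = LocationStrategies.PreferFixed(hostMap)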

See docs/location-strategies.md for details.

Consumer Strategies

Flexible consumer configuration supporting topic subscription, pattern-based subscription, and partition assignment.

object ConsumerStrategies {
  def Subscribe[K, V](
    topics: Iterable[String],
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]
  
  def SubscribePattern[K, V](
    pattern: java.util.regex.Pattern,
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]
  
  def Assign[K, V](
    topicPartitions: Iterable[TopicPartition],
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]
}
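
A brief sketch of the pattern-based and assignment-based strategies, reusing kafkaParams from Basic Usage; the topic names and regex are illustrative:

import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Subscribe to every topic matching a regex; matching topics created later are picked up
val byPattern = ConsumerStrategies.SubscribePattern[String, String](
  Pattern.compile("events-.*"),
  kafkaParams
)

// Assign fixed topic-partitions explicitly; no subscription or rebalance is involved
val byAssignment = ConsumerStrategies.Assign[String, String](
  List(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)),
  kafkaParams
)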

See docs/consumer-strategies.md for details.

Offset Management

Precise offset tracking and management for exactly-once processing guarantees and reliable stream processing.

final class OffsetRange(
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long
) {
  def topicPartition(): TopicPartition
  def count(): Long
}

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
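
The callback overload reports per-commit success or failure; a sketch using the Kafka client's OffsetCommitCallback, following the same pattern as Basic Usage:

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
    override def onComplete(
        offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
        exception: Exception): Unit = {
      if (exception != null) {
        // Commit failed: log it and decide whether to alert or rely on a later commit
        println(s"Offset commit failed: ${exception.getMessage}")
      }
    }
  })
}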

See docs/offset-management.md for details.

Types

Core Types

// Kafka consumer record type (from kafka-clients)
import org.apache.kafka.clients.consumer.ConsumerRecord

// Kafka TopicPartition (from kafka-clients)
import org.apache.kafka.common.TopicPartition

// Spark streaming context
import org.apache.spark.streaming.StreamingContext

// Spark context for batch operations
import org.apache.spark.SparkContext

Configuration Types

abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: TopicPartition): Long
  def minRatePerPartition(topicPartition: TopicPartition): Long = 1L
}
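
PerPartitionConfig can be subclassed and passed to the four-argument createDirectStream overload to set rates per partition; a minimal sketch, where the topic name and rate values are hypothetical:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

// Hypothetical policy: throttle one high-volume topic harder than everything else
class TieredRateConfig extends PerPartitionConfig {
  override def maxRatePerPartition(tp: TopicPartition): Long =
    if (tp.topic == "high-volume-topic") 500L else 5000L
}

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Array("topic1"), kafkaParams),
  new TieredRateConfig()
)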

Configuration

Spark Configuration Parameters

Key Spark configuration parameters that affect Kafka integration behavior:

// Rate limiting per partition (default: 0 = unlimited)
"spark.streaming.kafka.maxRatePerPartition" -> "1000"

// Minimum rate per partition (default: 1)
"spark.streaming.kafka.minRatePerPartition" -> "1"

// Consumer poll timeout in milliseconds (default: 120000)
"spark.streaming.kafka.consumer.poll.ms" -> "120000"

// Allow non-consecutive offsets (default: false)
"spark.streaming.kafka.allowNonConsecutiveOffsets" -> "false"

// Consumer cache configuration
"spark.streaming.kafka.consumer.cache.enabled" -> "true"
"spark.streaming.kafka.consumer.cache.capacity" -> "64"
"spark.streaming.kafka.consumer.cache.timeout" -> "300s"

Automatic Parameter Handling

The Kafka integration automatically modifies certain consumer parameters on executors for reliability:

  • enable.auto.commit is always set to false on executors
  • auto.offset.reset is set to none on executors
  • group.id is prefixed with spark-executor- on executors
  • receive.buffer.bytes is set to minimum 65536 (KAFKA-3135 workaround)

These modifications ensure proper exactly-once semantics and prevent consumer conflicts between driver and executors.

Error Handling

Common exceptions thrown by the Kafka integration:

  • NoOffsetForPartitionException: Thrown when no offset is available for a partition and auto.offset.reset is "none"
  • AssertionError: Thrown when using PreferBrokers without proper host mapping
  • IllegalArgumentException: Thrown for invalid Kafka parameters or configurations

Always handle these exceptions appropriately and ensure proper Kafka configuration, especially bootstrap.servers and consumer group settings.
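
One way to avoid NoOffsetForPartitionException without depending on auto.offset.reset is to supply explicit starting offsets through the consumer strategy; a sketch assuming the application knows its partitions and can restore offsets (the values shown are illustrative):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

// Starting offsets restored from external storage (values are illustrative)
val startingOffsets = Map(
  new TopicPartition("topic1", 0) -> 42L,
  new TopicPartition("topic1", 1) -> 17L
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Array("topic1"), kafkaParams, startingOffsets)
)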