
tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-2-11

Apache Spark streaming integration with Kafka 0.10+ providing exactly-once semantics and high-performance data consumption


Spark Streaming Kafka 0.10+ Integration

Apache Spark Streaming integration for Kafka 0.10+ using the direct (receiver-less) approach. Kafka partitions map one-to-one to Spark partitions, and each batch exposes its offset ranges, so an application can achieve exactly-once results when its output is idempotent or is stored atomically with the offsets. Compared with the earlier receiver-based approach, this design offers better performance and reliability.

Package Information

  • Package Name: spark-streaming-kafka-0-10_2.11
  • Package Type: Maven
  • Language: Scala (with Java API support)
  • Group ID: org.apache.spark
  • Version: 2.4.8
  • Installation:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.8</version>
    </dependency>
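
For sbt builds, the same coordinates can be expressed as a one-line dependency. This is a sketch that assumes the project is built with Scala 2.11, so that `%%` appends the `_2.11` suffix to the artifact name:

```scala
// build.sbt (assumes scalaVersion is a 2.11.x release)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8"
```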

Core Imports

import org.apache.spark.streaming.kafka010._

For specific components:

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies

Java imports:

import org.apache.spark.streaming.kafka010.*;

Basic Usage

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Process RDD
  rdd.foreach(record => println(s"${record.key}: ${record.value}"))
  // Commit offsets after processing
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
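
The snippet above refers to a `streamingContext` that it does not define. A minimal sketch of the surrounding setup might look like the following; the application name, the `local[2]` master, and the 5-second batch interval are placeholder assumptions, not values taken from this package:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder settings: adjust the name, master, and batch interval for your deployment.
val sparkConf = new SparkConf()
  .setAppName("kafka-direct-example")
  .setMaster("local[2]")
val streamingContext = new StreamingContext(sparkConf, Seconds(5))

// ... define kafkaParams, topics, and stream as in the Basic Usage snippet ...

// Nothing is consumed until the context is started.
streamingContext.start()
streamingContext.awaitTermination()
```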

Architecture

The Spark Streaming Kafka integration is built around several key components:

  • KafkaUtils: Primary factory object for creating Kafka RDDs and DStreams
  • DirectKafkaInputDStream: Core streaming implementation providing exactly-once semantics
  • KafkaRDD: Batch-oriented RDD for precise offset control
  • Location Strategies: Control consumer placement for optimal performance
  • Consumer Strategies: Flexible consumer configuration (Subscribe, SubscribePattern, Assign)
  • Offset Management: Precise offset control and commit functionality
  • Rate Control: Per-partition rate limiting and backpressure support

Capabilities

Stream Creation

Core functionality for creating Kafka DStreams with exactly-once semantics and configurable consumer strategies.

object KafkaUtils {
  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ): InputDStream[ConsumerRecord[K, V]]
  
  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    perPartitionConfig: PerPartitionConfig
  ): InputDStream[ConsumerRecord[K, V]]
}


Batch Processing

Batch-oriented interface for consuming specific offset ranges from Kafka with full control over exactly-once semantics.

object KafkaUtils {
  def createRDD[K, V](
    sc: SparkContext,
    kafkaParams: java.util.Map[String, Object],
    offsetRanges: Array[OffsetRange],
    locationStrategy: LocationStrategy
  ): RDD[ConsumerRecord[K, V]]
}
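
As an illustration of the batch interface, the sketch below reads two fixed offset ranges into an RDD. The broker address, group name, topic, and offsets are assumed values, and `readFixedRanges` is a hypothetical helper name:

```scala
import java.{util => ju}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

// Hypothetical helper: `sc` is an existing SparkContext supplied by the caller.
def readFixedRanges(sc: SparkContext): RDD[ConsumerRecord[String, String]] = {
  // createRDD takes a java.util.Map, so the parameters are built as one directly.
  val kafkaParams = new ju.HashMap[String, Object]()
  kafkaParams.put("bootstrap.servers", "localhost:9092")
  kafkaParams.put("key.deserializer", classOf[StringDeserializer])
  kafkaParams.put("value.deserializer", classOf[StringDeserializer])
  kafkaParams.put("group.id", "spark-batch-group")

  // Each range covers fromOffset (inclusive) up to untilOffset (exclusive).
  val offsetRanges = Array(
    OffsetRange("topic1", 0, 0L, 100L),
    OffsetRange("topic1", 1, 0L, 100L)
  )

  KafkaUtils.createRDD[String, String](
    sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)
}
```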


Location Strategies

Control how Kafka consumers are scheduled across Spark executors for optimal performance and data locality.

object LocationStrategies {
  def PreferBrokers: LocationStrategy
  def PreferConsistent: LocationStrategy
  def PreferFixed(hostMap: Map[TopicPartition, String]): LocationStrategy
}
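
`PreferConsistent` suits most deployments. When some partitions carry much more load than others, `PreferFixed` can pin them to chosen hosts. A minimal sketch, with hypothetical host names:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{LocationStrategies, LocationStrategy}

// Hypothetical host names; replace with hosts that run your Spark executors.
val hostMap: Map[TopicPartition, String] = Map(
  new TopicPartition("topic1", 0) -> "executor-host-1",
  new TopicPartition("topic1", 1) -> "executor-host-2"
)

// Partitions missing from the map fall back to consistent placement.
val pinned: LocationStrategy = LocationStrategies.PreferFixed(hostMap)
```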


Consumer Strategies

Flexible consumer configuration supporting topic subscription, pattern-based subscription, and partition assignment.

object ConsumerStrategies {
  def Subscribe[K, V](
    topics: Iterable[String],
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]
  
  def SubscribePattern[K, V](
    pattern: java.util.regex.Pattern,
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]
  
  def Assign[K, V](
    topicPartitions: Iterable[TopicPartition],
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]
}
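
The Basic Usage section already shows `Subscribe`. The other two strategies can be sketched as follows; the `events-` topic prefix, broker address, and partition numbers are assumptions for illustration:

```scala
import java.util.regex.Pattern

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-group"
)

// Subscribe to every topic whose name matches the assumed prefix "events-".
val byPattern = ConsumerStrategies.SubscribePattern[String, String](
  Pattern.compile("events-.*"), kafkaParams)

// Read a fixed set of partitions instead of subscribing by topic name.
val byAssignment = ConsumerStrategies.Assign[String, String](
  List(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)),
  kafkaParams)
```

Either value can be passed as the `consumerStrategy` argument of `KafkaUtils.createDirectStream`.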


Offset Management

Precise offset tracking and management for exactly-once processing guarantees and reliable stream processing.

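// The constructor is private; create instances with
// OffsetRange(topic, partition, fromOffset, untilOffset) or OffsetRange.create(...)
final class OffsetRange private(
  val topic: String,
  val partition: Int,
  val fromOffset: Long,   // inclusive starting offset
  val untilOffset: Long   // exclusive ending offset
) {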
  def topicPartition(): TopicPartition
  def count(): Long
}

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
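
The two-argument `commitAsync` accepts a Kafka `OffsetCommitCallback`, which is useful for noticing failed commits. The sketch below only logs the outcome; `commitWithLogging` is a hypothetical helper, and `stream` is assumed to have been created by `KafkaUtils.createDirectStream`:

```scala
import java.{util => ju}

import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Hypothetical helper: commits each batch's offsets and logs whether the commit succeeded.
def commitWithLogging(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  val logResult = new OffsetCommitCallback {
    override def onComplete(
        offsets: ju.Map[TopicPartition, OffsetAndMetadata],
        exception: Exception): Unit = {
      if (exception != null) println(s"Offset commit failed: ${exception.getMessage}")
      else println(s"Committed offsets: $offsets")
    }
  }

  stream.foreachRDD { rdd =>
    // Read the ranges from the original RDD, before any repartitioning transformation.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... write the batch's results here ...
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, logResult)
  }
}
```

Committing to Kafka this way is not transactional with the output, so the output step should be idempotent if duplicates after a failure are a concern.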


Types

Core Types

// Kafka consumer record type (from kafka-clients)
import org.apache.kafka.clients.consumer.ConsumerRecord

// Kafka TopicPartition (from kafka-clients)
import org.apache.kafka.common.TopicPartition

// Spark streaming context
import org.apache.spark.streaming.StreamingContext

// Spark context for batch operations
import org.apache.spark.SparkContext

Configuration Types

abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: TopicPartition): Long
  def minRatePerPartition(topicPartition: TopicPartition): Long = 1L
}
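
A custom `PerPartitionConfig` makes it possible to give different topics different rate limits. The class below is a hypothetical example, not part of the library; the rates are in records per second per partition:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.PerPartitionConfig

// Hypothetical helper: a default rate plus per-topic overrides.
class TopicRateConfig(defaultRate: Long, overrides: Map[String, Long])
    extends PerPartitionConfig {
  override def maxRatePerPartition(topicPartition: TopicPartition): Long =
    overrides.getOrElse(topicPartition.topic, defaultRate)
}

// Limit "topic2" to 200 records/s per partition; everything else gets 1000.
val rateConfig = new TopicRateConfig(1000L, Map("topic2" -> 200L))
```

An instance like `rateConfig` would be passed as the fourth argument of the four-parameter `createDirectStream` overload.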

Configuration

Spark Configuration Parameters

Key Spark configuration parameters that affect Kafka integration behavior:

// Maximum records per second read from each partition (default: 0 = unlimited)
"spark.streaming.kafka.maxRatePerPartition" -> "1000"

// Minimum records per second read from each partition (default: 1)
"spark.streaming.kafka.minRatePerPartition" -> "1"

// Consumer poll timeout in milliseconds (default: the value of spark.network.timeout, 120000 unless overridden)
"spark.streaming.kafka.consumer.poll.ms" -> "120000"

// Allow non-consecutive offsets (default: false)
"spark.streaming.kafka.allowNonConsecutiveOffsets" -> "false"

// Consumer cache configuration (defaults shown)
"spark.streaming.kafka.consumer.cache.enabled" -> "true"
"spark.streaming.kafka.consumer.cache.initialCapacity" -> "16"
"spark.streaming.kafka.consumer.cache.maxCapacity" -> "64"
"spark.streaming.kafka.consumer.cache.loadFactor" -> "0.75"
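
These keys are ordinary Spark properties, so one way to apply them is through `SparkConf` before the `StreamingContext` is created. In this sketch the application name is a placeholder, and `spark.streaming.backpressure.enabled` is a general Spark Streaming setting added here as an assumption about how rate limits are commonly combined with backpressure:

```scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("kafka-direct-example") // placeholder name
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  .set("spark.streaming.kafka.minRatePerPartition", "1")
  .set("spark.streaming.kafka.consumer.poll.ms", "120000")
  // Assumption: backpressure is enabled so the rate adapts below the configured maximum.
  .set("spark.streaming.backpressure.enabled", "true")
```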

Automatic Parameter Handling

The Kafka integration automatically modifies certain consumer parameters on executors for reliability:

  • enable.auto.commit is always set to false on executors
  • auto.offset.reset is set to none on executors
  • group.id is prefixed with spark-executor- on executors
  • receive.buffer.bytes is set to minimum 65536 (KAFKA-3135 workaround)

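These modifications keep executors from committing or resetting offsets on their own and place executor-side consumers in a separate consumer group, which prevents conflicts between the driver's consumer and the executors' consumers.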
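
To make the effect of these adjustments concrete, the following is an illustrative re-implementation in plain Scala. It is not the library's own code: the object and method names are hypothetical, and it simply applies the four rules listed above to a parameter map.

```scala
// Illustrative sketch only; names are hypothetical and this is not the library's implementation.
object ExecutorParamsSketch {
  private val MinReceiveBuffer = 65536

  def adjustForExecutor(driverParams: Map[String, Object]): Map[String, Object] = {
    // Executors never auto-commit and never reset offsets on their own.
    val withFixed: Map[String, Object] = driverParams ++ Map[String, Object](
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "auto.offset.reset" -> "none"
    )

    // Executor consumers use a prefixed group id (only applied here when one is present).
    val withGroup: Map[String, Object] = driverParams.get("group.id") match {
      case Some(id) => withFixed + ("group.id" -> s"spark-executor-$id")
      case None     => withFixed
    }

    // Raise the receive buffer to at least 65536 bytes.
    val current = driverParams.get("receive.buffer.bytes") match {
      case Some(n: java.lang.Integer) => n.intValue
      case _                          => 0
    }
    withGroup + ("receive.buffer.bytes" -> (math.max(current, MinReceiveBuffer): java.lang.Integer))
  }
}
```

For example, a driver-side `group.id` of `my-group` becomes `spark-executor-my-group` in the returned map, and `auto.offset.reset` becomes `none` regardless of the driver's value.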

Error Handling

Common exceptions thrown by the Kafka integration:

  • NoOffsetForPartitionException: Thrown when no offset is available for a partition and auto.offset.reset is "none"
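  • AssertionError: Thrown by KafkaUtils.createRDD when PreferBrokers is used, because a standalone KafkaRDD has no driver consumer to look up brokers; use PreferFixed with an explicit host mapping instead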
  • IllegalArgumentException: Thrown for invalid Kafka parameters or configurations

Most of these failures trace back to configuration, so check the Kafka parameters first, especially bootstrap.servers, group.id, and auto.offset.reset, before adding exception handling around stream creation.
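
One way to catch configuration mistakes early is a small pre-flight check on the driver before the stream is created. The helper below is a hypothetical sketch in plain Scala; the list of required keys is an assumption based on the parameters used in Basic Usage, not a rule enforced by the library:

```scala
// Hypothetical pre-flight check; the required-key list is an assumption.
object KafkaParamsCheck {
  private val Required =
    Seq("bootstrap.servers", "group.id", "key.deserializer", "value.deserializer")

  // Returns the required keys that are absent or mapped to null.
  def missingKeys(params: Map[String, Object]): Seq[String] =
    Required.filterNot(key => params.get(key).exists(_ != null))

  // Throws IllegalArgumentException (via require) when any required key is missing.
  def validate(params: Map[String, Object]): Unit = {
    val missing = missingKeys(params)
    require(missing.isEmpty, s"Missing Kafka parameters: ${missing.mkString(", ")}")
  }
}
```

Calling `KafkaParamsCheck.validate(kafkaParams)` before `KafkaUtils.createDirectStream` fails fast on the driver instead of surfacing the problem later inside a batch.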
