
tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly

Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-streaming-kafka-0-10-assembly_2.13@4.0.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly@4.0.0


Spark Streaming Kafka 0.10 Assembly

Apache Spark's integration with Apache Kafka 0.10 for reliable, distributed stream processing. This assembly JAR packages the core Kafka 0.10 streaming library and all of its dependencies into a single deployable JAR file, enabling consumption of data from Kafka topics as Spark DStreams and RDDs, with exactly-once delivery semantics available through explicit offset management.

Package Information

  • Package Name: spark-streaming-kafka-0-10-assembly_2.13
  • Package Type: maven
  • Language: Scala
  • Version: 4.0.0
  • License: Apache-2.0
  • Installation: Add to Maven dependencies or include JAR in Spark classpath
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10-assembly_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
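
If the project builds with sbt rather than Maven, a roughly equivalent build.sbt line (an assumption about the build setup; %% appends the _2.13 suffix when scalaVersion is 2.13.x) would be:

// build.sbt -- sbt counterpart of the Maven dependency above (hypothetical build setup)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10-assembly" % "4.0.0"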

Core Imports

import org.apache.spark.streaming.kafka010._

For specific functionality:

import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, ConsumerStrategies}
import org.apache.spark.streaming.kafka010.{OffsetRange, HasOffsetRanges, CanCommitOffsets}

Basic Usage

Streaming with Direct Stream

import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

// Subscribe's Scala overload expects a scala.collection.Map[String, Object],
// so build the parameters as a Scala Map rather than a java.util.HashMap
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",
  "auto.offset.reset" -> "latest",
  // disable auto-commit; offsets are committed explicitly via commitAsync below
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topic1", "topic2")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Process the stream
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach { record =>
    println(s"${record.key}: ${record.value}")
  }
  // Commit offsets if needed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()

Batch Processing with RDD

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

val offsetRanges = Array(
  OffsetRange("topic1", 0, 0, 1000),
  OffsetRange("topic1", 1, 0, 1000)
)

// createRDD takes a java.util.Map, so convert the Scala kafkaParams map defined above
val rdd = KafkaUtils.createRDD[String, String](
  spark.sparkContext,
  kafkaParams.asJava,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process the RDD
rdd.foreach { record =>
  println(s"${record.key}: ${record.value}")
}

Architecture

The Kafka 0.10 integration is built around several key components:

  • KafkaUtils: Main entry point providing factory methods for creating Kafka RDDs and DStreams
  • Location Strategies: Control where Kafka consumers are scheduled on executors for optimal performance
  • Consumer Strategies: Define how Kafka consumers are created and configured (Subscribe, SubscribePattern, Assign)
  • Offset Management: Handle offset ranges and commit operations for exactly-once semantics
  • Per-Partition Configuration: Configure rate limiting and other settings on a per-partition basis
  • Consumer Caching: Optimize performance by caching Kafka consumers across partition computations

Capabilities

Stream Creation

Core functionality for creating Kafka-backed Spark streams and RDDs with configurable location strategies and consumer strategies.

object KafkaUtils {
  // Scala Stream Creation
  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ): InputDStream[ConsumerRecord[K, V]]
  
  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    perPartitionConfig: PerPartitionConfig
  ): InputDStream[ConsumerRecord[K, V]]
  
  // Scala RDD Creation
  def createRDD[K, V](
    sc: SparkContext,
    kafkaParams: java.util.Map[String, Object],
    offsetRanges: Array[OffsetRange],
    locationStrategy: LocationStrategy
  ): RDD[ConsumerRecord[K, V]]
}

See docs/stream-creation.md for more detail.

Location Strategies

Strategies for scheduling Kafka consumers on executors to optimize performance and network locality.

object LocationStrategies {
  def PreferBrokers: LocationStrategy
  def PreferConsistent: LocationStrategy
  def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
  def PreferFixed(hostMap: java.util.Map[TopicPartition, String]): LocationStrategy
}
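
A brief sketch of PreferFixed (host names are placeholders): partitions listed in the map are scheduled on the given hosts, and any unlisted partition falls back to consistent scheduling.

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// Pin two partitions of topic1 to specific executor hosts
val hostMap = Map(
  new TopicPartition("topic1", 0) -> "executor-host-1",
  new TopicPartition("topic1", 1) -> "executor-host-2"
)
val locationStrategy = LocationStrategies.PreferFixed(hostMap)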

See docs/location-strategies.md for more detail.

Consumer Strategies

Configuration strategies for creating and managing Kafka consumers with different subscription patterns.

object ConsumerStrategies {
  // Subscribe to specific topics
  def Subscribe[K, V](
    topics: Iterable[String],
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]
  
  // Subscribe to topics matching a pattern
  def SubscribePattern[K, V](
    pattern: java.util.regex.Pattern,
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]
  
  // Assign specific topic partitions
  def Assign[K, V](
    topicPartitions: Iterable[TopicPartition],
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]
}
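
A short sketch of the non-Subscribe strategies (the topic pattern and partitions are illustrative; kafkaParams is the map from Basic Usage):

import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Subscribe to every topic whose name matches a regex
val byPattern = ConsumerStrategies.SubscribePattern[String, String](
  Pattern.compile("events-.*"), kafkaParams)

// Consume a fixed set of partitions, bypassing group-based assignment
val byAssignment = ConsumerStrategies.Assign[String, String](
  List(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)), kafkaParams)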

See docs/consumer-strategies.md for more detail.

Offset Management

Comprehensive offset range management and commit operations for exactly-once processing semantics.

final class OffsetRange {
  val topic: String
  val partition: Int
  val fromOffset: Long
  val untilOffset: Long
  
  def topicPartition(): TopicPartition
  def count(): Long
}

object OffsetRange {
  // Scala-friendly constructors (used as OffsetRange(...) in the batch example above)
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
  
  // Java-friendly constructors
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
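
A minimal sketch of the OffsetRange factory methods and accessors (topic name and offsets are illustrative):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

val r1 = OffsetRange.create("topic1", 0, 0L, 1000L)
val r2 = OffsetRange.create(new TopicPartition("topic1", 1), 500L, 1500L)

println(r1.topicPartition())  // topic1-0
println(r2.count())           // 1000 (untilOffset - fromOffset)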

See docs/offset-management.md for more detail.

Per-Partition Configuration

Configuration interface for controlling processing rates and other settings on a per-partition basis.

abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: TopicPartition): Long
  def minRatePerPartition(topicPartition: TopicPartition): Long
}
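
As a sketch, a custom subclass can throttle a hypothetical high-volume topic harder than the rest (topic name and rates are illustrative); an instance is passed to the four-argument createDirectStream overload shown under Stream Creation.

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.PerPartitionConfig

class ThrottledPerPartitionConfig extends PerPartitionConfig {
  // Cap the hypothetical "clickstream" topic at 500 records/sec per partition
  override def maxRatePerPartition(tp: TopicPartition): Long =
    if (tp.topic == "clickstream") 500L else 10000L

  override def minRatePerPartition(tp: TopicPartition): Long = 1L
}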

See docs/per-partition-config.md for more detail.

Configuration Parameters

The integration supports a number of Spark configuration parameters for fine-tuning performance; a short sketch of setting them follows the list:

  • spark.streaming.kafka.maxRatePerPartition: Maximum records per second per partition (default: 0 = unlimited)
  • spark.streaming.kafka.minRatePerPartition: Minimum records per second per partition (default: 1)
  • spark.streaming.kafka.consumer.cache.enabled: Enable consumer caching (default: true)
  • spark.streaming.kafka.consumer.cache.maxCapacity: Maximum cached consumers (default: 64)
  • spark.streaming.kafka.consumer.cache.initialCapacity: Initial cache capacity (default: 16)
  • spark.streaming.kafka.consumer.cache.loadFactor: Cache load factor (default: 0.75)
  • spark.streaming.kafka.consumer.poll.ms: Consumer poll timeout in milliseconds (optional)
  • spark.streaming.kafka.allowNonConsecutiveOffsets: Allow non-consecutive offsets (default: false)
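
A minimal sketch of setting a few of these on a SparkConf before creating the StreamingContext (the values are illustrative, not recommendations):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-010-tuning-example")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
  .set("spark.streaming.kafka.consumer.poll.ms", "512")

val ssc = new StreamingContext(conf, Seconds(5))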

Types

// Core Kafka types (from kafka-clients dependency)
import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

// Spark types
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaInputDStream}

// Java types
import java.util.{Map => JMap, Collection => JCollection}
import java.util.regex.Pattern
import java.{lang => jl}