
tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly

Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing


docs/location-strategies.md

Location Strategies

Strategies for scheduling Kafka consumers on executors to optimize performance and network locality. Location strategies control where Kafka consumers are created and cached, which is crucial for performance since Kafka 0.10 consumers prefetch messages.

Capabilities

PreferConsistent Strategy

Use this strategy in most cases: it distributes partitions consistently across all executors for balanced load.

def PreferConsistent: LocationStrategy

Usage:

import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,  // Recommended for most use cases
  consumerStrategy
)

When to use:

  • Default choice for most applications
  • When you want balanced load distribution across executors
  • When you don't have specific locality requirements
  • When your Kafka brokers are not co-located with Spark executors

PreferBrokers Strategy

Use this strategy only when your Spark executors are running on the same nodes as your Kafka brokers to minimize network traffic.

def PreferBrokers: LocationStrategy

Usage:

import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferBrokers,  // Use only when executors are co-located with brokers
  consumerStrategy
)

When to use:

  • When Spark executors run on the same physical machines as Kafka brokers
  • To minimize network I/O by keeping data local
  • In containerized environments where Spark and Kafka pods are co-scheduled

Note: This strategy will throw an IllegalArgumentException when used with KafkaUtils.createRDD because RDDs don't have a driver consumer to look up broker locations.

PreferFixed Strategy

Use this strategy to explicitly control which executors handle specific topic partitions, useful for load balancing when you have uneven partition sizes.

def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
def PreferFixed(hostMap: java.util.Map[TopicPartition, String]): LocationStrategy

Parameters:

  • hostMap: Map from TopicPartition to the preferred host. Any TopicPartition not present in the map falls back to consistent placement.

Usage (Scala):

import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}
import org.apache.kafka.common.TopicPartition
import scala.collection.mutable

// Create a map of preferred locations
val hostMap = mutable.Map[TopicPartition, String]()
hostMap += new TopicPartition("high-volume-topic", 0) -> "executor-host-1"
hostMap += new TopicPartition("high-volume-topic", 1) -> "executor-host-2"
hostMap += new TopicPartition("low-volume-topic", 0) -> "executor-host-3"

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(hostMap),
  consumerStrategy
)

Usage (Java):

import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import java.util.HashMap;
import java.util.Map;

Map<TopicPartition, String> hostMap = new HashMap<>();
hostMap.put(new TopicPartition("high-volume-topic", 0), "executor-host-1");
hostMap.put(new TopicPartition("high-volume-topic", 1), "executor-host-2");

JavaInputDStream<ConsumerRecord<String, String>> stream = 
  KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferFixed(hostMap),
    consumerStrategy
  );

When to use:

  • When you have uneven partition sizes and want to balance load manually
  • When certain partitions have different processing requirements
  • When you want to isolate high-volume partitions to specific executors
  • For debugging or testing specific partition assignments
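Building a PreferFixed map by hand gets tedious as partition counts grow. One option is to generate the mapping round-robin. The sketch below is a hypothetical helper in plain Java; it uses "topic-partition" strings in place of Kafka's TopicPartition class so it runs without Kafka on the classpath, but the same loop works unchanged with real TopicPartition keys:

```java
import java.util.*;

public class RoundRobinAssignment {

    // Hypothetical helper: spread partitions round-robin across executor hosts.
    // Partition i goes to host (i mod hosts.size()), giving an even spread.
    static Map<String, String> assign(List<String> partitions, List<String> hosts) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            result.put(partitions.get(i), hosts.get(i % hosts.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("metrics-0", "metrics-1", "metrics-2", "metrics-3");
        List<String> hosts = Arrays.asList("executor-host-1", "executor-host-2");
        System.out.println(assign(partitions, hosts));
        // {metrics-0=executor-host-1, metrics-1=executor-host-2,
        //  metrics-2=executor-host-1, metrics-3=executor-host-2}
    }
}
```

The generated map can then be passed to LocationStrategies.PreferFixed; partitions added to the topic later are simply absent from the map and fall back to consistent placement.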

Performance Considerations

Consumer Caching

Location strategies directly impact consumer caching performance:

  • Consistent placement allows consumers to be reused across micro-batches
  • Inconsistent placement forces consumer recreation, hurting performance
  • Cache configuration can be tuned via Spark configuration:
    • spark.streaming.kafka.consumer.cache.enabled=true (default)
    • spark.streaming.kafka.consumer.cache.maxCapacity=64 (default)
    • spark.streaming.kafka.consumer.cache.initialCapacity=16 (default)

Network Optimization

// Example: Optimize for network locality
val brokerExecutorMap = Map[TopicPartition, String](
  new TopicPartition("topic1", 0) -> "broker1.example.com",
  new TopicPartition("topic1", 1) -> "broker2.example.com",
  new TopicPartition("topic2", 0) -> "broker3.example.com"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(brokerExecutorMap),
  consumerStrategy
)

Load Balancing

// Example: Balance high-volume partitions across dedicated executors
val loadBalancedMap = Map[TopicPartition, String](
  // Distribute high-volume partitions
  new TopicPartition("metrics", 0) -> "high-memory-executor-1",
  new TopicPartition("metrics", 1) -> "high-memory-executor-2",
  new TopicPartition("metrics", 2) -> "high-memory-executor-3",
  
  // Group low-volume partitions
  new TopicPartition("alerts", 0) -> "standard-executor-1",
  new TopicPartition("alerts", 1) -> "standard-executor-1"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(loadBalancedMap),
  consumerStrategy
)

Best Practices

  1. Start with PreferConsistent: Use this as your default choice unless you have specific locality requirements.

  2. Use PreferBrokers carefully: Only when executors are truly co-located with Kafka brokers and you've verified the performance benefit.

  3. Monitor cache effectiveness: Check consumer cache hit rates and adjust cache settings if needed.

  4. Profile before optimizing: Measure actual performance impact before implementing complex PreferFixed strategies.

  5. Consider partition count: Location strategies become more important with higher partition counts.

  6. Account for dynamic scaling: PreferFixed strategies may need adjustment when cluster size changes.

Error Handling

Location strategies include built-in error handling:

  • PreferBrokers with RDD: Throws IllegalArgumentException with clear error message
  • Unmapped partitions: PreferFixed falls back to consistent placement for any partition missing from the host map
  • Executor unavailability: Automatically reassigns to available executors
  • Cache misses: Gracefully creates new consumers when cache entries are unavailable

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly
