
Location Strategies

Location strategies control how the Kafka consumers for each TopicPartition are scheduled on Spark executors. Kafka 0.10+ consumers prefetch messages, so performance depends on keeping cached consumers on the same executors across batches rather than recreating them for every partition. The chosen location is a preference, not a hard requirement: Spark may schedule a partition elsewhere if needed.

Core Strategies

PreferConsistent

The recommended default strategy that consistently distributes partitions across all available executors.

def PreferConsistent: LocationStrategy

Use when: In most cases; it provides good load balancing across executors.

Characteristics:

  • Distributes partitions evenly across all executors
  • Maintains a consistent assignment across batches as long as the set of executors does not change
  • No special host requirements
  • Best general-purpose strategy
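The idea behind consistent placement can be illustrated with a small sketch. It assumes a hypothetical TP case class standing in for Kafka's TopicPartition and a plain list of executor names. Each partition is hashed onto a sorted executor list, so the same partition lands on the same executor every batch while the executor set is unchanged. This is a simplified illustration of the technique, not Spark's internal code.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TP(topic: String, partition: Int)

object ConsistentPlacement {
  // Sort executors so every batch sees them in the same order, then pick one
  // by hashing the partition. floorMod keeps the index non-negative even when
  // hashCode is negative.
  def choose(tp: TP, executors: Seq[String]): Option[String] = {
    val sorted = executors.sorted
    if (sorted.isEmpty) None
    else Some(sorted(Math.floorMod(tp.hashCode, sorted.length)))
  }
}
```

Because the executor list is sorted before hashing, the order in which executors are reported does not affect the result.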

PreferBrokers

Strategy for when your Spark executors run on the same nodes as your Kafka brokers.

def PreferBrokers: LocationStrategy

Use when: Your executors run on the same physical nodes as Kafka brokers.

Characteristics:

  • Attempts to place consumers on the same hosts as Kafka brokers
  • Reduces network overhead by keeping data local
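  • Only helps when executors actually run on broker nodes; otherwise it gives no locality benefit
  • Cannot be used with KafkaUtils.createRDD, because a batch RDD has no driver-side consumer to look up broker hosts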
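Conceptually, broker preference turns partition-leader metadata into a partition-to-host map. That is why it needs a consumer able to look up the metadata. The sketch below assumes a hypothetical PartitionMeta record that carries the leader's host name. With the real Kafka client, that host is available from KafkaConsumer.partitionsFor via PartitionInfo.leader().host(). The sketch only shows the shape of the resulting map and is not Spark's implementation.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TP(topic: String, partition: Int)

// Hypothetical metadata record: which host currently leads a partition
// (None when no leader is known, e.g. during a leader election).
final case class PartitionMeta(topic: String, partition: Int, leaderHost: Option[String])

object BrokerHosts {
  // Build a TopicPartition -> leader-host map, skipping partitions whose
  // leader is currently unknown.
  def fromMetadata(meta: Seq[PartitionMeta]): Map[TP, String] =
    meta.flatMap(m => m.leaderHost.map(host => TP(m.topic, m.partition) -> host)).toMap
}
```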

PreferFixed (Scala Collection)

Strategy for custom partition-to-host mapping when you have uneven load distribution or specific placement requirements.

def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy

Parameters:

  • hostMap: collection.Map[TopicPartition, String] - Mapping from TopicPartition to preferred host

Use when: You have specific knowledge about partition load or host capabilities.
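When a hand-written map becomes unwieldy, the host map can be generated instead. The sketch below uses a hypothetical helper, pinRoundRobin, which is not part of Spark. It spreads the partitions of one heavy topic round-robin over a list of dedicated hosts. The result has the Map[TopicPartition, String] shape that PreferFixed expects. A TP stand-in replaces TopicPartition so the sketch runs without Kafka on the classpath.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TP(topic: String, partition: Int)

object FixedHostMaps {
  // Assign partitions 0 until numPartitions of `topic` to `hosts` in turn:
  // partition 0 -> hosts(0), partition 1 -> hosts(1), ..., wrapping around.
  def pinRoundRobin(topic: String, numPartitions: Int, hosts: IndexedSeq[String]): Map[TP, String] = {
    require(hosts.nonEmpty, "need at least one host")
    (0 until numPartitions).map(p => TP(topic, p) -> hosts(p % hosts.length)).toMap
  }
}
```

With the real types, build new TopicPartition(topic, p) keys instead of TP and pass the resulting map to LocationStrategies.PreferFixed.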

PreferFixed (Java Collection)

Java version of the fixed mapping strategy.

public static LocationStrategy PreferFixed(java.util.Map<TopicPartition, String> hostMap)

Parameters:

  • hostMap: java.util.Map<TopicPartition, String> - Mapping from TopicPartition to preferred host

Usage Examples

Default Strategy (Recommended)

import org.apache.spark.streaming.kafka010._

// Use PreferConsistent in most cases
// (assumes streamingContext, topics and kafkaParams are already defined)
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

Broker Co-location Strategy

import org.apache.spark.streaming.kafka010._

// Use when executors are on same nodes as Kafka brokers
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferBrokers,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Note: Cannot be used with createRDD
// This will throw AssertionError:
// val rdd = KafkaUtils.createRDD[String, String](
//   sparkContext, kafkaParams, offsetRanges, LocationStrategies.PreferBrokers
// )

Custom Fixed Mapping (Scala)

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Define custom partition-to-host mapping
val hostMap = Map(
  new TopicPartition("high-volume-topic", 0) -> "executor-host-1",
  new TopicPartition("high-volume-topic", 1) -> "executor-host-2",
  new TopicPartition("low-volume-topic", 0) -> "executor-host-3"
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferFixed(hostMap),
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

Custom Fixed Mapping (Java)

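import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.*;

// Assumes javaStreamingContext, topics (Collection<String>) and
// kafkaParams (Map<String, Object>) are already defined
Map<TopicPartition, String> hostMap = new HashMap<>();
hostMap.put(new TopicPartition("topic1", 0), "host1");
hostMap.put(new TopicPartition("topic1", 1), "host2");
hostMap.put(new TopicPartition("topic2", 0), "host3");

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  javaStreamingContext,
  LocationStrategies.PreferFixed(hostMap),
  ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);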

Fallback Behavior with PreferFixed

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Only specify mapping for some partitions
val partialHostMap = Map(
  new TopicPartition("critical-topic", 0) -> "high-performance-host"
  // Other partitions not specified will use consistent location
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferFixed(partialHostMap),
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Any TopicPartition not in the map will use consistent location strategy
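The fallback rule can be sketched as a single resolution function. This is a simplified model, not Spark API: the TP and Exec types and preferredExecutor are hypothetical. A partition with a mapped host is placed on an executor running on that host. If the partition is unmapped, or no executor runs on the mapped host, the sketch falls back to consistent hashing over all executors.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TP(topic: String, partition: Int)
// Hypothetical executor descriptor: host name plus executor id.
final case class Exec(host: String, id: String)

object FixedWithFallback {
  def preferredExecutor(tp: TP, hostMap: Map[TP, String], executors: Seq[Exec]): Option[Exec] = {
    // Stable order so the hash below picks the same executor every batch.
    val all = executors.sortBy(e => (e.host, e.id))
    // Executors on the mapped host, if the partition is mapped at all.
    val onPreferred = hostMap.get(tp).map(h => all.filter(_.host == h)).getOrElse(Seq.empty)
    // Unmapped partition, or no executor on the mapped host: use every executor.
    val candidates = if (onPreferred.nonEmpty) onPreferred else all
    if (candidates.isEmpty) None
    else Some(candidates(Math.floorMod(tp.hashCode, candidates.length)))
  }
}
```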

Strategy Selection Guidelines

Use PreferConsistent When:

  • You don't have specific host requirements
  • You want simple, balanced partition distribution
  • You're unsure which strategy to use (default choice)
  • Your Kafka brokers and Spark executors are on different nodes

Use PreferBrokers When:

  • Your Spark executors run on the same nodes as Kafka brokers
  • You want to minimize network I/O
  • You create streams with createDirectStream and never build batch RDDs with createRDD
  • You have control over both Kafka and Spark cluster deployment

Use PreferFixed When:

  • You have uneven partition loads and want specific placement
  • Some partitions require more processing power than others
  • You have detailed knowledge of your cluster topology
  • You need to isolate certain partitions on specific hosts
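The guidelines above can be condensed into a small decision function. The Deployment flags and StrategyChoice.suggest are hypothetical. The priority order encodes one reading of the guidelines, with known hot partitions taking precedence. The sketch returns the name of a LocationStrategies factory method rather than calling Spark.

```scala
// Hypothetical summary of a deployment; not a Spark type.
final case class Deployment(
  executorsOnBrokers: Boolean,
  usesCreateRDD: Boolean,
  hasKnownHotPartitions: Boolean
)

object StrategyChoice {
  // Returns the name of the suggested LocationStrategies factory method.
  def suggest(d: Deployment): String =
    if (d.hasKnownHotPartitions) "PreferFixed"
    // PreferBrokers only pays off with co-location and is rejected by createRDD.
    else if (d.executorsOnBrokers && !d.usesCreateRDD) "PreferBrokers"
    else "PreferConsistent"
}
```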

Performance Considerations

Consumer Caching

  • Kafka consumers are cached per executor to avoid recreation overhead
  • Location preferences help maintain cache effectiveness
  • Consistent placement improves consumer reuse across batches
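A toy per-executor cache shows why placement matters for reuse. The sketch assumes a hypothetical FakeConsumer. It keys the cache by consumer group and partition, which is an assumption made for illustration. A partition that keeps landing on the same executor needs one consumer; a partition that moves between executors forces each executor to build its own.

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TP(topic: String, partition: Int)

// Hypothetical placeholder for a real Kafka consumer.
final class FakeConsumer(val groupId: String, val tp: TP)

// Models the cache on ONE executor: the first request for a (group, partition)
// pair creates a consumer; later requests on the same executor reuse it.
final class ExecutorConsumerCache {
  private val cache = mutable.Map.empty[(String, TP), FakeConsumer]
  private var createdCount = 0

  def created: Int = createdCount

  def get(groupId: String, tp: TP): FakeConsumer =
    cache.getOrElseUpdate((groupId, tp), {
      createdCount += 1
      new FakeConsumer(groupId, tp)
    })
}
```

For example, calling get three times on one cache creates a single consumer. Alternating the same partition across two caches creates one consumer per cache.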

Network Locality

  • PreferBrokers reduces network traffic when possible
  • PreferFixed allows fine-tuned placement for optimal network usage
  • PreferConsistent provides balanced load without network optimization

Load Balancing

  • PreferConsistent ensures even distribution across executors
  • PreferFixed allows custom load balancing based on partition characteristics
  • PreferBrokers may create uneven load if broker hosts have different capabilities
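One way to compare placements is to total the expected load each host would receive. The sketch assumes you have per-partition rate estimates; the numbers in the usage below are hypothetical. HostLoad.perHost is an illustrative helper, not Spark API.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TP(topic: String, partition: Int)

object HostLoad {
  // Sum the estimated per-partition message rates on each host.
  // Partitions missing from ratePerSec count as zero.
  def perHost(assignment: Map[TP, String], ratePerSec: Map[TP, Long]): Map[String, Long] =
    assignment.toSeq
      .groupBy { case (_, host) => host }
      .map { case (host, tps) => host -> tps.map { case (tp, _) => ratePerSec.getOrElse(tp, 0L) }.sum }
}
```

Feeding it the leader placement that PreferBrokers would follow, alongside a candidate PreferFixed map, shows whether a hand-tuned map actually evens out the load.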

Error Handling

PreferBrokers with createRDD

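import org.apache.spark.streaming.kafka010._

// createRDD rejects PreferBrokers with an AssertionError.
// Note: createRDD takes kafkaParams as a java.util.Map[String, Object].
try {
  val rdd = KafkaUtils.createRDD[String, String](
    sparkContext, kafkaParams, offsetRanges, LocationStrategies.PreferBrokers
  )
} catch {
  case e: AssertionError =>
    println(s"PreferBrokers cannot be used with createRDD (${e.getMessage}); use PreferConsistent or PreferFixed")
}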

Important Notes

  • All location strategies are marked as @Experimental in Spark 2.4.8
  • Location preferences are hints, not guarantees - Spark may place partitions elsewhere
  • Consumer instances are automatically managed and cached for performance
  • PreferBrokers needs a driver-side consumer to look up broker hosts, so it cannot be used with KafkaUtils.createRDD
  • PreferFixed falls back to consistent location for unmapped TopicPartitions