Location strategies control how cached Kafka consumers are scheduled for TopicPartitions on Spark executors. The new Kafka consumer API (0.10+) prefetches messages into buffers, so for performance it is important to keep cached consumers on the appropriate executors rather than recreating them for each batch, and to prefer scheduling partitions on the hosts that already have the right consumers. The chosen location is a preference, not an absolute requirement: partitions may still be scheduled elsewhere if needed.
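The consumer cache that these strategies rely on has a default maximum capacity of 64 consumers per executor. Below is a minimal sketch, assuming you build your own SparkConf, of raising that limit via the documented spark.streaming.kafka.consumer.cache.maxCapacity setting; the value 128 is illustrative only.

```scala
import org.apache.spark.SparkConf

// Sketch: raise the executor-side Kafka consumer cache capacity
// (default 64) if you expect more cached consumers per executor.
val conf = new SparkConf()
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
```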
PreferConsistent is the recommended default strategy; it distributes partitions consistently across all available executors.

```scala
def PreferConsistent: LocationStrategy
```

Use when: In most cases; it provides good load balancing across executors.

Characteristics:
- Distributes partitions evenly across the available executors
- Works with both createDirectStream and createRDD
PreferBrokers is the strategy for when your Spark executors are co-located on the same nodes as your Kafka brokers.

```scala
def PreferBrokers: LocationStrategy
```

Use when: Your executors run on the same physical nodes as Kafka brokers.

Characteristics:
- Prefers to schedule each partition on the Kafka leader for that partition
- Cannot be used with createRDD (no driver consumer available)

PreferFixed is the strategy for custom partition-to-host mapping when you have uneven load distribution or specific placement requirements.
```scala
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
```

Parameters:
- hostMap: collection.Map[TopicPartition, String] - mapping from TopicPartition to preferred host

Use when: You have specific knowledge about partition load or host capabilities.
The Java version of the fixed-mapping strategy takes a java.util.Map.

```java
public static LocationStrategy PreferFixed(java.util.Map<TopicPartition, String> hostMap)
```

Parameters:
- hostMap: java.util.Map<TopicPartition, String> - mapping from TopicPartition to preferred host

Examples:

```scala
import org.apache.spark.streaming.kafka010._

// Use PreferConsistent in most cases
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
```

```scala
import org.apache.spark.streaming.kafka010._
// Use when executors are on the same nodes as Kafka brokers
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferBrokers,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Note: PreferBrokers cannot be used with createRDD.
// This will throw AssertionError:
// val rdd = KafkaUtils.createRDD[String, String](
//   sparkContext, kafkaParams, offsetRanges, LocationStrategies.PreferBrokers
// )
```

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition
// Define custom partition-to-host mapping
val hostMap = Map(
  new TopicPartition("high-volume-topic", 0) -> "executor-host-1",
  new TopicPartition("high-volume-topic", 1) -> "executor-host-2",
  new TopicPartition("low-volume-topic", 0) -> "executor-host-3"
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferFixed(hostMap),
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
```

```java
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import java.util.HashMap;
import java.util.Map;

Map<TopicPartition, String> hostMap = new HashMap<>();
hostMap.put(new TopicPartition("topic1", 0), "host1");
hostMap.put(new TopicPartition("topic1", 1), "host2");
hostMap.put(new TopicPartition("topic2", 0), "host3");

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
    javaStreamingContext,
    LocationStrategies.PreferFixed(hostMap),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
```

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition
// Only specify mapping for some partitions
val partialHostMap = Map(
  new TopicPartition("critical-topic", 0) -> "high-performance-host"
  // Other partitions not specified will use the consistent location
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferFixed(partialHostMap),
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// Any TopicPartition not in the map will use the consistent location strategy
```

```scala
// This will throw AssertionError
try {
  val rdd = KafkaUtils.createRDD[String, String](
    sparkContext, kafkaParams, offsetRanges, LocationStrategies.PreferBrokers
  )
} catch {
  case e: AssertionError =>
    println("PreferBrokers cannot be used with createRDD - use PreferConsistent or PreferFixed")
}
```

Note: LocationStrategies is marked @Experimental (as of Spark 2.4.8).
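For reference, a minimal sketch of the supported path: the same createRDD call succeeds with PreferConsistent (or PreferFixed). The kafkaParams (a java.util.Map[String, Object]) and offsetRanges values are assumed to be defined as in the failing example above.

```scala
import org.apache.spark.streaming.kafka010._

// Sketch only: kafkaParams and offsetRanges are assumed to be in scope.
val rdd = KafkaUtils.createRDD[String, String](
  sparkContext, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent
)
```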