Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing
—
Strategies for scheduling Kafka consumers on executors to optimize performance and network locality. Location strategies control where Kafka consumers are created and cached, which is crucial for performance since Kafka 0.10 consumers prefetch messages.
Use this strategy in most cases - it consistently distributes partitions across all executors for balanced load distribution.
```scala
def PreferConsistent: LocationStrategy
```

Usage:

```scala
import org.apache.spark.streaming.kafka010.LocationStrategies

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent, // Recommended for most use cases
  consumerStrategy
)
```
Use this strategy only when your Spark executors are running on the same nodes as your Kafka brokers to minimize network traffic.
```scala
def PreferBrokers: LocationStrategy
```

Usage:

```scala
import org.apache.spark.streaming.kafka010.LocationStrategies

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferBrokers, // Use only when executors are co-located with brokers
  consumerStrategy
)
```
Note: This strategy will throw an IllegalArgumentException when used with KafkaUtils.createRDD because RDDs don't have a driver consumer to look up broker locations.
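That restriction can be captured as a simple rule: batch (createRDD) jobs should never request PreferBrokers. A minimal, dependency-free sketch of the rule — the real strategy objects live in org.apache.spark.streaming.kafka010.LocationStrategies; plain strings stand in for them here, and strategyForBatch is a hypothetical helper, not part of the library:

```scala
// Hypothetical helper: choose a location strategy name that is safe for
// batch (createRDD) jobs. Strategies are modeled as plain strings so the
// sketch runs without a Spark classpath.
def strategyForBatch(requested: String): String =
  if (requested == "PreferBrokers")
    "PreferConsistent" // createRDD has no driver consumer to look up brokers
  else
    requested

// PreferBrokers falls back; everything else passes through unchanged.
println(strategyForBatch("PreferBrokers")) // PreferConsistent
println(strategyForBatch("PreferFixed"))   // PreferFixed
```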
Use this strategy to explicitly control which executors handle specific topic partitions, useful for load balancing when you have uneven partition sizes.
```scala
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
def PreferFixed(hostMap: java.util.Map[TopicPartition, String]): LocationStrategy
```

Parameters:
- hostMap: map from TopicPartition to preferred host/executor. Any TopicPartition not in the map falls back to a consistent location.

Usage (Scala):
```scala
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.kafka.common.TopicPartition
import scala.collection.mutable

// Create a map of preferred locations
val hostMap = mutable.Map[TopicPartition, String]()
hostMap += new TopicPartition("high-volume-topic", 0) -> "executor-host-1"
hostMap += new TopicPartition("high-volume-topic", 1) -> "executor-host-2"
hostMap += new TopicPartition("low-volume-topic", 0) -> "executor-host-3"

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(hostMap),
  consumerStrategy
)
```

Usage (Java):
```java
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.TopicPartition;
import java.util.HashMap;
import java.util.Map;

Map<TopicPartition, String> hostMap = new HashMap<>();
hostMap.put(new TopicPartition("high-volume-topic", 0), "executor-host-1");
hostMap.put(new TopicPartition("high-volume-topic", 1), "executor-host-2");

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferFixed(hostMap),
        consumerStrategy
    );
```
Location strategies directly impact consumer caching performance:
- spark.streaming.kafka.consumer.cache.enabled=true (default)
- spark.streaming.kafka.consumer.cache.maxCapacity=64 (default)
- spark.streaming.kafka.consumer.cache.initialCapacity=16 (default)

```scala
// Example: Optimize for network locality
val brokerExecutorMap = Map[TopicPartition, String](
  new TopicPartition("topic1", 0) -> "broker1.example.com",
  new TopicPartition("topic1", 1) -> "broker2.example.com",
  new TopicPartition("topic2", 0) -> "broker3.example.com"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(brokerExecutorMap),
  consumerStrategy
)
```

```scala
// Example: Balance high-volume partitions across dedicated executors
val loadBalancedMap = Map[TopicPartition, String](
  // Distribute high-volume partitions
  new TopicPartition("metrics", 0) -> "high-memory-executor-1",
  new TopicPartition("metrics", 1) -> "high-memory-executor-2",
  new TopicPartition("metrics", 2) -> "high-memory-executor-3",
  // Group low-volume partitions
  new TopicPartition("alerts", 0) -> "standard-executor-1",
  new TopicPartition("alerts", 1) -> "standard-executor-1"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(loadBalancedMap),
  consumerStrategy
)
```

Start with PreferConsistent: Use this as your default choice unless you have specific locality requirements.
Use PreferBrokers carefully: Only when executors are truly co-located with Kafka brokers and you've verified the performance benefit.
Monitor cache effectiveness: Check consumer cache hit rates and adjust cache settings if needed.
Profile before optimizing: Measure actual performance impact before implementing complex PreferFixed strategies.
Consider partition count: Location strategies become more important with higher partition counts.
Account for dynamic scaling: PreferFixed strategies may need adjustment when cluster size changes.
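For the dynamic-scaling point, one approach is to rebuild the PreferFixed map from the current executor list rather than hard-coding hosts. A minimal sketch assuming a round-robin assignment; assignRoundRobin is a hypothetical helper, and plain (topic, partition) tuples stand in for org.apache.kafka.common.TopicPartition to keep the sketch dependency-free:

```scala
// Hypothetical helper: spread partitions round-robin over whatever executor
// hosts are currently available, producing a map suitable for PreferFixed
// (with real TopicPartition keys in an actual job). hosts must be non-empty.
def assignRoundRobin(
    partitions: Seq[(String, Int)],
    hosts: Seq[String]): Map[(String, Int), String] =
  partitions.zipWithIndex.map { case (tp, i) =>
    tp -> hosts(i % hosts.size)
  }.toMap

val assignment = assignRoundRobin(
  partitions = Seq(("metrics", 0), ("metrics", 1), ("metrics", 2)),
  hosts = Seq("executor-host-1", "executor-host-2"))

println(assignment(("metrics", 2))) // executor-host-1 (assignment wraps around)
```

Recomputing this map when the executor set changes keeps the explicit placement aligned with the cluster's actual size.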
Location strategies include built-in error handling; for example, PreferBrokers fails fast with an IllegalArgumentException when used with KafkaUtils.createRDD.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly