Consumer strategies encapsulate how Kafka consumers are created and configured on the driver and executors. They handle the setup required for Kafka 0.10+ consumers, including subscription management, offset initialization, and parameter validation. The strategy pattern allows flexible consumer configuration while maintaining checkpoint compatibility.
Subscribe to a collection of specific topics for dynamic partition assignment.
// Scala versions
def Subscribe[K, V](
  topics: Iterable[String],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: Iterable[String],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def Subscribe[K, V](
  topics: java.util.Collection[String],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: java.util.Collection[String],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]

Parameters:
- topics: Collection of topic names to subscribe to
- kafkaParams: Kafka consumer configuration parameters
- offsets: Optional initial offsets for specific partitions

Use when: You want to consume from specific known topics with automatic partition discovery.
Subscribe to all topics matching a regex pattern for dynamic topic and partition discovery.
// Scala versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]

Parameters:
- pattern: Regex pattern to match topic names
- kafkaParams: Kafka consumer configuration parameters
- offsets: Optional initial offsets for specific partitions

Use when: You want to dynamically discover and consume from topics matching a pattern.
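Pattern subscription matches the regex against the entire topic name (full-match semantics, not a substring search), so `events-.*` will pick up `events-clicks` but not `app-events`. A small self-contained sketch of that filtering behavior (topic names invented for illustration):

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class PatternMatchDemo {
    // Mimics how pattern subscription filters topic names: the whole name must match
    static List<String> matchedTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("events-.*");
        List<String> topics = List.of("events-clicks", "events-views", "orders", "app-events");
        System.out.println(matchedTopics(pattern, topics)); // [events-clicks, events-views]
    }
}
```

Anchoring the pattern (e.g. requiring a common prefix) avoids accidentally subscribing to unrelated topics, including internal ones.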
Assign a fixed collection of specific TopicPartitions for static partition assignment.
// Scala versions
def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]

Parameters:
- topicPartitions: Collection of specific TopicPartitions to assign
- kafkaParams: Kafka consumer configuration parameters
- offsets: Optional initial offsets for specific partitions

Use when: You want precise control over which partitions to consume from.
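The examples later in this section call Assign with only two arguments; the three-argument overload additionally pins starting offsets per partition. A hedged Java sketch (topic names, partition numbers, and offset values are illustrative placeholders):

```java
// Sketch only: assumes spark-streaming-kafka-0-10 and kafka-clients on the classpath.
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;

public class AssignWithOffsets {
    public static ConsumerStrategy<String, String> build(Map<String, Object> kafkaParams) {
        Collection<TopicPartition> partitions = Arrays.asList(
                new TopicPartition("orders", 0),
                new TopicPartition("payments", 1));
        // The Java overload takes boxed java.lang.Long offset values
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("orders", 0), 100L);
        offsets.put(new TopicPartition("payments", 1), 50L);
        // Consumption starts exactly at the provided offsets for these partitions
        return ConsumerStrategies.Assign(partitions, kafkaParams, offsets);
    }
}
```

Combining static assignment with explicit offsets makes the starting position fully deterministic, independent of the consumer group's committed offsets.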
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("orders", "payments", "users")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)

import org.apache.kafka.common.TopicPartition
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",
  "auto.offset.reset" -> "none" // Will use provided offsets
)

val topics = Array("orders", "payments")
val offsets = Map(
  new TopicPartition("orders", 0) -> 100L,
  new TopicPartition("orders", 1) -> 200L,
  new TopicPartition("payments", 0) -> 50L
)
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)

import java.util.regex.Pattern
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "pattern-consumer-group"
)

// Subscribe to all topics starting with "events-"
val pattern = Pattern.compile("events-.*")
val consumerStrategy = ConsumerStrategies.SubscribePattern[String, String](pattern, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)

import org.apache.kafka.common.TopicPartition
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "assigned-consumer-group"
)

// Assign specific partitions
val topicPartitions = Array(
  new TopicPartition("orders", 0),
  new TopicPartition("orders", 2),
  new TopicPartition("payments", 1)
)
val consumerStrategy = ConsumerStrategies.Assign[String, String](topicPartitions, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)

import org.apache.spark.streaming.kafka010.*;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
// Subscribe strategy
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "java-consumer-group");

Collection<String> topics = Arrays.asList("topic1", "topic2");
ConsumerStrategy<String, String> strategy = ConsumerStrategies.Subscribe(topics, kafkaParams);

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
    javaStreamingContext,
    LocationStrategies.PreferConsistent(),
    strategy
);

import java.util.regex.Pattern;
Pattern pattern = Pattern.compile("logs-\\d{4}-\\d{2}-\\d{2}");
ConsumerStrategy<String, String> patternStrategy =
    ConsumerStrategies.SubscribePattern(pattern, kafkaParams);

JavaInputDStream<ConsumerRecord<String, String>> patternStream = KafkaUtils.createDirectStream(
    javaStreamingContext,
    LocationStrategies.PreferConsistent(),
    patternStrategy
);

All strategies require these parameters:
- bootstrap.servers: Kafka broker addresses
- key.deserializer: Key deserializer class
- value.deserializer: Value deserializer class
- group.id: Consumer group ID (required for Subscribe and SubscribePattern)
- auto.offset.reset: What to do when there is no initial offset ("earliest", "latest", or "none")
- enable.auto.commit: Set to false for manual offset management
- session.timeout.ms: Session timeout for consumer group management
- heartbeat.interval.ms: Heartbeat interval for consumer liveness

When providing initial offsets:
- Explicitly provided offsets take precedence over the auto.offset.reset configuration or committed offsets

The library automatically handles the KAFKA-3370 issue when auto.offset.reset is "none":
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "auto.offset.reset" -> "none", // Will trigger automatic workaround
  // ... other params
)

// The strategy automatically handles NoOffsetForPartitionException
val strategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)

All strategies handle:
- The case where auto.offset.reset is "none"

// Invalid configuration will be caught at consumer creation time
val invalidParams = Map[String, Object](
  "bootstrap.servers" -> "", // Empty - will cause error
  "key.deserializer" -> "invalid.class.name" // Invalid class - will cause error
)

try {
  val strategy = ConsumerStrategies.Subscribe[String, String](topics, invalidParams)
  // Error will occur when consumer is created, not when strategy is created
} catch {
  case e: Exception => println(s"Configuration error: ${e.getMessage}")
}

This API is marked @Experimental in Spark 2.4.8.
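Because misconfiguration only surfaces when the consumer is created on an executor, it can be useful to fail fast at strategy-construction time. A minimal sketch with a hypothetical validate helper (not part of the Spark or Kafka API), using only the standard library:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KafkaParamsCheck {
    // Hypothetical helper: rejects the two misconfigurations shown above
    // (empty bootstrap.servers, unloadable deserializer class) up front.
    static void validate(Map<String, Object> params) {
        for (String key : List.of("bootstrap.servers", "key.deserializer", "value.deserializer")) {
            Object v = params.get(key);
            if (v == null || v.toString().isEmpty()) {
                throw new IllegalArgumentException("Missing or empty: " + key);
            }
        }
        Object deser = params.get("key.deserializer");
        if (deser instanceof String) {
            try {
                Class.forName((String) deser); // rejects "invalid.class.name" early
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException("Not a loadable class: " + deser);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> invalid = new HashMap<>();
        invalid.put("bootstrap.servers", "");
        invalid.put("key.deserializer", "invalid.class.name");
        try {
            validate(invalid);
        } catch (IllegalArgumentException e) {
            // prints: Configuration error: Missing or empty: bootstrap.servers
            System.out.println("Configuration error: " + e.getMessage());
        }
    }
}
```

Calling such a check before ConsumerStrategies.Subscribe moves the failure from job runtime to driver startup, where it is far easier to diagnose.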