Consumer Strategies

Consumer strategies encapsulate how Kafka consumers are created and configured on driver and executors. They handle the complex setup required for Kafka 0.10+ consumers, including subscription management, offset initialization, and parameter validation. The strategy pattern allows for flexible consumer configuration while maintaining checkpoint compatibility.

Core Strategies

Subscribe

Subscribe to a collection of specific topics for dynamic partition assignment.

// Scala versions
def Subscribe[K, V](
  topics: Iterable[String],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: Iterable[String],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions  
def Subscribe[K, V](
  topics: java.util.Collection[String],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: java.util.Collection[String],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]

Parameters:

  • topics: Collection of topic names to subscribe to
  • kafkaParams: Kafka consumer configuration parameters
  • offsets: Optional initial offsets for specific partitions

Use when: You want to consume from specific known topics with automatic partition discovery.

SubscribePattern

Subscribe to all topics matching a regex pattern for dynamic topic and partition discovery.

// Scala versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]

Parameters:

  • pattern: Regex pattern to match topic names
  • kafkaParams: Kafka consumer configuration parameters
  • offsets: Optional initial offsets for specific partitions

Use when: You want to dynamically discover and consume from topics matching a pattern.

Assign

Assign a fixed collection of specific TopicPartitions for static partition assignment.

// Scala versions
def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]

Parameters:

  • topicPartitions: Collection of specific TopicPartitions to assign
  • kafkaParams: Kafka consumer configuration parameters
  • offsets: Optional initial offsets for specific partitions

Use when: You want precise control over which partitions to consume from.

Usage Examples

Subscribe to Specific Topics

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer], 
  "group.id" -> "my-consumer-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("orders", "payments", "users")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)

Subscribe with Initial Offsets

import org.apache.kafka.common.TopicPartition

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",
  "auto.offset.reset" -> "none" // Will use provided offsets
)

val topics = Array("orders", "payments")
val offsets = Map(
  new TopicPartition("orders", 0) -> 100L,
  new TopicPartition("orders", 1) -> 200L,
  new TopicPartition("payments", 0) -> 50L
)

val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)

Pattern-based Topic Subscription

import java.util.regex.Pattern

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "pattern-consumer-group"
)

// Subscribe to all topics starting with "events-"
val pattern = Pattern.compile("events-.*")
val consumerStrategy = ConsumerStrategies.SubscribePattern[String, String](pattern, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)

Fixed Partition Assignment

import org.apache.kafka.common.TopicPartition

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "assigned-consumer-group"
)

// Assign specific partitions
val topicPartitions = Array(
  new TopicPartition("orders", 0),
  new TopicPartition("orders", 2),
  new TopicPartition("payments", 1)
)

val consumerStrategy = ConsumerStrategies.Assign[String, String](topicPartitions, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)

Java API Examples

import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Subscribe strategy
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "java-consumer-group");

Collection<String> topics = Arrays.asList("topic1", "topic2");
ConsumerStrategy<String, String> strategy = ConsumerStrategies.Subscribe(topics, kafkaParams);

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  javaStreamingContext,
  LocationStrategies.PreferConsistent(),
  strategy
);

Pattern Strategy with Java

import java.util.regex.Pattern;

Pattern pattern = Pattern.compile("logs-\\d{4}-\\d{2}-\\d{2}");
ConsumerStrategy<String, String> patternStrategy = 
  ConsumerStrategies.SubscribePattern(pattern, kafkaParams);

JavaInputDStream<ConsumerRecord<String, String>> patternStream = KafkaUtils.createDirectStream(
  javaStreamingContext,
  LocationStrategies.PreferConsistent(),
  patternStrategy
);

Configuration Details

Required Kafka Parameters

All strategies require these parameters:

  • bootstrap.servers: Kafka broker addresses
  • key.deserializer: Key deserializer class
  • value.deserializer: Value deserializer class

Important Kafka Parameters

  • group.id: Consumer group ID (required for Subscribe and SubscribePattern)
  • auto.offset.reset: What to do when no initial offset ("earliest", "latest", or "none")
  • enable.auto.commit: Set to false for manual offset management
  • session.timeout.ms: Session timeout for consumer group management
  • heartbeat.interval.ms: Heartbeat interval for consumer liveness
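Putting the required and important parameters together, a full configuration map might look like the following sketch. The broker address, group id, and timeout values are illustrative, and the deserializers are given as class-name strings (which Kafka also accepts, alongside Class objects):

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerParams {
    public static Map<String, Object> build() {
        Map<String, Object> kafkaParams = new HashMap<>();
        // Required for every strategy:
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        // Required for Subscribe and SubscribePattern:
        kafkaParams.put("group.id", "my-consumer-group");
        // Important tuning parameters (values are illustrative):
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false); // manual offset management
        kafkaParams.put("session.timeout.ms", 30000);
        kafkaParams.put("heartbeat.interval.ms", 3000);
        return kafkaParams;
    }
}
```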

Offset Initialization

When initial offsets are supplied to a strategy:

  1. Restart from checkpoint: offsets recovered from the checkpoint take precedence, and the offsets passed to the strategy are ignored
  2. Fresh start (no checkpoint): the provided initial offsets are used as the starting positions
  3. No offsets provided: the consumer falls back to committed offsets for the group, or to the auto.offset.reset policy
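That precedence can be sketched as a simple resolution function. This is a hypothetical illustration of the rule above, not part of the library's API:

```java
import java.util.Optional;

public class OffsetPrecedence {
    // Sketch of the precedence described above: a checkpointed offset wins
    // over a strategy-supplied offset, which wins over the fallback position
    // implied by committed offsets or auto.offset.reset.
    public static long resolveStart(Optional<Long> checkpointed,
                                    Optional<Long> provided,
                                    long fallback) {
        return checkpointed.orElseGet(() -> provided.orElse(fallback));
    }

    public static void main(String[] args) {
        // Fresh start: no checkpoint, so the strategy-supplied offset is used.
        System.out.println(resolveStart(Optional.empty(), Optional.of(100L), 0L)); // 100
    }
}
```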

Strategy Selection Guidelines

Use Subscribe When:

  • You know the specific topic names to consume from
  • You want automatic partition discovery as topics scale
  • You want consumer group management and rebalancing
  • You need dynamic partition assignment

Use SubscribePattern When:

  • Topics are created dynamically with predictable naming patterns
  • You want to automatically consume from new matching topics
  • You have time-based or categorized topic naming schemes
  • You need the most flexible topic discovery
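To preview which topics a date-based pattern would pick up, the matching itself is plain java.util.regex; pattern subscription selects topics whose names fully match. The topic names here are made up for illustration:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {
    // Returns the topic names that fully match the pattern, mirroring how
    // pattern-based subscription selects topics by name.
    public static List<String> matching(Pattern pattern, List<String> topics) {
        return topics.stream()
            .filter(t -> pattern.matcher(t).matches())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("events-\\d{4}-\\d{2}");
        List<String> topics = List.of("events-2024-01", "events-2024-02", "audit-2024-01");
        System.out.println(matching(pattern, topics)); // [events-2024-01, events-2024-02]
    }
}
```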

Use Assign When:

  • You need precise control over partition assignment
  • You want to avoid consumer group coordination
  • You're implementing custom partition assignment logic
  • You're doing partition-specific processing

Advanced Configuration

KAFKA-3370 Workaround

The library automatically handles the KAFKA-3370 issue when auto.offset.reset is "none":

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "auto.offset.reset" -> "none", // Will trigger automatic workaround
  // ... other params
)

// The strategy automatically handles NoOffsetForPartitionException
val strategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)

Consumer Lifecycle Management

All strategies handle:

  • Consumer creation and configuration
  • Subscription or assignment setup
  • Offset seeking for initial positions
  • Consumer pause/resume for rate limiting
  • Proper cleanup and resource management

Error Handling

Common Exceptions

  • NoOffsetForPartitionException: Automatically handled when auto.offset.reset is "none"
  • InvalidTopicException: Thrown for invalid topic names
  • TimeoutException: Thrown for broker connectivity issues
  • AuthenticationException: Thrown for authentication failures

Configuration Validation

// Invalid configuration is not validated when the strategy is constructed
val invalidParams = Map[String, Object](
  "bootstrap.servers" -> "", // empty - will cause an error later
  "key.deserializer" -> "invalid.class.name" // invalid class - will cause an error later
)

// Strategy creation succeeds even with invalid parameters; the error
// surfaces only when the underlying Kafka consumer is created (when the
// streaming context starts), not here:
val strategy = ConsumerStrategies.Subscribe[String, String](topics, invalidParams)

Important Notes

  • All consumer strategies are marked as @Experimental in Spark 2.4.8
  • Consumer strategies are serializable and checkpoint-compatible
  • The same Kafka parameters are passed to driver and executors, but the executor copy is adjusted automatically (for example, auto-commit is disabled on executors)
  • Consumer instances are automatically cached and managed per executor
  • Pattern-based subscription checks for new topics periodically
  • Fixed assignment (Assign) doesn't require consumer group coordination