Consumer Strategies

Consumer strategies define how Spark connects to and consumes data from Kafka topics. The module provides three patterns: explicit partition assignment, subscription to named topics, and subscription by regex pattern.

Capabilities

Consumer Strategy Base

Base trait for all consumer strategies that defines how Kafka consumers are created and configured.

import java.{util => ju}
import org.apache.kafka.clients.consumer.Consumer

/**
 * Base trait for Kafka consumer strategies
 */
sealed trait ConsumerStrategy {
  /**
   * Creates a Kafka consumer with strategy-specific configuration
   * @param kafkaParams Kafka consumer configuration parameters
   * @return Configured Kafka consumer
   */
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}
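
For illustration, here is a minimal sketch of what an assignment-based implementation of this trait could look like. It is shown as if defined alongside the sealed trait (the trait cannot be extended outside the module), and it only mirrors the shape of the built-in strategies; it is not the module's actual implementation.

import java.{util => ju}
import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Illustrative only: mirrors the shape of a built-in strategy.
// Must live in the same file as the sealed trait to compile.
case class ExampleAssignStrategy(partitions: Array[TopicPartition])
    extends ConsumerStrategy {
  override def createConsumer(
      kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
    // Create a raw byte-array consumer from the supplied Kafka parameters
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
    // Pin the consumer to exactly the requested partitions (no group rebalancing)
    consumer.assign(ju.Arrays.asList(partitions: _*))
    consumer
  }
}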

Assign Strategy

Strategy for assigning specific topic partitions to consume from. Provides precise control over which partitions are consumed.

/**
 * Strategy for assigning specific topic partitions
 * @param partitions Array of TopicPartition objects to assign
 */
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy

Usage Examples:

import org.apache.kafka.common.TopicPartition

// Partitions to consume, as TopicPartition objects
val partitions = Array(
  new TopicPartition("topic1", 0),
  new TopicPartition("topic1", 1),
  new TopicPartition("topic2", 0)
)

// The same assignment, passed to the DataSource as the JSON "assign" option
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topic1":[0,1],"topic2":[0]}""")
  .option("startingOffsets", "earliest")
  .load()

Subscribe Strategy

Strategy for subscribing to specific topic names. Automatically handles partition assignment and rebalancing.

/**
 * Strategy for subscribing to specific topic names
 * @param topics Sequence of topic names to subscribe to
 */
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy

Usage Examples:

// Subscribe to specific topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2,topic3")
  .option("startingOffsets", "latest")
  .load()

// Subscribe to single topic
val singleTopicDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

Subscribe Pattern Strategy

Strategy for subscribing to topics matching a regex pattern. Dynamically discovers new topics that match the pattern.

/**
 * Strategy for subscribing to topics matching a regex pattern
 * @param topicPattern Regular expression pattern for topic names
 */
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy

Usage Examples:

// Subscribe to topics matching pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "events_.*")
  .option("startingOffsets", "earliest")
  .load()

// Pattern per environment; `environment` is defined by your application
val environment = sys.env.getOrElse("DEPLOY_ENV", "dev") // hypothetical env var name
val envTopics = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", s"${environment}_.*")
  .load()
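
Pattern subscriptions discover new matching topics when the consumer refreshes cluster metadata. Assuming standard Kafka consumer semantics, lowering kafka.metadata.max.age.ms (default 5 minutes) makes newly created topics show up sooner:

// Pick up newly created matching topics within ~1 minute instead of the
// default 5-minute metadata refresh interval
val fastDiscovery = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "events_.*")
  .option("kafka.metadata.max.age.ms", "60000")
  .load()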

Strategy Selection Guidelines

Use AssignStrategy when:

  • You need precise control over partition assignment
  • You are working with a static set of partitions
  • You are implementing custom partition assignment logic
  • You are handling partition-specific processing requirements

Use SubscribeStrategy when:

  • You are working with a known set of topic names
  • You want automatic partition assignment and rebalancing
  • Topics may gain or lose partitions dynamically
  • You want standard consumer group behavior

Use SubscribePatternStrategy when:

  • Topics are created dynamically and follow naming patterns
  • You are working with a multi-tenant system that uses one topic per tenant
  • You need to consume from topics that may not exist at startup
  • Topic names follow predictable regex patterns
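
If the strategy is chosen at runtime, a small helper can make the exactly-one rule from the next section explicit. This is a hypothetical sketch (the helper and its parameter names are illustrative, not part of the module's API):

import org.apache.spark.sql.streaming.DataStreamReader

// Hypothetical helper: apply exactly one strategy option to a reader
def withStrategy(
    reader: DataStreamReader,
    assignJson: Option[String] = None,
    topics: Option[Seq[String]] = None,
    pattern: Option[String] = None): DataStreamReader =
  (assignJson, topics, pattern) match {
    case (Some(json), None, None) => reader.option("assign", json)
    case (None, Some(ts), None)   => reader.option("subscribe", ts.mkString(","))
    case (None, None, Some(p))    => reader.option("subscribePattern", p)
    case _ => throw new IllegalArgumentException(
      "Exactly one of assign, subscribe, or subscribePattern must be set")
  }

// Usage
val reader = withStrategy(
  spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092"),
  topics = Some(Seq("topic1", "topic2")))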

Configuration Integration

Consumer strategies are selected through Spark's DataSource options:

// Strategy options (exactly one must be specified)
.option("assign", """{"topic1":[0,1],"topic2":[0]}""")  // AssignStrategy
.option("subscribe", "topic1,topic2")                   // SubscribeStrategy
.option("subscribePattern", "events_.*")                // SubscribePatternStrategy

// Additional Kafka consumer parameters
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")

Error Handling

The module validates consumer strategy configuration at query startup:

  • Multiple strategies: Throws IllegalArgumentException if more than one strategy is specified
  • No strategy: Throws IllegalArgumentException if no strategy is specified
  • Invalid assign JSON: Throws IllegalArgumentException for malformed partition assignments
  • Empty subscribe list: Throws IllegalArgumentException for empty topic lists
  • Empty pattern: Throws IllegalArgumentException for empty regex patterns
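
For example, specifying two strategies on the same reader fails validation. A sketch of what that looks like (the exact message, and whether the error surfaces at load() or at query start, can vary by Spark version):

try {
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic1")          // strategy #1
    .option("subscribePattern", "topic.*")  // strategy #2: not allowed
    .load()
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid strategy configuration: ${e.getMessage}")
}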

Advanced Configuration

Kafka Consumer Parameters

All consumer strategies support the full range of Kafka consumer configuration through parameters prefixed with kafka.:

.option("kafka.session.timeout.ms", "30000")
.option("kafka.heartbeat.interval.ms", "3000")
.option("kafka.max.poll.records", "500")
.option("kafka.fetch.min.bytes", "1024")
.option("kafka.fetch.max.wait.ms", "500")

Unsupported Parameters

Certain Kafka consumer parameters are managed internally and cannot be overridden:

  • group.id - Automatically generated unique group IDs per query
  • auto.offset.reset - Controlled via startingOffsets option
  • key.deserializer - Fixed to ByteArrayDeserializer
  • value.deserializer - Fixed to ByteArrayDeserializer
  • enable.auto.commit - Disabled for offset management
  • interceptor.classes - Not supported for safety
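
The connector performs its own validation of these keys; the following hypothetical pre-flight check only mirrors the rule above, rejecting managed keys before a query is built:

// Keys the connector manages internally (from the list above)
val managedKeys = Set(
  "group.id", "auto.offset.reset", "key.deserializer",
  "value.deserializer", "enable.auto.commit", "interceptor.classes")

// Hypothetical helper: fail fast if a user-supplied "kafka."-prefixed
// option would collide with a managed key
def validateUserParams(params: Map[String, String]): Unit =
  params.keys
    .filter(_.startsWith("kafka."))
    .map(_.stripPrefix("kafka."))
    .find(managedKeys.contains)
    .foreach { k =>
      throw new IllegalArgumentException(
        s"Kafka option 'kafka.$k' is managed by the connector and cannot be set")
    }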