Consumer Strategies

Consumer strategies define how the Kafka connector subscribes to and reads from Kafka topics. The connector supports three subscription patterns: direct partition assignment (assign), fixed topic subscription (subscribe), and regex-based topic subscription (subscribePattern).

Capabilities

ConsumerStrategy Interface

Base interface for all consumer subscription strategies.

import java.{util => ju}

import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

/**
 * Base interface for different Kafka subscription strategies.
 * Handles consumer creation and topic partition discovery.
 */
sealed trait ConsumerStrategy {

  /** Creates a Kafka consumer with the specified configuration */
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]

  /** Creates a Kafka admin client for metadata operations */
  def createAdmin(kafkaParams: ju.Map[String, Object]): Admin

  /** Discovers all topic partitions assigned to this strategy */
  def assignedTopicPartitions(admin: Admin): Set[TopicPartition]
}
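
Because the trait is sealed, it can only be implemented inside the connector itself; the sketch below is purely illustrative of how a subscribe-style strategy could satisfy the three methods using standard kafka-clients APIs (the actual Spark implementation differs in detail):

// Illustrative sketch only: ConsumerStrategy is sealed, so this cannot
// be compiled outside the connector's own source file.
// Assumes kafka-clients 3.x for DescribeTopicsResult.allTopicNames().
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.jdk.CollectionConverters._

case class ExampleSubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {

  override def createConsumer(
      kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
    consumer.subscribe(topics.asJava) // Kafka's group protocol assigns partitions
    consumer
  }

  override def createAdmin(kafkaParams: ju.Map[String, Object]): Admin =
    Admin.create(kafkaParams)

  override def assignedTopicPartitions(admin: Admin): Set[TopicPartition] =
    admin.describeTopics(topics.asJava).allTopicNames().get().asScala.flatMap {
      case (topic, description) =>
        description.partitions().asScala.map(p => new TopicPartition(topic, p.partition()))
    }.toSet
}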

AssignStrategy

Direct assignment of specific topic partitions to the consumer. Provides precise control over which partitions are consumed.

/**
 * Assigns specific topic partitions to the consumer
 * Provides direct control over partition assignment
 * 
 * @param partitions Array of TopicPartition objects to assign
 */
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy

Configuration:

Use the assign option with a JSON specification of topic partitions:

.option("assign", """{"topic1":[0,1,2],"topic2":[0,1]}""")

Usage Examples:

// Assign specific partitions from multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"orders":[0,1,2],"payments":[0,1]}""")
  .load()

// Assign single partition
val df2 = spark
  .readStream  
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"high-priority-topic":[0]}""")
  .load()

When to Use:

  • When you need precise control over partition assignment
  • For consuming specific partitions in multi-consumer scenarios
  • When implementing custom partitioning strategies
  • For testing with specific partition data
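
The JSON value for the assign option can also be built programmatically. A minimal helper (hypothetical; not part of the connector) might look like this:

// Hypothetical helper: builds the JSON string expected by the "assign"
// option from a Scala map of topic -> partition numbers.
def assignJson(assignments: Map[String, Seq[Int]]): String =
  assignments
    .map { case (topic, parts) => s""""$topic":[${parts.mkString(",")}]""" }
    .mkString("{", ",", "}")

assignJson(Map("orders" -> Seq(0, 1, 2), "payments" -> Seq(0, 1)))
// => {"orders":[0,1,2],"payments":[0,1]}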

SubscribeStrategy

Subscribes to a fixed collection of topics by name. Kafka handles partition assignment automatically.

/**
 * Subscribes to a fixed collection of topics
 * Kafka handles partition assignment automatically
 * 
 * @param topics Sequence of topic names to subscribe to
 */
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy

Configuration:

Use the subscribe option with a comma-separated list of topic names:

.option("subscribe", "topic1,topic2,topic3")

Usage Examples:

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments,inventory")
  .load()

// Subscribe to single topic  
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .load()

When to Use:

  • When you want to consume all partitions from specific topics
  • For simple topic subscription scenarios
  • When Kafka's built-in partition assignment is suitable
  • Most common use case for topic consumption
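
The same subscription options also work for batch queries via spark.read, which consume whatever data is available at the time of execution:

// Batch query over the currently available data in the subscribed topics
val batchDf = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments")
  .load()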

SubscribePatternStrategy

Uses a regular expression pattern to match topic names. Automatically discovers and subscribes to matching topics.

/**
 * Uses regex pattern to specify topics of interest
 * Automatically discovers matching topics
 * 
 * @param topicPattern Regular expression pattern for topic matching
 */
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy

Configuration:

Use the subscribePattern option with a regular expression:

.option("subscribePattern", "user-events-.*")

Usage Examples:

// Subscribe to all topics matching a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "logs-.*")
  .load()

// Subscribe to topics with specific prefix and suffix
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "analytics-.*-events")
  .load()

// Subscribe to topics from specific environment
val df3 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "prod-.*")
  .load()

Pattern Compilation:

The topic pattern is compiled as a Java regular expression using Pattern.compile(). The connector discovers all matching topics at startup and continues to pick up newly created matching topics during execution.
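
A pattern can be sanity-checked against known topic names with the same Java regex engine before wiring it into a query:

import java.util.regex.Pattern

// The whole topic name must match the pattern, not just a prefix
val pattern = Pattern.compile("logs-.*")
Seq("logs-app", "logs-web", "metrics-app")
  .filter(topic => pattern.matcher(topic).matches())
// => List(logs-app, logs-web)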

When to Use:

  • When topic names follow a predictable pattern
  • For dynamic topic discovery scenarios
  • When new topics matching the pattern should be automatically included
  • For environment-specific topic consumption (e.g. dev-.*, staging-.*, prod-.*)

Strategy Selection Logic

The connector validates that exactly one strategy is specified:

// Valid - exactly one strategy specified
.option("subscribe", "my-topic")

// Valid - exactly one strategy specified  
.option("subscribePattern", "logs-.*")

// Valid - exactly one strategy specified
.option("assign", """{"topic1":[0,1]}""")

// Invalid - no strategy specified
// Will throw: "One of the following options must be specified for Kafka source: subscribe, subscribePattern, assign"

// Invalid - multiple strategies specified
.option("subscribe", "topic1")
.option("subscribePattern", "topic.*")
// Will throw: "Only one of the following options can be specified for Kafka source: subscribe, subscribePattern, assign"

Validation Rules

AssignStrategy Validation

// Must be valid JSON with topic-partition mapping
.option("assign", """{"topic1":[0,1,2],"topic2":[0]}""")  // Valid

.option("assign", "topic1")  // Invalid - not JSON
// Will throw: "No topicpartitions to assign as specified value for option 'assign'"

SubscribeStrategy Validation

// Must contain at least one non-empty topic name
.option("subscribe", "topic1,topic2")     // Valid
.option("subscribe", "topic1")            // Valid

.option("subscribe", "")                  // Invalid - empty
.option("subscribe", ",,,")               // Invalid - no valid topics
// Will throw: "No topics to subscribe to as specified value for option 'subscribe'"

SubscribePatternStrategy Validation

// Must be non-empty pattern
.option("subscribePattern", "logs-.*")    // Valid
.option("subscribePattern", ".*")         // Valid

.option("subscribePattern", "")           // Invalid - empty pattern
// Will throw: "Pattern to subscribe is empty as specified value for option 'subscribePattern'"

TopicPartition Type

All strategies work with the Kafka TopicPartition type:

// org.apache.kafka.common.TopicPartition is a plain Java class,
// not a Scala case class; its shape is equivalent to:
class TopicPartition(topic: String, partition: Int) {
  def topic(): String      // the topic name
  def partition(): Int     // the partition number
}
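
For example:

import org.apache.kafka.common.TopicPartition

val tp = new TopicPartition("orders", 0)
tp.topic()     // "orders"
tp.partition() // 0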

Consumer Group Behavior

Each strategy handles consumer groups differently:

  • AssignStrategy: Uses a generated unique group ID; no rebalancing
  • SubscribeStrategy: Uses a generated unique group ID; supports rebalancing
  • SubscribePatternStrategy: Uses a generated unique group ID; supports rebalancing and topic discovery

The connector automatically generates unique group IDs to prevent interference between queries:

// Default group ID pattern for streaming queries
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

// Default group ID pattern for batch queries  
s"spark-kafka-relation-${UUID.randomUUID}"

// Custom group ID prefix (optional)
.option("groupIdPrefix", "my-app")
// Results in: "my-app-${UUID.randomUUID}-${metadataPath.hashCode}"
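
For example, a streaming read whose consumer groups are easy to identify in broker-side monitoring:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders")
  .option("groupIdPrefix", "my-app")
  .load()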