
# Consumer Strategies

Consumer strategies define how Spark connects to and consumes data from Kafka topics. The module provides three flexible patterns for topic assignment and subscription.

## Capabilities

### Consumer Strategy Base

Base trait for all consumer strategies that defines how Kafka consumers are created and configured.

```scala { .api }
/**
 * Base trait for Kafka consumer strategies
 */
sealed trait ConsumerStrategy {
  /**
   * Creates a Kafka consumer with strategy-specific configuration
   * @param kafkaParams Kafka consumer configuration parameters
   * @return Configured Kafka consumer
   */
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}
```
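Because the hierarchy is sealed, code that consumes a `ConsumerStrategy` can pattern-match over it exhaustively. A simplified, self-contained sketch of that design (the stand-in types and `describe` helper below are illustrative, not Spark's internals; the real implementations invoke the Kafka consumer's `assign`, `subscribe`, and pattern-subscribe methods inside `createConsumer`):

```scala
// Illustrative stand-ins only -- not the actual Spark classes.
sealed trait StrategySketch
case class AssignSketch(partitions: Seq[(String, Int)]) extends StrategySketch
case class SubscribeSketch(topics: Seq[String]) extends StrategySketch
case class SubscribePatternSketch(pattern: String) extends StrategySketch

// Because the trait is sealed, the compiler warns if a case is missing.
def describe(s: StrategySketch): String = s match {
  case AssignSketch(ps)          => s"assigned: ${ps.map { case (t, p) => s"$t-$p" }.mkString(", ")}"
  case SubscribeSketch(ts)       => s"subscribed: ${ts.mkString(", ")}"
  case SubscribePatternSketch(p) => s"pattern: $p"
}
```

The sealed trait is what lets the source validate, at planning time, that exactly one strategy is in play.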

### Assign Strategy

Strategy for assigning specific topic partitions to consume from. Provides precise control over which partitions are consumed.

```scala { .api }
/**
 * Strategy for assigning specific topic partitions
 * @param partitions Array of TopicPartition objects to assign
 */
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
```

**Usage Examples:**

```scala
import org.apache.kafka.common.TopicPartition

// The partitions to consume, expressed as TopicPartition objects
val partitions = Array(
  new TopicPartition("topic1", 0),
  new TopicPartition("topic1", 1),
  new TopicPartition("topic2", 0)
)

// The same assignment expressed through the DataSource "assign" option,
// which takes a JSON map of topic to partition numbers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topic1":[0,1],"topic2":[0]}""")
  .option("startingOffsets", "earliest")
  .load()
```

### Subscribe Strategy

Strategy for subscribing to specific topic names. Automatically handles partition assignment and rebalancing.

```scala { .api }
/**
 * Strategy for subscribing to specific topic names
 * @param topics Sequence of topic names to subscribe to
 */
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
```

**Usage Examples:**

```scala
// Subscribe to a comma-separated list of topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2,topic3")
  .option("startingOffsets", "latest")
  .load()

// Subscribe to a single topic
val singleTopicDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
```

### Subscribe Pattern Strategy

Strategy for subscribing to topics matching a regex pattern. Dynamically discovers new topics that match the pattern.

```scala { .api }
/**
 * Strategy for subscribing to topics matching a regex pattern
 * @param topicPattern Regular expression pattern for topic names
 */
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
```

**Usage Examples:**

```scala
// Subscribe to all topics matching the pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "events_.*")
  .option("startingOffsets", "earliest")
  .load()

// Build the pattern from an environment name
// (assumes an `environment` string value is in scope)
val envTopics = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", s"${environment}_.*")
  .load()
```

## Strategy Selection Guidelines

### Use AssignStrategy when:

- You need precise control over partition assignment
- You are working with a static set of partitions
- You are implementing custom partition assignment logic
- You have partition-specific processing requirements

### Use SubscribeStrategy when:

- You are working with a known set of topic names
- You want automatic partition assignment and rebalancing
- Topics may gain or lose partitions dynamically
- Standard consumer group behavior is desired

### Use SubscribePatternStrategy when:

- Topics are created dynamically and follow naming patterns
- You are working with a multi-tenant system with one topic per tenant
- You need to consume from topics that may not exist at startup
- Topic names follow predictable regex patterns

142

143

## Configuration Integration

144

145

Consumer strategies are integrated with Spark's DataSource options:

146

147

```scala

148

// Strategy options (exactly one must be specified)

149

.option("assign", """{"topic1":[0,1],"topic2":[0]}""") // AssignStrategy

150

.option("subscribe", "topic1,topic2") // SubscribeStrategy

151

.option("subscribePattern", "events_.*") // SubscribePatternStrategy

152

153

// Additional Kafka consumer parameters

154

.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")

155

.option("kafka.security.protocol", "SASL_SSL")

156

.option("kafka.sasl.mechanism", "PLAIN")

157

```

## Error Handling

The module validates consumer strategy configuration at query startup:

- **Multiple strategies**: Throws `IllegalArgumentException` if more than one strategy is specified
- **No strategy**: Throws `IllegalArgumentException` if no strategy is specified
- **Invalid assign JSON**: Throws `IllegalArgumentException` for malformed partition assignments
- **Empty subscribe list**: Throws `IllegalArgumentException` for empty topic lists
- **Empty pattern**: Throws `IllegalArgumentException` for empty regex patterns
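For example, specifying two strategies at once fails before any data is read (a hypothetical misconfiguration, mirroring the options used in the earlier examples):

```scala
// Misconfiguration sketch: "subscribe" and "assign" are mutually exclusive,
// so this query fails at startup with an IllegalArgumentException.
val bad = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("assign", """{"topic1":[0]}""")
  .load()
```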

168

169

## Advanced Configuration

170

171

### Kafka Consumer Parameters

172

173

All consumer strategies support the full range of Kafka consumer configuration through prefixed parameters:

174

175

```scala

176

.option("kafka.session.timeout.ms", "30000")

177

.option("kafka.heartbeat.interval.ms", "3000")

178

.option("kafka.max.poll.records", "500")

179

.option("kafka.fetch.min.bytes", "1024")

180

.option("kafka.fetch.max.wait.ms", "500")

181

```

182

183

### Unsupported Parameters

184

185

Certain Kafka consumer parameters are managed internally and cannot be overridden:

186

187

- `group.id` - Automatically generated unique group IDs per query

188

- `auto.offset.reset` - Controlled via `startingOffsets` option

189

- `key.deserializer` - Fixed to `ByteArrayDeserializer`

190

- `value.deserializer` - Fixed to `ByteArrayDeserializer`

191

- `enable.auto.commit` - Disabled for offset management

192

- `interceptor.classes` - Not supported for safety
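Rather than setting the managed parameters directly, express the same intent through the source's own options. A sketch of the equivalent of `auto.offset.reset=earliest`, mirroring the earlier examples:

```scala
// Rejected: .option("kafka.auto.offset.reset", "earliest")
// Supported: express the same intent through startingOffsets
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()
```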