# Consumer Strategies

Consumer strategies define how the Kafka connector subscribes to and reads from Kafka topics. The connector supports three different subscription patterns to accommodate various use cases.

## Capabilities

### ConsumerStrategy Interface

Base interface for all consumer subscription strategies.

```scala { .api }
/**
 * Base interface for different Kafka subscription strategies
 * Handles consumer creation and topic partition discovery
 */
sealed trait ConsumerStrategy {

  /** Creates a Kafka consumer with the specified configuration */
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]

  /** Creates a Kafka admin client for metadata operations */
  def createAdmin(kafkaParams: ju.Map[String, Object]): Admin

  /** Discovers all topic partitions assigned to this strategy */
  def assignedTopicPartitions(admin: Admin): Set[TopicPartition]
}
```

### AssignStrategy

Direct assignment of specific topic partitions to the consumer. Provides precise control over which partitions are consumed.

```scala { .api }
/**
 * Assigns specific topic partitions to the consumer
 * Provides direct control over partition assignment
 *
 * @param partitions Array of TopicPartition objects to assign
 */
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
```

**Configuration:**

Use the `assign` option with a JSON specification of topic partitions:

```scala
.option("assign", """{"topic1":[0,1,2],"topic2":[0,1]}""")
```

**Usage Examples:**

```scala
// Assign specific partitions from multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"orders":[0,1,2],"payments":[0,1]}""")
  .load()

// Assign a single partition
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"high-priority-topic":[0]}""")
  .load()
```

**When to Use:**

- When you need precise control over partition assignment
- For consuming specific partitions in multi-consumer scenarios
- When implementing custom partitioning strategies
- For testing with specific partition data

### SubscribeStrategy

Subscribes to a fixed collection of topics by name. Kafka handles partition assignment automatically.

```scala { .api }
/**
 * Subscribes to a fixed collection of topics
 * Kafka handles partition assignment automatically
 *
 * @param topics Sequence of topic names to subscribe to
 */
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
```

**Configuration:**

Use the `subscribe` option with comma-delimited topic names:

```scala
.option("subscribe", "topic1,topic2,topic3")
```

**Usage Examples:**

```scala
// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments,inventory")
  .load()

// Subscribe to a single topic
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .load()
```

**When to Use:**

- When you want to consume all partitions from specific topics
- For simple topic subscription scenarios
- When Kafka's built-in partition assignment is suitable
- The most common pattern for topic consumption

### SubscribePatternStrategy

Uses a regular expression pattern to match topic names. Automatically discovers and subscribes to matching topics.

```scala { .api }
/**
 * Uses regex pattern to specify topics of interest
 * Automatically discovers matching topics
 *
 * @param topicPattern Regular expression pattern for topic matching
 */
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
```

**Configuration:**

Use the `subscribePattern` option with a regular expression:

```scala
.option("subscribePattern", "user-events-.*")
```

**Usage Examples:**

```scala
// Subscribe to all topics matching a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "logs-.*")
  .load()

// Subscribe to topics with a specific prefix and suffix
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "analytics-.*-events")
  .load()

// Subscribe to topics from a specific environment
val df3 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "prod-.*")
  .load()
```

**Pattern Compilation:**

The topic pattern is compiled as a Java regex via `Pattern.compile()`. The connector discovers all topics matching the pattern at startup and continues to pick up new matching topics during execution.
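
As an illustration of that matching behavior (a sketch, not the connector's internal code), topic names can be filtered with a compiled pattern; the topic list here is hypothetical:

```scala
import java.util.regex.Pattern

object PatternMatchSketch {
  /** Returns the topics whose full name matches the regex pattern. */
  def matchingTopics(topicPattern: String, topics: Seq[String]): Seq[String] = {
    val pattern = Pattern.compile(topicPattern)
    // matches() anchors the regex to the whole topic name
    topics.filter(t => pattern.matcher(t).matches())
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical topics, as might be discovered from the cluster
    val allTopics = Seq("logs-app", "logs-db", "metrics-app")
    println(matchingTopics("logs-.*", allTopics).mkString(","))
  }
}
```

Note that `matches()` requires the whole name to match, so `logs-.*` does not pick up a topic named `app-logs-archive`.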

**When to Use:**

- When topic names follow a predictable pattern
- For dynamic topic discovery scenarios
- When new topics matching the pattern should be automatically included
- For environment-specific topic consumption (`dev-`, `staging-`, `prod-`)

## Strategy Selection Logic

The connector validates that exactly one strategy is specified:

```scala
// Valid - exactly one strategy specified
.option("subscribe", "my-topic")

// Valid - exactly one strategy specified
.option("subscribePattern", "logs-.*")

// Valid - exactly one strategy specified
.option("assign", """{"topic1":[0,1]}""")

// Invalid - no strategy specified
// Will throw: "One of the following options must be specified for Kafka source: subscribe, subscribePattern, assign"

// Invalid - multiple strategies specified
.option("subscribe", "topic1")
.option("subscribePattern", "topic.*")
// Will throw: "Only one of the following options can be specified for Kafka source: subscribe, subscribePattern, assign"
```
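
The exactly-one rule can be sketched as follows; `StrategyValidationSketch` and its `validate` helper are illustrative names, not the connector's actual code:

```scala
object StrategyValidationSketch {
  private val strategyOptions = Seq("subscribe", "subscribePattern", "assign")

  /** Returns the chosen strategy option, or an error message mirroring the ones above. */
  def validate(options: Map[String, String]): Either[String, String] =
    strategyOptions.filter(options.contains) match {
      case Seq(only) => Right(only)
      case Seq() =>
        Left("One of the following options must be specified for Kafka source: " +
          strategyOptions.mkString(", "))
      case _ =>
        Left("Only one of the following options can be specified for Kafka source: " +
          strategyOptions.mkString(", "))
    }
}
```

Filtering the known option names against the supplied options makes the zero-strategy and multiple-strategy failures fall out of a single pattern match.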

## Validation Rules

### AssignStrategy Validation

```scala
// Must be valid JSON with a topic-partition mapping
.option("assign", """{"topic1":[0,1,2],"topic2":[0]}""") // Valid

.option("assign", "topic1") // Invalid - not JSON
// Will throw: "No topicpartitions to assign as specified value for option 'assign'"
```

### SubscribeStrategy Validation

```scala
// Must contain at least one non-empty topic name
.option("subscribe", "topic1,topic2") // Valid
.option("subscribe", "topic1") // Valid

.option("subscribe", "") // Invalid - empty
.option("subscribe", ",,,") // Invalid - no valid topics
// Will throw: "No topics to subscribe to as specified value for option 'subscribe'"
```
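
The comma-delimited handling above can be sketched as a parse step that trims entries and drops blanks (an illustrative helper, not Spark's actual parsing code):

```scala
object SubscribeParseSketch {
  /** Splits a comma-delimited topic list, trimming whitespace and dropping empty entries. */
  def parseTopics(value: String): Seq[String] =
    value.split(",").map(_.trim).filter(_.nonEmpty).toSeq

  /** Mirrors the validation above: at least one non-empty topic is required. */
  def isValidSubscribe(value: String): Boolean = parseTopics(value).nonEmpty
}
```

Under this sketch, `",,,"` and `""` both parse to an empty list and are rejected.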

### SubscribePatternStrategy Validation

```scala
// Must be a non-empty pattern
.option("subscribePattern", "logs-.*") // Valid
.option("subscribePattern", ".*") // Valid

.option("subscribePattern", "") // Invalid - empty pattern
// Will throw: "Pattern to subscribe is empty as specified value for option 'subscribePattern'"
```

## TopicPartition Type

All strategies work with the Kafka `TopicPartition` type:

```scala { .api }
// From org.apache.kafka.common.TopicPartition (a plain Java class, shown here in Scala form)
class TopicPartition(topic: String, partition: Int) {
  def topic(): String
  def partition(): Int
}
```
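
For illustration, the `assign` option's JSON groups partition numbers per topic. A minimal sketch that renders that shape, using a simplified `TP` case class as a stand-in for `org.apache.kafka.common.TopicPartition`:

```scala
// Simplified stand-in for org.apache.kafka.common.TopicPartition
final case class TP(topic: String, partition: Int)

object AssignJsonSketch {
  /** Renders partitions in the shape expected by the "assign" option, e.g. {"t":[0,1]}. */
  def toAssignJson(partitions: Seq[TP]): String =
    partitions
      .groupBy(_.topic)
      .toSeq
      .sortBy(_._1) // deterministic topic order
      .map { case (topic, tps) =>
        s""""$topic":[${tps.map(_.partition).sorted.mkString(",")}]"""
      }
      .mkString("{", ",", "}")
}
```

Generating the JSON from a partition list like this avoids hand-writing the string in application code.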

## Consumer Group Behavior

Each strategy handles consumer groups differently:

- **AssignStrategy**: Uses a generated unique group ID; no rebalancing
- **SubscribeStrategy**: Uses a generated unique group ID; supports rebalancing
- **SubscribePatternStrategy**: Uses a generated unique group ID; supports rebalancing and topic discovery

The connector automatically generates unique group IDs to prevent interference between queries:

```scala
// Default group ID pattern for streaming queries
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

// Default group ID pattern for batch queries
s"spark-kafka-relation-${UUID.randomUUID}"

// Custom group ID prefix (optional)
.option("groupIdPrefix", "my-app")
// Results in: "my-app-${UUID.randomUUID}-${metadataPath.hashCode}"
```