
# Offset Management

Comprehensive offset tracking and management system for precise control over Kafka data consumption boundaries, supporting both streaming and batch processing with fault tolerance and exactly-once semantics.

## Capabilities

### Offset Range Limits

Defines desired offset range limits for consuming data from Kafka partitions.

```scala { .api }
/**
 * Base trait for offset range limits
 */
sealed trait KafkaOffsetRangeLimit

/**
 * Binds to earliest available offsets in partitions
 */
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
 * Binds to latest available offsets in partitions
 */
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
 * Binds to specific offset positions per partition
 * @param partitionOffsets Map of TopicPartition to specific offset
 */
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
```

### Offset Range Limit Constants

```scala { .api }
object KafkaOffsetRangeLimit {
  /** Indicates resolution to latest offset */
  val LATEST: Long = -1L

  /** Indicates resolution to earliest offset */
  val EARLIEST: Long = -2L
}
```

### Source Offset Management

Custom offset implementation for Kafka sources containing partition-to-offset mappings.

```scala { .api }
/**
 * Custom offset for Kafka source containing partition-to-offset mappings
 * @param partitionToOffsets Map of TopicPartition to offset
 */
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
  /** JSON representation of partition offsets */
  def json: String
}

/**
 * Companion object for KafkaSourceOffset
 */
object KafkaSourceOffset {
  /**
   * Extracts partition offsets from a generic offset
   * @param offset Generic offset to extract from
   * @return Map of TopicPartition to offset
   */
  def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long]

  /**
   * Creates an offset from topic-partition-offset tuples
   * @param offsetTuples Variable arguments of (topic, partition, offset)
   * @return KafkaSourceOffset instance
   */
  def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset

  /**
   * Creates an offset from a JSON representation
   * @param offset Serialized offset in JSON format
   * @return KafkaSourceOffset instance
   */
  def apply(offset: SerializedOffset): KafkaSourceOffset
}
```

### Partition Offset for Continuous Streaming

```scala { .api }
/**
 * Represents the offset for a specific partition in continuous streaming
 * @param topicPartition The topic partition
 * @param partitionOffset The offset within the partition
 */
case class KafkaSourcePartitionOffset(
  topicPartition: TopicPartition,
  partitionOffset: Long
) extends PartitionOffset
```

### Offset Reader

Component for reading offset information from Kafka using the KafkaConsumer API.

```scala { .api }
/**
 * Component for reading offset information from Kafka
 */
class KafkaOffsetReader extends Logging {
  /**
   * Closes the connection to Kafka brokers
   */
  def close(): Unit

  /**
   * Fetches topic partitions based on the consumer strategy
   * @return Set of TopicPartition objects
   */
  def fetchTopicPartitions(): Set[TopicPartition]

  /**
   * Resolves specific partition offsets
   * @param partitionOffsets Map of partitions to offsets (may contain special values)
   * @param reportDataLoss Function to report data loss
   * @return KafkaSourceOffset with resolved offsets
   */
  def fetchSpecificOffsets(
    partitionOffsets: Map[TopicPartition, Long],
    reportDataLoss: String => Unit
  ): KafkaSourceOffset

  /**
   * Fetches the earliest available offsets for all partitions
   * @return Map of TopicPartition to earliest offset
   */
  def fetchEarliestOffsets(): Map[TopicPartition, Long]

  /**
   * Fetches the latest available offsets
   * @param knownOffsets Previously known offsets for comparison
   * @return Map of TopicPartition to latest offset
   */
  def fetchLatestOffsets(knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap

  /**
   * Fetches the earliest offsets for specific partitions
   * @param newPartitions Set of partitions to fetch offsets for
   * @return Map of TopicPartition to earliest offset
   */
  def fetchEarliestOffsets(newPartitions: Set[TopicPartition]): Map[TopicPartition, Long]
}

/**
 * Companion object for KafkaOffsetReader
 */
object KafkaOffsetReader {
  /**
   * Returns the fixed schema for Kafka records
   * @return StructType defining the Kafka record schema
   */
  def kafkaSchema: StructType
}
```

### Offset Range Calculation

Calculates offset ranges based on the minPartitions configuration for parallelism optimization.

```scala { .api }
/**
 * Calculates offset ranges based on the minPartitions configuration
 * @param minPartitions Minimum number of partitions to create
 */
class KafkaOffsetRangeCalculator(minPartitions: Option[Int]) {
  /**
   * Calculates offset ranges with preferred executor locations
   * @param fromOffsets Starting offsets per partition
   * @param untilOffsets Ending offsets per partition
   * @param executorLocations Available executor locations
   * @return Sequence of KafkaOffsetRange objects
   */
  def getRanges(
    fromOffsets: Map[TopicPartition, Long],
    untilOffsets: Map[TopicPartition, Long],
    executorLocations: Seq[String]
  ): Seq[KafkaOffsetRange]
}

/**
 * Companion object for KafkaOffsetRangeCalculator
 */
object KafkaOffsetRangeCalculator {
  /**
   * Creates a calculator from DataSource options
   * @param options DataSource options containing minPartitions
   * @return KafkaOffsetRangeCalculator instance
   */
  def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator
}

/**
 * Represents an offset range for a topic partition with a preferred executor location
 * @param topicPartition The topic partition
 * @param fromOffset Starting offset (inclusive)
 * @param untilOffset Ending offset (exclusive)
 * @param preferredLoc Preferred executor location for locality
 */
case class KafkaOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long,
  preferredLoc: Option[String]
) {
  /** Lazily computed size of the offset range */
  lazy val size: Long = untilOffset - fromOffset
}
```

## Configuration Options

### Starting Offsets (Streaming)

```scala
// Start from the earliest available offsets
.option("startingOffsets", "earliest")

// Start from the latest available offsets
.option("startingOffsets", "latest")

// Start from specific offsets (JSON format)
.option("startingOffsets", """{"topic1":{"0":123,"1":456},"topic2":{"0":789}}""")
```

### Ending Offsets (Batch Only)

```scala
// Read until the latest available offsets
.option("endingOffsets", "latest")

// Read until specific offsets (JSON format)
.option("endingOffsets", """{"topic1":{"0":500,"1":600},"topic2":{"0":800}}""")
```

### Data Loss Handling

```scala
// Fail the query on data loss (default)
.option("failOnDataLoss", "true")

// Continue processing on data loss
.option("failOnDataLoss", "false")
```

252

253

## Usage Examples

254

255

### Streaming with Specific Starting Offsets

256

257

```scala

258

val df = spark

259

.readStream

260

.format("kafka")

261

.option("kafka.bootstrap.servers", "localhost:9092")

262

.option("subscribe", "events")

263

.option("startingOffsets", """{"events":{"0":1000,"1":2000}}""")

264

.option("failOnDataLoss", "false")

265

.load()

266

```

267

268

### Batch Processing with Offset Range

269

270

```scala

271

val batchDF = spark

272

.read

273

.format("kafka")

274

.option("kafka.bootstrap.servers", "localhost:9092")

275

.option("subscribe", "transactions")

276

.option("startingOffsets", "earliest")

277

.option("endingOffsets", """{"transactions":{"0":5000,"1":6000}}""")

278

.load()

279

```

280

281

### Micro-batch with Offset Limits

282

283

```scala

284

val query = spark

285

.readStream

286

.format("kafka")

287

.option("kafka.bootstrap.servers", "localhost:9092")

288

.option("subscribe", "clickstream")

289

.option("startingOffsets", "latest")

290

.option("maxOffsetsPerTrigger", "10000")

291

.load()

292

.writeStream

293

.trigger(Trigger.ProcessingTime("30 seconds"))

294

.start()

295

```

## Fault Tolerance

### Offset Checkpointing

Spark automatically checkpoints processed offsets for exactly-once processing:

```scala
val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .writeStream
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()
```

### Data Loss Detection

The system detects and handles various data loss scenarios:

- **Topic deletion**: Detects when subscribed topics are deleted
- **Partition reassignment**: Handles partition count changes
- **Offset expiration**: Detects when requested offsets are no longer available
- **Broker failures**: Handles temporary broker unavailability

### Recovery Strategies

```scala
// Strict mode - fail on any data loss
.option("failOnDataLoss", "true")

// Lenient mode - log warnings and continue
.option("failOnDataLoss", "false")
```

## Performance Tuning

### Partition Parallelism

```scala
// Minimum number of partitions for parallel processing
.option("minPartitions", "20")

// Rate limiting for streaming
.option("maxOffsetsPerTrigger", "1000000")
```

### Memory Management

```scala
// Kafka consumer buffer sizes
.option("kafka.receive.buffer.bytes", "65536")
.option("kafka.fetch.max.bytes", "52428800")
.option("kafka.max.poll.records", "500")
```

## JSON Format Reference

### Partition Assignment JSON

```json
{
  "topic1": [0, 1, 2],
  "topic2": [0, 1]
}
```

### Specific Offsets JSON

```json
{
  "topic1": {
    "0": 1000,
    "1": 2000,
    "2": 3000
  },
  "topic2": {
    "0": 500,
    "1": 600
  }
}
```