# Offset Management

Comprehensive offset tracking and management system for precise control over Kafka data consumption boundaries, supporting both streaming and batch processing with fault tolerance and exactly-once semantics.

## Capabilities

### Offset Range Limits

Defines desired offset range limits for consuming data from Kafka partitions.

```scala { .api }
/**
 * Base trait for offset range limits
 */
sealed trait KafkaOffsetRangeLimit

/**
 * Binds to earliest available offsets in partitions
 */
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
 * Binds to latest available offsets in partitions
 */
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
 * Binds to specific offset positions per partition
 * @param partitionOffsets Map of TopicPartition to specific offset
 */
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
```

### Offset Range Limit Constants

```scala { .api }
object KafkaOffsetRangeLimit {
  /** Indicates resolution to latest offset */
  val LATEST: Long = -1L

  /** Indicates resolution to earliest offset */
  val EARLIEST: Long = -2L
}
```

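These sentinels let a `SpecificOffsetRangeLimit` map mix concrete offsets with "resolve at planning time" markers. A dependency-free sketch of that idea, where plain `(topic, partition)` tuples stand in for Kafka's `TopicPartition`:

```scala
// Sentinel values as defined above; (topic, partition) tuples stand in
// for Kafka's TopicPartition so the sketch needs no external dependency.
val LATEST: Long = -1L
val EARLIEST: Long = -2L

val partitionOffsets: Map[(String, Int), Long] = Map(
  ("events", 0) -> 1000L,    // concrete offset
  ("events", 1) -> EARLIEST, // resolve to earliest available
  ("events", 2) -> LATEST    // resolve to latest available
)

// Entries still needing resolution against the broker are the negative ones
val unresolved = partitionOffsets.filter { case (_, off) => off < 0 }
// unresolved covers partitions 1 and 2
```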
### Source Offset Management

Custom offset implementation for Kafka sources containing partition-to-offset mappings.

```scala { .api }
/**
 * Custom offset for Kafka source containing partition-to-offset mappings
 * @param partitionToOffsets Map of TopicPartition to offset
 */
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
  /** JSON representation of partition offsets */
  def json: String
}

/**
 * Companion object for KafkaSourceOffset
 */
object KafkaSourceOffset {
  /**
   * Extracts partition offsets from generic offset
   * @param offset Generic offset to extract from
   * @return Map of TopicPartition to offset
   */
  def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long]

  /**
   * Creates offset from topic-partition-offset tuples
   * @param offsetTuples Variable arguments of (topic, partition, offset)
   * @return KafkaSourceOffset instance
   */
  def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset

  /**
   * Creates offset from JSON representation
   * @param offset Serialized offset in JSON format
   * @return KafkaSourceOffset instance
   */
  def apply(offset: SerializedOffset): KafkaSourceOffset
}
```

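The `json` representation groups offsets by topic, matching the `{"topic":{"partition":offset}}` layout used by the `startingOffsets` option. A simplified stand-in encoder for illustration (`toJson` is a hypothetical helper, not the class's actual implementation):

```scala
// Hypothetical stand-in for KafkaSourceOffset.json: encodes a
// partition-to-offset map as {"topic":{"partition":offset}} JSON,
// sorted for a deterministic result.
def toJson(offsets: Map[(String, Int), Long]): String =
  offsets
    .groupBy { case ((topic, _), _) => topic }
    .toSeq.sortBy(_._1)
    .map { case (topic, parts) =>
      val body = parts.toSeq
        .sortBy { case ((_, p), _) => p }
        .map { case ((_, p), off) => s""""$p":$off""" }
        .mkString(",")
      s""""$topic":{$body}"""
    }
    .mkString("{", ",", "}")

toJson(Map(("events", 0) -> 1000L, ("events", 1) -> 2000L))
// {"events":{"0":1000,"1":2000}}
```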
### Partition Offset for Continuous Streaming

```scala { .api }
/**
 * Represents offset for a specific partition in continuous streaming
 * @param topicPartition The topic partition
 * @param partitionOffset The offset within the partition
 */
case class KafkaSourcePartitionOffset(
    topicPartition: TopicPartition,
    partitionOffset: Long
) extends PartitionOffset
```

### Offset Reader

Component for reading offset information from Kafka using the KafkaConsumer API.

```scala { .api }
/**
 * Component for reading offset information from Kafka
 */
class KafkaOffsetReader extends Logging {
  /**
   * Closes connection to Kafka brokers
   */
  def close(): Unit

  /**
   * Fetches topic partitions based on consumer strategy
   * @return Set of TopicPartition objects
   */
  def fetchTopicPartitions(): Set[TopicPartition]

  /**
   * Resolves specific partition offsets
   * @param partitionOffsets Map of partitions to offsets (may contain special values)
   * @param reportDataLoss Function to report data loss
   * @return KafkaSourceOffset with resolved offsets
   */
  def fetchSpecificOffsets(
      partitionOffsets: Map[TopicPartition, Long],
      reportDataLoss: String => Unit
  ): KafkaSourceOffset

  /**
   * Fetches earliest available offsets for all partitions
   * @return Map of TopicPartition to earliest offset
   */
  def fetchEarliestOffsets(): Map[TopicPartition, Long]

  /**
   * Fetches latest available offsets
   * @param knownOffsets Previously known offsets for comparison
   * @return Map of TopicPartition to latest offset
   */
  def fetchLatestOffsets(knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap

  /**
   * Fetches earliest offsets for specific partitions
   * @param newPartitions Set of partitions to fetch offsets for
   * @return Map of TopicPartition to earliest offset
   */
  def fetchEarliestOffsets(newPartitions: Set[TopicPartition]): Map[TopicPartition, Long]
}

/**
 * Companion object for KafkaOffsetReader
 */
object KafkaOffsetReader {
  /**
   * Returns the fixed schema for Kafka records
   * @return StructType defining Kafka record schema
   */
  def kafkaSchema: StructType
}
```

### Offset Range Calculation

Calculates offset ranges based on minPartitions configuration for parallelism optimization.

```scala { .api }
/**
 * Calculates offset ranges based on minPartitions configuration
 * @param minPartitions Minimum number of partitions to create
 */
class KafkaOffsetRangeCalculator(minPartitions: Option[Int]) {
  /**
   * Calculates offset ranges with preferred executor locations
   * @param fromOffsets Starting offsets per partition
   * @param untilOffsets Ending offsets per partition
   * @param executorLocations Available executor locations
   * @return Sequence of KafkaOffsetRange objects
   */
  def getRanges(
      fromOffsets: Map[TopicPartition, Long],
      untilOffsets: Map[TopicPartition, Long],
      executorLocations: Seq[String]
  ): Seq[KafkaOffsetRange]
}

/**
 * Companion object for KafkaOffsetRangeCalculator
 */
object KafkaOffsetRangeCalculator {
  /**
   * Creates calculator from DataSource options
   * @param options DataSource options containing minPartitions
   * @return KafkaOffsetRangeCalculator instance
   */
  def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator
}

/**
 * Represents an offset range for a topic partition with preferred executor location
 * @param topicPartition The topic partition
 * @param fromOffset Starting offset (inclusive)
 * @param untilOffset Ending offset (exclusive)
 * @param preferredLoc Preferred executor location for locality
 */
case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]
) {
  /** Lazy-computed size of the offset range */
  lazy val size: Long = untilOffset - fromOffset
}
```

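When `minPartitions` exceeds the number of Kafka partitions, the calculator splits large offset ranges into smaller ones so more Spark tasks can run in parallel (at the cost of the preferred location). A simplified sketch of the per-range division only; the real calculator apportions splits by each range's share of the total offsets:

```scala
// Simplified sketch: divide one partition's [from, until) offset range
// into up to n roughly equal, contiguous sub-ranges.
def splitRange(from: Long, until: Long, n: Int): Seq[(Long, Long)] = {
  val size = until - from
  (0 until n).map { i =>
    val start = from + size * i / n
    val end   = from + size * (i + 1) / n
    (start, end)
  }.filter { case (s, e) => e > s } // drop empty sub-ranges when n > size
}

splitRange(0L, 100L, 3)
// Seq((0,33), (33,66), (66,100))
```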
## Configuration Options

### Starting Offsets (Streaming)

```scala
// Start from earliest available offsets
.option("startingOffsets", "earliest")

// Start from latest available offsets
.option("startingOffsets", "latest")

// Start from specific offsets (JSON format)
.option("startingOffsets", """{"topic1":{"0":123,"1":456},"topic2":{"0":789}}""")
```

### Ending Offsets (Batch Only)

```scala
// Read until latest available offsets
.option("endingOffsets", "latest")

// Read until specific offsets (JSON format)
.option("endingOffsets", """{"topic1":{"0":500,"1":600},"topic2":{"0":800}}""")
```

### Data Loss Handling

```scala
// Fail query on data loss (default)
.option("failOnDataLoss", "true")

// Continue processing on data loss
.option("failOnDataLoss", "false")
```

## Usage Examples

### Streaming with Specific Starting Offsets

```scala
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000,"1":2000}}""")
  .option("failOnDataLoss", "false")
  .load()
```

### Batch Processing with Offset Range

```scala
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactions")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", """{"transactions":{"0":5000,"1":6000}}""")
  .load()
```

### Micro-batch with Offset Limits

```scala
val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "clickstream")
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", "10000")
  .load()
  .writeStream
  .format("console") // a sink must be specified; console is used for illustration
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
```

## Fault Tolerance

### Offset Checkpointing

Spark automatically checkpoints processed offsets for exactly-once processing:

```scala
val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .writeStream
  .format("console") // a sink must be specified; console is used for illustration
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()
```

### Data Loss Detection

The system detects and handles various data loss scenarios:

- **Topic deletion**: Detects when subscribed topics are deleted
- **Partition reassignment**: Handles partition count changes
- **Offset expiration**: Detects when requested offsets are no longer available
- **Broker failures**: Handles temporary broker unavailability

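The `reportDataLoss` hook on `KafkaOffsetReader.fetchSpecificOffsets` is driven by the `failOnDataLoss` option. A minimal sketch of the two behaviors (`DataLossHandler` is a hypothetical name, not Spark API):

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical handler mirroring failOnDataLoss semantics:
// strict mode throws, lenient mode records a warning and continues.
class DataLossHandler(failOnDataLoss: Boolean) {
  val warnings = ListBuffer.empty[String]
  def reportDataLoss(message: String): Unit =
    if (failOnDataLoss) throw new IllegalStateException(message)
    else warnings += message
}

val lenient = new DataLossHandler(failOnDataLoss = false)
lenient.reportDataLoss("Offset 1000 on events-0 is out of range")
// lenient.warnings holds the message; a strict handler would have thrown
```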
### Recovery Strategies

```scala
// Strict mode - fail on any data loss
.option("failOnDataLoss", "true")

// Lenient mode - log warnings and continue
.option("failOnDataLoss", "false")
```

## Performance Tuning

### Partition Parallelism

```scala
// Minimum partitions for parallel processing
.option("minPartitions", "20")

// Rate limiting for streaming
.option("maxOffsetsPerTrigger", "1000000")
```

### Memory Management

```scala
// Kafka consumer buffer sizes
.option("kafka.receive.buffer.bytes", "65536")
.option("kafka.fetch.max.bytes", "52428800")
.option("kafka.max.poll.records", "500")
```

## JSON Format Reference

### Partition Assignment JSON

```json
{
  "topic1": [0, 1, 2],
  "topic2": [0, 1]
}
```

### Specific Offsets JSON

```json
{
  "topic1": {
    "0": 1000,
    "1": 2000,
    "2": 3000
  },
  "topic2": {
    "0": 500,
    "1": 600
  }
}
```

In offset JSON, `-2` can be used to refer to the earliest offset and `-1` to the latest (for `endingOffsets`, only `-1` is allowed).