# Offset Management

The Kafka connector provides comprehensive offset management capabilities for controlling where to start and stop reading data, including support for specific offsets, timestamps, and automatic offset tracking.

## Capabilities

### KafkaOffsetRangeLimit

Base interface for defining offset boundaries and reading ranges.

```scala { .api }
/**
 * Represents desired offset range limits for starting, ending, and specific offsets.
 * Used to control the boundaries of Kafka data consumption.
 */
sealed trait KafkaOffsetRangeLimit
```
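
Because the trait is sealed, consuming code can match exhaustively over its implementations. A hedged sketch (the case classes are internal to the connector's `org.apache.spark.sql.kafka010` package, so application code will rarely write this directly):

```scala
// Sketch: exhaustive dispatch over the sealed trait's implementations.
def describe(limit: KafkaOffsetRangeLimit): String = limit match {
  case EarliestOffsetRangeLimit           => "earliest"
  case LatestOffsetRangeLimit             => "latest"
  case SpecificOffsetRangeLimit(offsets)  => s"specific offsets: $offsets"
  case SpecificTimestampRangeLimit(ts, _) => s"per-partition timestamps: $ts"
  case GlobalTimestampRangeLimit(ts, _)   => s"global timestamp: $ts"
}
```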

### EarliestOffsetRangeLimit

Binds to the earliest available offsets in Kafka topics.

```scala { .api }
/**
 * Binds to the earliest available offsets in Kafka.
 * Starts reading from the beginning of each partition.
 */
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
```

**Configuration:**

```scala
// For streaming queries
.option("startingOffsets", "earliest")

// For batch queries (default for startingOffsets)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest") // Cannot use "earliest" for ending
```

**Usage Examples:**

```scala
// Stream from the beginning of topics
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest") // Read from beginning
  .load()

// Batch read from earliest to latest
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
```

### LatestOffsetRangeLimit

Binds to the latest available offsets in Kafka topics.

```scala { .api }
/**
 * Binds to the latest available offsets in Kafka.
 * Starts reading from the current end of each partition.
 */
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
```

**Configuration:**

```scala
// For streaming queries (default for startingOffsets)
.option("startingOffsets", "latest")

// For batch queries (default for endingOffsets)
.option("endingOffsets", "latest") // Cannot use "latest" for starting in batch
```

**Usage Examples:**

```scala
// Stream from the current position (default behavior)
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "latest") // Start from current end
  .load()

// Batch read up to the current position
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest") // Read up to current end
  .load()
```

### SpecificOffsetRangeLimit

Binds to specific offsets for precise control over reading positions.

```scala { .api }
/**
 * Binds to specific offsets per partition.
 * Provides precise control over starting/ending positions.
 *
 * @param partitionOffsets Map of TopicPartition to offset values;
 *                         -1 = latest offset, -2 = earliest offset
 */
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
```

**Configuration:**

```scala
// JSON specification of partition offsets
.option("startingOffsets", """{"topic1":{"0":23,"1":345},"topic2":{"0":0}}""")
.option("endingOffsets", """{"topic1":{"0":100,"1":500},"topic2":{"0":50}}""")
```

**Usage Examples:**

```scala
// Start from specific offsets
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments")
  .option("startingOffsets", """{"orders":{"0":1000,"1":2000},"payments":{"0":500}}""")
  .load()

// Batch read with specific ranges
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", """{"my-topic":{"0":100,"1":200}}""")
  .option("endingOffsets", """{"my-topic":{"0":1000,"1":2000}}""")
  .load()

// Mix specific offsets with latest/earliest using the special values
val mixed = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", """{"my-topic":{"0":-2,"1":500}}""") // -2 = earliest, 500 = specific
  .option("endingOffsets", """{"my-topic":{"0":-1,"1":1000}}""")  // -1 = latest, 1000 = specific
  .load()
```

**Special Offset Values:**

```scala { .api }
object KafkaOffsetRangeLimit {
  val LATEST = -1L   // Use latest available offset
  val EARLIEST = -2L // Use earliest available offset
}
```
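
These sentinels can also be embedded when building the offset JSON programmatically. A minimal sketch using plain string interpolation (the local constants mirror `KafkaOffsetRangeLimit.LATEST` / `EARLIEST`; `startingOffsetsJson` is a hypothetical helper, not part of the connector):

```scala
// Sentinel values as documented above: -1L = latest, -2L = earliest.
val LATEST = -1L
val EARLIEST = -2L

// Build a startingOffsets JSON that reads partition 0 from the earliest
// offset and partition 1 from a caller-supplied specific offset.
def startingOffsetsJson(topic: String, p1Offset: Long): String =
  s"""{"$topic":{"0":$EARLIEST,"1":$p1Offset}}"""

// startingOffsetsJson("my-topic", 500L)
// → {"my-topic":{"0":-2,"1":500}}
```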

### SpecificTimestampRangeLimit

Binds each partition to the earliest offset whose timestamp is greater than or equal to the timestamp specified for that partition.

```scala { .api }
/**
 * Binds each partition to the earliest offset with timestamp >= the specified timestamp.
 * Enables time-based offset resolution.
 *
 * @param topicTimestamps Map of TopicPartition to timestamp values (Unix epoch milliseconds)
 * @param strategyOnNoMatchingStartingOffset Strategy when no matching timestamp is found
 */
case class SpecificTimestampRangeLimit(
    topicTimestamps: Map[TopicPartition, Long],
    strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value)
  extends KafkaOffsetRangeLimit
```

**Configuration:**

```scala
// JSON specification of partition timestamps
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1609459200000,"1":1609459200000}}""")
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1609545600000,"1":1609545600000}}""")

// Strategy for when no matching timestamp is found
.option("startingOffsetsByTimestampStrategy", "error")  // Default: throw an error
.option("startingOffsetsByTimestampStrategy", "latest") // Use the latest offset
```

**Usage Examples:**

```scala
// Start from a specific timestamp (January 1, 2021 00:00:00 UTC)
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp", """{"events":{"0":1609459200000,"1":1609459200000}}""")
  .load()

// Batch read between timestamps
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("startingOffsetsByTimestamp", """{"logs":{"0":1609459200000}}""") // Jan 1, 2021
  .option("endingOffsetsByTimestamp", """{"logs":{"0":1609545600000}}""")   // Jan 2, 2021
  .load()
```
226
227
### GlobalTimestampRangeLimit
228
229
Applies timestamp-based offset resolution to all partitions using a single timestamp.
230
231
```scala { .api }
232
/**
233
* Applies timestamp-based offset resolution to all partitions
234
* Uses single timestamp for all discovered partitions
235
*
236
* @param timestamp Unix timestamp in milliseconds
237
* @param strategyOnNoMatchingStartingOffset Strategy when no matching timestamp found
238
*/
239
case class GlobalTimestampRangeLimit(
240
timestamp: Long,
241
strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value
242
) extends KafkaOffsetRangeLimit
243
```
244
245
**Configuration:**
246
247
```scala
248
// Single timestamp applied to all partitions
249
.option("startingTimestamp", "1609459200000") // January 1, 2021 00:00:00 UTC
250
.option("endingTimestamp", "1609545600000") // January 2, 2021 00:00:00 UTC
251
```
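
The timestamp options take Unix epoch milliseconds, which are error-prone to compute by hand. A small sketch deriving them with the JDK's `java.time` (the helper name is illustrative, not part of the connector):

```scala
import java.time.{LocalDate, ZoneOffset}

// Convert a calendar date (midnight UTC) to Unix epoch milliseconds,
// suitable for startingTimestamp / endingTimestamp.
def utcMidnightMillis(year: Int, month: Int, day: Int): Long =
  LocalDate.of(year, month, day).atStartOfDay(ZoneOffset.UTC).toInstant.toEpochMilli

// utcMidnightMillis(2021, 1, 1) → 1609459200000L (matches the example above)
```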

**Usage Examples:**

```scala
// Start all partitions from the same timestamp
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .option("startingTimestamp", "1609459200000") // Applies to all partitions
  .load()

// Batch read a time range across all partitions
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "logs-.*")
  .option("startingTimestamp", "1609459200000")
  .option("endingTimestamp", "1609545600000")
  .load()
```

## KafkaSourceOffset

Custom offset implementation for tracking partition positions in streaming queries.

```scala { .api }
/**
 * Custom Offset implementation tracking all partitions and their offsets.
 * Used internally by streaming sources for checkpoint management.
 *
 * @param partitionToOffsets Map of TopicPartition to current offset
 */
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends streaming.Offset
```

**Companion Object Methods:**

```scala { .api }
object KafkaSourceOffset {
  /** Extract partition offsets from a generic Offset */
  def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long]

  /** Create an offset from (topic, partition, offset) tuples */
  def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset

  /** Create from a serialized offset */
  def apply(offset: SerializedOffset): KafkaSourceOffset

  /** Create from a streaming offset */
  def apply(offset: streaming.Offset): KafkaSourceOffset
}
```
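
As a hedged illustration of the tuple `apply` (these classes are internal to the connector, so this is a sketch of the shape rather than something typical application code constructs):

```scala
// Sketch: build a KafkaSourceOffset from (topic, partition, offset) tuples.
val offset = KafkaSourceOffset(
  ("orders", 0, 1000L),
  ("orders", 1, 2000L))

// offset.partitionToOffsets is then a Map[TopicPartition, Long]
// mapping orders-0 -> 1000 and orders-1 -> 2000.
```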

### KafkaSourcePartitionOffset

Represents the offset of a single partition in the V2 DataSource API.

```scala { .api }
/**
 * Represents the offset of a specific partition in the V2 DataSource API.
 * Used for continuous streaming offset management.
 *
 * @param topicPartition  The topic partition
 * @param partitionOffset Current offset in the partition
 */
case class KafkaSourcePartitionOffset(topicPartition: TopicPartition, partitionOffset: Long) extends PartitionOffset
```

## Offset Reading and Management

### KafkaOffsetReader

Interface for fetching and managing offsets from Kafka.

```scala { .api }
/**
 * Base interface for fetching offsets from Kafka.
 * Handles interaction with Kafka brokers for offset management.
 */
trait KafkaOffsetReader {
  /** Close resources and connections */
  def close(): Unit

  /** Fetch partition offsets based on a range limit */
  def fetchPartitionOffsets(
      offsetRangeLimit: KafkaOffsetRangeLimit,
      isStartingOffsets: Boolean): Map[TopicPartition, Long]

  /** Fetch specific offsets with validation */
  def fetchSpecificOffsets(
      partitionOffsets: Map[TopicPartition, Long],
      reportDataLoss: String => Unit): KafkaSourceOffset

  /** Fetch timestamp-based offsets for specific partitions */
  def fetchSpecificTimestampBasedOffsets(
      topicTimestamps: Map[TopicPartition, Long],
      isStartingOffsets: Boolean,
      strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value): KafkaSourceOffset

  /** Fetch timestamp-based offsets globally */
  def fetchGlobalTimestampBasedOffsets(
      timestamp: Long,
      isStartingOffsets: Boolean,
      strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value): KafkaSourceOffset

  /** Fetch earliest available offsets */
  def fetchEarliestOffsets(): Map[TopicPartition, Long]

  /** Fetch latest available offsets */
  def fetchLatestOffsets(knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap
}
```

## Timestamp Strategy Configuration

When using timestamp-based offsets, configure the strategy for handling timestamps that match no offset in a partition:

```scala { .api }
object StrategyOnNoMatchStartingOffset extends Enumeration {
  val ERROR = Value  // Throw an exception when no matching timestamp is found (default)
  val LATEST = Value // Use the latest offset when no matching timestamp is found
}
```

**Configuration:**

```scala
// Default behavior - throw an error if the timestamp is not found
.option("startingOffsetsByTimestampStrategy", "error")

// Fall back to the latest offset if the timestamp is not found
.option("startingOffsetsByTimestampStrategy", "latest")
```

## Option Priority

When multiple offset options are specified, they are processed in priority order:

1. **Global timestamp**: `startingTimestamp` / `endingTimestamp`
2. **Partition timestamps**: `startingOffsetsByTimestamp` / `endingOffsetsByTimestamp`
3. **Specific offsets**: `startingOffsets` / `endingOffsets`
4. **Default values**: `LatestOffsetRangeLimit` for streaming, `EarliestOffsetRangeLimit` for batch
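
For example, if both a global timestamp and specific offsets are supplied, the global timestamp takes precedence (a sketch; in practice, set options from only one level to keep intent unambiguous):

```scala
// startingTimestamp (priority 1) overrides startingOffsets (priority 3)
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingTimestamp", "1609459200000")            // used
  .option("startingOffsets", """{"my-topic":{"0":100}}""") // ignored
  .load()
```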

## Validation Rules

### Streaming Query Restrictions

```scala
// These options are invalid for streaming queries
.option("endingOffsets", "...")            // Not supported
.option("endingOffsetsByTimestamp", "...") // Not supported
.option("endingTimestamp", "...")          // Not supported
```

### Batch Query Restrictions

```scala
// Invalid starting offset for batch
.option("startingOffsets", "latest") // Must use "earliest" or specific offsets

// Invalid ending offset for batch
.option("endingOffsets", "earliest") // Must use "latest" or specific offsets

// Specific offset restrictions
.option("startingOffsets", """{"topic":{"0":-1}}""") // -1 (latest) not allowed for starting
.option("endingOffsets", """{"topic":{"0":-2}}""")   // -2 (earliest) not allowed for ending
```