# Offset Management

The Flink Kafka 0.8 connector stores and restores consumer offsets through ZooKeeper, supported by a few internal utility classes.

## Capabilities

### ZookeeperOffsetHandler

Utility class for reading and writing consumer offsets in ZooKeeper directly.

```java { .api }
/**
 * Handler for managing Kafka consumer offsets in ZooKeeper
 */
public class ZookeeperOffsetHandler {
    /**
     * Sets the consumer offset for a specific topic partition in ZooKeeper
     * @param curatorClient ZooKeeper client instance
     * @param groupId Consumer group identifier
     * @param topic Kafka topic name
     * @param partition Partition number
     * @param offset Offset value to set
     * @throws Exception if ZooKeeper operation fails
     */
    public static void setOffsetInZooKeeper(
        CuratorFramework curatorClient,
        String groupId,
        String topic,
        int partition,
        long offset
    ) throws Exception;

    /**
     * Retrieves the consumer offset for a specific topic partition from ZooKeeper
     * @param curatorClient ZooKeeper client instance
     * @param groupId Consumer group identifier
     * @param topic Kafka topic name
     * @param partition Partition number
     * @return Current offset value, or null if not found
     * @throws Exception if ZooKeeper operation fails
     */
    public static Long getOffsetFromZooKeeper(
        CuratorFramework curatorClient,
        String groupId,
        String topic,
        int partition
    ) throws Exception;
}
```

**Usage Example:**

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

// Create ZooKeeper client
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
    "localhost:2181",
    new ExponentialBackoffRetry(1000, 3)
);
zkClient.start();

try {
    // Set offset for a specific partition
    ZookeeperOffsetHandler.setOffsetInZooKeeper(
        zkClient,
        "my-consumer-group",
        "my-topic",
        0,      // partition 0
        12345L  // offset value
    );

    // Retrieve offset for a partition
    Long currentOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
        zkClient,
        "my-consumer-group",
        "my-topic",
        0 // partition 0
    );

    if (currentOffset != null) {
        System.out.println("Current offset: " + currentOffset);
    } else {
        System.out.println("No offset found");
    }

} finally {
    zkClient.close();
}
```
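
To make concrete what an offset entry in ZooKeeper might look like, here is a minimal JDK-only sketch. It assumes the conventional Kafka 0.8 layout `/consumers/<group>/offsets/<topic>/<partition>` with the offset stored as decimal text. The class `ZkOffsetLayout` and its methods are hypothetical helpers for illustration and are not part of the connector API.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of the Flink connector. */
final class ZkOffsetLayout {
    private ZkOffsetLayout() {}

    /** Builds the znode path assumed to hold the offset of one partition. */
    static String offsetPath(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }

    /** Encodes an offset as decimal text (assumed znode payload format). */
    static byte[] encode(long offset) {
        return Long.toString(offset).getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a znode payload; returns null when the node is missing or empty. */
    static Long decode(byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        String text = new String(data, StandardCharsets.UTF_8).trim();
        return text.isEmpty() ? null : Long.valueOf(text);
    }
}
```

Under these assumptions, `ZkOffsetLayout.offsetPath("my-consumer-group", "my-topic", 0)` yields the node that the usage example above would read and write, which can help when inspecting offsets with a ZooKeeper shell.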

### PeriodicOffsetCommitter

Internal thread for periodic offset commits to ZooKeeper.

```java { .api }
/**
 * Background thread that periodically commits consumer offsets to ZooKeeper
 */
public class PeriodicOffsetCommitter extends Thread {
    // Implementation details are internal
    // Used automatically by FlinkKafkaConsumer08
}
```
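
Because the committer's internals are not documented here, the following is only a sketch of the general pattern: a background thread that wakes up at a fixed interval and hands a snapshot of the latest offsets to a commit callback. `PeriodicCommitSketch`, its constructor arguments, and the `Consumer` callback are all hypothetical; the real class may be structured differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical sketch of a periodic committer; not the connector's class. */
final class PeriodicCommitSketch extends Thread {
    // Latest processed offset per partition, updated by the reading thread.
    private final Map<Integer, Long> latest = new ConcurrentHashMap<>();
    // Stand-in for "write these offsets to ZooKeeper".
    private final Consumer<Map<Integer, Long>> committer;
    private final long intervalMillis;
    private volatile boolean running = true;

    PeriodicCommitSketch(long intervalMillis, Consumer<Map<Integer, Long>> committer) {
        super("periodic-offset-commit-sketch");
        this.intervalMillis = intervalMillis;
        this.committer = committer;
        setDaemon(true);
    }

    /** Records the newest offset seen for a partition. */
    void update(int partition, long offset) {
        latest.put(partition, offset);
    }

    /** Commits a copy of the current offsets so later updates cannot race with the commit. */
    void commitOnce() {
        committer.accept(new HashMap<>(latest));
    }

    @Override
    public void run() {
        try {
            while (running) {
                Thread.sleep(intervalMillis);
                if (running) {
                    commitOnce();
                }
            }
        } catch (InterruptedException e) {
            // Interrupted by shutdown(); exit quietly.
        }
    }

    /** Stops the loop and wakes the thread if it is sleeping. */
    void shutdown() {
        running = false;
        interrupt();
    }
}
```

Copying the map before committing is the main design point: the commit sees a consistent snapshot even while the reader keeps advancing offsets.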

### ClosableBlockingQueue

Thread-safe queue with close capability used internally for offset management.

```java { .api }
/**
 * Thread-safe blocking queue that can be atomically closed
 */
public class ClosableBlockingQueue<E> {
    /**
     * Creates empty closable queue
     */
    public ClosableBlockingQueue();

    /**
     * Creates closable queue with initial capacity
     * @param initialSize Initial capacity hint
     */
    public ClosableBlockingQueue(int initialSize);

    /**
     * Creates closable queue with initial elements
     * @param initialElements Initial elements to add
     */
    public ClosableBlockingQueue(Collection<? extends E> initialElements);

    /**
     * Returns current queue size
     * @return Number of elements in queue
     */
    public int size();

    /**
     * Checks if queue is empty
     * @return true if queue contains no elements
     */
    public boolean isEmpty();

    /**
     * Checks if queue is still open for operations
     * @return true if queue accepts new elements
     */
    public boolean isOpen();

    /**
     * Tries to atomically close the queue, preventing new additions;
     * closing succeeds only if the queue is empty
     * @return true if the queue is closed, false if it remains open
     */
    public boolean close();

    /**
     * Adds element to queue only if queue is open
     * @param element Element to add
     * @return true if element was added successfully
     */
    public boolean addIfOpen(E element);

    /**
     * Adds element to queue; fails if the queue is closed
     * @param element Element to add
     * @throws IllegalStateException if queue is closed
     */
    public void add(E element);

    /**
     * Retrieves but does not remove head element
     * @return Head element or null if empty
     */
    public E peek();

    /**
     * Retrieves and removes head element
     * @return Head element or null if empty
     */
    public E poll();

    /**
     * Retrieves and removes all available elements
     * @return List of all elements removed from queue
     */
    public List<E> pollBatch();

    /**
     * Blocks until element is available, then returns it
     * @return Next available element
     * @throws InterruptedException if interrupted while waiting
     */
    public E getElementBlocking() throws InterruptedException;

    /**
     * Blocks until element is available or timeout expires
     * @param timeoutMillis Maximum wait time in milliseconds
     * @return Next available element or null if timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public E getElementBlocking(long timeoutMillis) throws InterruptedException;

    /**
     * Blocks until batch of elements is available
     * @return List of available elements
     * @throws InterruptedException if interrupted while waiting
     */
    public List<E> getBatchBlocking() throws InterruptedException;

    /**
     * Blocks until batch is available or timeout expires
     * @param timeoutMillis Maximum wait time in milliseconds
     * @return List of available elements or empty list if timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException;
}
```
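
The interplay between `addIfOpen`, `poll`, and `close` is easier to see in a stripped-down model. The sketch below is a hypothetical JDK-only class, `ClosableQueueSketch`, that covers only a few of the listed operations. It assumes that closing succeeds only on an empty queue and that a blocked reader fails once the queue is closed; both are assumptions about the semantics, not statements about the actual implementation.

```java
import java.util.ArrayDeque;

/** Hypothetical, simplified model of a closable queue; not the connector's class. */
final class ClosableQueueSketch<E> {
    private final ArrayDeque<E> elements = new ArrayDeque<>();
    private boolean open = true;

    /** Adds the element unless the queue has been closed. */
    synchronized boolean addIfOpen(E element) {
        if (!open) {
            return false;
        }
        elements.addLast(element);
        notifyAll(); // wake readers blocked in getElementBlocking()
        return true;
    }

    /** Closes the queue if it is empty (assumed rule); returns whether it is now closed. */
    synchronized boolean close() {
        if (open && !elements.isEmpty()) {
            return false; // pending elements: stay open so they are not lost
        }
        open = false;
        notifyAll(); // release blocked readers
        return true;
    }

    /** Removes and returns the head element, or null if the queue is empty. */
    synchronized E poll() {
        return elements.pollFirst();
    }

    /** Waits for an element; fails if the queue is closed while waiting. */
    synchronized E getElementBlocking() throws InterruptedException {
        while (open && elements.isEmpty()) {
            wait();
        }
        if (!open) {
            throw new IllegalStateException("queue is closed");
        }
        return elements.removeFirst();
    }

    synchronized boolean isOpen() {
        return open;
    }
}
```

In this model a producer that loses the race with `close()` learns about it from the `false` return of `addIfOpen` and can route the element elsewhere instead of silently dropping it.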

## Offset Management Strategies

### Automatic Offset Management

With checkpointing enabled, the consumer tracks offsets as part of Flink's checkpoints:

```java
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

// Enable checkpointing for automatic offset management
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5 seconds

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "auto-offset-group");
// Kafka 0.8 names this key auto.commit.enable (enable.auto.commit is the 0.9+ name)
props.setProperty("auto.commit.enable", "false"); // Let Flink manage offsets

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Configure starting position (choose one)
consumer.setStartFromGroupOffsets(); // Default: use committed offsets
// OR
consumer.setStartFromEarliest();     // Start from beginning
// OR
consumer.setStartFromLatest();       // Start from end
// Timestamp-based start positions are not offered for Kafka 0.8 (they need Kafka 0.10+)

env.addSource(consumer);
```
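
One way to picture checkpoint-driven offset handling is: record the offsets at the moment a checkpoint is taken, and publish them externally only once that checkpoint is confirmed. The sketch below models this with plain JDK collections. `CheckpointOffsetTracker` and its method names are hypothetical and the `committed` map merely stands in for ZooKeeper; this is an illustration of the idea, not the connector's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of checkpoint-aligned offset commits; not Flink code. */
final class CheckpointOffsetTracker {
    // Latest offset read per partition.
    private final Map<Integer, Long> current = new HashMap<>();
    // Offsets captured at each checkpoint, keyed by checkpoint id.
    private final TreeMap<Long, Map<Integer, Long>> pending = new TreeMap<>();
    // Externally visible offsets (stand-in for ZooKeeper).
    private final Map<Integer, Long> committed = new HashMap<>();

    /** Called for every record read from a partition. */
    void recordRead(int partition, long offset) {
        current.put(partition, offset);
    }

    /** Called when a checkpoint is taken: freeze a copy of the current offsets. */
    void snapshot(long checkpointId) {
        pending.put(checkpointId, new HashMap<>(current));
    }

    /** Called when a checkpoint is confirmed: publish its offsets, drop older snapshots. */
    void checkpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = pending.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already handled checkpoint
        }
        committed.putAll(offsets);
        pending.headMap(checkpointId, true).clear();
    }

    /** Returns the published offset for a partition, or null if none exists yet. */
    Long committedOffset(int partition) {
        return committed.get(partition);
    }
}
```

The external offsets never run ahead of a confirmed checkpoint in this model, which is why they are a reasonable fallback when no checkpoint is available.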

### Manual Offset Management

For advanced use cases, read and write offsets in ZooKeeper directly:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

public class ManualOffsetManager {
    private final CuratorFramework zkClient;
    private final String groupId;

    public ManualOffsetManager(String zkConnect, String groupId) {
        this.groupId = groupId;
        this.zkClient = CuratorFrameworkFactory.newClient(
            zkConnect,
            new ExponentialBackoffRetry(1000, 3)
        );
        zkClient.start();
    }

    public void commitOffset(String topic, int partition, long offset) {
        try {
            ZookeeperOffsetHandler.setOffsetInZooKeeper(
                zkClient, groupId, topic, partition, offset
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to commit offset", e);
        }
    }

    public Long getCurrentOffset(String topic, int partition) {
        try {
            return ZookeeperOffsetHandler.getOffsetFromZooKeeper(
                zkClient, groupId, topic, partition
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to get offset", e);
        }
    }

    public void close() {
        zkClient.close();
    }
}

// Usage
ManualOffsetManager offsetManager = new ManualOffsetManager(
    "localhost:2181",
    "manual-offset-group"
);

// Check current offset before processing
Long currentOffset = offsetManager.getCurrentOffset("my-topic", 0);
if (currentOffset == null) {
    // No previous offset, start from beginning
    currentOffset = 0L;
}

// Placeholder: advance this value as records are processed
long processedOffset = currentOffset;

// Process messages and commit offset periodically
offsetManager.commitOffset("my-topic", 0, processedOffset);

// Release the ZooKeeper connection when done
offsetManager.close();
```

### Offset Reset Strategies

Configure behavior when no committed offset exists:

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "reset-strategy-group");

// Configure reset strategy (choose one)
props.setProperty("auto.offset.reset", "earliest"); // Start from beginning
// OR
props.setProperty("auto.offset.reset", "latest");   // Start from end

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Override reset strategy programmatically (choose one; the last call wins)
consumer.setStartFromEarliest();     // Always start from beginning
consumer.setStartFromLatest();       // Always start from end
consumer.setStartFromGroupOffsets(); // Use committed offsets or auto.offset.reset
```

## Fault Tolerance

### Checkpointing Integration

Offsets are automatically included in Flink checkpoints:

```java
// Configure checkpointing for fault tolerance
env.enableCheckpointing(5000);                                  // Checkpoint interval
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);  // Min pause
env.getCheckpointConfig().setCheckpointTimeout(60000);          // Timeout
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);       // Max concurrent

// Consumer participates automatically in checkpointing
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "fault-tolerant-topic",
    new SimpleStringSchema(),
    props
);

env.addSource(consumer);
```

### Recovery Behavior

On recovery from failure:

1. **Checkpoint Recovery**: Offsets are restored from the last successful checkpoint
2. **ZooKeeper Fallback**: If no checkpoint exists, the committed offsets in ZooKeeper are used
3. **Reset Strategy**: If no committed offsets exist, the `auto.offset.reset` setting applies
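
The three-step fallback above can be written down as a small decision function. This is a sketch only: `StartOffsetResolver`, the `Reset` enum, and the parameter names are hypothetical, and `null` stands for "no offset available from this source".

```java
/** Hypothetical sketch of the recovery fallback order; not the connector's code. */
final class StartOffsetResolver {
    /** Mirrors the two outcomes of the auto.offset.reset setting. */
    enum Reset { EARLIEST, LATEST }

    private StartOffsetResolver() {}

    /**
     * Picks the start offset for one partition.
     *
     * @param checkpointed offset from the last successful checkpoint, or null
     * @param zookeeper    offset committed in ZooKeeper, or null
     * @param reset        strategy to use when neither source has an offset
     * @param earliest     earliest available offset of the partition
     * @param latest       latest available offset of the partition
     */
    static long resolve(Long checkpointed, Long zookeeper, Reset reset,
                        long earliest, long latest) {
        if (checkpointed != null) {
            return checkpointed;          // 1. checkpoint recovery
        }
        if (zookeeper != null) {
            return zookeeper;             // 2. ZooKeeper fallback
        }
        return reset == Reset.EARLIEST    // 3. reset strategy
            ? earliest
            : latest;
    }
}
```

For example, with a checkpointed offset of 120 and a ZooKeeper offset of 100, the function returns 120; ZooKeeper is consulted only when the checkpoint value is absent.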

### Error Handling

Handle offset-related errors:

```java
// Monitor offset commit failures
props.setProperty("consumer.offset.commit.timeout.ms", "5000");

// Declare outside the try block so the value is visible afterwards
Long offset;
try {
    // Offset operations may throw exceptions
    offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
        zkClient, groupId, topic, partition
    );
} catch (Exception e) {
    logger.warn("Failed to retrieve offset from ZooKeeper", e);
    // Fall back to default behavior
    offset = null;
}
```

## Performance Considerations

- **Commit Frequency**: Frequent commits reduce reprocessing after a failure but add ZooKeeper writes; choose the interval to balance the two
- **ZooKeeper Load**: Minimize ZooKeeper operations in high-throughput scenarios
- **Batch Processing**: Use batch operations when possible
- **Connection Pooling**: Reuse one ZooKeeper client across operations instead of creating a client per call
- **Monitoring**: Track offset lag and commit latency