The Flink Kafka 0.8 connector provides comprehensive offset management through ZooKeeper integration and internal utilities for reliable message processing.
Utility class for manual offset management through ZooKeeper.
/**
* Handler for managing Kafka consumer offsets in ZooKeeper
*/
public class ZookeeperOffsetHandler {
/**
* Sets the consumer offset for a specific topic partition in ZooKeeper
* @param curatorClient ZooKeeper client instance
* @param groupId Consumer group identifier
* @param topic Kafka topic name
* @param partition Partition number
* @param offset Offset value to set
* @throws Exception if ZooKeeper operation fails
*/
public static void setOffsetInZooKeeper(
CuratorFramework curatorClient,
String groupId,
String topic,
int partition,
long offset
) throws Exception;
/**
* Retrieves the consumer offset for a specific topic partition from ZooKeeper
* @param curatorClient ZooKeeper client instance
* @param groupId Consumer group identifier
* @param topic Kafka topic name
* @param partition Partition number
* @return Current offset value, or null if not found
* @throws Exception if ZooKeeper operation fails
*/
public static Long getOffsetFromZooKeeper(
CuratorFramework curatorClient,
String groupId,
String topic,
int partition
) throws Exception;
}
Usage Example:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
// Create ZooKeeper client
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
"localhost:2181",
new ExponentialBackoffRetry(1000, 3)
);
zkClient.start();
try {
// Set offset for a specific partition
ZookeeperOffsetHandler.setOffsetInZooKeeper(
zkClient,
"my-consumer-group",
"my-topic",
0, // partition 0
12345L // offset value
);
// Retrieve offset for a partition
Long currentOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
zkClient,
"my-consumer-group",
"my-topic",
0 // partition 0
);
if (currentOffset != null) {
System.out.println("Current offset: " + currentOffset);
} else {
System.out.println("No offset found");
}
} finally {
zkClient.close();
}
Internal thread for periodic offset commits to ZooKeeper.
/**
* Background thread that periodically commits consumer offsets to ZooKeeper
*/
public class PeriodicOffsetCommitter extends Thread {
// Implementation details are internal
// Used automatically by FlinkKafkaConsumer08
}
Thread-safe queue with close capability used internally for offset management.
/**
* Thread-safe blocking queue that can be atomically closed
*/
public class ClosableBlockingQueue<E> {
/**
* Creates empty closable queue
*/
public ClosableBlockingQueue();
/**
* Creates closable queue with initial capacity
* @param initialSize Initial capacity hint
*/
public ClosableBlockingQueue(int initialSize);
/**
* Creates closable queue with initial elements
* @param initialElements Initial elements to add
*/
public ClosableBlockingQueue(Collection<? extends E> initialElements);
/**
* Returns current queue size
* @return Number of elements in queue
*/
public int size();
/**
* Checks if queue is empty
* @return true if queue contains no elements
*/
public boolean isEmpty();
/**
* Checks if queue is still open for operations
* @return true if queue accepts new elements
*/
public boolean isOpen();
/**
* Tries to atomically close the queue; closing only succeeds if the queue is empty
* @return true if the queue is closed afterwards, false if it remains open because it still holds elements
*/
public boolean close();
/**
* Adds element to queue only if queue is open
* @param element Element to add
* @return true if element was added successfully
*/
public boolean addIfOpen(E element);
/**
* Adds element to queue, failing with an exception if the queue is closed
* @param element Element to add
* @throws IllegalStateException if queue is closed
*/
public void add(E element);
/**
* Retrieves but does not remove head element
* @return Head element or null if empty
*/
public E peek();
/**
* Retrieves and removes head element
* @return Head element or null if empty
*/
public E poll();
/**
* Retrieves and removes all available elements
* @return List of all elements removed from queue
*/
public List<E> pollBatch();
/**
* Blocks until element is available, then returns it
* @return Next available element
* @throws InterruptedException if interrupted while waiting
*/
public E getElementBlocking() throws InterruptedException;
/**
* Blocks until element is available or timeout expires
* @param timeoutMillis Maximum wait time in milliseconds
* @return Next available element or null if timeout
* @throws InterruptedException if interrupted while waiting
*/
public E getElementBlocking(long timeoutMillis) throws InterruptedException;
/**
* Blocks until batch of elements is available
* @return List of available elements
* @throws InterruptedException if interrupted while waiting
*/
public List<E> getBatchBlocking() throws InterruptedException;
/**
* Blocks until batch is available or timeout expires
* @param timeoutMillis Maximum wait time in milliseconds
* @return List of available elements or empty list if timeout
* @throws InterruptedException if interrupted while waiting
*/
public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException;
}
The consumer automatically manages offsets through Flink's checkpointing:
// Enable checkpointing for automatic offset management
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5 seconds
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "auto-offset-group");
// Kafka 0.8 names this property "auto.commit.enable";
// "enable.auto.commit" is the Kafka 0.9+ spelling and is not the 0.8 setting.
props.setProperty("auto.commit.enable", "false"); // Let Flink manage offsets
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);
// Configure starting position: pick exactly ONE of the calls below,
// since each call replaces the start position set by a previous one.
consumer.setStartFromGroupOffsets(); // Default: use committed offsets
// OR
consumer.setStartFromEarliest(); // Start from beginning
// OR
consumer.setStartFromLatest(); // Start from end
// NOTE: starting from a timestamp (setStartFromTimestamp) is not exposed by the
// Kafka 0.8 consumer; it requires the Kafka 0.10+ connector.
env.addSource(consumer);
For advanced use cases, manually manage offsets:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * Convenience wrapper that reads and writes the ZooKeeper offsets of a single
 * consumer group through {@code ZookeeperOffsetHandler}.
 *
 * <p>The constructor creates and starts its own Curator client, so every instance
 * must be closed. The class implements {@link AutoCloseable} to allow use in a
 * try-with-resources statement.
 */
public class ManualOffsetManager implements AutoCloseable {
    private final CuratorFramework zkClient;
    private final String groupId;

    /**
     * Creates the manager and starts a ZooKeeper client for it.
     *
     * @param zkConnect ZooKeeper connection string, e.g. {@code "localhost:2181"}
     * @param groupId Consumer group whose offsets are managed
     */
    public ManualOffsetManager(String zkConnect, String groupId) {
        this.groupId = groupId;
        this.zkClient = CuratorFrameworkFactory.newClient(
            zkConnect,
            new ExponentialBackoffRetry(1000, 3)
        );
        zkClient.start();
    }

    /**
     * Writes the given offset for a topic partition to ZooKeeper.
     *
     * @param topic Kafka topic name
     * @param partition Partition number
     * @param offset Offset value to store
     * @throws RuntimeException wrapping the original cause if the ZooKeeper operation fails
     */
    public void commitOffset(String topic, int partition, long offset) {
        try {
            ZookeeperOffsetHandler.setOffsetInZooKeeper(
                zkClient, groupId, topic, partition, offset
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to commit offset", e);
        }
    }

    /**
     * Reads the stored offset for a topic partition from ZooKeeper.
     *
     * @param topic Kafka topic name
     * @param partition Partition number
     * @return The stored offset, or {@code null} if none was found
     * @throws RuntimeException wrapping the original cause if the ZooKeeper operation fails
     */
    public Long getCurrentOffset(String topic, int partition) {
        try {
            return ZookeeperOffsetHandler.getOffsetFromZooKeeper(
                zkClient, groupId, topic, partition
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to get offset", e);
        }
    }

    /**
     * Closes the underlying ZooKeeper client.
     */
    @Override
    public void close() {
        zkClient.close();
    }
}
// Usage
ManualOffsetManager offsetManager = new ManualOffsetManager(
"localhost:2181",
"manual-offset-group"
);
// Check current offset before processing
Long currentOffset = offsetManager.getCurrentOffset("my-topic", 0);
if (currentOffset == null) {
// No previous offset, start from beginning
currentOffset = 0L;
}
// Process messages and commit offset periodically
offsetManager.commitOffset("my-topic", 0, processedOffset);
Configure behavior when no committed offset exists:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "reset-strategy-group");
// Configure reset strategy
props.setProperty("auto.offset.reset", "earliest"); // Start from beginning
// OR
props.setProperty("auto.offset.reset", "latest"); // Start from end
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
"my-topic",
new SimpleStringSchema(),
props
);
// Override reset strategy programmatically
// (the calls below are alternatives - presumably only the last one made takes effect, so choose one)
consumer.setStartFromEarliest(); // Always start from beginning
// OR
consumer.setStartFromLatest(); // Always start from end
// OR
consumer.setStartFromGroupOffsets(); // Use committed offsets or auto.offset.reset
Offsets are automatically included in Flink checkpoints:
// Configure checkpointing for fault tolerance
env.enableCheckpointing(5000); // Checkpoint interval
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // Min pause
env.getCheckpointConfig().setCheckpointTimeout(60000); // Timeout
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // Max concurrent
// Consumer participates automatically in checkpointing
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
"fault-tolerant-topic",
new SimpleStringSchema(),
props
);
env.addSource(consumer);
On recovery from failure:
auto.offset.reset setting
Handle offset-related errors:
// Monitor offset commit failures
props.setProperty("consumer.offset.commit.timeout.ms", "5000");
try {
// Offset operations may throw exceptions
Long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
zkClient, groupId, topic, partition
);
} catch (Exception e) {
logger.warn("Failed to retrieve offset from ZooKeeper", e);
// Fall back to default behavior
offset = null;
}