Apache Kafka 0.8.x connector for Apache Flink streaming data processing
The Flink Kafka 0.8 connector manages consumer offsets through Flink checkpoints and ZooKeeper, using the internal utilities described below.
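`ZookeeperOffsetHandler`: an internal utility class (package `org.apache.flink.streaming.connectors.kafka.internals`) for reading and writing consumer offsets directly in ZooKeeper.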
/**
* Reads and writes Kafka consumer-group offsets stored in ZooKeeper.
*
* <p>API listing only: method bodies are omitted. Both helpers take a
* {@link CuratorFramework} client that the caller creates, starts and closes
* (as in the usage example that follows).
*
* <p>NOTE(review): the class lives in the connector's {@code internals} package,
* which suggests it is not a stable public API -- confirm before depending on it.
*/
public class ZookeeperOffsetHandler {
/**
* Stores {@code offset} as the committed offset of one partition of {@code topic}
* for consumer group {@code groupId}.
*
* <p>NOTE(review): the value is presumably written under the Kafka 0.8 layout
* {@code /consumers/<groupId>/offsets/<topic>/<partition>}. Flink's own commits appear
* to store "last processed offset + 1" (the next offset to read). Confirm both before
* writing offsets that a Flink job will resume from.
*
* @param curatorClient started ZooKeeper (Curator) client used for the write
* @param groupId Consumer group identifier
* @param topic Kafka topic name
* @param partition Partition number within {@code topic}
* @param offset Offset value to store
* @throws Exception if the ZooKeeper operation fails; declared as plain {@code Exception},
*         so callers must catch or propagate it
*/
public static void setOffsetInZooKeeper(
CuratorFramework curatorClient,
String groupId,
String topic,
int partition,
long offset
) throws Exception;
/**
* Reads the committed offset of one partition of {@code topic} for consumer group
* {@code groupId}.
*
* <p>Callers must handle a {@code null} result, for example by falling back to a
* default start position.
* NOTE(review): an empty or non-numeric stored value presumably also yields
* {@code null} -- confirm.
*
* @param curatorClient started ZooKeeper (Curator) client used for the read
* @param groupId Consumer group identifier
* @param topic Kafka topic name
* @param partition Partition number within {@code topic}
* @return Current offset value, or null if not found
* @throws Exception if the ZooKeeper operation fails; declared as plain {@code Exception},
*         so callers must catch or propagate it
*/
public static Long getOffsetFromZooKeeper(
CuratorFramework curatorClient,
String groupId,
String topic,
int partition
) throws Exception;
}Usage Example:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

// Create the ZooKeeper client. CuratorFramework is Closeable, so try-with-resources
// closes it on every path -- including when start() or one of the offset calls throws.
// (The enclosing method must declare or handle Exception: both helpers throw it.)
try (CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
        "localhost:2181",
        // Retry policy: 1000 ms base sleep, at most 3 retries.
        new ExponentialBackoffRetry(1000, 3))) {
    zkClient.start();

    // Set offset for a specific partition
    ZookeeperOffsetHandler.setOffsetInZooKeeper(
            zkClient,
            "my-consumer-group",
            "my-topic",
            0,       // partition 0
            12345L   // offset value
    );

    // Retrieve offset for a partition; null means no offset is stored.
    Long currentOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
            zkClient,
            "my-consumer-group",
            "my-topic",
            0        // partition 0
    );
    if (currentOffset != null) {
        System.out.println("Current offset: " + currentOffset);
    } else {
        System.out.println("No offset found");
    }
}
Internal thread for periodic offset commits to ZooKeeper.
/**
* Background thread that periodically commits consumer offsets to ZooKeeper.
*
* <p>API listing only: the implementation is internal to the connector, and per the
* comments below it is used automatically by FlinkKafkaConsumer08.
*
* <p>NOTE(review): presumably started only when checkpointing is disabled and
* auto-commit is enabled ({@code auto.commit.enable}, interval from
* {@code auto.commit.interval.ms}). With checkpointing on, offsets are committed on
* completed checkpoints instead -- confirm against the Flink version in use.
*/
public class PeriodicOffsetCommitter extends Thread {
// Implementation details are internal
// Used automatically by FlinkKafkaConsumer08
}Thread-safe queue with close capability used internally for offset management.
/**
* Thread-safe blocking queue that can be atomically closed.
*
* <p>API listing only: method bodies are omitted.
*
* <p>Once closed, {@link #add(Object)} throws {@link IllegalStateException} and
* {@link #addIfOpen(Object)} returns {@code false}, so producers can detect shutdown.
* Readers either poll ({@link #peek()}, {@link #poll()}, {@link #pollBatch()}) or wait
* with the {@code getElementBlocking} / {@code getBatchBlocking} variants.
*
* <p>NOTE(review): in Flink's implementation, {@link #close()} only succeeds while the
* queue is empty, and the read methods throw {@link IllegalStateException} on a closed
* queue -- confirm against the Flink version in use.
*
* @param <E> type of the queued elements
*/
public class ClosableBlockingQueue<E> {
/**
* Creates an empty, open queue.
*/
public ClosableBlockingQueue();
/**
* Creates an empty, open queue with the given initial capacity.
* @param initialSize Initial capacity hint
*/
public ClosableBlockingQueue(int initialSize);
/**
* Creates an open queue that initially holds the given elements.
* @param initialElements Initial elements to add
*/
public ClosableBlockingQueue(Collection<? extends E> initialElements);
/**
* Returns the current number of queued elements.
* <p>The value may change right after it is returned if other threads add or
* remove elements concurrently.
* @return Number of elements in queue
*/
public int size();
/**
* Checks whether the queue currently holds no elements.
* @return true if queue contains no elements
*/
public boolean isEmpty();
/**
* Checks whether the queue is still open, i.e. has not been closed.
* @return true if queue accepts new elements
*/
public boolean isOpen();
/**
* Atomically closes the queue, preventing new additions.
* <p>NOTE(review): Flink's implementation refuses to close a non-empty queue and
* returns false in that case -- confirm before assuming close() always succeeds.
* @return true if queue was successfully closed
*/
public boolean close();
/**
* Adds element to queue only if queue is open.
* <p>A closed queue is reported through the return value rather than an exception
* (compare {@link #add(Object)}).
* @param element Element to add
* @return true if element was added successfully, false if the queue was closed
*/
public boolean addIfOpen(E element);
/**
* Adds element to queue.
* <p>If the queue is closed this throws {@link IllegalStateException} (see below)
* rather than blocking; use {@link #addIfOpen(Object)} to get a boolean instead.
* @param element Element to add
* @throws IllegalStateException if queue is closed
*/
public void add(E element);
/**
* Retrieves but does not remove the head element.
* @return Head element or null if empty
*/
public E peek();
/**
* Retrieves and removes the head element.
* @return Head element or null if empty
*/
public E poll();
/**
* Retrieves and removes all available elements.
* <p>NOTE(review): Flink's implementation returns {@code null} (not an empty list)
* when the queue is empty -- confirm and null-check the result.
* @return List of all elements removed from queue
*/
public List<E> pollBatch();
/**
* Blocks until an element is available, then returns it.
* <p>NOTE(review): presumably removes the element, like {@link #poll()} -- confirm.
* @return Next available element
* @throws InterruptedException if interrupted while waiting
*/
public E getElementBlocking() throws InterruptedException;
/**
* Blocks until an element is available or the timeout expires.
* @param timeoutMillis Maximum wait time in milliseconds
* @return Next available element or null if timeout
* @throws InterruptedException if interrupted while waiting
*/
public E getElementBlocking(long timeoutMillis) throws InterruptedException;
/**
* Blocks until at least one element is available, then returns a batch.
* <p>NOTE(review): presumably removes and returns all available elements, like
* {@link #pollBatch()} -- confirm.
* @return List of available elements
* @throws InterruptedException if interrupted while waiting
*/
public List<E> getBatchBlocking() throws InterruptedException;
/**
* Blocks until a batch is available or the timeout expires.
* <p>Unlike {@link #getElementBlocking(long)}, a timeout is signalled by an empty
* list rather than {@code null}.
* @param timeoutMillis Maximum wait time in milliseconds
* @return List of available elements or empty list if timeout
* @throws InterruptedException if interrupted while waiting
*/
public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException;
}The consumer automatically manages offsets through Flink's checkpointing:
// Enable checkpointing: the consumer's offsets become part of Flink's checkpoints.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5 seconds

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "auto-offset-group");
// Kafka 0.8 spells the auto-commit switch "auto.commit.enable"; "enable.auto.commit"
// is the Kafka 0.9+ key and is not read by the 0.8 consumer. This only disables the
// periodic committer: with checkpointing enabled, Flink by default still commits
// offsets to ZooKeeper when a checkpoint completes.
props.setProperty("auto.commit.enable", "false");

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
        "my-topic",
        new SimpleStringSchema(),
        props);

// Configure the starting position -- call exactly ONE of these; the last call wins.
consumer.setStartFromGroupOffsets(); // Default: use committed offsets
// consumer.setStartFromEarliest();  // Start from beginning
// consumer.setStartFromLatest();    // Start from end
// Starting from a timestamp is not available for the Kafka 0.8 consumer (Kafka 0.8
// messages carry no timestamps). If you need a precise start position, check whether
// your Flink version offers setStartFromSpecificOffsets.

env.addSource(consumer);
For advanced use cases, manually manage offsets:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * Reads and commits offsets for one consumer group directly in ZooKeeper through
 * ZookeeperOffsetHandler, using its own Curator client.
 *
 * <p>The client is started in the constructor and released by {@link #close()}. Because
 * the class implements {@link AutoCloseable}, it can be used in try-with-resources.
 *
 * <p>Also requires
 * {@code import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;}.
 */
public class ManualOffsetManager implements AutoCloseable {
    private final CuratorFramework zkClient;
    private final String groupId;

    /**
     * Creates and starts a ZooKeeper client for the given ensemble.
     *
     * @param zkConnect ZooKeeper connect string, e.g. {@code "localhost:2181"}
     * @param groupId consumer group whose offsets this manager reads and writes
     * @throws NullPointerException if {@code zkConnect} or {@code groupId} is null
     */
    public ManualOffsetManager(String zkConnect, String groupId) {
        // Fail fast with a clear message instead of deferring the failure to later ZooKeeper calls.
        this.groupId = java.util.Objects.requireNonNull(groupId, "groupId");
        this.zkClient = CuratorFrameworkFactory.newClient(
                java.util.Objects.requireNonNull(zkConnect, "zkConnect"),
                new ExponentialBackoffRetry(1000, 3));
        zkClient.start();
    }

    /**
     * Stores {@code offset} as the committed offset of {@code topic}/{@code partition}.
     *
     * <p>NOTE(review): Flink itself appears to commit "last processed offset + 1" (the next
     * offset to read); use that convention if a Flink job will resume from this value.
     *
     * @throws RuntimeException wrapping the underlying ZooKeeper failure
     */
    public void commitOffset(String topic, int partition, long offset) {
        try {
            ZookeeperOffsetHandler.setOffsetInZooKeeper(
                    zkClient, groupId, topic, partition, offset);
        } catch (Exception e) {
            throw failure("Failed to commit offset", topic, partition, e);
        }
    }

    /**
     * Returns the committed offset of {@code topic}/{@code partition}, or {@code null}
     * if none is stored.
     *
     * @throws RuntimeException wrapping the underlying ZooKeeper failure
     */
    public Long getCurrentOffset(String topic, int partition) {
        try {
            return ZookeeperOffsetHandler.getOffsetFromZooKeeper(
                    zkClient, groupId, topic, partition);
        } catch (Exception e) {
            throw failure("Failed to get offset", topic, partition, e);
        }
    }

    /** Closes the underlying ZooKeeper client. */
    @Override
    public void close() {
        zkClient.close();
    }

    /** Builds the exception for a failed ZooKeeper call, keeping the cause and interrupt status. */
    private RuntimeException failure(String what, String topic, int partition, Exception cause) {
        if (cause instanceof InterruptedException) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
        }
        return new RuntimeException(
                what + " (group=" + groupId + ", topic=" + topic + ", partition=" + partition + ")",
                cause);
    }
}
// Usage
ManualOffsetManager offsetManager = new ManualOffsetManager(
        "localhost:2181",
        "manual-offset-group");
try {
    // Check the stored offset before processing; null means nothing was committed yet.
    Long currentOffset = offsetManager.getCurrentOffset("my-topic", 0);
    if (currentOffset == null) {
        // No previous offset: start at 0. Note that 0 may not be the earliest available
        // offset if retention has already deleted old log segments.
        currentOffset = 0L;
    }

    // Process messages starting at currentOffset and commit periodically.
    // processedOffset is a placeholder for the offset your processing loop has reached.
    long processedOffset = currentOffset;
    offsetManager.commitOffset("my-topic", 0, processedOffset);
} finally {
    // Release the ZooKeeper client even if reading or committing fails.
    offsetManager.close();
}
Configure behavior when no committed offset exists:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "reset-strategy-group");

// Reset strategy: used only when the group has no committed offset for a partition.
// Set ONE value -- a second setProperty call on the same key overwrites the first.
props.setProperty("auto.offset.reset", "earliest"); // Start from beginning
// props.setProperty("auto.offset.reset", "latest"); // Start from end
// NOTE(review): Kafka 0.8's native values are "smallest"/"largest"; confirm which
// spellings your Flink connector version accepts.

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
        "my-topic",
        new SimpleStringSchema(),
        props);

// Override the reset strategy programmatically -- call exactly ONE of these;
// each call replaces the previous start position, so the last call wins.
consumer.setStartFromGroupOffsets(); // Use committed offsets or auto.offset.reset
// consumer.setStartFromEarliest();  // Always start from beginning
// consumer.setStartFromLatest();    // Always start from end
Offsets are automatically included in Flink checkpoints:
// Fault tolerance: take a checkpoint every 5 s (all durations below are milliseconds).
env.enableCheckpointing(5_000);
// Allow only one checkpoint in flight at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Give up on a checkpoint that has not completed within 60 s.
env.getCheckpointConfig().setCheckpointTimeout(60_000);
// Leave at least 1 s between the end of one checkpoint and the start of the next.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);

// No extra wiring is required: the Kafka source joins checkpointing on its own.
FlinkKafkaConsumer08<String> consumer =
        new FlinkKafkaConsumer08<>("fault-tolerant-topic", new SimpleStringSchema(), props);
env.addSource(consumer);
On recovery from failure:
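- Offsets are restored from the latest completed checkpoint; if no checkpointed or committed offset is available, the consumer falls back to the `auto.offset.reset` setting.

Handle offset-related errors: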
// Monitor offset commit failures
props.setProperty("consumer.offset.commit.timeout.ms", "5000");
try {
// Offset operations may throw exceptions
Long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
zkClient, groupId, topic, partition
);
} catch (Exception e) {
logger.warn("Failed to retrieve offset from ZooKeeper", e);
// Fall back to default behavior
offset = null;
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-8-2-10