or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md kafka-consumer.md kafka-producer.md offset-management.md table-api.md
tile.json

offset-management.mddocs/

Offset Management

The Flink Kafka 0.8 connector provides comprehensive offset management through ZooKeeper integration and internal utilities for reliable message processing.

Capabilities

ZookeeperOffsetHandler

Utility class for manual offset management through ZooKeeper.

/**
 * Handler for managing Kafka consumer offsets in ZooKeeper.
 *
 * <p>Both operations are static and receive the ZooKeeper client as an argument,
 * so no instance of this class is required. This listing shows signatures only;
 * method bodies are omitted.
 */
public class ZookeeperOffsetHandler {
    /**
     * Sets the consumer offset for a specific topic partition in ZooKeeper.
     *
     * @param curatorClient ZooKeeper client instance
     * @param groupId Consumer group identifier
     * @param topic Kafka topic name
     * @param partition Partition number
     * @param offset Offset value to set
     * @throws Exception if ZooKeeper operation fails
     */
    public static void setOffsetInZooKeeper(
        CuratorFramework curatorClient, 
        String groupId, 
        String topic, 
        int partition, 
        long offset
    ) throws Exception;
    
    /**
     * Retrieves the consumer offset for a specific topic partition from ZooKeeper.
     *
     * <p>The result is a boxed {@code Long} so that a missing offset can be
     * reported as {@code null}; callers must check for {@code null} before unboxing.
     *
     * @param curatorClient ZooKeeper client instance
     * @param groupId Consumer group identifier
     * @param topic Kafka topic name
     * @param partition Partition number
     * @return Current offset value, or {@code null} if not found
     * @throws Exception if ZooKeeper operation fails
     */
    public static Long getOffsetFromZooKeeper(
        CuratorFramework curatorClient, 
        String groupId, 
        String topic, 
        int partition
    ) throws Exception;
}

Usage Example:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

// Connect to ZooKeeper using an ExponentialBackoffRetry policy
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
    "localhost:2181",
    new ExponentialBackoffRetry(1000, 3)
);
zkClient.start();

// Coordinates of the offset being written and read back
final String groupId = "my-consumer-group";
final String topic = "my-topic";
final int partition = 0;

try {
    // Store offset 12345 for the partition
    ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topic, partition, 12345L);

    // Read the offset back; null means nothing is stored for this partition
    Long currentOffset =
        ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topic, partition);

    System.out.println(
        currentOffset != null ? "Current offset: " + currentOffset : "No offset found"
    );
} finally {
    // Release the ZooKeeper connection whether or not the calls above succeeded
    zkClient.close();
}

PeriodicOffsetCommitter

Internal thread for periodic offset commits to ZooKeeper.

/**
 * Background thread that periodically commits consumer offsets to ZooKeeper.
 *
 * <p>Only the class declaration is listed here; no members are documented because
 * the class is internal to the connector.
 */
public class PeriodicOffsetCommitter extends Thread {
    // Implementation details are internal
    // Used automatically by FlinkKafkaConsumer08
}

ClosableBlockingQueue

Thread-safe queue with close capability used internally for offset management.

/**
 * Thread-safe blocking queue that can be atomically closed.
 *
 * <p>This listing shows signatures only; method bodies are omitted.
 *
 * @param <E> Type of the elements held in the queue
 */
public class ClosableBlockingQueue<E> {
    /**
     * Creates empty closable queue
     */
    public ClosableBlockingQueue();
    
    /**
     * Creates closable queue with initial capacity
     * @param initialSize Initial capacity hint
     */
    public ClosableBlockingQueue(int initialSize);
    
    /**
     * Creates closable queue with initial elements
     * @param initialElements Initial elements to add
     */
    public ClosableBlockingQueue(Collection<? extends E> initialElements);
    
    /**
     * Returns current queue size
     * @return Number of elements in queue
     */
    public int size();
    
    /**
     * Checks if queue is empty
     * @return true if queue contains no elements
     */
    public boolean isEmpty();
    
    /**
     * Checks if queue is still open for operations
     * @return true if queue accepts new elements
     */
    public boolean isOpen();
    
    /**
     * Atomically closes the queue, preventing new additions
     * @return true if queue was successfully closed
     */
    public boolean close();
    
    /**
     * Adds element to queue only if queue is open.
     * Unlike {@link #add(Object)}, a closed queue is reported through the return
     * value rather than an exception.
     * @param element Element to add
     * @return true if element was added successfully
     */
    public boolean addIfOpen(E element);
    
    /**
     * Adds element to queue, failing with an exception if the queue is closed.
     * Use {@link #addIfOpen(Object)} to have a closed queue reported through the
     * return value instead.
     * @param element Element to add
     * @throws IllegalStateException if queue is closed
     */
    public void add(E element);
    
    /**
     * Retrieves but does not remove head element
     * @return Head element or null if empty
     */
    public E peek();
    
    /**
     * Retrieves and removes head element
     * @return Head element or null if empty
     */
    public E poll();
    
    /**
     * Retrieves and removes all available elements
     * @return List of all elements removed from queue
     */
    public List<E> pollBatch();
    
    /**
     * Blocks until element is available, then returns it
     * @return Next available element
     * @throws InterruptedException if interrupted while waiting
     */
    public E getElementBlocking() throws InterruptedException;
    
    /**
     * Blocks until element is available or timeout expires
     * @param timeoutMillis Maximum wait time in milliseconds
     * @return Next available element or null if timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public E getElementBlocking(long timeoutMillis) throws InterruptedException;
    
    /**
     * Blocks until batch of elements is available
     * @return List of available elements
     * @throws InterruptedException if interrupted while waiting
     */
    public List<E> getBatchBlocking() throws InterruptedException;
    
    /**
     * Blocks until batch is available or timeout expires
     * @param timeoutMillis Maximum wait time in milliseconds
     * @return List of available elements or empty list if timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException;
}

Offset Management Strategies

Automatic Offset Management

The consumer automatically manages offsets through Flink's checkpointing:

// Enable checkpointing for automatic offset management
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5 seconds

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "auto-offset-group");
// Kafka 0.8 names this setting "auto.commit.enable"; "enable.auto.commit" is the
// key used by Kafka 0.9 and later clients.
props.setProperty("auto.commit.enable", "false"); // Let Flink manage offsets

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Configure starting position: choose ONE of the alternatives below
consumer.setStartFromGroupOffsets(); // Default: use committed offsets
// OR
consumer.setStartFromEarliest();     // Start from beginning
// OR
consumer.setStartFromLatest();       // Start from end
// NOTE(review): starting from a timestamp (setStartFromTimestamp(long)) appears to be
// exposed only by the Kafka 0.10+ consumers, so it is not listed as an option for
// FlinkKafkaConsumer08 - confirm against the connector version in use.

env.addSource(consumer);

Manual Offset Management

For advanced use cases, manually manage offsets:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

/**
 * Convenience wrapper around {@code ZookeeperOffsetHandler} that binds a ZooKeeper
 * client and a consumer group, so callers only pass topic, partition and offset.
 *
 * <p>The instance owns its ZooKeeper client: the constructor creates and starts it,
 * and {@link #close()} releases it. Implementing {@link AutoCloseable} allows the
 * manager to be used in a try-with-resources statement.
 */
public class ManualOffsetManager implements AutoCloseable {
    private final CuratorFramework zkClient;
    private final String groupId;

    /**
     * Creates the manager and starts a ZooKeeper client for it.
     *
     * @param zkConnect ZooKeeper connection string, e.g. {@code "localhost:2181"}
     * @param groupId Consumer group whose offsets are read and written
     */
    public ManualOffsetManager(String zkConnect, String groupId) {
        this.groupId = groupId;
        this.zkClient = CuratorFrameworkFactory.newClient(
            zkConnect,
            new ExponentialBackoffRetry(1000, 3)
        );
        zkClient.start();
    }

    /**
     * Writes the offset of one topic partition to ZooKeeper for this group.
     *
     * @param topic Kafka topic name
     * @param partition Partition number
     * @param offset Offset value to store
     * @throws RuntimeException wrapping the underlying failure if the write fails
     */
    public void commitOffset(String topic, int partition, long offset) {
        try {
            ZookeeperOffsetHandler.setOffsetInZooKeeper(
                zkClient, groupId, topic, partition, offset
            );
        } catch (Exception e) {
            // The handler declares a checked Exception; rethrow unchecked, keeping the cause
            throw new RuntimeException("Failed to commit offset", e);
        }
    }

    /**
     * Reads the offset of one topic partition from ZooKeeper for this group.
     *
     * @param topic Kafka topic name
     * @param partition Partition number
     * @return The value returned by the handler, which may be {@code null} when no
     *         offset is stored
     * @throws RuntimeException wrapping the underlying failure if the read fails
     */
    public Long getCurrentOffset(String topic, int partition) {
        try {
            return ZookeeperOffsetHandler.getOffsetFromZooKeeper(
                zkClient, groupId, topic, partition
            );
        } catch (Exception e) {
            // The handler declares a checked Exception; rethrow unchecked, keeping the cause
            throw new RuntimeException("Failed to get offset", e);
        }
    }

    /**
     * Closes the ZooKeeper client owned by this manager.
     */
    @Override
    public void close() {
        zkClient.close();
    }
}

// Usage
ManualOffsetManager offsetManager = new ManualOffsetManager(
    "localhost:2181", 
    "manual-offset-group"
);

try {
    // Check current offset before processing
    Long currentOffset = offsetManager.getCurrentOffset("my-topic", 0);
    if (currentOffset == null) {
        // No previous offset, start from beginning
        currentOffset = 0L;
    }

    // Placeholder for the position reached so far; the processing loop (not shown)
    // advances it as messages are handled
    long processedOffset = currentOffset;

    // Process messages and commit offset periodically
    offsetManager.commitOffset("my-topic", 0, processedOffset);
} finally {
    // Release the ZooKeeper connection even if processing fails
    offsetManager.close();
}

Offset Reset Strategies

Configure behavior when no committed offset exists:

Properties props = new Properties();
props.setProperty("group.id", "reset-strategy-group");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("bootstrap.servers", "localhost:9092");

// Reset strategy applied when the group has no committed offset
props.setProperty("auto.offset.reset", "earliest"); // Start from beginning
// OR
props.setProperty("auto.offset.reset", "latest");   // Start from end

FlinkKafkaConsumer08<String> consumer =
    new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props);

// Programmatic overrides of the reset strategy
consumer.setStartFromEarliest();     // Always start from beginning
// OR
consumer.setStartFromLatest();       // Always start from end
// OR
consumer.setStartFromGroupOffsets(); // Use committed offsets or auto.offset.reset

Fault Tolerance

Checkpointing Integration

Offsets are automatically included in Flink checkpoints:

// The consumer needs no extra setup: it participates in checkpointing automatically
FlinkKafkaConsumer08<String> consumer =
    new FlinkKafkaConsumer08<>("fault-tolerant-topic", new SimpleStringSchema(), props);

// Fault-tolerance settings for the job
env.enableCheckpointing(5000);                                 // Checkpoint interval
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);      // Max concurrent
env.getCheckpointConfig().setCheckpointTimeout(60000);         // Timeout
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // Min pause

env.addSource(consumer);

Recovery Behavior

On recovery from failure, the starting offsets are resolved in the following order of precedence:

  1. Checkpoint Recovery: Offsets restored from last successful checkpoint
  2. ZooKeeper Fallback: If no checkpoint, use committed offsets in ZooKeeper
  3. Reset Strategy: If no committed offsets, use auto.offset.reset setting

Error Handling

Handle offset-related errors:

// Monitor offset commit failures
props.setProperty("consumer.offset.commit.timeout.ms", "5000");

try {
    // Offset operations may throw exceptions
    Long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
        zkClient, groupId, topic, partition
    );
} catch (Exception e) {
    logger.warn("Failed to retrieve offset from ZooKeeper", e);
    // Fall back to default behavior
    offset = null;
}

Performance Considerations

  • Commit Frequency: Balance between performance and durability
  • ZooKeeper Load: Minimize ZooKeeper operations for high-throughput scenarios
  • Batch Processing: Use batch operations when possible
  • Connection Pooling: Reuse ZooKeeper connections across operations
  • Monitoring: Track offset lag and commit latency