Configuration and Partitioning

The Flink Kafka 0.11 connector provides extensive configuration options for consumer startup behavior, offset management, custom partitioning strategies, and performance tuning.

Capabilities

Startup Mode Configuration

Control how Kafka consumers begin reading from topics with flexible startup options.

/**
 * Startup modes for Kafka Consumer determining initial read position
 */
@Internal
enum StartupMode {
    /** Start from committed group offsets in Kafka */
    GROUP_OFFSETS,
    /** Start from earliest available offset */
    EARLIEST,
    /** Start from latest offset (skip existing messages) */
    LATEST,
    /** Start from user-supplied timestamp */
    TIMESTAMP,
    /** Start from user-supplied specific partition offsets */
    SPECIFIC_OFFSETS
}

Usage Examples:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

// Note: the setStartFrom* setters are mutually exclusive; the last call wins

// Start from earliest available messages
consumer.setStartFromEarliest();

// Start from latest messages (skip backlog)
consumer.setStartFromLatest();

// Start from committed group offsets (default behavior)  
consumer.setStartFromGroupOffsets();

// Start from specific timestamp (1 hour ago)
long oneHourAgo = System.currentTimeMillis() - (60 * 60 * 1000);
consumer.setStartFromTimestamp(oneHourAgo);

// Start from specific partition offsets
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 1000L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 2000L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 2), 1500L);
consumer.setStartFromSpecificOffsets(specificOffsets);
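
When starting from group offsets, partitions that have no committed offset in Kafka fall back to the consumer's auto.offset.reset setting. A minimal sketch of that fallback configuration:

// GROUP_OFFSETS falls back to "auto.offset.reset" for partitions
// without a committed offset in Kafka
Properties fallbackProps = new Properties();
fallbackProps.setProperty("bootstrap.servers", "localhost:9092");
fallbackProps.setProperty("group.id", "my-consumer-group");
fallbackProps.setProperty("auto.offset.reset", "earliest"); // or "latest"

FlinkKafkaConsumer011<String> groupOffsetConsumer = new FlinkKafkaConsumer011<>(
    "my-topic",
    new SimpleStringSchema(),
    fallbackProps
);
groupOffsetConsumer.setStartFromGroupOffsets(); // default startup mode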

Offset Commit Configuration

Configure how and when consumer offsets are committed back to Kafka.

/**
 * Offset commit modes controlling when offsets are committed to Kafka
 */
@Internal  
enum OffsetCommitMode {
    /** Completely disable offset committing - no offsets written to Kafka */
    DISABLED,
    /** Commit offsets back to Kafka only when checkpoints are completed */
    ON_CHECKPOINTS,
    /** Commit offsets periodically back to Kafka using Kafka's auto commit mechanism */
    KAFKA_PERIODIC
}

Usage Examples:

Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "my-group");

// With checkpointing enabled, Flink commits offsets when checkpoints complete
// (OffsetCommitMode.ON_CHECKPOINTS) and overrides Kafka's auto-commit settings
consumerProps.setProperty("enable.auto.commit", "false");

// Without checkpointing, Kafka's auto-commit setting selects the mode:
// "true" yields OffsetCommitMode.KAFKA_PERIODIC, "false" yields DISABLED
consumerProps.setProperty("enable.auto.commit", "true");
consumerProps.setProperty("auto.commit.interval.ms", "5000");

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    "my-topic",
    new SimpleStringSchema(),
    consumerProps
);

// Note: Flink's exactly-once guarantees rely on the offsets stored in its own
// checkpoints; offsets committed back to Kafka (ON_CHECKPOINTS) only expose
// consumer progress to external monitoring tools
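
For ON_CHECKPOINTS to take effect, checkpointing must be enabled on the execution environment. A minimal sketch (the 60-second interval is an arbitrary choice):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // checkpoint every 60 seconds

// Commit offsets back to Kafka when checkpoints complete
// (this is the default when checkpointing is enabled; shown for clarity)
consumer.setCommitOffsetsOnCheckpoints(true);

env.addSource(consumer).print();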

Consumer Configuration Constants

Key configuration options and limits for consumer behavior tuning.

/**
 * Configuration constants for consumer behavior
 * Located in FlinkKafkaConsumerBase
 */
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; // Maximum pending checkpoints
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE; // Disable partition discovery
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; // Disable metrics collection
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis"; // Partition discovery interval

Usage Examples:

Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "my-group");

// Standard Kafka consumer configuration
consumerProps.setProperty("auto.offset.reset", "earliest");
consumerProps.setProperty("enable.auto.commit", "false"); // Managed by Flink
consumerProps.setProperty("max.poll.records", "500");
consumerProps.setProperty("session.timeout.ms", "30000");
consumerProps.setProperty("heartbeat.interval.ms", "10000");

// Flink-specific configuration
consumerProps.setProperty("flink.disable-metrics", "false"); // Enable metrics
consumerProps.setProperty("flink.partition-discovery.interval-millis", "30000"); // 30-second discovery

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    Arrays.asList("topic1", "topic2"),
    new SimpleStringSchema(),
    consumerProps
);

// Dynamic partition discovery is enabled through the
// "flink.partition-discovery.interval-millis" property set above;
// there is no setter for it on the consumer itself
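
With discovery enabled, the 0.11 consumer can also subscribe to topics by regular expression via its Pattern-based constructor, picking up newly created matching topics at each discovery interval. A sketch, assuming the topics share a common prefix:

import java.util.regex.Pattern;

// Subscribe to every topic matching the pattern; new matching topics
// are picked up at each partition-discovery interval
FlinkKafkaConsumer011<String> patternConsumer = new FlinkKafkaConsumer011<>(
    Pattern.compile("metrics-.*"),
    new SimpleStringSchema(),
    consumerProps
);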

Producer Configuration Constants

Essential constants for producer configuration and transaction management.

/**
 * Producer configuration constants for optimal performance and reliability
 * Located in FlinkKafkaProducer011
 */
public static final int SAFE_SCALE_DOWN_FACTOR = 5; // Safe scale down factor for transactional IDs
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; // Default producer pool size  
public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1); // Default transaction timeout
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; // Configuration key for disabling metrics
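
Note that DEFAULT_KAFKA_TRANSACTION_TIMEOUT (1 hour) exceeds the Kafka broker default for transaction.max.timeout.ms (15 minutes), and brokers reject transactional producers that request a longer timeout. Either raise the broker limit or lower the producer timeout, as sketched here:

// Keep the producer's transaction timeout within the broker's
// transaction.max.timeout.ms limit (15 minutes by default)
Properties txProps = new Properties();
txProps.setProperty("bootstrap.servers", "localhost:9092");
txProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes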

Usage Examples:

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// Transaction configuration for exactly-once semantics
producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes
producerProps.setProperty("enable.idempotence", "true");
producerProps.setProperty("retries", "2147483647"); // Max retries
producerProps.setProperty("max.in.flight.requests.per.connection", "5");
producerProps.setProperty("acks", "all");

// Performance tuning
producerProps.setProperty("batch.size", "16384");
producerProps.setProperty("linger.ms", "5");
producerProps.setProperty("buffer.memory", "33554432");
producerProps.setProperty("compression.type", "snappy");

// Flink-specific configuration
producerProps.setProperty("flink.disable-metrics", "false");

// Create producer with a custom pool size. The six-argument constructor takes a
// KeyedSerializationSchema, so a plain SerializationSchema must be wrapped
// (org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper)
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "output-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    producerProps,
    Optional.empty(),
    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
    10 // Custom producer pool size (default is DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5)
);

Custom Partitioning

Implement custom partitioning strategies for controlling message distribution across Kafka partitions.

/**
 * Base class for custom Kafka partitioners
 * Provides deterministic partition assignment for Flink records
 */
@PublicEvolving
abstract class FlinkKafkaPartitioner<T> implements Serializable {
    
    /**
     * Initialize the partitioner with parallel execution context
     * @param parallelInstanceId the parallel instance ID of this subtask (0-based)
     * @param parallelInstances total number of parallel instances
     */
    public void open(int parallelInstanceId, int parallelInstances) {
        // Default implementation does nothing
        // Override for initialization logic
    }
    
    /**
     * Determine the partition for a record
     * @param record the record to partition
     * @param key the record key (may be null)
     * @param value the record value (may be null)
     * @param targetTopic the target topic name
     * @param partitions array of available partition IDs for the topic
     * @return the partition ID to use (must be one of the values in partitions array)
     */
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

/**
 * Fixed partitioner ensuring each Flink subtask writes to one Kafka partition
 * Provides deterministic assignment for exactly-once semantics
 */
@PublicEvolving
class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private int parallelInstanceId;
    
    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }
    
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Assign each Flink subtask to a specific Kafka partition
        return partitions[parallelInstanceId % partitions.length];
    }
}

Custom Partitioner Examples:

// Hash-based partitioner using record content
public class ContentHashPartitioner extends FlinkKafkaPartitioner<UserEvent> {
    @Override
    public int partition(UserEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Mask the sign bit rather than using Math.abs(), which stays negative
        // for Integer.MIN_VALUE and would produce a negative array index
        int hash = record.getUserId().hashCode() & 0x7FFFFFFF;
        return partitions[hash % partitions.length];
    }
}

// Time-based partitioner for temporal data organization
public class TimeBasedPartitioner extends FlinkKafkaPartitioner<TimestampedEvent> {
    @Override
    public int partition(TimestampedEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition based on hour of day for temporal organization
        long timestamp = record.getTimestamp();
        int hourOfDay = (int) ((timestamp / (1000 * 60 * 60)) % 24);
        return partitions[hourOfDay % partitions.length]; 
    }
}

// Round-robin partitioner for load balancing
// (each parallel subtask maintains its own independent counter)
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private int nextPartition = 0;
    
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        int partition = partitions[nextPartition];
        nextPartition = (nextPartition + 1) % partitions.length;
        return partition;
    }
}

// Usage with producer
ContentHashPartitioner partitioner = new ContentHashPartitioner();
FlinkKafkaProducer011<UserEvent> producer = new FlinkKafkaProducer011<>(
    "partitioned-events",
    new UserEventSerializer("partitioned-events"),
    properties,
    Optional.of(partitioner),
    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
    FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
);

Advanced Configuration Patterns

Best practices for production deployments and performance optimization.

// Producer configuration for high-throughput, exactly-once scenarios
Properties highThroughputProps = new Properties();
highThroughputProps.setProperty("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");

// Transaction configuration
highThroughputProps.setProperty("transaction.timeout.ms", "600000"); // 10 minutes
highThroughputProps.setProperty("enable.idempotence", "true");
highThroughputProps.setProperty("retries", "2147483647");
highThroughputProps.setProperty("max.in.flight.requests.per.connection", "5");
highThroughputProps.setProperty("acks", "all");

// Performance optimization
highThroughputProps.setProperty("batch.size", "32768"); // Larger batches
highThroughputProps.setProperty("linger.ms", "10"); // Slight batching delay
highThroughputProps.setProperty("buffer.memory", "67108864"); // 64MB buffer
highThroughputProps.setProperty("compression.type", "lz4"); // Fast compression
highThroughputProps.setProperty("send.buffer.bytes", "131072"); // 128KB socket buffer
highThroughputProps.setProperty("receive.buffer.bytes", "65536"); // 64KB socket buffer

// Consumer configuration for low-latency, high-throughput scenarios  
Properties lowLatencyConsumerProps = new Properties();
lowLatencyConsumerProps.setProperty("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
lowLatencyConsumerProps.setProperty("group.id", "low-latency-processor");

// Fetch configuration for minimal latency
lowLatencyConsumerProps.setProperty("fetch.min.bytes", "1"); // Don't wait for large batches
lowLatencyConsumerProps.setProperty("fetch.max.wait.ms", "100"); // Max 100ms wait
lowLatencyConsumerProps.setProperty("max.poll.records", "1000"); // Process more records per poll
lowLatencyConsumerProps.setProperty("receive.buffer.bytes", "131072"); // 128KB receive buffer
lowLatencyConsumerProps.setProperty("send.buffer.bytes", "131072"); // 128KB send buffer

// Session management
lowLatencyConsumerProps.setProperty("session.timeout.ms", "20000"); // 20 seconds
lowLatencyConsumerProps.setProperty("heartbeat.interval.ms", "6000"); // 6 seconds
lowLatencyConsumerProps.setProperty("max.poll.interval.ms", "600000"); // 10 minutes

// Flink-specific optimizations
lowLatencyConsumerProps.setProperty("flink.partition-discovery.interval-millis", "10000"); // 10-second discovery

Offset Management and Checkpointing

Configuration for reliable offset management and checkpoint integration.

// Consumer offset management configuration
Properties offsetProps = new Properties();
offsetProps.setProperty("bootstrap.servers", "localhost:9092");
offsetProps.setProperty("group.id", "reliable-consumer");

// Offset management (should be managed by Flink for exactly-once processing)
offsetProps.setProperty("enable.auto.commit", "false"); // Let Flink handle commits
offsetProps.setProperty("auto.offset.reset", "earliest"); // Fallback for new consumer groups

// Configure consumer for checkpoint integration
FlinkKafkaConsumer011<String> checkpointConsumer = new FlinkKafkaConsumer011<>(
    "reliable-topic",
    new SimpleStringSchema(),
    offsetProps
);

// Enable offset committing on checkpoints (recommended for exactly-once)
checkpointConsumer.setCommitOffsetsOnCheckpoints(true);

// Configure startup from group offsets with fallback
checkpointConsumer.setStartFromGroupOffsets();

// On restore from a checkpoint or savepoint, the consumer resumes from the
// offsets stored in that snapshot; on a fresh deployment it starts from the
// committed group offsets, falling back to auto.offset.reset when none exist
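
To let a restarted job resume from the checkpointed offsets even after cancellation, the last checkpoint can be retained externally. A sketch using the CheckpointConfig API:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);

// Retain the last checkpoint on cancellation so the job can be restarted
// from it instead of falling back to committed group offsets
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

env.addSource(checkpointConsumer).print();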

Production Deployment Example:

// Complete production-ready configuration
// (OrderEvent, ProcessedOrder, and their schema classes are application-specific)
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;

public class ProductionKafkaConfig {
    
    public static FlinkKafkaConsumer011<OrderEvent> createOrderConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-cluster:9092");
        props.setProperty("group.id", "order-processor-v2");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("heartbeat.interval.ms", "10000");
        props.setProperty("max.poll.records", "1000");
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        props.setProperty("flink.disable-metrics", "false");
        
        FlinkKafkaConsumer011<OrderEvent> consumer = new FlinkKafkaConsumer011<>(
            Arrays.asList("orders", "order-updates"),
            new OrderEventDeserializer(),
            props
        );
        
        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);
        
        return consumer;
    }
    
    public static FlinkKafkaProducer011<ProcessedOrder> createOrderProducer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-cluster:9092");
        props.setProperty("transaction.timeout.ms", "900000");
        props.setProperty("enable.idempotence", "true");
        props.setProperty("retries", "2147483647");
        props.setProperty("acks", "all");
        props.setProperty("batch.size", "16384");
        props.setProperty("linger.ms", "5");
        props.setProperty("compression.type", "snappy");
        
        return new FlinkKafkaProducer011<>(
            "processed-orders",
            new ProcessedOrderSerializer("processed-orders"),
            props,
            Optional.of(new FlinkFixedPartitioner<>()),
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
            FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
        );
    }
}
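
A sketch of how these factory methods might be assembled into a job (the map transformation and the ProcessedOrder constructor are placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OrderProcessingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000); // required for exactly-once end to end

        env.addSource(ProductionKafkaConfig.createOrderConsumer())
            .map(new MapFunction<OrderEvent, ProcessedOrder>() {
                @Override
                public ProcessedOrder map(OrderEvent order) {
                    return new ProcessedOrder(order); // placeholder transformation
                }
            })
            .addSink(ProductionKafkaConfig.createOrderProducer());

        env.execute("order-processing");
    }
}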