The Flink Kafka 0.11 connector (FlinkKafkaConsumer011 / FlinkKafkaProducer011) provides extensive configuration options for consumer startup behavior, offset committing, custom partitioning strategies, and performance tuning.
Control how Kafka consumers begin reading from topics with flexible startup options.
/**
* Startup modes for Kafka Consumer determining initial read position
*/
@Internal
enum StartupMode {
/** Start from committed group offsets in Kafka */
GROUP_OFFSETS,
/** Start from earliest available offset */
EARLIEST,
/** Start from latest offset (skip existing messages) */
LATEST,
/** Start from user-supplied timestamp */
TIMESTAMP,
/** Start from user-supplied specific partition offsets */
SPECIFIC_OFFSETS
}

Usage Examples:
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-consumer-group");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
"my-topic",
new SimpleStringSchema(),
properties
);
// The calls below are alternatives; the last setStartFrom* call before the job starts takes effect
// Start from earliest available messages
consumer.setStartFromEarliest();
// Start from latest messages (skip backlog)
consumer.setStartFromLatest();
// Start from committed group offsets (default behavior)
consumer.setStartFromGroupOffsets();
// Start from specific timestamp (1 hour ago)
long oneHourAgo = System.currentTimeMillis() - (60 * 60 * 1000);
consumer.setStartFromTimestamp(oneHourAgo);
// Start from specific partition offsets
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 1000L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 2000L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 2), 1500L);
consumer.setStartFromSpecificOffsets(specificOffsets);

Configure how and when consumer offsets are committed back to Kafka.
/**
* Offset commit modes controlling when offsets are committed to Kafka
*/
@Internal
enum OffsetCommitMode {
/** Completely disable offset committing - no offsets written to Kafka */
DISABLED,
/** Commit offsets back to Kafka only when checkpoints are completed */
ON_CHECKPOINTS,
/** Commit offsets periodically back to Kafka using Kafka's auto commit mechanism */
KAFKA_PERIODIC
}

Usage Examples:
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "my-group");
// The two enable.auto.commit settings below are alternatives.
// With checkpointing enabled, enable.auto.commit is ignored: offsets are committed on completed
// checkpoints (OffsetCommitMode.ON_CHECKPOINTS) unless setCommitOffsetsOnCheckpoints(false) is called
consumerProps.setProperty("enable.auto.commit", "false");
// With checkpointing disabled, enable.auto.commit=true selects Kafka's periodic auto commit
// (OffsetCommitMode.KAFKA_PERIODIC); false leaves offset committing disabled (OffsetCommitMode.DISABLED)
consumerProps.setProperty("enable.auto.commit", "true");
consumerProps.setProperty("auto.commit.interval.ms", "5000");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
"my-topic",
new SimpleStringSchema(),
consumerProps
);
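// Sketch: enabling checkpointing on the environment puts the consumer into
// OffsetCommitMode.ON_CHECKPOINTS, so offsets are committed to Kafka when checkpoints
// complete; the 60-second interval is only an example value.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
consumer.setCommitOffsetsOnCheckpoints(true); // default is true; shown here for clarity
DataStream<String> stream = env.addSource(consumer);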
// Note: when checkpointing is enabled, committing on checkpoints (OffsetCommitMode.ON_CHECKPOINTS)
// is recommended so the offsets visible in Kafka track completed checkpoints. Flink's exactly-once
// guarantees come from the offsets stored in its own checkpoints, not from these commits.

Key configuration options and limits for consumer behavior tuning.
/**
* Configuration constants for consumer behavior
* Located in FlinkKafkaConsumerBase
*/
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; // Maximum pending checkpoints
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE; // Disable partition discovery
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; // Disable metrics collection
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis"; // Partition discovery interval

Usage Examples:
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "my-group");
// Standard Kafka consumer configuration
consumerProps.setProperty("auto.offset.reset", "earliest");
consumerProps.setProperty("enable.auto.commit", "false"); // Managed by Flink
consumerProps.setProperty("max.poll.records", "500");
consumerProps.setProperty("session.timeout.ms", "30000");
consumerProps.setProperty("heartbeat.interval.ms", "10000");
// Flink-specific configuration
consumerProps.setProperty("flink.disable-metrics", "false"); // Enable metrics
consumerProps.setProperty("flink.partition-discovery.interval-millis", "30000"); // 30-second discovery
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
Arrays.asList("topic1", "topic2"),
new SimpleStringSchema(),
consumerProps
);
// Dynamic partition discovery is enabled by the "flink.partition-discovery.interval-millis"
// entry already set in consumerProps above; FlinkKafkaConsumer011 has no separate setter for it

Essential constants for producer configuration and transaction management.
/**
* Producer configuration constants for optimal performance and reliability
* Located in FlinkKafkaProducer011
*/
public static final int SAFE_SCALE_DOWN_FACTOR = 5; // Safe scale down factor for transactional IDs
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; // Default producer pool size
public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1); // Default transaction timeout
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; // Configuration key for disabling metrics

Usage Examples:
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// Transaction configuration for exactly-once semantics
producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes
producerProps.setProperty("enable.idempotence", "true");
producerProps.setProperty("retries", "2147483647"); // Max retries
producerProps.setProperty("max.in.flight.requests.per.connection", "5");
producerProps.setProperty("acks", "all");
// Performance tuning
producerProps.setProperty("batch.size", "16384");
producerProps.setProperty("linger.ms", "5");
producerProps.setProperty("buffer.memory", "33554432");
producerProps.setProperty("compression.type", "snappy");
// Flink-specific configuration
producerProps.setProperty("flink.disable-metrics", "false");
// Create producer with custom pool size
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
"output-topic",
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), // this constructor variant expects a KeyedSerializationSchema
producerProps,
Optional.empty(),
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
10 // Custom producer pool size (default is DEFAULT_KAFKA_PRODUCERS_POOL_SIZE)
);

Implement custom partitioning strategies for controlling message distribution across Kafka partitions.
/**
* Base class for custom Kafka partitioners
* Provides deterministic partition assignment for Flink records
*/
@PublicEvolving
abstract class FlinkKafkaPartitioner<T> implements Serializable {
/**
* Initialize the partitioner with parallel execution context
* @param parallelInstanceId the parallel instance ID of this subtask (0-based)
* @param parallelInstances total number of parallel instances
*/
public void open(int parallelInstanceId, int parallelInstances) {
// Default implementation does nothing
// Override for initialization logic
}
/**
* Determine the partition for a record
* @param record the record to partition
* @param key the record key (may be null)
* @param value the record value (may be null)
* @param targetTopic the target topic name
* @param partitions array of available partition IDs for the topic
* @return the partition ID to use (must be one of the values in partitions array)
*/
public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
/**
* Fixed partitioner ensuring each Flink subtask writes to one Kafka partition
* Provides deterministic assignment for exactly-once semantics
*/
@PublicEvolving
class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
private int parallelInstanceId;
@Override
public void open(int parallelInstanceId, int parallelInstances) {
this.parallelInstanceId = parallelInstanceId;
}
@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
// Assign each Flink subtask to a specific Kafka partition
return partitions[parallelInstanceId % partitions.length];
}
}

Custom Partitioner Examples:
// Hash-based partitioner using record content
public class ContentHashPartitioner extends FlinkKafkaPartitioner<UserEvent> {
@Override
public int partition(UserEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
// Partition based on user ID hash for even distribution;
// Math.floorMod guards against negative hash codes (Math.abs(Integer.MIN_VALUE) is still negative)
return partitions[Math.floorMod(record.getUserId().hashCode(), partitions.length)];
}
}
// Time-based partitioner for temporal data organization
public class TimeBasedPartitioner extends FlinkKafkaPartitioner<TimestampedEvent> {
@Override
public int partition(TimestampedEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
// Partition based on hour of day for temporal organization
long timestamp = record.getTimestamp();
int hourOfDay = (int) ((timestamp / (1000 * 60 * 60)) % 24);
return partitions[hourOfDay % partitions.length];
}
}
// Round-robin partitioner for load balancing
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
private int nextPartition = 0;
@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
int partition = partitions[nextPartition];
nextPartition = (nextPartition + 1) % partitions.length;
return partition;
}
}
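// The UserEvent type and UserEventSerializer used below (see "Usage with producer") are not part
// of the connector; this is a minimal sketch of what they might look like. The exactly-once
// producer constructor used next expects a KeyedSerializationSchema, so the serializer implements
// that interface (assumes imports of java.nio.charset.StandardCharsets and
// org.apache.flink.streaming.util.serialization.KeyedSerializationSchema).
public class UserEvent implements java.io.Serializable {
    private String userId;
    private String payload;
    public UserEvent() {} // no-arg constructor keeps this a valid Flink POJO
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }
}
public class UserEventSerializer implements KeyedSerializationSchema<UserEvent> {
    private final String defaultTopic;
    public UserEventSerializer(String defaultTopic) { this.defaultTopic = defaultTopic; }
    @Override
    public byte[] serializeKey(UserEvent element) {
        return element.getUserId().getBytes(StandardCharsets.UTF_8);
    }
    @Override
    public byte[] serializeValue(UserEvent element) {
        // Illustrative encoding only; a real job would typically use JSON, Avro, etc.
        return (element.getUserId() + "," + element.getPayload()).getBytes(StandardCharsets.UTF_8);
    }
    @Override
    public String getTargetTopic(UserEvent element) {
        return defaultTopic; // returning null falls back to the producer's default topic
    }
}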
// Usage with producer
ContentHashPartitioner partitioner = new ContentHashPartitioner();
FlinkKafkaProducer011<UserEvent> producer = new FlinkKafkaProducer011<>(
"partitioned-events",
new UserEventSerializer("partitioned-events"),
properties,
Optional.of(partitioner),
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
);

Best practices for production deployments and performance optimization.
// Producer configuration for high-throughput, exactly-once scenarios
Properties highThroughputProps = new Properties();
highThroughputProps.setProperty("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
// Transaction configuration
highThroughputProps.setProperty("transaction.timeout.ms", "600000"); // 10 minutes
highThroughputProps.setProperty("enable.idempotence", "true");
highThroughputProps.setProperty("retries", "2147483647");
highThroughputProps.setProperty("max.in.flight.requests.per.connection", "5");
highThroughputProps.setProperty("acks", "all");
// Performance optimization
highThroughputProps.setProperty("batch.size", "32768"); // Larger batches
highThroughputProps.setProperty("linger.ms", "10"); // Slight batching delay
highThroughputProps.setProperty("buffer.memory", "67108864"); // 64MB buffer
highThroughputProps.setProperty("compression.type", "lz4"); // Fast compression
highThroughputProps.setProperty("send.buffer.bytes", "131072"); // 128KB socket buffer
highThroughputProps.setProperty("receive.buffer.bytes", "65536"); // 64KB socket buffer
// Consumer configuration for low-latency, high-throughput scenarios
Properties lowLatencyConsumerProps = new Properties();
lowLatencyConsumerProps.setProperty("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
lowLatencyConsumerProps.setProperty("group.id", "low-latency-processor");
// Fetch configuration for minimal latency
lowLatencyConsumerProps.setProperty("fetch.min.bytes", "1"); // Don't wait for large batches
lowLatencyConsumerProps.setProperty("fetch.max.wait.ms", "100"); // Max 100ms wait
lowLatencyConsumerProps.setProperty("max.poll.records", "1000"); // Process more records per poll
lowLatencyConsumerProps.setProperty("receive.buffer.bytes", "131072"); // 128KB receive buffer
lowLatencyConsumerProps.setProperty("send.buffer.bytes", "131072"); // 128KB send buffer
// Session management
lowLatencyConsumerProps.setProperty("session.timeout.ms", "20000"); // 20 seconds
lowLatencyConsumerProps.setProperty("heartbeat.interval.ms", "6000"); // 6 seconds
lowLatencyConsumerProps.setProperty("max.poll.interval.ms", "600000"); // 10 minutes
// Flink-specific optimizations
lowLatencyConsumerProps.setProperty("flink.partition-discovery.interval-millis", "10000"); // 10-second discoveryConfiguration for reliable offset management and checkpoint integration.
// Consumer offset management configuration
Properties offsetProps = new Properties();
offsetProps.setProperty("bootstrap.servers", "localhost:9092");
offsetProps.setProperty("group.id", "reliable-consumer");
// Offset management (should be managed by Flink for exactly-once processing)
offsetProps.setProperty("enable.auto.commit", "false"); // Let Flink handle commits
offsetProps.setProperty("auto.offset.reset", "earliest"); // Fallback for new consumer groups
// Configure consumer for checkpoint integration
FlinkKafkaConsumer011<String> checkpointConsumer = new FlinkKafkaConsumer011<>(
"reliable-topic",
new SimpleStringSchema(),
offsetProps
);
// Enable offset committing on checkpoints (recommended for exactly-once)
checkpointConsumer.setCommitOffsetsOnCheckpoints(true);
// Configure startup from group offsets with fallback
checkpointConsumer.setStartFromGroupOffsets();
// When restoring from a savepoint or checkpoint, the consumer resumes from the offsets stored in Flink state
// For a fresh deployment, it starts from the committed group offsets, falling back to auto.offset.reset if the group has none

Production Deployment Example:
// Complete production-ready configuration
public class ProductionKafkaConfig {
public static FlinkKafkaConsumer011<OrderEvent> createOrderConsumer() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka-cluster:9092");
props.setProperty("group.id", "order-processor-v2");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("enable.auto.commit", "false");
props.setProperty("session.timeout.ms", "30000");
props.setProperty("heartbeat.interval.ms", "10000");
props.setProperty("max.poll.records", "1000");
props.setProperty("flink.partition-discovery.interval-millis", "30000");
props.setProperty("flink.disable-metrics", "false");
FlinkKafkaConsumer011<OrderEvent> consumer = new FlinkKafkaConsumer011<>(
Arrays.asList("orders", "order-updates"),
new OrderEventDeserializer(),
props
);
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);
return consumer;
}
public static FlinkKafkaProducer011<ProcessedOrder> createOrderProducer() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka-cluster:9092");
props.setProperty("transaction.timeout.ms", "900000");
props.setProperty("enable.idempotence", "true");
props.setProperty("retries", "2147483647");
props.setProperty("acks", "all");
props.setProperty("batch.size", "16384");
props.setProperty("linger.ms", "5");
props.setProperty("compression.type", "snappy");
return new FlinkKafkaProducer011<>(
"processed-orders",
new ProcessedOrderSerializer("processed-orders"),
props,
Optional.of(new FlinkFixedPartitioner<>()),
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
);
}
}
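A minimal job skeleton wiring these two factory methods together might look like the following sketch; the checkpoint interval, job name, and enrichment step are placeholders, and OrderEvent/ProcessedOrder are the application-specific types assumed above.
public class OrderProcessingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing drives ON_CHECKPOINTS offset commits and the exactly-once producer
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        env.addSource(ProductionKafkaConfig.createOrderConsumer())
            .map(order -> enrich(order)) // hypothetical enrichment step
            .addSink(ProductionKafkaConfig.createOrderProducer());
        env.execute("order-processing");
    }
    private static ProcessedOrder enrich(OrderEvent order) {
        // Placeholder transformation; real logic is application-specific
        return new ProcessedOrder(order);
    }
}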