Apache Flink SQL connector for Apache Kafka 0.11, with shaded dependencies, providing streaming and Table API integration
The Flink Kafka 0.11 connector provides extensive configuration options for consumer startup behavior, offset committing, custom partitioning, and performance tuning.
Control how Kafka consumers begin reading from topics with flexible startup options.
/**
* Startup modes for Kafka Consumer determining initial read position
*/
@Internal
enum StartupMode {
/** Start from committed group offsets in Kafka */
GROUP_OFFSETS,
/** Start from earliest available offset */
EARLIEST,
/** Start from latest offset (skip existing messages) */
LATEST,
/** Start from user-supplied timestamp */
TIMESTAMP,
/** Start from user-supplied specific partition offsets */
SPECIFIC_OFFSETS
}
Usage Examples:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-consumer-group");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
"my-topic",
new SimpleStringSchema(),
properties
);
// Start from earliest available messages
consumer.setStartFromEarliest();
// Start from latest messages (skip backlog)
consumer.setStartFromLatest();
// Start from committed group offsets (default behavior)
consumer.setStartFromGroupOffsets();
// Start from specific timestamp (1 hour ago)
long oneHourAgo = System.currentTimeMillis() - (60 * 60 * 1000);
consumer.setStartFromTimestamp(oneHourAgo);
// Start from specific partition offsets
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 1000L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 2000L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 2), 1500L);
consumer.setStartFromSpecificOffsets(specificOffsets);
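To round out the example, here is a minimal sketch of attaching the configured consumer to a job; in practice you would call exactly one of the setStartFrom* methods above before wiring the source:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The chosen startup mode takes effect when the source starts reading
DataStream<String> stream = env.addSource(consumer);
stream.print(); // Placeholder sink for illustration
env.execute("kafka-011-startup-example"); // Job name is illustrative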
Configure how and when consumer offsets are committed back to Kafka.
/**
* Offset commit modes controlling when offsets are committed to Kafka
*/
@Internal
enum OffsetCommitMode {
/** Completely disable offset committing - no offsets written to Kafka */
DISABLED,
/** Commit offsets back to Kafka only when checkpoints are completed */
ON_CHECKPOINTS,
/** Commit offsets periodically back to Kafka using Kafka's auto commit mechanism */
KAFKA_PERIODIC
}
Usage Examples:
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "my-group");
// Option A: checkpointing enabled - Flink commits offsets on completed checkpoints
// (OffsetCommitMode.ON_CHECKPOINTS); Kafka's enable.auto.commit is ignored
consumerProps.setProperty("enable.auto.commit", "false");
// Option B: checkpointing disabled - use Kafka's periodic auto commit
// (OffsetCommitMode.KAFKA_PERIODIC); choose one option, not both
consumerProps.setProperty("enable.auto.commit", "true");
consumerProps.setProperty("auto.commit.interval.ms", "5000");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
"my-topic",
new SimpleStringSchema(),
consumerProps
);
// Note: When checkpointing is enabled, OffsetCommitMode.ON_CHECKPOINTS is used
// and is recommended to ensure exactly-once processing guarantees
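The commit mode is not set directly; it is derived from the job's checkpoint settings together with the consumer flag below. A minimal sketch:
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With checkpointing enabled, the consumer uses ON_CHECKPOINTS...
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
// ...provided offset committing on checkpoints is enabled (true by default)
consumer.setCommitOffsetsOnCheckpoints(true);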
Key configuration options and limits for consumer behavior tuning.
/**
* Configuration constants for consumer behavior
* Located in FlinkKafkaConsumerBase
*/
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; // Maximum pending checkpoints
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE; // Disable partition discovery
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; // Disable metrics collection
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis"; // Partition discovery interval
Usage Examples:
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "my-group");
// Standard Kafka consumer configuration
consumerProps.setProperty("auto.offset.reset", "earliest");
consumerProps.setProperty("enable.auto.commit", "false"); // Managed by Flink
consumerProps.setProperty("max.poll.records", "500");
consumerProps.setProperty("session.timeout.ms", "30000");
consumerProps.setProperty("heartbeat.interval.ms", "10000");
// Flink-specific configuration
consumerProps.setProperty("flink.disable-metrics", "false"); // Enable metrics
consumerProps.setProperty("flink.partition-discovery.interval-millis", "30000"); // 30-second discovery
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
Arrays.asList("topic1", "topic2"),
new SimpleStringSchema(),
consumerProps
);
// Dynamic partition discovery is already enabled by the
// "flink.partition-discovery.interval-millis" property set above;
// the consumer exposes no per-instance setter for it
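Partition discovery also pairs with topic-pattern subscription: topics created later that match the pattern are picked up at the discovery interval. A sketch, assuming the pattern-based constructor introduced alongside partition discovery; the pattern itself is illustrative:
import java.util.regex.Pattern;
// Subscribe to all topics matching the regex; with the discovery interval set
// above, new matching topics are consumed without restarting the job
FlinkKafkaConsumer011<String> patternConsumer = new FlinkKafkaConsumer011<>(
    Pattern.compile("metrics-.*"),
    new SimpleStringSchema(),
    consumerProps
);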
Essential constants for producer configuration and transaction management.
/**
* Producer configuration constants for optimal performance and reliability
* Located in FlinkKafkaProducer011
*/
public static final int SAFE_SCALE_DOWN_FACTOR = 5; // Safe scale down factor for transactional IDs
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; // Default producer pool size
public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1); // Default transaction timeout
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; // Configuration key for disabling metrics
Usage Examples:
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// Transaction configuration for exactly-once semantics
producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes
producerProps.setProperty("enable.idempotence", "true");
producerProps.setProperty("retries", "2147483647"); // Max retries
producerProps.setProperty("max.in.flight.requests.per.connection", "5");
producerProps.setProperty("acks", "all");
// Performance tuning
producerProps.setProperty("batch.size", "16384");
producerProps.setProperty("linger.ms", "5");
producerProps.setProperty("buffer.memory", "33554432");
producerProps.setProperty("compression.type", "snappy");
// Flink-specific configuration
producerProps.setProperty("flink.disable-metrics", "false");
// Create producer with custom pool size; the Semantic/pool-size constructor
// takes a KeyedSerializationSchema, so the plain schema is wrapped
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "output-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    producerProps,
    Optional.empty(),
    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
    10 // Custom producer pool size (default is DEFAULT_KAFKA_PRODUCERS_POOL_SIZE)
);
Implement custom partitioning strategies for controlling message distribution across Kafka partitions.
/**
* Base class for custom Kafka partitioners
* Provides deterministic partition assignment for Flink records
*/
@PublicEvolving
abstract class FlinkKafkaPartitioner<T> implements Serializable {
/**
* Initialize the partitioner with parallel execution context
* @param parallelInstanceId the parallel instance ID of this subtask (0-based)
* @param parallelInstances total number of parallel instances
*/
public void open(int parallelInstanceId, int parallelInstances) {
// Default implementation does nothing
// Override for initialization logic
}
/**
* Determine the partition for a record
* @param record the record to partition
* @param key the record key (may be null)
* @param value the record value (may be null)
* @param targetTopic the target topic name
* @param partitions array of available partition IDs for the topic
* @return the partition ID to use (must be one of the values in partitions array)
*/
public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
/**
* Fixed partitioner ensuring each Flink subtask writes to one Kafka partition
* Provides deterministic assignment for exactly-once semantics
*/
@PublicEvolving
class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
private int parallelInstanceId;
@Override
public void open(int parallelInstanceId, int parallelInstances) {
this.parallelInstanceId = parallelInstanceId;
}
@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
// Assign each Flink subtask to a specific Kafka partition
return partitions[parallelInstanceId % partitions.length];
}
}
Custom Partitioner Examples:
// Hash-based partitioner using record content
public class ContentHashPartitioner extends FlinkKafkaPartitioner<UserEvent> {
@Override
public int partition(UserEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
// Partition based on user ID hash for even distribution
// (mask the sign bit rather than Math.abs, which stays negative for Integer.MIN_VALUE)
int hash = record.getUserId().hashCode() & 0x7fffffff;
return partitions[hash % partitions.length];
}
}
// Time-based partitioner for temporal data organization
public class TimeBasedPartitioner extends FlinkKafkaPartitioner<TimestampedEvent> {
@Override
public int partition(TimestampedEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
// Partition based on hour of day for temporal organization
long timestamp = record.getTimestamp();
int hourOfDay = (int) ((timestamp / (1000 * 60 * 60)) % 24);
return partitions[hourOfDay % partitions.length];
}
}
// Round-robin partitioner for load balancing
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
private int nextPartition = 0; // Per-subtask counter; each parallel instance cycles independently
@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
int partition = partitions[nextPartition];
nextPartition = (nextPartition + 1) % partitions.length;
return partition;
}
}
// Usage with producer
ContentHashPartitioner partitioner = new ContentHashPartitioner();
FlinkKafkaProducer011<UserEvent> producer = new FlinkKafkaProducer011<>(
"partitioned-events",
new UserEventSerializer("partitioned-events"),
properties,
Optional.of(partitioner),
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
);
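The UserEventSerializer above is not part of the connector; a minimal sketch of such a KeyedSerializationSchema follows, where the UserEvent type and its getUserId()/toJson() accessors are assumptions:
import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
public class UserEventSerializer implements KeyedSerializationSchema<UserEvent> {
    private final String topic;
    public UserEventSerializer(String topic) {
        this.topic = topic;
    }
    @Override
    public byte[] serializeKey(UserEvent element) {
        // Key by user ID so the Kafka record key matches the partitioning scheme
        return element.getUserId().getBytes(StandardCharsets.UTF_8);
    }
    @Override
    public byte[] serializeValue(UserEvent element) {
        // toJson() is a hypothetical helper on the event type
        return element.toJson().getBytes(StandardCharsets.UTF_8);
    }
    @Override
    public String getTargetTopic(UserEvent element) {
        return topic; // Returning null falls back to the producer's default topic
    }
}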
Best practices for production deployments and performance optimization.
// Producer configuration for high-throughput, exactly-once scenarios
Properties highThroughputProps = new Properties();
highThroughputProps.setProperty("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
// Transaction configuration
highThroughputProps.setProperty("transaction.timeout.ms", "600000"); // 10 minutes
highThroughputProps.setProperty("enable.idempotence", "true");
highThroughputProps.setProperty("retries", "2147483647");
highThroughputProps.setProperty("max.in.flight.requests.per.connection", "5");
highThroughputProps.setProperty("acks", "all");
// Performance optimization
highThroughputProps.setProperty("batch.size", "32768"); // Larger batches
highThroughputProps.setProperty("linger.ms", "10"); // Slight batching delay
highThroughputProps.setProperty("buffer.memory", "67108864"); // 64MB buffer
highThroughputProps.setProperty("compression.type", "lz4"); // Fast compression
highThroughputProps.setProperty("send.buffer.bytes", "131072"); // 128KB socket buffer
highThroughputProps.setProperty("receive.buffer.bytes", "65536"); // 64KB socket buffer
// Consumer configuration for low-latency, high-throughput scenarios
Properties lowLatencyConsumerProps = new Properties();
lowLatencyConsumerProps.setProperty("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
lowLatencyConsumerProps.setProperty("group.id", "low-latency-processor");
// Fetch configuration for minimal latency
lowLatencyConsumerProps.setProperty("fetch.min.bytes", "1"); // Don't wait for large batches
lowLatencyConsumerProps.setProperty("fetch.max.wait.ms", "100"); // Max 100ms wait
lowLatencyConsumerProps.setProperty("max.poll.records", "1000"); // Process more records per poll
lowLatencyConsumerProps.setProperty("receive.buffer.bytes", "131072"); // 128KB receive buffer
lowLatencyConsumerProps.setProperty("send.buffer.bytes", "131072"); // 128KB send buffer
// Session management
lowLatencyConsumerProps.setProperty("session.timeout.ms", "20000"); // 20 seconds
lowLatencyConsumerProps.setProperty("heartbeat.interval.ms", "6000"); // 6 seconds
lowLatencyConsumerProps.setProperty("max.poll.interval.ms", "600000"); // 10 minutes
// Flink-specific optimizations
lowLatencyConsumerProps.setProperty("flink.partition-discovery.interval-millis", "10000"); // 10-second discoveryConfiguration for reliable offset management and checkpoint integration.
// Consumer offset management configuration
Properties offsetProps = new Properties();
offsetProps.setProperty("bootstrap.servers", "localhost:9092");
offsetProps.setProperty("group.id", "reliable-consumer");
// Offset management (should be managed by Flink for exactly-once processing)
offsetProps.setProperty("enable.auto.commit", "false"); // Let Flink handle commits
offsetProps.setProperty("auto.offset.reset", "earliest"); // Fallback for new consumer groups
// Configure consumer for checkpoint integration
FlinkKafkaConsumer011<String> checkpointConsumer = new FlinkKafkaConsumer011<>(
"reliable-topic",
new SimpleStringSchema(),
offsetProps
);
// Enable offset committing on checkpoints (recommended for exactly-once)
checkpointConsumer.setCommitOffsetsOnCheckpoints(true);
// Configure startup from group offsets with fallback
checkpointConsumer.setStartFromGroupOffsets();
// On restore from a checkpoint or savepoint, the consumer resumes from the offsets stored in state
// On a fresh deployment, it starts from group offsets, falling back to auto.offset.reset
Production Deployment Example:
// Complete production-ready configuration
public class ProductionKafkaConfig {
public static FlinkKafkaConsumer011<OrderEvent> createOrderConsumer() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka-cluster:9092");
props.setProperty("group.id", "order-processor-v2");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("enable.auto.commit", "false");
props.setProperty("session.timeout.ms", "30000");
props.setProperty("heartbeat.interval.ms", "10000");
props.setProperty("max.poll.records", "1000");
props.setProperty("flink.partition-discovery.interval-millis", "30000");
props.setProperty("flink.disable-metrics", "false");
FlinkKafkaConsumer011<OrderEvent> consumer = new FlinkKafkaConsumer011<>(
Arrays.asList("orders", "order-updates"),
new OrderEventDeserializer(),
props
);
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);
return consumer;
}
public static FlinkKafkaProducer011<ProcessedOrder> createOrderProducer() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka-cluster:9092");
props.setProperty("transaction.timeout.ms", "900000");
props.setProperty("enable.idempotence", "true");
props.setProperty("retries", "2147483647");
props.setProperty("acks", "all");
props.setProperty("batch.size", "16384");
props.setProperty("linger.ms", "5");
props.setProperty("compression.type", "snappy");
return new FlinkKafkaProducer011<>(
"processed-orders",
new ProcessedOrderSerializer("processed-orders"),
props,
Optional.of(new FlinkFixedPartitioner<>()),
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
);
}
}
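A sketch of wiring these factory methods into a job; the processOrder function is a hypothetical transformation over the event types from the example above:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing drives both offset commits and the producer's transactional commits
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
env.addSource(ProductionKafkaConfig.createOrderConsumer())
    .map((MapFunction<OrderEvent, ProcessedOrder>) order -> processOrder(order)) // Hypothetical processing step
    .addSink(ProductionKafkaConfig.createOrderProducer());
env.execute("order-processing-pipeline");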
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-kafka-0-11-2-11