The FlinkKafkaConsumer011 provides robust Kafka topic consumption with exactly-once processing guarantees, flexible startup modes, and seamless integration with Flink's checkpointing mechanism.
Main consumer class for reading from Kafka 0.11.x topics with comprehensive configuration options.
/**
* Kafka consumer for Kafka 0.11.x supporting exactly-once semantics and checkpointing
* Extends FlinkKafkaConsumer010 with additional 0.11.x-specific features
*/
@PublicEvolving
class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
// Single topic constructors
FlinkKafkaConsumer011(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
FlinkKafkaConsumer011(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);
// Multiple topics constructors
FlinkKafkaConsumer011(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
FlinkKafkaConsumer011(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);
// Pattern-based topic subscription (for dynamic topic discovery)
@PublicEvolving
FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
@PublicEvolving
FlinkKafkaConsumer011(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);
}

Usage Examples:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
// Single topic consumption
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
"user-events",
new SimpleStringSchema(),
props
);
// Multiple topics consumption
List<String> topics = Arrays.asList("orders", "payments", "shipments");
FlinkKafkaConsumer011<String> multiTopicConsumer = new FlinkKafkaConsumer011<>(
topics,
new SimpleStringSchema(),
props
);
// Pattern-based subscription for dynamic topic discovery
Pattern topicPattern = Pattern.compile("events-.*");
FlinkKafkaConsumer011<String> patternConsumer = new FlinkKafkaConsumer011<>(
topicPattern,
new SimpleStringSchema(),
props
);
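The KafkaDeserializationSchema constructors expose the full ConsumerRecord (key, value, headers, partition metadata) rather than just the value. Below is a minimal sketch of such a schema; KeyedStringSchema is a hypothetical name, not part of the connector:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;

// Hypothetical schema that surfaces the record key alongside the value as "key:value"
public class KeyedStringSchema implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // unbounded stream; never signal end
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        String key = record.key() == null ? "" : new String(record.key(), StandardCharsets.UTF_8);
        return key + ":" + new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING; // produced element type for Flink's type system
    }
}

// Usage: new FlinkKafkaConsumer011<>("user-events", new KeyedStringSchema(), props)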
Control how the consumer starts reading from Kafka topics.

// Configure startup behavior (inherited from FlinkKafkaConsumerBase)
FlinkKafkaConsumerBase<T> setStartFromEarliest();
FlinkKafkaConsumerBase<T> setStartFromLatest();
FlinkKafkaConsumerBase<T> setStartFromGroupOffsets();
FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp);
FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);

Usage Examples:
// Start from earliest available messages
consumer.setStartFromEarliest();
// Start from latest messages (skip existing messages)
consumer.setStartFromLatest();
// Start from committed offsets (default behavior)
consumer.setStartFromGroupOffsets();
// Start from specific timestamp
long timestamp = System.currentTimeMillis() - (24 * 60 * 60 * 1000); // 24 hours ago
consumer.setStartFromTimestamp(timestamp);
// Start from specific partition offsets
Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("my-topic", 0), 1000L);
offsets.put(new KafkaTopicPartition("my-topic", 1), 2000L);
consumer.setStartFromSpecificOffsets(offsets);

Note that these startup modes only take effect when the job starts without saved state; when restoring from a checkpoint or savepoint, the consumer always resumes from the offsets stored in that snapshot. For setStartFromSpecificOffsets, partitions without an entry in the map fall back to the group-offsets behavior.

Essential configuration options for optimal consumer behavior.
// Key configuration constants (inherited from FlinkKafkaConsumerBase)
static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

Usage Examples:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
// Kafka consumer configuration
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("enable.auto.commit", "false"); // Managed by Flink
props.setProperty("max.poll.records", "500");
// Flink-specific configuration
props.setProperty("flink.disable-metrics", "false");
props.setProperty("flink.partition-discovery.interval-millis", "30000"); // 30 seconds
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
"my-topic",
new SimpleStringSchema(),
props
);
// Note: partition discovery is configured through the constructor Properties (as above);
// the consumer itself has no setProperty method. Discovery is disabled by default
// (PARTITION_DISCOVERY_DISABLED), so the interval must be set explicitly to enable it.
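When the discovery interval is set in the constructor Properties, a Pattern-based consumer also picks up topics created after the job starts, as long as they match the pattern. A sketch reusing the props configured above:

// Discovery interval set in props above; new partitions of matching topics and
// newly created matching topics are consumed as they appear
FlinkKafkaConsumer011<String> discoveringConsumer = new FlinkKafkaConsumer011<>(
    Pattern.compile("events-.*"),
    new SimpleStringSchema(),
    props
);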
Additional configuration methods for specialized use cases.

// Commit mode configuration (inherited from FlinkKafkaConsumerBase)
FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);
// Timestamp and watermark assignment for event time processing
FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy);

Usage Examples:
// Control offset committing behavior; true is the default when checkpointing is enabled
consumer.setCommitOffsetsOnCheckpoints(true); // Commit offsets back to Kafka on completed checkpoints
// Note: offsets committed to Kafka serve monitoring and the group-offsets startup mode;
// fault tolerance relies on offsets stored in Flink checkpoints, not on these commits
// Configure watermark generation for event time processing
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> extractTimestamp(event));
consumer.assignTimestampsAndWatermarks(watermarkStrategy);
// Integration with Flink DataStream
DataStream<String> kafkaStream = env
.addSource(consumer)
.name("Kafka Source");The consumer integrates with the Flink Kafka exception hierarchy for comprehensive error management.
The consumer integrates with the Flink Kafka exception hierarchy for comprehensive error management.

// Consumer may throw FlinkKafka011Exception for configuration or runtime errors
// Exception handling is typically done at the Flink job level through restart strategies
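A minimal restart-strategy sketch at the job level; the attempt count and delay are illustrative values, and env is the execution environment from the checkpointing sketch above:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

// Restart the job up to 3 times after a failure, waiting 10 seconds between attempts,
// so transient Kafka errors surface as restarts instead of terminal job failure
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));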
Configuration for resilience:

// Configure consumer for resilient operation
props.setProperty("session.timeout.ms", "30000");
props.setProperty("heartbeat.interval.ms", "10000");
props.setProperty("max.poll.interval.ms", "300000");
props.setProperty("connections.max.idle.ms", "540000");
props.setProperty("request.timeout.ms", "60000");