0
# Streaming Consumer
1
2
The `FlinkKafkaConsumer011` provides robust Kafka topic consumption with exactly-once processing guarantees, flexible startup modes, and seamless integration with Flink's checkpointing mechanism.
3
4
## Capabilities
5
6
### FlinkKafkaConsumer011 Class
7
8
Main consumer class for reading from Kafka 0.11.x topics with comprehensive configuration options.
9
10
```java { .api }
11
/**
12
* Kafka consumer for Kafka 0.11.x supporting exactly-once semantics and checkpointing
13
* Extends FlinkKafkaConsumer010 with additional 0.11.x-specific features
14
*/
15
@PublicEvolving
16
class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
17
18
// Single topic constructors
19
FlinkKafkaConsumer011(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
20
FlinkKafkaConsumer011(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);
21
22
// Multiple topics constructors
23
FlinkKafkaConsumer011(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
24
FlinkKafkaConsumer011(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);
25
26
// Pattern-based topic subscription (for dynamic topic discovery)
27
@PublicEvolving
28
FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
29
@PublicEvolving
30
FlinkKafkaConsumer011(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);
31
}
32
```
33
34
**Usage Examples:**
35
36
```java
37
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
38
import org.apache.flink.api.common.serialization.SimpleStringSchema;
39
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
40
41
// Single topic consumption
42
Properties props = new Properties();
43
props.setProperty("bootstrap.servers", "localhost:9092");
44
props.setProperty("group.id", "my-consumer-group");
45
46
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
47
"user-events",
48
new SimpleStringSchema(),
49
props
50
);
51
52
// Multiple topics consumption
53
List<String> topics = Arrays.asList("orders", "payments", "shipments");
54
FlinkKafkaConsumer011<String> multiTopicConsumer = new FlinkKafkaConsumer011<>(
55
topics,
56
new SimpleStringSchema(),
57
props
58
);
59
60
// Pattern-based subscription for dynamic topic discovery
61
Pattern topicPattern = Pattern.compile("events-.*");
62
FlinkKafkaConsumer011<String> patternConsumer = new FlinkKafkaConsumer011<>(
63
topicPattern,
64
new SimpleStringSchema(),
65
props
66
);
67
```
68
69
### Startup Mode Configuration
70
71
Control how the consumer starts reading from Kafka topics.
72
73
```java { .api }
74
// Configure startup behavior (inherited from FlinkKafkaConsumerBase)
75
FlinkKafkaConsumer011<T> setStartFromEarliest();
76
FlinkKafkaConsumer011<T> setStartFromLatest();
77
FlinkKafkaConsumer011<T> setStartFromGroupOffsets();
78
FlinkKafkaConsumer011<T> setStartFromTimestamp(long startupOffsetsTimestamp);
79
FlinkKafkaConsumer011<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);
80
```
81
82
**Usage Examples:**
83
84
```java
85
// Start from earliest available messages
86
consumer.setStartFromEarliest();
87
88
// Start from latest messages (skip existing messages)
89
consumer.setStartFromLatest();
90
91
// Start from committed offsets (default behavior)
92
consumer.setStartFromGroupOffsets();
93
94
// Start from specific timestamp
95
long timestamp = System.currentTimeMillis() - (24 * 60 * 60 * 1000); // 24 hours ago
96
consumer.setStartFromTimestamp(timestamp);
97
98
// Start from specific partition offsets
99
Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
100
offsets.put(new KafkaTopicPartition("my-topic", 0), 1000L);
101
offsets.put(new KafkaTopicPartition("my-topic", 1), 2000L);
102
consumer.setStartFromSpecificOffsets(offsets);
103
```
104
105
### Consumer Configuration
106
107
Essential configuration options for optimal consumer behavior.
108
109
```java { .api }
110
// Key configuration constants (inherited from FlinkKafkaConsumerBase)
111
static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
112
static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
113
static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
114
static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
115
```
116
117
**Usage Examples:**
118
119
```java
120
Properties props = new Properties();
121
props.setProperty("bootstrap.servers", "localhost:9092");
122
props.setProperty("group.id", "my-group");
123
124
// Kafka consumer configuration
125
props.setProperty("auto.offset.reset", "earliest");
126
props.setProperty("enable.auto.commit", "false"); // Managed by Flink
127
props.setProperty("max.poll.records", "500");
128
129
// Flink-specific configuration
130
props.setProperty("flink.disable-metrics", "false");
131
props.setProperty("flink.partition-discovery.interval-millis", "30000"); // 30 seconds
132
133
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
134
"my-topic",
135
new SimpleStringSchema(),
136
props
137
);
138
139
// Enable partition discovery for dynamic partitions
140
consumer.setProperty("flink.partition-discovery.interval-millis", "30000");
141
```
142
143
### Advanced Consumer Features
144
145
Additional configuration methods for specialized use cases.
146
147
```java { .api }
148
// Commit mode configuration (inherited from FlinkKafkaConsumerBase)
149
FlinkKafkaConsumer011<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);
150
151
// Partition assignment and metadata access
152
FlinkKafkaConsumer011<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy);
153
```
154
155
**Usage Examples:**
156
157
```java
158
// Control offset committing behavior
159
consumer.setCommitOffsetsOnCheckpoints(true); // Commit offsets to Kafka on checkpoint
160
161
// Configure watermark generation for event time processing
162
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
163
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
164
.withTimestampAssigner((event, timestamp) -> extractTimestamp(event));
165
166
consumer.assignTimestampsAndWatermarks(watermarkStrategy);
167
168
// Integration with Flink DataStream
169
DataStream<String> kafkaStream = env
170
.addSource(consumer)
171
.name("Kafka Source");
172
```
173
174
## Error Handling
175
176
The consumer integrates with the Flink Kafka exception hierarchy for comprehensive error management.
177
178
```java { .api }
179
// Consumer may throw FlinkKafka011Exception for configuration or runtime errors
180
// Exception handling is typically done at the Flink job level through restart strategies
181
```
182
183
**Configuration for resilience:**
184
185
```java
186
// Configure consumer for resilient operation
187
props.setProperty("session.timeout.ms", "30000");
188
props.setProperty("heartbeat.interval.ms", "10000");
189
props.setProperty("max.poll.interval.ms", "300000");
190
props.setProperty("connections.max.idle.ms", "540000");
191
props.setProperty("request.timeout.ms", "60000");
192
```