# Data Consumption

Streaming data source functionality for consuming from Kafka 0.9.x topics with exactly-once processing guarantees, configurable deserialization, automatic offset management, and fault tolerance through Flink's checkpointing mechanism.

## Capabilities

### FlinkKafkaConsumer09 Class

Main Kafka consumer class for streaming data from Kafka 0.9.x topics into Flink data streams.

```java { .api }
/**
 * Kafka consumer for streaming data from Apache Kafka 0.9.x topics.
 * Supports exactly-once processing, fault tolerance, and parallel consumption.
 *
 * @param <T> The type of records consumed from Kafka
 */
@PublicEvolving
public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {

    /**
     * Creates a new Kafka consumer for a single topic with value-only deserialization.
     *
     * @param topic The name of the topic to consume from
     * @param valueDeserializer The deserializer for converting byte messages to objects
     * @param props Kafka consumer properties and configuration
     */
    public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props);

    /**
     * Creates a new Kafka consumer for a single topic with key/value deserialization.
     *
     * @param topic The name of the topic to consume from
     * @param deserializer The deserializer for reading key/value pairs, offsets, and topic names
     * @param props Kafka consumer properties and configuration
     */
    public FlinkKafkaConsumer09(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);

    /**
     * Creates a new Kafka consumer for multiple topics with value-only deserialization.
     *
     * @param topics The list of topic names to consume from
     * @param deserializer The deserializer for converting byte messages to objects
     * @param props Kafka consumer properties and configuration
     */
    public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props);

    /**
     * Creates a new Kafka consumer for multiple topics with key/value deserialization.
     *
     * @param topics The list of topic names to consume from
     * @param deserializer The deserializer for reading key/value pairs, offsets, and topic names
     * @param props Kafka consumer properties and configuration
     */
    public FlinkKafkaConsumer09(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);

    /**
     * Creates a new Kafka consumer for topics matching a pattern with value-only deserialization.
     * Dynamic topic discovery is enabled if a partition discovery interval is configured.
     *
     * @param subscriptionPattern Regular expression pattern for topic names to subscribe to
     * @param valueDeserializer The deserializer for converting byte messages to objects
     * @param props Kafka consumer properties and configuration
     */
    @PublicEvolving
    public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);

    /**
     * Creates a new Kafka consumer for topics matching a pattern with key/value deserialization.
     * Dynamic topic discovery is enabled if a partition discovery interval is configured.
     *
     * @param subscriptionPattern Regular expression pattern for topic names to subscribe to
     * @param deserializer The deserializer for reading key/value pairs, offsets, and topic names
     * @param props Kafka consumer properties and configuration
     */
    @PublicEvolving
    public FlinkKafkaConsumer09(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);

    /**
     * Sets a rate limiter to throttle bytes read from Kafka.
     *
     * @param kafkaRateLimiter The rate limiter to control consumption rate
     */
    public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);

    /**
     * Gets the currently configured rate limiter.
     *
     * @return The configured rate limiter, or null if none is set
     */
    public FlinkConnectorRateLimiter getRateLimiter();

    // The following configuration methods are inherited from FlinkKafkaConsumerBase
    // and therefore return FlinkKafkaConsumerBase<T>.

    /**
     * Assigns timestamps and watermarks using a punctuated watermark assigner.
     *
     * @param assigner The punctuated watermark assigner
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);

    /**
     * Assigns timestamps and watermarks using a periodic watermark assigner.
     *
     * @param assigner The periodic watermark assigner
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);

    /**
     * Configures whether to commit consumed offsets back to Kafka on checkpoints.
     *
     * @param commitOnCheckpoints True to enable offset commits on checkpoints
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);

    /**
     * Configures the consumer to start reading from the earliest available offsets.
     *
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumerBase<T> setStartFromEarliest();

    /**
     * Configures the consumer to start reading from the latest available offsets.
     *
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumerBase<T> setStartFromLatest();

    /**
     * Configures the consumer to start reading from the consumer group's committed offsets.
     *
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets();

    /**
     * Configures the consumer to start reading from specific partition offsets.
     *
     * @param specificStartupOffsets Map of partitions to their starting offsets
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);

    /**
     * Disables filtering of restored partitions against the currently subscribed topics.
     *
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithSubscribedTopics();
}
```

### Configuration Constants

```java { .api }
/** Configuration key to change the polling timeout */
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";

/** Default time in milliseconds spent waiting in poll if data is not available */
public static final long DEFAULT_POLL_TIMEOUT = 100L;

/** The maximum number of pending non-committed checkpoints to track */
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

/** Sentinel value indicating that partition discovery is disabled (the default) */
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;

/** Boolean configuration key to disable metrics tracking */
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

/** Configuration key defining the consumer's partition discovery interval, in milliseconds */
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
```

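The `flink.*` keys above are set as plain entries on the same `Properties` object that is passed to the constructor. A minimal sketch of using the constants instead of raw strings; it assumes the usual placement of these constants (`KEY_POLL_TIMEOUT` on `FlinkKafkaConsumer09`, `KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS` on `FlinkKafkaConsumerBase`):

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

// Wait up to 250 ms per poll when no data is available (default: 100 ms)
props.setProperty(FlinkKafkaConsumer09.KEY_POLL_TIMEOUT, "250");

// Discover new partitions (and, with pattern subscription, new topics) every 30 s;
// discovery stays disabled when this key is absent
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");
```
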
## Usage Examples

### Basic Single Topic Consumption

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

env.addSource(consumer)
    .print();

env.execute("Basic Kafka Consumer");
```

### Multiple Topics Consumption

```java
import java.util.Arrays;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "multi-topic-group");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    Arrays.asList("topic-1", "topic-2", "topic-3"),
    new SimpleStringSchema(),
    properties
);

env.addSource(consumer)
    .map(value -> "Processed: " + value)
    .print();
```

### Pattern-Based Topic Subscription

```java
import java.util.regex.Pattern;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "pattern-group");
// Enable dynamic topic discovery
properties.setProperty("flink.partition-discovery.interval-millis", "30000");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    Pattern.compile("log-topic-.*"),
    new SimpleStringSchema(),
    properties
);

env.addSource(consumer)
    .filter(value -> value.contains("ERROR"))
    .print();
```

### Key/Value Deserialization

```java
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;

// Custom deserializer for accessing keys, values, and record metadata
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "keyed-topic",
    new KafkaDeserializationSchema<String>() {
        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            String key = record.key() != null ? new String(record.key(), StandardCharsets.UTF_8) : "null";
            String value = record.value() != null ? new String(record.value(), StandardCharsets.UTF_8) : "null";
            return String.format("Key: %s, Value: %s, Partition: %d, Offset: %d",
                key, value, record.partition(), record.offset());
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    },
    properties
);
```

### Rate Limited Consumption

```java
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "high-volume-topic",
    new SimpleStringSchema(),
    properties
);

// Limit consumption to 1000 bytes per second
FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1000L);
consumer.setRateLimiter(rateLimiter);

env.addSource(consumer)
    .print();
```

### Advanced Configuration

```java
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

Properties properties = new Properties();
// Required Kafka settings
properties.setProperty("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092");
properties.setProperty("group.id", "advanced-consumer-group");

// Offset management
properties.setProperty("auto.offset.reset", "earliest"); // or "latest"
properties.setProperty("enable.auto.commit", "false"); // Flink manages commits

// Performance tuning
properties.setProperty("fetch.min.bytes", "1024");
properties.setProperty("fetch.max.wait.ms", "500");
properties.setProperty("max.partition.fetch.bytes", "1048576");

// Custom polling timeout
properties.setProperty("flink.poll-timeout", "100");

// Enable partition discovery for dynamic topics
properties.setProperty("flink.partition-discovery.interval-millis", "30000");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "configured-topic",
    new SimpleStringSchema(),
    properties
);

// Configure periodic watermark generation for event time processing
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(String element) {
            // Illustrative only: real pipelines should extract the event time from the record
            return System.currentTimeMillis();
        }
    }
);
```

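### Startup Offset Configuration

The startup-position setters from the API above determine where consumption begins when the job starts fresh; on restore from a checkpoint or savepoint, the offsets stored in state take precedence. A minimal sketch; the topic name, partitions, and offsets are illustrative:

```java
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import java.util.HashMap;
import java.util.Map;

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

// Start from the earliest available offsets, ignoring committed group offsets
consumer.setStartFromEarliest();

// Or: start each partition from an explicitly chosen offset
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 23L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 31L);
consumer.setStartFromSpecificOffsets(specificOffsets);
```
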
## Error Handling

The FlinkKafkaConsumer09 integrates with Flink's fault tolerance mechanisms:

- **Checkpointing**: Automatically snapshots consumer offsets during checkpoints (see the sketch below)
- **Recovery**: Restores from the last successful checkpoint on failure
- **Exactly-once**: Guarantees exactly-once semantics for Flink state when checkpointing is enabled; end-to-end exactly-once additionally depends on the sink
- **Parallelism**: Each parallel instance consumes from its assigned partitions independently

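These guarantees take effect only once checkpointing is enabled on the execution environment. A minimal sketch; the interval is illustrative, and the explicit commit call restates the default behavior:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Snapshot operator state, including the consumer's Kafka offsets, every 5 seconds
env.enableCheckpointing(5000);

// Commit the checkpointed offsets back to Kafka so external tools can track
// the group's progress (enabled by default)
consumer.setCommitOffsetsOnCheckpoints(true);
```
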
Common exceptions:
- `IllegalArgumentException`: Invalid configuration or parameters
- `IOException`: Network or serialization issues
- `RuntimeException`: Kafka client errors or partition assignment failures