# Data Stream Consumer

FlinkKafkaConsumer010 consumes data from Apache Kafka 0.10.x topics with exactly-once processing guarantees. It supports flexible subscription (a single topic, a list of topics, or a regex pattern) as well as rate limiting and dynamic partition discovery.
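
Once constructed, the consumer plugs into a streaming job as a regular source. A minimal end-to-end sketch (broker address, group id, and topic name are illustrative):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

// The consumer is a regular streaming source; addSource attaches it to the job graph
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer010<>("events-topic", new SimpleStringSchema(), props));
```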

## Capabilities

### Single Topic Consumer

Creates a consumer for a single Kafka topic with value-only deserialization.

```java { .api }
/**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x.
 * @param topic The name of the topic that should be consumed
 * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x with key-value deserialization.
 * @param topic The name of the topic that should be consumed
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);
```

**Usage Examples:**

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Simple string deserialization
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "events-topic",
    new SimpleStringSchema(),
    props
);

// Custom key-value deserialization
FlinkKafkaConsumer010<MyEvent> eventConsumer = new FlinkKafkaConsumer010<>(
    "events-topic",
    new KafkaDeserializationSchema<MyEvent>() {
        @Override
        public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            return MyEvent.fromJson(new String(record.value(), StandardCharsets.UTF_8));
        }

        @Override
        public boolean isEndOfStream(MyEvent nextElement) {
            return false;
        }

        @Override
        public TypeInformation<MyEvent> getProducedType() {
            return TypeInformation.of(MyEvent.class);
        }
    },
    props
);
```

### Multiple Topics Consumer

Creates a consumer for multiple Kafka topics specified as a list.

```java { .api }
/**
 * Creates a new Kafka streaming source consumer for multiple topics.
 * @param topics The Kafka topics to read from
 * @param deserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties that are used to configure both the fetcher and the offset handler
 */
public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer for multiple topics with key-value deserialization.
 * @param topics The Kafka topics to read from
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties that are used to configure both the fetcher and the offset handler
 */
public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);
```

**Usage Examples:**

```java
import java.util.Arrays;
import java.util.List;

// Multiple topics with simple deserialization
List<String> topics = Arrays.asList("topic1", "topic2", "topic3");
FlinkKafkaConsumer010<String> multiTopicConsumer = new FlinkKafkaConsumer010<>(
    topics,
    new SimpleStringSchema(),
    props
);

// Multiple topics with JSON deserialization (JsonDeserializationSchema stands in for
// any DeserializationSchema<JsonNode> implementation)
FlinkKafkaConsumer010<JsonNode> jsonConsumer = new FlinkKafkaConsumer010<>(
    topics,
    new JsonDeserializationSchema(),
    props
);
```

### Pattern-Based Topic Subscription

Creates a consumer that subscribes to all topics whose names match a regular expression. New matching topics are picked up automatically, provided dynamic partition discovery is enabled (see Dynamic Partition Discovery below).

```java { .api }
/**
 * Creates a new Kafka streaming source consumer using pattern-based topic subscription.
 * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to
 * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer using pattern-based topic subscription with key-value deserialization.
 * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);
```

**Usage Examples:**

```java
import java.util.regex.Pattern;

// Subscribe to all topics starting with "logs-"
Pattern logTopicsPattern = Pattern.compile("logs-.*");
FlinkKafkaConsumer010<String> patternConsumer = new FlinkKafkaConsumer010<>(
    logTopicsPattern,
    new SimpleStringSchema(),
    props
);

// Subscribe to topics matching an environment-specific pattern
Pattern envPattern = Pattern.compile("(prod|staging)-events-.*");
FlinkKafkaConsumer010<String> envConsumer = new FlinkKafkaConsumer010<>(
    envPattern,
    new SimpleStringSchema(),
    props
);
```

### Rate Limiting

Configure rate limiting to throttle the number of bytes read from Kafka per second.

```java { .api }
/**
 * Set a rate limiter for throttling bytes read from Kafka.
 * @param kafkaRateLimiter Rate limiter implementation to control consumption rate
 */
public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);

/**
 * Get the configured rate limiter.
 * @return The currently configured rate limiter, or null if none is set
 */
public FlinkConnectorRateLimiter getRateLimiter();
```

**Usage Examples:**

```java
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;

// Create consumer
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "high-volume-topic",
    new SimpleStringSchema(),
    props
);

// Limit consumption to 1 MB per second; the configured rate is a global value
// that the limiter divides among the parallel subtasks of the source
FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1024 * 1024); // bytes per second
consumer.setRateLimiter(rateLimiter);

// Check the currently configured rate limiter
FlinkConnectorRateLimiter currentLimiter = consumer.getRateLimiter();
```

## Configuration Properties

### Consumer-Specific Properties

```java { .api }
// Configuration key for the poll timeout
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";

// Default poll timeout in milliseconds
public static final long DEFAULT_POLL_TIMEOUT = 100L;
```
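
The poll timeout bounds how long each poll waits when no data is available before returning control to Flink. A minimal sketch of tuning it via the `KEY_POLL_TIMEOUT` constant (the value is illustrative):

```java
// Wait at most 250 ms per poll when no records are available
props.setProperty(FlinkKafkaConsumer010.KEY_POLL_TIMEOUT, "250");
```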

### Common Kafka Consumer Properties

Required properties:
- **bootstrap.servers**: Comma-separated list of Kafka broker addresses
- **group.id**: Consumer group identifier

Optional properties (combined in the sketch after this list):
- **flink.poll-timeout**: Time in milliseconds spent waiting in poll if data is not available (default: 100)
- **auto.offset.reset**: What to do when there is no initial offset ("earliest", "latest", "none")
- **enable.auto.commit**: Whether to automatically commit offsets (should be false for exactly-once)
- **max.poll.records**: Maximum number of records returned in a single poll
- **fetch.min.bytes**: Minimum amount of data the server should return for a fetch request
- **fetch.max.wait.ms**: Maximum time the server will block before responding to a fetch request
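
A typical configuration combining these settings (broker addresses and values are illustrative):

```java
Properties props = new Properties();

// Required
props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
props.setProperty("group.id", "my-consumer-group");

// Optional tuning
props.setProperty("auto.offset.reset", "earliest"); // start from the earliest offset when none exists
props.setProperty("enable.auto.commit", "false");   // let Flink checkpoints drive offset commits
props.setProperty("max.poll.records", "500");
props.setProperty("fetch.min.bytes", "1024");
props.setProperty("fetch.max.wait.ms", "500");
```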

### Dynamic Partition Discovery

Enable automatic discovery of new partitions (and, with pattern-based subscription, new matching topics) by setting the partition discovery interval:

```java
// Check for new partitions every 30 seconds
props.setProperty("flink.partition-discovery.interval-millis", "30000");
```

## Exactly-Once Processing

The FlinkKafkaConsumer010 integrates with Flink's checkpointing mechanism to provide exactly-once processing guarantees:

1. **Checkpointing**: Offsets are stored in Flink's checkpoints, not committed to Kafka
2. **Recovery**: On restart, the consumer resumes from the last successful checkpoint
3. **Commit Strategy**: Offsets are committed to Kafka only for monitoring purposes (see the sketch below the example)

```java
// Configure for exactly-once processing
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "earliest");

// In the streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
```
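
Committing offsets back to Kafka serves only external monitoring (e.g. consumer-group dashboards); recovery always uses the offsets stored in Flink's checkpoints. A sketch of toggling this behavior:

```java
// Commit offsets to Kafka when a checkpoint completes (visibility only;
// this has no effect on Flink's exactly-once guarantees)
consumer.setCommitOffsetsOnCheckpoints(true);
```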

## Error Handling

The consumer handles various error scenarios:

- **Broker failures**: Automatic reconnection with backoff
- **Deserialization errors**: Can be configured to fail the job or skip the corrupted record (see the sketch after this list)
- **Partition reassignment**: Automatic handling of partition changes
- **Consumer group rebalancing**: Graceful handling of consumer group membership changes
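
The choice between failing and skipping is made in the deserialization schema: throwing from `deserialize` fails (and restarts) the job, while returning null makes the consumer silently skip the record. A minimal sketch, assuming a hypothetical `MyEvent.fromJson` parser:

```java
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class SkippingEventSchema extends AbstractDeserializationSchema<MyEvent> {
    @Override
    public MyEvent deserialize(byte[] message) throws IOException {
        try {
            return MyEvent.fromJson(new String(message, StandardCharsets.UTF_8));
        } catch (Exception e) {
            // Returning null tells the Kafka consumer to skip this record
            return null;
        }
    }
}
```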

## Watermarks and Event Time

The consumer supports event time processing. When a watermark strategy is assigned directly to the consumer, timestamps and watermarks are generated inside the Kafka source on a per-partition basis:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import java.time.Duration;

FlinkKafkaConsumer010<MyEvent> consumer = new FlinkKafkaConsumer010<>(...);

// Assign timestamps and watermarks, tolerating up to 10 seconds of out-of-orderness
consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, timestamp) -> event.getEventTime())
);
```
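
If some partitions can go idle, they would otherwise hold back the overall watermark; the strategy can declare them idle after a timeout. A sketch building on the strategy above:

```java
consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, timestamp) -> event.getEventTime())
        .withIdleness(Duration.ofMinutes(1)) // mark a partition idle after 1 minute without records
);
```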