# Kafka Consumer

The Flink Kafka consumer provides reliable message consumption from Kafka 0.8.x topics with exactly-once processing guarantees through Flink's checkpointing mechanism.

## Capabilities

### FlinkKafkaConsumer08

The primary consumer class for the Kafka 0.8.x integration. It offers constructors for single and multiple topics, with value-only or key-value deserialization.

```java { .api }
/**
 * Kafka consumer for Apache Kafka 0.8.x with exactly-once processing guarantees
 */
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {

    /** Configuration key for partition retrieval retries */
    public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";

    /** Default number of partition retrieval retries */
    public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;

    /**
     * Creates a consumer for a single topic with value-only deserialization.
     * @param topic Kafka topic name
     * @param valueDeserializer Deserialization schema for message values
     * @param props Kafka consumer properties
     */
    public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props);

    /**
     * Creates a consumer for a single topic with key-value deserialization.
     * @param topic Kafka topic name
     * @param deserializer Keyed deserialization schema for both keys and values
     * @param props Kafka consumer properties
     */
    public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props);

    /**
     * Creates a consumer for multiple topics with value-only deserialization.
     * @param topics List of Kafka topic names
     * @param deserializer Deserialization schema for message values
     * @param props Kafka consumer properties
     */
    public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props);

    /**
     * Creates a consumer for multiple topics with key-value deserialization.
     * @param topics List of Kafka topic names
     * @param deserializer Keyed deserialization schema for both keys and values
     * @param props Kafka consumer properties
     */
    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props);

    /**
     * Retrieves partition information for the specified topics.
     * @param topics List of topic names to query
     * @param properties Kafka connection properties
     * @return List of partition leaders for the topics
     */
    public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties);
}
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.Arrays;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Basic single-topic consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

DataStream<String> stream = env.addSource(consumer);

// Multi-topic consumer with JSON deserialization
FlinkKafkaConsumer08<ObjectNode> jsonConsumer = new FlinkKafkaConsumer08<>(
    Arrays.asList("topic1", "topic2", "topic3"),
    new JSONKeyValueDeserializationSchema(false), // false = do not include record metadata
    props
);

DataStream<ObjectNode> jsonStream = env.addSource(jsonConsumer);

// Consumer with custom offset reset (the Kafka 0.8 consumer uses "smallest"/"largest")
Properties customProps = new Properties();
customProps.setProperty("bootstrap.servers", "localhost:9092");
customProps.setProperty("zookeeper.connect", "localhost:2181");
customProps.setProperty("group.id", "custom-group");
customProps.setProperty("auto.offset.reset", "smallest");

FlinkKafkaConsumer08<String> smallestOffsetConsumer = new FlinkKafkaConsumer08<>(
    "events-topic",
    new SimpleStringSchema(),
    customProps
);
```

### FlinkKafkaConsumer081 (Deprecated)

Deprecated subclass of FlinkKafkaConsumer08, kept only for backwards compatibility.

```java { .api }
/**
 * @deprecated Use FlinkKafkaConsumer08 instead
 */
@Deprecated
public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T> {

    /**
     * @deprecated Use the FlinkKafkaConsumer08 constructor instead
     */
    @Deprecated
    public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
}
```

### FlinkKafkaConsumer082 (Deprecated)

Deprecated subclass of FlinkKafkaConsumer08, kept only for backwards compatibility.

```java { .api }
/**
 * @deprecated Use FlinkKafkaConsumer08 instead
 */
@Deprecated
public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T> {

    /**
     * @deprecated Use the FlinkKafkaConsumer08 constructor instead
     */
    @Deprecated
    public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
}
```

## Configuration

### Required Properties

- **bootstrap.servers**: Comma-separated list of Kafka broker addresses
- **zookeeper.connect**: ZooKeeper connection string for offset management
- **group.id**: Consumer group identifier for offset coordination

### Optional Properties

- **auto.offset.reset**: Where to start when no committed offset exists; the Kafka 0.8 consumer accepts `smallest` and `largest` (`earliest`/`latest` belong to the 0.9+ consumer API)
- **auto.commit.enable**: Whether offsets are periodically committed to ZooKeeper (the 0.8 name for `enable.auto.commit`); with checkpointing enabled, Flink commits offsets on completed checkpoints instead
- **socket.timeout.ms**: Socket timeout for requests to the brokers
- **fetch.message.max.bytes**: Maximum bytes fetched per partition in a single request
- **flink.get-partitions.retry**: Number of retries for partition discovery (default: 3)

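
The `flink.get-partitions.retry` setting travels in the same `Properties` object as the Kafka settings; the key matches the `FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY` constant. A minimal sketch (broker, ZooKeeper, and group values are placeholders):

```java
import java.util.Properties;

public class PartitionRetryConfig {

    /** Builds consumer properties with partition-discovery retries raised from the default 3 to 10. */
    public static Properties buildProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "my-consumer-group");
        // Same literal as FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY
        props.setProperty("flink.get-partitions.retry", "10");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("flink.get-partitions.retry")); // prints "10"
    }
}
```

The same `Properties` instance is then passed to the `FlinkKafkaConsumer08` constructor.
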
## Checkpointing and Fault Tolerance

The consumer integrates with Flink's checkpointing mechanism: on each checkpoint it snapshots its partition offsets, and on recovery it resumes from the offsets of the last completed checkpoint.

```java
// Enable checkpointing for exactly-once guarantees
env.enableCheckpointing(5000); // checkpoint every 5 seconds

// The consumer automatically participates in checkpointing
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// The starting position can be configured
consumer.setStartFromGroupOffsets(); // start from committed group offsets (default)
consumer.setStartFromEarliest();     // start from the earliest available offset
consumer.setStartFromLatest();       // start from the latest offset
// Note: setStartFromTimestamp(...) is not supported by the 0.8 connector
```

## Error Handling

Common exceptions and handling strategies:

- **IllegalArgumentException**: Invalid topic names or properties
- **RuntimeException**: Kafka broker or ZooKeeper connectivity issues
- **SerializationException**: Deserialization failures

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Logger logger = LoggerFactory.getLogger("KafkaConsumerJob");

// Basic error handling around job setup and execution
try {
    FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
        "my-topic",
        new SimpleStringSchema(),
        props
    );

    env.addSource(consumer);
    env.execute("Kafka Consumer Job");
} catch (Exception e) {
    // Handle consumer setup or execution errors
    logger.error("Kafka consumer failed", e);
}
```
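
For deserialization failures specifically, Flink's Kafka consumers skip a record when the deserialization schema returns `null` for it, rather than failing the job. The tolerant-parse idea can be sketched in plain Java; the `parseOrNull` helper and its skip rule are illustrative, not part of the connector API:

```java
import java.nio.charset.StandardCharsets;

public class TolerantParse {

    /**
     * Decodes a record's bytes to a String, returning null for payloads that
     * should be skipped (here: null or empty ones). Inside a
     * DeserializationSchema, a null return causes the record to be skipped.
     */
    public static String parseOrNull(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null; // skip corrupted/empty record instead of throwing
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(parseOrNull("hello".getBytes(StandardCharsets.UTF_8))); // prints "hello"
        System.out.println(parseOrNull(new byte[0])); // prints "null"
    }
}
```

The same try/catch-and-return-null pattern would live inside a custom `DeserializationSchema.deserialize(...)` implementation passed to the consumer constructor.
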