# Kafka Consumer API

Comprehensive API reference for Kafka 0.8 consumer classes in the Apache Flink Kafka Connector.

## FlinkKafkaConsumer08<T>

**Package:** `org.apache.flink.streaming.connectors.kafka`
**Annotations:** `@PublicEvolving`
**Extends:** `FlinkKafkaConsumerBase<T>`
**Description:** The main Kafka consumer for Apache Kafka 0.8.x, providing streaming data source capabilities with exactly-once processing guarantees through Flink's checkpointing mechanism.

### Class Declaration

```java { .api }
@PublicEvolving
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T>
```

### Constants

```java { .api }
// Serialization identifier (from FlinkKafkaConsumer08)
private static final long serialVersionUID = -6272159445203409112L;

// Configuration keys (from FlinkKafkaConsumer08)
public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;

// Inherited constants from FlinkKafkaConsumerBase
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
```

### Constructors

#### Single Topic Constructors

```java { .api }
/**
 * Creates a consumer for a single topic with DeserializationSchema.
 *
 * @param topic The Kafka topic to read from
 * @param valueDeserializer The deserializer used to convert raw bytes to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
public FlinkKafkaConsumer08(String topic,
                            DeserializationSchema<T> valueDeserializer,
                            Properties props)

/**
 * Creates a consumer for a single topic with KafkaDeserializationSchema.
 *
 * @param topic The Kafka topic to read from
 * @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
public FlinkKafkaConsumer08(String topic,
                            KafkaDeserializationSchema<T> deserializer,
                            Properties props)
```

#### Multi-Topic Constructors

```java { .api }
/**
 * Creates a consumer for multiple topics with DeserializationSchema.
 *
 * @param topics List of Kafka topics to read from
 * @param deserializer The deserializer used to convert raw bytes to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
public FlinkKafkaConsumer08(List<String> topics,
                            DeserializationSchema<T> deserializer,
                            Properties props)

/**
 * Creates a consumer for multiple topics with KafkaDeserializationSchema.
 *
 * @param topics List of Kafka topics to read from
 * @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
public FlinkKafkaConsumer08(List<String> topics,
                            KafkaDeserializationSchema<T> deserializer,
                            Properties props)
```

#### Pattern Subscription Constructors

```java { .api }
/**
 * Creates a consumer with pattern-based topic subscription and DeserializationSchema.
 * Topics matching the pattern are picked up dynamically only when partition discovery
 * is enabled via "flink.partition-discovery.interval-millis".
 *
 * @param subscriptionPattern Regex pattern for topic names to subscribe to
 * @param valueDeserializer The deserializer used to convert raw bytes to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern,
                            DeserializationSchema<T> valueDeserializer,
                            Properties props)

/**
 * Creates a consumer with pattern-based topic subscription and KafkaDeserializationSchema.
 * Topics matching the pattern are picked up dynamically only when partition discovery
 * is enabled via "flink.partition-discovery.interval-millis".
 *
 * @param subscriptionPattern Regex pattern for topic names to subscribe to
 * @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern,
                            KafkaDeserializationSchema<T> deserializer,
                            Properties props)
```
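
To get a feel for which topics a subscription pattern would select, the sketch below filters a list of topic names using only the JDK. The `TopicPatternDemo` class and the topic names are made up for illustration, and the use of full-name matching (`matches()`) is an assumption about how the pattern is applied, not something this reference states.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the connector.
class TopicPatternDemo {

    /** Returns the topics whose full name matches the pattern. */
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            // matches() requires the whole name to match, unlike find()
            if (pattern.matcher(topic).matches()) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("metrics-.*");
        List<String> topics = Arrays.asList(
            "metrics-cpu", "metrics-disk", "logs-app", "old-metrics-cpu");
        // Prints [metrics-cpu, metrics-disk]
        System.out.println(matchingTopics(pattern, topics));
    }
}
```

Under full-name matching, `old-metrics-cpu` is not selected because the pattern has no leading wildcard. Testing a pattern this way before deploying can catch an overly broad or overly narrow regex early.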

### Configuration Methods (Inherited from FlinkKafkaConsumerBase)

#### Startup Position Configuration

```java { .api }
/**
 * Configures the consumer to start reading from the earliest available offset.
 *
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> setStartFromEarliest()

/**
 * Configures the consumer to start reading from the latest available offset.
 *
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> setStartFromLatest()

/**
 * Configures the consumer to start reading from the consumer group's committed offsets.
 * This is the default behavior.
 *
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets()

/**
 * Configures the consumer to start reading from specific offsets per partition.
 *
 * @param specificStartupOffsets Map of partition to offset mappings
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
```

#### Watermark and Timestamp Assignment

```java { .api }
/**
 * Assigns a timestamp assigner and watermark generator for punctuated watermarks.
 *
 * @param assigner The assigner that generates punctuated watermarks
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner)

/**
 * Assigns a timestamp assigner and watermark generator for periodic watermarks.
 *
 * @param assigner The assigner that generates periodic watermarks
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner)
```
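
A periodic assigner is often used to emit watermarks that trail the largest timestamp seen so far by a fixed bound. The following Flink-free sketch shows only that bookkeeping, so the arithmetic can be checked in isolation. `BoundedLatenessTracker` is a hypothetical class, not part of the connector, and it assumes a non-negative delay.

```java
// Hypothetical helper illustrating bounded-lateness watermark arithmetic.
class BoundedLatenessTracker {
    private final long maxDelayMs;
    // Largest event timestamp observed so far
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedLatenessTracker(long maxDelayMs) {
        if (maxDelayMs < 0) {
            throw new IllegalArgumentException("maxDelayMs must be >= 0");
        }
        this.maxDelayMs = maxDelayMs;
    }

    /** Records an event timestamp and returns it unchanged. */
    long observe(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return timestamp;
    }

    /** Watermark = max timestamp minus the delay, clamped to avoid underflow. */
    long currentWatermark() {
        if (maxTimestamp < Long.MIN_VALUE + maxDelayMs) {
            return Long.MIN_VALUE; // nothing (or nothing usable) observed yet
        }
        return maxTimestamp - maxDelayMs;
    }

    public static void main(String[] args) {
        BoundedLatenessTracker tracker = new BoundedLatenessTracker(5000L);
        tracker.observe(1000L);
        tracker.observe(7000L);
        tracker.observe(4000L); // out of order, does not move the maximum
        System.out.println(tracker.currentWatermark()); // 7000 - 5000 = 2000
    }
}
```

The clamp matters because subtracting a delay from `Long.MIN_VALUE` wraps around to a very large positive value, which would mark every later event as late.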

#### Checkpointing and Offset Management

```java { .api }
/**
 * Configures whether the consumer should commit offsets back to Kafka on checkpoints.
 * This only takes effect when checkpointing is enabled. The committed offsets expose
 * the consumer's progress (for example, for monitoring); Flink's exactly-once guarantees
 * rely on the offsets stored in checkpoints, not on these commits.
 *
 * @param commitOnCheckpoints Whether to commit offsets on checkpoints (default: true)
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints)

/**
 * Disables the filtering of restored partitions based on subscribed topics.
 * By default, only partitions of subscribed topics are restored.
 *
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithSubscribedTopics()
```

### Lifecycle Methods (Inherited from FlinkKafkaConsumerBase)

```java { .api }
/**
 * Opens the consumer and initializes resources.
 *
 * @param configuration The task configuration
 * @throws Exception If initialization fails
 */
public void open(Configuration configuration) throws Exception

/**
 * Main execution method that reads from Kafka and emits records.
 *
 * @param sourceContext The source context for emitting records
 * @throws Exception If execution fails
 */
public void run(SourceContext<T> sourceContext) throws Exception

/**
 * Cancels the consumer operation and stops reading.
 */
public void cancel()

/**
 * Closes the consumer and cleans up resources.
 *
 * @throws Exception If cleanup fails
 */
public void close() throws Exception

/**
 * Returns the type information for the produced data type.
 *
 * @return Type information for T
 */
public TypeInformation<T> getProducedType()
```

### Checkpointing Interface Methods (Inherited)

```java { .api }
/**
 * Initializes the state of the function from a checkpoint.
 *
 * @param context The initialization context
 * @throws Exception If state initialization fails
 */
public final void initializeState(FunctionInitializationContext context) throws Exception

/**
 * Snapshots the function's state during checkpointing.
 *
 * @param context The snapshot context
 * @throws Exception If state snapshotting fails
 */
public final void snapshotState(FunctionSnapshotContext context) throws Exception

/**
 * Notifies the function that a checkpoint has been completed.
 *
 * @param checkpointId The ID of the completed checkpoint
 * @throws Exception If notification processing fails
 */
public final void notifyCheckpointComplete(long checkpointId) throws Exception
```
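
The interplay between `snapshotState` and `notifyCheckpointComplete` can be pictured with a small, Flink-free model: offsets are captured per checkpoint and only committed externally once that checkpoint is confirmed. This is a simplified sketch under stated assumptions, not the connector's actual implementation. The `OffsetCommitModel` class, its fields, and the eviction policy are hypothetical; only the `MAX_NUM_PENDING_CHECKPOINTS` value is taken from the constants listed earlier.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of commit-on-checkpoint bookkeeping.
class OffsetCommitModel {
    static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

    // partition id -> current read offset (updated as records are consumed)
    final Map<Integer, Long> currentOffsets = new HashMap<>();
    // checkpoint id -> offsets captured at that checkpoint, oldest first
    final LinkedHashMap<Long, Map<Integer, Long>> pending = new LinkedHashMap<>();
    // offsets most recently "committed" to the external system
    Map<Integer, Long> committed = new HashMap<>();

    /** Captures a copy of the current offsets under the checkpoint id. */
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, new HashMap<>(currentOffsets));
        // Assumed policy: evict the oldest entries once the cap is exceeded
        Iterator<Long> it = pending.keySet().iterator();
        while (pending.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            it.next();
            it.remove();
        }
    }

    /** Commits the offsets of a confirmed checkpoint and drops older entries. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = pending.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already evicted checkpoint
        }
        Iterator<Long> it = pending.keySet().iterator();
        while (it.hasNext()) {
            long id = it.next();
            it.remove(); // remove this checkpoint and everything before it
            if (id == checkpointId) {
                break;
            }
        }
        committed = offsets;
    }

    public static void main(String[] args) {
        OffsetCommitModel model = new OffsetCommitModel();
        model.currentOffsets.put(0, 10L);
        model.snapshotState(1L);
        model.currentOffsets.put(0, 25L);
        model.snapshotState(2L);
        model.notifyCheckpointComplete(2L);
        System.out.println(model.committed); // {0=25}
    }
}
```

In this model a crash between `snapshotState` and `notifyCheckpointComplete` loses only the external commit, never the checkpointed offsets, which is why the external commit is not what recovery depends on.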

### Static Validation Methods

```java { .api }
/**
 * Validate the ZooKeeper configuration, checking for required parameters.
 *
 * @param props Properties to check
 * @throws IllegalArgumentException if required properties are missing or invalid
 */
protected static void validateZooKeeperConfig(Properties props)
```
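
Because `validateZooKeeperConfig` is protected, application code cannot call it directly. A comparable pre-flight check can be sketched with plain `java.util.Properties`. The `ZooKeeperConfigCheck` class is hypothetical, and the exact set of keys it inspects is an assumption based on the properties documented in this reference, not a copy of the connector's logic.

```java
import java.util.Properties;

// Hypothetical pre-flight check; the checked keys are an assumption.
class ZooKeeperConfigCheck {

    static void validate(Properties props) {
        requirePresent(props, "zookeeper.connect");
        requirePresent(props, "group.id");
        requireIntIfSet(props, "zookeeper.session.timeout.ms");
        requireIntIfSet(props, "zookeeper.connection.timeout.ms");
    }

    private static void requirePresent(Properties props, String key) {
        if (props.getProperty(key) == null) {
            throw new IllegalArgumentException(
                "Required property '" + key + "' has not been set");
        }
    }

    private static void requireIntIfSet(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            return; // optional property, nothing to check
        }
        try {
            Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Property '" + key + "' is not a valid integer: " + value, e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "my-consumer-group");
        validate(props); // passes silently
        System.out.println("configuration looks valid");
    }
}
```

Running such a check at job submission time surfaces a missing key immediately instead of when the source operator is constructed on the cluster.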

### Usage Examples

#### Basic Consumer Setup

```java { .api }
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
// Broker list, needed by the consumer to look up partition metadata
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "my-consumer-group");
properties.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

env.addSource(consumer).print();

// Nothing runs until the job is submitted
env.execute("Kafka 0.8 consumer example");
```

#### Multi-Topic Consumer with Custom Deserialization

```java { .api }
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;

List<String> topics = Arrays.asList("topic1", "topic2", "topic3");

// MyEvent is an application class; `properties` is configured as in the basic setup
KafkaDeserializationSchema<MyEvent> schema = new KafkaDeserializationSchema<MyEvent>() {
    @Override
    public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Decode with an explicit charset instead of the platform default
        return MyEvent.fromJson(new String(record.value(), StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return nextElement.isEndMarker();
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
};

FlinkKafkaConsumer08<MyEvent> consumer = new FlinkKafkaConsumer08<>(
    topics, schema, properties);
```

#### Pattern-Based Topic Subscription with Watermarks

```java { .api }
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.regex.Pattern;

Pattern topicPattern = Pattern.compile("metrics-.*");

// MyMetric and MyMetricDeserializer are application classes
FlinkKafkaConsumer08<MyMetric> consumer = new FlinkKafkaConsumer08<>(
    topicPattern,
    new MyMetricDeserializer(),
    properties
);

consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<MyMetric>() {
    private final long maxDelayMs = 5000L; // 5 second delay
    // Offset the initial value so that subtracting the delay cannot underflow
    private long currentMaxTimestamp = Long.MIN_VALUE + maxDelayMs;

    @Override
    public long extractTimestamp(MyMetric element, long previousElementTimestamp) {
        long timestamp = element.getTimestamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxDelayMs);
    }
});

consumer.setStartFromLatest()
    .setCommitOffsetsOnCheckpoints(true);
```

#### Specific Offset Configuration

```java { .api }
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

// Partition -> offset of the first record to read
Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 2), 54321L);

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", new SimpleStringSchema(), properties);

consumer.setStartFromSpecificOffsets(specificStartupOffsets)
    .setCommitOffsetsOnCheckpoints(true);
```

## Deprecated Consumer Classes

### FlinkKafkaConsumer081<T>

```java { .api }
@Deprecated
public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T>

/**
 * @deprecated Use FlinkKafkaConsumer08 instead
 */
@Deprecated
public FlinkKafkaConsumer081(String topic,
                             DeserializationSchema<T> valueDeserializer,
                             Properties props)
```

### FlinkKafkaConsumer082<T>

```java { .api }
@Deprecated
public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T>

/**
 * @deprecated Use FlinkKafkaConsumer08 instead
 */
@Deprecated
public FlinkKafkaConsumer082(String topic,
                             DeserializationSchema<T> valueDeserializer,
                             Properties props)
```

## KafkaDeserializationSchema<T> Interface

**Package:** `org.apache.flink.streaming.connectors.kafka`
**Annotations:** `@PublicEvolving`
**Extends:** `Serializable, ResultTypeQueryable<T>`

```java { .api }
@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Method to decide whether the element signals the end of the stream.
     * If this returns true, the element won't be emitted.
     *
     * @param nextElement The element to test for end-of-stream signal
     * @return True if the element signals end of stream, false otherwise
     */
    boolean isEndOfStream(T nextElement);

    /**
     * Deserializes the Kafka record.
     *
     * @param record The ConsumerRecord from Kafka to deserialize
     * @return The deserialized message as an object of type T
     * @throws Exception If deserialization fails
     */
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
}
```

### Custom KafkaDeserializationSchema Example

```java { .api }
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// CustomEvent is an application class
public class CustomEventDeserializer implements KafkaDeserializationSchema<CustomEvent> {

    @Override
    public CustomEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Access all ConsumerRecord information
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        long timestamp = record.timestamp();
        byte[] key = record.key();
        byte[] value = record.value();

        // Custom deserialization logic
        CustomEvent event = parseEvent(value);
        event.setMetadata(topic, partition, offset, timestamp);

        return event;
    }

    @Override
    public boolean isEndOfStream(CustomEvent nextElement) {
        // Check for end-of-stream marker; constant-first equals tolerates a null event type
        return nextElement != null && "END_STREAM".equals(nextElement.getEventType());
    }

    @Override
    public TypeInformation<CustomEvent> getProducedType() {
        return TypeInformation.of(CustomEvent.class);
    }

    private CustomEvent parseEvent(byte[] data) throws Exception {
        // Implementation specific parsing logic
        return CustomEvent.fromBytes(data);
    }
}
```

## Configuration Properties

### Required Properties

```java { .api }
Properties props = new Properties();

// Required: Kafka broker list, used to look up partition metadata
props.setProperty("bootstrap.servers", "localhost:9092");

// Required: ZooKeeper connection (the 0.8 consumer commits offsets to ZooKeeper)
props.setProperty("zookeeper.connect", "localhost:2181");

// Required: Consumer group ID
props.setProperty("group.id", "my-consumer-group");
```

### Optional Consumer Properties

```java { .api }
// Offset reset behavior when no committed offset exists
// (Kafka 0.8's own names for these values are "smallest" and "largest")
props.setProperty("auto.offset.reset", "earliest"); // or "latest"

// Socket and network settings
props.setProperty("socket.timeout.ms", "30000");
props.setProperty("socket.receive.buffer.bytes", "65536");

// Fetch settings
props.setProperty("fetch.message.max.bytes", "1048576");
props.setProperty("fetch.wait.max.ms", "100");

// ZooKeeper settings
props.setProperty("zookeeper.session.timeout.ms", "6000");
props.setProperty("zookeeper.connection.timeout.ms", "6000");

// Auto-commit settings (only relevant when Flink checkpointing is disabled;
// with checkpointing enabled, use setCommitOffsetsOnCheckpoints instead)
props.setProperty("auto.commit.enable", "false");
props.setProperty("auto.commit.interval.ms", "60000");
```

### Flink-Specific Properties

```java { .api }
// Partition discovery interval (milliseconds); discovery is disabled when unset
props.setProperty("flink.partition-discovery.interval-millis", "30000");

// Set to "true" to disable metrics collection ("false" keeps metrics enabled)
props.setProperty("flink.disable-metrics", "false");

// Number of retries when fetching partition metadata (default: 3)
props.setProperty("flink.get-partitions.retry", "3");
```
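
The Flink-specific keys are plain strings, so typos and malformed values are easy to miss. The sketch below reads them from a `Properties` object with the defaults given in the Constants section. `FlinkKafkaSettings` is a hypothetical helper, and the parsing shown is an assumption for illustration, not the connector's own code.

```java
import java.util.Properties;

// Hypothetical helper; key names and defaults follow the Constants section.
class FlinkKafkaSettings {
    static final String RETRIES_KEY = "flink.get-partitions.retry";
    static final int DEFAULT_RETRIES = 3;
    static final String DISCOVERY_KEY = "flink.partition-discovery.interval-millis";
    static final long DISCOVERY_DISABLED = Long.MIN_VALUE;
    static final String DISABLE_METRICS_KEY = "flink.disable-metrics";

    /** Number of metadata fetch retries, falling back to the default. */
    static int retries(Properties props) {
        return Integer.parseInt(
            props.getProperty(RETRIES_KEY, String.valueOf(DEFAULT_RETRIES)));
    }

    /** Discovery interval in milliseconds, or DISCOVERY_DISABLED if unset. */
    static long discoveryIntervalMillis(Properties props) {
        return Long.parseLong(
            props.getProperty(DISCOVERY_KEY, String.valueOf(DISCOVERY_DISABLED)));
    }

    /** Metrics stay enabled unless the disable flag is "true". */
    static boolean metricsEnabled(Properties props) {
        return !Boolean.parseBoolean(props.getProperty(DISABLE_METRICS_KEY, "false"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(DISCOVERY_KEY, "30000");
        System.out.println(retries(props));                 // 3
        System.out.println(discoveryIntervalMillis(props)); // 30000
        System.out.println(metricsEnabled(props));          // true
    }
}
```

Note the double negative around metrics: the property disables them, so `"false"` (the default assumed here) leaves metrics on.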