# Consumer Base Classes

Abstract base implementations for Kafka consumers that provide common functionality across all Kafka versions. These classes handle offset management, checkpointing, watermark assignment, and state management while delegating version-specific operations to concrete implementations.

## Capabilities

### FlinkKafkaConsumerBase

The core abstract base class that all Flink Kafka consumers extend. Provides comprehensive functionality for consuming from Kafka topics with exactly-once processing guarantees.

```java { .api }
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
        implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {

    public FlinkKafkaConsumerBase(
            List<String> topics,
            Pattern topicPattern,
            KeyedDeserializationSchema<T> deserializer,
            long discoveryIntervalMillis,
            boolean useMetrics
    );
}
```

**Parameters:**

- `topics` - List of Kafka topics to consume from (null if using a pattern)
- `topicPattern` - Regex pattern for topic subscription (null if using an explicit topic list)
- `deserializer` - Schema for deserializing Kafka messages
- `discoveryIntervalMillis` - Interval for partition discovery (use `PARTITION_DISCOVERY_DISABLED` to disable)
- `useMetrics` - Whether to expose Kafka consumer metrics

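Application code does not call this constructor directly; it instantiates one of the concrete subclasses, which forwards to it. A minimal sketch, assuming the universal `FlinkKafkaConsumer` subclass; the broker address, group id, and topic name are placeholder values:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // example broker address
props.setProperty("group.id", "my-consumer-group");       // example group id

// The concrete subclass forwards topics, deserializer, and discovery
// settings to the FlinkKafkaConsumerBase constructor shown above.
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
```
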
### Startup Mode Configuration

Configure where the consumer starts reading from Kafka topics.

```java { .api }
public FlinkKafkaConsumerBase<T> setStartFromEarliest();
public FlinkKafkaConsumerBase<T> setStartFromLatest();
public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets();
public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);
```

**Usage Examples:**

```java
// Start from earliest available messages
consumer.setStartFromEarliest();

// Start from latest messages (skip existing)
consumer.setStartFromLatest();

// Start from committed group offsets (default)
consumer.setStartFromGroupOffsets();

// Start from specific offsets
Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
offsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
consumer.setStartFromSpecificOffsets(offsets);
```

### Offset Management

Control how and when offsets are committed back to Kafka.

```java { .api }
public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);
```

**Parameters:**

- `commitOnCheckpoints` - If true, offsets are committed only on successful checkpoints (recommended for exactly-once). This setting only takes effect when checkpointing is enabled for the job; with checkpointing disabled, the consumer falls back to the Kafka client's automatic periodic offset committing, if that is enabled in the consumer properties.

**Usage Example:**

```java
// Enable offset commits on checkpoints for exactly-once processing
consumer.setCommitOffsetsOnCheckpoints(true);
```

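Since checkpoint-based committing depends on checkpointing being enabled on the execution environment, a minimal sketch of the full setup (the interval is an arbitrary example value):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 5 seconds; offsets are then committed to Kafka as
// part of each successfully completed checkpoint.
env.enableCheckpointing(5000);

consumer.setCommitOffsetsOnCheckpoints(true);
```
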
### Watermark Assignment

Assign timestamps and watermarks for event-time processing.

```java { .api }
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);
```

**Parameters:**

- `assigner` - Watermark assigner for extracting timestamps and generating watermarks

**Usage Examples:**

```java
// Periodic watermarks with bounded out-of-orderness
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getTimestamp();
        }
    }
);

// Punctuated watermarks based on special marker records
consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.isWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
});
```

### Type Information

Provide type information for the deserialized elements.

```java { .api }
public TypeInformation<T> getProducedType();
```

**Returns:** Type information for elements produced by this consumer

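The produced type is derived from the deserialization schema rather than configured separately. A small sketch, continuing the earlier `SimpleStringSchema`-based consumer example:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;

// For a consumer constructed with SimpleStringSchema, the produced
// type information describes plain String elements.
TypeInformation<String> producedType = consumer.getProducedType();
```
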
### State Management

Handle checkpointing and state recovery (implemented by the framework).

```java { .api }
public void initializeState(FunctionInitializationContext context) throws Exception;
public void snapshotState(FunctionSnapshotContext context) throws Exception;
public void notifyCheckpointComplete(long checkpointId) throws Exception;
```

These methods are called by the Flink runtime for checkpoint coordination and should not be called directly by user code.

### Lifecycle Methods

Source function lifecycle management (called by the Flink runtime).

```java { .api }
public void open(Configuration configuration) throws Exception;
public void run(SourceContext<T> sourceContext) throws Exception;
public void cancel();
public void close() throws Exception;
```

**Methods:**

- `open()` - Initialize the consumer (called once per parallel instance)
  - Sets up offset commit mode, metrics, and other configuration
  - Called before the `run()` method

- `run()` - Main execution method (runs in a dedicated thread)
  - Discovers partitions and initializes fetchers
  - Runs the main consumption loop until cancelled
  - Handles partition discovery and consumer coordination

- `cancel()` - Stop the consumer gracefully
  - Sets the running flag to false to exit the consumption loop
  - Interrupts the discovery thread if running
  - Called by Flink when the job is cancelled

- `close()` - Clean up resources (called after cancellation)
  - Closes fetchers and releases resources
  - Called by the Flink runtime during shutdown

**Usage Note:** These lifecycle methods are managed by the Flink runtime and should not be called directly by user code; a job only registers the consumer as a source, as sketched below.

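A minimal sketch of that interaction, continuing the earlier consumer example; the runtime then drives the lifecycle on each parallel instance:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// addSource() hands the consumer to the runtime, which invokes open(),
// then run(), and eventually cancel()/close() on each parallel instance.
DataStream<String> stream = env.addSource(consumer);

stream.print();
env.execute("kafka-consumer-job");
```
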
### Abstract Methods

Concrete implementations must implement these version-specific methods:

```java { .api }
protected abstract AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics
) throws Exception;

protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(
        KafkaTopicsDescriptor topicsDescriptor,
        int indexOfThisSubtask,
        int numParallelSubtasks
);

protected abstract boolean getIsAutoCommitEnabled();
```

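A schematic sketch of what a version-specific subclass looks like. The class name is hypothetical and the method bodies are placeholders; a real connector returns its own fetcher and partition discoverer built on one Kafka client version:

```java
// Schematic only: a hypothetical version-specific subclass. Imports from
// org.apache.flink.* are omitted for brevity.
public class MyVersionedKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {

    public MyVersionedKafkaConsumer(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
        // No topic pattern, partition discovery disabled, metrics enabled.
        super(topics, null, deserializer, PARTITION_DISCOVERY_DISABLED, true);
    }

    @Override
    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        // A real implementation returns a fetcher built on this version's
        // Kafka client, wired with the assigned partitions and watermarks.
        throw new UnsupportedOperationException("replace with a version-specific fetcher");
    }

    @Override
    protected AbstractPartitionDiscoverer createPartitionDiscoverer(
            KafkaTopicsDescriptor topicsDescriptor,
            int indexOfThisSubtask,
            int numParallelSubtasks) {
        // A real implementation lists partitions via this version's client.
        throw new UnsupportedOperationException("replace with a version-specific discoverer");
    }

    @Override
    protected boolean getIsAutoCommitEnabled() {
        // A real implementation reflects the enable.auto.commit setting
        // from the client properties.
        return false;
    }
}
```
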
## Configuration Constants

```java { .api }
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
```

- `MAX_NUM_PENDING_CHECKPOINTS` - Maximum number of pending checkpoints to track
- `PARTITION_DISCOVERY_DISABLED` - Use this value to disable automatic partition discovery
- `KEY_DISABLE_METRICS` - Configuration key for disabling Kafka metrics collection
- `KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS` - Configuration key for the partition discovery interval

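The two `KEY_*` constants are read from the consumer's `Properties`; a sketch of how they might be set (the values shown are examples):

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

Properties props = new Properties();

// Check for newly created partitions every 60 seconds (example value).
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "60000");

// Opt out of forwarding the Kafka client's metrics to Flink's metric system.
props.setProperty(FlinkKafkaConsumerBase.KEY_DISABLE_METRICS, "true");
```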