Abstract base implementations for Kafka consumers that provide common functionality across all Kafka versions. These classes handle offset management, checkpointing, watermark assignment, and state management while delegating version-specific operations to concrete implementations.
The core abstract base class that all Flink Kafka consumers extend. Provides comprehensive functionality for consuming from Kafka topics with exactly-once processing guarantees.
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
        implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {

    public FlinkKafkaConsumerBase(
            List<String> topics,
            Pattern topicPattern,
            KeyedDeserializationSchema<T> deserializer,
            long discoveryIntervalMillis,
            boolean useMetrics);
}

Parameters:
topics - List of Kafka topics to consume from (null if using pattern)
topicPattern - Regex pattern for topic subscription (null if using explicit topics)
deserializer - Schema for deserializing Kafka messages
discoveryIntervalMillis - Interval for partition discovery (use PARTITION_DISCOVERY_DISABLED to disable)
useMetrics - Whether to expose Kafka consumer metrics
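The base class is never instantiated directly; a concrete, version-specific subclass forwards its arguments to this constructor. A minimal construction sketch, assuming the FlinkKafkaConsumer subclass and SimpleStringSchema shipped with the connector (illustrative choices, not the only options):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

// FlinkKafkaConsumer is one concrete subclass of FlinkKafkaConsumerBase;
// its constructor passes the topic list and deserializer down to the base class
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);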
Configure where the consumer starts reading from Kafka topics.

public FlinkKafkaConsumerBase<T> setStartFromEarliest();
public FlinkKafkaConsumerBase<T> setStartFromLatest();
public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets();
public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);

Usage Examples:
// Start from earliest available messages
consumer.setStartFromEarliest();
// Start from latest messages (skip existing)
consumer.setStartFromLatest();
// Start from committed group offsets (default)
consumer.setStartFromGroupOffsets();
// Start from specific offsets
Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
offsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
consumer.setStartFromSpecificOffsets(offsets);

Control how and when offsets are committed back to Kafka.
public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);

Parameters:
commitOnCheckpoints - If true, offsets are committed only on successful checkpoints (recommended for exactly-once)

Usage Example:
// Enable offset commits on checkpoints for exactly-once processing
consumer.setCommitOffsetsOnCheckpoints(true);

Assign timestamps and watermarks for event-time processing.
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);

Parameters:
assigner - Watermark assigner for extracting timestamps and generating watermarks

Usage Examples:
// Periodic watermarks with bounded out-of-orderness
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getTimestamp();
        }
    }
);
// Punctuated watermarks based on special marker records
consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        // Emit a watermark only when the element is a special marker record
        return lastElement.isWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
});

Provide type information for the deserialized elements.
public TypeInformation<T> getProducedType();

Returns: Type information for elements produced by this consumer
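The produced type is derived from the deserialization schema, so downstream operators are typed without explicit hints. A short sketch (MyEvent is the hypothetical event type used in the examples above; env is a StreamExecutionEnvironment):

// The output type comes from the deserializer's type information
TypeInformation<MyEvent> typeInfo = consumer.getProducedType();
// Downstream operators therefore see MyEvent, not a raw byte type
DataStream<MyEvent> stream = env.addSource(consumer);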
Handle checkpointing and state recovery (called by the Flink runtime).
public void initializeState(FunctionInitializationContext context) throws Exception;
public void snapshotState(FunctionSnapshotContext context) throws Exception;
public void notifyCheckpointComplete(long checkpointId) throws Exception;

These methods are called by the Flink runtime for checkpoint coordination and should not be called directly by user code.
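They only take effect when checkpointing is enabled on the job. A minimal sketch of wiring this up (the 5-second interval is an arbitrary choice):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The runtime now drives snapshotState() on each checkpoint, and
// notifyCheckpointComplete() commits offsets back to Kafka when
// setCommitOffsetsOnCheckpoints(true) is configured
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);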
Source function lifecycle management (called by Flink runtime).
public void open(Configuration configuration) throws Exception;
public void run(SourceContext<T> sourceContext) throws Exception;
public void cancel();
public void close() throws Exception;

Methods:
open() - Initialize the consumer (called once per parallel instance)
run() - Main execution method (runs in dedicated thread)
cancel() - Stop the consumer gracefully
close() - Clean up resources (called after cancellation)
Usage Note: These lifecycle methods are managed by the Flink runtime and should not be called directly by user code.
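A short sketch of how the lifecycle is driven in practice; the consumer is registered as a source and the runtime takes it from there (topic and job name are illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// addSource() hands the consumer to the runtime, which calls open() then
// run() on each parallel instance, and cancel() followed by close() on shutdown
DataStream<String> stream = env.addSource(consumer);
stream.print();
env.execute("kafka-lifecycle-example");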
Concrete subclasses must implement these version-specific methods:
protected abstract AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception;
protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(
        KafkaTopicsDescriptor topicsDescriptor,
        int indexOfThisSubtask,
        int numParallelSubtasks);
protected abstract boolean getIsAutoCommitEnabled();

public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";

MAX_NUM_PENDING_CHECKPOINTS - Maximum number of pending checkpoints to track
PARTITION_DISCOVERY_DISABLED - Use this value to disable automatic partition discovery
KEY_DISABLE_METRICS - Configuration key for disabling Kafka metrics collection
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS - Configuration key for partition discovery interval
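A sketch of the discovery-interval key in use, assuming a concrete consumer that reads it from the client Properties supplied at construction:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");
// Check for newly created partitions every 30 seconds (value in milliseconds);
// leave the key unset to keep automatic partition discovery disabled
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");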