Consumer Base Classes

Abstract base implementations for Kafka consumers that provide common functionality across all Kafka versions. These classes handle offset management, checkpointing, watermark assignment, and state management while delegating version-specific operations to concrete implementations.

Capabilities

FlinkKafkaConsumerBase

The core abstract base class that all Flink Kafka consumers extend. Provides comprehensive functionality for consuming from Kafka topics with exactly-once processing guarantees.

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> 
    implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {
    
    public FlinkKafkaConsumerBase(
        List<String> topics,
        Pattern topicPattern,
        KeyedDeserializationSchema<T> deserializer,
        long discoveryIntervalMillis,
        boolean useMetrics
    );
}

Parameters:

  • topics - List of Kafka topics to consume from (null if using pattern)
  • topicPattern - Regex pattern for topic subscription (null if using explicit topics)
  • deserializer - Schema for deserializing Kafka messages
  • discoveryIntervalMillis - Interval for partition discovery (use PARTITION_DISCOVERY_DISABLED to disable)
  • useMetrics - Whether to expose Kafka consumer metrics
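
Instantiation normally goes through a concrete, version-specific subclass rather than the abstract base itself. A minimal sketch, assuming the shipped FlinkKafkaConsumer subclass and a SimpleStringSchema deserializer; the broker address and group id are placeholder values:

// Construct a concrete consumer; the base class constructor is
// invoked internally by the subclass.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);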

Startup Mode Configuration

Configure where the consumer starts reading from Kafka topics.

public FlinkKafkaConsumerBase<T> setStartFromEarliest();
public FlinkKafkaConsumerBase<T> setStartFromLatest();
public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets();
public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);

Usage Examples:

// Start from earliest available messages
consumer.setStartFromEarliest();

// Start from latest messages (skip existing)
consumer.setStartFromLatest();

// Start from committed group offsets (default)
consumer.setStartFromGroupOffsets();

// Start from specific offsets
Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
offsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
consumer.setStartFromSpecificOffsets(offsets);

Offset Management

Control how and when offsets are committed back to Kafka.

public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);

Parameters:

  • commitOnCheckpoints - If true, offsets are committed only on successful checkpoints (recommended for exactly-once)

Usage Example:

// Enable offset commits on checkpoints for exactly-once processing
consumer.setCommitOffsetsOnCheckpoints(true);
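
Note that commit-on-checkpoints only takes effect when checkpointing is enabled on the execution environment; without it, the consumer instead relies on the Kafka client's auto-commit setting. A sketch of the typical pairing:

// Offsets are committed back to Kafka only when a checkpoint completes,
// so checkpointing must be enabled for this setting to apply.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5 seconds
consumer.setCommitOffsetsOnCheckpoints(true);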

Watermark Assignment

Assign timestamps and watermarks for event-time processing.

public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);

Parameters:

  • assigner - Watermark assigner for extracting timestamps and generating watermarks

Usage Examples:

// Periodic watermarks with bounded out-of-orderness
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getTimestamp();
        }
    }
);

// Punctuated watermarks based on special marker records
consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
    @Override
    public long extractTimestamp(MyEvent element, long recordTimestamp) {
        return element.getTimestamp();
    }
    
    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.isWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
});

Type Information

Provide type information for the deserialized elements.

public TypeInformation<T> getProducedType();

Returns: Type information for elements produced by this consumer
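
As a trivial illustration, the produced type can be inspected directly (assuming a consumer parameterized with MyEvent):

// The produced type matches the deserialization schema's output type;
// Flink uses it internally to configure serializers.
TypeInformation<MyEvent> typeInfo = consumer.getProducedType();
System.out.println(typeInfo);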

State Management

Handle checkpointing and state recovery (implemented by the framework).

public void initializeState(FunctionInitializationContext context) throws Exception;
public void snapshotState(FunctionSnapshotContext context) throws Exception;
public void notifyCheckpointComplete(long checkpointId) throws Exception;

These methods are called by the Flink runtime for checkpoint coordination and should not be called directly by user code.

Lifecycle Methods

Source function lifecycle management (called by Flink runtime).

public void open(Configuration configuration) throws Exception;
public void run(SourceContext<T> sourceContext) throws Exception;
public void cancel();
public void close() throws Exception;

Methods:

  • open() - Initialize the consumer (called once per parallel instance)

    • Sets up offset commit mode, metrics, and other configuration
    • Called before run() method
  • run() - Main execution method (runs in dedicated thread)

    • Discovers partitions and initializes fetchers
    • Runs the main consumption loop until cancelled
    • Handles partition discovery and consumer coordination
  • cancel() - Stop the consumer gracefully

    • Sets running flag to false to exit consumption loop
    • Interrupts discovery thread if running
    • Called by Flink when job is cancelled
  • close() - Clean up resources (called after cancellation)

    • Closes fetchers and releases resources
    • Called by Flink runtime during shutdown

Usage Note: These lifecycle methods are managed by the Flink runtime and should not be called directly by user code.
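
In practice the consumer is simply registered as a source and the runtime drives this lifecycle. A minimal job sketch, reusing the consumer constructed earlier:

// The runtime calls open()/run()/cancel()/close() itself once the
// consumer is registered as a source; user code only wires the job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(consumer);
stream.print();
env.execute("kafka-consumer-job");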

Abstract Methods

Concrete implementations must implement these version-specific methods:

protected abstract AbstractFetcher<T, ?> createFetcher(
    SourceContext<T> sourceContext,
    Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
    SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
    SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
    StreamingRuntimeContext runtimeContext,
    OffsetCommitMode offsetCommitMode,
    MetricGroup consumerMetricGroup,
    boolean useMetrics
) throws Exception;

protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(
    KafkaTopicsDescriptor topicsDescriptor,
    int indexOfThisSubtask,
    int numParallelSubtasks
);

protected abstract boolean getIsAutoCommitEnabled();
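
For orientation, a skeletal sketch of what a hypothetical version-specific subclass might look like; MyVersionFetcher and MyVersionPartitionDiscoverer are illustrative placeholders, not real classes:

// Skeletal sketch of a hypothetical version-specific consumer.
public class MyVersionKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {

    public MyVersionKafkaConsumer(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
        super(topics, null, deserializer, PARTITION_DISCOVERY_DISABLED, true);
    }

    @Override
    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        // Wire up a fetcher that talks to the target Kafka client version.
        return new MyVersionFetcher<>(sourceContext, assignedPartitionsWithInitialOffsets,
                watermarksPeriodic, watermarksPunctuated, runtimeContext,
                offsetCommitMode, consumerMetricGroup, useMetrics);
    }

    @Override
    protected AbstractPartitionDiscoverer createPartitionDiscoverer(
            KafkaTopicsDescriptor topicsDescriptor,
            int indexOfThisSubtask,
            int numParallelSubtasks) {
        // Discover partitions using the version-specific client.
        return new MyVersionPartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks);
    }

    @Override
    protected boolean getIsAutoCommitEnabled() {
        // Report whether the underlying client's enable.auto.commit is on.
        return false;
    }
}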

Configuration Constants

public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";

  • MAX_NUM_PENDING_CHECKPOINTS - Maximum number of pending checkpoints to track
  • PARTITION_DISCOVERY_DISABLED - Use this value to disable automatic partition discovery
  • KEY_DISABLE_METRICS - Configuration key for disabling Kafka metrics collection
  • KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS - Configuration key for partition discovery interval
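
For example, automatic partition discovery is enabled by setting the interval key in the consumer properties passed to the concrete consumer:

// Enable automatic partition discovery every 60 seconds; the constant
// resolves to "flink.partition-discovery.interval-millis".
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "60000");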