# Consumer Base Classes

Abstract base implementations for Kafka consumers that provide common functionality across all Kafka versions. These classes handle offset management, checkpointing, watermark assignment, and state management while delegating version-specific operations to concrete implementations.

## Capabilities

### FlinkKafkaConsumerBase

The core abstract base class that all Flink Kafka consumers extend. Provides comprehensive functionality for consuming from Kafka topics with exactly-once processing guarantees.

```java { .api }
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
        implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {

    public FlinkKafkaConsumerBase(
            List<String> topics,
            Pattern topicPattern,
            KeyedDeserializationSchema<T> deserializer,
            long discoveryIntervalMillis,
            boolean useMetrics
    );
}
```

**Parameters:**

- `topics` - List of Kafka topics to consume from (null if using pattern)
- `topicPattern` - Regex pattern for topic subscription (null if using explicit topics)
- `deserializer` - Schema for deserializing Kafka messages
- `discoveryIntervalMillis` - Interval for partition discovery (use `PARTITION_DISCOVERY_DISABLED` to disable)
- `useMetrics` - Whether to expose Kafka consumer metrics

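**Usage Example (illustrative):**

A minimal construction sketch, assuming a concrete subclass such as `FlinkKafkaConsumer` (the exact class name varies by connector version; older connectors use version-suffixed names like `FlinkKafkaConsumer010`):

```java
// Standard Kafka client properties (values here are placeholders)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

// FlinkKafkaConsumer is one concrete subclass of FlinkKafkaConsumerBase
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(consumer);
```
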
### Startup Mode Configuration

Configure where the consumer starts reading from Kafka topics.

```java { .api }
public FlinkKafkaConsumerBase<T> setStartFromEarliest();
public FlinkKafkaConsumerBase<T> setStartFromLatest();
public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets();
public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);
```

**Usage Examples:**

```java
// Start from earliest available messages
consumer.setStartFromEarliest();

// Start from latest messages (skip existing)
consumer.setStartFromLatest();

// Start from committed group offsets (default)
consumer.setStartFromGroupOffsets();

// Start from specific offsets
Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
offsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
consumer.setStartFromSpecificOffsets(offsets);
```

### Offset Management

Control how and when offsets are committed back to Kafka.

```java { .api }
public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);
```

**Parameters:**

- `commitOnCheckpoints` - If true, offsets are committed only on successful checkpoints (recommended for exactly-once)

**Usage Example:**

```java
// Enable offset commits on checkpoints for exactly-once processing
consumer.setCommitOffsetsOnCheckpoints(true);
```

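Note that committing on checkpoints only takes effect when checkpointing is enabled on the execution environment; otherwise the consumer falls back to the Kafka client's auto-commit. A sketch of both configurations (interval values are arbitrary examples):

```java
// With checkpointing enabled, offsets are committed when checkpoints complete
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60 seconds
consumer.setCommitOffsetsOnCheckpoints(true);

// Without checkpointing, commits fall back to the Kafka client's auto-commit,
// configured through standard consumer properties
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "5000");
```
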

### Watermark Assignment

Assign timestamps and watermarks for event-time processing.

```java { .api }
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);
```

**Parameters:**

- `assigner` - Watermark assigner for extracting timestamps and generating watermarks

**Usage Examples:**

```java
// Periodic watermarks with bounded out-of-orderness
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getTimestamp();
        }
    }
);

// Punctuated watermarks based on special marker records
consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
    @Override
    public long extractTimestamp(MyEvent element, long recordTimestamp) {
        return element.getTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.isWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
});
```

### Type Information

Provide type information for the deserialized elements.

```java { .api }
public TypeInformation<T> getProducedType();
```

**Returns:** Type information for elements produced by this consumer

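The consumer derives this type information from its deserialization schema. A sketch of a custom schema that supplies it (`MyEvent` and `MyEvent.parse` are hypothetical placeholders):

```java
// Hypothetical event schema; the consumer's getProducedType() forwards
// the value returned by the schema's getProducedType()
public class MyEventSchema implements DeserializationSchema<MyEvent> {

    @Override
    public MyEvent deserialize(byte[] message) throws IOException {
        return MyEvent.parse(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false; // unbounded Kafka stream
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}
```
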

### State Management

Handle checkpointing and state recovery (implemented by the framework).

```java { .api }
public void initializeState(FunctionInitializationContext context) throws Exception;
public void snapshotState(FunctionSnapshotContext context) throws Exception;
public void notifyCheckpointComplete(long checkpointId) throws Exception;
```

These methods are called by the Flink runtime for checkpoint coordination and should not be called directly by user code.

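`snapshotState()` and `notifyCheckpointComplete()` only fire when checkpointing is enabled on the job; a minimal sketch (the interval is an arbitrary example):

```java
// Enable exactly-once checkpointing so snapshotState() and
// notifyCheckpointComplete() are invoked by the runtime
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);
```
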

### Lifecycle Methods

Source function lifecycle management (called by Flink runtime).

```java { .api }
public void open(Configuration configuration) throws Exception;
public void run(SourceContext<T> sourceContext) throws Exception;
public void cancel();
public void close() throws Exception;
```

**Methods:**

- `open()` - Initialize the consumer (called once per parallel instance)
  - Sets up offset commit mode, metrics, and other configuration
  - Called before `run()` method
- `run()` - Main execution method (runs in dedicated thread)
  - Discovers partitions and initializes fetchers
  - Runs the main consumption loop until cancelled
  - Handles partition discovery and consumer coordination
- `cancel()` - Stop the consumer gracefully
  - Sets running flag to false to exit consumption loop
  - Interrupts discovery thread if running
  - Called by Flink when job is cancelled
- `close()` - Clean up resources (called after cancellation)
  - Closes fetchers and releases resources
  - Called by Flink runtime during shutdown

**Usage Note:** These lifecycle methods are managed by the Flink runtime and should not be called directly by user code.

### Abstract Methods

Concrete implementations must implement these version-specific methods (a structural sketch of a subclass follows the code block below):

```java { .api }
protected abstract AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics
) throws Exception;

protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(
        KafkaTopicsDescriptor topicsDescriptor,
        int indexOfThisSubtask,
        int numParallelSubtasks
);

protected abstract boolean getIsAutoCommitEnabled();
```

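To illustrate the shape of a concrete implementation, here is a hedged structural sketch. `MyVersionKafkaConsumer`, `MyVersionFetcher`, and `MyVersionPartitionDiscoverer` are hypothetical stand-ins for a Kafka-version-specific consumer, fetcher, and discoverer; real connectors wire in their own equivalents:

```java
// Structural sketch only: MyVersionFetcher and MyVersionPartitionDiscoverer
// are hypothetical version-specific classes, not part of the real API
public class MyVersionKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {

    private final Properties kafkaProperties;

    public MyVersionKafkaConsumer(
            List<String> topics,
            KeyedDeserializationSchema<T> deserializer,
            Properties kafkaProperties) {
        super(topics, null, deserializer, PARTITION_DISCOVERY_DISABLED, true);
        this.kafkaProperties = kafkaProperties;
    }

    @Override
    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        // Wrap this Kafka version's client in a fetcher (hypothetical class)
        return new MyVersionFetcher<>(
                sourceContext, assignedPartitionsWithInitialOffsets,
                watermarksPeriodic, watermarksPunctuated,
                runtimeContext, offsetCommitMode, consumerMetricGroup,
                useMetrics, kafkaProperties);
    }

    @Override
    protected AbstractPartitionDiscoverer createPartitionDiscoverer(
            KafkaTopicsDescriptor topicsDescriptor,
            int indexOfThisSubtask,
            int numParallelSubtasks) {
        // Query this Kafka version's metadata API (hypothetical class)
        return new MyVersionPartitionDiscoverer(
                topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, kafkaProperties);
    }

    @Override
    protected boolean getIsAutoCommitEnabled() {
        // Reflect the underlying client's enable.auto.commit setting
        return Boolean.parseBoolean(
                kafkaProperties.getProperty("enable.auto.commit", "false"));
    }
}
```
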

## Configuration Constants

```java { .api }
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
```

- `MAX_NUM_PENDING_CHECKPOINTS` - Maximum number of pending checkpoints to track
- `PARTITION_DISCOVERY_DISABLED` - Use this value to disable automatic partition discovery
- `KEY_DISABLE_METRICS` - Configuration key for disabling Kafka metrics collection
- `KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS` - Configuration key for partition discovery interval
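
The two `KEY_*` constants are read from the consumer's `Properties`. A short sketch of how they are typically set (the 10-second interval is an arbitrary example value):

```java
// Enable partition discovery and disable per-partition Kafka metrics
// through the consumer properties
Properties props = new Properties();
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10000");
props.setProperty(FlinkKafkaConsumerBase.KEY_DISABLE_METRICS, "true");
```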