# Streaming Consumer

The `FlinkKafkaConsumer011` provides robust Kafka topic consumption with exactly-once processing guarantees, flexible startup modes, and seamless integration with Flink's checkpointing mechanism.

## Capabilities

### FlinkKafkaConsumer011 Class

Main consumer class for reading from Kafka 0.11.x topics with comprehensive configuration options.

```java { .api }
/**
 * Kafka consumer for Kafka 0.11.x supporting exactly-once semantics and checkpointing.
 * Extends FlinkKafkaConsumer010 with additional 0.11.x-specific features.
 */
@PublicEvolving
class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {

    // Single-topic constructors
    FlinkKafkaConsumer011(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    FlinkKafkaConsumer011(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);

    // Multiple-topic constructors
    FlinkKafkaConsumer011(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    FlinkKafkaConsumer011(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);

    // Pattern-based topic subscription (for dynamic topic discovery)
    @PublicEvolving
    FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
    @PublicEvolving
    FlinkKafkaConsumer011(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);
}
```

**Usage Examples:**

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

// Single topic consumption
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    "user-events",
    new SimpleStringSchema(),
    props
);

// Multiple topics consumption
List<String> topics = Arrays.asList("orders", "payments", "shipments");
FlinkKafkaConsumer011<String> multiTopicConsumer = new FlinkKafkaConsumer011<>(
    topics,
    new SimpleStringSchema(),
    props
);

// Pattern-based subscription for dynamic topic discovery; note that new topics
// matching the pattern are only picked up while partition discovery is enabled
Pattern topicPattern = Pattern.compile("events-.*");
FlinkKafkaConsumer011<String> patternConsumer = new FlinkKafkaConsumer011<>(
    topicPattern,
    new SimpleStringSchema(),
    props
);
```

### Startup Mode Configuration

Control how the consumer starts reading from Kafka topics.

```java { .api }
// Configure startup behavior (inherited from FlinkKafkaConsumerBase)
FlinkKafkaConsumer011<T> setStartFromEarliest();
FlinkKafkaConsumer011<T> setStartFromLatest();
FlinkKafkaConsumer011<T> setStartFromGroupOffsets();
FlinkKafkaConsumer011<T> setStartFromTimestamp(long startupOffsetsTimestamp);
FlinkKafkaConsumer011<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);
```

**Usage Examples:**

```java
// Start from the earliest available messages
consumer.setStartFromEarliest();

// Start from the latest messages (skip existing messages)
consumer.setStartFromLatest();

// Start from committed group offsets (default behavior)
consumer.setStartFromGroupOffsets();

// Start from a specific timestamp
long timestamp = System.currentTimeMillis() - (24 * 60 * 60 * 1000); // 24 hours ago
consumer.setStartFromTimestamp(timestamp);

// Start from specific partition offsets
Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("my-topic", 0), 1000L);
offsets.put(new KafkaTopicPartition("my-topic", 1), 2000L);
consumer.setStartFromSpecificOffsets(offsets);
```

### Consumer Configuration

Essential configuration options for optimal consumer behavior.

```java { .api }
// Key configuration constants (inherited from FlinkKafkaConsumerBase)
static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
```

**Usage Examples:**

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");

// Kafka consumer configuration
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("enable.auto.commit", "false"); // offsets are managed by Flink
props.setProperty("max.poll.records", "500");

// Flink-specific configuration, passed through the same Properties object.
// Setting a partition-discovery interval enables dynamic partition discovery.
props.setProperty("flink.disable-metrics", "false");
props.setProperty("flink.partition-discovery.interval-millis", "30000"); // check every 30 seconds

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);
```

### Advanced Consumer Features

Additional configuration methods for specialized use cases.

```java { .api }
// Commit mode configuration (inherited from FlinkKafkaConsumerBase)
FlinkKafkaConsumer011<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);

// Timestamp and watermark assignment for event-time processing
FlinkKafkaConsumer011<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy);
```

**Usage Examples:**

```java
// Control offset committing behavior
consumer.setCommitOffsetsOnCheckpoints(true); // commit offsets back to Kafka on each checkpoint

// Configure watermark generation for event-time processing
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> extractTimestamp(event));

consumer.assignTimestampsAndWatermarks(watermarkStrategy);

// Integration with a Flink DataStream
DataStream<String> kafkaStream = env
    .addSource(consumer)
    .name("Kafka Source");
```

## Error Handling

The consumer integrates with the Flink Kafka exception hierarchy for comprehensive error management.

```java { .api }
// The consumer may throw FlinkKafka011Exception for configuration or runtime errors.
// Exception handling is typically done at the Flink job level through restart strategies.
```

**Configuration for resilience:**

```java
// Configure the consumer for resilient operation
props.setProperty("session.timeout.ms", "30000");
props.setProperty("heartbeat.interval.ms", "10000");
props.setProperty("max.poll.interval.ms", "300000");
props.setProperty("connections.max.idle.ms", "540000");
props.setProperty("request.timeout.ms", "60000");
```
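Because consumer exceptions surface at the job level, these timeouts are usually paired with a restart strategy on the execution environment. The following is a minimal configuration sketch using Flink's standard `RestartStrategies` API; the specific attempt count and delay are illustrative values, not recommendations from this connector's documentation.

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Restart the job on failure so transient Kafka errors (e.g. a broker restart)
// do not terminate it permanently: up to 3 attempts, 10 seconds apart.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,                             // number of restart attempts
    Time.of(10, TimeUnit.SECONDS)  // delay between attempts
));
```

On restart, the consumer resumes from the offsets stored in the last successful checkpoint, which is what preserves the exactly-once processing guarantee described above.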