or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · kafka-consumer.md · kafka-producer.md · offset-management.md · table-api.md

docs/kafka-consumer.md

0

# Kafka Consumer

1

2

The Flink Kafka consumer provides reliable message consumption from Kafka 0.8.x topics with exactly-once processing guarantees through Flink's checkpointing mechanism.

3

4

## Capabilities

5

6

### FlinkKafkaConsumer08

7

8

The primary consumer class for Kafka 0.8.x integration with comprehensive configuration options.

9

10

```java { .api }

11

/**

12

* Kafka consumer for Apache Kafka 0.8.x with exactly-once processing guarantees

13

*/

14

public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {

15

16

/** Configuration key for partition retrieval retries */

17

public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";

18

19

/** Default number of partition retrieval retries */

20

public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;

21

22

/**

23

* Creates consumer for single topic with value-only deserialization

24

* @param topic Kafka topic name

25

* @param valueDeserializer Deserialization schema for message values

26

* @param props Kafka consumer properties

27

*/

28

public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props);

29

30

/**

31

* Creates consumer for single topic with key-value deserialization

32

* @param topic Kafka topic name

33

* @param deserializer Keyed deserialization schema for both keys and values

34

* @param props Kafka consumer properties

35

*/

36

public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props);

37

38

/**

39

* Creates consumer for multiple topics with value-only deserialization

40

* @param topics List of Kafka topic names

41

* @param deserializer Deserialization schema for message values

42

* @param props Kafka consumer properties

43

*/

44

public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props);

45

46

/**

47

* Creates consumer for multiple topics with key-value deserialization

48

* @param topics List of Kafka topic names

49

* @param deserializer Keyed deserialization schema for both keys and values

50

* @param props Kafka consumer properties

51

*/

52

public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props);

53

54

/**

55

* Gets partition information for specified topics

56

* @param topics List of topic names to analyze

57

* @param properties Kafka connection properties

58

* @return List of partition leaders for the topics

59

*/

60

public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties);

61

}

62

```

63

64

**Usage Examples:**

65

66

```java

67

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

68

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

69

import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

70

import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

71

72

import java.util.Properties;

73

import java.util.Arrays;

74

75

// Basic single topic consumer

76

Properties props = new Properties();

77

props.setProperty("bootstrap.servers", "localhost:9092");

78

props.setProperty("zookeeper.connect", "localhost:2181");

79

props.setProperty("group.id", "my-consumer-group");

80

81

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(

82

"my-topic",

83

new SimpleStringSchema(),

84

props

85

);

86

87

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

88

DataStream<String> stream = env.addSource(consumer);

89

90

// Multi-topic consumer with JSON deserialization

91

FlinkKafkaConsumer08<ObjectNode> jsonConsumer = new FlinkKafkaConsumer08<>(

92

Arrays.asList("topic1", "topic2", "topic3"),

93

new JSONKeyValueDeserializationSchema(false), // false = do NOT include metadata (true embeds topic/partition/offset)

94

props

95

);

96

97

DataStream<ObjectNode> jsonStream = env.addSource(jsonConsumer);

98

99

// Consumer with custom offset reset

100

Properties customProps = new Properties();

101

customProps.setProperty("bootstrap.servers", "localhost:9092");

102

customProps.setProperty("zookeeper.connect", "localhost:2181");

103

customProps.setProperty("group.id", "custom-group");

104

customProps.setProperty("auto.offset.reset", "smallest"); // Kafka 0.8.x uses "smallest"/"largest" (not "earliest"/"latest")

105

106

FlinkKafkaConsumer08<String> earliestConsumer = new FlinkKafkaConsumer08<>(

107

"events-topic",

108

new SimpleStringSchema(),

109

customProps

110

);

111

```

112

113

### FlinkKafkaConsumer081 (Deprecated)

114

115

Deprecated alias that redirects to FlinkKafkaConsumer08.

116

117

```java { .api }

118

/**

119

* @deprecated Use FlinkKafkaConsumer08 instead

120

*/

121

@Deprecated

122

public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T> {

123

/**

124

* @deprecated Use FlinkKafkaConsumer08 constructor instead

125

*/

126

@Deprecated

127

public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props);

128

}

129

```

130

131

### FlinkKafkaConsumer082 (Deprecated)

132

133

Deprecated alias that redirects to FlinkKafkaConsumer08.

134

135

```java { .api }

136

/**

137

* @deprecated Use FlinkKafkaConsumer08 instead

138

*/

139

@Deprecated

140

public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T> {

141

/**

142

* @deprecated Use FlinkKafkaConsumer08 constructor instead

143

*/

144

@Deprecated

145

public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props);

146

}

147

```

148

149

## Configuration

150

151

### Required Properties

152

153

- **bootstrap.servers**: Comma-separated list of Kafka broker addresses

154

- **zookeeper.connect**: ZooKeeper connection string for offset management

155

- **group.id**: Consumer group identifier for offset coordination

156

157

### Optional Properties

158

159

- **auto.offset.reset**: Strategy when no initial offset exists (`smallest`, `largest` for Kafka 0.8.x; the `earliest`/`latest` values apply to Kafka 0.9+)

160

- **enable.auto.commit**: Whether to automatically commit offsets (should be false for exactly-once)

161

- **session.timeout.ms**: Session timeout for consumer group coordination

162

- **heartbeat.interval.ms**: Heartbeat interval for group membership

163

- **max.poll.records**: Maximum records returned in single poll

164

- **flink.get-partitions.retry**: Number of retries for partition discovery (default: 3)

165

166

## Checkpointing and Fault Tolerance

167

168

The consumer integrates with Flink's checkpointing mechanism:

169

170

```java

171

// Enable checkpointing for exactly-once guarantees

172

env.enableCheckpointing(5000); // checkpoint every 5 seconds

173

174

// Consumer automatically participates in checkpointing

175

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(

176

"my-topic",

177

new SimpleStringSchema(),

178

props

179

);

180

181

// Starting position can be configured

182

consumer.setStartFromEarliest(); // start from earliest available

183

consumer.setStartFromLatest(); // start from latest

184

consumer.setStartFromGroupOffsets(); // start from committed group offsets (default)

185

consumer.setStartFromTimestamp(timestamp); // start from a specific timestamp — NOTE: requires Kafka 0.10+ broker support; not available on the 0.8 consumer

186

```

187

188

## Error Handling

189

190

Common exceptions and handling strategies:

191

192

- **IllegalArgumentException**: Invalid topic names or properties

193

- **RuntimeException**: Kafka connection or ZooKeeper issues

194

- **SerializationException**: Deserialization failures

195

196

```java

197

// Proper error handling setup

198

try {

199

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(

200

"my-topic",

201

new SimpleStringSchema(),

202

props

203

);

204

205

env.addSource(consumer);

206

env.execute("Kafka Consumer Job");

207

} catch (Exception e) {

208

// Handle consumer setup or execution errors

209

logger.error("Kafka consumer failed", e);

210

}

211

```