
# Data Stream Consumer

The FlinkKafkaConsumer010 is a Flink streaming source for Apache Kafka 0.10.x topics. It supports single-topic, multi-topic, and regex-pattern subscriptions. It integrates with Flink's checkpointing to provide exactly-once guarantees for Flink state, and it offers optional rate limiting and dynamic partition discovery.

## Capabilities

### Single Topic Consumer

Creates a consumer for a single Kafka topic. Use a `DeserializationSchema` when only the record value matters. Use a `KafkaDeserializationSchema` when the deserializer needs the full `ConsumerRecord` (key, value, and metadata).

```java { .api }
/**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
 * @param topic The name of the topic that should be consumed
 * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x with key-value deserialization
 * @param topic The name of the topic that should be consumed
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);
```

**Usage Examples:**

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Simple string deserialization (value only)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "events-topic",
    new SimpleStringSchema(),
    props
);

// Custom deserialization with access to the full ConsumerRecord (key, value, metadata).
// MyEvent and MyEvent.fromJson(...) stand for an application-defined type.
FlinkKafkaConsumer010<MyEvent> eventConsumer = new FlinkKafkaConsumer010<>(
    "events-topic",
    new KafkaDeserializationSchema<MyEvent>() {
        @Override
        public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            // record.key() is also available here if the key is needed
            return MyEvent.fromJson(new String(record.value(), StandardCharsets.UTF_8));
        }

        @Override
        public boolean isEndOfStream(MyEvent nextElement) {
            return false; // unbounded stream: never signal end
        }

        @Override
        public TypeInformation<MyEvent> getProducedType() {
            return TypeInformation.of(MyEvent.class);
        }
    },
    props
);
```

### Multiple Topics Consumer

Creates a consumer for multiple Kafka topics specified as a list.

```java { .api }
/**
 * Creates a new Kafka streaming source consumer for multiple topics
 * @param topics The Kafka topics to read from
 * @param deserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties that are used to configure both the fetcher and the offset handler
 */
public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer for multiple topics with key-value deserialization
 * @param topics The Kafka topics to read from
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties that are used to configure both the fetcher and the offset handler
 */
public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);
```

**Usage Examples:**

```java
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

import java.util.Arrays;
import java.util.List;

// Multiple topics with simple deserialization
List<String> topics = Arrays.asList("topic1", "topic2", "topic3");
FlinkKafkaConsumer010<String> multiTopicConsumer = new FlinkKafkaConsumer010<>(
    topics,
    new SimpleStringSchema(),
    props
);

// Multiple topics with JSON deserialization of key and value.
// Each element is an ObjectNode with "key" and "value" fields;
// passing true instead of false also adds a "metadata" field (offset, topic, partition).
FlinkKafkaConsumer010<ObjectNode> jsonConsumer = new FlinkKafkaConsumer010<>(
    topics,
    new JSONKeyValueDeserializationSchema(false),
    props
);
```

### Pattern-Based Topic Subscription

Creates a consumer that subscribes to all topics whose names match a regular expression. Matching topics are resolved when the job starts. Topics created later are picked up only if dynamic partition discovery is enabled (see Dynamic Partition Discovery below).

```java { .api }
/**
 * Creates a new Kafka streaming source consumer using pattern-based topic subscription
 * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to
 * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer using pattern-based topic subscription with key-value deserialization
 * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);
```

**Usage Examples:**

```java
import java.util.regex.Pattern;

// Subscribe to all topics starting with "logs-"
Pattern logTopicsPattern = Pattern.compile("logs-.*");
FlinkKafkaConsumer010<String> patternConsumer = new FlinkKafkaConsumer010<>(
    logTopicsPattern,
    new SimpleStringSchema(),
    props
);

// Subscribe to topics matching an environment-specific pattern
Pattern envPattern = Pattern.compile("(prod|staging)-events-.*");
FlinkKafkaConsumer010<String> envConsumer = new FlinkKafkaConsumer010<>(
    envPattern,
    new SimpleStringSchema(),
    props
);
```
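
It helps to check exactly which names the two patterns above accept. The plain-Java sketch below has no Flink dependency, and `TopicPatternCheck` and `matchingTopics` are hypothetical names. It filters topic names with `Matcher.matches()`, so the whole name must match. We assume this full-match behavior mirrors how the connector evaluates subscription patterns, which is why `app-logs-archive` is not selected by `logs-.*`.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternCheck {
    // Keeps only topics whose entire name matches the pattern
    // (full-match semantics, assumed to mirror the connector).
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "logs-app", "logs-db", "app-logs-archive",
                "prod-events-clicks", "staging-events-views", "dev-events-clicks");

        System.out.println(matchingTopics(Pattern.compile("logs-.*"), topics));
        // [logs-app, logs-db]
        System.out.println(matchingTopics(Pattern.compile("(prod|staging)-events-.*"), topics));
        // [prod-events-clicks, staging-events-views]
    }
}
```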

### Rate Limiting

Configure rate limiting to throttle the number of bytes read from Kafka per second.

```java { .api }
/**
 * Set rate limiter for throttling bytes read from Kafka
 * @param kafkaRateLimiter Rate limiter implementation to control consumption rate
 */
public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);

/**
 * Get the configured rate limiter
 * @return The currently configured rate limiter, or null if none is set
 */
public FlinkConnectorRateLimiter getRateLimiter();
```

**Usage Examples:**

```java
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;

// Create consumer
FlinkConnectorRateLimiter rateLimiter;
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "high-volume-topic",
    new SimpleStringSchema(),
    props
);

// Limit the whole consumer to 1 MB per second. GuavaFlinkConnectorRateLimiter has a
// no-argument constructor; the rate in bytes per second is set with setRate and is
// split across the consumer's parallel subtasks when the limiter is opened.
rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1024 * 1024); // 1 MB/s
consumer.setRateLimiter(rateLimiter);

// Check current rate limiter
FlinkConnectorRateLimiter currentLimiter = consumer.getRateLimiter();
```
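
To size the limit, it helps to translate the consumer-wide rate into what each parallel subtask may read. The plain-Java sketch below assumes that `GuavaFlinkConnectorRateLimiter` divides the configured rate evenly by the source parallelism. `RateBudget` and its methods are hypothetical helpers for illustration only.

```java
public class RateBudget {
    // Per-subtask budget, assuming the global rate is divided evenly by parallelism
    // (our reading of GuavaFlinkConnectorRateLimiter; long integer division).
    static long perSubtaskBytesPerSecond(long globalBytesPerSecond, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return globalBytesPerSecond / parallelism;
    }

    // Minimum time in seconds for one subtask to read `bytes` at its budget.
    static double minSecondsToRead(long bytes, long perSubtaskBytesPerSecond) {
        return (double) bytes / perSubtaskBytesPerSecond;
    }

    public static void main(String[] args) {
        long global = 1024L * 1024L; // 1 MB/s for the whole consumer
        long local = perSubtaskBytesPerSecond(global, 4);
        System.out.println(local); // 262144
        System.out.println(minSecondsToRead(10L * 1024 * 1024, local)); // 40.0
    }
}
```

With a parallelism of 4, each subtask gets a quarter of the 1 MB/s budget, so reading 10 MB from a single partition takes at least 40 seconds.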

## Configuration Properties

### Consumer-Specific Properties

```java { .api }
// Configuration key for polling timeout
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";

// Default polling timeout in milliseconds
public static final long DEFAULT_POLL_TIMEOUT = 100L;
```

### Common Kafka Consumer Properties

Required properties:

- **bootstrap.servers**: Comma-separated list of Kafka broker addresses
- **group.id**: Consumer group identifier, used when committing offsets to Kafka and when starting from the group's committed offsets

Optional properties:

- **flink.poll-timeout**: Time in milliseconds spent waiting in `poll` if no data is available (default: 100)
- **auto.offset.reset**: Where to start when the group has no committed offset ("earliest", "latest", "none")
- **enable.auto.commit**: Whether the Kafka client commits offsets periodically. When Flink checkpointing is enabled, this setting is ignored; Flink's exactly-once guarantees never depend on it
- **max.poll.records**: Maximum number of records returned in a single poll
- **fetch.min.bytes**: Minimum amount of data the server should return for a fetch request
- **fetch.max.wait.ms**: Maximum time the server will block before answering a fetch request when `fetch.min.bytes` is not yet satisfied
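
A minimal sketch of assembling these properties and resolving `flink.poll-timeout` with its documented default of 100 ms. `ConsumerProps` and its helper methods are hypothetical and only mirror the constants shown above. The connector does its own parsing internally, so treat this as an illustration of the fallback behavior.

```java
import java.util.List;
import java.util.Properties;

public class ConsumerProps {
    // Mirrors the documented constants; this class itself is hypothetical.
    static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
    static final long DEFAULT_POLL_TIMEOUT = 100L;

    static Properties baseProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        return props;
    }

    // Fails fast if any required key is missing.
    static void requireKeys(Properties props, List<String> keys) {
        for (String key : keys) {
            if (props.getProperty(key) == null) {
                throw new IllegalArgumentException("Missing required property: " + key);
            }
        }
    }

    // Reads the poll timeout, falling back to the default when it is not set.
    static long pollTimeout(Properties props) {
        String value = props.getProperty(KEY_POLL_TIMEOUT);
        return value == null ? DEFAULT_POLL_TIMEOUT : Long.parseLong(value.trim());
    }

    public static void main(String[] args) {
        Properties props = baseProperties("localhost:9092", "my-consumer-group");
        requireKeys(props, List.of("bootstrap.servers", "group.id"));
        System.out.println(pollTimeout(props)); // 100
        props.setProperty(KEY_POLL_TIMEOUT, "250");
        System.out.println(pollTimeout(props)); // 250
    }
}
```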

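### Dynamic Partition Discovery

Partition discovery is disabled by default. Enable it by setting a discovery interval in milliseconds. The consumer then periodically checks for new partitions of the subscribed topics and, for a `Pattern` subscription, for newly created topics that match the pattern. Partitions discovered after the job has started are consumed from the earliest available offset.

```java
// Enable partition discovery every 30 seconds
props.setProperty("flink.partition-discovery.interval-millis", "30000");
```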
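
Conceptually, each discovery round compares the partitions returned by a fresh metadata fetch against those already known, and starts reading only the new ones. The plain-Java sketch below shows that set difference, plus one rule for mapping a partition to a parallel subtask. The rule (a topic-dependent start index, then round-robin by partition number) is our assumption about what the connector's internal assigner does. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class PartitionDiscoverySketch {
    // Partitions are identified as "topic:partition" strings for simplicity.
    static String id(String topic, int partition) {
        return topic + ":" + partition;
    }

    // Returns partitions present in the latest fetch but not seen before, and
    // records them as known so later rounds do not report them again.
    static List<String> discoverNew(Set<String> known, Set<String> fetched) {
        List<String> fresh = new ArrayList<>();
        for (String p : new TreeSet<>(fetched)) { // sorted for deterministic output
            if (known.add(p)) {
                fresh.add(p);
            }
        }
        return fresh;
    }

    // Assumed assignment rule: topic-dependent start index, then round-robin.
    static int assignSubtask(String topic, int partition, int numSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numSubtasks;
        return (startIndex + partition) % numSubtasks;
    }

    public static void main(String[] args) {
        Set<String> known = new TreeSet<>(Set.of(id("events", 0), id("events", 1)));
        // A later discovery round sees a third partition.
        Set<String> fetched = Set.of(id("events", 0), id("events", 1), id("events", 2));
        List<String> fresh = discoverNew(known, fetched);
        System.out.println(fresh); // [events:2]
        for (String p : fresh) {
            int partition = Integer.parseInt(p.substring(p.lastIndexOf(':') + 1));
            System.out.println(p + " -> subtask " + assignSubtask("events", partition, 4));
        }
    }
}
```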

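## Exactly-Once Processing

The FlinkKafkaConsumer010 integrates with Flink's checkpointing mechanism to provide exactly-once guarantees for Flink state:

1. **Checkpointing**: The current offset of every assigned partition is stored in Flink's checkpoints.
2. **Recovery**: On restart, the consumer resumes from the offsets in the last successful checkpoint, so records read after that checkpoint are replayed.
3. **Commit Strategy**: When a checkpoint completes, the consumer can also commit the checkpointed offsets back to Kafka (enabled by default, controlled with `setCommitOffsetsOnCheckpoints`). These committed offsets serve only for monitoring; recovery never uses them.

End-to-end exactly-once results additionally require a sink that takes part in checkpointing, such as a transactional or idempotent sink.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Not required: with checkpointing enabled, Flink overrides this to false
props.setProperty("enable.auto.commit", "false");
// Start position when the group has no committed offset (fresh start only, not recovery)
props.setProperty("auto.offset.reset", "earliest");

// In the streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds

// Commit checkpointed offsets back to Kafka for monitoring (the default)
consumer.setCommitOffsetsOnCheckpoints(true);
env.addSource(consumer);
```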
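
The checkpoint and recovery steps above can be modeled in a few lines of plain Java. In the hypothetical `OffsetCheckpointSketch`, a checkpoint copies the per-partition offsets, and a restore after failure discards any progress made since that copy, so those records are read again. This simulates the semantics only; it is not the connector's implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCheckpointSketch {
    // Next offset to read per partition (starting from 0).
    private final Map<Integer, Long> currentOffsets = new HashMap<>();
    // Offsets captured by the last completed checkpoint.
    private Map<Integer, Long> checkpointedOffsets = new HashMap<>();

    void read(int partition, int records) {
        currentOffsets.merge(partition, (long) records, Long::sum);
    }

    void checkpoint() {
        checkpointedOffsets = new HashMap<>(currentOffsets);
    }

    // After a failure, progress since the last checkpoint is dropped
    // and reading resumes from the checkpointed offsets.
    void restore() {
        currentOffsets.clear();
        currentOffsets.putAll(checkpointedOffsets);
    }

    long offset(int partition) {
        return currentOffsets.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        OffsetCheckpointSketch source = new OffsetCheckpointSketch();
        source.read(0, 100);
        source.read(1, 40);
        source.checkpoint();   // checkpoint holds {0=100, 1=40}
        source.read(0, 25);    // progress after the checkpoint...
        source.restore();      // ...is lost on failure and replayed
        System.out.println(source.offset(0) + " " + source.offset(1)); // 100 40
    }
}
```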

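## Error Handling

The consumer handles the main error scenarios as follows:

- **Broker failures**: Transient connection problems are retried by the Kafka client. Errors that reach the consumer fail the job, which then restarts from the last completed checkpoint according to the configured restart strategy.
- **Deserialization errors**: Throwing an exception from `deserialize(...)` fails the job and triggers a restart; returning `null` makes the consumer silently skip the corrupted record.
- **Partition changes**: New partitions are picked up automatically when dynamic partition discovery is enabled.
- **Consumer group rebalancing**: Does not apply. Flink assigns partitions to its parallel subtasks itself rather than through Kafka's group coordination, so group membership changes do not trigger rebalances.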
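
The `null`-means-skip contract for deserialization errors can be illustrated without Flink. In this sketch, a lenient decoder returns `null` for input it cannot parse, and the consuming loop drops those results instead of emitting them. `SkipCorruptRecords` and its methods are hypothetical. In a real job, your schema's `deserialize` method plays the decoder's role and the consumer itself does the dropping.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SkipCorruptRecords {
    // Lenient decoder: returns null instead of throwing for unparseable input.
    static Integer deserialize(byte[] bytes) {
        try {
            return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null; // corrupted record: signal "skip"
        }
    }

    // Mimics the consumer loop, which drops null results instead of emitting them.
    static List<Integer> consume(List<byte[]> rawRecords) {
        List<Integer> emitted = new ArrayList<>();
        for (byte[] raw : rawRecords) {
            Integer value = deserialize(raw);
            if (value != null) {
                emitted.add(value);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<byte[]> raw = List.of(
                "1".getBytes(StandardCharsets.UTF_8),
                "not-a-number".getBytes(StandardCharsets.UTF_8),
                "3".getBytes(StandardCharsets.UTF_8));
        System.out.println(consume(raw)); // [1, 3]
    }
}
```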

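## Watermarks and Event Time

The consumer supports event-time processing. With Kafka 0.10.x, each record's Kafka timestamp can be attached to the emitted element. Assigning a `WatermarkStrategy` directly on the consumer makes watermarks be generated per Kafka partition inside the source; the per-partition watermarks are then merged. The `WatermarkStrategy` overload shown here requires Flink 1.11 or later.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import java.time.Duration;

// Constructor arguments as in the examples above; MyEvent is an application-defined type
FlinkKafkaConsumer010<MyEvent> consumer = new FlinkKafkaConsumer010<>(...);

// Assign timestamps and watermarks, tolerating up to 10 seconds of out-of-orderness.
// The lambda's second argument is the record's current (Kafka) timestamp;
// here it is replaced by the event's own time field.
consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, timestamp) -> event.getEventTime())
);
```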
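
To see how per-partition watermarks combine, the plain-Java sketch below tracks the highest timestamp per partition and derives a bounded-out-of-orderness watermark for each one. That watermark is the maximum timestamp minus the bound minus 1 ms, which we assume matches Flink's built-in generator. The sketch then takes the minimum across partitions. `PerPartitionWatermarks` is a hypothetical class, not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;

public class PerPartitionWatermarks {
    private final long maxOutOfOrdernessMillis;
    // Highest event timestamp seen so far in each partition.
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();

    PerPartitionWatermarks(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    void onEvent(int partition, long eventTimeMillis) {
        maxTimestampPerPartition.merge(partition, eventTimeMillis, Long::max);
    }

    // Watermark for one known partition: max timestamp minus the bound minus 1 ms.
    long partitionWatermark(int partition) {
        return maxTimestampPerPartition.get(partition) - maxOutOfOrdernessMillis - 1;
    }

    // Source watermark = minimum over partitions, so one slow partition
    // holds back event time. With no events yet, no progress is made.
    long combinedWatermark() {
        if (maxTimestampPerPartition.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (int partition : maxTimestampPerPartition.keySet()) {
            min = Math.min(min, partitionWatermark(partition));
        }
        return min;
    }

    public static void main(String[] args) {
        PerPartitionWatermarks wm = new PerPartitionWatermarks(10_000);
        wm.onEvent(0, 20_000);
        wm.onEvent(0, 18_000); // out of order but within the bound; max stays 20000
        wm.onEvent(1, 15_000);
        System.out.println(wm.combinedWatermark()); // 4999
    }
}
```

Partition 0 alone would allow a watermark of 9999, but partition 1 lags behind, so the combined watermark is 4999.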