or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-consumption.md · data-production.md · index.md · table-api-integration.md

docs/data-consumption.md

0

# Data Consumption

1

2

Streaming data source functionality for consuming from Kafka 0.9.x topics with exactly-once processing guarantees, configurable deserialization, automatic offset management, and fault tolerance through Flink's checkpointing mechanism.

3

4

## Capabilities

5

6

### FlinkKafkaConsumer09 Class

7

8

Main Kafka consumer class for streaming data from Kafka 0.9.x topics into Flink data streams.

9

10

```java { .api }

11

/**

12

* Kafka consumer for streaming data from Apache Kafka 0.9.x topics.

13

* Supports exactly-once processing, fault tolerance, and parallel consumption.

14

*

15

* @param <T> The type of records consumed from Kafka

16

*/

17

@PublicEvolving

18

public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {

19

20

/**

21

* Creates a new Kafka consumer for a single topic with value-only deserialization.

22

*

23

* @param topic The name of the topic to consume from

24

* @param valueDeserializer The deserializer for converting byte messages to objects

25

* @param props Kafka consumer properties and configuration

26

*/

27

public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props);

28

29

/**

30

* Creates a new Kafka consumer for a single topic with key/value deserialization.

31

*

32

* @param topic The name of the topic to consume from

33

* @param deserializer The keyed deserializer for reading key/value pairs, offsets, and topic names

34

* @param props Kafka consumer properties and configuration

35

*/

36

public FlinkKafkaConsumer09(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);

37

38

/**

39

* Creates a new Kafka consumer for multiple topics with value-only deserialization.

40

*

41

* @param topics The list of topic names to consume from

42

* @param deserializer The deserializer for converting byte messages to objects

43

* @param props Kafka consumer properties and configuration

44

*/

45

public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props);

46

47

/**

48

* Creates a new Kafka consumer for multiple topics with key/value deserialization.

49

*

50

* @param topics The list of topic names to consume from

51

* @param deserializer The keyed deserializer for reading key/value pairs, offsets, and topic names

52

* @param props Kafka consumer properties and configuration

53

*/

54

public FlinkKafkaConsumer09(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);

55

56

/**

57

* Creates a new Kafka consumer for topics matching a pattern with value-only deserialization.

58

* Dynamic topic discovery is enabled if a partition discovery interval is configured.

59

*

60

* @param subscriptionPattern Regular expression pattern for topic names to subscribe to

61

* @param valueDeserializer The deserializer for converting byte messages to objects

62

* @param props Kafka consumer properties and configuration

63

*/

64

@PublicEvolving

65

public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);

66

67

/**

68

* Creates a new Kafka consumer for topics matching a pattern with key/value deserialization.

69

* Dynamic topic discovery is enabled if a partition discovery interval is configured.

70

*

71

* @param subscriptionPattern Regular expression pattern for topic names to subscribe to

72

* @param deserializer The keyed deserializer for reading key/value pairs, offsets, and topic names

73

* @param props Kafka consumer properties and configuration

74

*/

75

@PublicEvolving

76

public FlinkKafkaConsumer09(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);

77

78

/**

79

* Sets a rate limiter to throttle bytes read from Kafka.

80

*

81

* @param kafkaRateLimiter The rate limiter to control consumption rate

82

*/

83

public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);

84

85

/**

86

* Gets the currently configured rate limiter.

87

*

88

* @return The configured rate limiter, or null if none is set

89

*/

90

public FlinkConnectorRateLimiter getRateLimiter();

91

92

/**

93

* Sets a timestamp assigner and watermark generator that emits punctuated watermarks.

94

*

95

* @param assigner The punctuated watermark assigner

96

* @return This consumer instance for method chaining

97

*/

98

public FlinkKafkaConsumer09<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);

99

100

/**

101

* Sets a timestamp assigner and watermark generator that emits periodic watermarks.

102

*

103

* @param assigner The periodic watermark assigner

104

* @return This consumer instance for method chaining

105

*/

106

public FlinkKafkaConsumer09<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);

107

108

/**

109

* Configures whether to commit consumed offsets back to Kafka on checkpoints.

110

*

111

* @param commitOnCheckpoints True to enable offset commits on checkpoints

112

* @return This consumer instance for method chaining

113

*/

114

public FlinkKafkaConsumer09<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);

115

116

/**

117

* Configures the consumer to start reading from the earliest available offsets.

118

*

119

* @return This consumer instance for method chaining

120

*/

121

public FlinkKafkaConsumer09<T> setStartFromEarliest();

122

123

/**

124

* Configures the consumer to start reading from the latest available offsets.

125

*

126

* @return This consumer instance for method chaining

127

*/

128

public FlinkKafkaConsumer09<T> setStartFromLatest();

129

130

/**

131

* Configures the consumer to start reading from the consumer group's committed offsets.

132

*

133

* @return This consumer instance for method chaining

134

*/

135

public FlinkKafkaConsumer09<T> setStartFromGroupOffsets();

136

137

/**

138

* Configures the consumer to start reading from specific partition offsets.

139

*

140

* @param specificStartupOffsets Map of partitions to their starting offsets

141

* @return This consumer instance for method chaining

142

*/

143

public FlinkKafkaConsumer09<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);

144

145

/**

146

* Disables filtering of restored partitions with currently subscribed topics.

147

*

148

* @return This consumer instance for method chaining

149

*/

150

public FlinkKafkaConsumer09<T> disableFilterRestoredPartitionsWithSubscribedTopics();

151

}

152

```

153

154

### Configuration Constants

155

156

```java { .api }

157

/** Configuration key to change the polling timeout */

158

public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";

159

160

/** Default time in milliseconds spent waiting in poll if data is not available */

161

public static final long DEFAULT_POLL_TIMEOUT = 100L;

162

163

/** The maximum number of pending non-committed checkpoints to track */

164

public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

165

166

/** The default interval to execute partition discovery (disabled by default) */

167

public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;

168

169

/** Boolean configuration key to disable metrics tracking */

170

public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

171

172

/** Configuration key to define the consumer's partition discovery interval, in milliseconds */

173

public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";

174

```

175

176

## Usage Examples

177

178

### Basic Single Topic Consumption

179

180

```java

181

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

182

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

183

import org.apache.flink.api.common.serialization.SimpleStringSchema;

184

import java.util.Properties;

185

186

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

187

188

Properties properties = new Properties();

189

properties.setProperty("bootstrap.servers", "localhost:9092");

190

properties.setProperty("group.id", "my-consumer-group");

191

192

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(

193

"my-topic",

194

new SimpleStringSchema(),

195

properties

196

);

197

198

env.addSource(consumer)

199

.print();

200

201

env.execute("Basic Kafka Consumer");

202

```

203

204

### Multiple Topics Consumption

205

206

```java

207

import java.util.Arrays;

208

209

Properties properties = new Properties();

210

properties.setProperty("bootstrap.servers", "localhost:9092");

211

properties.setProperty("group.id", "multi-topic-group");

212

213

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(

214

Arrays.asList("topic-1", "topic-2", "topic-3"),

215

new SimpleStringSchema(),

216

properties

217

);

218

219

env.addSource(consumer)

220

.map(value -> "Processed: " + value)

221

.print();

222

```

223

224

### Pattern-Based Topic Subscription

225

226

```java

227

import java.util.regex.Pattern;

228

229

Properties properties = new Properties();

230

properties.setProperty("bootstrap.servers", "localhost:9092");

231

properties.setProperty("group.id", "pattern-group");

232

// Enable dynamic topic discovery

233

properties.setProperty("flink.partition-discovery.interval-millis", "30000");

234

235

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(

236

Pattern.compile("log-topic-.*"),

237

new SimpleStringSchema(),

238

properties

239

);

240

241

env.addSource(consumer)

242

.filter(value -> value.contains("ERROR"))

243

.print();

244

```

245

246

### Key/Value Deserialization

247

248

```java

249

import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

250

import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;

251

import org.apache.flink.api.common.typeinfo.TypeInformation;

252

import org.apache.kafka.clients.consumer.ConsumerRecord;

253

254

// Custom keyed deserializer for accessing keys, values, and metadata

255

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(

256

"keyed-topic",

257

new KafkaDeserializationSchema<String>() {

258

@Override

259

public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {

260

String key = record.key() != null ? new String(record.key()) : "null";

261

String value = record.value() != null ? new String(record.value()) : "null";

262

return String.format("Key: %s, Value: %s, Partition: %d, Offset: %d",

263

key, value, record.partition(), record.offset());

264

}

265

266

@Override

267

public boolean isEndOfStream(String nextElement) {

268

return false;

269

}

270

271

@Override

272

public TypeInformation<String> getProducedType() {

273

return TypeInformation.of(String.class);

274

}

275

},

276

properties

277

);

278

```

279

280

### Rate Limited Consumption

281

282

```java

283

import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;

284

import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;

285

286

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(

287

"high-volume-topic",

288

new SimpleStringSchema(),

289

properties

290

);

291

292

// Limit consumption to 1000 bytes per second

293

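FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1000L); // global rate in bytes per second
consumer.setRateLimiter(rateLimiter);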

294

295

env.addSource(consumer)

296

.print();

297

```

298

299

### Advanced Configuration

300

301

```java

302

Properties properties = new Properties();

303

// Required Kafka settings

304

properties.setProperty("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092");

305

properties.setProperty("group.id", "advanced-consumer-group");

306

307

// Offset management

308

properties.setProperty("auto.offset.reset", "earliest"); // or "latest"

309

properties.setProperty("enable.auto.commit", "false"); // Flink manages commits

310

311

// Performance tuning

312

properties.setProperty("fetch.min.bytes", "1024");

313

properties.setProperty("fetch.max.wait.ms", "500");

314

properties.setProperty("max.partition.fetch.bytes", "1048576");

315

316

// Custom polling timeout

317

properties.setProperty("flink.poll-timeout", "100");

318

319

// Enable partition discovery for dynamic topics

320

properties.setProperty("flink.partition-discovery.interval-millis", "30000");

321

322

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(

323

"configured-topic",

324

new SimpleStringSchema(),

325

properties

326

);

327

328

// Configure watermark generation for event time processing

329

consumer.assignTimestampsAndWatermarks(

330

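// BoundedOutOfOrdernessTimestampExtractor is an AssignerWithPeriodicWatermarks
// (org.apache.flink.streaming.api.functions.timestamps); Time is
// org.apache.flink.streaming.api.windowing.time.Time.
new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
@Override
public long extractTimestamp(String element) {
return System.currentTimeMillis();
}
}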

332

);

333

```

334

335

## Error Handling

336

337

The FlinkKafkaConsumer09 integrates with Flink's fault tolerance mechanisms:

338

339

- **Checkpointing**: Automatically saves consumer offsets during checkpoints

340

- **Recovery**: Restores from last successful checkpoint on failure

341

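- **Exactly-once**: When checkpointing is enabled, each record is reflected in Flink state exactly once; after a failure, records may be re-read from Kafka, but offsets and state are restored together from the last checkpoint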

342

- **Parallelism**: Each parallel instance consumes from assigned partitions independently

343

344

Common exceptions:

345

- `IllegalArgumentException`: Invalid configuration or parameters

346

- `IOException`: Network or serialization issues

347

- `RuntimeException`: Kafka client errors or partition assignment failures