or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdkafka-consumer.mdkafka-producer.mdtable-api.md

kafka-consumer.mddocs/

0

# Kafka Consumer API

1

2

Comprehensive API reference for Kafka 0.8 consumer classes in the Apache Flink Kafka Connector.

3

4

## FlinkKafkaConsumer08<T>

5

6

**Package:** `org.apache.flink.streaming.connectors.kafka`

7

**Annotations:** `@PublicEvolving`

8

**Extends:** `FlinkKafkaConsumerBase<T>`

9

**Description:** The Flink streaming source for Apache Kafka 0.8.x. When checkpointing is enabled, the consumer stores its Kafka offsets in Flink's checkpointed state. On recovery it resumes from those offsets, which gives exactly-once processing guarantees for Flink state.

10

11

### Class Declaration

12

13

```java { .api }

14

@PublicEvolving

15

public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T>

16

```

17

18

### Constants

19

20

```java { .api }

21

// Serialization identifier (from FlinkKafkaConsumer08)

22

private static final long serialVersionUID = -6272159445203409112L;

23

24

// Configuration keys (from FlinkKafkaConsumer08)

25

public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";

26

public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;

27

28

// Inherited constants from FlinkKafkaConsumerBase

29

public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

30

public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;

31

public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

32

public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";

33

```

34

35

### Constructors

36

37

#### Single Topic Constructors

38

39

```java { .api }

40

/**

41

* Creates a consumer for a single topic with DeserializationSchema.

42

*

43

* @param topic The Kafka topic to read from

44

* @param valueDeserializer The deserializer used to convert raw bytes to data objects

45

* @param props Configuration properties for Kafka consumer and ZooKeeper client

46

*/

47

public FlinkKafkaConsumer08(String topic,

48

DeserializationSchema<T> valueDeserializer,

49

Properties props)

50

51

/**

52

* Creates a consumer for a single topic with KafkaDeserializationSchema.

53

*

54

* @param topic The Kafka topic to read from

55

* @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects

56

* @param props Configuration properties for Kafka consumer and ZooKeeper client

57

*/

58

public FlinkKafkaConsumer08(String topic,

59

KafkaDeserializationSchema<T> deserializer,

60

Properties props)

61

```

62

63

#### Multi-Topic Constructors

64

65

```java { .api }

66

/**

67

* Creates a consumer for multiple topics with DeserializationSchema.

68

*

69

* @param topics List of Kafka topics to read from

70

* @param deserializer The deserializer used to convert raw bytes to data objects

71

* @param props Configuration properties for Kafka consumer and ZooKeeper client

72

*/

73

public FlinkKafkaConsumer08(List<String> topics,

74

DeserializationSchema<T> deserializer,

75

Properties props)

76

77

/**

78

* Creates a consumer for multiple topics with KafkaDeserializationSchema.

79

*

80

* @param topics List of Kafka topics to read from

81

* @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects

82

* @param props Configuration properties for Kafka consumer and ZooKeeper client

83

*/

84

public FlinkKafkaConsumer08(List<String> topics,

85

KafkaDeserializationSchema<T> deserializer,

86

Properties props)

87

```

88

89

#### Pattern Subscription Constructors

90

91

```java { .api }

92

/**

93

* Creates a consumer with pattern-based topic subscription and DeserializationSchema.

94

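* Topics created after the job starts that match the pattern are discovered only if partition
* discovery is enabled via the {@code flink.partition-discovery.interval-millis} property.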

95

*

96

* @param subscriptionPattern Regex pattern for topic names to subscribe to

97

* @param valueDeserializer The deserializer used to convert raw bytes to data objects

98

* @param props Configuration properties for Kafka consumer and ZooKeeper client

99

*/

100

@PublicEvolving

101

public FlinkKafkaConsumer08(Pattern subscriptionPattern,

102

DeserializationSchema<T> valueDeserializer,

103

Properties props)

104

105

/**

106

* Creates a consumer with pattern-based topic subscription and KafkaDeserializationSchema.

107

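* Topics created after the job starts that match the pattern are discovered only if partition
* discovery is enabled via the {@code flink.partition-discovery.interval-millis} property.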

108

*

109

* @param subscriptionPattern Regex pattern for topic names to subscribe to

110

* @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects

111

* @param props Configuration properties for Kafka consumer and ZooKeeper client

112

*/

113

@PublicEvolving

114

public FlinkKafkaConsumer08(Pattern subscriptionPattern,

115

KafkaDeserializationSchema<T> deserializer,

116

Properties props)

117

```

118

119

### Configuration Methods (Inherited from FlinkKafkaConsumerBase)

120

121

#### Startup Position Configuration

122

123

```java { .api }

124

/**

125

* Configures the consumer to start reading from the earliest available offset.

126

*

127

* @return This consumer instance for method chaining

128

*/

129

public FlinkKafkaConsumerBase<T> setStartFromEarliest()

130

131

/**

132

* Configures the consumer to start reading from the latest available offset.

133

*

134

* @return This consumer instance for method chaining

135

*/

136

public FlinkKafkaConsumerBase<T> setStartFromLatest()

137

138

/**

139

* Configures the consumer to start reading from the consumer group's committed offsets.

140

* This is the default behavior.

141

*

142

* @return This consumer instance for method chaining

143

*/

144

public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets()

145

146

/**

147

* Configures the consumer to start reading from specific offsets per partition.

148

*

149

* @param specificStartupOffsets Map of partition to offset mappings

150

* @return This consumer instance for method chaining

151

*/

152

public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)

153

```

154

155

#### Watermark and Timestamp Assignment

156

157

```java { .api }

158

/**

159

* Assigns a timestamp assigner and watermark generator for punctuated watermarks.

160

*

161

* @param assigner The assigner that generates punctuated watermarks

162

* @return This consumer instance for method chaining

163

*/

164

public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner)

165

166

/**

167

* Assigns a timestamp assigner and watermark generator for periodic watermarks.

168

*

169

* @param assigner The assigner that generates periodic watermarks

170

* @return This consumer instance for method chaining

171

*/

172

public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner)

173

```

174

175

#### Checkpointing and Offset Management

176

177

```java { .api }

178

/**

179

* Configures whether the consumer should commit offsets back to Kafka on checkpoints.

180

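* This setting only takes effect when checkpointing is enabled. Flink does not rely on the
* committed offsets for fault tolerance: exactly-once guarantees come from the offsets stored in
* checkpointed state. Committing only exposes the consumer's progress, e.g. for monitoring.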

181

*

182

* @param commitOnCheckpoints Whether to commit offsets on checkpoints (default: true)

183

* @return This consumer instance for method chaining

184

*/

185

public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints)

186

187

/**

188

* Disables the filtering of restored partitions based on subscribed topics.

189

* By default, only partitions of subscribed topics are restored.

190

*

191

* @return This consumer instance for method chaining

192

*/

193

public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithSubscribedTopics()

194

```

195

196

### Lifecycle Methods (Inherited from FlinkKafkaConsumerBase)

197

198

```java { .api }

199

/**

200

* Opens the consumer and initializes resources.

201

*

202

* @param configuration The task configuration

203

* @throws Exception If initialization fails

204

*/

205

public void open(Configuration configuration) throws Exception

206

207

/**

208

* Main execution method that reads from Kafka and emits records.

209

*

210

* @param sourceContext The source context for emitting records

211

* @throws Exception If execution fails

212

*/

213

public void run(SourceContext<T> sourceContext) throws Exception

214

215

/**

216

* Cancels the consumer operation and stops reading.

217

*/

218

public void cancel()

219

220

/**

221

* Closes the consumer and cleans up resources.

222

*

223

* @throws Exception If cleanup fails

224

*/

225

public void close() throws Exception

226

227

/**

228

* Returns the type information for the produced data type.

229

*

230

* @return Type information for T

231

*/

232

public TypeInformation<T> getProducedType()

233

```

234

235

### Checkpointing Interface Methods (Inherited)

236

237

```java { .api }

238

/**

239

* Initializes the state of the function from a checkpoint.

240

*

241

* @param context The initialization context

242

* @throws Exception If state initialization fails

243

*/

244

public final void initializeState(FunctionInitializationContext context) throws Exception

245

246

/**

247

* Snapshots the function's state during checkpointing.

248

*

249

* @param context The snapshot context

250

* @throws Exception If state snapshotting fails

251

*/

252

public final void snapshotState(FunctionSnapshotContext context) throws Exception

253

254

/**

255

* Notifies the function that a checkpoint has been completed.

256

*

257

* @param checkpointId The ID of the completed checkpoint

258

* @throws Exception If notification processing fails

259

*/

260

public final void notifyCheckpointComplete(long checkpointId) throws Exception

261

```

262

263

### Static Validation Methods

264

265

```java { .api }

266

/**

267

* Validate the ZooKeeper configuration, checking for required parameters.

268

*

269

* @param props Properties to check

270

* @throws IllegalArgumentException if required properties are missing or invalid

271

*/

272

protected static void validateZooKeeperConfig(Properties props)

273

```

274

275

### Usage Examples

276

277

#### Basic Consumer Setup

278

279

```java { .api }

280

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

281

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

282

import org.apache.flink.api.common.serialization.SimpleStringSchema;

283

284

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

285

286

Properties properties = new Properties();

287

properties.setProperty("zookeeper.connect", "localhost:2181");

288

properties.setProperty("group.id", "my-consumer-group");

289

properties.setProperty("auto.offset.reset", "earliest");

290

291

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(

292

"my-topic",

293

new SimpleStringSchema(),

294

properties

295

);

296

297

env.addSource(consumer).print();

298

```

299

300

#### Multi-Topic Consumer with Custom Deserialization

301

302

```java { .api }

303

import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

304

import org.apache.flink.api.common.typeinfo.TypeInformation;

305

import org.apache.kafka.clients.consumer.ConsumerRecord;

306

307

List<String> topics = Arrays.asList("topic1", "topic2", "topic3");

308

309

KafkaDeserializationSchema<MyEvent> schema = new KafkaDeserializationSchema<MyEvent>() {

310

@Override

311

public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {

312

return MyEvent.fromJson(new String(record.value()));

313

}

314

315

@Override

316

public boolean isEndOfStream(MyEvent nextElement) {

317

return nextElement.isEndMarker();

318

}

319

320

@Override

321

public TypeInformation<MyEvent> getProducedType() {

322

return TypeInformation.of(MyEvent.class);

323

}

324

};

325

326

FlinkKafkaConsumer08<MyEvent> consumer = new FlinkKafkaConsumer08<>(

327

topics, schema, properties);

328

```

329

330

#### Pattern-Based Topic Subscription with Watermarks

331

332

```java { .api }

333

import org.apache.flink.streaming.api.functions.timestamps.AssignerWithPeriodicWatermarks;

334

import org.apache.flink.streaming.api.watermark.Watermark;

335

import java.util.regex.Pattern;

336

337

Pattern topicPattern = Pattern.compile("metrics-.*");

338

339

FlinkKafkaConsumer08<MyMetric> consumer = new FlinkKafkaConsumer08<>(

340

topicPattern,

341

new MyMetricDeserializer(),

342

properties

343

);

344

345

consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<MyMetric>() {

346

private long currentMaxTimestamp = Long.MIN_VALUE;

347

348

@Override

349

public long extractTimestamp(MyMetric element, long previousElementTimestamp) {

350

long timestamp = element.getTimestamp();

351

currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);

352

return timestamp;

353

}

354

355

@Override

356

public Watermark getCurrentWatermark() {

357

return new Watermark(currentMaxTimestamp - 5000); // 5 second delay

358

}

359

});

360

361

consumer.setStartFromLatest()

362

.setCommitOffsetsOnCheckpoints(true);

363

```

364

365

#### Specific Offset Configuration

366

367

```java { .api }

368

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

369

370

Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();

371

specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);

372

specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);

373

specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 2), 54321L);

374

375

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(

376

"my-topic", new SimpleStringSchema(), properties);

377

378

consumer.setStartFromSpecificOffsets(specificStartupOffsets)

379

.setCommitOffsetsOnCheckpoints(true);

380

```

381

382

## Deprecated Consumer Classes

383

384

### FlinkKafkaConsumer081<T>

385

386

```java { .api }

387

@Deprecated

388

public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T>

389

390

/**

391

* @deprecated Use FlinkKafkaConsumer08 instead

392

*/

393

@Deprecated

394

public FlinkKafkaConsumer081(String topic,

395

DeserializationSchema<T> valueDeserializer,

396

Properties props)

397

```

398

399

### FlinkKafkaConsumer082<T>

400

401

```java { .api }

402

@Deprecated

403

public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T>

404

405

/**

406

* @deprecated Use FlinkKafkaConsumer08 instead

407

*/

408

@Deprecated

409

public FlinkKafkaConsumer082(String topic,

410

DeserializationSchema<T> valueDeserializer,

411

Properties props)

412

```

413

414

## KafkaDeserializationSchema<T> Interface

415

416

**Package:** `org.apache.flink.streaming.connectors.kafka`

417

**Annotations:** `@PublicEvolving`

418

**Extends:** `Serializable, ResultTypeQueryable<T>`

419

420

```java { .api }

421

@PublicEvolving

422

public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

423

424

/**

425

* Method to decide whether the element signals the end of the stream.

426

* If this returns true, the element won't be emitted.

427

*

428

* @param nextElement The element to test for end-of-stream signal

429

* @return True if the element signals end of stream, false otherwise

430

*/

431

boolean isEndOfStream(T nextElement);

432

433

/**

434

* Deserializes the Kafka record.

435

*

436

* @param record The ConsumerRecord from Kafka to deserialize

437

* @return The deserialized message as an object of type T

438

* @throws Exception If deserialization fails

439

*/

440

T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

441

}

442

```

443

444

### Custom KafkaDeserializationSchema Example

445

446

```java { .api }

447

public class CustomEventDeserializer implements KafkaDeserializationSchema<CustomEvent> {

448

449

@Override

450

public CustomEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {

451

// Access all ConsumerRecord information

452

String topic = record.topic();

453

int partition = record.partition();

454

long offset = record.offset();

455

long timestamp = record.timestamp();

456

byte[] key = record.key();

457

byte[] value = record.value();

458

459

// Custom deserialization logic

460

CustomEvent event = parseEvent(value);

461

event.setMetadata(topic, partition, offset, timestamp);

462

463

return event;

464

}

465

466

@Override

467

public boolean isEndOfStream(CustomEvent nextElement) {

468

// Check for end-of-stream marker

469

return nextElement != null && nextElement.getEventType().equals("END_STREAM");

470

}

471

472

@Override

473

public TypeInformation<CustomEvent> getProducedType() {

474

return TypeInformation.of(CustomEvent.class);

475

}

476

477

private CustomEvent parseEvent(byte[] data) throws Exception {

478

// Implementation specific parsing logic

479

return CustomEvent.fromBytes(data);

480

}

481

}

482

```

483

484

## Configuration Properties

485

486

### Required Properties

487

488

```java { .api }

489

Properties props = new Properties();

490

491

// Required: ZooKeeper connection for metadata

492

props.setProperty("zookeeper.connect", "localhost:2181");

493

494

// Required: Consumer group ID

495

props.setProperty("group.id", "my-consumer-group");

496

```

497

498

### Optional Consumer Properties

499

500

```java { .api }

501

// Offset reset behavior when no committed offset exists

502

props.setProperty("auto.offset.reset", "earliest"); // or "latest"

503

504

// Socket and network settings

505

props.setProperty("socket.timeout.ms", "30000");

506

props.setProperty("socket.receive.buffer.bytes", "65536");

507

508

// Fetch settings

509

props.setProperty("fetch.message.max.bytes", "1048576");

510

props.setProperty("fetch.wait.max.ms", "100");

511

512

// ZooKeeper settings

513

props.setProperty("zookeeper.session.timeout.ms", "6000");

514

props.setProperty("zookeeper.connection.timeout.ms", "6000");

515

516

// Auto-commit settings (recommend disabling for exactly-once)

517

props.setProperty("auto.commit.enable", "false");

518

props.setProperty("auto.commit.interval.ms", "60000");

519

```

520

521

### Flink-Specific Properties

522

523

```java { .api }

524

// Partition discovery interval (milliseconds)

525

props.setProperty("flink.partition-discovery.interval-millis", "30000");

526

527

// Disable metrics collection

528

props.setProperty("flink.disable-metrics", "false");

529

530

// Retry configuration for partition discovery

531

props.setProperty("flink.get-partitions.retry", "3");

532

```