# Kafka Producer API

Comprehensive API reference for Kafka 0.8 producer classes in the Apache Flink Kafka Connector.

## FlinkKafkaProducer08<IN>

**Package:** `org.apache.flink.streaming.connectors.kafka`

**Annotations:** `@PublicEvolving`

**Extends:** `FlinkKafkaProducerBase<IN>`

**Description:** Flink sink to produce data into a Kafka topic. Compatible with Kafka 0.8.x. **Important Note**: This producer does not have reliability guarantees and may lose messages on failures.

### Class Declaration

```java { .api }
@PublicEvolving
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>
```

### Constants

```java { .api }
private static final long serialVersionUID = 1L;
```

### Constructors

#### Key-less Serialization Constructors

```java { .api }
/**
 * Creates a FlinkKafkaProducer08 using a broker list and SerializationSchema.
 *
 * @param brokerList Comma-separated list of Kafka brokers (e.g., "localhost:9092,broker2:9092")
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema to serialize objects to byte arrays
 */
public FlinkKafkaProducer08(String brokerList,
                            String topicId,
                            SerializationSchema<IN> serializationSchema)

/**
 * Creates a FlinkKafkaProducer08 using Properties and SerializationSchema.
 *
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema to serialize objects to byte arrays
 * @param producerConfig Properties containing Kafka producer configuration
 */
public FlinkKafkaProducer08(String topicId,
                            SerializationSchema<IN> serializationSchema,
                            Properties producerConfig)

/**
 * Creates a FlinkKafkaProducer08 with custom partitioner and SerializationSchema.
 *
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema to serialize objects to byte arrays
 * @param producerConfig Properties containing Kafka producer configuration
 * @param customPartitioner Custom partitioner for determining target partition (can be null)
 */
public FlinkKafkaProducer08(String topicId,
                            SerializationSchema<IN> serializationSchema,
                            Properties producerConfig,
                            @Nullable FlinkKafkaPartitioner<IN> customPartitioner)
```

#### Key/Value Serialization Constructors

```java { .api }
/**
 * Creates a FlinkKafkaProducer08 using broker list and KeyedSerializationSchema.
 *
 * @param brokerList Comma-separated list of Kafka brokers
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema to serialize objects to key/value byte arrays
 */
public FlinkKafkaProducer08(String brokerList,
                            String topicId,
                            KeyedSerializationSchema<IN> serializationSchema)

/**
 * Creates a FlinkKafkaProducer08 using Properties and KeyedSerializationSchema.
 *
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema to serialize objects to key/value byte arrays
 * @param producerConfig Properties containing Kafka producer configuration
 */
public FlinkKafkaProducer08(String topicId,
                            KeyedSerializationSchema<IN> serializationSchema,
                            Properties producerConfig)

/**
 * Creates a FlinkKafkaProducer08 with custom partitioner and KeyedSerializationSchema.
 *
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema to serialize objects to key/value byte arrays
 * @param producerConfig Properties containing Kafka producer configuration
 * @param customPartitioner Custom partitioner for determining target partition (can be null)
 */
public FlinkKafkaProducer08(String topicId,
                            KeyedSerializationSchema<IN> serializationSchema,
                            Properties producerConfig,
                            @Nullable FlinkKafkaPartitioner<IN> customPartitioner)
```

#### Deprecated Constructors

```java { .api }
/**
 * @deprecated Use constructor with FlinkKafkaPartitioner instead of KafkaPartitioner
 */
@Deprecated
public FlinkKafkaProducer08(String topicId,
                            SerializationSchema<IN> serializationSchema,
                            Properties producerConfig,
                            KafkaPartitioner<IN> customPartitioner)

/**
 * @deprecated Use constructor with FlinkKafkaPartitioner instead of KafkaPartitioner
 */
@Deprecated
public FlinkKafkaProducer08(String topicId,
                            KeyedSerializationSchema<IN> serializationSchema,
                            Properties producerConfig,
                            KafkaPartitioner<IN> customPartitioner)
```

### Configuration Methods (Inherited from FlinkKafkaProducerBase)

```java { .api }
/**
 * Configures whether the producer should only log failures instead of throwing exceptions.
 * Default is false: an asynchronous send failure is rethrown and fails the job.
 *
 * @param logFailuresOnly If true, only log failures; if false, throw exceptions on failures
 */
public void setLogFailuresOnly(boolean logFailuresOnly)

/**
 * Configures whether each checkpoint waits until all records sent so far have been
 * acknowledged by Kafka. This narrows the window for lost records at the cost of
 * longer checkpoints.
 *
 * @param flush If true, flush on checkpoints; if false, don't flush
 */
public void setFlushOnCheckpoint(boolean flush)
```
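
To make the two failure modes concrete, here is a simplified, stdlib-only model of how an asynchronous sink can react to send failures depending on a `logFailuresOnly` flag. It is loosely modeled on the behavior described above, not copied from `FlinkKafkaProducerBase`; `AsyncFailureTracker`, `onCompletion`, and `checkErroneous` are hypothetical names. Failures arrive on the client's callback thread, so the first one is stored and rethrown later from the task thread.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

// Simplified model of logFailuresOnly handling; AsyncFailureTracker is a hypothetical class.
final class AsyncFailureTracker {
    private static final Logger LOG = Logger.getLogger(AsyncFailureTracker.class.getName());

    private final boolean logFailuresOnly;
    // First asynchronous failure, kept until the task thread rethrows it.
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();

    AsyncFailureTracker(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    // Called from the client's send callback, possibly on a different thread.
    void onCompletion(Exception exception) {
        if (exception == null) {
            return; // the send succeeded
        }
        if (logFailuresOnly) {
            LOG.warning("Error while sending record to Kafka: " + exception.getMessage());
        } else {
            asyncException.compareAndSet(null, exception); // remember only the first failure
        }
    }

    // Called on the task thread; fails the task if an earlier send failed.
    void checkErroneous() throws Exception {
        Exception e = asyncException.get();
        if (e != null) {
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }
}
```

In this sketch, `checkErroneous()` is meant to run before each new record and during checkpoints, so a failure surfaces on the next call rather than at the moment the callback fires.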

### Lifecycle Methods (Inherited from FlinkKafkaProducerBase)

```java { .api }
/**
 * Opens the producer and initializes resources.
 *
 * @param configuration The task configuration
 */
public void open(Configuration configuration)

/**
 * Sends a record to Kafka.
 *
 * @param next The record to send
 * @param context The sink context (exposes the current processing time, watermark, and record timestamp)
 * @throws Exception If sending fails
 */
public void invoke(IN next, Context context) throws Exception

/**
 * Closes the producer and cleans up resources.
 *
 * @throws Exception If cleanup fails
 */
public void close() throws Exception
```

### Checkpointing Interface Methods (Inherited)

```java { .api }
/**
 * Initializes the state of the function from a checkpoint.
 *
 * @param context The initialization context
 * @throws Exception If state initialization fails
 */
public void initializeState(FunctionInitializationContext context) throws Exception

/**
 * Snapshots the function's state during checkpointing.
 *
 * @param ctx The snapshot context
 * @throws Exception If state snapshotting fails
 */
public void snapshotState(FunctionSnapshotContext ctx) throws Exception
```

### Static Utility Methods (Inherited)

```java { .api }
/**
 * Converts a comma-separated broker list to Properties.
 *
 * @param brokerList Comma-separated list of brokers (e.g., "broker1:9092,broker2:9092")
 * @return Properties object with "bootstrap.servers" set to the broker list
 */
public static Properties getPropertiesFromBrokerList(String brokerList)
```
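
The conversion above can be approximated without Flink. The sketch below assumes the result should carry `bootstrap.servers`, the connection key of Kafka's Java `KafkaProducer`, and adds a basic `host:port` check. `BrokerLists` and `toProducerProperties` are hypothetical names, and Flink's own validation may differ in detail.

```java
import java.util.Properties;

// Stdlib-only sketch; BrokerLists is a hypothetical helper, not a Flink class.
final class BrokerLists {
    static Properties toProducerProperties(String brokerList) {
        for (String broker : brokerList.split(",")) {
            String trimmed = broker.trim();
            int colon = trimmed.lastIndexOf(':');
            // Reject entries with no host part or no port part.
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            // parseInt throws NumberFormatException (an IllegalArgumentException) for non-numeric ports.
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in '" + trimmed + "'");
            }
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerList);
        return props;
    }
}
```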

### Protected Methods

```java { .api }
/**
 * Kafka 0.8-specific flush implementation. The Kafka 0.8 client has no native flush(),
 * so this method blocks until all pending records have been acknowledged.
 */
protected void flush()
```
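
Because the Kafka 0.8 client offers no native `flush()`, one way to honor flush-on-checkpoint is to count in-flight sends and block until every send callback has fired. The stdlib-only sketch below models that idea with a plain monitor; `PendingRecordTracker` and its methods are hypothetical names, not Flink's implementation.

```java
// Simplified model of a wait-for-acknowledgements flush; PendingRecordTracker is hypothetical.
final class PendingRecordTracker {
    private final Object lock = new Object();
    private long pendingRecords;

    // Call just before handing a record to the asynchronous client.
    void beforeSend() {
        synchronized (lock) {
            pendingRecords++;
        }
    }

    // Call from the send callback, whether the send succeeded or failed.
    void acknowledge() {
        synchronized (lock) {
            pendingRecords--;
            if (pendingRecords == 0) {
                lock.notifyAll(); // wake any thread blocked in flush()
            }
        }
    }

    // Blocks until no sends are outstanding; an interrupt aborts the checkpoint.
    void flush() {
        synchronized (lock) {
            while (pendingRecords > 0) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Flushing got interrupted while checkpointing", e);
                }
            }
        }
    }

    long pending() {
        synchronized (lock) {
            return pendingRecords;
        }
    }
}
```

Acknowledging from the callback regardless of outcome keeps a failed send from blocking the checkpoint forever; reporting the failure itself is left to the error-handling path.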

## FlinkKafkaPartitioner<T>

**Package:** `org.apache.flink.streaming.connectors.kafka.partitioner`

**Description:** Abstract base class for custom partitioning strategies in Flink Kafka producers.

```java { .api }
public abstract class FlinkKafkaPartitioner<T> implements Serializable {

    /**
     * Returns the partition ID for a given record.
     *
     * @param record The record to partition
     * @param key The serialized key (can be null)
     * @param value The serialized value
     * @param targetTopic The target topic name
     * @param partitions Array of available partition IDs
     * @return The target partition ID (one of the values in partitions)
     */
    public abstract int partition(T record, byte[] key, byte[] value,
                                  String targetTopic, int[] partitions);

    /**
     * Called when the partitioner is opened. Override for initialization logic.
     *
     * @param parallelInstanceId The parallel instance ID of this subtask
     * @param parallelInstances The total number of parallel instances
     */
    public void open(int parallelInstanceId, int parallelInstances) {
        // Default implementation is empty
    }
}
```
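
The return value of `partition()` is a partition ID, so a robust implementation picks an element of `partitions` rather than returning a computed index directly. The stdlib-only helper below shows that pattern; `KeyHashChooser` is a hypothetical name, and `Math.floorMod` is used so that negative hash codes still map to a valid index.

```java
// Stdlib-only sketch of the partition() contract; KeyHashChooser is a hypothetical helper.
final class KeyHashChooser {
    // Maps a key to an element of partitions; stable for a given key and partition array.
    static int choose(Object key, int[] partitions) {
        if (partitions.length == 0) {
            throw new IllegalArgumentException("No partitions available");
        }
        // floorMod always yields an index in [0, partitions.length), even for negative hash codes.
        int index = Math.floorMod(key.hashCode(), partitions.length);
        return partitions[index];
    }
}
```

Returning `partitions[index]` instead of `index` only matters when partition IDs are not the contiguous range `0..n-1`, but it costs nothing and matches the documented contract.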

### Custom Partitioner Examples

```java { .api }
// Hash-based partitioning by customer ID
public class CustomerHashPartitioner extends FlinkKafkaPartitioner<CustomerEvent> {

    @Override
    public int partition(CustomerEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        int customerId = record.getCustomerId();
        // floorMod gives a valid index even for negative IDs; return the partition ID stored there
        return partitions[Math.floorMod(customerId, partitions.length)];
    }
}

// Round-robin partitioning (each parallel sink instance keeps its own counter)
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private int counter = 0;

    @Override
    public int partition(T record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // floorMod keeps the index valid after counter++ overflows to a negative value
        return partitions[Math.floorMod(counter++, partitions.length)];
    }
}

// Time-based partitioning
public class TimeBasedPartitioner extends FlinkKafkaPartitioner<TimestampedEvent> {

    @Override
    public int partition(TimestampedEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Partition based on hour of day (UTC) of an epoch-millisecond timestamp
        long timestamp = record.getTimestamp();
        int hour = (int) ((timestamp / 3600000) % 24);
        return partitions[hour % partitions.length];
    }
}
```

## Usage Examples

### Basic Producer Setup

```java { .api }
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// The producer wraps Kafka's Java KafkaProducer, which requires "bootstrap.servers"
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);

env.execute("Kafka Producer Job");
```

### Producer with Custom Partitioner

```java { .api }
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Custom partitioner implementation
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaPartitioner<MyEvent>() {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Partition based on event type; return an entry of partitions, not a raw index
        return partitions[Math.floorMod(record.getEventType().hashCode(), partitions.length)];
    }
};

FlinkKafkaProducer08<MyEvent> producer = new FlinkKafkaProducer08<>(
    "events-topic",
    new MyEventSerializer(),
    properties,
    partitioner
);

// Configure for reliability (narrows, but does not close, the 0.8 producer's data-loss window)
producer.setLogFailuresOnly(false); // Fail on errors
producer.setFlushOnCheckpoint(true); // Flush on checkpoints

DataStream<MyEvent> events = // ... your event stream
events.addSink(producer);
```

### Producer with Key/Value Serialization

```java { .api }
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

KeyedSerializationSchema<CustomerOrder> schema = new KeyedSerializationSchema<CustomerOrder>() {
    @Override
    public byte[] serializeKey(CustomerOrder element) {
        // Use customer ID as the record key (the default FlinkFixedPartitioner ignores keys;
        // pass a null partitioner to let Kafka partition by key hash)
        return String.valueOf(element.getCustomerId()).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(CustomerOrder element) {
        // Serialize order details as JSON
        return element.toJson().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(CustomerOrder element) {
        // Dynamic topic selection based on order type; returning null uses the default topic
        return "orders-" + element.getOrderType().toLowerCase();
    }
};

FlinkKafkaProducer08<CustomerOrder> producer = new FlinkKafkaProducer08<>(
    "orders-default", // default topic (must not be null); used when getTargetTopic() returns null
    schema,
    properties
);

DataStream<CustomerOrder> orders = // ... your order stream
orders.addSink(producer);
```
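
The fallback rule used with keyed schemas, where a `null` from `getTargetTopic()` sends the record to the producer's default topic, can be sketched in plain Java. `TopicRouting`, `resolveTopic`, and `keyBytes` are hypothetical helpers, not Flink API; the sketch also encodes keys as UTF-8 explicitly instead of relying on the platform default charset.

```java
import java.nio.charset.StandardCharsets;

// Stdlib-only sketch of topic resolution; TopicRouting is a hypothetical helper.
final class TopicRouting {
    // A per-record topic wins; otherwise fall back to the producer's default topic.
    static String resolveTopic(String schemaTopic, String defaultTopic) {
        return schemaTopic != null ? schemaTopic : defaultTopic;
    }

    // Encodes a numeric key as UTF-8 text so every JVM produces the same bytes.
    static byte[] keyBytes(long customerId) {
        return Long.toString(customerId).getBytes(StandardCharsets.UTF_8);
    }
}
```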

### Producer Configuration Examples

```java { .api }
// The producer wraps Kafka's Java KafkaProducer, so use its configuration keys
// (old Scala-producer keys such as "metadata.broker.list" are not recognized)
Properties producerProps = new Properties();

// Required: Broker connection
producerProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");

// Performance tuning
producerProps.setProperty("batch.size", "16384");       // max bytes per partition batch
producerProps.setProperty("linger.ms", "100");          // wait up to 100 ms to fill a batch
producerProps.setProperty("buffer.memory", "33554432"); // total bytes buffered for unsent records

// Compression (optional)
producerProps.setProperty("compression.type", "snappy"); // or "gzip"

// Network settings
producerProps.setProperty("send.buffer.bytes", "102400");
producerProps.setProperty("client.id", "flink-kafka-producer");

// Reliability settings (Kafka 0.8 limitations apply)
producerProps.setProperty("acks", "1"); // "0", "1", or "-1" (all in-sync replicas)
producerProps.setProperty("retries", "3");
producerProps.setProperty("retry.backoff.ms", "100");

FlinkKafkaProducer08<MyData> producer = new FlinkKafkaProducer08<>(
    "my-topic", new MyDataSerializer(), producerProps);
```
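
`FlinkKafkaProducerBase` hands these properties to Kafka's Java `KafkaProducer`, which reads `bootstrap.servers` and does not recognize keys from the older Scala producer such as `metadata.broker.list`. A small pre-flight check can catch leftovers before the job starts. `ProducerConfigCheck` is a hypothetical class, and the legacy-to-current mapping is an approximate, illustrative subset rather than an exhaustive list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Stdlib-only pre-flight check; ProducerConfigCheck is a hypothetical helper.
final class ProducerConfigCheck {
    // Old Scala-producer keys and their closest Java-producer counterparts (illustrative subset).
    private static final String[][] LEGACY_TO_CURRENT = {
        {"metadata.broker.list", "bootstrap.servers"},
        {"request.required.acks", "acks"},
        {"message.send.max.retries", "retries"},
        {"compression.codec", "compression.type"},
        {"queue.buffering.max.ms", "linger.ms"},
    };

    // Returns human-readable problems; an empty list means the check passed.
    static List<String> check(Properties props) {
        List<String> problems = new ArrayList<>();
        if (props.getProperty("bootstrap.servers") == null) {
            problems.add("bootstrap.servers must be set");
        }
        for (String[] pair : LEGACY_TO_CURRENT) {
            if (props.containsKey(pair[0])) {
                problems.add(pair[0] + " is a legacy key; use " + pair[1]);
            }
        }
        return problems;
    }
}
```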

### Error Handling and Reliability

```java { .api }
// Configure producer for different reliability levels
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic", new SimpleStringSchema(), properties);

// Choose one of the two options below (the last call wins)

// Option 1: Fail fast on errors (default)
producer.setLogFailuresOnly(false);

// Option 2: Log errors but continue processing
producer.setLogFailuresOnly(true);

// Enable flushing on checkpoints to narrow the window for lost records
producer.setFlushOnCheckpoint(true);

// Note: Kafka 0.8 producer limitations
// - No transactional guarantees
// - No exactly-once semantics
// - Messages may be lost on producer failures
// - Failed sends are not retried beyond the Kafka client's "retries" setting,
//   and those client retries can duplicate records
```

### Custom Serialization Examples

```java { .api }
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.ObjectMapper; // assumes jackson-databind on the classpath
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Simple SerializationSchema example
public class JsonSerializationSchema<T> implements SerializationSchema<T> {
    // transient: created lazily on the task after the schema has been deserialized
    private transient ObjectMapper objectMapper;

    @Override
    public byte[] serialize(T element) {
        if (objectMapper == null) {
            objectMapper = new ObjectMapper();
        }
        try {
            return objectMapper.writeValueAsBytes(element);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize to JSON", e);
        }
    }
}

// KeyedSerializationSchema with dynamic routing
// (Keyed and Routable are hypothetical application-defined interfaces, not Flink types)
public class RoutingKeyedSchema<T> implements KeyedSerializationSchema<T> {

    @Override
    public byte[] serializeKey(T element) {
        if (element instanceof Keyed) {
            return ((Keyed) element).getKey().getBytes(StandardCharsets.UTF_8);
        }
        return null; // No key
    }

    @Override
    public byte[] serializeValue(T element) {
        return element.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(T element) {
        if (element instanceof Routable) {
            return ((Routable) element).getTargetTopic();
        }
        return null; // Use default topic
    }
}
```

## Deprecated FlinkKafkaProducer<IN>

**Package:** `org.apache.flink.streaming.connectors.kafka`

**Annotations:** `@Deprecated`

**Extends:** `FlinkKafkaProducer08<IN>`

```java { .api }
@Deprecated
public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>
```

### Deprecated Constructors

```java { .api }
/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId,
                          SerializationSchema<IN> serializationSchema)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema,
                          Properties producerConfig)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema,
                          Properties producerConfig, KafkaPartitioner customPartitioner)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId,
                          KeyedSerializationSchema<IN> serializationSchema)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema,
                          Properties producerConfig)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema,
                          Properties producerConfig, KafkaPartitioner customPartitioner)
```

## FlinkKafkaPartitioner<T>

**Package:** `org.apache.flink.streaming.connectors.kafka.partitioner`

**Annotations:** `@PublicEvolving`

**Description:** Abstract base class for custom partitioning logic that determines which Kafka partition each record is written to.

### Class Definition

```java { .api }
@PublicEvolving
public abstract class FlinkKafkaPartitioner<T> implements Serializable {

    /**
     * Initializer for the partitioner. Called once on each parallel sink instance.
     *
     * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
     * @param parallelInstances the total number of parallel instances
     */
    public void open(int parallelInstanceId, int parallelInstances) {
        // Override if needed; the default implementation does nothing
    }

    /**
     * Determine the id of the partition that the record should be written to.
     *
     * @param record the record value
     * @param key serialized key of the record
     * @param value serialized value of the record
     * @param targetTopic target topic for the record
     * @param partitions found partitions for the target topic
     * @return the id of the target partition
     */
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```

### Custom Partitioner Example

```java { .api }
public class CustomerIdPartitioner extends FlinkKafkaPartitioner<CustomerEvent> {

    @Override
    public int partition(CustomerEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Partition based on customer ID so that all events for the same customer go to the same partition
        return partitions[Math.floorMod(record.getCustomerId().hashCode(), partitions.length)];
    }
}

// Usage with producer
FlinkKafkaProducer08<CustomerEvent> producer = new FlinkKafkaProducer08<>(
    "customer-events",
    new CustomerEventSchema(),
    properties,
    new CustomerIdPartitioner()
);
```

## Kafka 0.8 Producer Limitations

### Important Reliability Considerations

1. **No Exactly-Once Guarantees**: Kafka 0.8 producers cannot provide exactly-once delivery semantics
2. **No Transactional Support**: No atomic writes across multiple partitions/topics
3. **No Native Flushing**: The Kafka 0.8 client has no `flush()` call, so `FlinkKafkaProducer08.flush()` instead waits until all pending records are acknowledged
4. **Limited Error Recovery**: Failed messages may be lost without explicit retry logic
5. **No Idempotent Writes**: Duplicate messages may occur during retries

### Recommended Practices

```java { .api }
// Enable checkpointing for better reliability
env.enableCheckpointing(60000); // Checkpoint every minute

// Use flush on checkpoint
producer.setFlushOnCheckpoint(true);

// Configure appropriate retry settings in the Kafka (Java producer) properties
Properties props = new Properties();
props.setProperty("retries", "3");
props.setProperty("retry.backoff.ms", "100");
props.setProperty("acks", "1"); // Wait for leader acknowledgment

// Consider an exactly-once sink if available in newer Flink versions
// For Kafka 0.8, implement custom retry logic if needed
```
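
If you do add application-level retries, a minimal exponential-backoff wrapper looks like the following. `RetryWithBackoff` is a hypothetical helper built on `java.util.concurrent.Callable`; it is not part of Flink or Kafka. Retrying a send whose first attempt actually reached the broker produces a duplicate, which is why retries usually pair with downstream deduplication.

```java
import java.util.concurrent.Callable;

// Minimal retry helper; RetryWithBackoff is hypothetical, not a Flink or Kafka API.
final class RetryWithBackoff {
    static <T> T call(Callable<T> action, int maxAttempts, long initialBackoffMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2; // double the wait after each failed attempt
                }
            }
        }
        throw last; // every attempt failed; surface the most recent error
    }
}
```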

### Migration Recommendations

For production systems requiring strong delivery guarantees:

1. **Upgrade to Kafka 0.9+**: `flink-connector-kafka-0.9` and `0.10` provide at-least-once delivery when checkpointing is enabled; exactly-once requires Kafka 0.11+ (`FlinkKafkaProducer011` or the universal `FlinkKafkaProducer`)
2. **Implement Custom Reliability**: Add application-level retry and deduplication logic
3. **Monitor Producer Metrics**: Track failed sends and implement alerting
4. **Use Synchronous Sending**: If throughput allows, consider synchronous sends for critical data
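
For the deduplication half of item 2, a consumer can remember recently seen record IDs and drop repeats. The sketch below assumes every record carries a unique ID assigned by the application (a hypothetical field, not something the 0.8 producer adds) and bounds memory with a `LinkedHashMap` eviction hook; `RecentIdDeduplicator` is a hypothetical class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Bounded "seen IDs" filter; RecentIdDeduplicator is a hypothetical helper.
final class RecentIdDeduplicator {
    private final Map<String, Boolean> seen;

    RecentIdDeduplicator(final int maxEntries) {
        // Insertion-ordered map; removeEldestEntry evicts the oldest ID once the cap is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns true the first time an ID is seen, false for a remembered duplicate.
    boolean firstSeen(String recordId) {
        return seen.put(recordId, Boolean.TRUE) == null;
    }
}
```

Because only the most recent `maxEntries` IDs are kept, a duplicate that arrives after its original has been evicted is treated as new; size the window to cover the longest retry delay you expect.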