or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · kafka-consumer.md · kafka-producer.md · offset-management.md · table-api.md

kafka-producer.mddocs/

0

# Kafka Producer

1

2

The Flink Kafka producer writes messages to Kafka 0.8.x topics with best-effort delivery and configurable partitioning and serialization strategies.

3

4

## Capabilities

5

6

### FlinkKafkaProducer08

7

8

The primary producer class for Kafka 0.8.x integration with extensive configuration options.

9

10

```java { .api }

11

/**

12

* Kafka producer for Apache Kafka 0.8.x (provides best-effort delivery guarantees)

13

*/

14

public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {

15

16

/**

17

* Creates producer with broker list, topic, and value-only serialization

18

* @param brokerList Comma-separated list of Kafka brokers

19

* @param topicId Target Kafka topic name

20

* @param serializationSchema Schema for serializing values

21

*/

22

public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);

23

24

/**

25

* Creates producer with properties and value-only serialization

26

* @param topicId Target Kafka topic name

27

* @param serializationSchema Schema for serializing values

28

* @param producerConfig Kafka producer properties

29

*/

30

public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);

31

32

/**

33

* Creates producer with properties, value serialization, and custom partitioner

34

* @param topicId Target Kafka topic name

35

* @param serializationSchema Schema for serializing values

36

* @param producerConfig Kafka producer properties

37

* @param customPartitioner Custom partitioner for message distribution

38

*/

39

public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);

40

41

/**

42

* Creates producer with broker list, topic, and key-value serialization

43

* @param brokerList Comma-separated list of Kafka brokers

44

* @param topicId Target Kafka topic name

45

* @param serializationSchema Schema for serializing keys and values

46

*/

47

public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);

48

49

/**

50

* Creates producer with properties and key-value serialization

51

* @param topicId Target Kafka topic name

52

* @param serializationSchema Schema for serializing keys and values

53

* @param producerConfig Kafka producer properties

54

*/

55

public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);

56

57

/**

58

* Creates producer with properties, key-value serialization, and custom partitioner

59

* @param topicId Target Kafka topic name

60

* @param serializationSchema Schema for serializing keys and values

61

* @param producerConfig Kafka producer properties

62

* @param customPartitioner Custom partitioner for message distribution

63

*/

64

public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);

65

66

/**

67

* @deprecated Use FlinkKafkaPartitioner instead of KafkaPartitioner

68

*/

69

@Deprecated

70

public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);

71

72

/**

73

* @deprecated Use FlinkKafkaPartitioner instead of KafkaPartitioner

74

*/

75

@Deprecated

76

public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);

77

}

78

```

79

80

**Usage Examples:**

81

82

```java

83

import org.apache.flink.streaming.api.datastream.DataStream;

84

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;

85

import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

86

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

87

88

import java.util.Properties;

89

90

// Basic producer with broker list

91

FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(

92

"localhost:9092",

93

"output-topic",

94

new SimpleStringSchema()

95

);

96

97

DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");

98

stream.addSink(producer);

99

100

// Producer with properties configuration

101

Properties producerProps = new Properties();

102

producerProps.setProperty("bootstrap.servers", "localhost:9092");

103

producerProps.setProperty("batch.size", "16384");

104

producerProps.setProperty("linger.ms", "10");

105

producerProps.setProperty("compression.type", "snappy");

106

107

FlinkKafkaProducer08<String> configuredProducer = new FlinkKafkaProducer08<>(

108

"events-topic",

109

new SimpleStringSchema(),

110

producerProps

111

);

112

113

stream.addSink(configuredProducer);

114

115

// Producer with custom partitioner

116

FlinkKafkaProducer08<String> partitionedProducer = new FlinkKafkaProducer08<>(

117

"partitioned-topic",

118

new SimpleStringSchema(),

119

producerProps,

120

new CustomPartitioner<>() // implement FlinkKafkaPartitioner

121

);

122

123

// Key-value producer

124

KeyedSerializationSchema<Tuple2<String, String>> keyValueSchema =

125

new KeyedSerializationSchemaWrapper<>(

126

new SimpleStringSchema(), // key serializer

127

new SimpleStringSchema() // value serializer

128

);

129

130

FlinkKafkaProducer08<Tuple2<String, String>> kvProducer = new FlinkKafkaProducer08<>(

131

"key-value-topic",

132

keyValueSchema,

133

producerProps

134

);

135

136

DataStream<Tuple2<String, String>> kvStream = env.fromElements(

137

Tuple2.of("key1", "value1"),

138

Tuple2.of("key2", "value2")

139

);

140

kvStream.addSink(kvProducer);

141

```

142

143

### FlinkKafkaProducer (Deprecated)

144

145

Deprecated subclass of FlinkKafkaProducer08, kept only for backward compatibility; use FlinkKafkaProducer08 directly in new code.

146

147

```java { .api }

148

/**

149

* @deprecated Use FlinkKafkaProducer08 instead

150

*/

151

@Deprecated

152

public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {

153

/**

154

* @deprecated Use FlinkKafkaProducer08 constructor instead

155

*/

156

@Deprecated

157

public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);

158

159

/**

160

* @deprecated Use FlinkKafkaProducer08 constructor instead

161

*/

162

@Deprecated

163

public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);

164

165

/**

166

* @deprecated Use FlinkKafkaProducer08 constructor instead

167

*/

168

@Deprecated

169

public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner);

170

171

/**

172

* @deprecated Use FlinkKafkaProducer08 constructor instead

173

*/

174

@Deprecated

175

public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);

176

177

/**

178

* @deprecated Use FlinkKafkaProducer08 constructor instead

179

*/

180

@Deprecated

181

public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);

182

183

/**

184

* @deprecated Use FlinkKafkaProducer08 constructor instead

185

*/

186

@Deprecated

187

public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner);

188

}

189

```

190

191

## Configuration

192

193

### Required Properties

194

195

- **bootstrap.servers**: Comma-separated list of Kafka broker addresses

196

197

### Recommended Properties

198

199

- **batch.size**: Number of bytes to batch before sending (default: 16384)

200

- **linger.ms**: Time to wait for additional messages in batch (default: 0)

201

- **compression.type**: Compression algorithm (`none`, `gzip`, `snappy`, `lz4`)

202

- **acks**: Acknowledgment mode (`0`, `1`, `all`)

203

- **retries**: Number of retry attempts on failure

204

- **retry.backoff.ms**: Backoff time between retries

205

206

### Advanced Properties

207

208

- **buffer.memory**: Total memory for producer buffering

209

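- **max.block.ms**: Maximum time to block on send (introduced in Kafka 0.9; with the 0.8.2 producer client use `metadata.fetch.timeout.ms` and `block.on.buffer.full` instead)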

210

- **request.timeout.ms**: Request timeout duration (introduced in Kafka 0.9; the 0.8.2 producer client uses `timeout.ms`)

211

- **delivery.timeout.ms**: Total time for message delivery (introduced in Kafka 2.1; not a recognized setting of the Kafka 0.8.x producer client)

212

213

## Serialization Schemas

214

215

The producer supports various serialization schemas:

216

217

```java

218

// Simple string serialization

219

SerializationSchema<String> stringSchema = new SimpleStringSchema();

220

221

// JSON serialization

222

SerializationSchema<MyObject> jsonSchema = new JSONSerializationSchema<>();

223

224

// Avro serialization (with schema registry)

225

SerializationSchema<MyAvroRecord> avroSchema = new AvroSerializationSchema<>(MyAvroRecord.class);

226

227

// Custom serialization

228

SerializationSchema<MyCustomType> customSchema = new SerializationSchema<MyCustomType>() {

229

@Override

230

public byte[] serialize(MyCustomType element) {

231

// Custom serialization logic

232

return element.toBytes();

233

}

234

};

235

```

236

237

## Custom Partitioning

238

239

Implement custom partitioning logic:

240

241

```java

242

public class MyCustomPartitioner implements FlinkKafkaPartitioner<String> {

243

244

@Override

245

public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {

246

// Custom partitioning logic

247

if (record.startsWith("urgent")) {

248

return 0; // Route urgent messages to partition 0

249

}

250

return record.hashCode() % partitions.length;

251

}

252

}

253

254

// Use custom partitioner

255

FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(

256

"my-topic",

257

new SimpleStringSchema(),

258

props,

259

new MyCustomPartitioner()

260

);

261

```

262

263

## Error Handling

264

265

The producer handles errors according to Kafka 0.8.x limitations:

266

267

- **Best-effort delivery**: Neither at-least-once nor exactly-once guarantees are provided with Kafka 0.8.x

268

- **Retry behavior**: Configurable through Kafka producer properties

269

- **Failure modes**: Messages may be lost on producer failures

270

271

```java

272

// Configure retry behavior

273

Properties props = new Properties();

274

props.setProperty("bootstrap.servers", "localhost:9092");

275

props.setProperty("retries", "3");

276

props.setProperty("retry.backoff.ms", "1000");

277

props.setProperty("acks", "1"); // Wait for leader acknowledgment

278

279

// Error handling in application

280

try {

281

stream.addSink(producer);

282

env.execute("Kafka Producer Job");

283

} catch (Exception e) {

284

logger.error("Kafka producer failed", e);

285

// Implement fallback or recovery logic

286

}

287

```

288

289

## Performance Considerations

290

291

- **Batching**: Use appropriate `batch.size` and `linger.ms` for throughput

292

- **Compression**: Enable compression for network efficiency

293

- **Partitioning**: Distribute load evenly across partitions

294

- **Parallelism**: Match producer parallelism to topic partition count

295

- **Memory**: Configure `buffer.memory` based on throughput requirements