# Data Stream Producer

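`FlinkKafkaProducer010` is a Flink sink that writes data streams to Apache Kafka 0.10.x topics. It supports value-only and key/value serialization, custom partitioning strategies, and writing Flink event-time timestamps to Kafka records. Combined with checkpointing and flush-on-checkpoint it offers at-least-once delivery; transactional exactly-once delivery was only introduced with the later Kafka 0.11 connector.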

## Capabilities

### Value-Only Serialization Producers

Creates producers that serialize only the record values, without keys.

```java { .api }
/**
 * Creates a FlinkKafkaProducer for a given topic using broker list
 * @param brokerList Comma separated addresses of the brokers
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User defined key-less serialization schema
 */
public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema);

/**
 * Creates a FlinkKafkaProducer for a given topic using properties
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User defined key-less serialization schema
 * @param producerConfig Properties with the producer configuration
 */
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig);

/**
 * Creates a FlinkKafkaProducer with custom partitioning
 * @param topicId The topic to write data to
 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument
 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. If set to null, records will be distributed to Kafka partitions in a round-robin fashion
 */
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);
```

**Usage Examples:**

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

import java.util.Properties;

// Simple producer with broker list
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
    "localhost:9092",
    "output-topic",
    new SimpleStringSchema()
);

// Producer with properties configuration
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "3");

FlinkKafkaProducer010<String> configuredProducer = new FlinkKafkaProducer010<>(
    "output-topic",
    new SimpleStringSchema(),
    props
);

// Producer with custom partitioner
// (MyEvent, MyEventSerializationSchema and MyCustomPartitioner are user-defined types)
FlinkKafkaProducer010<MyEvent> eventProducer = new FlinkKafkaProducer010<>(
    "events-topic",
    new MyEventSerializationSchema(),
    props,
    new MyCustomPartitioner<>()
);
```
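
The examples above reference `MyEvent` and `MyEventSerializationSchema` without defining them. As a minimal sketch, assuming a hypothetical `MyEvent` with a user ID and a payload field, the byte conversion performed by a value-only schema might look like the following. It uses only the JDK; in a real job the body of `serializeValue` would sit in the `serialize` method of a `SerializationSchema<MyEvent>` implementation.

```java
import java.nio.charset.StandardCharsets;

final class ValueSerializationSketch {
    // Mirrors what a value-only schema would return for one element.
    static byte[] serializeValue(MyEvent event) {
        // Always name the charset; the platform default can differ between machines.
        return event.toJson().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serializeValue(new MyEvent("user-42", "click"));
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}

// Hypothetical event type; the field names are assumptions for illustration.
final class MyEvent {
    private final String userId;
    private final String payload;

    MyEvent(String userId, String payload) {
        this.userId = userId;
        this.payload = payload;
    }

    String getUserId() {
        return userId;
    }

    // Naive JSON rendering; assumes the fields need no escaping.
    String toJson() {
        return "{\"userId\":\"" + userId + "\",\"payload\":\"" + payload + "\"}";
    }
}
```

Naming the charset explicitly keeps the bytes written to Kafka identical across task managers with different default encodings.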

### Key-Value Serialization Producers

Creates producers that serialize both keys and values, enabling key-based partitioning.

```java { .api }
/**
 * Creates a FlinkKafkaProducer with key-value serialization using broker list
 * @param brokerList Comma separated addresses of the brokers
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User defined serialization schema supporting key/value messages
 */
public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema);

/**
 * Creates a FlinkKafkaProducer with key-value serialization using properties
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User defined serialization schema supporting key/value messages
 * @param producerConfig Properties with the producer configuration
 */
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig);

/**
 * Creates a FlinkKafkaProducer with key-value serialization and custom partitioning
 * @param topicId The topic to write data to
 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument
 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. If set to null, records will be partitioned by the key of each record
 */
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);
```

**Usage Examples:**

```java
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

// Custom keyed serialization schema
KeyedSerializationSchema<MyEvent> keyedSchema = new KeyedSerializationSchema<MyEvent>() {
    @Override
    public byte[] serializeKey(MyEvent element) {
        // Use an explicit charset so the key bytes do not depend on the platform default
        return element.getUserId().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(MyEvent element) {
        return element.toJson().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(MyEvent element) {
        return null; // Use default topic
    }
};

// Producer with key-value serialization (props as configured in the earlier example)
FlinkKafkaProducer010<MyEvent> keyedProducer = new FlinkKafkaProducer010<>(
    "keyed-events-topic",
    keyedSchema,
    props
);

// Producer with custom partitioner for key-value data
FlinkKafkaProducer010<MyEvent> customKeyedProducer = new FlinkKafkaProducer010<>(
    "partitioned-events-topic",
    keyedSchema,
    props,
    new UserIdPartitioner<>()
);
```

### Timestamp Configuration

Configure the producer to write Flink's event time timestamps to Kafka records.

```java { .api }
/**
 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
 * Timestamps must be positive for Kafka to accept them.
 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka
 */
public void setWriteTimestampToKafka(boolean writeTimestampToKafka);
```

**Usage Examples:**

```java
// Assumes `props` and a StreamExecutionEnvironment `env` are already set up
FlinkKafkaProducer010<MyEvent> producer = new FlinkKafkaProducer010<>(
    "timestamped-events",
    new MyEventSerializationSchema(),
    props
);

// Enable timestamp writing to Kafka
producer.setWriteTimestampToKafka(true);

// Use in streaming pipeline
DataStream<MyEvent> events = env.addSource(new MyEventSource());
events.addSink(producer);
```
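
Since Kafka does not accept negative record timestamps, it can help to validate timestamps before relying on `setWriteTimestampToKafka(true)`. The helper below is a hypothetical, JDK-only sketch of such a check. The class and method names are not part of Flink or Kafka, and treating `Long.MIN_VALUE` as the "no timestamp assigned" marker is an assumption about the upstream pipeline.

```java
final class TimestampGuardSketch {
    // Assumption: upstream code uses Long.MIN_VALUE to mean "no timestamp assigned".
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Returns the timestamp to attach to the Kafka record,
    // or null when the value is negative and should not be written.
    static Long kafkaTimestampOrNull(long eventTimestamp) {
        return eventTimestamp >= 0 ? Long.valueOf(eventTimestamp) : null;
    }

    public static void main(String[] args) {
        System.out.println(kafkaTimestampOrNull(1_700_000_000_000L));
        System.out.println(kafkaTimestampOrNull(NO_TIMESTAMP));
    }
}
```

A check like this could run in a map or filter step ahead of the sink, so that records without a usable event time are handled deliberately rather than failing at send time.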

## Deprecated Factory Methods

Legacy factory methods for creating producers with timestamp support. They are deprecated in favor of calling a constructor and then `setWriteTimestampToKafka`.

```java { .api }
/**
 * @deprecated Use FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties) and call setWriteTimestampToKafka(boolean)
 */
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
    DataStream<T> inStream,
    String topicId,
    KeyedSerializationSchema<T> serializationSchema,
    Properties producerConfig);

/**
 * @deprecated Use FlinkKafkaProducer010(String, SerializationSchema, Properties) and call setWriteTimestampToKafka(boolean)
 */
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
    DataStream<T> inStream,
    String topicId,
    SerializationSchema<T> serializationSchema,
    Properties producerConfig);

/**
 * @deprecated Use FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner) and call setWriteTimestampToKafka(boolean)
 */
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
    DataStream<T> inStream,
    String topicId,
    KeyedSerializationSchema<T> serializationSchema,
    Properties producerConfig,
    FlinkKafkaPartitioner<T> customPartitioner);
```

### Configuration Wrapper (Deprecated)

```java { .api }
/**
 * Configuration wrapper for deprecated timestamp-enabled producer factory methods
 * @deprecated Use constructor approach instead
 */
@Deprecated
public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
    /**
     * Configure failure logging behavior
     * @param logFailuresOnly Flag indicating if failures should only be logged instead of causing job failure
     */
    public void setLogFailuresOnly(boolean logFailuresOnly);

    /**
     * Configure checkpoint flushing behavior
     * @param flush Flag indicating if producer should flush on checkpoint
     */
    public void setFlushOnCheckpoint(boolean flush);

    /**
     * Configure timestamp writing to Kafka
     * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka
     */
    public void setWriteTimestampToKafka(boolean writeTimestampToKafka);
}
```

## Configuration Properties

### Producer-Specific Properties

Required properties:

- **bootstrap.servers**: Comma-separated list of Kafka broker addresses

Recommended properties for reliable delivery:

- **acks**: Set to "all" for maximum durability
- **retries**: Number of retries for failed sends (e.g., "3")
- **enable.idempotence**: Set to "true" to avoid duplicates caused by producer retries; this setting is recognized only by Kafka clients 0.11 and later, so it may have no effect with the 0.10 client this connector targets
- **max.in.flight.requests.per.connection**: Set to "1" to preserve ordering when retries are enabled

Performance tuning properties:

- **batch.size**: Maximum batch size in bytes for grouping records (default: 16384)
- **linger.ms**: Time in milliseconds to wait before sending a batch (default: 0)
- **buffer.memory**: Total memory in bytes available for buffering (default: 33554432)
- **compression.type**: Compression algorithm ("none", "gzip", "snappy", "lz4"; "zstd" requires Kafka 2.1 or later)
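
The properties listed above can be assembled with plain `java.util.Properties`. The following is a minimal sketch; the helper name `reliableProducerConfig` is hypothetical, and the tuning values (`batch.size`, `linger.ms`, `compression.type`) are illustrative rather than recommendations.

```java
import java.util.Properties;

final class ProducerConfigSketch {
    // Builds a producer configuration from the property keys listed above.
    static Properties reliableProducerConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // required
        // Durability and ordering
        props.setProperty("acks", "all");
        props.setProperty("retries", "3");
        props.setProperty("max.in.flight.requests.per.connection", "1");
        // Throughput tuning (illustrative values)
        props.setProperty("batch.size", "32768");
        props.setProperty("linger.ms", "5");
        props.setProperty("compression.type", "snappy");
        return props;
    }

    public static void main(String[] args) {
        Properties props = reliableProducerConfig("localhost:9092");
        props.stringPropertyNames().stream()
            .sorted()
            .forEach(name -> System.out.println(name + "=" + props.getProperty(name)));
    }
}
```

The resulting `Properties` object is what the constructors taking a `producerConfig` argument expect.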

## Custom Partitioning

Implement custom partitioning logic by extending `FlinkKafkaPartitioner`:

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class UserIdPartitioner<T> extends FlinkKafkaPartitioner<MyEvent> {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition based on user ID hash. floorMod keeps the index non-negative
        // even when hashCode() returns Integer.MIN_VALUE, and the result is looked
        // up in the partitions array so that a valid partition id is returned.
        return partitions[Math.floorMod(record.getUserId().hashCode(), partitions.length)];
    }

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Optional: Initialize partitioner
    }
}

// Use custom partitioner
FlinkKafkaProducer010<MyEvent> producer = new FlinkKafkaProducer010<>(
    "user-events",
    new MyEventSerializationSchema(),
    props,
    new UserIdPartitioner<>()
);
```
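
One detail deserves care when hashing keys onto partitions: `Math.abs(Integer.MIN_VALUE)` is still negative, so an expression of the form `Math.abs(hash) % n` can produce a negative index. The JDK-only sketch below shows the selection logic in isolation using `Math.floorMod`; the class and method names are hypothetical.

```java
final class PartitionIndexSketch {
    // Maps a key hash onto one of the available partition ids.
    static int choosePartition(int keyHash, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("No partitions available");
        }
        // floorMod always yields a value in [0, partitions.length)
        return partitions[Math.floorMod(keyHash, partitions.length)];
    }

    public static void main(String[] args) {
        int[] partitions = {10, 11, 12};
        System.out.println(choosePartition("user-42".hashCode(), partitions));
        // The naive formula goes negative for this input:
        System.out.println(Math.abs(Integer.MIN_VALUE) % partitions.length);
        System.out.println(choosePartition(Integer.MIN_VALUE, partitions));
    }
}
```

Returning `partitions[index]` rather than the bare index matters when the partition ids handed to the partitioner are not simply `0..n-1`.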

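## Exactly-Once Processing

Configure the producer and Flink checkpointing for reliable delivery. Note that the `EXACTLY_ONCE` checkpointing mode governs Flink's internal state; the Kafka 0.10 producer itself delivers at-least-once, so records may be written to the topic again after a recovery: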

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "3");
// Recognized only by Kafka clients 0.11 and later
props.setProperty("enable.idempotence", "true");
props.setProperty("max.in.flight.requests.per.connection", "1");

FlinkKafkaProducer010<String> exactlyOnceProducer = new FlinkKafkaProducer010<>(
    "exactly-once-topic",
    new SimpleStringSchema(),
    props
);

// Fail the job on send errors and wait for in-flight records at each checkpoint
exactlyOnceProducer.setLogFailuresOnly(false);
exactlyOnceProducer.setFlushOnCheckpoint(true);

// Enable checkpointing in streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
```

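## Error Handling

The producer handles various error scenarios:

- **Broker failures**: The Kafka client reconnects automatically and retries sends up to the configured `retries` count
- **Send and serialization errors**: Fail the job by default; failed sends can instead be logged only via `setLogFailuresOnly(true)`
- **Network partitions**: Records are buffered in the client (bounded by `buffer.memory`) and sends are retried
- **Topic creation**: Topics are created automatically only if the Kafka brokers allow it (`auto.create.topics.enable`)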
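
The choice between failing the job and only logging a failed send can be illustrated with a small JDK-only simulation. This is a sketch of the general pattern (remember the first asynchronous error and rethrow it on the next call, unless log-only mode is on). All names are hypothetical and it is not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.List;

final class SendErrorHandlingSketch {
    private final boolean logFailuresOnly;
    private final List<String> loggedFailures = new ArrayList<>();
    private Exception asyncError; // first failure reported by a send callback

    SendErrorHandlingSketch(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    // Called from the (simulated) asynchronous send callback.
    void onSendCompleted(Exception error) {
        if (error == null) {
            return; // send succeeded
        }
        if (logFailuresOnly) {
            loggedFailures.add("send failed: " + error.getMessage());
        } else if (asyncError == null) {
            asyncError = error;
        }
    }

    // Called before handling the next record; surfaces an earlier failure.
    void checkErroneous() {
        Exception e = asyncError;
        if (e != null) {
            asyncError = null;
            throw new IllegalStateException("Failed to send data: " + e.getMessage(), e);
        }
    }

    List<String> loggedFailures() {
        return loggedFailures;
    }

    public static void main(String[] args) {
        SendErrorHandlingSketch lenient = new SendErrorHandlingSketch(true);
        lenient.onSendCompleted(new RuntimeException("broker down"));
        lenient.checkErroneous(); // does not throw in log-only mode
        System.out.println(lenient.loggedFailures());
    }
}
```

Log-only mode trades delivery guarantees for availability: a record whose send failed is dropped rather than causing a restart from the last checkpoint.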

## Fault Tolerance

The producer integrates with Flink's fault tolerance mechanisms:

1. **Checkpointing**: The producer participates in Flink checkpoints
2. **Recovery**: On restart, the job resumes from the last completed checkpoint; records sent after that checkpoint may be written again, which gives at-least-once delivery
3. **Commit Protocol**: The Kafka 0.10 producer has no transactional commit; the two-phase commit protocol for end-to-end exactly-once guarantees is provided by the Kafka 0.11 and later connectors
4. **Flush on Checkpoint**: Ensures all buffered records are acknowledged by Kafka before a checkpoint completes

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Configure fault tolerance
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);                                  // checkpoint interval in ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);  // minimum pause in ms
env.getCheckpointConfig().setCheckpointTimeout(60000);          // abort checkpoints after 60 s
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
```
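
To make the flush-on-checkpoint behavior concrete, the following JDK-only simulation tracks records that were sent but not yet acknowledged and drains them when a snapshot is taken. It is a simplified sketch with hypothetical names, in which acknowledgement is simulated synchronously; the real connector waits for the Kafka client to confirm outstanding sends.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class FlushOnCheckpointSketch {
    private final Deque<String> inFlight = new ArrayDeque<>();  // sent, not yet acknowledged
    private final List<String> acknowledged = new ArrayList<>();

    void send(String record) {
        inFlight.add(record);
    }

    // Simulates a blocking flush: every in-flight record gets acknowledged.
    void flush() {
        while (!inFlight.isEmpty()) {
            acknowledged.add(inFlight.poll());
        }
    }

    // Simulates the hook that runs when a checkpoint is taken.
    void snapshotState(boolean flushOnCheckpoint) {
        if (flushOnCheckpoint) {
            flush();
            if (!inFlight.isEmpty()) {
                throw new IllegalStateException("Pending records remain after flush");
            }
        }
    }

    int pendingCount() {
        return inFlight.size();
    }

    int acknowledgedCount() {
        return acknowledged.size();
    }

    public static void main(String[] args) {
        FlushOnCheckpointSketch sink = new FlushOnCheckpointSketch();
        sink.send("a");
        sink.send("b");
        sink.snapshotState(true);
        System.out.println(sink.pendingCount() + " pending, " + sink.acknowledgedCount() + " acknowledged");
    }
}
```

Without the flush, a checkpoint could complete while records are still buffered, and those records would be lost if the job failed right afterwards.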