or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdindex.mdserialization.mdstreaming-consumer.mdstreaming-producer.mdtable-api.md

docs/streaming-producer.md

0

# Streaming Producer

1

2

The `FlinkKafkaProducer011` provides transactional Kafka message production with exactly-once semantics, flexible partitioning, and comprehensive delivery guarantee options optimized for Kafka 0.11.x.

3

4

## Capabilities

5

6

### FlinkKafkaProducer011 Class

7

8

Main producer class for writing to Kafka 0.11.x topics with transaction support and exactly-once guarantees.

9

10

```java { .api }

11

/**

12

* Kafka producer for Kafka 0.11.x with support for transactional writes and exactly-once semantics

13

* Extends TwoPhaseCommitSinkFunction for transaction coordination

14

*/

15

@PublicEvolving

16

class FlinkKafkaProducer011<IN> extends TwoPhaseCommitSinkFunction<IN, KafkaTransactionState, KafkaTransactionContext> {

17

18

// Simple constructors for basic use cases

19

FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);

20

FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);

21

FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner);

22

23

// Keyed serialization constructors

24

FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);

25

FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);

26

FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Semantic semantic);

27

FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner);

28

29

// Full constructor with all options

30

FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, Semantic semantic, int kafkaProducersPoolSize);

31

}

32

```

33

34

**Usage Examples:**

35

36

```java

37

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

38

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;

39

import org.apache.flink.api.common.serialization.SimpleStringSchema;

40

41

// Basic producer setup

42

Properties props = new Properties();

43

props.setProperty("bootstrap.servers", "localhost:9092");

44

45

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(

46

"output-topic",

47

new SimpleStringSchema(),

48

props

49

);

50

51

// Producer with exactly-once semantics

52

props.setProperty("transaction.timeout.ms", "900000"); // 15 minutes

53

FlinkKafkaProducer011<String> exactlyOnceProducer = new FlinkKafkaProducer011<>(

54

"critical-topic",

55

new SimpleStringSchema(),

56

props,

57

Semantic.EXACTLY_ONCE

58

);

59

60

// Producer with custom partitioner

61

FlinkFixedPartitioner<String> partitioner = new FlinkFixedPartitioner<>();

62

FlinkKafkaProducer011<String> partitionedProducer = new FlinkKafkaProducer011<>(

63

"partitioned-topic",

64

new SimpleStringSchema(),

65

props,

66

Optional.of(partitioner)

67

);

68

```

69

70

### Delivery Semantics

71

72

Configure delivery guarantees and transaction behavior.

73

74

```java { .api }

75

/**

76

* Delivery semantics for the Kafka producer

77

*/

78

enum Semantic {

79

/** Transactional writes with exactly-once guarantees */

80

EXACTLY_ONCE,

81

/** At-least-once delivery semantics */

82

AT_LEAST_ONCE,

83

/** No delivery guarantees */

84

NONE

85

}

86

```

87

88

**Usage Examples:**

89

90

```java

91

// Exactly-once semantics (requires Kafka transactions)

92

Properties exactlyOnceProps = new Properties();

93

exactlyOnceProps.setProperty("bootstrap.servers", "localhost:9092");

94

exactlyOnceProps.setProperty("transaction.timeout.ms", "900000");

95

exactlyOnceProps.setProperty("enable.idempotence", "true");

96

97

FlinkKafkaProducer011<String> exactlyOnceProducer = new FlinkKafkaProducer011<>(

98

"reliable-topic",

99

new SimpleStringSchema(),

100

exactlyOnceProps,

101

Semantic.EXACTLY_ONCE

102

);

103

104

// At-least-once semantics (faster, but may have duplicates)

105

FlinkKafkaProducer011<String> atLeastOnceProducer = new FlinkKafkaProducer011<>(

106

"fast-topic",

107

new SimpleStringSchema(),

108

props,

109

Semantic.AT_LEAST_ONCE

110

);

111

112

// No guarantees (fastest, fire-and-forget)

113

FlinkKafkaProducer011<String> fireForgetProducer = new FlinkKafkaProducer011<>(

114

"logs-topic",

115

new SimpleStringSchema(),

116

props,

117

Semantic.NONE

118

);

119

```

120

121

### Producer Configuration

122

123

Essential configuration options and constants for optimal producer behavior.

124

125

```java { .api }

126

/**

127

* Configuration constants for producer tuning

128

*/

129

static final int SAFE_SCALE_DOWN_FACTOR = 5; // Safe scale down factor for transactional IDs

130

static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; // Default producer pool size

131

static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1); // Default transaction timeout

132

static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; // Configuration key for disabling metrics

133

```

134

135

**Usage Examples:**

136

137

```java

138

Properties producerProps = new Properties();

139

producerProps.setProperty("bootstrap.servers", "localhost:9092");

140

141

// Transaction configuration for exactly-once

142

producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes

143

producerProps.setProperty("enable.idempotence", "true");

144

producerProps.setProperty("retries", "2147483647"); // Max retries

145

producerProps.setProperty("max.in.flight.requests.per.connection", "5");

146

147

// Performance tuning

148

producerProps.setProperty("batch.size", "16384");

149

producerProps.setProperty("linger.ms", "5");

150

producerProps.setProperty("buffer.memory", "33554432");

151

producerProps.setProperty("compression.type", "snappy");

152

153

// Flink-specific configuration

154

producerProps.setProperty("flink.disable-metrics", "false");

155

156

// Create producer with custom pool size

157

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(

158

"my-topic",

159

new KeyedSerializationSchema<String>() {
    @Override
    public byte[] serializeKey(String element) {
        return null; // records are written without a key
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // null means: use the default topic ("my-topic")
    }
},

165

producerProps,

166

Optional.empty(),

167

Semantic.EXACTLY_ONCE,

168

10 // Custom producer pool size

169

);

170

```

171

172

### Advanced Producer Methods

173

174

Additional configuration methods for specialized use cases.

175

176

```java { .api }

177

/**

178

* Configure timestamp writing to Kafka records

179

* @param writeTimestampToKafka whether to write Flink timestamps to Kafka

180

*/

181

void setWriteTimestampToKafka(boolean writeTimestampToKafka);

182

183

/**

184

* Configure error handling mode

185

* @param logFailuresOnly if true, only log failures instead of failing the job

186

*/

187

void setLogFailuresOnly(boolean logFailuresOnly);

188

189

/**

190

* Override transaction timeout handling for specific error scenarios

191

* @return this producer instance for method chaining

192

*/

193

FlinkKafkaProducer011<IN> ignoreFailuresAfterTransactionTimeout();

194

```

195

196

**Usage Examples:**

197

198

```java

199

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(

200

"timestamped-topic",

201

new SimpleStringSchema(),

202

props,

203

Semantic.EXACTLY_ONCE

204

);

205

206

// Configure timestamp writing

207

producer.setWriteTimestampToKafka(true);

208

209

// Configure error handling for non-critical data

210

FlinkKafkaProducer011<String> loggingProducer = new FlinkKafkaProducer011<>(

211

"logs-topic",

212

new SimpleStringSchema(),

213

props,

214

Semantic.AT_LEAST_ONCE

215

);

216

loggingProducer.setLogFailuresOnly(true);

217

218

// Handle transaction timeout gracefully

219

producer.ignoreFailuresAfterTransactionTimeout();

220

221

// Add to DataStream

222

dataStream.addSink(producer).name("Kafka Sink");

223

```

224

225

### Custom Partitioning

226

227

Implement custom partitioning strategies for message distribution.

228

229

```java { .api }

230

/**

231

* Base class for custom Kafka partitioners

232

*/

233

@PublicEvolving

234

abstract class FlinkKafkaPartitioner<T> implements Serializable {

235

/**

236

* Initialize the partitioner

237

* @param parallelInstanceId the parallel instance ID of this subtask

238

* @param parallelInstances total number of parallel instances

239

*/

240

void open(int parallelInstanceId, int parallelInstances);

241

242

/**

243

* Determine the partition for a record

244

* @param record the record to partition

245

* @param key the record key

246

* @param value the record value

247

* @param targetTopic the target topic name

248

* @param partitions available partition IDs

249

* @return the partition ID to use

250

*/

251

abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);

252

}

253

254

/**

255

* Fixed partitioner ensuring each Flink partition goes to one Kafka partition

256

*/

257

@PublicEvolving

258

class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {

259

// Implementation ensures deterministic partition assignment

260

}

261

```

262

263

**Usage Examples:**

264

265

```java

266

// Custom partitioner based on record content

267

public class CustomPartitioner<T> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Hash-based partitioning on record content; mask the sign bit rather than
        // using Math.abs(), which returns a negative value for Integer.MIN_VALUE hash codes
        return (record.hashCode() & 0x7fffffff) % partitions.length;
    }
}

274

275

// Use custom partitioner

276

CustomPartitioner<String> customPartitioner = new CustomPartitioner<>();

277

FlinkKafkaProducer011<String> partitionedProducer = new FlinkKafkaProducer011<>(

278

"custom-partitioned-topic",

279

new SimpleStringSchema(),

280

props,

281

Optional.of(customPartitioner),

282

Semantic.EXACTLY_ONCE,

283

DEFAULT_KAFKA_PRODUCERS_POOL_SIZE

284

);

285

```

286

287

## Error Handling

288

289

The producer integrates with the Flink Kafka exception hierarchy for comprehensive error management.

290

291

```java { .api }

292

// Producer operations may throw FlinkKafka011Exception for transactional or configuration errors

293

// Common error scenarios:

294

// - PRODUCERS_POOL_EMPTY: No available producers in the pool

295

// - EXTERNAL_ERROR: Kafka broker or network issues

296

```

297

298

**Configuration for resilient operation:**

299

300

```java

301

Properties resilientProps = new Properties();

302

resilientProps.setProperty("bootstrap.servers", "localhost:9092");

303

resilientProps.setProperty("retries", "2147483647");

304

resilientProps.setProperty("max.in.flight.requests.per.connection", "5");

305

resilientProps.setProperty("enable.idempotence", "true");

306

resilientProps.setProperty("acks", "all");

307

resilientProps.setProperty("transaction.timeout.ms", "600000"); // 10 minutes

308

resilientProps.setProperty("delivery.timeout.ms", "600000"); // NOTE: only recognized by Kafka 2.1+ clients — the 0.11 producer ignores this setting

309

resilientProps.setProperty("request.timeout.ms", "300000");

310

```