# Streaming Producer

The `FlinkKafkaProducer011` provides transactional Kafka message production with exactly-once semantics, flexible partitioning, and comprehensive delivery-guarantee options for Kafka 0.11.x.

## Capabilities

### FlinkKafkaProducer011 Class

Main producer class for writing to Kafka 0.11.x topics with transaction support and exactly-once guarantees.

```java { .api }
/**
 * Kafka producer for Kafka 0.11.x with support for transactional writes and exactly-once semantics.
 * Extends TwoPhaseCommitSinkFunction for transaction coordination.
 */
@PublicEvolving
class FlinkKafkaProducer011<IN> extends TwoPhaseCommitSinkFunction<IN, KafkaTransactionState, KafkaTransactionContext> {

    // Simple constructors for basic use cases
    FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
    FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner);

    // Keyed serialization constructors
    FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
    FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
    FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Semantic semantic);
    FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner);

    // Full constructor with all options
    FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, Semantic semantic, int kafkaProducersPoolSize);
}
```
**Usage Examples:**

```java
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

// Basic producer setup (at-least-once by default)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "output-topic",
    new SimpleStringSchema(),
    props
);

// Producer with exactly-once semantics. The Semantic constructors take a
// KeyedSerializationSchema, so a plain SerializationSchema must be wrapped.
props.setProperty("transaction.timeout.ms", "900000"); // 15 minutes
FlinkKafkaProducer011<String> exactlyOnceProducer = new FlinkKafkaProducer011<>(
    "critical-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.EXACTLY_ONCE
);

// Producer with custom partitioner
FlinkFixedPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
FlinkKafkaProducer011<String> partitionedProducer = new FlinkKafkaProducer011<>(
    "partitioned-topic",
    new SimpleStringSchema(),
    props,
    Optional.of(partitioner)
);
```
### Delivery Semantics

Configure delivery guarantees and transaction behavior. `AT_LEAST_ONCE` is the default; `EXACTLY_ONCE` requires checkpointing to be enabled on the job, since pending transactions are committed when a checkpoint completes.

```java { .api }
/**
 * Delivery semantics for the Kafka producer
 */
enum Semantic {
    /** Transactional writes with exactly-once guarantees (requires checkpointing) */
    EXACTLY_ONCE,
    /** At-least-once delivery; duplicates are possible after failure recovery (default) */
    AT_LEAST_ONCE,
    /** No delivery guarantees; records may be lost or duplicated on failure */
    NONE
}
```
**Usage Examples:**

```java
// Exactly-once semantics (requires Kafka transactions and Flink checkpointing)
Properties exactlyOnceProps = new Properties();
exactlyOnceProps.setProperty("bootstrap.servers", "localhost:9092");
exactlyOnceProps.setProperty("transaction.timeout.ms", "900000");
exactlyOnceProps.setProperty("enable.idempotence", "true");

FlinkKafkaProducer011<String> exactlyOnceProducer = new FlinkKafkaProducer011<>(
    "reliable-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    exactlyOnceProps,
    Semantic.EXACTLY_ONCE
);

// At-least-once semantics (faster, but may produce duplicates after recovery)
FlinkKafkaProducer011<String> atLeastOnceProducer = new FlinkKafkaProducer011<>(
    "fast-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.AT_LEAST_ONCE
);

// No guarantees (fastest, fire-and-forget)
FlinkKafkaProducer011<String> fireForgetProducer = new FlinkKafkaProducer011<>(
    "logs-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.NONE
);
```
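Under the hood, `EXACTLY_ONCE` relies on the two-phase commit pattern of `TwoPhaseCommitSinkFunction`: records are written into an open Kafka transaction, the checkpoint barrier triggers a pre-commit (flush), and the checkpoint-complete notification triggers the actual Kafka commit. The following Flink-free sketch (class and method names are hypothetical, not the connector's API) illustrates that lifecycle:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified, Flink-free sketch of the two-phase commit lifecycle behind EXACTLY_ONCE.
public class TwoPhaseCommitSketch {
    private final List<String> openTransaction = new ArrayList<>();
    private final List<String> preCommitted = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    // Each incoming record is written into the currently open Kafka transaction.
    public void invoke(String record) {
        openTransaction.add(record);
    }

    // Checkpoint barrier: flush pending records; the transaction stays pending.
    public void preCommit() {
        preCommitted.addAll(openTransaction);
        openTransaction.clear();
    }

    // Checkpoint completed on all operators: commit the pending Kafka transaction.
    public void notifyCheckpointComplete() {
        committed.addAll(preCommitted);
        preCommitted.clear();
    }

    // Consumers running with isolation.level=read_committed only see committed records.
    public List<String> visibleToConsumers() {
        return Collections.unmodifiableList(committed);
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.invoke("event-1");
        sink.preCommit();                              // checkpoint barrier reaches the sink
        System.out.println(sink.visibleToConsumers()); // [] - not yet visible
        sink.notifyCheckpointComplete();               // checkpoint acknowledged
        System.out.println(sink.visibleToConsumers()); // [event-1]
    }
}
```

This is also why downstream consumers must use `isolation.level=read_committed` to actually observe exactly-once results.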
### Producer Configuration

Essential configuration options and constants for tuning producer behavior.

```java { .api }
/**
 * Configuration constants for producer tuning
 */
static final int SAFE_SCALE_DOWN_FACTOR = 5; // Safe scale-down factor for transactional IDs
static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; // Default producer pool size
static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1); // Default transaction timeout
static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; // Configuration key for disabling metrics
```
**Usage Examples:**

```java
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// Transaction configuration for exactly-once
producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes
producerProps.setProperty("enable.idempotence", "true");
producerProps.setProperty("retries", "2147483647"); // Integer.MAX_VALUE
producerProps.setProperty("max.in.flight.requests.per.connection", "5");

// Performance tuning
producerProps.setProperty("batch.size", "16384");
producerProps.setProperty("linger.ms", "5");
producerProps.setProperty("buffer.memory", "33554432");
producerProps.setProperty("compression.type", "snappy");

// Flink-specific configuration
producerProps.setProperty("flink.disable-metrics", "false");

// Create producer with a custom pool size
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "my-topic",
    new KeyedSerializationSchema<String>() {
        @Override
        public byte[] serializeKey(String element) {
            return null; // no key
        }

        @Override
        public byte[] serializeValue(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(String element) {
            return null; // use the default topic
        }
    },
    producerProps,
    Optional.empty(),
    Semantic.EXACTLY_ONCE,
    10 // Custom producer pool size
);
```
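One caveat worth spelling out: the producer's default transaction timeout (`DEFAULT_KAFKA_TRANSACTION_TIMEOUT`, one hour) exceeds the broker's default `transaction.max.timeout.ms` of 15 minutes, so an exactly-once job fails at startup unless one of the two settings is adjusted. A plain-Java sketch of the broker-side check (class and method names are hypothetical, for illustration only):

```java
public class TransactionTimeoutCheck {
    // Broker-side cap: transaction.max.timeout.ms defaults to 15 minutes.
    static final long BROKER_DEFAULT_MAX_TIMEOUT_MS = 15 * 60 * 1000L;
    // Connector-side default: DEFAULT_KAFKA_TRANSACTION_TIMEOUT is one hour.
    static final long FLINK_DEFAULT_TRANSACTION_TIMEOUT_MS = 60 * 60 * 1000L;

    // The broker rejects transactional producers whose timeout exceeds its cap.
    static boolean brokerAccepts(long producerTimeoutMs, long brokerMaxTimeoutMs) {
        return producerTimeoutMs <= brokerMaxTimeoutMs;
    }

    public static void main(String[] args) {
        // Default vs. default fails: 1 hour > 15 minutes
        System.out.println(brokerAccepts(FLINK_DEFAULT_TRANSACTION_TIMEOUT_MS,
                                         BROKER_DEFAULT_MAX_TIMEOUT_MS)); // false
        // Setting transaction.timeout.ms to 900000 ms works with default brokers
        System.out.println(brokerAccepts(900_000L, BROKER_DEFAULT_MAX_TIMEOUT_MS)); // true
    }
}
```

In practice this means either lowering `transaction.timeout.ms` in the producer properties (as the examples above do) or raising `transaction.max.timeout.ms` on the brokers.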
### Advanced Producer Methods

Additional configuration methods for specialized use cases.

```java { .api }
/**
 * Configure timestamp writing to Kafka records
 * @param writeTimestampToKafka whether to write Flink record timestamps to Kafka
 */
void setWriteTimestampToKafka(boolean writeTimestampToKafka);

/**
 * Configure error handling mode
 * @param logFailuresOnly if true, only log send failures instead of failing the job
 */
void setLogFailuresOnly(boolean logFailuresOnly);

/**
 * Disable propagation of exceptions thrown when committing transactions that
 * presumably timed out while the job was down
 * @return this producer instance for method chaining
 */
FlinkKafkaProducer011<IN> ignoreFailuresAfterTransactionTimeout();
```
**Usage Examples:**

```java
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "timestamped-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.EXACTLY_ONCE
);

// Configure timestamp writing
producer.setWriteTimestampToKafka(true);

// Configure error handling for non-critical data
FlinkKafkaProducer011<String> loggingProducer = new FlinkKafkaProducer011<>(
    "logs-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.AT_LEAST_ONCE
);
loggingProducer.setLogFailuresOnly(true);

// Tolerate commit failures for transactions that timed out during an outage
producer.ignoreFailuresAfterTransactionTimeout();

// Add to a DataStream
dataStream.addSink(producer).name("Kafka Sink");
```
### Custom Partitioning

Implement custom partitioning strategies for message distribution.

```java { .api }
/**
 * Base class for custom Kafka partitioners
 */
@PublicEvolving
abstract class FlinkKafkaPartitioner<T> implements Serializable {
    /**
     * Initialize the partitioner on each parallel subtask
     * @param parallelInstanceId the parallel instance ID of this subtask
     * @param parallelInstances total number of parallel instances
     */
    void open(int parallelInstanceId, int parallelInstances);

    /**
     * Determine the partition for a record
     * @param record the record to partition
     * @param key the serialized record key
     * @param value the serialized record value
     * @param targetTopic the target topic name
     * @param partitions available partition IDs
     * @return the Kafka partition to write to (an element of {@code partitions})
     */
    abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

/**
 * Fixed partitioner: each parallel Flink subtask writes to a single Kafka partition
 */
@PublicEvolving
class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    // Deterministic assignment: partitions[parallelInstanceId % partitions.length]
}
```
**Usage Examples:**

```java
// Custom partitioner based on record content
public class ContentHashPartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Hash-based partitioning on record content; mask the sign bit so the
        // index is non-negative, and return an element of the partitions array
        return partitions[(record.hashCode() & Integer.MAX_VALUE) % partitions.length];
    }
}

// Use the custom partitioner
ContentHashPartitioner customPartitioner = new ContentHashPartitioner();
FlinkKafkaProducer011<String> partitionedProducer = new FlinkKafkaProducer011<>(
    "custom-partitioned-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Optional.of(customPartitioner),
    Semantic.EXACTLY_ONCE,
    FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
);
```
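The partitioning contract can be exercised without a Kafka cluster. This plain-Java sketch (the class name is hypothetical) mirrors the subtask-to-partition assignment that `FlinkFixedPartitioner` performs:

```java
public class FixedAssignmentDemo {
    // Mirrors FlinkFixedPartitioner: subtask i always writes to the same partition,
    // wrapping around when there are more subtasks than partitions.
    static int assign(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};          // three Kafka partitions
        for (int subtask = 0; subtask < 4; subtask++) {
            // Four Flink subtasks: subtask 3 wraps around to partition 0
            System.out.println("subtask " + subtask + " -> partition " + assign(subtask, partitions));
        }
    }
}
```

Note the wrap-around: with more subtasks than partitions, several subtasks share a partition, while with more partitions than subtasks, some partitions receive no data at all.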
## Error Handling

The producer integrates with the Flink Kafka exception hierarchy for comprehensive error management.

```java { .api }
// Producer operations may throw FlinkKafka011Exception for transactional or configuration errors
// Common error scenarios:
// - PRODUCERS_POOL_EMPTY: no available producers in the pool
// - EXTERNAL_ERROR: Kafka broker or network issues
```
**Configuration for resilient operation:**

```java
Properties resilientProps = new Properties();
resilientProps.setProperty("bootstrap.servers", "localhost:9092");
resilientProps.setProperty("retries", "2147483647");
resilientProps.setProperty("max.in.flight.requests.per.connection", "5");
resilientProps.setProperty("enable.idempotence", "true");
resilientProps.setProperty("acks", "all");
resilientProps.setProperty("transaction.timeout.ms", "600000"); // 10 minutes
// Note: delivery.timeout.ms is not available in the 0.11.x Kafka clients
resilientProps.setProperty("request.timeout.ms", "300000");
```
```