# Data Stream Producer

The `FlinkKafkaProducer010` sink writes a Flink data stream to Apache Kafka 0.10.x topics. It supports at-least-once delivery when checkpointing is enabled, custom partitioning strategies, and writing record timestamps to Kafka. Exactly-once delivery is not available with this connector, because it depends on Kafka transactions, which were introduced in Kafka 0.11.

## Capabilities

### Value-Only Serialization Producers

These constructors create producers that serialize only the record value, without a key.

```java { .api }
/**
 * Creates a FlinkKafkaProducer for a given topic using a broker list.
 * @param brokerList Comma-separated addresses of the brokers
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User-defined key-less serialization schema
 */
public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema);

/**
 * Creates a FlinkKafkaProducer for a given topic using properties.
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User-defined key-less serialization schema
 * @param producerConfig Properties with the producer configuration
 */
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig);

/**
 * Creates a FlinkKafkaProducer with custom partitioning.
 * @param topicId The topic to write data to
 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a Kafka-consumable byte[]
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument
 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. If set to null, records will be distributed to Kafka partitions in a round-robin fashion
 */
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);
```

**Usage Examples:**

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

import java.util.Properties;

// Simple producer with broker list
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
    "localhost:9092",
    "output-topic",
    new SimpleStringSchema()
);

// Producer with properties configuration
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "3");

FlinkKafkaProducer010<String> configuredProducer = new FlinkKafkaProducer010<>(
    "output-topic",
    new SimpleStringSchema(),
    props
);

// Producer with custom partitioner
// (MyEvent, MyEventSerializationSchema and MyCustomPartitioner are user-defined placeholders)
FlinkKafkaProducer010<MyEvent> eventProducer = new FlinkKafkaProducer010<>(
    "events-topic",
    new MyEventSerializationSchema(),
    props,
    new MyCustomPartitioner<>()
);
```

### Key-Value Serialization Producers

These constructors create producers that serialize both the key and the value of each record, which enables key-based partitioning.

```java { .api }
/**
 * Creates a FlinkKafkaProducer with key-value serialization using a broker list.
 * @param brokerList Comma-separated addresses of the brokers
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User-defined serialization schema supporting key/value messages
 */
public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema);

/**
 * Creates a FlinkKafkaProducer with key-value serialization using properties.
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User-defined serialization schema supporting key/value messages
 * @param producerConfig Properties with the producer configuration
 */
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig);

/**
 * Creates a FlinkKafkaProducer with key-value serialization and custom partitioning.
 * @param topicId The topic to write data to
 * @param serializationSchema A serializable serialization schema for turning user objects into a Kafka-consumable byte[] supporting key/value messages
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument
 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. If set to null, records will be partitioned by the key of each record
 */
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);
```

**Usage Examples:**

```java
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

// Custom keyed serialization schema (MyEvent is a user-defined placeholder type)
KeyedSerializationSchema<MyEvent> keyedSchema = new KeyedSerializationSchema<MyEvent>() {
    @Override
    public byte[] serializeKey(MyEvent element) {
        // Use an explicit charset so the bytes do not depend on the platform default
        return element.getUserId().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(MyEvent element) {
        return element.toJson().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(MyEvent element) {
        return null; // Use default topic
    }
};

// Producer with key-value serialization (props as configured in the previous example)
FlinkKafkaProducer010<MyEvent> keyedProducer = new FlinkKafkaProducer010<>(
    "keyed-events-topic",
    keyedSchema,
    props
);

// Producer with custom partitioner for key-value data
FlinkKafkaProducer010<MyEvent> customKeyedProducer = new FlinkKafkaProducer010<>(
    "partitioned-events-topic",
    keyedSchema,
    props,
    new UserIdPartitioner<>()
);
```

### Timestamp Configuration

Configure the producer to write Flink's event-time timestamps to Kafka records.

```java { .api }
/**
 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
 * Timestamps must be positive for Kafka to accept them.
 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka
 */
public void setWriteTimestampToKafka(boolean writeTimestampToKafka);
```

**Usage Examples:**

```java
// props as configured in the earlier examples; MyEvent types are user-defined placeholders
FlinkKafkaProducer010<MyEvent> producer = new FlinkKafkaProducer010<>(
    "timestamped-events",
    new MyEventSerializationSchema(),
    props
);

// Enable timestamp writing to Kafka
producer.setWriteTimestampToKafka(true);

// Use in streaming pipeline; the records must carry event-time timestamps
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyEvent> events = env.addSource(new MyEventSource());
events.addSink(producer);
```

## Deprecated Factory Methods

Legacy static factory methods that create a producer with timestamp support. They are deprecated in favor of calling a constructor and then `setWriteTimestampToKafka(boolean)`.

```java { .api }
/**
 * @deprecated Use FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties) and call setWriteTimestampToKafka(boolean)
 */
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
    DataStream<T> inStream,
    String topicId,
    KeyedSerializationSchema<T> serializationSchema,
    Properties producerConfig);

/**
 * @deprecated Use FlinkKafkaProducer010(String, SerializationSchema, Properties) and call setWriteTimestampToKafka(boolean)
 */
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
    DataStream<T> inStream,
    String topicId,
    SerializationSchema<T> serializationSchema,
    Properties producerConfig);

/**
 * @deprecated Use FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner) and call setWriteTimestampToKafka(boolean)
 */
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
    DataStream<T> inStream,
    String topicId,
    KeyedSerializationSchema<T> serializationSchema,
    Properties producerConfig,
    FlinkKafkaPartitioner<T> customPartitioner);
```

### Configuration Wrapper (Deprecated)

```java { .api }
/**
 * Configuration wrapper for deprecated timestamp-enabled producer factory methods
 * @deprecated Use constructor approach instead
 */
@Deprecated
public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
    /**
     * Configure failure logging behavior
     * @param logFailuresOnly Flag indicating if failures should only be logged instead of causing job failure
     */
    public void setLogFailuresOnly(boolean logFailuresOnly);

    /**
     * Configure checkpoint flushing behavior
     * @param flush Flag indicating if producer should flush on checkpoint
     */
    public void setFlushOnCheckpoint(boolean flush);

    /**
     * Configure timestamp writing to Kafka
     * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka
     */
    public void setWriteTimestampToKafka(boolean writeTimestampToKafka);
}
```

## Configuration Properties

### Producer-Specific Properties

Required properties:
- **bootstrap.servers**: Comma-separated list of Kafka broker addresses

Recommended properties for reliable delivery:
- **acks**: Set to "all" so a send succeeds only after all in-sync replicas have acknowledged it
- **retries**: Number of retries for failed sends (e.g., "3")
- **max.in.flight.requests.per.connection**: Set to "1" to preserve record order when retries are enabled

The **enable.idempotence** setting is not available with the Kafka 0.10 client. Idempotent and transactional producers, which exactly-once delivery depends on, were introduced in Kafka 0.11.

Performance tuning properties:
- **batch.size**: Maximum size of a record batch in bytes (default: 16384)
- **linger.ms**: Time in milliseconds to wait for more records before sending a batch (default: 0)
- **buffer.memory**: Total memory in bytes available for buffering unsent records (default: 33554432)
- **compression.type**: Compression algorithm ("none", "gzip", "snappy", "lz4"; "zstd" requires Kafka 2.1 or later)
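
The properties listed above can be collected in a `java.util.Properties` object before it is passed to a producer constructor. The following is a minimal sketch that uses only the JDK; the class name `ProducerProps`, the `build` helper, and the tuning values are illustrative assumptions, while the property keys are the standard Kafka producer settings named in the lists.

```java
import java.util.Properties;

public class ProducerProps {

    /** Builds a producer configuration for the given broker list (hypothetical helper). */
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Required: tells the Kafka client where to find the cluster.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Reliability: wait for all in-sync replicas and retry transient failures.
        props.setProperty("acks", "all");
        props.setProperty("retries", "3");
        // With retries enabled, one in-flight request per connection preserves order.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        // Throughput: larger, compressed batches (illustrative values, not recommendations).
        props.setProperty("batch.size", "32768");
        props.setProperty("linger.ms", "20");
        props.setProperty("compression.type", "lz4");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        // Print the resulting configuration in a stable order.
        props.stringPropertyNames().stream().sorted()
             .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

The returned object can be used wherever the examples in this document pass `props`. Larger `batch.size` and `linger.ms` values trade latency for throughput, so they should be tuned against the actual workload.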

## Custom Partitioning

Implement custom partitioning logic by extending `FlinkKafkaPartitioner`:

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// The type parameter T is unused; it is kept so that `new UserIdPartitioner<>()` compiles
public class UserIdPartitioner<T> extends FlinkKafkaPartitioner<MyEvent> {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition based on user ID hash. Masking the sign bit avoids the negative
        // result that Math.abs produces for Integer.MIN_VALUE.
        int index = (record.getUserId().hashCode() & Integer.MAX_VALUE) % partitions.length;
        // Return the partition id stored at that index, not the index itself
        return partitions[index];
    }

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Optional: Initialize partitioner
    }
}

// Use custom partitioner
FlinkKafkaProducer010<MyEvent> producer = new FlinkKafkaProducer010<>(
    "user-events",
    new MyEventSerializationSchema(),
    props,
    new UserIdPartitioner<>()
);
```
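
Mapping a hash code to a partition has one pitfall worth spelling out: `Math.abs(Integer.MIN_VALUE)` is still negative, so a modulo on it can produce a negative index. The sketch below isolates that calculation in plain Java so it can be tried without a Flink or Kafka dependency. The class `PartitionIndex` and its `choose` method are hypothetical names introduced for illustration only.

```java
public class PartitionIndex {

    /** Maps a key hash to one of the given partition ids; the index is never negative. */
    public static int choose(int keyHash, int[] partitions) {
        // Clearing the sign bit keeps the value in [0, Integer.MAX_VALUE],
        // unlike Math.abs, which returns Integer.MIN_VALUE unchanged.
        int index = (keyHash & Integer.MAX_VALUE) % partitions.length;
        return partitions[index];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};

        // Typical case: any string hash maps to a valid partition id.
        System.out.println(choose("user-42".hashCode(), partitions));

        // Edge case: the naive formula yields a negative index (-2) ...
        System.out.println(Math.abs(Integer.MIN_VALUE) % partitions.length);

        // ... while the masked version stays in range (partition 0).
        System.out.println(choose(Integer.MIN_VALUE, partitions));
    }
}
```

Returning `partitions[index]` rather than `index` keeps the result correct even if the partition id array passed in is not a contiguous range starting at zero.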
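
## Delivery Guarantees

`FlinkKafkaProducer010` cannot provide exactly-once delivery to Kafka, because the Kafka 0.10 client has neither idempotent nor transactional producers (both arrived in Kafka 0.11). With checkpointing enabled, it provides at-least-once delivery, as long as send failures fail the job and pending records are flushed on every checkpoint:

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "3");
props.setProperty("max.in.flight.requests.per.connection", "1");

FlinkKafkaProducer010<String> atLeastOnceProducer = new FlinkKafkaProducer010<>(
    "output-topic",
    new SimpleStringSchema(),
    props
);
// Fail the job on send errors instead of only logging them
atLeastOnceProducer.setLogFailuresOnly(false);
// Wait for Kafka to acknowledge all pending records before a checkpoint completes
atLeastOnceProducer.setFlushOnCheckpoint(true);

// Enable checkpointing in streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
// This mode governs Flink's internal state; records written to Kafka remain at-least-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
```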

## Error Handling

The producer handles the following error scenarios:

- **Broker failures**: The Kafka client reconnects automatically and resends records according to the `retries` setting
- **Send failures**: By default a failed send fails the job; `setLogFailuresOnly(true)` logs the failure and continues, which can lose records
- **Network partitions**: Records are buffered (up to `buffer.memory`) and retried
- **Topic creation**: Missing topics are created automatically if `auto.create.topics.enable` is set on the brokers
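
The difference between failing and only logging can be illustrated with a small, JDK-only simulation of an asynchronous send callback. This is a simplified sketch of the general pattern and not the connector's actual code; the class `FailureHandlingSketch` and its methods are hypothetical names chosen for this example.

```java
import java.util.concurrent.atomic.AtomicReference;

public class FailureHandlingSketch {

    private final boolean logFailuresOnly;
    // Holds the first asynchronous send error until the next check.
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();

    public FailureHandlingSketch(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    /** Simulates the client callback that runs when a send completes. */
    public void onSendCompleted(Exception error) {
        if (error == null) {
            return; // the record was acknowledged
        }
        if (logFailuresOnly) {
            // Lenient mode: report and move on, so the record is lost.
            System.err.println("Send failed, record dropped: " + error.getMessage());
        } else {
            // Strict mode: remember the first error so it can be raised later.
            asyncError.compareAndSet(null, error);
        }
    }

    /** Raises a stored failure; a sink would call this before each send and on checkpoints. */
    public void checkErroneous() {
        Exception error = asyncError.getAndSet(null);
        if (error != null) {
            throw new IllegalStateException("Failed to send data: " + error.getMessage(), error);
        }
    }

    public static void main(String[] args) {
        FailureHandlingSketch strict = new FailureHandlingSketch(false);
        strict.onSendCompleted(new RuntimeException("broker unavailable"));
        try {
            strict.checkErroneous();
        } catch (IllegalStateException e) {
            System.out.println("Job would fail: " + e.getMessage());
        }
    }
}
```

In strict mode the error surfaces on the processing thread, which is what lets a restart from the last checkpoint replay the affected records. In lenient mode nothing is replayed, so it only suits data that can tolerate loss.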

## Fault Tolerance

The producer integrates with Flink's fault tolerance mechanisms:

1. **Checkpointing**: The producer takes part in every Flink checkpoint of the job
2. **Recovery**: On restart, Flink replays records from the last completed checkpoint, so some records may be written to Kafka more than once (at-least-once)
3. **Commit Protocol**: This producer does not use a two-phase commit; transactional, exactly-once writes require the Kafka 0.11 or later producer
4. **Flush on Checkpoint**: With `setFlushOnCheckpoint(true)`, all buffered records are flushed and acknowledged before the checkpoint completes

```java
// Configure fault tolerance
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);                                  // Checkpoint every 5 seconds
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);  // At least 1 second between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(60000);          // Abort checkpoints that take longer than 60 seconds
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);       // Only one checkpoint in progress at a time
```